Learning Apache Spark with PySpark & Databricks

Get started with Apache Spark in part 1 of our series where we leverage Databricks and PySpark.

    Something we've only begun to touch on so far is the benefit of utilizing Apache Spark in larger-scale data pipelines. Spark is a quintessential part of the Apache data stack: commonly deployed alongside Hadoop, Spark is intended to handle resource-intensive jobs such as data streaming and graph processing.

    Much of Spark's allure comes from the way it uses memory. Spark is written in Scala and runs on the JVM, and Java and its offshoot languages are notorious for running extremely memory-heavy at run time, which can be used to our advantage. Because Spark keeps as much working data in memory as it can, our jobs become predictably resource-intensive, which allows us to scale up our Spark clusters to accommodate accordingly.

    A Note About Spark vs. Hadoop

    It's crucial for us to understand where Spark fits in the greater Apache ecosystem. Spark is commonly run on top of Hadoop: it can read from Hadoop's distributed file system (HDFS) and be scheduled by Hadoop's resource manager (YARN), so comparing "Spark vs. Hadoop" isn't an accurate 1-to-1 comparison. Instead, it's best to think of Spark as being an alternative to Hadoop's MapReduce.

    When compared to Hadoop's MapReduce, Spark generally runs faster. Spark keeps intermediate data in memory wherever it can, which gives it a speed bonus over MapReduce, which writes intermediate results to disk between steps. Of course, Spark also comes with the bonus of being accessible via its Python library: PySpark.

    If you're interested in learning how these two play together, I recommend this video which breaks this down quite nicely. If you're still a bit green to Hadoop's MapReduce, look no further than this video, which is the best explanation I've found to date.

    Using Databricks to Get Started

    Spinning up a Spark cluster is a topic that deserves a post (or multiple posts) in itself. It's a non-trivial process that varies per cloud provider and isn't necessarily the right place to start for those just learning Spark. That's where Databricks comes in.

    Databricks is a private company co-founded by the original creators of Apache Spark. Databricks provides a clean notebook interface (similar to Jupyter) which is preconfigured to hook into a Spark cluster. Thus, we can dodge the initial setup associated with creating a cluster ourselves. Databricks' free Community Edition is perhaps the best way for anybody to get started learning Spark.

    An example Databricks Notebook.

    If we wanted, we could very well allow Databricks to own our entire Spark stack, aside from maybe where we keep our final data. Check out the various tabs that make up the Databricks interface:

    • Data: This is where we control the input of data. Databricks is heavily integrated with AWS and Azure.
    • Clusters: Options for scaling our cluster of servers. Any changes made here are directly reflected in AWS.
    • Jobs: The cool thing about Databricks notebooks is that they don't have to be just notebooks. By linking our notebooks to scheduled jobs, we can use our notebooks as actual scripts to execute regularly. Thus, our notebooks become entirely functional production code as a result.
    • Import Library: This one is a bit hard to find… it lives under Databricks > Import library. This is the tool we use to import third-party Python (or Java) libraries for usage in our notebooks. The easiest way to accomplish this is by selecting “PyPI” and then providing the name of the library (e.g. pandas). This will install the library on your cluster and make it available for use.

    Uploading Sample Data

    The easiest way to add some data to work with in Databricks is via their UI. Going back to the data tab, we have the options to hook into an S3 bucket, upload a CSV, or even select from sources such as Amazon Redshift or Apache Kafka. Unfortunately, Google Cloud is not yet supported by Databricks 😡.

    We'll keep things simple and upload a CSV to kick things off:

    Databricks' interface for importing data.

    Upon uploading, we immediately get the option to Create Table in Notebook. It's best to move forward with this option, as it shows us how to programmatically load data into Spark for the future. Selecting this option will create a new notebook with some boilerplate for us.

    Types of Tabular Data in Spark

    The first thing we'll notice in the newly created notebook is a mention of DataFrames and Datasets. There is a third type of data structure which exists as well, called an RDD.

    • An RDD (Resilient Distributed Dataset) is a set of data distributed across the many nodes of our Spark cluster. At their core, RDDs are collections of Java, Scala, or Python objects split into partitions, and they can be generated from just about any data source. RDDs are the lowest-level data type in Spark; DataFrames and Datasets are built on top of them.
    • DataFrames are an abstraction of RDDs designed to be easier to use, similar to how we might expect SQL tables to work. As a result, DataFrames are created from structured data only. Unlike RDDs, DataFrames automatically have a number of optimizations applied to them which make working with structured data faster and easier. Like RDDs, DataFrames cannot be modified after their initial creation. This is the main caveat of DataFrames, which we'll explore in detail in a moment.
    • Datasets are type-safe tables of data. The typed Dataset API is only available in Scala and Java; in those languages a DataFrame is simply a Dataset of generic rows, so in PySpark we'll be working with DataFrames.
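    To make the relationship between a DataFrame and its underlying RDD a little more concrete, here's a minimal sketch that can be run once we have a DataFrame (we create one in the next section). The helper name summarize is made up for illustration; columns, dtypes, and rdd are standard DataFrame attributes.

```python
def summarize(df):
    """Return a small description of a DataFrame and the RDD underneath it."""
    return {
        "columns": df.columns,                    # column names, in order
        "dtypes": df.dtypes,                      # (column name, type) pairs
        "partitions": df.rdd.getNumPartitions(),  # chunks the data is split into
    }
```

    Calling summarize(df) in a notebook cell gives a quick look at the schema Spark settled on, plus how many partitions the data is spread across.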

    Creating Spark DataFrames

    The first cell of our new notebook shows us how to import data from a CSV using PySpark. There are a few options to take note of:

    • infer_schema will attempt to look at patterns in the data we've uploaded and automatically cast each column to the proper data type. This has proven to be surprisingly accurate; let's preview this functionality by setting the value to "true".
    • first_row_is_header does exactly what you'd assume. We should also set this to "true".
    • delimiter allows us to separate columns by a character other than a comma. Leave this as ",", unless your Comma Separated Values are separated by something other than commas, you madman.

    As a result, our first cell should now look like this:

    # File location and type
    file_location = "/FileStore/tables/posts.csv"
    file_type = "csv"
    
    # CSV options
    infer_schema = "true"
    first_row_is_header = "true"
    delimiter = ","
    
    # The applied options are for CSV files. For other file types, these will be ignored.
    df = spark.read.format(file_type) \
      .option("inferSchema", infer_schema) \
      .option("header", first_row_is_header) \
      .option("sep", delimiter) \
      .load(file_location)
    
    display(df)
    

    We'll notice a few things that might look familiar, as well as a few quirks:

    • First off, Boolean values in this cell are set by strings (either "true" or "false", as opposed to True or False). This is presumably an artifact of Java/Scala, as our options are handed off to Spark's JVM-based reader. PySpark's option() also accepts Python's True and False and converts them for us.
    • Next is the presence of df, which you'll recognize as shorthand for DataFrame. PySpark has its own implementation of DataFrames. These are conceptually similar to what we already know with Pandas, but the syntax and underlying tech are quite different.
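    • Finally, display() takes the place of print(). display() is specific to Databricks notebooks; outside of Databricks, df.show() is the plain PySpark equivalent. As we work, you'll find that Databricks has some very cool ways of displaying data previews.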
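    If the string-based options feel awkward, the same read can be sketched with keyword arguments and real Python booleans via spark.read.csv(). This is a sketch rather than what the auto-generated notebook produces: load_posts is a hypothetical helper name, and spark is the session object Databricks notebooks provide for us.

```python
def load_posts(spark, path="/FileStore/tables/posts.csv"):
    """Load a CSV into a DataFrame using keyword options instead of strings."""
    return spark.read.csv(
        path,
        header=True,       # first row holds the column names
        inferSchema=True,  # let Spark guess each column's data type
        sep=",",           # column delimiter
    )
```

    In a notebook, df = load_posts(spark) followed by display(df) should give the same preview as the generated cell.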

    Our first cell is now ready to rock. Let's see what running this does:

    Our data loaded inline.

    Our data is now loaded into memory!

    Saving Our Data to Disk

    While most of the work we do will revolve around modifying data in memory, we'll eventually want to save our data somewhere we can get at it later. We can do this by either creating a temporary view, which only lives as long as this notebook's Spark session (nothing is written to disk), or by creating a persistent table, which is saved to storage and can be accessed from other notebooks.

    Creating a temporary view is as simple as:

    # Create a temporary view or table
    
    temp_table_name = "sampledata"
    
    df.createOrReplaceTempView(temp_table_name)
    

    Alternatively, creating a persistent table looks like this:

    # Create a permanent table
    
    permanent_table_name = "JiraIssue_csv"
    
    df.write.format("parquet").saveAsTable(permanent_table_name)
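    Once saved, both can be read back into DataFrames. Here's a minimal sketch, assuming the view and table names used above; read_back is a made-up helper name, while spark.sql() and spark.table() are standard SparkSession methods.

```python
def read_back(spark, temp_view="sampledata", table="JiraIssue_csv"):
    """Return DataFrames backed by the temporary view and the permanent table."""
    # A temporary view can be queried with SQL for as long as the session lives.
    from_view = spark.sql(f"SELECT * FROM {temp_view}")
    # A permanent table can be looked up by name, even from another notebook.
    from_table = spark.table(table)
    return from_view, from_table
```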
    

    Writing SQL in Databricks

    You may have noticed that the auto-generated notebook contains a cell which begins with %sql, and then contains some SQL code. % denotes a magic function: specifically in this case, it's a magic function which sets the programming language the cell will be written in. Cells which begin with %sql will be run as SQL code. Python is the default in our notebook, but %scala and %r are supported as well.

    Writing SQL in a Databricks notebook has some very cool features. For example, check out what happens when we run a SQL query containing aggregate functions as per this example in the SQL quickstart notebook:

    Oh snap, we immediately see a visual of the aggregate we just ran!

    We could easily spend all day getting into the subtleties of Databricks SQL, but that's a post for another time.
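    For what it's worth, we don't strictly need a %sql cell to run SQL: the same kind of aggregate query can be sketched from a Python cell with spark.sql(). This assumes the sampledata temporary view from earlier and an author column (the column name is an assumption based on the examples later in this post); posts_per_author is a hypothetical helper name.

```python
def posts_per_author(spark, view_name="sampledata"):
    """Run an aggregate SQL query against a view and return a DataFrame."""
    query = f"""
        SELECT author, COUNT(*) AS post_count
        FROM {view_name}
        GROUP BY author
        ORDER BY post_count DESC
    """
    return spark.sql(query)
```

    Passing the result to display() should give us the same table and chart options as a %sql cell.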

    Spark DataFrame Basics

    DataFrames are the bread and butter of how we'll be working with data in Spark. Before we go further into Spark DataFrames, I'm obligated to mention three essential truths:

    • Immutable: Spark DataFrames like to be created once upfront, without being modified after the fact. As a result, the way we typically transform DataFrames is by creating a new DataFrame from the original, with transformations applied.
    • Lazy evaluation: DataFrame transformations are not executed when we define them; Spark only does the work once we call an action which needs a result, such as show(), count(), or display(). This shouldn't be an entirely new concept; Python's generators defer their work in much the same way.
    • Distributed: Spark DataFrames are fault-tolerant and highly-available, much like vanilla Hadoop. Thus, we are at little risk of something going horribly wrong and wiping our DataFrame from existence due to external factors - if a node in our Spark cluster goes down, Spark can charge forward regardless.
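    Lazy evaluation is easier to believe once you've seen it. The sketch below is only an analogy in plain Python (no Spark involved, and the rows are made-up sample data): defining the pipeline does no work, and nothing is read until something consumes the result.

```python
log = []

def read_rows():
    """Pretend data source that records when each row is actually read."""
    for row in [{"author": "todd"}, {"author": "matt"}, {"author": "todd"}]:
        log.append(f"read {row['author']}")
        yield row

# "Transformation": building the generator does no work yet.
todds = (row for row in read_rows() if row["author"] == "todd")
work_before_action = list(log)  # snapshot of the log: nothing has been read

# "Action": consuming the generator finally runs the pipeline.
result = list(todds)
```

    Spark behaves along the same lines: filter() and select() describe the work, while actions like show() or count() actually kick it off.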

    Great, but what does working with a Spark DataFrame look like? The general structure of modifying a Spark DataFrame typically looks like this:

    # transformation_1, 2, and 3 are placeholders for real DataFrame methods
    new_df = (
        original_df
        .transformation_1()
        .transformation_2()
        .transformation_3()
    )
    

    As we mentioned before, Spark DataFrames are immutable, so we need to create a new DataFrame from our original each time we'd like to make adjustments (AKA: new_df = original_df.etc()).

    From there, we can chain together transformations so that we don't have to name a new DataFrame per transformation. Thanks to lazy evaluation, the intermediate steps in a chain are only plans rather than copies of our data, so chaining doesn't waste memory.
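    As a slightly more concrete sketch of that chaining pattern, here's a made-up helper (named_posts is not a PySpark function) which strings three real DataFrame methods together. The column names are assumptions borrowed from the examples further down.

```python
def named_posts(df):
    """Return a new DataFrame; the DataFrame passed in is left untouched."""
    return (
        df
        .filter("author IS NOT NULL")                # drop rows without an author
        .select("title", "author", "published_at")   # keep only what we need
        .orderBy("published_at", ascending=False)    # newest posts first
    )
```

    Wrapping the parentheses around the whole chain lets us put one transformation per line without needing backslashes.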

    Built-in DataFrame Transformation Functions

    Before we get too deep into creating custom transformations, we should start by looking at which transformations are available to us out of the box. Take a moment to become familiar with the following built-in functions:

    explode()

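    Similar to Pandas' .explode(), explode() takes a column containing arrays (or maps) and returns a new row for each element inside of it. Below, each item in the posts array becomes its own row, which we then break out into columns.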

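    from pyspark.sql.functions import explode

    # Turn each element of the "posts" array into its own row
    new_df = original_df.select(explode("posts").alias("p"))
    # Pull the fields of each post out into their own columns
    explode_df = new_df.selectExpr("p.title", "p.content", "p.author", "p.published_at")

    explode_df.show()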
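    If it's hard to picture what explode() does to a table, here's a plain-Python analogy (not Spark code, and the sample rows are made up): each element of a list-valued field becomes a row of its own.

```python
# One "row" per author, each holding a list of posts (made-up sample data).
rows = [
    {"author": "todd", "posts": ["Intro to Spark", "PySpark basics"]},
    {"author": "matt", "posts": ["Hello Hadoop"]},
]

# Exploding "posts" yields one output row per list element.
exploded = [
    {"author": row["author"], "post": post}
    for row in rows
    for post in row["posts"]
]
# The 2 input rows become 3 output rows.
```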
    

    filter() and where()

    Returns rows which match given criteria. The resulting DataFrame will return only rows where the conditional inside filter() returns True. For example, the below will return a DataFrame which only contains rows where the author column has a value of todd:

    filter_df = explode_df.filter(explode_df.author == "todd")
    
    display(filter_df)
    

    We can also use sort() to order our results:

    filter_df = explode_df.filter(explode_df.author == "todd").sort(explode_df.published_at)
    
    display(filter_df)
    

    .where() is another way of achieving the same effect:

    from pyspark.sql.functions import col, desc

    where_df = explode_df.where(col("author") == "todd").sort(desc("published_at"))
    
    display(where_df)
    

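    Note the difference in syntax between the two examples: where() is simply an alias of filter(), but here we reference columns with col() rather than as attributes of the DataFrame.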
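    There's a third flavor worth knowing about: both filter() and where() also accept a SQL-style string, which saves us from importing anything. A minimal sketch, assuming the same author, title, and published_at columns as above (todds_posts is a made-up helper name):

```python
def todds_posts(df):
    """Filter with a SQL expression string instead of Column objects."""
    return (
        df
        .where("author = 'todd' AND title IS NOT NULL")  # conditions combined SQL-style
        .sort("published_at", ascending=False)           # newest first
    )
```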

    dropna(), fillna(), and isNull()

    When dealing with N/A values, we can drop empty rows as such:

    dropped_na = explode_df.dropna()
    
    display(dropped_na)
    

    We can also replace N/A values with a string (when given a string, fillna() only fills columns which hold strings):

    filled_na = explode_df.fillna("Empty Value!")
    
    display(filled_na)
    

    We can also leverage filter() and isNull() to select rows with null values:

    from pyspark.sql.functions import col

    filtered_null_df = explode_df.filter(col("author").isNull() | col("title").isNull()).sort("published_at")
    
    display(filtered_null_df)
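    Both dropna() and fillna() can also be aimed at specific columns rather than the whole DataFrame. Here's a hedged sketch using the subset argument of dropna() and the dictionary form of fillna(); tidy_nulls and the "unknown" placeholder value are made up for illustration.

```python
def tidy_nulls(df):
    """Drop rows without a title, then label missing authors."""
    return (
        df
        .dropna(how="any", subset=["title"])  # only look at the title column
        .fillna({"author": "unknown"})        # fill just the author column
    )
```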
    

    .agg() and aggregate functions

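    agg() runs aggregate functions against each group created by groupBy(). For example, the below counts the distinct titles written by each author:

    from pyspark.sql.functions import countDistinct

    aggregate_df = explode_df.select("title", "author")\
      .groupBy("author")\
      .agg(countDistinct("title").alias("post_count"))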
    
    display(aggregate_df)
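    For the simple case of counting rows per group, we can skip agg() entirely, since grouped data has a count() shortcut of its own. A minimal sketch, again assuming an author column (busiest_authors is a hypothetical helper name):

```python
def busiest_authors(df):
    """Count rows per author and sort the most prolific authors to the top."""
    return (
        df
        .groupBy("author")
        .count()                            # adds a column named "count"
        .orderBy("count", ascending=False)  # largest groups first
    )
```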
    

    Get Your Hands Dirty

    As much as I love throwing function definitions your way, attempting to memorize functions is no way to learn. At this point, I'd recommend playing around in a Databricks notebook of your own before going much further.

    We have a whole lot more ground to cover; we've only barely started to scratch the surface of what we can do with PySpark. We still need to create our own custom transformations and much, much more. Stick around for part 2!

    Todd Birchard
    New York City
    Product manager turned engineer with an ongoing identity crisis. Breaks everything before learning best practices. Completely normal and emotionally stable.
