Working with PySpark RDDs

Working with Spark's original data structure API: Resilient Distributed Datasets.

    For being the lifeblood of Spark, there’s surprisingly little documentation on how to actually work with them. If I had to guess, most of the world has been too spoiled by DataFrames to be bothered with non-tabular data. Strange world we live in when using the core data API of Spark is considered a “pro move.”

    We've already spent an awful lot of time in this series speaking about DataFrames, which are only one of the three data structure APIs we can work with in Spark (or one of two data structure APIs in PySpark, if you're keeping score). After learning how to clean DataFrames, transform DataFrames, and even build structured streams from DataFrames, it seems now is as good a time as ever to take a step back and explore RDDs.

    What are RDDs?

    RDDs (Resilient Distributed Datasets) are Spark's OG way of working with data. RDDs are distributed because they're partitioned across the various nodes that make up your Spark cluster. RDDs are resilient because they're highly available: if a node in our cluster happens to crap out in the middle of a computation, the RDD will be automatically recovered from the nodes still in operation. If this sounds familiar, it's because DataFrames share these same properties (which makes the name "RDD" not especially descriptive of what sets them apart).

    The best way to think about RDDs is as “one-dimensional” data, which includes both arrays and key/value stores. RDDs are great for performing transformations on unstructured data at a lower level than DataFrames: if you're looking to clean or manipulate data before it ever becomes tabular (reformatting raw text files, for instance), it will benefit the performance of your pipelines to do this in an RDD first. Alternatively, if you never intend to output tabular data at all, RDDs will be your friend (and only option).

    Creating RDDs

    RDDs can be created with hard-coded data using the parallelize() method, or from text files by using either textFile() or wholeTextFiles(). We’ll be using parallelize() for this next part.
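
    To make those concrete, here's what each of the three looks like (the file paths below are just placeholders):

    # Hard-coded data distributed across the cluster
    rdd = sc.parallelize([1, 2, 3, 4, 5])

    # One RDD element per line of a text file (placeholder path)
    lines = sc.textFile('/FileStore/tables/some_file.txt')

    # One (filename, contents) pair per file in a directory (placeholder path)
    files = sc.wholeTextFiles('/FileStore/tables/some_directory/')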

    Types of RDDs

    RDDs typically follow one of three patterns: a list, a simple key/value store, and a key/value store whose values are lists. A list RDD accepts input as simple as you might imagine: lists containing strings, numbers, or both.

    rdd = sc.parallelize([1, 5, 60, 'a', 9, 'c', 4, 'z', 'f'])

    Key/value RDDs are a bit more unusual. Instead of accepting a dictionary as you might expect, RDDs accept lists of tuples, where the first value is the “key” and the second value is the “value”. This is because RDDs allow multiple values for the same key, unlike Python dictionaries:

    rdd = sc.parallelize([('a', 7),
                          ('a', 2),
                          ('b', 2),
                          ('c', 5),
                          ('c', 8),
                          ('c', 11)])
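
    Why allow duplicate keys in the first place? Usually because we plan to aggregate them later. As a quick aside (not something we'll need for the rest of this post), reduceByKey() collapses duplicate keys by combining their values with a function we supply:

    # Sum the values of duplicate keys (output order may vary)
    print(rdd.reduceByKey(lambda a, b: a + b).collect())
    # [('a', 9), ('b', 2), ('c', 24)]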

    The last pattern we often see is key/value stores whose values are lists. These types of RDDs have some interesting methods available to them, which we'll get a taste of below:

    rdd4 = sc.parallelize([("a",[1, 2, 3]), ("b",[4, 5])])
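
    As a small taste of those methods, mapValues() and flatMapValues() transform the values while leaving the keys alone:

    # Apply a function to each value, keeping the keys untouched
    print(rdd4.mapValues(len).collect())
    # [('a', 3), ('b', 2)]

    # Flatten each list so every element gets its own key/value pair
    print(rdd4.flatMapValues(lambda values: values).collect())
    # [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5)]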

    Inspecting the Contents of RDDs

    The natural first thing to do with an RDD is to inspect it. There are a bunch of ways to investigate the data stored in an RDD, but the most fundamental one is collect(). Calling collect() on an RDD returns the entire contents of that RDD as a list on the driver:

    print(rdd.collect())

    This should return something like the following:

    [('a', 7), ('a', 2), ('b', 2), ('c', 5), ('c', 8), ('c', 11)]

    For huge RDDs, it's probably a good idea not to print every record in our RDD. Instead, we can use .take(n) to get only the first n entries:

    print(rdd.take(2))
    [('a', 7), ('a', 2)]

    Summarizing RDDs

    We can also inspect the contents of an RDD at a high-level by summarizing the contents of what's inside. Here are a few methods:

    print('RDD Count:', rdd.count())
    print('RDD Count by key:', rdd.countByKey())
    print('RDD Collect as map:',  rdd.collectAsMap())
    print('RDD Num Partitions:', rdd.getNumPartitions())

    Here's the output of the above:

    RDD Count: 6
    RDD Count by key: defaultdict(<class 'int'>, {'b': 1, 'a': 2, 'c': 3})
    RDD Collect as map: {'b': 2, 'a': 2, 'c': 11}
    RDD Num Partitions: 4

    • .count(): Counts the number of entries in our RDD.
    • .countByKey(): Counts the number of values for each key.
    • .collectAsMap(): Returns key/value pairs as a dictionary (NOTE: since this returns a dictionary, duplicate keys will not be returned... only the last pair for each key will be returned).
    • .getNumPartitions(): Returns the number of partitions our RDD is split into.

    We can also run some stats operations on RDDs, but these only make sense on RDDs of numbers. Let's create one with rdd2 = sc.parallelize(range(100)) and try a few:

    print('RDD2 Min:', rdd2.min())
    print('RDD2 Max:', rdd2.max())
    print('RDD2 Mean:', rdd2.mean())
    print('RDD2 Standard deviation:', rdd2.stdev())
    print('RDD2 Variance:', rdd2.variance())

    Here's the output:

    RDD2 Min: 0
    RDD2 Max: 99
    RDD2 Mean: 49.5
    RDD2 Standard deviation: 28.8660700477
    RDD2 Variance: 833.25

    An easier way to perform all the calculations above at once is with the stats() method (really useful!). Printing rdd2.stats() returns the following:

    (count: 100, mean: 49.5, stdev: 28.8660700477, max: 99.0, min: 0.0)
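
    If we only need one or two of those numbers, we can also hang on to the StatCounter object that stats() returns and pull individual values from it:

    # stats() returns a StatCounter; grab individual statistics from it
    st = rdd2.stats()
    print(st.count(), st.mean(), st.stdev())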

    Creating RDDs From Text Files

    We'll commonly need to work with data sources that output raw text files (like device logs). This scenario is a great use case for when we'd want to use RDDs.

    I'm going to use a text file containing zoo animal information:

    animalRDD = sc.textFile('/FileStore/tables/animals.txt')
    animalRDD.take(5)

    Here's the output:

    ['id,name,common_name,scientific_name,gender,birth_date',
     '1,Codi,Pale white-eye,Zosterops pallidus,Male,8/5/2018',
     '2,Janice,Sportive lemur,Lepilemur rufescens,Female,12/9/2018',
     '3,Barny,Bare-faced go away bird,Lorythaixoides concolor,Male,3/5/2019',
     '4,Rudd,Baboon,Papio cynocephalus,Male,6/16/2019',
     '5,Marybeth,Bushbaby,Galago crassicaudataus,Female,4/29/2019']

    Ahh, the first line in our RDD looks to be header names! We don't want these in our final RDD. Here's a common way of dealing with this:

    headers = animalRDD.first()
    animalRDD = animalRDD.filter(lambda line: line != headers)

    first() retrieves the first line in our RDD, which we then remove from the RDD by using filter().

    Creating RDDs From Multiple Text Files

    If you’re dealing with a ton of data (the legendary phenomenon known as “big data”), you probably have a shit-ton of it constantly being written to multiple files in a single location, like an S3 bucket. This scenario is when the wholeTextFiles() method comes into play:

    rdd = sc.wholeTextFiles("/path/to/my/directory")

    This will read every file in the given directory, returning each one as a single (filename, contents) key/value pair in the resulting RDD.
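
    Since each element is a (filename, contents) pair rather than an individual line, a common follow-up (just a sketch, again with a placeholder path) is to flatten the file contents back into one record per line, like textFile() would give us:

    # Flatten (filename, contents) pairs into one record per line
    lines = sc.wholeTextFiles("/path/to/my/directory") \
              .flatMap(lambda pair: pair[1].splitlines())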

    Parsing Data in RDDs

    If you look closely at our zoo animal example, you'll notice that each line became a single item in our RDD, rather than each value getting its own spot. This is where the RDD .map() method is crucial.

    map() is the most commonly used RDD method: it applies a function to every element in an RDD. Perhaps the most common use of map() is to split each line of an RDD by a delimiter:

    animalRDD = animalRDD.map(lambda line: line.split(","))

    Now we'll notice each line is an array of values, instead of a single string:

    [['1',
      'Codi',
      'Pale white-eye',
      'Zosterops pallidus',
      'Male',
      '8/5/2018'],
     ['2',
      'Janice',
      'Sportive lemur',
      'Lepilemur rufescens',
      'Female',
      '12/9/2018'],
     ['3',
      'Barny',
      'Bare-faced go away bird',
      'Lorythaixoides concolor',
      'Male',
      '3/5/2019'],
     ['4',
      'Rudd',
      'Baboon',
      'Papio cynocephalus',
      'Male',
      '6/16/2019'],
     ['5',
      'Marybeth',
      'Bushbaby',
      'Galago crassicaudataus',
      'Female',
     '4/29/2019']]

    Creating DataFrames From RDDs

    Let's say we want to read raw text files, but we want our result data to be tabular. This is where we can create DataFrames from RDDs!

    Using Text Files to Set Headers

    We're going to assume that our RDD will eventually become a DataFrame of tabular data, thus we need a way to structure our data. Here are the contents of the headers file we're using:

    id:int
    full_name:string
    email:string
    date_of_birth:string
    company_name:string
    customer_location_lat:double
    customer_location_long:double

    We're using the left side as our column names, and the right side as our data types. How do we make something of this raw .txt data, you ask? First, we'll use map():

    headers = sc.textFile('/FileStore/tables/customerheaders.txt').map(lambda line: line.split(":")).collect()

    This creates the following output:

    [['id', 'int'],
     ['full_name', 'string'],
     ['email', 'string'],
     ['date_of_birth', 'string'],
     ['company_name', 'string'],
     ['customer_location_lat', 'double'],
     ['customer_location_long', 'double']]

    With this RDD mapped correctly, we can now create a function to parse this map and create a workable relationship of column names and Spark data types:

    from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
    
    def strToType(str):
      if str == 'int':
        return IntegerType()
      elif str == 'double':
        return DoubleType()
      else:
        return StringType()
    
    schema = StructType([StructField(t[0], strToType(t[1]), True) for t in headers])
    
    for item in schema:
      print(item)

    Printing this will give us the following:

    StructField(id,IntegerType,true)
    StructField(full_name,StringType,true)
    StructField(email,StringType,true)
    StructField(date_of_birth,StringType,true)
    StructField(company_name,StringType,true)
    StructField(customer_location_lat,DoubleType,true)
    StructField(customer_location_long,DoubleType,true)

    Reading Records From a Text File

    Our headers are good to go. Next, we'll create an RDD of customer records:

    textFileRecordsRDD = sc.textFile('/FileStore/tables/customers.txt')
    
    textFileRecordsRDD.take(4)
    ['7,Mohandas MacRierie,[email protected],11/24/1990,Fliptune,-7.1309871,111.591546',
     '254,Rita Slany,[email protected],8/7/1961,Yodoo,48.7068855,2.3371075',
     '960,Jobey Defew,[email protected],4/26/1965,Dynazzy,41.4530119,-8.5322628',
     '34581,Sauncho Risen,[email protected],3/21/1988,Mudo,39.3096699,-7.4404863']

    Each line in this file is a string. If we split each string by comma, we'll have values for each of our columns:

    def parseLine(line):
      # Pair each comma-separated value with its [column_name, type] entry from headers
      tokens = zip(line.split(","), headers)
      parsed_tokens = []
      for token in tokens:
        # token is (raw_value, [column_name, type])
        token_type = token[1][1]
        # Cast the raw string according to the type declared in the headers file
        if token_type == 'double':
          parsed_tokens.append(float(token[0]))
        elif token_type == 'int':
          parsed_tokens.append(int(token[0]))
        else:
          parsed_tokens.append(token[0])
      return parsed_tokens
    
    
    records = textFileRecordsRDD.map(parseLine)
    
    for item in records.take(4):
        print(item)

    Output:

    [7, 'Mohandas MacRierie', '[email protected]', '11/24/1990', 'Fliptune', -7.1309871, 111.591546]
    [254, 'Rita Slany', '[email protected]', '8/7/1961', 'Yodoo', 48.7068855, 2.3371075]
    [960, 'Jobey Defew', '[email protected]', '4/26/1965', 'Dynazzy', 41.4530119, -8.5322628]
    [34581, 'Sauncho Risen', '[email protected]', '3/21/1988', 'Mudo', 39.3096699, -7.4404863]

    Finally, let's create a DataFrame from this information. To do this, we can use the createDataFrame() method, which is called on the SparkSession object (spark) rather than the SparkContext:

    df = spark.createDataFrame(records, schema)

    createDataFrame() requires two arguments: the first being the content of the DataFrame, and the second being a schema which contains the column names and data types.
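
    Before moving on, it's worth a quick sanity check that the schema actually took. printSchema() and show() make that easy:

    # Verify the column names/types and peek at the first few rows
    df.printSchema()
    df.show(5)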

    Here's what displaying this DataFrame looks like:

    id    | full_name          | email             | date_of_birth | company_name | customer_location_lat | customer_location_long
    7     | Mohandas MacRierie | [email protected] | 11/24/1990    | Fliptune     | -7.1309871            | 111.591546
    254   | Rita Slany         | [email protected] | 8/7/1961      | Yodoo        | 48.7068855            | 2.3371075
    960   | Jobey Defew        | [email protected] | 4/26/1965     | Dynazzy      | 41.4530119            | -8.5322628
    34581 | Sauncho Risen      | [email protected] | 3/21/1988     | Mudo         | 39.3096699            | -7.4404863
    73653 | Humfried Matteotti | [email protected] | 11/29/1950    | Feedfish     | 30.172782             | 111.0765
    49240 | Rickert Venditto   | [email protected] | 11/10/1968    | Jayo         | 52.0130334            | 15.4233905
    938   | Florie Strank      | [email protected] | 1/8/1965      | Rhyloo       | 44.255137             | 5.262028
    36    | Clarabelle Katte   | [email protected] | 8/7/1954      | Divape       | 7.539403              | -72.77228
    0     | Matelda Guidini    | [email protected] | 2/2/1982      | Eidel        | -18.8200534           | 49.0652963
    7     | Sheffie Hallworth  | [email protected] | 6/18/1972     | Zoonder      | 50.75425              | 86.13414

    More Useful RDD Methods

    Want more RDD goodness? Here are a few other useful RDD methods to play with before I send you on your way:

    • rdd.foreach([FUNCTION]): Performs a function for each item in an RDD.
    • rdd.groupBy([CRITERIA]): Groups items in an RDD according to the supplied function.
    • rdd.subtract(rdd2): Returns only the values from RDD #1 which do not exist in RDD #2.
    • rdd.subtractByKey(rdd2): Similar to the above, but matches on the keys of key/value pairs.
    • rdd.sortBy([FUNCTION]): Sorts an RDD by a given function.
    • rdd.sortByKey(): Sorts an RDD of key/value pairs in ascending order of the key.
    • rdd.join(rdd2): Joins two key/value RDDs by key, pairing up the values for matching keys. This one is worth investigating in its own right if you have the time (there's a quick sketch of a few of these methods below).
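
    To make a few of those concrete, here's a small sketch using throwaway RDDs (nothing to do with the zoo or customer examples above):

    left = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
    right = sc.parallelize([('a', 9), ('c', 10)])

    # Pairs from `left` whose keys don't appear in `right`
    print(left.subtractByKey(right).collect())  # [('b', 2)]

    # Sort key/value pairs by key (ascending by default)
    print(right.sortByKey().collect())          # [('a', 9), ('c', 10)]

    # Join on matching keys; values become (left_value, right_value) tuples
    print(left.join(right).collect())           # [('a', (1, 9)), ('c', (3, 10))] (order may vary)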

    Useful RDD Documentation

    As mentioned, RDD documentation is scarce. Before you go, I feel obligated to share two resources with you which helped me along my way:

    • Datacamp RDD Cheatsheet: A quick useful reference for the most commonly used RDD methods and patterns.
    • Spark RDD API Docs: This was unusually hard to find, but it's a link to some official Apache Spark RDD documentation which lists every RDD method as well as an example of each one being used.