Working with PySpark RDDs

Working with Spark's original data structure API: Resilient Distributed Datasets.

    We've spent an awful lot of time in this series speaking about DataFrames, which is only one of the 3 data structure APIs we can work with in Spark (or one of two data structure APIs in PySpark, if you're keeping score). Chances are you've worked with DataFrames in capacities outside of Spark, so picking up Spark DataFrames probably wasn't too much of a shift. So what about RDDs?

    We've already covered DataFrames to the point of building a structured stream, so now is as good a time as ever to take a step back and explore RDDs.

    What are RDDs?

    RDDs (Resilient Distributed Datasets) are Spark's OG way of working with data. RDDs are distributed because they're partitioned across the various nodes that make up your Spark cluster. RDDs are resilient because they're highly available: if a node in our cluster happens to crap out in the middle of a computation, the RDD will be automatically recovered from the nodes still in operation. If this sounds familiar, it's because DataFrames share these same properties (which, admittedly, makes the name "RDD" a lot less descriptive than it sounds).

    RDDs are different from DataFrames in that RDDs do not store data in labeled columns. RDDs are great for performing transformations on unstructured data at a lower level than DataFrames: if you're looking to clean or manipulate data at the stage before it becomes tabular (formatting incoming strings, for example), it will benefit the performance of your pipelines to do this in an RDD first. Alternatively, if you never intend to output tabular data at all, RDDs will be your friend.
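
    To make that concrete, here's a minimal sketch of that kind of pre-tabular cleanup. The log lines below are made up for illustration; the point is that .map() lets us massage raw strings before any schema exists:

    # Hypothetical raw log lines with inconsistent whitespace and casing.
    raw_logs = sc.parallelize(['  2019-01-01|INFO |Device started ',
                               '2019-01-01|error| Sensor timeout',
                               '2019-01-02|INFO|Reading OK'])

    # Low-level string cleanup in the RDD, before any tabular structure exists.
    cleaned = raw_logs.map(lambda line: line.strip()) \
                      .map(lambda line: [field.strip() for field in line.split('|')]) \
                      .map(lambda fields: (fields[0], fields[1].upper(), fields[2]))

    cleaned.collect()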

    Creating an RDD

    Let's create an RDD from some nonsense data. As with the previous posts in this series, we'll be using the Databricks notebook interface to simplify things a bit.

    To create an RDD, we're going to use the Spark .parallelize() function:

    rdd = sc.parallelize([('a', 7),
                          ('a', 2),
                          ('b', 2),
                          ('c', 5),
                          ('c', 8),
                          ('c', 11)])

    .parallelize() takes a Python collection and distributes it across the cluster as an RDD. We passed in a list of tuples, so our resulting RDD treats each tuple as a key/value pair (the first element being the key and the second being the value). Unlike a Python dictionary, we can store multiple values per key.
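
    As a quick sketch of what having multiple values per key buys us, we can fold those duplicates together with .reduceByKey():

    # Sum the values for each key; pairs sharing a key get combined.
    sums_per_key = rdd.reduceByKey(lambda a, b: a + b)

    sums_per_key.collect()

    With the data above, that should come back as something like [('a', 9), ('b', 2), ('c', 24)], though the order can vary with partitioning.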

    Inspecting the Contents of our RDD

    Let's verify that our RDD was created correctly. To do this, we need to print out each entry individually:

    for items in rdd.collect():
        print(items)
    ('a', 7)
    ('a', 2)
    ('b', 2)
    ('c', 5)
    ('c', 8)
    ('c', 11)

    For huge RDDs, it's probably a good idea not to print every record in our RDD. Instead, we can use .take() to get only the first n entries:

    for items in rdd.take(2):
        print(items)
    ('a', 7)
    ('a', 2)

    There are plenty of other ways we can inspect the contents of an RDD besides printing every entry. Here are a few methods:

    print('RDD Count:', rdd.count())
    print('RDD Count by key:', rdd.countByKey())
    print('RDD Collect as map:',  rdd.collectAsMap())
    print('RDD Num Partitions:', rdd.getNumPartitions())
    RDD Count: 6
    RDD Count by key: defaultdict(<class 'int'>, {'b': 1, 'a': 2, 'c': 3})
    RDD Collect as map: {'b': 2, 'a': 2, 'c': 11}
    RDD Num Partitions: 4
    • .count(): Counts the number of entries in our RDD.
    • .countByKey(): Count the number of values per key.
    • .collectAsMap(): Returns key/value pairs as a dictionary (NOTE: since this returns a dictionary, duplicate keys are collapsed... only the last pair for each key is kept).
    • .getNumPartitions(): List the number of partitions our RDD is split into.

    RDDs don't need to be key/value stores. Here's an example of an RDD built from a plain range of integers:

    rdd2 = sc.parallelize(range(100))
    
    print('RDD2 Count:', rdd2.count())

    This print() call will output 100, since our RDD now holds 100 integers. We can also run some standard stats operations on our RDD:

    print('RDD2 Min:', rdd2.min())
    print('RDD2 Max:', rdd2.max())
    print('RDD2 Mean:', rdd2.mean())
    print('RDD2 Standard deviation:', rdd2.stdev())
    print('RDD2 Variance:', rdd2.variance())
    RDD2 Min: 0
    RDD2 Max: 99
    RDD2 Mean: 49.5
    RDD2 Standard deviation: 28.8660700477
    RDD2 Variance: 833.25

    An easier way to perform all these calculations is with the stats() method. Printing rdd2.stats() returns the following:

    (count: 100, mean: 49.5, stdev: 28.8660700477, max: 99.0, min: 0.0)
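
    If you'd rather keep those numbers around instead of just printing them, .stats() returns a StatCounter whose pieces you can read individually. A quick sketch:

    stats = rdd2.stats()

    # The StatCounter exposes each figure as its own method.
    print('Count:', stats.count())
    print('Mean:', stats.mean())
    print('Stdev:', stats.stdev())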

    Create an RDD From a Text File

    Raw text files are a great use case for RDDs. A good example of this might be something like reading logs from IoT devices. To demonstrate, we're going to use some test data.

    We're going to assume that we're receiving data in an unstructured format. It's common for data destined for RDDs to keep its headers in a separate text file, with the records themselves stored in subsequent text files.

    Using Text Files to Set Headers

    We're going to assume that our RDD will eventually become a DataFrame of tabular data, thus we need a way to structure our data. Here are the contents of the headers file we're using:

    id:int
    full_name:string
    email:string
    date_of_birth:string
    company_name:string
    customer_location_lat:double
    customer_location_long:double

    We're using the left side as our column names, and the right side as our data types. How do we make something of this raw .txt data, you ask? First, we'll split each line into a [name, type] pair and collect the results:

    headers = sc.textFile('/FileStore/tables/customerheaders.txt').map(lambda line: line.split(":")).collect()

    headers

    This creates the following output:

    [['id', 'int'],
     ['full_name', 'string'],
     ['email', 'string'],
     ['date_of_birth', 'string'],
     ['company_name', 'string'],
     ['customer_location_lat', 'double'],
     ['customer_location_long', 'double']]

    With this in hand, we can now write a small function that maps each type name to a Spark data type, and use it to build a schema of column names and types:

    from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

    def strToType(type_string):
      # Map a type name from our headers file to its Spark data type.
      if type_string == 'int':
        return IntegerType()
      elif type_string == 'double':
        return DoubleType()
      else:
        return StringType()

    schema = StructType([StructField(t[0], strToType(t[1]), True) for t in headers])
    
    for item in schema:
      print(item)

    Printing this will give us the following:

    StructField(id,IntegerType,true)
    StructField(full_name,StringType,true)
    StructField(email,StringType,true)
    StructField(date_of_birth,StringType,true)
    StructField(company_name,StringType,true)
    StructField(customer_location_lat,DoubleType,true)
    StructField(customer_location_long,DoubleType,true)

    Reading Records From a Text File

    Our headers are good to go. Next, we'll create an RDD of customer records:

    textFileRecordsRDD = sc.textFile('/FileStore/tables/customers.txt')
    
    textFileRecordsRDD.take(4)
    ['7,Mohandas MacRierie,[email protected],11/24/1990,Fliptune,-7.1309871,111.591546',
     '254,Rita Slany,[email protected],8/7/1961,Yodoo,48.7068855,2.3371075',
     '960,Jobey Defew,[email protected],4/26/1965,Dynazzy,41.4530119,-8.5322628',
     '34581,Sauncho Risen,[email protected]ebay.co.uk,3/21/1988,Mudo,39.3096699,-7.4404863']

    Each line in this file is a string. If we split each string by comma, we'll have values for each of our columns:

    def parseLine(line):
      # Pair each comma-separated value with its [name, type] entry from headers.
      tokens = zip(line.split(","), headers)
      parsed_tokens = []
      for token in tokens:
        token_type = token[1][1]
        if token_type == 'double':
          parsed_tokens.append(float(token[0]))
        elif token_type == 'int':
          parsed_tokens.append(int(token[0]))
        else:
          parsed_tokens.append(token[0])
      return parsed_tokens
    
    
    records = textFileRecordsRDD.map(parseLine)
    
    for item in records.take(4):
        print(item)

    Output:

    [7, 'Mohandas MacRierie', '[email protected]', '11/24/1990', 'Fliptune', -7.1309871, 111.591546]
    [254, 'Rita Slany', '[email protected]', '8/7/1961', 'Yodoo', 48.7068855, 2.3371075]
    [960, 'Jobey Defew', '[email protected]', '4/26/1965', 'Dynazzy', 41.4530119, -8.5322628]
    [34581, 'Sauncho Risen', '[email protected]', '3/21/1988', 'Mudo', 39.3096699, -7.4404863]

    Finally, let's create a DataFrame from this information:

    df = sqlContext.createDataFrame(records, schema)
    display(df)

    Here's what displaying this DataFrame looks like:

    id    | full_name          | email             | date_of_birth | company_name | customer_location_lat | customer_location_long
    7     | Mohandas MacRierie | [email protected] | 11/24/1990    | Fliptune     | -7.1309871            | 111.591546
    254   | Rita Slany         | [email protected] | 8/7/1961      | Yodoo        | 48.7068855            | 2.3371075
    960   | Jobey Defew        | [email protected] | 4/26/1965     | Dynazzy      | 41.4530119            | -8.5322628
    34581 | Sauncho Risen      | [email protected] | 3/21/1988     | Mudo         | 39.3096699            | -7.4404863
    73653 | Humfried Matteotti | [email protected] | 11/29/1950    | Feedfish     | 30.172782             | 111.0765
    49240 | Rickert Venditto   | [email protected] | 11/10/1968    | Jayo         | 52.0130334            | 15.4233905
    938   | Florie Strank      | [email protected] | 1/8/1965      | Rhyloo       | 44.255137             | 5.262028
    36    | Clarabelle Katte   | [email protected] | 8/7/1954      | Divape       | 7.539403              | -72.77228
    0     | Matelda Guidini    | [email protected] | 2/2/1982      | Eidel        | -18.8200534           | 49.0652963
    7     | Sheffie Hallworth  | [email protected] | 6/18/1972     | Zoonder      | 50.75425              | 86.13414
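
    If you want to sanity-check that the schema we built from the headers file actually stuck, printSchema() on the resulting DataFrame is a quick way to do it:

    df.printSchema()

    This should list each column with the type we assigned (id as an integer, the two coordinates as doubles, everything else as strings), all nullable.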

    More Useful RDD Methods

    Want more RDD goodness? Here are a few other useful RDD methods to play with before I send you on your way (there's a quick demo of a few of them after the list):

    • rdd.foreach([FUNCTION_NAME]): Applies a function to each item in an RDD.
    • rdd.groupBy([CRITERIA]): Groups the items of an RDD by the result of the given function.
    • rdd.subtract(rdd2).collect(): Returns values from RDD #1 which do not exist in RDD #2.
    • rdd.subtractByKey(rdd2).collect(): Similar to the above, but matches on keys: drops the pairs from RDD #1 whose keys appear in RDD #2.
    • rdd.sortBy([FUNCTION]).collect(): Sort an RDD by a given function.
    • rdd.sortByKey().collect(): Sort an RDD of key/value pairs in ascending order of the key.
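
    To make a few of those concrete, here's a small demo against the key/value rdd from the top of the post (the exact ordering of results will depend on partitioning):

    # A second key/value RDD to compare against.
    other = sc.parallelize([('a', 7), ('c', 5)])

    # Elements of rdd that do NOT appear in `other`.
    print(rdd.subtract(other).collect())

    # Pairs of rdd whose keys do NOT appear in `other`.
    print(rdd.subtractByKey(other).collect())

    # Sort by value, then by key.
    print(rdd.sortBy(lambda pair: pair[1]).collect())
    print(rdd.sortByKey().collect())

    # Group whole pairs by a criteria function (here, the key).
    print([(key, list(group)) for key, group in rdd.groupBy(lambda pair: pair[0]).collect()])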