For being the lifeblood of Spark, there’s surprisingly little documentation on how to actually work with them. If I had to guess, most of the world has been too spoiled by DataFrames to be bothered with non-tabular data. Strange world we live in when using the core data API of Spark is considered a “pro move.”

We've already spent an awful lot of time in this series speaking about DataFrames, which are only one of the 3 data structure APIs we can work with in Spark (or one of two data structure APIs in PySpark, if you're keeping score). After learning how to clean DataFrames, transform DataFrames, and even building structured streams from DataFrames, it seems now is as good a time as ever to take a step back and explore RDDs.

What are RDDs?

RDDs (Resilient Distributed Datasets) are Spark's OG way of working with data. RDDs are distributed because they're partitioned across the various nodes that make up your Spark cluster. RDDs are resilient because they're highly available- if a node in our cluster happens to crap out in the middle of a computation, the RDD will be automatically recovered from the nodes still in operation. If this sounds familiar, it's because DataFrames share these same properties (indeed, this renders the name "RDD" non-descriptive).

The best way to think about RDDs is “one-dimensional” data, which includes both arrays and key/value stores. RDDs are great for performing transformations on unstructured data at a lower-level than DataFrames: if you're looking to clean or manipulate data on a level that lives before tabular data (such as just formatting text files, etc) it will benefit the performance of your pipelines to do this in an RDD first. Alternatively, if you never intend to output tabular data at all, RDDs will be your friend (and only option).

Creating RDDs

RDDs can be created with hard coded data using the parallelize() method, or from text files by using either textfile() or wholeTextFiles(). We’ll be using parallelize() for this next part.

Types of RDDs

RDDs typically follow one of three patterns: an array, a simple key/value store, and a key/value store consisting of arrays.A list RDD accepts input as simple as you might imagine - lists containing strings, numbers, or both:

rdd = sc.parallelize([1, 5, 60, 'a', 9, 'c', 4, 'z', 'f'])

Key/value RDDs are a bit more unique. Instead of accepting a dictionary as you might except, RDDs accept lists of tuples, where the first value is the “key” and the second value is the “value”. This is because RDDs allow multiple values for the same key, unlike Python dictionaries:

rdd = sc.parallelize([('a', 6),
                      ('a', 1),
                      ('b', 2),
                      ('c', 5),
                      ('c', 8),
                      ('c', 11)])

The last pattern we see often are key/value stores of dicts. These types of RDDs have some interesting methods available to them which we’ll explore later:

rdd4 = sc.parallelize([("a",[1, 2, 3]), ("b",[4, 5])])

Inspecting the Contents of RDDs

The natural first thing to do with an RDD is to inspect it. There are a bunch of ways to investigate the data stored in an RDD, but the most fundamental one is using collect(). Calling collect() on an RDD lists all the contents of that RDD:

print(rdd.collect())

This should return something like the following:

('a', 7)
('a', 2)
('b', 2)
('c', 5)
('c', 8)
('c', 11)

For huge RDDs, it's probably a good idea not to print every record in our RDD. Instead, we can use .take(n) to get only the first n entries:

print(rdd.take(2))
('a', 7)
('a', 2)

Summarizing RDDs

We can also inspect the contents of an RDD at a high-level by summarizing the contents of what's inside. Here are a few methods:

print('RDD Count:', rdd.count())
print('RDD Count by key:', rdd.countByKey())
print('RDD Collect as map:',  rdd.collectAsMap())
print('RDD Num Partitions:', rdd.getNumPartitions())

Here's the output of the above:

RDD Count: 6
RDD Count by key: defaultdict(<class 'int'>, {'b': 1, 'a': 2, 'c': 3})
RDD Collect as map: {'b': 2, 'a': 2, 'c': 11}
RDD Num Partitions: 4
  • .count(): Counts the number of entries in our RDD.
  • .countByKey(): Count the number of values per key.
  • .collectAsMap(): Returns key/value pairs as a dictionary (NOTE: since this returns as a dictionary, duplicate keys will not be returned... only the last pair will be returned).
  • .getNumPartitions(): List the number of partitions our RDD is split into.

We can also return run some stats operations on RDDs.

print('RDD Min:', rdd.min())
print('RDD Max:', rdd.max())
print('RDD Mean:', rdd.mean())
print('RDD Standard deviation:', rdd.stdev())
print('RDD Variance:', rdd2.variance())

Here's what the output of the above looks like if we were to run them on an RDD created with rdd.paralellize(range(100)):

RDD2 Min: 0
RDD2 Max: 99
RDD2 Mean: 49.5
RDD2 Standard deviation: 28.8660700477
RDD2 Variance: 833.25

An easier way to perform all the calculations above at once is with the stats() method (really useful!). Printing rdd.stats() returns the following:

(count: 100, mean: 49.5, stdev: 28.8660700477, max: 99.0, min: 0.0)

Creating RDDs From Text Files

We'll commonly need to work with data sources that output raw text files (like device logs). This scenario is a great use case for when we'd want to use RDDs.

I'm going to use a text file with Zoo animal information:

sc.textFile('/FileStore/tables/animals.txt').take(5)

Here's the output:

['id,name,common_name,'scientific_name,gender,birth_date',
 '1,Codi,Pale white-eye,Zosterops pallidus,Male,8/5/2018',
 '2,Janice,Sportive lemur,Lepilemur rufescens,Female,12/9/2018',
 '3,Barny,Bare-faced go away bird,Lorythaixoides concolor,Male,3/5/2019',
 '4,Rudd,Baboon,Papio cynocephalus,Male,6/16/2019',
 '5,Marybeth,Bushbaby,Galago crassicaudataus,Female,4/29/2019']

Ahh, the first line in our RDD looks to be header names! We don't want these in our final RDD. Here's a common way of dealing with this:

headers = full_csv.first()  
rdd =  rdd.filter(lambda line: line != headers)

first() retrieves the first line in our RDD, which we then remove from the RDD by using filter().

Creating RDDs From Multiple Text Files

If you’re dealing with a ton of data (the legendary phenomenon known as “big data”), you probably have a shit-ton of data constantly writing to multiple files in a single location like an S3 bucket. This scenario is when the wholeTextFiles() method comes into play:

rdd = rdd.wholeTextFiles("/path/to/my/directory")

This will read data from all files in a given directory as though they were all one single file.

Parsing Data in RDDs

If you look closely at our zoo animal example, you'll notice that each line became an item in our RDD as opposed to each item. This is where the RDD .map() method is crucial.

map() is the most commonly used RDD method: it performs a single operation against every line in an RDD. Perhaps the most common use of map() is to split each line of an RDD by a delimiter:

animalRDD = animalRDD.map(lambda line: line.split(","))

Now we'll notice each line is an array of values, instead of a single string:

[['1',
  'Codi',
  'Pale white-eye',
  'Zosterops pallidus',
  'Male',
  '8/5/2018'],
 ['2',
  'Janice',
  'Sportive lemur',
  'Lepilemur rufescens',
  'Female',
  '12/9/2018'],
 ['3',
  'Barny',
  'Bare-faced go away bird',
  'Lorythaixoides concolor',
  'Male',
  '3/5/2019'],
 ['4',
  'Rudd',
  'Baboon',
  'Papio cynocephalus',
  'Male',
  '6/16/2019'],
 ['5',
  'Marybeth',
  'Bushbaby',
  'Galago crassicaudataus',
  'Female',
  '4/29/2019']

Create DataFrames From RDDs

Let's say we want to read raw text files, but we want our result data to be tabular. This is where we can create DataFrames from RDDs!

Using Text Files to Set Headers

We're going to assume that our RDD will eventually become a DataFrame of tabular data, thus we need a way to structure our data. Here are the contents of the headers file we're using:

id:string
full_name:string
email:string
date_of_birth:string
credit_card_number:int
company_name:string
customer_location_lat:double
customer_location_long:double

We're using the left side as our key names, and the right side as our data types. How do we make something of this raw .txt data, you ask? First, we'll use map():

headers = sc.textFile('/FileStore/tables/customerheaders.txt').map(lambda line: line.split(":")).collect()

This creates the following output:

[['id', 'string'],
 ['full_name', 'string'],
 ['email', 'string'],
 ['date_of_birth', 'string'],
 ['credit_card_number', 'int'],
 ['company_name', 'string'],
 ['customer_location_lat', 'double'],
 ['customer_location_long', 'double']]

With this RDD mapped correctly, we can now create a function to parse this map and create a workable relationship of column names and Spark data types:

from pyspark.sql.types import IntegerType, DoubleType, StringType

def strToType(str):
  if str == 'int':
    return IntegerType()
  elif str == 'double':
    return DoubleType()
  else:
    return StringType()

schema = StructType([StructField(t[0], strToType(t[1]), True) for t in headers])

for item in schema:
  print(item)

Printing this will give us the following:

StructField(id,IntegerType,true)
StructField(full_name,StringType,true)
StructField(email,StringType,true)
StructField(date_of_birth,StringType,true)
StructField(company_name,StringType,true)
StructField(customer_location_lat,DoubleType,true)
StructField(customer_location_long,DoubleType,true)

Reading Records From a Text File

Our headers are good to go. Next, we'll create an RDD of customer records:

textFileRecordsRDD = sc.textFile('/FileStore/tables/customers.txt')

textFileRecordsRDD.take(4)
['7,Mohandas MacRierie,mmacrierie0@xrea.com,11/24/1990,Fliptune,-7.1309871,111.591546',
 '254,Rita Slany,rslany1@ucla.edu,8/7/1961,Yodoo,48.7068855,2.3371075',
 '960,Jobey Defew,jdefew2@nbcnews.com,4/26/1965,Dynazzy,41.4530119,-8.5322628',
 '34581,Sauncho Risen,srisen3@ebay.co.uk,3/21/1988,Mudo,39.3096699,-7.4404863']

Each line in this file is a string. If we split each string by comma, we'll have values for each of our columns:

def parseLine(line):
  tokens = zip(line.split(","), headers)
  parsed_tokens = []
  for token in tokens:
    token_type = token[1][1]
    print('token_type = ', token[0])
    if token_type == 'double':
      parsed_tokens.append(float(token[0]))
    elif token_type == 'int':
      parsed_tokens.append(int(token[0]))
    else:
      parsed_tokens.append(token[0])
  return parsed_tokens


records = textFileRecordsRDD.map(parseLine)

for item in records.take(4):
    print(item)

Output:

[7, 'Mohandas MacRierie', 'mmacrierie0@xrea.com', '11/24/1990', 'Fliptune', -7.1309871, 111.591546]
[254, 'Rita Slany', 'rslany1@ucla.edu', '8/7/1961', 'Yodoo', 48.7068855, 2.3371075]
[960, 'Jobey Defew', 'jdefew2@nbcnews.com', '4/26/1965', 'Dynazzy', 41.4530119, -8.5322628]
[34581, 'Sauncho Risen', 'srisen3@ebay.co.uk', '3/21/1988', 'Mudo', 39.3096699, -7.4404863]

Finally, let's create a DataFrame from this information. To do this, we can use the createDataFrame() method which is called on the Spark context object:

df = sc.createDataFrame(records, schema)

createDataFrame() requires two arguments: the first being the content of the DataFrame, and the second being a schema which contains the column names and data types.

Here's what displaying this DataFrame looks like:

id full_name email date_of_birth company_name customer_location_lat customer_location_long
7 Mohandas MacRierie mmacrierie0@xrea.com 11/24/1990 Fliptune -7.1309871 111.591546
254 Rita Slany rslany1@ucla.edu 8/7/1961 Yodoo 48.7068855 2.3371075
960 Jobey Defew jdefew2@nbcnews.com 4/26/1965 Dynazzy 41.4530119 -8.5322628
34581 Sauncho Risen srisen3@ebay.co.uk 3/21/1988 Mudo 39.3096699 -7.4404863
73653 Humfried Matteotti hmatteotti4@bandcamp.com 11/29/1950 Feedfish 30.172782 111.0765
49240 Rickert Venditto rvenditto5@mapy.cz 11/10/1968 Jayo 52.0130334 15.4233905
938 Florie Strank fstrank6@soundcloud.com 1/8/1965 Rhyloo 44.255137 5.262028
36 Clarabelle Katte ckatte7@lulu.com 8/7/1954 Divape 7.539403 -72.77228
0 Matelda Guidini mguidini8@1688.com 2/2/1982 Eidel -18.8200534 49.0652963
7 Sheffie Hallworth shallworth9@wunderground.com 6/18/1972 Zoonder 50.75425 86.13414

More Useful RDD Methods

Want more RDD goodness? Here are a few other useful RDD methods to play with before I send you on your way:

  • rdd.foreach([FUNCTION]): Performs a function for each item in an RDD.
  • rdd.groupBy([CRITERA]): Performs a groupby aggregate.
  • rdd.subtract(rdd2): Returns values from RDD #1 which also exist in RDD #2.
  • rdd.subtractByKey(rdd2): Similar to the above, but matches key/value pairs specifically.
  • rdd.sortBy([FUNCTION]): Sort an RDD by a given function.
  • rdd.sortByKey(): Sort an RDD of key/value pairs in chronological order of the key name.
  • rdd.join(rdd2): Joins two RDDs, even for RDDs which are lists! This is an interesting method in itself which is worth investigation in its own right, if you have the time.

Useful RDD Documentation

As mentioned, RDD documentation is scarce. Before you go, I feel obligated to share two resources with you which helped me along my way:

  • Datacamp RDD Cheatsheet: A quick useful reference for the most commonly used RDD methods and patterns.
  • Spark RDD API Docs: This was unusually hard to find, but it's a link to some official Apache Spark RDD documentation which lists every RDD method as well as an example of each one being used.