DataFrame Transformations in PySpark (Continued)

Continuing to apply transformations to Spark DataFrames using PySpark.

    We've covered a fair amount of ground when it comes to Spark DataFrame transformations in this series. In part 1, we touched on filter(), select(), dropna(), fillna(), and isNull(). Then, we moved on to dropDuplicates() and user-defined functions (udf) in part 2. This time around, we'll build on those concepts and introduce some new ways to transform data, so you can officially be awarded your PySpark Guru Certification, awarded by us here at Hackers & Slackers.*

    *Hackers & Slackers is not an accredited institution and is respected by virtually nobody in general.

    Of course, we need to get things started with some sample data. I'm going to use the 2016 MLB postseason dataset we worked with last time. For a quick refresher, here's a preview:

    awayTeamName homeTeamName homeFinalRuns awayFinalRuns durationMinutes
    Red Sox Indians 6 0 199
    Nationals Dodgers 3 8 252
    Orioles Blue Jays 5 2 205
    Dodgers Nationals 3 4 272
    Cubs Dodgers 2 10 238
    Indians Red Sox 3 4 221
    Red Sox Indians 5 4 213
    Nationals Dodgers 6 5 224
    Blue Jays Rangers 3 5 210
    Rangers Blue Jays 7 6 201
    Giants Cubs 1 0 150
    Dodgers Cubs 0 1 165
    Cubs Dodgers 6 0 198
    Dodgers Cubs 5 0 156
    Giants Mets 0 3 191
    Cubs Giants 6 5 303
    Dodgers Cubs 8 4 217
    Blue Jays Indians 2 1 164
    Indians Blue Jays 5 1 181
    Cubs Giants 5 6 205
    Blue Jays Rangers 1 10 178
    Blue Jays Indians 2 0 164
    Cubs Dodgers 4 8 256
    Dodgers Nationals 3 4 226
    Giants Cubs 5 2 183
    Indians Blue Jays 0 3 157
    Dodgers Nationals 5 2 235
    Indians Blue Jays 2 4 203

    Adding and Modifying Columns

    As you already know, we can create new columns by calling the withColumn() method on a DataFrame, passing the name of the new column as the first argument and an expression for the values that should live in each row of that column as the second argument.
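
    Here's a minimal sketch of what that looks like against our dataset (the new column name is just for illustration):

    from pyspark.sql.functions import col

    # A new column computed from two existing ones: total runs scored in the game.
    df = df.withColumn('totalRuns', col('homeFinalRuns') + col('awayFinalRuns'))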

    It's .lit() Fam

    It's hard to mention columns without talking about PySpark's lit() function. lit() is one of those unsexy but critically important parts of PySpark that we need to understand, simply because PySpark is a Python API that talks to the JVM (as you might be painfully aware).

    lit() is a way for us to interact with column literals in PySpark: the JVM expects us to explicitly state when we're working with a column object. Because Python has no native way of doing this, we must instead use lit() to tell the JVM that what we're talking about is a column literal.

    To use lit(), we import it from pyspark.sql.functions:

    from pyspark.sql.functions import lit, when, col
    

    I've imported a few other things here which we'll get to later. With these imported, we can add new columns to a DataFrame the quick and dirty way:

    from pyspark.sql.functions import lit, when, col, concat, regexp_extract
    
    df = df_with_winner.withColumn('testColumn', lit('this is a test'))
    
    display(df)
    

    This will add a column and populate every cell in that column with the string this is a test.

    lit() is necessary when creating columns from raw values directly. When we hand columns to a function like concat(), there's no need for lit(), since it's implied that we're working with columns (any literal strings we mix in, though, still need to be wrapped in lit(), as we'll see later).

    Creating Columns Based on Criteria

    Another function we imported from functions is when(). PySpark's when() works a lot like SQL's CASE WHEN expression (remember, we imported it from the pyspark.sql.functions package). In this case, we can use when() to populate a column only when the outcome of a conditional is true.

    The first parameter we pass into when() is the conditional (or multiple conditionals, if you want). I'm not a huge fan of this syntax, but here's what the format looks like:

    df = df.withColumn([COLUMN_NAME], when([CONDITIONAL], [COLUMN_VALUE]))
    

    Let's use our baseball example to see the when() function in action. Remember last time when we added a "winner" column to our DataFrame? Well, we can do this using when() instead!

    # Construct a new dynamic column
    df = df_with_test_column.withColumn('gameWinner', when(
        (col("homeFinalRuns") > col("awayFinalRuns")), col("homeTeamName")
    ))
    

    Let's walk through this step by step, shall we? We set the name of our column to be gameWinner right off the bat. Next is the most important part: the conditional. In our example, we have (col("homeFinalRuns") > col("awayFinalRuns")). Remember how we said that the JVM we're interacting with absolutely must know which data type we're talking about at all times? col() means we're comparing the values of two columns; or more specifically, we're comparing the values in every row of those columns.

    Also, notice that we don't specify something like df.col([COLUMN_NAME]), or even df.[COLUMN_NAME]. We instead pass a string containing the name of our column to col(), and things just seem to work. PySpark is smart enough to assume that the columns we provide via col() (in the context of when()) refer to the columns of the DataFrame being acted on. After all, why wouldn't they? See... PySpark isn't annoying all the time - it's just inconsistently annoying (which may be even more annoying to the aspiring Sparker, admittedly).

    But wait, something's missing! We've only added the winner when the winner happens to be the home team! How do we add the away team? Is there some sort of else equivalent to when()? Why yes, I'm so glad you've asked! It happens to be called otherwise().

    With otherwise(), we can tack on an action to take when the conditional in our when() statement returns False! That means our when() statement now takes this shape:

    df = df.withColumn([COLUMN_NAME], when([CONDITIONAL], [COLUMN_VALUE]).otherwise([COLUMN_VALUE]))
    

    Now modifying our original snippet:

    # Construct a new dynamic column
    df = df_with_test_column.withColumn('gameWinner', when(
        (col("homeFinalRuns") > col("awayFinalRuns")), col("homeTeamName")
    ).otherwise(col("awayTeamName")))
    
    display(df)
    

    Excellent, dear boy and/or girl! Let's get a look at what we've done so far:

    awayTeamName homeTeamName homeFinalRuns awayFinalRuns durationMinutes testColumn gameWinner
    Red Sox Indians 6 0 199 this is a test Indians
    Nationals Dodgers 3 8 252 this is a test Nationals
    Orioles Blue Jays 5 2 205 this is a test Blue Jays
    Dodgers Nationals 3 4 272 this is a test Dodgers
    Cubs Dodgers 2 10 238 this is a test Cubs
    Indians Red Sox 3 4 221 this is a test Indians
    Red Sox Indians 5 4 213 this is a test Indians
    Nationals Dodgers 6 5 224 this is a test Dodgers
    Blue Jays Rangers 3 5 210 this is a test Blue Jays
    Rangers Blue Jays 7 6 201 this is a test Blue Jays
    Giants Cubs 1 0 150 this is a test Cubs
    Dodgers Cubs 0 1 165 this is a test Dodgers
    Cubs Dodgers 6 0 198 this is a test Dodgers
    Dodgers Cubs 5 0 156 this is a test Cubs
    Giants Mets 0 3 191 this is a test Giants
    Cubs Giants 6 5 303 this is a test Giants
    Dodgers Cubs 8 4 217 this is a test Cubs
    Blue Jays Indians 2 1 164 this is a test Indians
    Indians Blue Jays 5 1 181 this is a test Blue Jays
    Cubs Giants 5 6 205 this is a test Cubs
    Blue Jays Rangers 1 10 178 this is a test Blue Jays
    Blue Jays Indians 2 0 164 this is a test Indians
    Cubs Dodgers 4 8 256 this is a test Cubs
    Dodgers Nationals 3 4 226 this is a test Dodgers
    Giants Cubs 5 2 183 this is a test Cubs
    Indians Blue Jays 0 3 157 this is a test Indians
    Dodgers Nationals 5 2 235 this is a test Nationals
    Indians Blue Jays 2 4 203 this is a test Indians

    So far so good, but maybe we should get rid of that useless testColumn we created?

    Dropping Entire Columns

    Dropping columns is easy! We can simply use the drop() method on our DataFrame and pass the name of the column:

    df = df_with_test_column.drop('testColumn')
    

    That does the trick! Feel free to check by using display(df).
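
    If you ever need to drop several columns at once, drop() also accepts multiple column names. A quick sketch, with made-up column names:

    df = df.drop('someColumn', 'someOtherColumn')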

    String Operations & Filters

    We've looked at how filter() works pretty extensively. Why don't we pair this with some of Spark's common string operations to see how powerful filtering can be?

    The PySpark like() method works just like its SQL equivalent: % denotes a wildcard, which matches any sequence of characters (including none at all). Take a look at how we'd use like() to find winning teams whose names start with "Nat":

    df = df.filter(df.winner.like('Nat%'))
    
    display(df)
    
    awayTeamName homeTeamName durationMinutes homeFinalRuns awayFinalRuns winner
    Nationals Dodgers 252 3 8 Nationals
    Dodgers Nationals 235 5 2 Nationals

    Wow, the Nationals only won two games! Hah, they sure do suck. Sorry about Harper, by the way.

    like() covers a lot. We can use like() to see if a string contains, starts with, or ends with a value. I highly recommend using like() where possible, but if that's not your cup of tea, you can use contains(), startswith(), and endswith() instead. They all do what you'd imagine.
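
    Here's a rough sketch of those three in action, using the gameWinner column from earlier:

    # Each of these returns a new, filtered DataFrame.
    df.filter(df.gameWinner.contains('Sox'))     # winner name contains "Sox"
    df.filter(df.gameWinner.startswith('Blue'))  # winner name starts with "Blue"
    df.filter(df.gameWinner.endswith('s'))       # winner name ends with "s"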

    isin() to Match Multiple Values

    If we want to match by multiple values, isin() is pretty great. It takes multiple values as its parameters and returns all rows where the value of column X matches any of those values:

    df = df.filter(df.gameWinner.isin('Cubs', 'Indians'))
    
    display(df)
    
    awayTeamName homeTeamName durationMinutes homeFinalRuns awayFinalRuns gameWinner
    Red Sox Indians 199 6 0 Indians
    Indians Red Sox 221 3 4 Indians
    Red Sox Indians 213 5 4 Indians
    Giants Cubs 150 1 0 Cubs
    Dodgers Cubs 156 5 0 Cubs
    Dodgers Cubs 217 8 4 Cubs
    Blue Jays Indians 164 2 1 Indians
    Cubs Giants 205 5 6 Cubs
    Blue Jays Indians 164 2 0 Indians
    Cubs Dodgers 256 4 8 Cubs
    Giants Cubs 183 5 2 Cubs
    Indians Blue Jays 157 0 3 Indians
    Indians Blue Jays 203 2 4 Indians

    concat() For Appending Strings

    Here's another SQL sweetheart. Using concat() adds strings together:

    df = df.withColumn('gameTitle', concat(df.homeTeamName, lit(' vs. '), df.awayTeamName))
    
    display(df)
    
    awayTeamName homeTeamName homeFinalRuns awayFinalRuns durationMinutes winner gameTitle
    Red Sox Indians 6 0 199 Indians Indians vs. Red Sox
    Nationals Dodgers 3 8 252 Nationals Dodgers vs. Nationals
    Orioles Blue Jays 5 2 205 Blue Jays Blue Jays vs. Orioles
    Dodgers Nationals 3 4 272 Dodgers Nationals vs. Dodgers
    Cubs Dodgers 2 10 238 Dodgers Dodgers vs. Cubs
    Indians Red Sox 3 4 221 Indians Red Sox vs. Indians
    Red Sox Indians 5 4 213 Indians Indians vs. Red Sox
    Nationals Dodgers 6 5 224 Dodgers Dodgers vs. Nationals
    Blue Jays Rangers 3 5 210 Blue Jays Rangers vs. Blue Jays
    Rangers Blue Jays 7 6 201 Blue Jays Blue Jays vs. Rangers
    Giants Cubs 1 0 150 Cubs Cubs vs. Giants
    Dodgers Cubs 0 1 165 Dodgers Cubs vs. Dodgers
    Cubs Dodgers 6 0 198 Dodgers Dodgers vs. Cubs
    Dodgers Cubs 5 0 156 Cubs Cubs vs. Dodgers
    Giants Mets 0 3 191 Giants Mets vs. Giants
    Cubs Giants 6 5 303 Giants Giants vs. Cubs
    Dodgers Cubs 8 4 217 Cubs Cubs vs. Dodgers
    Blue Jays Indians 2 1 164 Indians Indians vs. Blue Jays
    Indians Blue Jays 5 1 181 Blue Jays Blue Jays vs. Indians
    Cubs Giants 5 6 205 Cubs Giants vs. Cubs
    Blue Jays Rangers 1 10 178 Blue Jays Rangers vs. Blue Jays
    Blue Jays Indians 2 0 164 Indians Indians vs. Blue Jays
    Cubs Dodgers 4 8 256 Cubs Dodgers vs. Cubs
    Dodgers Nationals 3 4 226 Dodgers Nationals vs. Dodgers
    Giants Cubs 5 2 183 Cubs Cubs vs. Giants
    Indians Blue Jays 0 3 157 Indians Blue Jays vs. Indians
    Dodgers Nationals 5 2 235 Nationals Nationals vs. Dodgers
    Indians Blue Jays 2 4 203 Indians Blue Jays vs. Indians

    regexp_extract() for Regex Functions

    For real programmers who know what they're actually doing, you can disregard all those other lame string operations: regexp_extract() is the moment you've been waiting for. It takes three parameters: the column to search, your regex, and the index of the capture group to extract. I could totally demonstrate my 1337 regex skills right here, but uh, I just don't feel like it right now:

    df = df.withColumn('newColumn', regexp_extract([COLUMN], [INSERT_1337_REGEX_HERE], [GROUP_INDEX]))
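
    If I did feel like it, a minimal sketch might look something like this (the new column name and pattern are made up for illustration):

    # Pull the first word out of each away team's name, e.g. "Red Sox" -> "Red".
    df = df.withColumn('awayTeamFirstWord', regexp_extract(col('awayTeamName'), r'^(\w+)', 1))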
    

    Number Operators

    You should be getting the point by now! Here are some number operators, which you can use just like the string operators (speaking syntactically, which is apparently a word); a quick sketch follows the list:

    • round: Rounds a column's values to the given number of decimal places. Accepts a column as the first parameter, and a scale as the second.
    • floor: Rounds a column's values down to the nearest whole number. Accepts a column as the first parameter.
    • ceil: Rounds a column's values up to the nearest whole number. Accepts a column as the first parameter.
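
    Here's that sketch, using the durationMinutes column from our dataset (the derived column names are just for illustration):

    from pyspark.sql.functions import round, floor, ceil, col

    # Convert game duration to hours, then round it a few different ways.
    df = df.withColumn('durationHours', col('durationMinutes') / 60)
    df = df.withColumn('roundedHours', round(col('durationHours'), 1))  # e.g. 3.32 -> 3.3
    df = df.withColumn('flooredHours', floor(col('durationHours')))     # e.g. 3.32 -> 3
    df = df.withColumn('ceilingHours', ceil(col('durationHours')))      # e.g. 3.32 -> 4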

    That'll Do, Pig (or Equivalent Spirit Animal)

    Once again, I've dumped an ungodly amount of documentation veiled as a tutorial in our lap. Thank you for putting up with my nonsense by continuing to click on links to these stupid posts.

    There's still more to do! Join us next time when we walk through the extracting and loading of data in PySpark while we give this transformation stuff a rest. Peace fam.
