In this article I give a high-level, example-driven overview of writing data processing programs using the Python bindings for Spark, commonly known as PySpark. I specifically cover the Spark SQL DataFrame API, which I've found to be the most useful way to write data analytics code with PySpark. The target audience for this article is Python developers, ideally those with a cursory understanding of other popular PyData stack libraries such as Pandas and NumPy.
The Apache Spark docs describe Spark as a unified analytics engine for large-scale data processing, complete with APIs for popular programming languages: Java, Scala, Python and R. In my mental map and personal experience I have come to understand Spark as a performant, highly scalable, distributed computing environment that offers a suite of tools for accomplishing a myriad of big data tasks ranging from data engineering and data analysis to machine learning.
Spark is designed to be a distributed computing environment conceptually similar to MapReduce from the Apache Hadoop ecosystem. However, Spark places a strong emphasis on caching data in memory so that it is rapidly available for computation, as opposed to writing and reading many intermediate results to files distributed across a Hadoop cluster as in the case of MapReduce.
The architecture of a Spark environment is composed of the following components.
Spark Driver: A component of an application being run on Spark which instantiates a SparkSession, subsequently interacts with the Cluster Manager (if in a distributed cluster environment), and handles the creation of Spark compute jobs.
Spark Session: The entry point to the underlying Spark API through which you funnel application logic to run in the Spark environment.
Cluster Manager: A program that manages the resources of the Spark environment, allocating processes for task executors to run in.
Spark Executor: The program in which Spark carries out computation tasks as instructed by the Cluster Manager and Driver program.
The last important aspect of Spark programming that I want to cover before diving into the code is the pair of concepts known as transformations and actions. Transformations are operations that are built up cumulatively to alter the structure of a dataset in Spark, but they are lazily evaluated until an action is taken. Actions act as triggers that cause the Spark program to execute one or more of the built-up transformation steps, as the short sketch following the lists below illustrates.
Below are the transformations and actions I'll be presenting throughout this tutorial.
Transformations: withColumn(...), withColumnRenamed(...), select(...), drop(...), where(...) / filter(...), orderBy(...) / sort(...), groupBy(...), agg(...)
Actions: show(...), collect(), toPandas(), write
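To make the laziness concrete, here is a minimal sketch (assuming a SparkSession is available under the variable spark, as discussed in the next sections). The two transformations only build up a plan; nothing executes until the show() action is called.

df = spark.range(5)                               # transformation: builds a plan, nothing runs yet
doubled = df.withColumn('doubled', df['id'] * 2)  # another lazy transformation
doubled.show()                                    # action: triggers execution of the accumulated plan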
Spark itself is engineered to execute in the Java Virtual Machine (JVM) but, as mentioned earlier, there are API bindings that allow you to use Python or R in place of Java or Scala. For this article I am only covering the use of the PySpark bindings for the Spark environment. To facilitate getting set up quickly for interactive learning I advise simply installing PySpark from PyPI using pip, preferably within a Python virtual environment. However, before installing PySpark from PyPI you need to have Java 8 or later (ideally at least version 11) installed and the JAVA_HOME environment variable set to point to that installation. I recommend installing the Java Development Kit from either AdoptOpenJDK or Azul.
As mentioned earlier, the SparkSession is the interface through which you funnel your program's logic to the Spark execution environment.
When you fire up the pyspark shell from the command line, the SparkSession object is already available in the environment under the common variable name spark.
$ pyspark
Python 3.8.7 (default, Feb 20 2021, 23:06:12)
[Clang 12.0.0 (clang-1200.0.32.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.2
/_/
Using Python version 3.8.7 (default, Feb 20 2021 23:06:12)
SparkSession available as 'spark'.
>>> spark
<pyspark.sql.session.SparkSession object at 0x113261220>
However, when working with Spark from a script or an interactive notebook such as Jupyter, you are likely to have to create a SparkSession yourself like so.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkOverview').getOrCreate()
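If you are experimenting on a single machine you may also want to be explicit about the master URL. This is only a sketch of one common local configuration, not a requirement:

spark = SparkSession.builder\
    .master('local[*]')\
    .appName('SparkOverview')\
    .getOrCreate()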
The Spark DataFrame is modeled after the wildly popular Pandas DataFrame; both represent a tabular dataset with named columns and are commonly operated on as matrices or through series of vectorized operations. The Spark DataFrame is a bit different from the Pandas version in that it represents a distributed in-memory table composed of specifically typed Row objects with a uniform schema.
In PySpark there are two main categories of data types: scalar (sometimes referred to as simple or basic) and complex. Below I've broken down the Spark data types, linking them to the Python types they correspond to, but please also rely on the official Apache Spark documentation as the authoritative source for technical specifications and understand that this article is merely an example-rich overview of PySpark.
Category | Spark Data Type | Python Data Type(s) |
Scalar | ByteType | int |
Scalar | ShortType | int |
Scalar | IntegerType | int |
Scalar | LongType | int |
Scalar | FloatType | float |
Scalar | DoubleType | float |
Scalar | DecimalType | decimal.Decimal |
Scalar | StringType | str |
Scalar | BooleanType | bool |
Scalar | BinaryType | bytearray |
Scalar | TimestampType | datetime.datetime |
Scalar | DateType | datetime.date |
Complex | ArrayType | list, tuple, array |
Complex | MapType | dict |
Complex | StructType | list or tuple |
Complex | StructField | the Python value type |
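A quick way to see this mapping in action is to let Spark infer a schema from plain Python values. The one-row DataFrame below is purely illustrative and assumes the spark session from the previous section.

import datetime

row = [(datetime.date(2021, 2, 15), "Mon", 50, 0.0, [2, 5, 4, 2], {"min": 38, "max": 60})]
cols = ["date", "day", "tempF", "precipMM", "windspeeds", "temperatures"]
spark.createDataFrame(row, cols).printSchema()
# date -> DateType, day -> StringType, tempF -> LongType, precipMM -> DoubleType,
# windspeeds -> ArrayType(LongType), temperatures -> MapType(StringType, LongType)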
First DataFrame using Simple Standard Python Types
I want to start off in a very Python friendly way by creating a first PySpark DataFrame built from input data consisting solely of standard Python types.
rows = [
("2021-02-15", "Mon", 50, 0.0),
("2021-02-16", "Tue", 57, 0.0),
("2021-02-17", "Wed", 64, 5.0),
("2021-02-18", "Thr", 53, 17.0),
("2021-02-19", "Fri", 56, 0.0),
("2021-02-20", "Sat", 61, 0.0),
("2021-02-21", "Sun", 60, 10.0)
]
col_names = ['date', 'day', 'tempF', 'precipMM']
temps_df = spark.createDataFrame(rows, col_names)
temps_df.show()
The call to the show() action produces the following output, which nicely represents the data in an analytics-friendly, grid-like form.
+----------+---+-----+--------+
| date|day|tempF|precipMM|
+----------+---+-----+--------+
|2021-02-15|Mon| 50| 0.0|
|2021-02-16|Tue| 57| 0.0|
|2021-02-17|Wed| 64| 5.0|
|2021-02-18|Thr| 53| 17.0|
|2021-02-19|Fri| 56| 0.0|
|2021-02-20|Sat| 61| 0.0|
|2021-02-21|Sun| 60| 10.0|
+----------+---+-----+--------+
I can go a step lower in the layers of abstraction to uncover the fact that Spark DataFrames are actually composed of Row objects. If I use another action method, the collect() method, I get a list of Row objects returned.
temps_df.collect()
[Row(date='2021-02-15', day='Mon', tempF=50, precipMM=0.0), Row(date='2021-02-16', day='Tue', tempF=57, precipMM=0.0), Row(date='2021-02-17', day='Wed', tempF=64, precipMM=5.0), Row(date='2021-02-18', day='Thr', tempF=53, precipMM=17.0), Row(date='2021-02-19', day='Fri', tempF=56, precipMM=0.0), Row(date='2021-02-20', day='Sat', tempF=61, precipMM=0.0), Row(date='2021-02-21', day='Sun', tempF=60, precipMM=10.0)]
In fact, you can actually construct a DataFrame from a list of Row objects as opposed to the list of tuples I showed earlier.
from pyspark.sql import Row
rows = [
Row("2021-02-15", "Mon", 50, 0.0),
Row("2021-02-16", "Tue", 57, 0.0),
Row("2021-02-17", "Wed", 64, 5.0),
Row("2021-02-18", "Thr", 53, 17.0),
Row("2021-02-19", "Fri", 56, 0.0),
Row("2021-02-20", "Sat", 61, 0.0),
Row("2021-02-21", "Sun", 60, 10.0)
]
temps_df2 = spark.createDataFrame(rows, ['date', 'day', 'tempF', 'precipMM'])
temps_df2.show()
+----------+---+-----+--------+
|      date|day|tempF|precipMM|
+----------+---+-----+--------+
|2021-02-15|Mon|   50|     0.0|
|2021-02-16|Tue|   57|     0.0|
|2021-02-17|Wed|   64|     5.0|
|2021-02-18|Thr|   53|    17.0|
|2021-02-19|Fri|   56|     0.0|
|2021-02-20|Sat|   61|     0.0|
|2021-02-21|Sun|   60|    10.0|
+----------+---+-----+--------+
I also mentioned previously that there are parallels between the concepts of a Pandas DataFrame and a Spark DataFrame and, in fact, you can use the toPandas() method to convert a Spark DataFrame into a Pandas DataFrame.
temps_df2.toPandas()
Inspecting DataFrames
Under the hood a Spark DataFrame is composed of Row objects and each column of a DataFrame is bound to one of the specific data types I listed in my earlier table. You will often find it useful to inspect the schema of a DataFrame to check a data type or column name which can be accomplished with the printSchema() method of a DataFrame.
temps_df.printSchema()
Output
root
|-- date: string (nullable = true)
|-- day: string (nullable = true)
|-- tempF: long (nullable = true)
|-- precipMM: double (nullable = true)
Of course the show action method may also be of use if you want to see the data in grid form or have a look at a nested column.
temps_df.show()
+----------+---+-----+--------+
|      date|day|tempF|precipMM|
+----------+---+-----+--------+
|2021-02-15|Mon|   50|     0.0|
|2021-02-16|Tue|   57|     0.0|
|2021-02-17|Wed|   64|     5.0|
|2021-02-18|Thr|   53|    17.0|
|2021-02-19|Fri|   56|     0.0|
|2021-02-20|Sat|   61|     0.0|
|2021-02-21|Sun|   60|    10.0|
+----------+---+-----+--------+
For numerical columns it is also useful to look at summary statistics describing the underlying data, which can be accomplished by calling the describe(...) DataFrame method and passing it one or more columns to describe.
temps_df.describe(['tempF', 'precipMM']).show()
+-------+------------------+-----------------+
|summary|             tempF|         precipMM|
+-------+------------------+-----------------+
|  count|                 7|                7|
|   mean|57.285714285714285|4.571428571428571|
| stddev| 4.820590756130958| 6.67974906856894|
|    min|                50|              0.0|
|    max|                64|             17.0|
+-------+------------------+-----------------+
Spark programming can be a bit of a paradigm shift for those coming from a traditional object-oriented programming background, and it is even a significant departure from the Pandas DataFrame model it derives many of its design concepts from. A fundamental component of the functional nature of Spark programming is the concept of immutability. Operations that you perform on Spark datasets never change an object's state data as a result of a method call on that object or when the object is passed into a function call. Instead, the original object's state data is cloned, transformed via one or more operations taking the form of method or function calls and, finally, a new object wrapping the resulting data structure is returned.
A very common task when working with DataFrames in Spark, Pandas, or R is adding columns to the dataset. Most often this is a derived column based on the data from one or more other columns in the DataFrame but, it is quite important to remember that, unlike DataFrames in Pandas or R, the Spark DataFrame is immutable, so you will always be creating a new DataFrame each time you add a column.
There are many useful functions in the pyspark.sql.functions module you will use when operating on columns of a DataFrame.
import pyspark.sql.functions as F
# Add a Column Specifying if it is the Weekend or Not
from pyspark.sql.types import StringType
weekend_labeler = F.udf(lambda day: 'yes' if day in ['Sat', 'Sun'] else 'no', StringType())
temps_df = temps_df.withColumn('weekend', weekend_labeler(temps_df.day))
temps_df.show()
+----------+---+-----+--------+-------+
|      date|day|tempF|precipMM|weekend|
+----------+---+-----+--------+-------+
|2021-02-15|Mon|   50|     0.0|     no|
|2021-02-16|Tue|   57|     0.0|     no|
|2021-02-17|Wed|   64|     5.0|     no|
|2021-02-18|Thr|   53|    17.0|     no|
|2021-02-19|Fri|   56|     0.0|     no|
|2021-02-20|Sat|   61|     0.0|    yes|
|2021-02-21|Sun|   60|    10.0|    yes|
+----------+---+-----+--------+-------+
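As a side note, the same column can be derived without a UDF by using Spark's built-in column functions, which avoid the overhead of shipping each value through Python. The sketch below uses when(...) and isin(...); the temps_df_alt variable name is just for illustration.

temps_df_alt = temps_df.withColumn(
    'weekend',
    F.when(F.col('day').isin('Sat', 'Sun'), 'yes').otherwise('no')  # built-in alternative to the UDF
)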
I could also add a column for Celsius which would involve a calculation to convert the Fahrenheit unit of measure like so.
from pyspark.sql.types import FloatType
calc_celsius = F.udf(lambda tf: round((tf - 32) * 5 / 9, 1), FloatType())
# using F.col() instead of temps_df.tempF for variety
temps_df = temps_df.withColumn('tempC', calc_celsius(F.col('tempF')))
temps_df.show()
+----------+---+-----+--------+-------+-----+
|      date|day|tempF|precipMM|weekend|tempC|
+----------+---+-----+--------+-------+-----+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|
|2021-02-16|Tue|   57|     0.0|     no| 13.9|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|
|2021-02-18|Thr|   53|    17.0|     no| 11.7|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|
|2021-02-20|Sat|   61|     0.0|    yes| 16.1|
|2021-02-21|Sun|   60|    10.0|    yes| 15.6|
+----------+---+-----+--------+-------+-----+
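Again, a UDF is not strictly required here. A sketch of the same conversion using built-in column arithmetic (the resulting column comes back as a double rather than a float, but the values match):

# built-in column arithmetic instead of a Python UDF
temps_df_alt = temps_df.withColumn('tempC', F.round((F.col('tempF') - 32) * 5 / 9, 1))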
In addition to calculations you can also add in a literal value, such as a string or a number, as a constant value for an entire column.
temps_df = temps_df.withColumn('Author', F.lit('Adam'))
temps_df.show()
+----------+---+-----+--------+-------+-----+------+
|      date|day|tempF|precipMM|weekend|tempC|Author|
+----------+---+-----+--------+-------+-----+------+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|  Adam|
|2021-02-16|Tue|   57|     0.0|     no| 13.9|  Adam|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|  Adam|
|2021-02-18|Thr|   53|    17.0|     no| 11.7|  Adam|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|  Adam|
|2021-02-20|Sat|   61|     0.0|    yes| 16.1|  Adam|
|2021-02-21|Sun|   60|    10.0|    yes| 15.6|  Adam|
+----------+---+-----+--------+-------+-----+------+
Another way to create a derived column, or said another way, a column resulting from a calculation, is to use the expr(...) function from the functions module.
temps_df = temps_df.withColumn('even_tempF', F.expr('tempF % 2 == 0'))
temps_df.show()
+----------+---+-----+--------+-------+-----+------+----------+
|      date|day|tempF|precipMM|weekend|tempC|Author|even_tempF|
+----------+---+-----+--------+-------+-----+------+----------+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|  Adam|      true|
|2021-02-16|Tue|   57|     0.0|     no| 13.9|  Adam|     false|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|  Adam|      true|
|2021-02-18|Thr|   53|    17.0|     no| 11.7|  Adam|     false|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|  Adam|      true|
|2021-02-20|Sat|   61|     0.0|    yes| 16.1|  Adam|     false|
|2021-02-21|Sun|   60|    10.0|    yes| 15.6|  Adam|      true|
+----------+---+-----+--------+-------+-----+------+----------+
Another common task you may find yourself doing is "replacing a column" with transformed values of the original. At first this may seem to blur the lines of the immutability principle if you let your mental map of the operation get the best of you, likely due to the convention of reassigning the output of an operation to the same original variable.
For example, if I again print out the schema of the temps_df DataFrame you see that the date column is of type string, which restricts our ability to perform date-specific arithmetic operations on it.
temps_df.printSchema()
root
 |-- date: string (nullable = true)
 |-- day: string (nullable = true)
 |-- tempF: long (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- weekend: string (nullable = true)
 |-- tempC: float (nullable = true)
 |-- Author: string (nullable = false)
 |-- even_tempF: boolean (nullable = true)
Clearly a useful approach here would be to convert the date column to the Spark DateType, which is accomplishable via the to_date(...) function within the pyspark.sql.functions module along with the withColumn(...) method we've just explored. I use these together but I assign the result to a new variable, tmp, then inspect both the original temps_df and the new tmp DataFrames to accentuate that the original temps_df DataFrame in fact remains unchanged.
tmp = temps_df.withColumn('date', F.to_date(F.col('date'), 'yyyy-MM-dd'))
tmp.printSchema()
root
 |-- date: date (nullable = true)
 |-- day: string (nullable = true)
 |-- tempF: long (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- weekend: string (nullable = true)
 |-- tempC: float (nullable = true)
 |-- Author: string (nullable = false)
 |-- even_tempF: boolean (nullable = true)
Then, looking again at temps_df:
temps_df.printSchema()
root
 |-- date: string (nullable = true)
 |-- day: string (nullable = true)
 |-- tempF: long (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- weekend: string (nullable = true)
 |-- tempC: float (nullable = true)
 |-- Author: string (nullable = false)
 |-- even_tempF: boolean (nullable = true)
I do actually want my temps_df DataFrame to have its date column be of the DateType type, so I will use the common convention and perform the transformation and reassign it to the temps_df variable, providing the illusion that the column has been replaced. However, we should now be able to fight off the mental trap of thinking we actually mutated the column data even though I am being so cavalier in my use of the word replace.
temps_df = temps_df.withColumn('date', F.to_date(F.col('date'), 'yyyy-MM-dd'))
temps_df.printSchema()
root
 |-- date: date (nullable = true)
 |-- day: string (nullable = true)
 |-- tempF: long (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- weekend: string (nullable = true)
 |-- tempC: float (nullable = true)
 |-- Author: string (nullable = false)
 |-- even_tempF: boolean (nullable = true)
Again, we must be cognizant of our use of words that seemingly convey we are mutating data when we actually are not. That being said, we can do operations that appear to rename a column using the withColumnRenamed(...) method of a DataFrame but, of course, we know we are simply generating a new DataFrame with its specified column renamed to something new, not changing the original.
temps_df = temps_df.withColumnRenamed('Author', 'Engineer')
temps_df.show()
+----------+---+-----+--------+-------+-----+--------+----------+
|      date|day|tempF|precipMM|weekend|tempC|Engineer|even_tempF|
+----------+---+-----+--------+-------+-----+--------+----------+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|    Adam|      true|
|2021-02-16|Tue|   57|     0.0|     no| 13.9|    Adam|     false|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|    Adam|      true|
|2021-02-18|Thr|   53|    17.0|     no| 11.7|    Adam|     false|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|    Adam|      true|
|2021-02-20|Sat|   61|     0.0|    yes| 16.1|    Adam|     false|
|2021-02-21|Sun|   60|    10.0|    yes| 15.6|    Adam|      true|
+----------+---+-----+--------+-------+-----+--------+----------+
There will be many times when you only want to target a subset of the columns of a DataFrame for your Spark programming and analysis. This is easily accomplished using the select(...) method of the DataFrame class.
Similar to what was demonstrated in the examples showing how to add or manipulate columns you can use the col(...) function from the functions module to specify a column to select.
temps_df.select(F.col('tempF')).show()
+-----+
|tempF|
+-----+
|   50|
|   57|
|   64|
|   53|
|   56|
|   61|
|   60|
+-----+
You can also simply use a string literal to specify one or more columns like so.
temps_df.select('tempF').show()
+-----+
|tempF|
+-----+
|   50|
|   57|
|   64|
|   53|
|   56|
|   61|
|   60|
+-----+
However, you will often want to select multiple columns and those columns may even be something that you build dynamically using a list. This is also easily accomplished.
select_cols = ['tempF', 'even_tempF']
temps_df.select(select_cols).show()
+-----+----------+
|tempF|even_tempF|
+-----+----------+
|   50|      true|
|   57|     false|
|   64|      true|
|   53|     false|
|   56|      true|
|   61|     false|
|   60|      true|
+-----+----------+
Or you can even just pass in multiple arguments like so.
temps_df.select('tempF', 'even_tempF').show()
+-----+----------+
|tempF|even_tempF|
+-----+----------+
|   50|      true|
|   57|     false|
|   64|      true|
|   53|     false|
|   56|      true|
|   61|     false|
|   60|      true|
+-----+----------+
To retrieve a column as a unique set of values you can chain on the distinct() method, then subsequently call the collect() action to receive a list of Row objects to further process.
weekend_values = temps_df.select('weekend').distinct().collect()
weekend_values
[Row(weekend='no'), Row(weekend='yes')]
You can then convert this list of Row objects into standard Python str values.
weekend_values_list = [item.weekend for item in weekend_values]
weekend_values_list
['no', 'yes']
To create a new DataFrame which omits one or more of the original columns you can clearly use the select(...) method just covered and include fewer columns but, in cases where you have many columns and only want to carve off a small number of them, it is easier to use the drop(...) method as demonstrated below.
temps_df = temps_df.drop('Engineer', 'tempC')
temps_df.show()
+----------+---+-----+--------+-------+----------+
|      date|day|tempF|precipMM|weekend|even_tempF|
+----------+---+-----+--------+-------+----------+
|2021-02-15|Mon|   50|     0.0|     no|      true|
|2021-02-16|Tue|   57|     0.0|     no|     false|
|2021-02-17|Wed|   64|     5.0|     no|      true|
|2021-02-18|Thr|   53|    17.0|     no|     false|
|2021-02-19|Fri|   56|     0.0|     no|      true|
|2021-02-20|Sat|   61|     0.0|    yes|     false|
|2021-02-21|Sun|   60|    10.0|    yes|      true|
+----------+---+-----+--------+-------+----------+
Probably the most common thing you will do with DataFrames is aggregating values to calculate meaningful information such as counts, mins, maxes, averages and such. To perform aggregations you will typically use one of the aggregation operators from the functions module along with the agg(...) method of the DataFrame class.
For example, finding the average temperature of the weather dataset I've been working with is as simple as shown below.
avg_tempF = temps_df.agg(F.avg('tempF')).collect()
avg_tempF
[Row(avg(tempF)=57.285714285714285)]
You will also likely want to partition the data in a DataFrame by a set of common values, then perform an aggregation over each partition, or group, collapsing it down to an aggregate set of results per grouping. Finding the average temperature for weekends and non-weekends looks as follows.
avg_tempF = temps_df.groupBy('weekend').agg(F.avg('tempF'))
avg_tempF.show()
+-------+----------+
|weekend|avg(tempF)|
+-------+----------+
|     no|      56.0|
|    yes|      60.5|
+-------+----------+
To perform multiple aggregation operations simply specify more than one, and you can also use alias(...) in conjunction with the aggregation functions to give the resulting columns more appealing names.
nice_temps_summary = temps_df.agg(
F.avg('tempF').alias('avg_tempF'),
F.min('tempF').alias('min_tempF'),
F.max('tempF').alias('max_tempF')
)
nice_temps_summary.show()
+------------------+---------+---------+
|         avg_tempF|min_tempF|max_tempF|
+------------------+---------+---------+
|57.285714285714285|       50|       64|
+------------------+---------+---------+
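The same multi-aggregation pattern combines naturally with groupBy(...). This sketch summarizes temperatures per weekend grouping; temps_by_weekend is just an illustrative variable name.

temps_by_weekend = temps_df.groupBy('weekend').agg(
    F.avg('tempF').alias('avg_tempF'),
    F.min('tempF').alias('min_tempF'),
    F.max('tempF').alias('max_tempF')
)
temps_by_weekend.show()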
There are two functionally equivalent ways to filter the rows of a Spark SQL DataFrame: the where(...) and filter(...) methods. Both do the same thing, but I tend to use the where(...) method pretty much exclusively because of its parity with SQL queries and it's just one less thing I need to remember.
Just like any other predicate you use a comparison operator between two operands. The left-hand operand in this case is specified using the col(column_name) function from the pyspark.sql.functions module and the right-hand operand is typically a variable or a literal value. The comparison operators are the typical ==, !=, >, <, >= and <= ones you use in if statements and elsewhere.
For example, I can filter down to just even temperatures like so.
even_temps = temps_df.where(F.col('even_tempF') == True)
even_temps.show()
+----------+---+-----+--------+-------+-----+--------+----------+
|      date|day|tempF|precipMM|weekend|tempC|Engineer|even_tempF|
+----------+---+-----+--------+-------+-----+--------+----------+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|    Adam|      true|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|    Adam|      true|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|    Adam|      true|
|2021-02-21|Sun|   60|    10.0|    yes| 15.6|    Adam|      true|
+----------+---+-----+--------+-------+-----+--------+----------+
To perform multiple, or chained, filtering simply tack on additional where(...) method calls.
even_wkday_temps = temps_df.where(F.col('even_tempF') == True)\
.where(F.col('weekend') == 'no')
even_wkday_temps.show()
+----------+---+-----+--------+-------+-----+--------+----------+
|      date|day|tempF|precipMM|weekend|tempC|Engineer|even_tempF|
+----------+---+-----+--------+-------+-----+--------+----------+
|2021-02-15|Mon|   50|     0.0|     no| 10.0|    Adam|      true|
|2021-02-17|Wed|   64|     5.0|     no| 17.8|    Adam|      true|
|2021-02-19|Fri|   56|     0.0|     no| 13.3|    Adam|      true|
+----------+---+-----+--------+-------+-----+--------+----------+
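Equivalently, you can combine the predicates in a single where(...) call using the & (and) and | (or) operators. This is just another way to express the chained filter above; note the parentheses around each condition, which are required because of Python operator precedence.

even_wkday_temps = temps_df.where(
    (F.col('even_tempF') == True) & (F.col('weekend') == 'no')
)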
Filtering can also be used in conjunction with column subsetting via the select(...) method like so.
even_wkday_temps = temps_df.where(F.col('even_tempF') == True)\
.where(F.col('weekend') == 'no')\
.select('day', 'tempF')
even_wkday_temps.show()
+---+-----+
|day|tempF|
+---+-----+
|Mon|   50|
|Wed|   64|
|Fri|   56|
+---+-----+
Sorting rows of a DataFrame is accomplishable with the orderBy(...) as well as the sort(...) methods but, again, I tend to stick to the orderBy(...) method because of its parity with the SQL language.
To order the weather dataset by the tempF column's values is as simple as follows.
temps_df.orderBy('tempF').show()
+----------+---+-----+--------+-------+----------+
|      date|day|tempF|precipMM|weekend|even_tempF|
+----------+---+-----+--------+-------+----------+
|2021-02-15|Mon|   50|     0.0|     no|      true|
|2021-02-18|Thr|   53|    17.0|     no|     false|
|2021-02-19|Fri|   56|     0.0|     no|      true|
|2021-02-16|Tue|   57|     0.0|     no|     false|
|2021-02-21|Sun|   60|    10.0|    yes|      true|
|2021-02-20|Sat|   61|     0.0|    yes|     false|
|2021-02-17|Wed|   64|     5.0|     no|      true|
+----------+---+-----+--------+-------+----------+
Sorting by descending order requires chaining on a desc() call in conjunction with using the col(...) function from the functions module.
temps_df.orderBy(F.col('tempF').desc()).show()
+----------+---+-----+--------+-------+----------+
|      date|day|tempF|precipMM|weekend|even_tempF|
+----------+---+-----+--------+-------+----------+
|2021-02-17|Wed|   64|     5.0|     no|      true|
|2021-02-20|Sat|   61|     0.0|    yes|     false|
|2021-02-21|Sun|   60|    10.0|    yes|      true|
|2021-02-16|Tue|   57|     0.0|     no|     false|
|2021-02-19|Fri|   56|     0.0|     no|      true|
|2021-02-18|Thr|   53|    17.0|     no|     false|
|2021-02-15|Mon|   50|     0.0|     no|      true|
+----------+---+-----+--------+-------+----------+
Ordering by multiple columns follows the same pattern as the select method for multiple columns.
temps_df.orderBy('even_tempF', 'tempF').show()
+----------+---+-----+--------+-------+----------+
|      date|day|tempF|precipMM|weekend|even_tempF|
+----------+---+-----+--------+-------+----------+
|2021-02-18|Thr|   53|    17.0|     no|     false|
|2021-02-16|Tue|   57|     0.0|     no|     false|
|2021-02-20|Sat|   61|     0.0|    yes|     false|
|2021-02-15|Mon|   50|     0.0|     no|      true|
|2021-02-19|Fri|   56|     0.0|     no|      true|
|2021-02-21|Sun|   60|    10.0|    yes|      true|
|2021-02-17|Wed|   64|     5.0|     no|      true|
+----------+---+-----+--------+-------+----------+
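You can also mix sort directions across columns by wrapping the descending ones with col(...).desc(), for example:

# even_tempF ascending, tempF descending within each even_tempF group
temps_df.orderBy(F.col('even_tempF'), F.col('tempF').desc()).show()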
One of the things that Spark is popularly used for, and incredibly powerful at doing, is performing Extract, Transform and Load (ETL) tasks as part of a data pipeline. However, a challenging aspect common to many data pipelines is the presence of multi-valued complex fields often found in semi-structured data formats like JSON files. As you might guess, Spark provides ways of working with such columns within a DataFrame.
As an example, let me concoct a new DataFrame named weather_df which contains an ArrayType[LongType] column representing windspeed measured every 6 hours of a day, along with another column of type MapType[StringType, LongType] representing the min and max temperatures of a day.
rows = [
("2021-02-15", [2, 5, 4, 2], {'min': 38, 'max': 60}),
("2021-02-16", [11, 6, 9, 2], {'min': 27, 'max':59 }),
("2021-02-17", [0, 4, 5, 3], {'min': 33, 'max': 62}),
("2021-02-18", [1, 12, 9, 11], {'min': 19, 'max': 55}),
("2021-02-19", [23, 18, 18, 19], {'min': 20, 'max': 56}),
("2021-02-20", [8, 7, 3, 2], {'min': 32, 'max': 61}),
("2021-02-21", [4, 6, 4, 4], {'min': 37, 'max': 67})
]
col_names = ['date', 'windspeeds', 'temperatures']
weather_df = spark.createDataFrame(rows, col_names)
weather_df.show(truncate=False)
+----------+----------------+----------------------+
|date      |windspeeds      |temperatures          |
+----------+----------------+----------------------+
|2021-02-15|[2, 5, 4, 2]    |[max -> 60, min -> 38]|
|2021-02-16|[11, 6, 9, 2]   |[max -> 59, min -> 27]|
|2021-02-17|[0, 4, 5, 3]    |[max -> 62, min -> 33]|
|2021-02-18|[1, 12, 9, 11]  |[max -> 55, min -> 19]|
|2021-02-19|[23, 18, 18, 19]|[max -> 56, min -> 20]|
|2021-02-20|[8, 7, 3, 2]    |[max -> 61, min -> 32]|
|2021-02-21|[4, 6, 4, 4]    |[max -> 67, min -> 37]|
+----------+----------------+----------------------+
Now lets have a look at the schema.
weather_df.printSchema()
root
 |-- date: string (nullable = true)
 |-- windspeeds: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- temperatures: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
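If you would rather not rely on schema inference for complex columns, you can spell the schema out with the complex type constructors. A sketch using the same rows; complex_schema and weather_df2 are just illustrative names.

from pyspark.sql.types import (StructType, StructField, StringType,
                               ArrayType, MapType, LongType)

complex_schema = StructType([
    StructField('date', StringType(), True),
    StructField('windspeeds', ArrayType(LongType()), True),
    StructField('temperatures', MapType(StringType(), LongType()), True)
])
weather_df2 = spark.createDataFrame(rows, complex_schema)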
To lift up values as an aggregate scalar, like the maximum windspeed within the nested array column, one could use a Spark User Defined Function in conjunction with the withColumn(...) method from the DataFrame class as shown here.
from pyspark.sql.types import LongType
max_windspeed = F.udf(lambda speeds: max(speeds), LongType())
weather_df = weather_df.withColumn('max_windspeed', max_windspeed(weather_df.windspeeds))
weather_df.show(truncate=False)
+----------+----------------+----------------------+-------------+
|date      |windspeeds      |temperatures          |max_windspeed|
+----------+----------------+----------------------+-------------+
|2021-02-15|[2, 5, 4, 2]    |[max -> 60, min -> 38]|5            |
|2021-02-16|[11, 6, 9, 2]   |[max -> 59, min -> 27]|11           |
|2021-02-17|[0, 4, 5, 3]    |[max -> 62, min -> 33]|5            |
|2021-02-18|[1, 12, 9, 11]  |[max -> 55, min -> 19]|12           |
|2021-02-19|[23, 18, 18, 19]|[max -> 56, min -> 20]|23           |
|2021-02-20|[8, 7, 3, 2]    |[max -> 61, min -> 32]|8            |
|2021-02-21|[4, 6, 4, 4]    |[max -> 67, min -> 37]|6            |
+----------+----------------+----------------------+-------------+
I can similarly calculate the temperature range from the nested map column as well.
temp_range = F.udf(lambda temp: temp['max'] - temp['min'], LongType())
weather_df = weather_df.withColumn('temp_range', temp_range(weather_df.temperatures))
weather_df.show(truncate=False)
+----------+----------------+----------------------+-------------+----------+
|date      |windspeeds      |temperatures          |max_windspeed|temp_range|
+----------+----------------+----------------------+-------------+----------+
|2021-02-15|[2, 5, 4, 2]    |[max -> 60, min -> 38]|5            |22        |
|2021-02-16|[11, 6, 9, 2]   |[max -> 59, min -> 27]|11           |32        |
|2021-02-17|[0, 4, 5, 3]    |[max -> 62, min -> 33]|5            |29        |
|2021-02-18|[1, 12, 9, 11]  |[max -> 55, min -> 19]|12           |36        |
|2021-02-19|[23, 18, 18, 19]|[max -> 56, min -> 20]|23           |36        |
|2021-02-20|[8, 7, 3, 2]    |[max -> 61, min -> 32]|8            |29        |
|2021-02-21|[4, 6, 4, 4]    |[max -> 67, min -> 37]|6            |30        |
+----------+----------------+----------------------+-------------+----------+
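For what it's worth, recent Spark versions also ship built-in functions for these particular operations, which sidestep the UDF overhead. A sketch assuming Spark 2.4 or later (array_max was added in 2.4); weather_df_alt is just an illustrative name.

weather_df_alt = weather_df\
    .withColumn('max_windspeed', F.array_max('windspeeds'))\
    .withColumn('temp_range',
                F.col('temperatures').getItem('max') - F.col('temperatures').getItem('min'))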
There is often considerable thought that goes into how data is stored on disk when working in the context of Big Data, and into the reading and writing methods used with Spark. Architectural considerations regarding Big Data storage can be complex, and often lesser-understood topics such as partitioning and row vs columnar data formats come into play. I will not be covering those topics in this tutorial and will instead focus on providing explanations and examples of simple file I/O with relatively vanilla CSV and JSON files to provide a basic foundation to build on.
Spark exposes an interface for reading data via the DataFrameReader, which is accessed via the read property of an instantiated SparkSession object, commonly assigned to a variable named spark. There are many ways to read data from disk in many different formats such as CSV, JSON, Avro, and Parquet. I will focus on two relatively simple examples using JSON and CSV, but the approach is fairly consistent across data formats.
First I need to use an HTTP client to download a CSV data file I've placed on AWS S3. You could use your browser, curl, HTTPie or any number of methods. I will use HTTPie.
http --download https://tci-data.s3.us-east-2.amazonaws.com/spark/weather/weather.csv
HTTP/1.1 200 OK
Accept-Ranges: bytes
Content-Length: 15328
Content-Type: text/csv
Date: Fri, 12 Mar 2021 03:54:15 GMT
ETag: "44011286487ce8258a81c5733dc79585"
Last-Modified: Tue, 23 Feb 2021 14:12:45 GMT
Server: AmazonS3
x-amz-id-2: 6yek9u9CVj6O9Uo15YRbeNu47KQxAQzrAAlpjGz9p4C9DTqTAjGjxrOpFvB5ZRXDoxgWInfG4Yg=
x-amz-request-id: 79VV0PBZ0DH5BJP5

Downloading 14.97 kB to "weather.csv-1"
Done. 14.97 kB in 0.00064s (22.91 MB/s)
It's generally a good idea to get a sense of the structure of the data, whenever possible, before ingesting it into Spark. I will use the unix head command to peek at the first 10 rows, then use the wc command to see how long the file is.
head weather.csv
day,time,tempF,windspeedMiles,winddirDegree,precipMM,humidity,pressure,WindGustMiles
2020-01-11,0,14,17,241,0.0,67,1028,23
2020-01-11,100,13,17,203,0.0,64,1028,22
2020-01-11,200,12,16,165,0.0,62,1027,21
2020-01-11,300,12,15,127,0.0,60,1027,20
2020-01-11,400,12,15,165,0.0,59,1027,19
2020-01-11,500,11,14,204,0.0,59,1026,18
2020-01-11,600,11,13,242,0.0,59,1026,17
2020-01-11,700,11,13,203,0.0,58,1026,17
2020-01-11,800,11,13,164,0.0,58,1026,17
wc -l weather.csv
385 weather.csv
From these two simple inspection methods I see that this is in fact a CSV file with 9 columns (one string representing a date and 8 numeric columns), along with one header row and 384 rows of data.
These pieces of information are directly useful when reading data into a Spark DataFrame via the read.csv(...) helper method in that I can specify that the CSV file contains a header and I can even tell Spark upfront information about the schema of the data.
csv_path = "weather.csv"
weather_csv = spark.read.csv(path=csv_path, inferSchema=True, header=True)
weather_csv.printSchema()
root
 |-- day: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- tempF: integer (nullable = true)
 |-- windspeedMiles: integer (nullable = true)
 |-- winddirDegree: integer (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- WindGustMiles: integer (nullable = true)
weather_csv.show(5, truncate=False)
+----------+----+-----+--------------+-------------+--------+--------+--------+-------------+
|day       |time|tempF|windspeedMiles|winddirDegree|precipMM|humidity|pressure|WindGustMiles|
+----------+----+-----+--------------+-------------+--------+--------+--------+-------------+
|2020-01-11|0   |14   |17            |241          |0.0     |67      |1028    |23           |
|2020-01-11|100 |13   |17            |203          |0.0     |64      |1028    |22           |
|2020-01-11|200 |12   |16            |165          |0.0     |62      |1027    |21           |
|2020-01-11|300 |12   |15            |127          |0.0     |60      |1027    |20           |
|2020-01-11|400 |12   |15            |165          |0.0     |59      |1027    |19           |
+----------+----+-----+--------------+-------------+--------+--------+--------+-------------+
only showing top 5 rows
Notice that Spark determined the schema of the data automatically by making its best guess at the different column data types, which I explicitly instructed it to do via the inferSchema=True parameter to the csv(...) method. This is a helpful but often abused feature of Spark's DataFrameReader because, while it's clearly useful that data types can be automatically inferred, it is also a performance drain and potentially error prone.
From a performance perspective this can be deleterious because by default Spark takes a full extra pass over the data, but you can tune this by specifying a proportional value up to 1 for the samplingRatio parameter, which instructs Spark to sample only a fraction of the data. Conversely, from a quality perspective, setting samplingRatio to a value less than one may result in an incorrect schema being inferred.
You can instead specify the schema that you want enforced via the schema parameter of the csv(...) method. This is especially important on large datasets. There are two ways to specify the schema: via StructType and StructField objects imported from the pyspark.sql.types module, or via SQL Data Definition Language (DDL) specified as a string. I find DDL strings preferable as they are less verbose, and SQL type definitions are something I've seen often in my career, so I'm most comfortable with them.
schema_ddl = """
day DATE,
time INTEGER,
tempF INTEGER,
windspeedMile INTEGER,
winddirDegree INTEGER,
precipMM DOUBLE,
humidity INTEGER,
pressure INTEGER,
windGustMiles INTEGER
"""
weather_df = spark.read.csv(path=csv_path, schema=schema_ddl, header=True)
weather_df.printSchema()
root
 |-- day: date (nullable = true)
 |-- time: integer (nullable = true)
 |-- tempF: integer (nullable = true)
 |-- windspeedMile: integer (nullable = true)
 |-- winddirDegree: integer (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- windGustMiles: integer (nullable = true)
weather_df.show(5)
+----------+----+-----+-------------+-------------+--------+--------+--------+-------------+
|       day|time|tempF|windspeedMile|winddirDegree|precipMM|humidity|pressure|windGustMiles|
+----------+----+-----+-------------+-------------+--------+--------+--------+-------------+
|2020-01-11|   0|   14|           17|          241|     0.0|      67|    1028|           23|
|2020-01-11| 100|   13|           17|          203|     0.0|      64|    1028|           22|
|2020-01-11| 200|   12|           16|          165|     0.0|      62|    1027|           21|
|2020-01-11| 300|   12|           15|          127|     0.0|      60|    1027|           20|
|2020-01-11| 400|   12|           15|          165|     0.0|      59|    1027|           19|
+----------+----+-----+-------------+-------------+--------+--------+--------+-------------+
only showing top 5 rows
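For completeness, here is a sketch of the same schema expressed with StructType and StructField objects rather than a DDL string; both approaches should produce the same result.

from pyspark.sql.types import (StructType, StructField, DateType,
                               IntegerType, DoubleType)

weather_schema = StructType([
    StructField('day', DateType(), True),
    StructField('time', IntegerType(), True),
    StructField('tempF', IntegerType(), True),
    StructField('windspeedMile', IntegerType(), True),
    StructField('winddirDegree', IntegerType(), True),
    StructField('precipMM', DoubleType(), True),
    StructField('humidity', IntegerType(), True),
    StructField('pressure', IntegerType(), True),
    StructField('windGustMiles', IntegerType(), True)
])
weather_df = spark.read.csv(path=csv_path, schema=weather_schema, header=True)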
I also have the same data formatted as a JSON file, which I will demonstrate reading in next, except this time I use the read.json(...) convenience method instead as shown below. I similarly download the data stored in an AWS S3 bucket using HTTPie, then I'm off and running with Spark again.
http --download https://tci-data.s3.us-east-2.amazonaws.com/spark/weather/weather.json
HTTP/1.1 200 OK
Accept-Ranges: bytes
Content-Length: 62476
Content-Type: application/json
Date: Tue, 23 Feb 2021 15:08:45 GMT
ETag: "947d3ea26b804995a7ef12a70761e215"
Last-Modified: Tue, 23 Feb 2021 14:12:46 GMT
Server: AmazonS3
x-amz-id-2: qiFJX6GgyBCAje5hSlXGCuLLEmYceRLqjSHy2tDVLknCad/mYmqxxR5goLttjnnP6jztjvfeP9E=
x-amz-request-id: VX760AM785C27BCD

Downloading 61.01 kB to "weather.json"
Done. 61.01 kB in 0.12931s (471.81 kB/s)
json_path = "weather.json"
weather_json = spark.read.json(path=json_path)
weather_json.printSchema()
root
 |-- WindGustMiles: string (nullable = true)
 |-- day: string (nullable = true)
 |-- humidity: string (nullable = true)
 |-- precipMM: string (nullable = true)
 |-- pressure: string (nullable = true)
 |-- tempF: string (nullable = true)
 |-- time: string (nullable = true)
 |-- winddirDegree: string (nullable = true)
 |-- windspeedMiles: string (nullable = true)
weather_json.show(5)
+-------------+----------+--------+--------+--------+-----+----+-------------+--------------+
|WindGustMiles|       day|humidity|precipMM|pressure|tempF|time|winddirDegree|windspeedMiles|
+-------------+----------+--------+--------+--------+-----+----+-------------+--------------+
|           23|2020-01-11|      67|     0.0|    1028|   14|   0|          241|            17|
|           22|2020-01-11|      64|     0.0|    1028|   13| 100|          203|            17|
|           21|2020-01-11|      62|     0.0|    1027|   12| 200|          165|            16|
|           20|2020-01-11|      60|     0.0|    1027|   12| 300|          127|            15|
|           19|2020-01-11|      59|     0.0|    1027|   12| 400|          165|            15|
+-------------+----------+--------+--------+--------+-----+----+-------------+--------------+
only showing top 5 rows
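Notice that every column came back as a string. Just like the CSV reader, read.json(...) accepts a schema parameter, so a DDL string can be supplied to get properly typed columns. A sketch using field names that match the JSON keys (json_schema is an illustrative name):

json_schema = """
    day DATE,
    time INTEGER,
    tempF INTEGER,
    windspeedMiles INTEGER,
    winddirDegree INTEGER,
    precipMM DOUBLE,
    humidity INTEGER,
    pressure INTEGER,
    WindGustMiles INTEGER
"""
weather_json = spark.read.json(path=json_path, schema=json_schema)
weather_json.printSchema()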
Writing Spark DataFrame data to disk is nearly a mirror inverse of the read operations and is accomplished through the DataFrameWriter class, exposed via the write property of a DataFrame object. Again, there are helper methods available such as csv(...), json(...), parquet(...) and others.
I'll start by writing the weather_csv DataFrame back out to a new CSV file like so.
csv_path2 = "new_csv"
weather_csv.write.csv(path=csv_path2, header=True)
Writing JSON data is very similar.
weather_json.write.json(path="new_json")
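One practical note: by default these writes fail if the target path already exists. The DataFrameWriter's mode(...) method controls that behavior, for example:

# overwrite any existing output at the target path
weather_csv.write.mode('overwrite').csv(path="new_csv", header=True)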
Thus far I've focused on the functional aspects of the Spark SQL DataFrame API, which entail the various methods and functions used to manipulate data. Throughout this tutorial I've tried to continually point out the parallels between the SQL language and various functions / methods such as select(...), where(...), orderBy(...), and groupBy(...). It turns out you can pretty much fully embrace the ANSI SQL 2003 standard to program your Spark SQL workloads. In this section I am going to briefly cover how to analyze data in Spark using the familiar SQL language while still capturing the extremely fast and scalable nature of Spark.
Spark gives us the ability to access data from many data sources such as standard files and databases, as well as HDFS data in conjunction with Hive metastores, but I will leave the latter two sources for another discussion and instead continue to focus on my sample weather data sourced from the local filesystem. In order to use the SQL interface to a Spark DataFrame you must establish either a table or a view, which is conceptually similar to the same relational database constructs. For this tutorial I focus only on temporary views.
http --download https://tci-data.s3.us-east-2.amazonaws.com/spark/weather/weather.csv
HTTP/1.1 200 OK
Accept-Ranges: bytes
Content-Length: 15328
Content-Type: text/csv
Date: Fri, 12 Mar 2021 03:55:24 GMT
ETag: "44011286487ce8258a81c5733dc79585"
Last-Modified: Tue, 23 Feb 2021 14:12:45 GMT
Server: AmazonS3
x-amz-id-2: UYWXOhn9DiZII3hFBMxz3y27aGAKvp7l931dvpljsYtqt6QdMzBevEjj2SO0PmJMtNtpmufM9KU=
x-amz-request-id: FCB7CDBVTF7R2BGJ

Downloading 14.97 kB to "weather.csv-2"
Done. 14.97 kB in 0.00053s (27.47 MB/s)
schema_ddl = """
day DATE,
time INTEGER,
tempF INTEGER,
windspeedMile INTEGER,
winddirDegree INTEGER,
precipMM DOUBLE,
humidity INTEGER,
pressure INTEGER,
windGustMiles INTEGER
"""
weather_df = spark.read.csv(path="weather.csv", schema=schema_ddl, header=True)
weather_df.printSchema()
root
 |-- day: date (nullable = true)
 |-- time: integer (nullable = true)
 |-- tempF: integer (nullable = true)
 |-- windspeedMile: integer (nullable = true)
 |-- winddirDegree: integer (nullable = true)
 |-- precipMM: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- windGustMiles: integer (nullable = true)
To create a temporary view in Spark, use the createOrReplaceTempView('viewname') method on a Spark SQL DataFrame, then use the sql(...) method on the SparkSession object to execute queries against the Spark SQL engine.
weather_df.createOrReplaceTempView('weather_tbl')
spark.sql('DESCRIBE weather_tbl').show()
+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|          day|     date|   null|
|         time|      int|   null|
|        tempF|      int|   null|
|windspeedMile|      int|   null|
|winddirDegree|      int|   null|
|     precipMM|   double|   null|
|     humidity|      int|   null|
|     pressure|      int|   null|
|windGustMiles|      int|   null|
+-------------+---------+-------+
df2 = spark.sql('''
SELECT a.*, SUM(precip_mm) OVER (ORDER BY day) AS total_precip
FROM (
SELECT day,
MIN(tempF) AS min_temp,
MAX(tempF) AS max_temp,
AVG(tempF) AS avg_temp,
SUM(precipMM) AS precip_mm
FROM weather_tbl
GROUP BY day
ORDER BY day
) a''')
df2.show()
+----------+--------+--------+------------------+------------------+------------------+
|       day|min_temp|max_temp|          avg_temp|         precip_mm|      total_precip|
+----------+--------+--------+------------------+------------------+------------------+
|2020-01-02|      33|      41|36.166666666666664|               0.0|               0.0|
|2020-01-03|      29|      35|31.291666666666668|               0.0|               0.0|
|2020-01-04|      27|      43|            33.125|               0.0|               0.0|
|2020-01-05|      33|      43|              37.5|               0.0|               0.0|
|2020-01-06|      31|      42|            35.125|               0.0|               0.0|
|2020-01-07|      30|      45|36.166666666666664|               0.0|               0.0|
|2020-01-08|      27|      40|32.458333333333336|               0.0|               0.0|
|2020-01-09|      35|      45|              40.0|               0.0|               0.0|
|2020-01-10|      14|      34|             24.25|               0.0|               0.0|
|2020-01-11|      11|      20|14.208333333333334|               0.0|               0.0|
|2020-01-12|       8|      21|14.166666666666666|1.7000000000000004|1.7000000000000004|
|2020-01-13|      15|      31|21.583333333333332|               0.0|1.7000000000000004|
|2020-01-14|      20|      30|            26.125|               0.0|1.7000000000000004|
|2020-01-15|      13|      33|            23.875|               0.0|1.7000000000000004|
|2020-01-16|      11|      18|            14.125|               0.0|1.7000000000000004|
|2020-01-17|      16|      35|27.583333333333332|               5.5|               7.2|
+----------+--------+--------+------------------+------------------+------------------+
I am going to assume the reader is already familiar with standard analytical SQL, so rather than present a bunch of examples I will leave that experimentation up to the reader. However, I do want to make clear that the functional API and the SQL interface are interchangeable. For example, below I demonstrate the interoperability of both paradigms.
df2 = df2.withColumn('temp_range', df2.max_temp - df2.min_temp)
df2.show()
+----------+--------+--------+------------------+------------------+------------------+----------+
|       day|min_temp|max_temp|          avg_temp|         precip_mm|      total_precip|temp_range|
+----------+--------+--------+------------------+------------------+------------------+----------+
|2020-01-02|      33|      41|36.166666666666664|               0.0|               0.0|         8|
|2020-01-03|      29|      35|31.291666666666668|               0.0|               0.0|         6|
|2020-01-04|      27|      43|            33.125|               0.0|               0.0|        16|
|2020-01-05|      33|      43|              37.5|               0.0|               0.0|        10|
|2020-01-06|      31|      42|            35.125|               0.0|               0.0|        11|
|2020-01-07|      30|      45|36.166666666666664|               0.0|               0.0|        15|
|2020-01-08|      27|      40|32.458333333333336|               0.0|               0.0|        13|
|2020-01-09|      35|      45|              40.0|               0.0|               0.0|        10|
|2020-01-10|      14|      34|             24.25|               0.0|               0.0|        20|
|2020-01-11|      11|      20|14.208333333333334|               0.0|               0.0|         9|
|2020-01-12|       8|      21|14.166666666666666|1.7000000000000004|1.7000000000000004|        13|
|2020-01-13|      15|      31|21.583333333333332|               0.0|1.7000000000000004|        16|
|2020-01-14|      20|      30|            26.125|               0.0|1.7000000000000004|        10|
|2020-01-15|      13|      33|            23.875|               0.0|1.7000000000000004|        20|
|2020-01-16|      11|      18|            14.125|               0.0|1.7000000000000004|         7|
|2020-01-17|      16|      35|27.583333333333332|               5.5|               7.2|        19|
+----------+--------+--------+------------------+------------------+------------------+----------+
It is up to the engineer to determine what level or combination of functional operations and SQL is appropriate based on skill level, readability and maintainability.
In this tutorial I've taken an example-driven approach to introducing what I have found to be the most important and useful aspects of programming in Spark for performing large-scale data analytics and data engineering tasks.
As always, thanks for reading and please do not hesitate to critique or comment below.