Spark Secondary Sort
MapReduce was first introduced by Google in 2004. It was their solution for processing large volumes of data(such as counting the number of backlinks for millions of pages) by distributing computation among thousands of machines. MapReduce systems are built around key-value data structures. They deliver data from a mapper machine to a reducer machine grouped by key. The idea behind MapReduce is fairly simple: it starts with mapping/transforming data into a format that fits the problem at hand, continues by shuffling/moving data between machines until all data which have to be processed at once reside on the same machine and finishes with a reduce/process phase during which the actual result is computed. All phases presented are executed on a large number of machines in parallel. If during a MapReduce execution, a machine fails, the entire process continues by reassigning that machine’s tasks and not restarting the full process. Not only that a single machine could not handle such data volumes, but running such a job on a single machine would necessarily mean the restart of the entire process in case of a failure.
Since 2004 multiple MapReduce like products were born which follow more or less the three MapReduce phases in order to process large volumes of data. Having multiple products that can solve the same range of problems, data engineers started putting together design patterns for the commonly occurring big data problems. Design patterns in computer science are templates which aim to solve recurring problems and ease communication between software engineers. One such pattern is secondary sort, the subject for this article. The purpose of the article is to present a secondary sort implementation using pySpark. Secondary sort is nothing more than sorting data by two values. What makes the problem challenging is sorting large volumes of data as fast as possible without running out of memory and thus crashing the machines. Thus, Secondary sort represents the process of grouping data by key and sorting the payloads within each group before running further computation[1].
Secondary Sort Design Pattern
One commonly occurring big data problem which can be tackled with secondary sort is sessionization. Sessionization is the process in which one tries to analyze all data which represent a session, such as all the actions executed by a user or all the events generated by a device[2].
To better illustrate how sessionization relates to secondary sort, one can take the dataset about taxi trips reported in Chicago:
Trip ID | Taxi ID | Trip Start Timestamp | Trip End Timestamp | ... |
b258df504d2fbf85... | b7b0be6d3ec6c589aeac84c0... | 07/19/2016 02:45:00 PM | 07/19/2016 02:45:00 PM | ... |
b258df6e5ace922... | 0d61c4f9c8cb8d280fce3887... | 01/22/2015 04:15:00 PM | 01/22/2015 04:30:00 PM | ... |
... | ... | ... | ... | ... |
Suppose that one has the task to compute the down time of each taxi cab since all time. A session can be mapped as follows: the taxi cab as the analyzed object and the trip series as the events over time(Fig. 1).
Fig. 1 - Sessions
More concrete, the analyzed object represented by Taxi ID will become the key, while the rest of the values(Trip ID, Trip Start Timestamp, Trip End Timestamp, etc.) representing the trip series will become the payload(Fig. 2). The problem can further be reduced to: group the dataset by the Taxi ID and sort ascending the payloads within each group according to the Trip Start Timestamp.
Fig. 2 - Sessionized Trips
After the sort is finished, one can loop over the payloads within each group and sum up the time deltas between each trip.
Fig. 3 - Delta computation
As one can see, the solution is reduced to a group by key and sort by another value, which matches exactly the definition of secondary sort.
Depending on the map reduce system, secondary sort can be implemented differently. There are two common strategies for implementing secondary sort[1]:
- in reducer sort - for some map reduce systems secondary sort relies on buffering data into each reducer and sorting each group in memory.
- in shuffle sort - other map reduce systems deliver data from mappers to reducers not only grouped by key, but also sorted by key. With this strategy, one can take advantage of the shuffle process to sort data by a second value before delivering it to the reducers.
In reducer sort
A simple and straightforward strategy is to let the system execute a standard map reduce job. Once the grouped data arrives at each reducer, it will get buffered and sorted in memory. This solution might perform better time wise, but it cannot scale because of the in memory sort. As soon as the group will exceed the reducer’s memory an Out of Memory exception is thrown and the job will fail.
In shuffle sort
As mentioned before, some map reduce systems also sort by key before delivering data to the reducers. In order to force the system to also sort by a second value, one has to create a composite key, thus a value from the payload is promoted and is appended to the key. The system shuffle will first sort by the first part of the key and then by the second. This approach of creating a composite key out of the initial key and another value from the payload is known as value to key conversion.
Fig. 4 - Value to key conversion
The advantage of using this strategy is that it leverages the system’s shuffle process in order to sort the payloads, which is very efficient memory wise. The downside is that extra code has to be written to tell the system to group data only by the first part of the composite key before sending it to the reducer.
Secondary sort in Spark
Sessionization can appear quite often when working on Spark projects. Fortunately the system supports implementing both strategies when it comes to secondary sort, which is why the rest of the article will explore the two strategies with concrete implementations using Spark Core.
Spark development environment
The dataset used for the Spark implementation is the one aforementioned about taxi trips in Chicago. From this dataset only the first 16353116 entries are used. These represent about 6.5G of data that will be grouped by Taxi ID and sorted by Trip Start Timestamp.
In order to use the Spark environment as smooth as possible, one can use this github repository for spark docker deployment in order to quickly run Spark jobs from PyCharm and visualize progress and history in browser. There is also a github repository for spark secondary sort containing the two solutions presented in this article and using the docker deployment previously mentioned.
To better difference between the two secondary sort solutions, some memory limits have been set for Spark[3]:
spark.driver.memory 512m
spark.executor.memory 512m
spark.executor.pyspark.memory 1g
spark.python.worker.memory 512m
The first two options limit the JVM memory, while the last two will limit the python processes. Because the solutions are implemented using python, each python process will only be able to use 1G of memory and will start spilling to disk as soon as it hits 512M. With this arrangement one can easily run the solutions on a modest local environment.
Spark reducer sort
Fig. 6 - Spark groupByKey
Spark supports the first solution for secondary sort without much boilerplate code, but with a penalty when it comes to memory consumption. Because the sort will be done at reducer level, each reducer has to be able to keep in memory the entire group while it sorts its values. In case of big datasets this might raise an Out Of Memory error.
The first thing one has to do is to create a pair RDD(Resilient Distributed Dataset) which will contain the key Taxi ID and a payload (Trip Start Timestamp, Trip End Timestamp):
def create_pair_rdd(ctx):
rawRDD = ctx.textFile(INPUT_FILE)
headerlessRDD = rawRDD.filter(lambda x: not x.startswith('Trip ID'))
rdd = headerlessRDD.map(lambda x: COMMA_DELIMITER.split(x))
validRDD = rdd.filter(lambda x: len(x[FIRST_KEY]) > 0 and len(x[SECOND_KEY]) > 0 and len(x[TRIP_END_TIMESTAMP]) > 0)
pairRDD = validRDD.map(make_pair)
compressedRDD = pairRDD.mapValues(lambda x: (x[SECOND_KEY], x[TRIP_END_TIMESTAMP]))
return compressedRDD
In the create_pair_rdd code snippet one can see that data is ingested from a file, the CSV header is dropped, each line is split by its separators, empty values which will be used in computations are dropped and the RDD row is mapped using make_pair. Lastly the payload resulted from make_pair is compress even more to only two values: Trip Start Timestamp and Trip End Timestamp. With a pair RDD in place, a Spark groupByKey can be applying. This operations will trigger the shuffle process, but also group data according to the key.
groupedRDD = rdd.groupByKey(numPartitions=4)
Next, the groups have to get sorted. With Spark mapValues one can sort the values within each group. Although the grouped values are received as iterators, by trying to sort them, they will get materialized flooding the reducer’s memory. Not materializing iterators is extremely important when working with Spark as one will see in the second implementation, there is a technique called iterator-to-iterator transformation which transforms data by keeping it as iterators in order to save space. For obvious reasons this cannot be applied while trying to sort the values from an iterator.
sortedRDD = groupedRDD.mapValues(sort_group)
In order to compute the total downtime for each taxi cab, one can call mapValues with calculate_loss on the grouped RDD.
def calculate_loss(entry):
group = entry
loss = 0
_, prev_end = group[0]
for item in group:
start, end = item
delta = datetime.strptime(start, '%m/%d/%Y %I:%M:%S %p').timestamp()\
- datetime.strptime(prev_end, '%m/%d/%Y %I:%M:%S %p').timestamp()
if delta > 0:
loss += delta
prev_end = end
return loss
...
lossRDD = sortedRDD.mapValues(calculate_loss)
Spark shuffle sort
Fig. 5 - repartitionAndSortWithinPartitions
In this section one can see a pySpark implementation for secondary sort using the system shuffle process. Spark 1.2 introduced repartitionAndSortWithinPartitions, which allows to repartition the entire dataframe according to a partitioner and also sort data within each partition according to a comparator. In other words, repartitionAndSortWithinPartitions will shuffle(move) all data having the same key to the same partition, while also sorting it according to a comparator. Because both partitioner and the sort comparator are customizable this is ideal for implementing secondary sort.
One just has to define a composite key, implement a partitioner which only uses the first part of the key and a sort comparator which uses the entire composite key. The system will then put all data that has the same partitioning key on the same partition, in the order given by the composite key.
repartitionAndSortWithinPartitions is not a group by operation. It will only move data having the same key to the same partition and sort it according to the comparator(Fig. 5). To actually transform the data into groups, one has to iterate through all items of each partition keeping track of when a group ends and the next one begins[4] as one will see shortly.
As before, the first thing one has to do is to create a pair RDD. For this implementation make_pair will return a pair having a composite key (Taxi ID, Trip Start Timestamp) and a payload, after which the payload is compressed to only two values: Trip Start Timestamp and Trip End Timestamp:
def create_pair_rdd(ctx):
rawRDD = ctx.textFile(INPUT_FILE)
headerlessRDD = rawRDD.filter(lambda x: not x.startswith('Trip ID'))
rdd = headerlessRDD.map(lambda x: COMMA_DELIMITER.split(x))
validRDD = rdd.filter(lambda x: len(x[FIRST_KEY]) > 0 and len(x[SECOND_KEY]) > 0 and len(x[TRIP_END_TIMESTAMP]) > 0)
pairRDD = validRDD.map(make_pair)
compressedRDD = pairRDD.mapValues(lambda x: (x[SECOND_KEY], x[TRIP_END_TIMESTAMP]))
return compressedRDD
Once there is a pair RDD in place, one can run repartitionAndSortWithinPartitions:
def partition_func(key):
return portable_hash(key[0])
def key_func(entry):
return entry[0], entry[1]
...
sortedRDD = rdd.repartitionAndSortWithinPartitions(partitionFunc=partition_func, numPartitions=4, keyfunc=key_func, ascending=True)
As discussed, repartitionAndSortWithinPartitions receives a partitioner(partition_func) and a sort comparator (keyfunc). One also can specify the number of partitions to use and the sorting order. The partitioner just uses the standard portable_hash function with the first part of the key, while the sort comparator just returns back the composite key. After the shuffling process invoked by repartitionAndSortWithinPartitions is over, one can iterate over each RDD partition in order to group the data as presented in the next code snippet:
def sorted_group(lines):
return itertools.groupby(lines, key=lambda x: x[0])
...
unpairedRDD = sortedRDD.map(unpair, preservesPartitioning=True)
groupedRDD = unpairedRDD.mapPartitions(sorted_group, preservesPartitioning=True)
One very important aspect to discuss before moving forward is why itertools.groupby was used and not a regular group by algorithm. Sure, the standard python library is elegant and it does the job of grouping the data, but there is more to it. The itertools.groupby, keeps the iterator received by sorted_group still as an iterator. Keeping it as an iterator and not materializing it to something else saves memory and prevents Spark from going out of memory. This strategy is known as iterator-to-iterator transformation and it has the advantage of chaining multiple data transformations delaying the actual evaluation, thus saving space and time[5]. Using the iterator-to-iterator transformation strategy, one can continue processing the secondary sort result, thus sorted_group just returns now a new iterator obtained from itertools.groupby. This transforms the partitions grouping data by key. Because this is kept as an iterator, nothing is evaluated yet, therefor no extra space is used.
For the last step, one can apply a map transformation on the fresh groups in order to iterate over them and compute the total down time.
def calculate_loss(entry):
key, group = entry
loss = 0
_, _, prev_end = next(group)
for item in group:
_, start, end = item
delta = datetime.strptime(start, "%m/%d/%Y %I:%M:%S %p").timestamp() - datetime.strptime(prev_end,
"%m/%d/%Y %I:%M:%S %p").timestamp()
if delta > 0:
loss += delta
prev_end = end
return key, loss
...
lossRDD = groupRDD.map(calculate_loss)
The map method receives calculate_loss, which just iterates over each group and computes the total downtime. Elements are accessed one by one, which again does not allocate extra space.
Although this implementation might not seem straight forward, it successfully uses the system shuffle process in order to sort the groups without running out of memory, in contrast with the previous solution where the sort is done in-memory.
Conclusion
In order to draw a conclusion one can check Spark’s job history running on port localhost:18080(if using the aforementioned github repository). If the total memory limit is increased to 2G, both solutions can run without raising an out of memory exception.
spark.executor.pyspark.memory 2g
The job metrics for both solutions are presented in Fig. 7 and they show that the groupByKey implementation is 25% faster than the repartitionAndSortWithinPartitions implementation.
Fig. 7 - Time comparison between repartitionAndSortWithinPartitions secondary sort and groupByKey secondary sort
One reason for it being faster is because it has to shuffle/move less data, as one can see in Fig. 8.
Fig. 8 - Memory comparison between repartitionAndSortWithinPartitions secondary sort and groupByKey secondary sort
Once the limit is set back to 1G, the repartitionAndSortWithinPartitions implementation delivers the expected result while the groupByKey implementation runs out of memory during the sort phase.
In comparison, the second strategy is more verbose and also requires more insight about how the map reduce system works, while the first one just relies on the fact that the map reduce system delivers data grouped by key. In practice the first strategy proves to be faster than the second one when running on small datasets, but runs out of memory once the dataset does not fit in the reducer’s memory.
References:
[1] Data Algorithms: Recipes for Scaling Up with Hadoop and Spark - Chapter 1. Secondary Sort: Introduction
[2] Advanced Analytics with Spark: Patterns for Learning from Data at Scale - Chapter 8. Geospatial and Temporal Data Analysis on New York City Taxi Trip Data
[3] https://issues.apache.org/jira/browse/SPARK-26679
[4] https://issues.apache.org/jira/browse/SPARK-3655
[5] High Performance Spark - Best Practices for Scaling and Optimizing Apache Spark - Chapter 5. Effective Transformations