Optimizing Slow GROUP BY Aggregations in Spark: From 20 Hours to 40 Minutes
Apache Spark is a popular engine for running complex distributed data pipelines. When using Spark, we sometimes need to tune our logic to get the best performance, and that process can reveal Spark's "inner workings." At Explorium, we learned about Spark's EXPAND command while investigating a query over 1 billion records that failed after 20 hours of run time.
In this article, I will show you how to speed up slow GROUP BY aggregations over billions of records, especially when using GROUPING SETS, COUNT DISTINCT, CUBE and ROLLUP, and how to debug your Spark execution plan in order to find issues.
Spark Execution Plan: Debugging Spark’s data query
When we submit a query or dataframe action, Spark’s optimizer, Catalyst, generates an execution plan for computing the data. Catalyst knows how to translate the SQL query into logical and physical operations that will eventually compute the data.
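Besides the Spark UI, you can also ask Catalyst for the plan directly. In Spark SQL, the EXPLAIN statement prints the logical and physical plans for a query; a generic example (table and column names here are illustrative, not from our job):

```sql
-- EXTENDED also prints the parsed, analyzed and optimized logical plans
EXPLAIN EXTENDED
SELECT column_a, COUNT(DISTINCT id) AS distinct_ids
FROM my_table
GROUP BY column_a;
```

If the physical plan contains an Expand node, the query will duplicate input rows before aggregating, which is exactly the behavior we ran into.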
If you experience a slow GROUP BY or any other slow operation in Spark, I strongly recommend checking the Spark UI.
The Spark UI enables you to monitor Spark execution progress of tasks, stages and jobs. In the Spark UI, you can go into the SQL tab and see the Directed Acyclic Graph (DAG) of a specific job, along with how many rows have already passed through a specific operation.
Inside the job, you can see multiple stages of the job and where the process is stuck.
What was my issue with Group By, and how did I solve it?
In one of our projects at Explorium, we had a Spark job that took an input of about 1 billion records. The computation was super slow and failed after more than 20 hours.
When we looked at the Spark UI, we saw that calling a GROUP BY on top of 1 billion records was the cause of the slowness.
The query looked like this (column names have been replaced):
```sql
select
  column_a,
  column_b,
  column_c,
  column_d,
  count(
    distinct case
      when (m.column_e = 1) then id
      else null
    end
  ) count_distinct_column_a,
  count(
    distinct case
      when (m.column_f = 1) then id
      else null
    end
  ) count_distinct_column_b,
  count(
    distinct case
      when (m.column_g = 1) then id
      else null
    end
  ) count_distinct_column_c,
  count(
    distinct case
      when (m.column_h = 1) then id
      else null
    end
  ) count_distinct_column_d,
  ..... (Additional 60+ COUNT DISTINCT expressions)
from my_table m
group by
  column_a,
  column_b,
  column_c,
  column_d
```
Basically, we grouped by some columns in the table and then computed multiple COUNT DISTINCT aggregations over conditional expressions.
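Conceptually, the query's shape is this: group rows by a few key columns, then count distinct ids per flag column within each group. Here is a toy, pure-Python simulation of that aggregation (the rows and column names are made up for illustration; this is obviously not how Spark executes it):

```python
from collections import defaultdict

# Toy rows standing in for my_table; column names follow the query above.
rows = [
    {"column_a": "x", "column_b": 1, "column_c": "p", "column_d": 0,
     "column_e": 1, "column_f": 0, "id": 10},
    {"column_a": "x", "column_b": 1, "column_c": "p", "column_d": 0,
     "column_e": 1, "column_f": 1, "id": 11},
    {"column_a": "x", "column_b": 1, "column_c": "p", "column_d": 0,
     "column_e": 0, "column_f": 1, "id": 11},
]

# GROUP BY column_a..column_d, collecting the distinct ids behind each flag.
groups = defaultdict(lambda: {"e_ids": set(), "f_ids": set()})
for r in rows:
    key = (r["column_a"], r["column_b"], r["column_c"], r["column_d"])
    if r["column_e"] == 1:
        groups[key]["e_ids"].add(r["id"])   # COUNT(DISTINCT CASE WHEN column_e = 1 ...)
    if r["column_f"] == 1:
        groups[key]["f_ids"].add(r["id"])   # COUNT(DISTINCT CASE WHEN column_f = 1 ...)

result = {k: (len(v["e_ids"]), len(v["f_ids"])) for k, v in groups.items()}
```

Each COUNT DISTINCT needs its own set of ids per group, which hints at why Spark's plan has to keep the distinct values for every expression separately.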
By checking the job ID and the execution plan in the DAG, we figured out that the issue was in this query. The plan contained a HashAggregate node, which means Spark was performing a group by on the data.
When we looked at the SQL tab, we noticed that Spark was using a command called EXPAND, which for some reason generated 300 billion records. That did not make sense, since we knew the group by received only 1 billion input records.
We saw that the task's progress was extremely slow while the data passed through EXPAND. Therefore, we decided to investigate why Spark was using EXPAND and whether we could avoid it.
Spark EXPAND command
So what is this EXPAND command, and when is it used?
EXPAND is not very well-known, and Spark employs it in specific cases. We had to look into Spark’s source code in order to get more information about this operation.
Expand: "Apply all of the GroupExpressions to every input row, hence we will get multiple output rows for an input row."
Spark executes this command when the logic contains GROUPING SETS, CUBE or ROLLUP, and also when multiple COUNT DISTINCT expressions appear in a single GROUP BY.
We had dozens of COUNT DISTINCT operations, which caused Spark to use EXPAND.
EXPAND essentially duplicates each input row for every COUNT DISTINCT expression (or GROUPING SETS / CUBE / ROLLUP combination).
In our project, Spark decided to use EXPAND, and since we had many COUNT DISTINCT expressions in our logic, it ended up generating 300 billion records. That was too much for Spark to process in a reasonable amount of time.
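A back-of-the-envelope model makes the blow-up intuitive. This is a hypothetical sketch, not Spark's actual Expand implementation: with roughly one projected copy of each input row per distinct-aggregate expression, the row count gets multiplied by the number of expressions.

```python
def expand(rows, n_distinct_exprs):
    """Toy model of Spark's Expand step: emit one tagged copy of each
    input row per distinct-aggregate expression."""
    out = []
    for row in rows:
        for gid in range(n_distinct_exprs):  # gid tags which aggregate the copy feeds
            out.append((row, gid))
    return out

rows = list(range(1_000))    # stand-in for the ~1 billion input rows
expanded = expand(rows, 70)  # ~70 COUNT DISTINCT expressions in our query
print(len(expanded))         # prints 70000: a 70x row blow-up
```

Scaled to a billion input rows, the same multiplier puts the expanded row count in the tens to hundreds of billions, consistent with what we saw in the Spark UI.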
So what did we do to bypass EXPAND?
EXPAND cannot be disabled; it is inserted by Spark's optimizer, Catalyst. To overcome this issue, we had to refactor our logic so that our COUNT DISTINCT operations could be replaced with SUM operations.
We changed the logic to guarantee the ids were already distinct for each expression, which let us rewrite each aggregation. For example, going from this:
```sql
COUNT(
  distinct case
    when (m.dummy_column = 1) then id
    else null
  end
) some_count
```
to this:

```sql
SUM(case when dummy_column = 1 then 1 else 0 end) as some_count
```
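The rewrite is only safe because we made the ids distinct first: COUNT(DISTINCT CASE ...) counts unique ids, while SUM(CASE ...) counts matching rows, and the two agree exactly when each id occurs at most once. A quick pure-Python check of that equivalence (illustrative data, not Spark code):

```python
# Rows where each id appears at most once, as guaranteed by our refactor.
rows = [
    {"dummy_column": 1, "id": 1},
    {"dummy_column": 1, "id": 2},
    {"dummy_column": 0, "id": 3},
]

# COUNT(DISTINCT CASE WHEN dummy_column = 1 THEN id END)
count_distinct = len({r["id"] for r in rows if r["dummy_column"] == 1})

# SUM(CASE WHEN dummy_column = 1 THEN 1 ELSE 0 END)
sum_flag = sum(1 if r["dummy_column"] == 1 else 0 for r in rows)

assert count_distinct == sum_flag == 2  # equal because ids are unique per row
```

If the input could contain duplicate ids, the SUM version would over-count, so the upstream deduplication is an essential part of this optimization, not an optional cleanup.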
Once we did that for all of the COUNT DISTINCT expressions, Spark changed the execution plan of the query and removed the usage of EXPAND.
Result: We re-ran the Spark job and it finished in 40 minutes!
Spark optimization is a very important skill that can help you improve your Spark jobs. Even if the result is not as dramatic as going from an unfinished 20 hour query to a completed query in 40 minutes, learning how to use Spark UI and identify issues in Spark’s execution plan is an important way to level up your skills and solve more complicated problems on ever-bigger data sets.