How we learned to love Dask and achieved a 40x speedup

Targomo · Nov 29, 2021

Written by Daniel Jahn, spatial data engineer at Targomo.

Geospatial applications frequently require processing large amounts of data. From Microsoft’s Planetary Computer to NASA’s Earth Observation System, the tool of choice for many such applications has been Dask, a Python library for parallel computing.

Today, Dask is an established and widely used library that plays a crucial role in scaling out Python workflows, both within the geospatial world and beyond.

Yet, despite its wide adoption, many users are struggling to use Dask effectively. As an example, here is a frequently echoed sentiment from the 2021 Dask Summit:

The problem I need to solve is embarrassingly parallel, yet my pipeline does not parallelize well and blows up in memory.

We have experienced this exact problem. Our problem was easily parallelizable. We expected to run our pipeline, open the Dask diagnostic dashboard, and see all threads independently whizzing through the data partitions:

Instead, we were welcomed by threads trudging along, transferring data amongst each other and hitting memory limits:

You may feel lost in the dashboard here. So did we. If that’s the case, keep reading!

The pipeline ran sluggishly, was unstable, crashed or deadlocked, and eventually ran out of memory.

The first dashboard example above is our pipeline now. This is the story of how we achieved it.

Problem statement

At Targomo, we deal with location data.

One of the best sources of insight is population movement data.

In its basic form, you can imagine movement data as a simple dataframe containing coordinates and a timestamp for each point.
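
As a rough illustration, here is a toy version of such a dataframe (the column names and values are placeholders, not our actual schema):

import pandas as pd

movements = pd.DataFrame({
    'user_id':   ['a', 'a', 'a', 'b'],
    'lat':       [52.5200, 52.5201, 52.5305, 48.2082],
    'lon':       [13.4050, 13.4049, 13.4210, 16.3738],
    'timestamp': pd.to_datetime([
        '2021-01-01 08:00', '2021-01-01 08:05',
        '2021-01-01 09:10', '2021-01-01 08:00',
    ]),
})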

Our goal was to turn each user’s trajectory into a set of stay points: locations where the user stayed for a given period of time.

Visualization of all stay points in Germany, Austria, and Switzerland. Do not try to interpret.

After an initial investigation into which technology would fit our problem, we came up with two requirements for our solution:

  • The data volume is likely to grow in the future, so the solution must be able to scale well.
  • We require a highly customizable workflow based on a well-established data stack.

This led us to choose Dask. We had used Dask dataframes before for one-off computations, treating them naively as “parallel Pandas”. The ability to use any Python code and to speed up the critical parts with libraries such as Numba and Cython seemed to be exactly what we needed.

We thus set out to build the core functionality of the pipeline in Dask, profiling our code using small data samples.

However, as soon as we started testing on larger datasets, it quickly became clear that something was gravely wrong. Instead of a scalable, fast pipeline, we had built an unstable, slow, memory-hungry colossus.

The problem, however, was not Dask. The problem was our lack of understanding of it.

Diagnosing the problems

Apart from its powerful machinery for scaling computations, Dask also provides you with detailed diagnostics through its dashboard.

At the same time, it’s easy to get overwhelmed by the amount of information the dashboard provides, especially when the pipeline runs truly badly.

Many parts of the pipeline were far from optimal. We saw a lot of red data transfers. At times, Dask needed to spill data to disk (orange), slowing down significantly. Many parts of the pipeline had large chunks of white space, meaning no actual computation was being done on the given thread at that time.

Yet none of these quite explained the incredibly slow performance, let alone the unexpected memory blowups.

And for a good reason: they were mere symptoms of deeper problems.

Problem 1: Hidden bottlenecks

Take a look at the following task stream of a groupby().apply() operation (in purple). How efficient does it seem?

All threads seem to be working on a nicely parallelized task. Nothing seems to be wrong here.

However, when we used Dask’s profiling, we found out that most of the time was spent calling DataFrame.values.

Note: You can bring up a task’s profiling by clicking on its block in the task stream

In fact, multiple places in our pipeline exhibited similar behavior. Functions that should constitute a small overhead were instead dominating the computation time.

After some investigation, we found that removing a single preprocessing function, split_by_time_jump, drastically improved the situation.

This method dealt with the fact that users often stopped emitting any points for long periods of time. The users were not staying anywhere; we were simply missing data. To address that, split_by_time_jump re-numbered the index so that each sub-trajectory was treated as its own group.

However, this introduced a large number of small groups, leading to performance problems.

To illustrate this point, consider the following example: a and b have the same total length, but doing many small conversions is much slower than doing one large conversion.
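
The exact snippet is not reproduced here, but a minimal sketch of the same pattern in plain Pandas looks like this (a is one large conversion, b is many small ones; absolute timings will vary):

import time
import numpy as np
import pandas as pd

frame = pd.DataFrame({'x': np.random.rand(1_000_000)})
groups = np.arange(len(frame)) // 10                   # 100,000 tiny groups of 10 rows

t0 = time.perf_counter()
a = frame.values                                       # one large conversion
t1 = time.perf_counter()
b = frame.groupby(groups).apply(lambda g: g.values)    # many small conversions
t2 = time.perf_counter()

print(f"one large conversion:   {t1 - t0:.3f}s")
print(f"many small conversions: {t2 - t1:.3f}s")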

This brings us to the general lesson: a large number of groups in groupby().apply() might bring an unexpected overhead.

What can be done about this?

The solution in our case was to keep the original groups, and simply force all methods to be aware of the possibility of the time-jump.

However, such a strategy is not always an option. There are other solutions one might consider in such cases:

  • Write the aggregation as a custom Dask Aggregation (see the sketch after this list).
  • Distribute the aggregating logic through map_partitions (also see Problem 3 below).
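
Here is a minimal sketch of the first option, using a toy sum aggregation (the aggregation logic in our pipeline was more involved; df stands for a Dask dataframe with a name column and an x column):

import dask.dataframe as dd

# Toy custom aggregation: sum of 'x' per group, computed in two stages
custom_sum = dd.Aggregation(
    name='custom_sum',
    chunk=lambda grouped: grouped.sum(),   # runs on each partition's groupby
    agg=lambda grouped: grouped.sum(),     # combines the per-partition results
)

result = df.groupby('name')['x'].agg(custom_sum)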

Compare the previous stream to that of the analogous operation in the optimized pipeline. Notice the units on the horizontal scale: the purple groupby().apply() now takes only seconds, compared to the minute above.

Problem 2: Eager computation

Something curious happens at several points in the pipeline. The calculation seems to reset and begin again:

This happens because some operations in the pipeline are not lazy — they are executed immediately.

Even worse: they also then cause any preceding computations to run, effectively doubling the work.

To understand sources of non-laziness, let’s first have a look at how Dask manages to perform most operations fully lazily.

A Dask dataframe does not contain actual data. Instead, it merely keeps track of all the operations needed to compute it, its current structure and the divisions of the partitions.

If we perform a simple operation such as df['z'] = df['x'] + df['y'], Dask merely adds the operation to the graph and keeps track of the new column in _meta.
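
A minimal toy example of this behavior (not our pipeline data):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(10), 'y': range(10)})
df = dd.from_pandas(pdf, npartitions=2)

df['z'] = df['x'] + df['y']    # nothing is computed yet
print(df._meta.columns)        # the new column is already tracked in _meta
print(df.divisions)            # the divisions are unchanged and still known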

However, if we perform a more complex operation, such as df.set_index('name'), we run into a problem. Dask uses the dataframe's sorted index to organize its partitions. Not knowing what name contains, Dask does not know what the divisions would be after set_index. Without divisions, Dask cannot set up subsequent lazy operations.

Dask solves this by computing the divisions eagerly.

Note that the resulting dataframe was never computed — only its divisions.

# This would still require the actual values to be computed
df.compute()

However, there is a way around it. We can tell Dask what the divisions are beforehand and prevent any computation from happening.

divisions = (845, 980, 1019, 1163)
df = df.set_index('id', divisions=divisions) # Fully lazy again

Now you’re asking “That’s great, but where do I get the divisions from?”

There are multiple possibilities; here are some:

  • You can come up with the divisions manually. For example, if your dataframe’s index is a timestamp, you could partition by day (see the sketch after this list).
  • You can use another similar dataframe’s divisions.
  • You can compute the divisions on a subset of the dataframe once and then reuse them each time.
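
As a rough sketch of the first two options (the timestamp column name, the date range, and the second dataframe df2 are made up for illustration):

import pandas as pd

# Manual divisions: one partition per day over a known date range
divisions = pd.date_range('2021-01-01', '2021-01-31', freq='1D').tolist()
df = df.set_index('timestamp', divisions=divisions)

# Reusing the divisions of another dataframe over the same index range
df2 = df2.set_index('timestamp', divisions=df.divisions)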

In our case, we had an eager set_index twice in our pipeline, and it turned out we could do without both eager computations.

The first occurred during the re-numbering. As noted above, we could do without this operation entirely.

The second occurred after transforming each user’s points into stay points. It was not clear how to split the new dataframe. However, we figured we could simply reuse the divisions the original dataframe had.

Problem 3: Misunderstanding Dask groupby

At this point, we had significantly improved the performance, but we had done nothing to improve the memory consumption. This was especially painful since the pipeline was extremely memory-hungry, often crashing with out-of-memory errors. Even worse: there could be no talk of scaling the computation to larger datasets, since the memory usage scaled with the dataset size.

Dask errors suggested possible memory leaks. This led us to a long journey of investigating possible sources of unmanaged memory, worker memory limits, Parquet partition sizes, data spilling, specifying worker resources, malloc settings, and many more.

In the end, the problem was elsewhere: the Dask dataframe’s groupby method behaves entirely differently from what we expected.

Here’s an example to illustrate this. Let’s set up a dataframe indexed by name to simulate our problem.

import dask
df = dask.datasets.timeseries(partition_freq='15d')
df = df.set_index('name').persist()

If we wish to see the sum of x per name, we can use groupby.

result = df.groupby('name')['x'].sum()

This seems like a simple enough operation. We are grouping on the index, so there is no shuffling of the data involved. However, looking at the task graph uncovers an interesting fact.

result.visualize(size='4')

Even though the partitions are completely separate, Dask creates a graph where they end up interrelated.

This is the crux of the issue: the groupby operation creates an artificial dependency, forcing Dask to compute everything before the final operation.

Digging deeper into the problem, we discovered the parameter split_out, which splits the result of groupby into multiple partitions.

df.groupby('name')['x'].sum(split_out=2).visualize(size='4')

But, as we can see from the task graph, this does nothing to alleviate the dependency problem.

The actual solution is surprisingly simple: perform the groupby operation using map_partitions.
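
A minimal sketch of this, assuming (as in our case) that the dataframe is indexed and partitioned by name so that no group spans two partitions:

# Each partition is grouped independently, so no inter-partition
# dependency appears in the task graph.
result = df.map_partitions(lambda part: part.groupby('name')['x'].sum())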

Here, each partition is completely separated.

We applied this solution to multiple places in our pipeline. The pipeline again ran about 3x faster, but, most importantly, it now ran completely in parallel, processing the data partition by partition.

To conclude, here is a summary of the performance improvements and their impact on speed and memory consumption.

Conclusion

In retrospect, the solutions were simple. But, without deeper knowledge of how Dask operates, they were hidden in plain sight.

Dask dataframes are great at providing a simple Pandas-like API. But this simplicity can be deceiving: large machinery hides behind their operations. As much as the clean API tries to hide this fact, using Dask effectively often requires a detailed understanding of what actually happens under the hood.

The key is to stop expecting a “parallelized NumPy/Pandas” and instead dive deeper into how Dask works.

The good news is that this has never been easier. What’s more, Dask developers are currently focusing on documentation and intermediate Dask usage examples.

We recommend starting with Parallel and Distributed Computing in Python with Dask and then continuing right onto Hacking Dask: Diving into Dask’s Internals. For more, see the resources box below.

Dask resources

Join the community:

Reading the codebase:

Thanks: Adam Roberts, Gideon Cohen, Adela Nguyễn, Matěj Kovář.
