Dask is commonly used for parallel data processing. However, I wanted to quickly explore using `dask` for parallel processing of generic Python functions.
Full script available at: https://github.com/stanton119/data-analysis/tree/master/tools_python/parallel_processing/dask_vs_multiprocessing.py
This can apply to functions like making HTTP requests. If we need to make a series of requests, running them in a for loop would be fairly slow, as we wait for the response to each call before moving on to the next.
For this quick example we can make an HTTP request, or, to make the comparison fairer, mock it with a simple sleep call:
```python
import time
from datetime import datetime

import pandas as pd
import requests

def fetch_results(date):
    date_str = date.strftime("%Y-%m-%d")
    r = requests.get(
        f"https://api.carbonintensity.org.uk/intensity/date/{date_str}"
    )
    return r.json()

def fetch_results_mock(date):
    time.sleep(0.2)
    return date

fcn = fetch_results
params = pd.date_range(datetime(2020, 1, 1), periods=20).tolist()
```
For a baseline we can run the above query with a for loop, and also with the `map` command, to iterate through the list of dates sequentially.
```python
import timeit

# for loop
t1 = timeit.default_timer()
results_for = list()
for param in params:
    results_for.append(fcn(param))
t2 = timeit.default_timer()
t_for = t2 - t1
print(f"For loop: {t_for:.1f}")

# map
t1 = timeit.default_timer()
results_map = list(map(fcn, params))
t2 = timeit.default_timer()
t_map = t2 - t1
print(f"Map: {t_map:.1f}")
```
These do the same thing, so it's no surprise that they take similar times:
```
For loop: 4.1
Map: 4.1
```
Now let’s parallelise the calls. We could typically use `multiprocessing` to split out the jobs and combine the results at the end. Multiprocessing requires some particular code design (for example, worker functions generally need to be picklable and defined at the top level of a module), which can make it slightly more cumbersome for interactive exploration. Also to note, `multiprocessing` doesn’t seem to work well with Jupyter/IPython sessions - StackOverflow.
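As a rough sketch of what that code design looks like (the function and values here are made up for illustration), a standalone script typically needs the worker function at module level and a `__main__` guard so the worker processes can import it cleanly:

```python
import multiprocessing
import time

def slow_task(x):
    # worker functions must be picklable, top-level functions
    time.sleep(0.2)
    return x * 2

if __name__ == "__main__":
    # the guard stops child processes re-running the pool setup on import
    with multiprocessing.Pool(4) as pool:
        print(pool.map(slow_task, range(8)))
```

Back to the timing comparison: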
```python
import multiprocessing

# multiprocessing - start the pool outside the timed section
mp = multiprocessing.Pool(8)

t1 = timeit.default_timer()
results_mp = mp.map(fcn, params)
t2 = timeit.default_timer()
t_mp = t2 - t1
print(f"Multiprocessing: {t_mp:.1f}")
```
By running in parallel across 8 worker processes we see a speed-up:
```
Multiprocessing: 1.6
```
Using `dask` seemed a bit easier. We wrap the generic Python function with `dask.delayed`, which allows it to be distributed to the various `dask` workers. We can then call it as normal with the list of dates. In this case I’m running it locally, so the workers spin up on the local machine when needed. Dask uses delayed (lazy) computation, so the processing only takes place when we need the results. Here I wanted them straight back, so we call `dask.compute` to start the processing.
```python
import dask

# dask - wrap the function with dask.delayed, then compute all results at once
t1 = timeit.default_timer()
results_dask = list(map(dask.delayed(fcn), params))
results_dask = dask.compute(results_dask)[0]
t2 = timeit.default_timer()
t_dask = t2 - t1
print(f"Dask: {t_dask:.1f}")
```
We similarly get a good improvement in time compared to the sequential processing.
```
Dask: 0.6
```
The difference between `multiprocessing` and `dask` may not be very repeatable. The time spent processing in either case will depend on the number of workers we spin up. Starting the multiprocessing pool can take some time, so I left it outside the timed section. Dask may do this setup at import time or not; if it does not, it seems to be quite quick…
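As a rough sketch of how to make the comparison a bit more controlled (reusing `fcn` and `params` from above; the scheduler and worker count here are just illustrative), `dask` lets you pin both explicitly at compute time:

```python
import dask

delayed_results = [dask.delayed(fcn)(p) for p in params]

# threads suit IO-bound work like HTTP requests; fixing the worker count
# makes timings easier to compare against the multiprocessing pool size
results = dask.compute(delayed_results, scheduler="threads", num_workers=8)[0]
```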
In summary, setting up parallel work in `dask` is easy, and more straightforward than with `multiprocessing`.