Dask is a Python library that provides a set of APIs for performing computations in parallel on a single computer as well as on a cluster of computers. We have already created a tutorial on the dask.bag API, which is a Spark-like API for parallelizing big data. Dask has other APIs like dask.delayed, dask.dataframe, dask.array, and dask.distributed, whose introduction we have covered in the previous tutorial. We suggest that you go through that tutorial to get basic information about dask. We'll be specifically concentrating on the dask.delayed API as a part of this tutorial.
The dask.delayed API is very flexible and lets us parallelize our own Python functions. It's well suited for problems that don't involve data structures like dask.array or dask.dataframe. The API of dask.delayed is simple, which lets us parallelize our Python code very easily.
The dask.delayed API is used to convert a normal function into a lazy function. When a function is converted from normal to lazy, it is prevented from executing immediately. Instead, its execution is delayed until later. Dask can easily run these lazy functions in parallel.
Behind the scenes, the dask.delayed API keeps building a directed acyclic graph of these lazy functions. This graph keeps track of the execution sequence of the lazy functions, which helps dask understand which parts can be executed in parallel. When we evaluate this graph by calling the compute() method, dask evaluates all the functions, running them in parallel where possible. We'll be visualizing the graphs created by dask when explaining the usage of the dask.delayed API.
This ends our small introduction to the dask.delayed API. We'll be explaining its usage with a few examples below. So without further delay, let's get started with the coding part.
We'll first import all the necessary libraries for this tutorial.
import dask
from dask import delayed, compute
import numpy as np
import pandas as pd
import time
import sys
import os
print("Python Version : ", sys.version)
print("Dask Version : ", dask.__version__)
dask.delayed API

We can easily parallelize our normal Python code using dask by following the steps below (a short sketch of both scenarios follows the list):

1. Convert functions from normal to lazy by using dask.delayed. All functions calling these lazy functions will also become lazy.
2. Call compute() to evaluate the lazy functions:
   - Scenario 1: You only need to execute one final function which calls a list of lazy functions. In this case, you can call compute() on the final lazy function and it'll run all the lazy functions it depends on in parallel.
   - Scenario 2: If there is more than one final lazy function, then we can use the compute() method of dask. We need to pass all the final lazy functions to this method and it'll run all of them in parallel and return the results for all of them.

We'll be explaining these steps below with a few examples to make things clear. We'll be creating normal Python functions which take time to execute sequentially but run faster when parallelized using dask.
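To make the two scenarios concrete, here is a minimal sketch using a hypothetical slow_add() function (not one of the tutorial's examples):

@delayed
def slow_add(a, b):
    # hypothetical helper used only for this sketch
    return a + b

# Scenario 1: a single final lazy result - call .compute() on it.
total = slow_add(slow_add(1, 2), slow_add(3, 4))
print(total.compute())   # 10

# Scenario 2: multiple final lazy results - pass them all to dask's
# compute() function, which evaluates them together in parallel.
res1, res2 = compute(slow_add(1, 2), slow_add(3, 4))
print(res1, res2)        # 3 7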
Dask provides an optional dashboard which we can utilize to see the performance of operations getting performed in parallel. This step is optional if you don't want to analyze results, but it can be very useful for debugging things.
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4)
client
We suggest that you set n_workers to the same value as the number of cores on your PC and threads_per_worker to 1 if you are running the code locally.
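As a minimal sketch of that suggestion (assuming a local machine and CPU-bound tasks), we can size the cluster using os.cpu_count():

# One worker per CPU core with a single thread each - a common setup
# for CPU-bound tasks like the sleep-based examples in this tutorial.
client = Client(n_workers=os.cpu_count(), threads_per_worker=1)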
You can open the URL mentioned next to the dashboard. It'll have a list of tabs that provide information like the running time of tasks, CPU usage, memory usage, directed graphs, etc. All of this can be useful for analyzing the parallel execution of tasks. You'll also be able to see the whole directed acyclic graph of lazy functions in the Graph section of the dashboard. Nodes are marked blue once that particular lazy function has completed execution. If your code has many lazy functions then you'll be able to watch them turn blue in real time as they complete.
Below we have created a simple Python example which takes the squares of two numbers and adds them. We have created a function for getting the square of a number. We have artificially added a sleep time of 1 second to the function to mimic real-life situations where functions can take time to execute. We call this function 2 times to get the squares of 2 different numbers before adding them. The pure Python code below executes the square function sequentially, which results in the code taking 2 seconds to complete.
Please make a note that we'll be using the %%time and %time Jupyter notebook magic commands to measure the execution time of particular code sections. They'll be used for performance comparison between normal Python and dask-parallelized Python code.
%%time

def square(x):
    time.sleep(1)
    return x*x

x_sq = square(3)
y_sq = square(4)

res = x_sq + y_sq
res
We'll now try to convert the above example to the dask version so that it runs faster using parallelization. From the above code, we can easily notice that the square function takes time to execute. We can also notice that each execution of square is totally independent of any other execution of the same function, which makes it ideal for parallel execution. We'll convert each square function call to a lazy function by calling delayed on it as explained below.
%%time
x_sq = delayed(square)(3)
y_sq = delayed(square)(4)
res = x_sq + y_sq
res
We can notice from the above cell that it executed immediately. That's because the dask.delayed API has only converted the normal square calls to lazy calls and created an execution graph so far; it has not actually executed the functions. We can see that the type of res is a Delayed object instead of the result.
Please make a note of the process of creating a lazy function in the above example. We need to pass the original function name to delayed() and then the list of parameters in a second set of parentheses. Please feel free to go through this list of dask best practices which will prevent you from making common mistakes.
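One common mistake highlighted there is calling the function before wrapping it; a quick sketch of the difference:

x_sq = delayed(square)(3)   # correct: square is wrapped first, so the call is lazy
x_sq = delayed(square(3))   # wrong: square(3) executes immediately (sleeping 1 second) and only its result gets wrapped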
We can make dask execute the code in parallel by calling the compute() method on the final res object. It'll evaluate the graph and execute the square function calls in parallel.
%time res.compute()
We can notice from the above cell that it takes less time to compute the sum of squares using the dask.delayed API.
We can even visualize the DAG created by dask by calling the visualize() method on the final res object.
res.visualize()
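If you'd like to keep the rendered graph, visualize() also accepts a filename argument (this assumes the graphviz dependency dask uses for rendering is installed):

res.visualize(filename="squares_graph.png")   # saves the task graph as an image file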
Below we explain another usage of the dask.delayed API. Here we create an array of random numbers of size 10x1000. We then want to compute the correlation between the first row of the array and every other row. We have implemented this solution as a loop. We have also designed the function corr which calculates the correlation. We have added a time delay of 1 second to the function's execution to mimic real-life situations.
%%time

x = np.random.rand(10,1000)

def corr(x, y):
    time.sleep(1)
    return np.corrcoef(x, y)[0][1]

correlations = []
for row in x:
    correlations.append(corr(x[0], row))

avg_corr = np.mean(correlations)
avg_corr
We can notice that the above code takes 10 seconds to execute because there are 10 rows in the array. We are computing a correlation between the first row and every row, each of which takes 1 second.
Below we convert the above code to dask delayed code so that it runs in parallel. We have used delayed in two places this time: first on the corr() function and then on the np.mean() function.
%%time

def corr(x, y):
    time.sleep(1)
    return np.corrcoef(x, y)[0][1]

correlations = []
for row in x:
    correlations.append(delayed(corr)(x[0], row))

avg_corr = delayed(np.mean)(correlations)
avg_corr
Like the previous example, we can notice that the above code with lazy functions executes immediately because the actual execution of the code has not happened yet.
We'll now call compute() on the final avg_corr variable to complete the execution.
%time avg_corr.compute()
We can notice that it completes execution in much less time compared to the normal sequential execution.
Below we are visualizing the execution graph of the above code.
avg_corr.visualize()
Below we demonstrate the usage of the dask.delayed API with a third example. We loop through the numbers 1-10, taking the square of a number if it is divisible by 2 and its third power otherwise. We have defined functions for taking the square and the third power of a number. We have also added a time delay to the functions so that they resemble a real-life situation.
%%time

def square(x):
    time.sleep(1)
    return x*x

def power_3(x):
    time.sleep(1)
    return x*x*x

final_list = []
for i in range(1,11):
    if i%2 == 0:
        final_list.append(square(i))
    else:
        final_list.append(power_3(i))

res = sum(final_list)
res
We can see above that when we run the above loop sequentially it takes 10 seconds to complete. We can easily convert this code using dask.delayed as explained below. This example demonstrates how conditional function calls can also be parallelized using dask.
We have made use of the @delayed decorator this time instead of wrapping each function call like in the previous examples. We can place the decorator above the function definition rather than wrapping each call of the function in delayed().
%%time

@delayed
def square(x):
    time.sleep(1)
    return x*x

@delayed
def power_3(x):
    time.sleep(1)
    return x*x*x

final_list = []
for i in range(1,11):
    if i%2 == 0:
        final_list.append(square(i))
    else:
        final_list.append(power_3(i))

res = delayed(sum)(final_list)
res
%time res.compute()
res.visualize()
As a part of our 4th example, we'll be working with some files. We create 30 CSV files for 30 days of temperature data where the temperature is recorded every 5 seconds. We first create the CSV files, one per day, inside a separate folder. Each CSV file will have 17280 temperature values for that day, recorded every 5 seconds. We fill the temperature data with random temperatures between 1-50.
# Create a folder holding one CSV file per day of January 2020.
if not os.path.exists("temp_dataframes"):
    os.mkdir("temp_dataframes")

for i in range(1,31):
    # A 5-second frequency gives 17280 readings per day; drop the last
    # timestamp because it belongs to the next day.
    dates = pd.date_range(start="1-%d-2020"%i, end="1-%d-2020"%(i+1), freq="5S")[:-1]
    temp_df = pd.DataFrame(np.random.randint(1,50, 17280), index=dates, columns=["Temp"])
    temp_df.to_csv("temp_dataframes/1-%d-2020.csv"%i)
Below we loop through the temperature file of each day, reading it, taking the average temperature for that day, and adding it to a list. We then take the average across all days combined. The main aim of this example is to explain the usage of the dask.delayed API when working with files.
%%time

def calc_mean(f_name):
    df = pd.read_csv(f_name)
    return df.mean().values[0]

avg_temp_per_day = []
for i in range(1,31):
    avg_temp_per_day.append(calc_mean("temp_dataframes/1-%d-2020.csv"%i))

avg_temp = np.mean(avg_temp_per_day)
avg_temp
We can see that it takes nearly 12 seconds for the above code to complete.
We have converted it to dask code below which completes in nearly 3 seconds.
%%time

@delayed
def calc_mean(f_name):
    df = pd.read_csv(f_name)
    return df.mean().values[0]

avg_temp_per_day = []
for i in range(1,31):
    avg_temp_per_day.append(calc_mean("temp_dataframes/1-%d-2020.csv"%i))

avg_temp = delayed(np.mean)(avg_temp_per_day)
avg_temp
%time avg_temp.compute()
Our fifth and last example is designed to explain the usage of the compute() method available directly from dask, which takes a list of lazy objects as input and evaluates them all.
Below we loop 10 times, each time creating an array of 1000 random numbers between 1-100. We then take the sum and the count of the numbers divisible by 2. We append each sum and count to a list. We then divide the total of the sums list by the total of the counts list to get the average. We have added a time delay to the functions so that they take time, mimicking real-life situations.
%%time

def total_func(x):
    time.sleep(0.5)
    return sum([i for i in x if i%2==0])

def count_func(x):
    time.sleep(0.5)
    return len([i for i in x if i%2==0])

totals, counts = [], []
for i in range(10):
    x = np.random.randint(1,100,1000)
    total = total_func(x)
    count = count_func(x)
    totals.append(total)
    counts.append(count)

avg_num = sum(totals) / sum(counts)
avg_num
Below we have converted the above example to the dask.delayed version. We have annotated both functions with the @delayed decorator. We evaluate both the totals_orig and counts_orig arrays, which are lists of delayed objects, using the compute() function. We can see that this version takes nearly 1 second to complete compared to the previous one.
%%time

@delayed
def total_func(x):
    time.sleep(0.5)
    return sum([i for i in x if i%2==0])

@delayed
def count_func(x):
    time.sleep(0.5)
    return len([i for i in x if i%2==0])

totals_orig, counts_orig = [], []
for i in range(10):
    x = np.random.randint(1,100,1000)
    total = total_func(x)
    count = count_func(x)
    totals_orig.append(total)
    counts_orig.append(count)

totals, counts = compute(totals_orig, counts_orig)

avg_num = sum(totals) / sum(counts)
avg_num
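Note that compute() returns new, fully evaluated lists and leaves its inputs untouched; the two cells below show that totals_orig and counts_orig still hold Delayed objects.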
totals_orig
counts_orig
This ends our small tutorial explaining the usage of the dask.delayed API for parallelizing Python code. Please feel free to let us know your views in the comments section.
If you are more comfortable learning through video tutorials then we would recommend that you subscribe to our YouTube channel.
When going through coding examples, it's quite common to have doubts and errors.
If you have doubts about some code examples or are stuck somewhere when trying our code, send us an email at coderzcolumn07@gmail.com. We'll help you or point you in the direction where you can find a solution to your problem.
You can even send us a mail if you are trying something new and need guidance regarding coding. We'll try to respond as soon as possible.