Dask is a parallel computing library for Python. It provides a set of APIs that make it easy to run parallel computations on data frames, arrays, iterators, and more. Dask APIs are flexible: the same code can scale down to a single computer or scale up to a cluster of computers. Python already has a number of libraries for parallel computing, like multiprocessing, concurrent.futures, threading, pyspark, joblib, ipyparallel, etc. Each of these has some limitations that are nicely addressed by dask's APIs. We'll briefly introduce the various APIs available in dask, but our main focus in this tutorial will be the dask.bag API.
Below we have given a list of APIs available in dask.
dask.bag - It's similar to pyspark and lets us work on a list of items or an iterator while applying functions to each item. This API is commonly used when working with big data. We can chain more than one function by applying them one by one to a list of values. It divides the list/iterator into partitions, works on the partitions in parallel, and then combines the results.
dask.dataframe - It's similar to pandas data frames, but it can handle quite big data frames and run computations on them in parallel.
dask.array - It's similar to a numpy array, but it can handle very large arrays as well as perform computations on them in parallel.
dask.delayed - It uses lazy evaluation and creates a computation graph of each computation in sequence. It then optimizes these computations based on the graph when the lazy objects are evaluated. Delayed objects do not compute immediately when called; instead, they evaluate only when explicitly asked to, which in turn evaluates all delayed objects in the graph in parallel. (A short sketch follows this list.)
futures - It's similar to concurrent.futures and is a flexible API for submitting tasks to threads, processes and even clusters. It lets us submit tasks that can run in parallel on one computer or a cluster of computers.
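As a quick sketch of the lazy-evaluation idea behind dask.delayed described above (the inc and add helpers here are made up for illustration and are not part of this tutorial):

from dask import delayed

def inc(x):
    return x + 1

def add(x, y):
    return x + y

# Nothing runs yet; dask only records a task graph of these calls.
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)

# compute() evaluates the whole graph, running independent tasks in parallel.
print(total.compute())    # 5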
We'll be focusing on the dask.bag API as a part of this tutorial. It provides a bunch of methods like map, filter, groupby, product, max, join, fold, topk, etc. The list of all available methods of the dask.bag API can be found in the dask documentation. We'll explain their usage below with different examples. We can also chain these methods on our iterator/list of values to perform complicated computations.
The dask.bag API is commonly used when working with unstructured data like JSON files, text files, log files, etc. It's quite similar to pyspark, so anyone with a bit of spark background can pick it up quickly.
Benefit: One of the benefits of dask.bag is that it sidesteps the GIL when working on pure Python objects (by using process-based workers), which speeds up parallel computation even more.
Drawback: The main drawback of the dask.bag API is that it's not suitable when data needs to be passed between multiple processes/workers. It works best on embarrassingly parallel workloads where data passing between workers/processes is minimal. In short, it's not suitable for situations where processes need to communicate a lot during the computation.
This ends our short intro to dask. We'll now explore the various functions available through the dask.bag API.
So without further delay, let’s get started with the coding part.
We'll start by importing all the necessary libraries.
import dask
import dask.bag as db
import sys
print("Python Version : ", sys.version)
print("Dask Version : ", dask.__version__)
Steps to Use the dask.bag API

The usage of the dask.bag API involves a list of steps which are commonly followed to perform complicated computations in parallel:

1. Create a lazy dask bag object from a list/iterator using from_sequence(), from_delayed() or from_url().
2. Apply operations like map(), filter(), groupby(), etc. one by one on the lazy dask bag object created in step 1. Each call returns another lazy bag object.
3. Call the compute() method on the final bag object from step 2, which was created after calling all operations.

Please make a note that dask builds a directed graph of lazy objects as you call methods one after another in steps 1 & 2, and it will only evaluate and run all methods when compute() from step 3 is called on the final lazy object. When compute() is called on the final object, it runs all operations in parallel and returns the final result. (A short end-to-end sketch of these three steps follows below.)
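As a minimal end-to-end sketch of these three steps on a small toy list (not the data used later in the tutorial):

import dask.bag as db

# Step 1: create a lazy bag from a Python sequence.
bag = db.from_sequence(range(10))

# Step 2: chain lazy operations; nothing is computed yet.
result = bag.filter(lambda x: x % 2 == 0).map(lambda x: x ** 2)

# Step 3: compute() builds the task graph and runs it in parallel.
print(result.compute())    # [0, 4, 16, 36, 64]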
Dask provides an optional dashboard which we can use to see how operations perform when run in parallel. This step can be skipped if you don't want to analyze performance, but it can be very useful for debugging.
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=4)
client
We suggest that you set n_workers equal to the number of cores on your PC and threads_per_worker to 1 if you are running the code locally.
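For example, if you wanted to follow that suggestion, the Client call above could instead look like this (a small sketch; os.cpu_count() is used here just to pick up the core count automatically):

import os
from dask.distributed import Client

# One worker per CPU core, single-threaded workers for pure Python workloads.
client = Client(n_workers=os.cpu_count(), threads_per_worker=1)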
You can open the URL shown next to the dashboard entry in the client output. It has a list of tabs providing information like task running times, CPU usage, memory usage, directed graphs, etc. All of this can be useful for analyzing the parallel execution of tasks.
We'll first create lazy bag objects using methods available in the dask.bag API.
from_sequence() to Turn Lists/Iterators into Lazy Dask Bag Objects

The from_sequence() method is commonly used as a starting point for converting a list of operations into dask-compatible operations so that they run in parallel. It accepts a list of values or an iterator and converts it to a lazy dask bag object holding that list of values, which becomes the input to the next methods called on it.
bag1 = db.from_sequence(range(1000))
bag1
By default, from_sequence() will divide the data into 100 partitions. We can also explicitly control partitioning by setting the npartitions parameter (number of partitions) or the partition_size parameter (number of values kept in each partition).
bag2 = db.from_sequence(range(1000000), partition_size=1000, npartitions=1000)
bag2
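We can verify how the data was partitioned via the bag's npartitions attribute (a quick sanity check on the bags created above):

print(bag1.npartitions, bag2.npartitions)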
We'll check the size of the bag objects using sys.getsizeof(), which returns the size of an object in bytes.
sys.getsizeof(bag1), sys.getsizeof(bag2)
We can see that both bags have a size of only 56 bytes even though their inputs are lists of very different sizes, because the bags are lazy objects and do not hold the actual data themselves.
We'll now apply a list of commonly available functions to perform various computations on the list of values. These methods also generate another lazy dask bag object.
Below is a list of commonly used operations:

bag_object.map(function): It applies the function passed to map() to every individual entry of bag_object.
bag_object.filter(condition): It checks the condition passed to filter() on every individual entry of bag_object and only keeps entries which satisfy the condition.
bag_object.product(another_bag): It calculates the cross product of both bags and creates another bag of those pairs.
bag_object.max(): It returns the maximum of the values.
bag_object.min(): It returns the minimum of the values.
bag_object.accumulate(function): It takes as input a binary function which operates on two input values and returns one value. That value is then given as the first input in the next iteration.
bag_object.count(): It returns the number of values in a bag object.
bag_object.sum(): It returns the sum of all values in the list.
bag_object.std(): It returns the standard deviation of the values.
bag_object.frequencies(): It returns the frequency of each value in the bag.
bag_object.groupby(key_function): It groups all values in the list based on the key specified. We can then perform operations on these grouped values.
bag_object.join(other, key_function): It joins one list with another list based on the key specified. It merges values where the key matches.
bag_object.topk(k): It returns the k largest elements of the bag, optionally based on a key function.

Please make a note that the above methods also just create another lazy bag object; the actual computation is only performed when we call compute() on the final bag object. (A small sketch of the aggregate methods appears after the map() and filter() examples below.)
final_bag1 = bag1.map(lambda x: x*2)
final_bag1
final_bag2 = bag2.filter(lambda x: x%100 == 0)
final_bag2
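The aggregate methods from the list above (count(), sum(), min(), max(), mean(), std()) follow the same lazy pattern; a small sketch using bag1 (the bag of 1000 numbers created earlier):

# Each of these returns a lazy object; compute() triggers the actual work.
stats = [bag1.count(), bag1.sum(), bag1.min(), bag1.max(), bag1.mean(), bag1.std()]
print([s.compute() for s in stats])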
compute() on the Final Bag Object to Perform Computation in Parallel

The final step, to actually perform the computation in parallel and return the result, is to call the compute() method on the final bag object. We'll call compute() on both of the final objects created in the previous step.
len(final_bag1.compute())
final_bag2.compute()[:10]
We can also evaluate the bag objects from step 1 directly and they'll return the actual list of values. We can even pass a bag object to the built-in list() function and it'll also return all values.
final_list = bag1.compute()
print("Size : %d bytes"%sys.getsizeof(final_list))
print("Length of Values : ", len(final_list))
list(bag1)[:10]
We'll now explain various ways to use the dask.bag API with examples.
map() & filter() Usage

Below we have created one simple example which loops through 1Mn numbers, doubles each one, and only keeps numbers which are divisible by 100. We have first implemented it with a loop in pure Python and then converted it to the dask version.
final_list = []
for i in range(1000000):
    x = i*2
    if x%100 == 0:
        final_list.append(x)
final_list[:10]
bag1 = db.from_sequence(range(1000000))
result = bag1.map(lambda x: x*2).filter(lambda x : x%100 == 0)
result.compute()[:10]
map() & filter() Usage

Below we have explained another example with a nested loop, where we sum the two loop indexes whenever they are not the same. We have first implemented it as a normal for loop and then given the dask version as well, which uses product() to generate the index pairs.
final_list = []
for i in range(10):
    for j in range(10):
        if i != j:
            final_list.append(i+j)
final_list[:10]
bag1 = db.from_sequence(range(10))
bag2 = db.from_sequence(range(10))
result = bag1.product(bag2)\
.filter(lambda x : x[0]!=x[1])\
.map(lambda x : x[0]+x[1])
result.compute()[:10]
sum() & mean() Usage

Below we have explained another example where we create an array of random numbers of size 100x100, loop through each row of the array, take every 5th element, sum them up, and add the summed value to a list. We then take the mean of that list of summed numbers. We have explained the example with both a normal Python loop and the dask.bag API.
import numpy as np
rnd_state = np.random.RandomState(100)
x = rnd_state.randn(100,100)
result = []
for arr in x:
    result.append(arr[::5].sum())
final_result = sum(result) / len(result)
print(final_result)
bag1 = db.from_sequence(x)
result = bag1.map(lambda x: x[::5].sum()).mean()
final_result = result.compute()
print(final_result)
accumulate() Usage

Below we are explaining the accumulate() function of the dask.bag API. The accumulate function takes as input a binary function which operates on two inputs and returns an output that is passed as the first input in the next iteration. It works like itertools.accumulate() and returns the running result at each step.
import itertools
list(itertools.accumulate(range(100)))[:10]
bag1 = db.from_sequence(range(100))
result = bag1.accumulate(lambda x, y : x+y).compute()
result[:10]
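The fold() method (mentioned in the method list earlier) performs a similar pairwise reduction but returns only the final value instead of every intermediate result; a small sketch using the bag1 from the accumulate example above:

# fold() reduces the whole bag with a binary function, here summing all values.
print(bag1.fold(lambda x, y: x + y).compute())    # 4950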
distinct() Usage

Below we are showing another example which removes duplicates from a list of values based on some key. We have implemented it first in pure Python and then presented the dask version of the same.
final_dict = {}
for key, val in [("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)]:
    if key not in final_dict:
        final_dict[key] = val
list(final_dict.items())
bag1 = db.from_sequence([("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)])
bag1.distinct(key=lambda x: x[0]).compute()
frequencies() Usage

Below we have given an example explaining the usage of the frequencies() method of the dask.bag API. We loop through 1000 random numbers between 1-100, take only the numbers which are divisible by 5, and count the frequency of each. We have implemented both the normal Python and dask.bag API versions of the code.
from collections import Counter
x = np.random.randint(1,100, 1000)
result = []
for i in x:
    if i % 5 == 0:
        result.append(i)
list(Counter(result).items())[:10]
bag1 = db.from_sequence(x)
bag1.filter(lambda x: x%5 == 0).frequencies().compute()[:10]
groupby() Usage

Below we are explaining the usage of groupby(), where we loop through a list of two-value tuples. We want to sum the second values of tuples whose first values are the same. We have explained it below with both the normal Python and dask.bag API versions.
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)]
result = {}
for key, val in x:
    if key in result:
        result[key] += val
    else:
        result[key] = val
list(result.items())
bag1 = db.from_sequence([("a",100), ("b",200), ("c",300), ("d",400), ("e",500), ("a",200), ("e",300)])
bag1.groupby(lambda x: x[0]).map(lambda x: (x[0], sum([i[1] for i in x[1]]))).compute()
join() Usage

Below we are explaining the usage of the join() function of the dask.bag API, where we have two lists of tuples and we want to sum up values across both lists where the first values of the tuples match. We have implemented both the pure Python and dask.bag API versions for the explanation.
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500)]
y = [("a",150), ("b",250), ("c",350), ("d",450), ("e",550)]
result = {}
for key, val in x+y:
    if key in result:
        result[key] += val
    else:
        result[key] = val
list(result.items())
bag1 = db.from_sequence(x)
bag1.join(y, lambda x: x[0]).map(lambda x: (x[0][0], sum([val[1] for val in x]))).compute()
topk() Usage

Below we are explaining the usage of the topk() method of the dask.bag API. We are using the same example as the last step, but keeping only the 2 tuples whose 2nd value is highest in the list.
x = [("a",100), ("b",200), ("c",300), ("d",400), ("e",500)]
y = [("a",150), ("b",250), ("c",350), ("d",450), ("e",550)]
result = {}
for key, val in x+y:
    if key in result:
        result[key] += val
    else:
        result[key] = val
sorted(result.items(), key=lambda x : x[1], reverse=True)[:2]
bag1 = db.from_sequence(x)
bag1.join(y, lambda x: x[0])\
.map(lambda x: (x[0][0], sum([val[1] for val in x])))\
.topk(2, key=lambda x: x[1])\
.compute()
x = np.random.randint(1,100,1000)
sorted(x)[-2:]
bag1 = db.from_sequence(x)
bag1.topk(2).compute()
It's normally advisable to save the output of the bag to a file after the computation is complete. It might happen that the output of compute() is very big and cannot be held in memory; in that case it's better to save it to disk and then verify the results.
Dask bag provides a list of methods for converting the output to another format and saving it to various file formats. Below are three methods available for converting a bag of values to another format and saving it (a short sketch follows the list):

to_avro() - It can be used to save a bag of values as Avro files.
to_dataframe() - It can be used to convert a bag of values to a dask data frame, which is available through the dask.dataframe module and lets us work on pandas data frames in parallel.
to_textfiles() - It can be used to save a bag of values as text files.
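A minimal sketch of saving bag output (the output/ directory and the toy bag here are made up for illustration):

import os
import dask.bag as db

bag = db.from_sequence(range(1000), npartitions=4)
processed = bag.map(lambda x: x * 2).filter(lambda x: x % 100 == 0)

# to_textfiles() expects strings and writes one file per partition (output/0.txt, 1.txt, ...).
os.makedirs("output", exist_ok=True)
processed.map(str).to_textfiles("output/*.txt")

# to_dataframe() converts a bag of dicts into a dask dataframe for pandas-style processing.
df = processed.map(lambda x: {"value": x}).to_dataframe()
print(df.head())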
This ends our small tutorial on the dask.bag API. Please feel free to let us know your views in the comments section.
Other Python libraries for performing computation in parallel on a single computer include multiprocessing, concurrent.futures, threading, joblib and ipyparallel, which we mentioned at the beginning of this tutorial.