Dask is a parallel processing library that provides various APIs for performing parallel computation on different types of data structures. We have already discussed dask APIs like dask.bag, dask.delayed, dask.distributed, etc. in separate tutorials, and we covered a basic introduction to all of the APIs in our dask.bag tutorial. As a part of this tutorial, we are going further and introducing other important topics like global variables, locks, and publish/subscribe patterns which are commonly used in parallel processing. These coordination primitives let workers and clients coordinate work with each other. Dask provides each of these primitives so that we can handle shared data without it getting corrupted when it is shared between various processes. We'll be discussing each one with various examples below.
Below is a list of classes available in the dask.distributed package which we'll be discussing as a part of this tutorial: Variable, Queue, Lock, Pub, and Sub. We'll be going through each of these classes and explaining each with examples.
We'll start by creating a dask.distributed cluster which will hold a list of workers for running tasks in parallel.
dask.distributed Cluster
We can create a dask cluster on a local machine by creating a client object as described below. It'll create a cluster consisting of as many workers as there are cores on the computer. Please refer to our tutorial on dask.distributed if you want to know about different ways to create clusters. We'll be using this client object to submit tasks to the workers in the cluster.
from dask.distributed import Client
client = Client()
client
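If you want explicit control over the cluster size, Client() also accepts arguments like n_workers and threads_per_worker. Below is a minimal sketch; the particular values are just an illustration:

from dask.distributed import Client

# Create a local cluster with 4 worker processes, each running 2 threads
client = Client(n_workers=4, threads_per_worker=2)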
We generally declare constant variables as global variables in Python, which can be accessed by any part of an application. We might need global variables in parallel processing applications as well. Dask provides us with a way to keep global variables using the Variable class from dask.distributed. It provides get() and set() methods which let us get and set the value of a global variable. We'll be explaining its usage below with examples.
from dask.distributed import Variable
Below we have declared a global variable named y which can be accessed by any method running in parallel on dask workers. We have defined a method named slow_pow() which raises the number passed to it to the power of the value set in the global variable. We then loop through the numbers 1-10 and call slow_pow() in parallel to raise each number to the power of 5.
global_var = Variable(name="y")
global_var.set(5)
import time

def slow_pow(x):
    time.sleep(1)
    y = global_var.get()
    return x ** y
%%time
futures = []
for i in range(1, 11):
    future = client.submit(slow_pow, i)
    futures.append(future)
[f.result() for f in futures]
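A variable can also be read and updated directly from the client, and removed from the scheduler when no longer needed using delete(). Below is a minimal sketch; the variable name counter is just an illustration:

counter = Variable(name="counter")
counter.set(10)
print(counter.get())  # prints 10
counter.delete()      # remove the variable from the scheduler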
Below we have designed another example demonstrating the usage of global variables. Here we first scatter a numpy array using the scatter() method, which distributes the value to all workers and returns a future referencing it. We have set this reference future as a global variable so that it can be evaluated on all workers to get the value of the array.
import numpy as np
arr = np.arange(1,11)
future = client.scatter(arr, broadcast=True)
global_var = Variable(name="array")
global_var.set(future)
%%time
def slow_add(x):
    time.sleep(1)
    future = global_var.get()
    return x + future.result()[x - 1]
futures = []
for i in range(1, 11):
    future = client.submit(slow_add, i)
    futures.append(future)
[f.result() for f in futures]
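As a side note, a scattered future can also be passed directly as an argument to client.submit(), in which case dask resolves it to the underlying data before the function runs on the worker. Below is a minimal sketch; the name arr_future and the function add_first() are just illustrations:

arr_future = client.scatter(arr)

def add_first(data, x):
    # dask replaces arr_future with the actual numpy array before calling
    return x + data[0]

print(client.submit(add_first, arr_future, 1).result())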
Queues are another data structure available in dask which can be used to share data between workers, or between workers and clients. We can create a queue by using the Queue class of the dask.distributed module. It lets us declare a name for the queue as well as specify the maximum size of the queue. If we don't specify a max size then the queue can grow as large as we want. We can use a queue to pass any kind of data like int, float, dict, list, etc.
Queues are managed by the scheduler, hence everything passed between workers, or between clients and workers, will route through the scheduler. Queues are therefore not ideal for moving large amounts of data. They are well suited for small amounts of data or for futures (futures can point to large amounts of data).
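Below is a minimal sketch of the basic queue API; the queue name and maxsize value are just illustrations. A queue can be created with a maximum size, filled with put(), and drained with get(), while qsize() reports the number of items waiting:

from dask.distributed import Queue

q = Queue(name="demo-queue", maxsize=2)  # holds at most 2 items at a time
q.put(1)
q.put({"key": "value"})  # queues accept any picklable data
print(q.qsize())         # prints 2
print(q.get())           # prints 1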
from dask.distributed import Queue
Below we have declared a queue which we'll be using to collect error messages when the function fails. We have created a function named slow_pow() which accepts two arguments, x and y. It raises x to the power of y and returns the result. If it fails then we capture the error and put the error details into the queue.
We then execute the function 10 times, passing it the index 1-10 as x, and as y either the integer 5 if the index is even or the string "5" if the index is odd. We then loop through the futures, and whenever we can't get a result we retrieve the error details from the queue.
queue = Queue(name="Exceptions")
def slow_pow(x, y):
    try:
        time.sleep(1)
        return x ** y
    except Exception as e:
        queue.put("{} ** {} Failed : {}".format(x, y, str(e)))
        return None
futures = []
for i in range(1, 11):
    f = client.submit(slow_pow, i, 5 if i % 2 == 0 else "5")
    futures.append(f)
for f in futures:
    res = f.result()
    if not res:
        print(queue.get())
    else:
        print(res)
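If you want to check whether any error messages are still waiting before calling get(), which blocks when the queue is empty, qsize() can be used as a quick sketch:

# Number of error messages still waiting in the queue (0 if all were consumed)
print(queue.qsize())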
Locks are a very useful primitive when you are working with shared data. Dask provides a lock feature that works exactly like threading.Lock. We can declare a lock using the Lock class of the dask.distributed module. With it we can restrict access to a particular part of code to only one worker at a time, which can prevent concurrent updates. We'll explain the usage of the lock below with examples.
from dask.distributed import Lock
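Besides the context-manager style used in the examples below, a lock can also be acquired and released explicitly. Below is a minimal sketch; the lock name is just an illustration:

lock = Lock(name="example-lock")
lock.acquire()  # blocks until the lock becomes available
try:
    pass  # critical section: only one worker/client runs this at a time
finally:
    lock.release()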
Below we have created an example where we try to access a global variable in a method which runs on workers. We then try to modify the value of that global variable as well. We have divided it into 2 sections, where we run the code first without the lock primitive and then with it, to show the difference.
Below we have declared a method named update_i() which reads the global variable and increments its value. We have then called this method 10 times on different workers. We want each call to see the last updated value, but when we print the results we can see that many of them print the same value, which is not what we expected: without a lock, several workers can read the variable before any of them writes the incremented value back.
variable = Variable(name="shared_data")
variable.set(1)
def update_i():
    variable.set(variable.get() + 1)
    return variable.get()
futures = []
for i in range(1, 11):
    f = client.submit(update_i)
    futures.append(f)
client.gather(futures)
As a part of this section, we have first declared a lock. We then pass this lock to the method update_i(). We have wrapped the section where we update the global variable in the lock using a context manager. This makes sure that only one worker can access this part of the code at a time, making all other workers wait until the previous one is done. We then execute the same code as last time and can notice that it returns values that are each incremented by 1 from the previous value, even though they are not in sequence.
lock = Lock(name="PreventConcurentUpdates")
variable.set(1)
def update_i(lock):
    with lock:
        variable.set(variable.get() + 1)
        return variable.get()
futures = []
for i in range(1, 11):
    f = client.submit(update_i, lock)
    futures.append(f)
client.gather(futures)
Below we have explained another example where we try to access queue data without a lock and with a lock. We can see that without the lock, several workers can end up returning the same data. But when we wrap the code in the lock context manager, only one worker can access the queue at a time and hence each properly takes one value from it. We can see that with the lock, all of the outputs are different, as all workers were able to access the queue one by one.
queue = Queue(name="shared_data")
for i in range(1, 11):
    queue.put(i)
def get_queue_data():
    return queue.get()
futures = []
for i in range(1, 11):
    f = client.submit(get_queue_data)
    futures.append(f)
client.gather(futures)
queue = Queue(name="shared_data")
for i in range(1, 11):
    queue.put(i)
def get_queue_data(lock):
    with lock:
        return queue.get()
futures = []
for i in range(1, 11):
    f = client.submit(get_queue_data, lock)
    futures.append(f)
client.gather(futures)
Dask provides an implementation of the publish-subscribe pattern. We can declare publisher and subscriber objects using the Pub and Sub classes of dask.distributed. We create a topic by setting the topic name as a string on the publisher, and we create subscribers with the same topic name. The subscribers will then be subscribed to the publisher with that topic name.
We can declare as many subscribers as we want and link them to the publisher with a particular topic name. Publishers and subscribers find each other using the scheduler but do not use it for passing data: once they have found each other, they pass data between themselves directly. Publish-subscribe is ideal when you want workers to communicate with each other and don't want data to be passed through the scheduler, which can be slow as well as a bottleneck.
from dask.distributed import Sub, Pub
Below we have declared a publisher with the topic name ClientIdTopic and have declared two subscribers to this topic. We have also declared a method named publish_to_topic() which publishes a message to the topic containing the id passed to it as an argument. We submit this method to the cluster 10 times and then retrieve the published data from the subscribers. We can call either the get() or the next() method on a subscriber to get data.
pub = Pub(name="ClientIdTopic")
sub1 = Sub(name="ClientIdTopic")
sub2 = Sub(name="ClientIdTopic")
def publish_to_topic(x):
    time.sleep(1)
    pub.put("Worker ID : %d" % x)
futures = []
for i in range(1, 11):
    f = client.submit(publish_to_topic, i)
    futures.append(f)
for i in range(10):
    print(sub1.get())
sub2.get()
sub2.next()
Please make a note that if you call the get() or next() method on a subscriber and no data has been published to the topic, it will block execution of any further statements, waiting for a publisher to publish something to the topic so that it can return that value.
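To avoid blocking forever, get() accepts a timeout argument; the 5-second value below is just an illustration. If nothing is published within the timeout, a timeout error is raised (the exact error class depends on the dask version):

try:
    msg = sub1.get(timeout=5)  # wait at most 5 seconds for a message
except Exception:
    print("Nothing was published to the topic within the timeout")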
This ends our small tutorial on coordination primitives in the dask.distributed API. Please feel free to let us know your views in the comments section.