Python's asyncio module provides an API for writing code that runs concurrently. In an earlier tutorial we discussed in detail how to use asyncio to create tasks, run them concurrently, make tasks wait for other tasks, cancel tasks, and so on.
This tutorial builds on that one and explains the synchronization primitives provided by the asyncio module. Synchronization primitives are programming constructs that control how multiple concurrent tasks access shared data or execute a particular part of the code. They make sure that only a single task, or a specified number of tasks, can access the shared data or execute that part of the code at a time. The examples below cover primitives including Lock, Condition, and Semaphore.
Please make a NOTE that this tutorial expects the reader to have background knowledge of using asyncio to create tasks, cancel tasks, etc. If you are not familiar with these, please check our tutorial on them before beginning this one, as it will help you grasp the material faster.
As a part of our first example, we'll explain the Lock primitive, which controls access to shared data so that multiple tasks (coroutines) accessing it concurrently do not create data inconsistencies. We'll first introduce a version of the example that does not use a lock and show how it creates inconsistencies. We'll then solve the problem using the lock.
The examples in this tutorial are inspired by the examples in our Python threading module tutorial. Please feel free to check it if you are interested in learning in depth about working with threads in Python.
Our code for this example starts by creating a coroutine named Raise which takes a single number as input. It then raises the global variable X to the power of that number using a for loop, multiplying X by its initial value on each iteration. Inside the loop it also sleeps for 2 seconds before each multiplication, which hands control back to the event loop so that another coroutine can run. We also print the value of X at the start and at the end.
Our main coroutine starts by creating three tasks, each of which calls Raise with the argument 3, asking it to raise the global variable X to the power of 3. We then wait for the completion of each task. We execute the main coroutine using the asyncio.run() function and also note the time of execution.
When we run the script below, all three tasks read the global variable X before any of them has modified it, hence each prints its initial value as 3. They then modify X concurrently, leaving it in an inconsistent state. Had the tasks accessed X one at a time, its value would be 27 after the first task, 19683 after the second task, and 7625597484987 after the third task.
Please make a NOTE that we use the terms task and coroutine interchangeably in this tutorial. Strictly speaking, a task is an object that wraps a coroutine and schedules it on the event loop.
import asyncio
import time
from datetime import datetime

X = 3

async def Raise(y):
    global X
    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        await asyncio.sleep(2)
        X *= X_init
    print("Value of X After Raise : {}".format(X))

async def main():
    task1 = asyncio.create_task(Raise(3), name="Raise1")
    task2 = asyncio.create_task(Raise(3), name="Raise2")
    task3 = asyncio.create_task(Raise(3), name="Raise3")
    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:37:13.309355
Value of X Initially : 3
Value of X Initially : 3
Value of X Initially : 3
Value of X After Raise : 243
Value of X After Raise : 729
Value of X After Raise : 2187
End Time : 2021-03-11 16:37:17.315951
Total Time Taken : 4.006582736968994 Seconds
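The final value of 2187 follows from every task capturing X_init = 3 before any multiplication happens: six multiplications by 3 are then applied to the initial 3, giving 3 * 3**6 = 2187. The following is a minimal sketch of the same interleaving, assuming asyncio.sleep(0) in place of the 2-second sleep so that it finishes immediately. The names raise_unsafe and run_unsafe are hypothetical and not part of the example above.

```python
import asyncio

X = 3

async def raise_unsafe(power):
    """Same logic as Raise() above, but yielding with sleep(0)."""
    global X
    x_init = X                      # every task reads 3 here
    for _ in range(power - 1):
        await asyncio.sleep(0)      # hand control to the other tasks
        X *= x_init                 # multiplies whatever X is by now

async def run_unsafe():
    global X
    X = 3                           # reset so the sketch can be re-run
    await asyncio.gather(raise_unsafe(3), raise_unsafe(3), raise_unsafe(3))
    return X

print(asyncio.run(run_unsafe()))    # 2187 rather than 7625597484987
```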
In this example, we add a lock to the previous example so that concurrent access to the shared global variable is controlled and inconsistencies are prevented.
We have modified the definition of the Raise() coroutine. It now takes two arguments: the first is a Lock instance and the second is the power to which the global variable is raised. We have wrapped the body of Raise() between calls to the acquire() and release() methods of the Lock instance. This ensures that the code between these two calls is executed by only one coroutine at a time, so only one coroutine modifies the global variable X at a time.
Our main coroutine creates an instance of Lock and passes it to each task executing Raise.
When we run the script below, only one coroutine at a time accesses and modifies the global variable X, and the output is now correct compared to the previous example. The run also takes about 12 seconds instead of 4, because the three tasks no longer overlap their sleeps.
import asyncio
import time
from datetime import datetime

X = 3

async def Raise(lock, y):
    global X
    acquired = await lock.acquire()
    print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), acquired))
    print("Value of X Initially : {}".format(X))
    X_init = X
    for i in range(y-1):
        await asyncio.sleep(2)
        X *= X_init
    print("Value of X After Raise : {}".format(X))
    lock.release()

async def main():
    lock = asyncio.Lock()
    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1")
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2")
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3")
    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:40:23.004775
'Raise1' acquired lock? True
Value of X Initially : 3
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987
End Time : 2021-03-11 16:40:35.017621
Total Time Taken : 12.012864112854004 Seconds
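One thing to watch with explicit acquire() and release() calls: if the code between them raises an exception, release() is never reached and the lock stays held. The following is a minimal sketch comparing that situation with a try/finally block. The coroutine names and the fail parameter are hypothetical and exist only for illustration.

```python
import asyncio

async def update_without_finally(lock, fail):
    await lock.acquire()
    if fail:
        raise RuntimeError("update failed")  # release() below is skipped
    lock.release()

async def update_with_finally(lock, fail):
    await lock.acquire()
    try:
        if fail:
            raise RuntimeError("update failed")
    finally:
        lock.release()                       # runs on success and on error

async def still_locked_after_error(update):
    """Run one failing update and report whether the lock is still held."""
    lock = asyncio.Lock()
    try:
        await update(lock, fail=True)
    except RuntimeError:
        pass
    return lock.locked()

print(asyncio.run(still_locked_after_error(update_without_finally)))  # True
print(asyncio.run(still_locked_after_error(update_with_finally)))     # False
```

Any task that later awaits acquire() on a lock left in the first state would wait forever.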
Our code for this example is almost the same as our code for the previous example, with the main change being that we have introduced the locked() method.
We have added a while loop inside the Raise() coroutine which checks whether the lock is currently held and, if it is, puts the calling coroutine to sleep for two seconds. We also print messages to log which coroutines went to sleep and which one got the lock. In addition, the update of X is now wrapped in try/except/finally so that the lock is released even if the update fails.
When we run the script below, we can see from the printed messages how the coroutines check the lock every two seconds.
import asyncio
import time
from datetime import datetime

X = 3

async def Raise(lock, y):
    global X
    while lock.locked():
        print("'{}' tried to acquire lock but it was occupied. Going to sleep again.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)
    acquired = await lock.acquire()
    print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), acquired))
    print("Value of X Initially : {}".format(X))
    try:
        X_init = X
        for i in range(y-1):
            await asyncio.sleep(2)
            X *= X_init
    except Exception as e:
        print("Update Failed : {}".format(str(e)))
    finally:
        lock.release()
    print("Value of X After Raise : {}".format(X))

async def main():
    lock = asyncio.Lock()
    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1")
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2")
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3")
    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:41:40.994181
'Raise1' acquired lock? True
Value of X Initially : 3
'Raise2' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
'Raise2' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
'Raise3' tried to acquire lock but it was occupied. Going to sleep again.
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987
End Time : 2021-03-11 16:41:53.008151
Total Time Taken : 12.013993740081787 Seconds
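The locked() check is not needed for correctness, since acquire() already suspends the caller until the lock is free. If the goal is to give up after some time rather than wait indefinitely, one option is to wrap acquire() in asyncio.wait_for(). The following is a minimal sketch of that idea. The helper name try_acquire and the timeout values are assumptions made for illustration.

```python
import asyncio

async def try_acquire(lock, timeout):
    """Return True if the lock was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    lock = asyncio.Lock()
    await lock.acquire()                      # the lock is now held
    first = await try_acquire(lock, 0.05)     # times out while it is held
    lock.release()
    second = await try_acquire(lock, 0.05)    # succeeds once it is free
    if second:
        lock.release()
    return first, second

print(asyncio.run(demo()))                    # (False, True)
```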
Our code for this example is the same as our code for example 1.2, with the only change being that we use the Lock instance as a context manager. When a Lock is used with async with, we don't need to call the acquire() and release() methods ourselves; the lock is acquired on entering the block and released on leaving it.
When we run the script below, the output is the same as the output from example 1.2 apart from the timestamps.
If you are interested in learning about context managers, please feel free to check our tutorial on the Python module named contextlib, which provides utilities for creating context managers.
import asyncio
import time
from datetime import datetime

X = 3

async def Raise(lock, y):
    global X
    async with lock:
        print("'{}' acquired lock? {}".format(asyncio.current_task().get_name(), lock.locked()))
        print("Value of X Initially : {}".format(X))
        X_init = X
        for i in range(y-1):
            await asyncio.sleep(2)
            X *= X_init
        print("Value of X After Raise : {}".format(X))

async def main():
    lock = asyncio.Lock()
    task1 = asyncio.create_task(Raise(lock, 3), name="Raise1")
    task2 = asyncio.create_task(Raise(lock, 3), name="Raise2")
    task3 = asyncio.create_task(Raise(lock, 3), name="Raise3")
    await task1
    await task2
    await task3

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:42:43.291451
'Raise1' acquired lock? True
Value of X Initially : 3
Value of X After Raise : 27
'Raise2' acquired lock? True
Value of X Initially : 27
Value of X After Raise : 19683
'Raise3' acquired lock? True
Value of X Initially : 19683
Value of X After Raise : 7625597484987
End Time : 2021-03-11 16:42:55.306481
Total Time Taken : 12.015046834945679 Seconds
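The tasks in the output above get the lock in the order Raise1, Raise2, Raise3. This agrees with the asyncio documentation, which describes lock acquisition as fair: the coroutine that proceeds is the one that started waiting first. The following is a minimal sketch that records the acquisition order, assuming asyncio.sleep(0) as a stand-in for real work. The names worker and acquisition_order are hypothetical.

```python
import asyncio

async def worker(lock, name, order):
    async with lock:                 # acquired here, released on exit
        order.append(name)
        await asyncio.sleep(0)       # yield while still holding the lock

async def acquisition_order(n=5):
    lock = asyncio.Lock()
    order = []
    tasks = [asyncio.create_task(worker(lock, i, order)) for i in range(n)]
    await asyncio.gather(*tasks)
    return order

print(asyncio.run(acquisition_order()))   # [0, 1, 2, 3, 4]
```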
As a part of our second example, we'll explain how to use the Condition primitive for communication between coroutines. Coroutines can wait for some condition, and when another coroutine fulfills that condition it wakes up some or all of the coroutines that were waiting for it.
Please make a NOTE that the wait() method releases the underlying lock while the coroutine waits for the condition and reacquires it before returning. When the condition is notified, all notified coroutines try to reacquire the lock; one of them succeeds at a time and proceeds.
Our code for this example has 2 coroutines: process_item(), which waits on the condition until the global variable X is set, then processes the value and resets X to None; and set_item(), which sleeps in 2-second intervals until X is unset, then sets it and notifies one waiting task.
Our main coroutine starts by creating a Condition instance. It then creates five tasks, each of which executes set_item() with a random number in the range 1-50. We then create 5 other tasks which execute process_item(). We keep track of all 10 tasks and make the main task wait for the completion of all of them.
At last, we execute the main coroutine using the asyncio.run() function and record the time.
When we run the script below, we can see how the tasks communicate with each other using the condition primitive. All set tasks wait if the global variable is already set. All process tasks wait if the global variable is not set. When a set task sets the global variable, it notifies a single waiting process task so that it can process the value.
import asyncio
import time
from datetime import datetime
import random

X = None

async def process_item(condition_var):
    global X
    await condition_var.acquire() ######## Acquired Lock
    while X is None:
        print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
        await condition_var.wait()
    print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
    X = None
    condition_var.release() ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X is not None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)
    await condition_var.acquire() ######## Acquired Lock
    X = value
    print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
    condition_var.notify(n=1)
    condition_var.release() ######## Released Lock

async def main():
    condition_var = asyncio.Condition()
    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)
    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)
    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:44:38.840444
Task : 'SetItem1' setting an item : 10
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 10
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 4
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 4
Task : 'SetItem3' setting an item : 48
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 48
Task : 'SetItem4' setting an item : 40
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 40
Task : 'SetItem5' setting an item : 45
Task : 'ProcessItem5' processing an item : 45
End Time : 2021-03-11 16:44:46.850631
Total Time Taken : 8.010211706161499 Seconds
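The note about wait() releasing the lock can be checked directly with the locked() method of Condition. The following is a minimal sketch, with hypothetical names waiter and demo, that logs whether the lock is held before, during, and after a call to wait().

```python
import asyncio

async def waiter(cond, log):
    async with cond:
        log.append(("before wait", cond.locked()))
        await cond.wait()            # lock is released while suspended here
        log.append(("after wait", cond.locked()))

async def demo():
    cond = asyncio.Condition()
    log = []
    task = asyncio.create_task(waiter(cond, log))
    await asyncio.sleep(0)           # let waiter() run until it blocks in wait()
    log.append(("during wait", cond.locked()))
    async with cond:                 # possible only because wait() released the lock
        cond.notify()
    await task
    return log

for entry in asyncio.run(demo()):
    print(entry)
# ('before wait', True)
# ('during wait', False)
# ('after wait', True)
```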
Our code for this example is almost the same as our code from the previous example, with the change that we demonstrate the usage of wait_for(). We have created a function named is_global_var_set() which returns True if X is set and False otherwise. We have removed the while loop from the process_item() coroutine and use the wait_for() method instead, which waits until the given predicate returns True. We have also created our own Lock instance and passed it to Condition. The remaining code is the same as in the previous example.
import asyncio
import time
from datetime import datetime
import random

X = None

def is_global_var_set():
    return X is not None

async def process_item(condition_var):
    global X
    await condition_var.acquire() ######## Acquired Lock
    await condition_var.wait_for(is_global_var_set)
    #print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
    print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
    X = None
    condition_var.release() ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X is not None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)
    await condition_var.acquire() ######## Acquired Lock
    X = value
    print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
    condition_var.notify(n=1)
    condition_var.release() ######## Released Lock

async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)
    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)
    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)
    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:53:54.595776
Task : 'SetItem1' setting an item : 12
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 12
Task : 'SetItem2' setting an item : 25
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 25
Task : 'SetItem3' setting an item : 18
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 18
Task : 'SetItem4' setting an item : 36
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 36
Task : 'SetItem5' setting an item : 45
Task : 'ProcessItem5' processing an item : 45
End Time : 2021-03-11 16:54:02.605656
Total Time Taken : 8.009908437728882 Seconds
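The predicate passed to wait_for() can be any callable that returns a boolean, so a lambda over local state works as well as a module-level function. The following is a minimal sketch of a single handoff that avoids the global variable by keeping the item in a dictionary. The names consumer, producer, and box are hypothetical.

```python
import asyncio

async def consumer(cond, box):
    async with cond:
        # Repeatedly waits until the predicate returns True.
        await cond.wait_for(lambda: box["item"] is not None)
        item, box["item"] = box["item"], None
        return item

async def producer(cond, box, value):
    async with cond:
        box["item"] = value
        cond.notify()                # wake one waiting consumer

async def demo():
    cond = asyncio.Condition()
    box = {"item": None}
    consumer_task = asyncio.create_task(consumer(cond, box))
    await asyncio.sleep(0)           # let the consumer start waiting
    await producer(cond, box, 42)
    return await consumer_task

print(asyncio.run(demo()))           # 42
```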
Our code for this example is the same as our code for example 2.1, with the only change being that we use the Condition instance as a context manager. Because of this, we don't need to call the acquire() and release() methods ourselves; they are called when the async with block is entered and exited.
import asyncio
import time
from datetime import datetime
import random

X = None

async def process_item(condition_var):
    global X
    async with condition_var: ######## Acquired Lock
        while X is None:
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await condition_var.wait()
        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None
    ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X is not None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)
    async with condition_var: ######## Acquired Lock
        X = value
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
        condition_var.notify(n=1)
    ######## Released Lock

async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)
    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)
    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)
    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:54:28.646660
Task : 'SetItem1' setting an item : 30
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 30
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 6
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 6
Task : 'SetItem3' setting an item : 17
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 17
Task : 'SetItem4' setting an item : 41
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 41
Task : 'SetItem5' setting an item : 3
Task : 'ProcessItem5' processing an item : 3
End Time : 2021-03-11 16:54:36.659216
Total Time Taken : 8.01259970664978 Seconds
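In the examples so far, set_item() polls the global variable with asyncio.sleep(2) while process_item() waits on the condition. One possible alternative, sketched below under the assumption that both sides share a single Condition, is to have producers wait on the condition as well and have consumers notify after clearing the slot. The names producer, consumer, slot, and run_handoff are hypothetical and this is not the approach used in the tutorial's code.

```python
import asyncio

async def producer(cond, slot, value):
    async with cond:
        # Wait until the slot is empty instead of polling with sleep().
        await cond.wait_for(lambda: slot["item"] is None)
        slot["item"] = value
        cond.notify_all()            # wake consumers (and producers, who re-check)

async def consumer(cond, slot, received):
    async with cond:
        await cond.wait_for(lambda: slot["item"] is not None)
        received.append(slot["item"])
        slot["item"] = None
        cond.notify_all()            # tell producers the slot is free again

async def run_handoff(values):
    cond = asyncio.Condition()
    slot = {"item": None}
    received = []
    workers = [producer(cond, slot, v) for v in values]
    workers += [consumer(cond, slot, received) for _ in values]
    await asyncio.gather(*workers)
    return received

print(sorted(asyncio.run(run_handoff([10, 20, 30]))))   # [10, 20, 30]
```

Because producers and consumers wait on the same condition here, the sketch uses notify_all() so that a notification is never delivered only to a task of the wrong kind.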
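Our code for this example is the same as our code for the previous example, with the only change being that we call the notify_all() method instead of notify(). In the output below, every waiting process task now wakes up after each set, but only one of them finds X set; the others print the "tried to process item" message again and go back to waiting.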
import asyncio
import time
from datetime import datetime
import random

X = None

async def process_item(condition_var):
    global X
    async with condition_var: ######## Acquired Lock
        while X is None:
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await condition_var.wait()
        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None
    ######## Released Lock

async def set_item(condition_var, value):
    global X
    while X is not None:
        print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
        await asyncio.sleep(2)
    async with condition_var: ######## Acquired Lock
        X = value
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))
        condition_var.notify_all()
    ######## Released Lock

async def main():
    lock = asyncio.Lock()
    condition_var = asyncio.Condition(lock=lock)
    set_tasks = []
    for i in range(5):
        task = asyncio.create_task(set_item(condition_var, random.randint(1,50)), name="SetItem%d"%(i+1))
        set_tasks.append(task)
    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(condition_var), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)
    ## Make main task wait for all other tasks (5 set items + 5 process items) to complete
    for task in set_tasks + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:55:02.354445
Task : 'SetItem1' setting an item : 33
Task : 'SetItem2' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 33
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem2' setting an item : 21
Task : 'SetItem3' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 21
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem3' setting an item : 11
Task : 'SetItem4' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 11
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem4' setting an item : 15
Task : 'SetItem5' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 15
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem5' setting an item : 34
Task : 'ProcessItem5' processing an item : 34
End Time : 2021-03-11 16:55:10.364366
Total Time Taken : 8.009950637817383 Seconds
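The difference between notify(n=1) and notify_all() can also be seen by counting how many waiters wake up after a single notification. The following is a minimal sketch with hypothetical names waiter and count_woken. The short sleep after notifying is an assumption used only to give the woken tasks time to run.

```python
import asyncio

async def waiter(cond, woken, name):
    async with cond:
        await cond.wait()
        woken.append(name)

async def count_woken(use_notify_all, waiters=3):
    cond = asyncio.Condition()
    woken = []
    tasks = [asyncio.create_task(waiter(cond, woken, i)) for i in range(waiters)]
    await asyncio.sleep(0)               # let every waiter block in wait()
    async with cond:
        if use_notify_all:
            cond.notify_all()            # wakes every waiting task
        else:
            cond.notify(n=1)             # wakes at most one waiting task
    await asyncio.sleep(0.01)            # give woken tasks time to finish
    count = len(woken)
    for task in tasks:                   # clean up waiters that never woke
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return count

print(asyncio.run(count_woken(use_notify_all=False)))   # 1
print(asyncio.run(count_woken(use_notify_all=True)))    # 3
```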
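As a part of our third example, we'll demonstrate the usage of the Semaphore primitive, which lets at most a specified number of coroutines access a shared resource at a time. It is useful when a shared resource can be accessed or modified concurrently, but only by a limited number of coroutines (for example, at most 5 tasks may access a DB at a time).
Our code for this example has three coroutines: use_connection(), which takes a connection name from the global list X, does some CPU work to simulate using it, and puts it back; process_item(), which acquires the semaphore, calls use_connection(), and releases the semaphore; and main(), which creates a Semaphore with a value of 3 and 10 tasks that each execute process_item().
We then run the main() coroutine through the asyncio.run() function. We also record the running time.
When we run the script, the semaphore ensures that at most 3 tasks can be inside the acquire()/release() section of process_item(), and therefore inside use_connection(), at a time, even though we have created 10 tasks. Note that in the output below the tasks actually run one after another and the semaphore value is always printed as 2: use_connection() does CPU work without awaiting anything, so a task never yields control to the event loop while it holds the semaphore.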
import asyncio
import time
from datetime import datetime
import random

X = ["Connection1", "Connection2", "Connection3"]

async def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)] ### Using CPU to give impression of using connection object.
    print("Task : '{}' uses '{}' object for transferring data.".format(asyncio.current_task().get_name(), conn_obj))
    X.append(conn_obj)

async def process_item(resources):
    await resources.acquire() ## Code between acquire() and release() can be run by at most 3 tasks at a time.
    print("Task : '{}' acquired resource. Semaphore : {}".format(asyncio.current_task().get_name(), resources._value))
    await use_connection()
    print("Task : '{}' released resource. X : {}".format(asyncio.current_task().get_name(), X))
    resources.release()

async def main():
    resources = asyncio.Semaphore(value=3)
    tasks = []
    for i in range(10):
        task = asyncio.create_task(process_item(resources), name="ProcessItem%d"%(i+1))
        tasks.append(task)
    ## Make main task wait for all tasks (10 process items) to complete
    for task in tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:48:57.942096
Task : 'ProcessItem1' acquired resource. Semaphore : 2
Task : 'ProcessItem1' uses 'Connection1' object for transferring data.
Task : 'ProcessItem1' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem2' acquired resource. Semaphore : 2
Task : 'ProcessItem2' uses 'Connection2' object for transferring data.
Task : 'ProcessItem2' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem3' acquired resource. Semaphore : 2
Task : 'ProcessItem3' uses 'Connection3' object for transferring data.
Task : 'ProcessItem3' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem4' acquired resource. Semaphore : 2
Task : 'ProcessItem4' uses 'Connection1' object for transferring data.
Task : 'ProcessItem4' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem5' acquired resource. Semaphore : 2
Task : 'ProcessItem5' uses 'Connection2' object for transferring data.
Task : 'ProcessItem5' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem6' acquired resource. Semaphore : 2
Task : 'ProcessItem6' uses 'Connection3' object for transferring data.
Task : 'ProcessItem6' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem7' acquired resource. Semaphore : 2
Task : 'ProcessItem7' uses 'Connection1' object for transferring data.
Task : 'ProcessItem7' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem8' acquired resource. Semaphore : 2
Task : 'ProcessItem8' uses 'Connection2' object for transferring data.
Task : 'ProcessItem8' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem9' acquired resource. Semaphore : 2
Task : 'ProcessItem9' uses 'Connection3' object for transferring data.
Task : 'ProcessItem9' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem10' acquired resource. Semaphore : 2
Task : 'ProcessItem10' uses 'Connection1' object for transferring data.
Task : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']
End Time : 2021-03-11 16:49:01.810761
Total Time Taken : 3.868617296218872 Seconds
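To see the limit of 3 actually being reached, the work done while holding the semaphore has to await something so that other tasks get a chance to run. The following is a minimal sketch that tracks the peak number of tasks inside the guarded section, assuming asyncio.sleep() as a stand-in for awaiting real I/O on a connection. The names worker, state, and peak_concurrency are hypothetical.

```python
import asyncio

async def worker(sem, state):
    await sem.acquire()
    try:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)        # stand-in for awaiting I/O
        state["active"] -= 1
    finally:
        sem.release()

async def peak_concurrency(limit=3, workers=10):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(workers)))
    return state["peak"]

print(asyncio.run(peak_concurrency()))   # 3
```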
Our code for this example is the same as our code for the last example, with the only change being that we use the Semaphore instance as a context manager inside the process_item() coroutine. This frees us from calling the acquire() and release() methods on it.
The output of running this script is almost exactly the same as that of our previous example.
import asyncio
import time
from datetime import datetime
import random

X = ["Connection1", "Connection2", "Connection3"]

async def use_connection():
    conn_obj = X.pop(0)
    _ = [i*i for i in range(5000000)] ### Using CPU to give impression of using connection object.
    print("Task : '{}' uses '{}' object for transferring data.".format(asyncio.current_task().get_name(), conn_obj))
    X.append(conn_obj)

async def process_item(resources):
    async with resources: ## Code inside this block can be run by at most 3 tasks at a time.
        print("Task : '{}' acquired resource. Semaphore : {}".format(asyncio.current_task().get_name(), resources._value))
        await use_connection()
        print("Task : '{}' released resource. X : {}".format(asyncio.current_task().get_name(), X))

async def main():
    resources = asyncio.Semaphore(value=3)
    tasks = []
    for i in range(10):
        task = asyncio.create_task(process_item(resources), name="ProcessItem%d"%(i+1))
        tasks.append(task)
    ## Make main task wait for all tasks (10 process items) to complete
    for task in tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:50:03.476742
Task : 'ProcessItem1' acquired resource. Semaphore : 2
Task : 'ProcessItem1' uses 'Connection1' object for transferring data.
Task : 'ProcessItem1' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem2' acquired resource. Semaphore : 2
Task : 'ProcessItem2' uses 'Connection2' object for transferring data.
Task : 'ProcessItem2' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem3' acquired resource. Semaphore : 2
Task : 'ProcessItem3' uses 'Connection3' object for transferring data.
Task : 'ProcessItem3' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem4' acquired resource. Semaphore : 2
Task : 'ProcessItem4' uses 'Connection1' object for transferring data.
Task : 'ProcessItem4' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem5' acquired resource. Semaphore : 2
Task : 'ProcessItem5' uses 'Connection2' object for transferring data.
Task : 'ProcessItem5' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem6' acquired resource. Semaphore : 2
Task : 'ProcessItem6' uses 'Connection3' object for transferring data.
Task : 'ProcessItem6' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem7' acquired resource. Semaphore : 2
Task : 'ProcessItem7' uses 'Connection1' object for transferring data.
Task : 'ProcessItem7' released resource. X : ['Connection2', 'Connection3', 'Connection1']
Task : 'ProcessItem8' acquired resource. Semaphore : 2
Task : 'ProcessItem8' uses 'Connection2' object for transferring data.
Task : 'ProcessItem8' released resource. X : ['Connection3', 'Connection1', 'Connection2']
Task : 'ProcessItem9' acquired resource. Semaphore : 2
Task : 'ProcessItem9' uses 'Connection3' object for transferring data.
Task : 'ProcessItem9' released resource. X : ['Connection1', 'Connection2', 'Connection3']
Task : 'ProcessItem10' acquired resource. Semaphore : 2
Task : 'ProcessItem10' uses 'Connection1' object for transferring data.
Task : 'ProcessItem10' released resource. X : ['Connection2', 'Connection3', 'Connection1']
End Time : 2021-03-11 16:50:06.624505
Total Time Taken : 3.1477339267730713 Seconds
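One detail of the output above is worth a note: every task reports "Semaphore : 2", which suggests that the tasks ran one after another rather than three at a time. A likely reason is that use_connection() does only CPU work and never awaits anything that suspends, so the event loop gets no chance to switch tasks while a permit is held. The sketch below is an illustration under that assumption, not part of the original script; worker, run_workers and the stats dictionary are hypothetical names. It replaces the CPU work with asyncio.sleep() and records how many tasks are inside the protected section at once.

```python
import asyncio


async def worker(sem: asyncio.Semaphore, stats: dict) -> None:
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        # A real suspension point: other tasks can run while this one sleeps.
        await asyncio.sleep(0.05)
        stats["active"] -= 1


async def run_workers(limit: int = 3, n_tasks: int = 10) -> int:
    # Hypothetical helper: returns the highest number of workers that were
    # inside the semaphore-protected section at the same moment.
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, stats) for _ in range(n_tasks)))
    return stats["peak"]


if __name__ == "__main__":
    print("Peak concurrent workers:", asyncio.run(run_workers()))
```

With a limit of 3 and 10 tasks, the peak should be 3: the semaphore lets three workers in, and the rest wait until a permit is released.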
As a part of our fourth example, we'll explain how we can use the Event primitive for communication between tasks/coroutines. An Event is a simple primitive: it holds an internal flag that tasks can set, clear, and wait on.
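Our code for this example creates three coroutines.
process_item(event, lock): acquires the lock, waits until the event is set, prints the value of the global variable X, resets X to None, and clears the event.
set_item(event, lock): loops 5 times; on each pass it sleeps in 2-second intervals while the event is still set, then stores a random number between 1 and 50 in X and sets the event.
main(): creates the Event and Lock instances, starts one set task and five process tasks, and waits for all of them to complete.
Our code then runs the main() coroutine using asyncio.run(). We also record the total running time.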
When we run the script below, the output shows how the tasks coordinate with each other through the Event instance. The set task stores a value in the global variable and then sets the event. A process task waits until the event is set, processes the value, resets the global variable, and clears the event. Because each process task holds the lock while it checks and waits on the event, only one process task waits on the event at a time; the others wait for the lock. The set task, for its part, sleeps in 2-second intervals while the event is still set, that is, until the previous value has been processed. Once it wakes up and finds the event cleared, it sets the next value.
import asyncio
import time
from datetime import datetime
import random

X = None

async def process_item(event, lock):
    global X
    async with lock: ## Only one process task at a time checks and waits on the event.
        while not event.is_set():
            print("Task : '{}' tried to process item but it was not set. It'll wait for the condition.".format(asyncio.current_task().get_name()))
            await event.wait()
        print("Task : '{}' processing an item : {}".format(asyncio.current_task().get_name(), X))
        X = None
        event.clear()

async def set_item(event, lock):
    global X
    for i in range(5):
        while event.is_set():
            print("Task : '{}' tried to set item but its already set. It'll go to sleep for 2 seconds now.".format(asyncio.current_task().get_name()))
            await asyncio.sleep(2)
        X = random.randint(1,50)
        event.set()
        print("Task : '{}' setting an item : {}".format(asyncio.current_task().get_name(), X))

async def main():
    event = asyncio.Event()
    lock = asyncio.Lock()
    set_task = asyncio.create_task(set_item(event, lock), name="SetItem", )
    process_tasks = []
    for i in range(5):
        task = asyncio.create_task(process_item(event, lock), name="ProcessItem%d"%(i+1))
        process_tasks.append(task)
    ## Make main task wait for all tasks (1 set item + 5 process items) to complete
    for task in [set_task] + process_tasks:
        await task

print("Start Time : ", datetime.now(), "\n")
start = time.time()
asyncio.run(main())
print("\nEnd Time : ", datetime.now())
print("\nTotal Time Taken : {} Seconds".format(time.time() - start))
OUTPUT
Start Time : 2021-03-11 16:52:23.133419
Task : 'SetItem' setting an item : 26
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem1' processing an item : 26
Task : 'ProcessItem2' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 1
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem2' processing an item : 1
Task : 'ProcessItem3' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 47
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem3' processing an item : 47
Task : 'ProcessItem4' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 14
Task : 'SetItem' tried to set item but its already set. It'll go to sleep for 2 seconds now.
Task : 'ProcessItem4' processing an item : 14
Task : 'ProcessItem5' tried to process item but it was not set. It'll wait for the condition.
Task : 'SetItem' setting an item : 39
Task : 'ProcessItem5' processing an item : 39
End Time : 2021-03-11 16:52:31.145038
Total Time Taken : 8.01164984703064 Seconds
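In the example above, the lock lets only one process task wait on the event at a time. Without such a lock, a single call to set() releases every task that is currently waiting on the event. The sketch below illustrates that behaviour in isolation; it is a simplified illustration, and waiter, demo_event and the woken list are hypothetical names that do not appear in the script above.

```python
import asyncio


async def waiter(event: asyncio.Event, name: str, woken: list) -> None:
    # Suspends here until some other task calls event.set().
    await event.wait()
    woken.append(name)


async def demo_event(n_waiters: int = 3) -> list:
    # Hypothetical driver: starts several waiters, then sets the event once.
    event = asyncio.Event()
    woken = []
    tasks = [
        asyncio.create_task(waiter(event, "Waiter%d" % (i + 1), woken))
        for i in range(n_waiters)
    ]
    # Yield once so every waiter starts and blocks on event.wait().
    await asyncio.sleep(0)
    assert woken == []  # nobody has passed the event yet
    event.set()         # a single set() releases all current waiters
    await asyncio.gather(*tasks)
    return woken


if __name__ == "__main__":
    print(asyncio.run(demo_event()))
```

This is why the tutorial's script pairs the event with a lock and calls clear() after each item: the intent there is for exactly one task to consume each value.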
This ends our small tutorial explaining how we can use the synchronization primitives available in the asyncio module when writing concurrent code with async/await syntax. Please feel free to let us know your views in the comments section.