When working with threads in a multithreaded environment, it’s quite common to access and modify shared data multiple threads. The process of modifying shared data concurrently can leave shared data in an inconsistent state if access to it is not synchronized. Python provides a list of lock primitives like Lock, RLock, and Semaphore which can be used to wrap the code which is modifying the shared data to make sure that only one thread executes the code hence only one updates data at a time. It's quite easy to use these primitives to prevent part of the code from getting concurrent access. This will require us to add code each time we are accessing and modifying shared data which can result in bugs if locks are not handled properly (acquired and released in sequence). Python provides a module named queue which handles concurrent access to queue data structure by using lock semantics for us. We don't need to worry about handling concurrent threads from accessing and modifying queue data structure as it'll be handled by the module itself internally. If you want to use queue data structure in a multithreading environment then it’s recommended to use queue module as its thread-safe.
The queue module provides three different kinds of queues to work with a multithreading environment.
Above mentioned class makes sure that threads access is synchronized. As a part of this tutorial, we'll be introducing how we can use these three kinds of queues for storing data in a multithreading environment where multiple threads will concurrently access and modify data stored in them. We'll be explaining the usage of these queue with simple and easy to understand examples.
Please make a note that all examples below are run with Python 3.9.1 and the output sequence can significantly change if run more than once or on the different operating systems due to their internal thread handling.
As a part of our first example, we'll explain with a simple example how we can use Queue to create a FIFO queue and access its data through multiple threads. We can create a FIFO queue by just creating an instance of Queue class. It has an optional parameter named maxsize which accepts the size of the queue. If we give 0 or negative value to this parameter then it'll create a queue of size infinite else queue of the size given as a positive number. The default value is 0 for it.
Below we have highlighted some important methods of the queues which we'll be explaining in this example.
The queue module has two other methods named get_nowait() and put_nowait() which are equivalent to get(block=False) and put(item, block=False). They are shortcut for that functionalities.
As a part of our example, we have created a method named pop_nitems(n) which accepts a single argument integer. It then loops that many times retrieving the element from the queue, printing them, and going to sleep for 2 seconds. We have put in 2 seconds of sleep to mimic the situation that we are processing data retrieved from the queue. Our main code first adds 20 elements to the queue and creates three threads each of which retrieves a number of elements from the queue given as a parameter.
We can notice from the output how three threads are taking turns in accessing elements from the queue. The main thread exits immediately once it has started all three threads.
If you are interested in learning about threads in python then please feel free to check our tutorial on the same.
import threading
import queue
import time
fifo_queue = queue.Queue()
def pop_nitems(n):
for i in range(n):
item = fifo_queue.get()
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
if __name__ == "__main__":
for i in range(1, 21):
fifo_queue.put("Task-{}".format(i))
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="Process-3Items")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="Process-4Items")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="Process-5Items")
thread1.start(), thread2.start(), thread3.start()
print("\nExited from Main Thread\n")
OUTPUT
Exited from Main Thread
Thread : Process-3Items. Retrieved & Processed Item : Task-1
Thread : Process-4Items. Retrieved & Processed Item : Task-2
Thread : Process-5Items. Retrieved & Processed Item : Task-3
Thread : Process-4Items. Retrieved & Processed Item : Task-5
Thread : Process-3Items. Retrieved & Processed Item : Task-4
Thread : Process-5Items. Retrieved & Processed Item : Task-6
Thread : Process-4Items. Retrieved & Processed Item : Task-7
Thread : Process-5Items. Retrieved & Processed Item : Task-9
Thread : Process-3Items. Retrieved & Processed Item : Task-8
Thread : Process-5Items. Retrieved & Processed Item : Task-11
Thread : Process-4Items. Retrieved & Processed Item : Task-10
Thread : Process-5Items. Retrieved & Processed Item : Task-12
As a part of our second example, we'll explain how we can make a thread wait until all items of the queue are retrieved and processed. We'll make our main thread wait until all items in the queue are processed before it exits.
We'll be introducing the usage of the below-mentioned methods in this example.
Our example of this part is exactly the same as our previous example with few minor changes. We have added call to task_done() method inside pop_nitems() method. It'll indicate the processing of queue items. We have modified our main code loop to only add 12 elements to the queue. We have then called join() method on queue to make the main thread wait until all items of the queue are processed.
We can compare the output of this example with the previous example. It has one difference that now exit message get printed after all items in the queue is processed. The exit message in the previous example was printed once all threads have started and the main thread exits.
import threading
import queue
import time
fifo_queue = queue.Queue()
def pop_nitems(n):
for i in range(n):
item = fifo_queue.get()
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
fifo_queue.task_done()
if __name__ == "__main__":
for i in range(1, 13):
fifo_queue.put("Task-{}".format(i))
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="Process-3Items")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="Process-4Items")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="Process-5Items")
thread1.start(), thread2.start(), thread3.start()
fifo_queue.join()
print("\nAll items in queue completed. Exited from Main Thread\n")
OUTPUT
Thread : Process-3Items. Retrieved & Processed Item : Task-1
Thread : Process-4Items. Retrieved & Processed Item : Task-2
Thread : Process-5Items. Retrieved & Processed Item : Task-3
Thread : Process-3Items. Retrieved & Processed Item : Task-4
Thread : Process-4Items. Retrieved & Processed Item : Task-5
Thread : Process-5Items. Retrieved & Processed Item : Task-6
Thread : Process-4Items. Retrieved & Processed Item : Task-8
Thread : Process-3Items. Retrieved & Processed Item : Task-7
Thread : Process-5Items. Retrieved & Processed Item : Task-9
Thread : Process-4Items. Retrieved & Processed Item : Task-10
Thread : Process-5Items. Retrieved & Processed Item : Task-11
Thread : Process-5Items. Retrieved & Processed Item : Task-12
All items in queue completed. Exited from Main Thread
As a part of our third example, we are building on the code from previous examples and explaining how timeout parameter of the methods get() and put() works.
We have introduced a new method named push_item(value) which pushes the value to the queue with a timeout of 1 second. We have even introduced time sleep of 2 seconds so that it looks like the process of putting an item is taking time, just to mimic real-life situations. We have wrapped the code which tries to put value to queue in try-except block to catch queue Full error. This error can happen when the thread tries to put data to queue and the queue is full. It waits for a specified amount of timeout and then raises Full error. Please make a note that block parameter is True by default.
We have wrapped code inside of pop_nitems() in try-except block to catch queue Empty error. This can happen when the thread tries to get data from an empty queue and fails after waiting for 1 second if data is still not available. We have also printing messages to show the progress of code which includes queue size.
We have also set the queue size to 12 using maxsize parameter.
In our main code, we are trying to put 20 items in the queue so that some will fail as well due to limited queue size.
We can notice from the output that initially all threads fail to get data from the queue as it’s empty. We can notice from queue size as well that in which order items are put into a queue and compare that order to items that we are getting out of the queue.
import threading
import queue
import time
fifo_queue = queue.Queue(maxsize=12)
def pop_nitems(n):
for i in range(n):
try:
item = fifo_queue.get(timeout=1)
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
fifo_queue.task_done()
except Exception as e:
print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, fifo_queue.qsize()))
def push_item(value):
try:
time.sleep(2)
fifo_queue.put(value, timeout=1)
print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))
except Exception as e:
print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, fifo_queue.qsize()))
if __name__ == "__main__":
for i in range(1, 21):
t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
t.start()
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-11. Queue Size : 2
Put Item : Task-12. Queue Size : 3
Put Item : Task-2. Queue Size : 6
Put Item : Task-18. Queue Size : 7
Put Item : Task-5. Queue Size : 8
Put Item : Task-14. Queue Size : 4
Put Item : Task-16. Queue Size : 12
Put Item : Task-17. Queue Size : 12
Put Item : Task-19. Queue Size : 5
Put Item : Task-13. Queue Size : 9
Put Item : Task-20. Queue Size : 10
Put Item : Task-7. Queue Size : 12
Put Item : Task-10. Queue Size : 8
Put Item : Task-15. Queue Size : 11
Thread : PutItem-9. Error : Full. Failed to add value (Task-9) to queue. Queue Size : 12
Thread : PutItem-4. Error : Full. Failed to add value (Task-4) to queue. Queue Size : 12
Thread : PutItem-8. Error : Full. Failed to add value (Task-8) to queue. Queue Size : 12
Thread : PutItem-6. Error : Full. Failed to add value (Task-6) to queue. Queue Size : 12
Thread : PutItem-3. Error : Full. Failed to add value (Task-3) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P5I. Retrieved & Processed Item : Task-12
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P5I. Retrieved & Processed Item : Task-19
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-18
Thread : P5I. Retrieved & Processed Item : Task-10
As a part of this example, we are explaining how get() and put() method works with block parameter set to False.
Our code for this example is exactly the same as our previous example with the only change that we have set block to False in both get() and put() methods. The timeout will be ineffective with the block set to False. As we have set the block to False, if the thread is not able to put an item to queue or get the item from the queue, then it'll immediately fail with Full or Empty error respectively immediately without waiting.
When we run the script we can see that it initially raises an Empty error when threads try to get data from the queue and there are few Full errors as well when in-between threads try to put data to the queue.
import threading
import queue
import time
fifo_queue = queue.Queue(maxsize=12)
def pop_nitems(n):
for i in range(n):
try:
item = fifo_queue.get(block=False)
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
fifo_queue.task_done()
except Exception as e:
print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, fifo_queue.qsize()))
time.sleep(2)
def push_item(value):
try:
time.sleep(2)
fifo_queue.put(value, block=False)
print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))
except Exception as e:
print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, fifo_queue.qsize()))
if __name__ == "__main__":
for i in range(1, 21):
t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
t.start()
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-14. Queue Size : 2
Put Item : Task-10. Queue Size : 3
Put Item : Task-18. Queue Size : 6
Put Item : Task-17. Queue Size : 7
Put Item : Task-2. Queue Size : 10
Put Item : Task-7. Queue Size : 12
Thread : PutItem-8. Error : Full. Failed to add value (Task-8) to queue. Queue Size : 12
Thread : PutItem-13. Error : Full. Failed to add value (Task-13) to queue. Queue Size : 12
Thread : PutItem-11. Error : Full. Failed to add value (Task-11) to queue. Queue Size : 12
Put Item : Task-5. Queue Size : 4
Put Item : Task-12. Queue Size : 8
Put Item : Task-15. Queue Size : 11
Put Item : Task-19. Queue Size : 5
Thread : PutItem-20. Error : Full. Failed to add value (Task-20) to queue. Queue Size : 12
Put Item : Task-4. Queue Size : 9
Thread : PutItem-9. Error : Full. Failed to add value (Task-9) to queue. Queue Size : 12
Thread : PutItem-3. Error : Full. Failed to add value (Task-3) to queue. Queue Size : 12
Thread : PutItem-6. Error : Full. Failed to add value (Task-6) to queue. Queue Size : 12
Thread : PutItem-16. Error : Full. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : P5I. Retrieved & Processed Item : Task-1
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-18
Thread : P3I. Retrieved & Processed Item : Task-19
Thread : P5I. Retrieved & Processed Item : Task-5
Thread : P4I. Retrieved & Processed Item : Task-17
Thread : P5I. Retrieved & Processed Item : Task-12
Thread : P5I. Retrieved & Processed Item : Task-4
As a part of our fifth example, we are demonstrating two new methods of the queue classes in the queue module.
Our code for this example builds on the previous example. We have introduced while loop inside of the both pop_nitems() and push_item() methods. The while loops check constantly whether the queue is empty inside of pop_nitems() method and whether the queue is full inside of push_item() method. If the queue is empty or full then it puts the thread which is trying to put/get data to sleep for 2 seconds and check again until a condition is satisfied.
import threading
import queue
import time
fifo_queue = queue.Queue(maxsize=12)
def pop_nitems(n):
for i in range(n):
while fifo_queue.empty():
print("Thread : {}. Queue Empty. Queue Size : {}".format(threading.current_thread().name, fifo_queue.qsize()))
time.sleep(2)
item = fifo_queue.get()
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
fifo_queue.task_done()
def push_item(value):
while fifo_queue.full():
print("Thread : {}. Full Queue. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, value, fifo_queue.qsize()))
time.sleep(2)
fifo_queue.put(value)
print("Put Item : {}. Queue Size : {}".format(value, fifo_queue.qsize()))
if __name__ == "__main__":
for i in range(1, 21):
t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
t.start()
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Put Item : Task-1. Queue Size : 1
Put Item : Task-2. Queue Size : 2
Put Item : Task-3. Queue Size : 3
Put Item : Task-4. Queue Size : 4
Put Item : Task-5. Queue Size : 5
Put Item : Task-6. Queue Size : 6
Put Item : Task-7. Queue Size : 7
Put Item : Task-8. Queue Size : 8
Put Item : Task-9. Queue Size : 9
Put Item : Task-10. Queue Size : 10
Put Item : Task-11. Queue Size : 11
Put Item : Task-12. Queue Size : 12
Thread : PutItem-13. Full Queue. Failed to add value (Task-13) to queue. Queue Size : 12
Thread : PutItem-14. Full Queue. Failed to add value (Task-14) to queue. Queue Size : 12
Thread : PutItem-15. Full Queue. Failed to add value (Task-15) to queue. Queue Size : 12
Thread : PutItem-16. Full Queue. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Thread : PutItem-19. Full Queue. Failed to add value (Task-19) to queue. Queue Size : 12
Thread : PutItem-20. Full Queue. Failed to add value (Task-20) to queue. Queue Size : 12
Put Item : Task-13. Queue Size : 10
Put Item : Task-20. Queue Size : 11
Thread : PutItem-15. Full Queue. Failed to add value (Task-15) to queue. Queue Size : 12
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Thread : PutItem-19. Full Queue. Failed to add value (Task-19) to queue. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Put Item : Task-14. Queue Size : 12
Thread : PutItem-16. Full Queue. Failed to add value (Task-16) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-3
Put Item : Task-16. Queue Size : 10
Put Item : Task-19. Queue Size : 11
Thread : PutItem-17. Full Queue. Failed to add value (Task-17) to queue. Queue Size : 12
Put Item : Task-15. Queue Size : 12
Thread : PutItem-18. Full Queue. Failed to add value (Task-18) to queue. Queue Size : 12
Thread : P3I. Retrieved & Processed Item : Task-4
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-6
Put Item : Task-17. Queue Size : 10
Thread : P5I. Retrieved & Processed Item : Task-9
Put Item : Task-18. Queue Size : 11
Thread : P3I. Retrieved & Processed Item : Task-7
Thread : P4I. Retrieved & Processed Item : Task-8
Thread : P5I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-12
Our sixth example is an exact copy of our second example with the only change that we are using LifoQueue in this example.
We can notice a change in the output in the sequence in which elements are retrieved.
import threading
import queue
import time
lifo_queue = queue.LifoQueue()
def pop_nitems(n):
for i in range(n):
item = lifo_queue.get()
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
lifo_queue.task_done()
if __name__ == "__main__":
for i in range(1, 13):
lifo_queue.put("Task-{}".format(i))
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
lifo_queue.join()
print("\nAll items in queue completed. Exited from Main Thread\n")
OUTPUT
Thread : P3I. Retrieved & Processed Item : Task-12
Thread : P4I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-10
Thread : P4I. Retrieved & Processed Item : Task-8
Thread : P3I. Retrieved & Processed Item : Task-9
Thread : P5I. Retrieved & Processed Item : Task-7
Thread : P4I. Retrieved & Processed Item : Task-6
Thread : P3I. Retrieved & Processed Item : Task-5
Thread : P5I. Retrieved & Processed Item : Task-4
Thread : P4I. Retrieved & Processed Item : Task-3
Thread : P5I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-1
All items in queue completed. Exited from Main Thread
Our seventh example is an exact copy of our third example with the only change that we are using LifoQueue instead.
We can notice from the output of script change in sequence elements are retrieved.
import threading
import queue
import time
lifo_queue = queue.LifoQueue()
def pop_nitems(n):
for i in range(n):
try:
item = lifo_queue.get(timeout=1)
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
lifo_queue.task_done()
except Exception as e:
print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, lifo_queue.qsize()))
def push_item(value):
try:
time.sleep(2)
lifo_queue.put(value, timeout=1)
print("Put Item : {}. Queue Size : {}".format(value, lifo_queue.qsize()))
except Exception as e:
print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, lifo_queue.qsize()))
if __name__ == "__main__":
for i in range(1, 21):
t = threading.Thread(target=push_item, args=("Task-{}".format(i), ), name="PutItem-%d"%i)
t.start()
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Thread : P3I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Put Item : Task-1. Queue Size : 1
Put Item : Task-13. Queue Size : 2
Put Item : Task-3. Queue Size : 3
Put Item : Task-2. Queue Size : 5
Put Item : Task-6. Queue Size : 7
Put Item : Task-4. Queue Size : 3
Put Item : Task-8. Queue Size : 9
Put Item : Task-10. Queue Size : 11
Put Item : Task-5. Queue Size : 4
Put Item : Task-16. Queue Size : 11
Put Item : Task-15. Queue Size : 13
Put Item : Task-19. Queue Size : 12
Put Item : Task-18. Queue Size : 16
Put Item : Task-12. Queue Size : 10
Put Item : Task-9. Queue Size : 6
Put Item : Task-17. Queue Size : 14
Put Item : Task-20. Queue Size : 15
Put Item : Task-14. Queue Size : 17
Put Item : Task-11. Queue Size : 8
Put Item : Task-7. Queue Size : 8
Thread : P3I. Retrieved & Processed Item : Task-3
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P5I. Retrieved & Processed Item : Task-7
Thread : P5I. Retrieved & Processed Item : Task-20
Thread : P3I. Retrieved & Processed Item : Task-14
Thread : P4I. Retrieved & Processed Item : Task-18
Thread : P5I. Retrieved & Processed Item : Task-17
Thread : P4I. Retrieved & Processed Item : Task-15
Thread : P5I. Retrieved & Processed Item : Task-19
Our code for example eight is exactly the same as our second example but we have used PriorityQueue instead. We have made changes to the element that is getting put into the priority queue. We are now putting a two-element tuple inside of queue where a first element is a random number between 1-50 based on which queue element will be sorted. Each call to get method will return elements with the lowest priority first.
We can notice from the output how items are retrieved in sorted order based on the first value of the tuple.
import threading
import queue
import time
import random
priority_queue = queue.PriorityQueue()
def pop_nitems(n):
for i in range(n):
time.sleep(2)
item = priority_queue.get()
print("Thread : {}. Retrieved & Proccessed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
priority_queue.task_done()
if __name__ == "__main__":
for i in range(1, 13):
priority_queue.put((random.randint(1, 51), "Task-{}".format(i)))
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
priority_queue.join()
print("\nAll items in queue completed. Exited from Main Thread\n")
OUTPUT
Thread : P3I. Retrieved & Proccessed Item : (2, 'Task-3')
Thread : P4I. Retrieved & Proccessed Item : (3, 'Task-11')
Thread : P5I. Retrieved & Proccessed Item : (3, 'Task-4')
Thread : P3I. Retrieved & Proccessed Item : (6, 'Task-10')
Thread : P4I. Retrieved & Proccessed Item : (17, 'Task-6')
Thread : P5I. Retrieved & Proccessed Item : (22, 'Task-5')
Thread : P3I. Retrieved & Proccessed Item : (24, 'Task-7')
Thread : P4I. Retrieved & Proccessed Item : (37, 'Task-8')
Thread : P5I. Retrieved & Proccessed Item : (43, 'Task-12')
Thread : P5I. Retrieved & Proccessed Item : (44, 'Task-2')
Thread : P4I. Retrieved & Proccessed Item : (46, 'Task-9')
Thread : P5I. Retrieved & Proccessed Item : (49, 'Task-1')
All items in queue completed. Exited from Main Thread
Our ninth example is an exact copy of our third example with few minor changes. We are using PriorityQueue in this example and we are putting two tuple values in the queue as an item where the first value in the tuple is a random number in the range 1-50 and the second value is the actual value.
import threading
import queue
import time
import random
priority_queue = queue.PriorityQueue()
def pop_nitems(n):
for i in range(n):
try:
item = priority_queue.get(timeout=1)
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
priority_queue.task_done()
except Exception as e:
print("Thread : {}. Error : {}. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, priority_queue.qsize()))
def push_item(value):
try:
time.sleep(2)
priority_queue.put(value, timeout=1)
print("Put Item : {}. Queue Size : {}".format(value, priority_queue.qsize()))
except Exception as e:
print("Thread : {}. Error : {}. Failed to add value ({}) to queue. Queue Size : {}".format(threading.current_thread().name, type(e).__name__, value, lifo_queue.qsize()))
if __name__ == "__main__":
for i in range(1, 21):
t = threading.Thread(target=push_item, args=((random.randint(1, 51), "Task-{}".format(i)), ), name="PutItem-%d"%i)
t.start()
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Thread : P3I. Error : Empty. Queue Size : 0
Thread : P5I. Error : Empty. Queue Size : 0
Thread : P4I. Error : Empty. Queue Size : 0
Put Item : (4, 'Task-5'). Queue Size : 1
Put Item : (2, 'Task-8'). Queue Size : 2
Put Item : (4, 'Task-1'). Queue Size : 4
Put Item : (34, 'Task-11'). Queue Size : 5
Put Item : (1, 'Task-10'). Queue Size : 6
Put Item : (51, 'Task-15'). Queue Size : 9
Put Item : (23, 'Task-16'). Queue Size : 10
Put Item : (38, 'Task-3'). Queue Size : 3
Put Item : (48, 'Task-17'). Queue Size : 11
Put Item : (41, 'Task-14'). Queue Size : 14
Put Item : (12, 'Task-20'). Queue Size : 16
Put Item : (46, 'Task-4'). Queue Size : 17
Put Item : (44, 'Task-7'). Queue Size : 8
Put Item : (18, 'Task-19'). Queue Size : 10
Put Item : (4, 'Task-18'). Queue Size : 15
Put Item : (11, 'Task-9'). Queue Size : 4
Put Item : (9, 'Task-6'). Queue Size : 11
Put Item : (34, 'Task-13'). Queue Size : 13
Put Item : (7, 'Task-12'). Queue Size : 12
Put Item : (10, 'Task-2'). Queue Size : 7
Thread : P5I. Retrieved & Processed Item : (2, 'Task-8')
Thread : P4I. Retrieved & Processed Item : (4, 'Task-1')
Thread : P3I. Retrieved & Processed Item : (1, 'Task-10')
Thread : P5I. Retrieved & Processed Item : (4, 'Task-18')
Thread : P4I. Retrieved & Processed Item : (4, 'Task-5')
Thread : P3I. Retrieved & Processed Item : (7, 'Task-12')
Thread : P5I. Retrieved & Processed Item : (9, 'Task-6')
Thread : P4I. Retrieved & Processed Item : (10, 'Task-2')
Thread : P5I. Retrieved & Processed Item : (11, 'Task-9')
Our tenth example is an exact copy of our second example with two minor changes. The first change is that we are using SimpleQueue as a queue type in this example. The second change is that we haven't called task_done() and join() methods as they are not available with SimpleQueue.
import threading
import queue
import time
simple_queue = queue.SimpleQueue()
def pop_nitems(n):
for i in range(n):
item = simple_queue.get()
time.sleep(2)
print("Thread : {}. Retrieved & Processed Item : {}".format(threading.current_thread().name, item))
#print("Thread {}. Processed Item : {}".format(threading.current_thread().name, item))
if __name__ == "__main__":
for i in range(1, 13):
simple_queue.put("Task-{}".format(i))
thread1 = threading.Thread(target=pop_nitems, args=(3, ), name="P3I")
thread2 = threading.Thread(target=pop_nitems, args=(4, ), name="P4I")
thread3 = threading.Thread(target=pop_nitems, args=(5, ), name="P5I")
thread1.start(), thread2.start(), thread3.start()
OUTPUT
Thread : P3I. Retrieved & Processed Item : Task-1
Thread : P4I. Retrieved & Processed Item : Task-2
Thread : P5I. Retrieved & Processed Item : Task-3
Thread : P4I. Retrieved & Processed Item : Task-5
Thread : P3I. Retrieved & Processed Item : Task-4
Thread : P5I. Retrieved & Processed Item : Task-6
Thread : P4I. Retrieved & Processed Item : Task-7
Thread : P5I. Retrieved & Processed Item : Task-9
Thread : P3I. Retrieved & Processed Item : Task-8
Thread : P4I. Retrieved & Processed Item : Task-10
Thread : P5I. Retrieved & Processed Item : Task-11
Thread : P5I. Retrieved & Processed Item : Task-12
This ends our small tutorial explaining different types of thread-safe synchronized queue data structures available in Python to work within a multithreading environment. Please feel free to let us know your views in the comments section.
If you are more comfortable learning through video tutorials then we would recommend that you subscribe to our YouTube channel.
When going through coding examples, it's quite common to have doubts and errors.
If you have doubts about some code examples or are stuck somewhere when trying our code, send us an email at coderzcolumn07@gmail.com. We'll help you or point you in the direction where you can find a solution to your problem.
You can even send us a mail if you are trying something new and need guidance regarding coding. We'll try to respond as soon as possible.
If you want to