In my last project, I needed to synchronize producer and consumer threads. The producer thread was responsible for generating the tasks, while the consumer threads were responsible for executing them. This classic producer-consumer problem can be solved easily with Python’s Queue module.
Task, TaskWorker, and TaskWorkerPool
To solve the problem, let’s define some classes:
The Task Class – The Job That Needs To Be Done
A Task can be any class that has a run method.
Task
class Task:
    def run(self):
        pass
The TaskWorker Class – The Consumer Thread
The following TaskWorker consumer thread is responsible for executing the tasks.
TaskWorker Consumer Thread
import threading

class TaskWorker(threading.Thread):
    def __init__(self, source):
        threading.Thread.__init__(self)
        self.source = source
        self.daemon = True  # <== A1
        self.start()  # <== A2

    def run(self):
        while True:
            task = self.source.get()  # <== B1
            try:
                task.run()  # <== B2
            except Exception:
                # Swallow the error so a failing task does not kill the worker.
                pass
            self.source.task_done()  # <== B3
The TaskWorker constructor takes the thread’s task source. The source can be one of the Queue, LifoQueue, and PriorityQueue classes from the Queue module. This source, which the producer thread populates with tasks, is used to retrieve the next task to be executed.
About daemon threads.
There are two types of threads in Python, and the interpreter treats them differently:
- Normal (non-daemon) threads cannot be terminated abruptly by the interpreter. The interpreter therefore waits until the last non-daemon thread terminates before it ends the program.
- Daemon threads can be terminated abruptly by the interpreter. Once all non-daemon threads have finished, the interpreter abruptly terminates any daemon threads that are still running and ends the program.
As a rule of thumb, the program ends when its last non-daemon thread terminates, even if some daemon threads are still running.
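To make the distinction concrete, here is a minimal Python 3 sketch. The helper name threads_keeping_program_alive and the thread names are made up for this illustration. The helper lists the live non-daemon threads other than the caller, which are the threads the interpreter would wait for at exit.

```python
import threading

def threads_keeping_program_alive():
    """Return live non-daemon threads other than the calling thread.

    Hypothetical helper for illustration: these are the threads the
    interpreter would wait for before ending the program.
    """
    current = threading.current_thread()
    return [t for t in threading.enumerate()
            if t is not current and not t.daemon]

stop = threading.Event()

# The daemon flag has to be set before start(); here it is passed to the constructor.
daemon_worker = threading.Thread(target=stop.wait, name="daemon-worker", daemon=True)
normal_worker = threading.Thread(target=stop.wait, name="normal-worker")
daemon_worker.start()
normal_worker.start()

# Both threads are blocked on the event, but only the normal one blocks exit.
blocking_names = [t.name for t in threads_keeping_program_alive()]

# Release both threads so this sketch itself finishes cleanly.
stop.set()
normal_worker.join()
daemon_worker.join()
```

Only the non-daemon thread shows up in blocking_names, even though both threads are alive and waiting at that point.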
Which task comes next depends on the source class:
- Queue uses the FIFO (First In, First Out) strategy. Thus, the next task to be executed will be the oldest in the source.
- LifoQueue uses the LIFO (Last In, First Out) strategy. Thus, the next task to be executed will be the newest in the source.
- PriorityQueue uses a priority strategy. Thus, the next task to be executed will be the lowest-valued task in the source. Please note that the task should implement operator < (the __lt__ method).
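The three orderings can be compared side by side with a short Python 3 sketch (in Python 3 the module is named queue rather than Queue). The drain helper and the PrioritizedTask class are hypothetical names introduced only for this example.

```python
import queue  # named Queue in Python 2

def drain(source):
    """Remove and return every item in the queue, in retrieval order.

    Relying on empty() is safe here only because a single thread is involved.
    """
    items = []
    while not source.empty():
        items.append(source.get())
    return items

class PrioritizedTask:
    """Hypothetical task ordered by a numeric priority (lower comes out first)."""

    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):
        # PriorityQueue compares items with <, so this method defines the order.
        return self.priority < other.priority

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for name in ("a", "b", "c"):
    fifo.put(name)
    lifo.put(name)
for priority, name in ((2, "low"), (0, "urgent"), (1, "normal")):
    prio.put(PrioritizedTask(priority, name))

fifo_order = drain(fifo)                      # oldest first
lifo_order = drain(lifo)                      # newest first
prio_order = [t.name for t in drain(prio)]    # lowest priority value first
```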
The constructor also automatically starts the thread in A2. Before that, it turns the thread into a daemon thread by setting its daemon attribute in A1; the attribute must be set before the thread is started, which is why A1 comes before A2. This step is essential since the thread’s main activity is an infinite loop that never ends. Making it a daemon thread allows the Python interpreter to abruptly terminate the program when all non-daemon threads finish their execution.
In the run method (the thread’s activity), we loop forever. In each iteration, we get a new task from the thread’s task source (B1), run the task (B2), and notify the source that the task has completed (B3):
- In B1, the get method suspends the thread as necessary. If no task is available in the source, the thread is suspended and resumes only when a new task becomes available, which happens when another (producer) thread puts a new task.
- In B2, we run the task. We also enclose the task’s run call in a try-except construct so that a thrown exception does not terminate the thread.
- In B3, we notify the source that the task has been completed. The source uses this to determine whether all tasks have been completed: it keeps a count of unfinished tasks that goes up with every put call and down with every task_done call.
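The B1, B2, and B3 steps can be tried in isolation with a minimal Python 3 sketch. It is not the article’s TaskWorker: it assumes tasks are plain callables, and worker_loop, fail, and results are names invented for the example. It also moves task_done into a finally clause, a small variation that keeps the count balanced on every path.

```python
import queue
import threading

def worker_loop(source, results):
    """Consume callables from source forever; meant to run in a daemon thread."""
    while True:
        task = source.get()            # B1: blocks while the queue is empty
        try:
            results.append(task())     # B2: run the task
        except Exception:
            pass                       # a failing task must not kill the worker
        finally:
            source.task_done()         # B3: always balance the get() above

def fail():
    raise ValueError("simulated task failure")

source = queue.Queue()
results = []
worker = threading.Thread(target=worker_loop, args=(source, results), daemon=True)
worker.start()

for task in (lambda: 1, fail, lambda: 2):
    source.put(task)

# join() returns once task_done() has been called for each of the three puts,
# including the task that raised.
source.join()
```

After join returns, results holds the values of the two successful tasks and the worker is still alive, blocked in get and waiting for more work.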
The TaskWorkerPool Class – Allow Producer Thread To Add Tasks
The following TaskWorkerPool is responsible for creating the TaskWorker threads and allows the producer threads to add tasks and wait until they are completed.
TaskWorkerPool – Manage TaskWorker threads and Task
import Queue  # Python 2 name; the module is called queue in Python 3

class TaskWorkerPool:
    def __init__(self, numWorkers):
        self.source = Queue.Queue()  # <== A1
        for _ in range(numWorkers):  # <== A2
            TaskWorker(self.source)

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()
- The TaskWorkerPool constructor takes the number of workers and creates them (A2). It also creates the source of the tasks (A1). As discussed, this source can be one of the Queue, LifoQueue, and PriorityQueue classes from the Queue module.
- The add method allows adding a new task. Please note that if worker threads are suspended because the queue was empty when they tried to get a task, adding a task resumes one of them.
- The wait method blocks until all the tasks have been executed. The calling thread is suspended until every task that was put into the source has been matched by a task_done call.
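For readers on Python 3, the pool could be sketched roughly as follows. This is an adaptation, not the article’s code: it uses the queue module name, folds the worker loop into a private _work method instead of a separate TaskWorker class, and the SquareTask class is a hypothetical task used only to have something to run.

```python
import queue
import threading

class TaskWorkerPool:
    """Python 3 sketch of the pool; tasks are any objects with a run() method."""

    def __init__(self, num_workers):
        self.source = queue.Queue()
        for _ in range(num_workers):
            # Daemon threads, so the endless worker loop never blocks program exit.
            threading.Thread(target=self._work, daemon=True).start()

    def _work(self):
        while True:
            task = self.source.get()
            try:
                task.run()
            except Exception:
                pass  # keep the worker alive when a task fails
            finally:
                self.source.task_done()

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()

class SquareTask:
    """Hypothetical task that stores the square of its input."""

    def __init__(self, n):
        self.n = n
        self.result = None

    def run(self):
        self.result = self.n * self.n

pool = TaskWorkerPool(4)
tasks = [SquareTask(n) for n in range(20)]
for task in tasks:
    pool.add(task)
pool.wait()  # blocks until all 20 tasks have been marked done
squares = [task.result for task in tasks]
```

Keeping the results on the task objects avoids sharing a mutable collection between workers; other designs, such as a second queue for results, would work as well.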
Producer and Consumer Problem in Action
Let’s see how we can use Task, TaskWorker, and TaskWorkerPool to easily create a producer thread that generates tasks and lets the consumer threads execute them.
In the demo program, each task sums the integers from 0 up to (but not including) a random number between 1 and 100000. Before the work starts and after it completes, the task prints a message tagged with its unique ID.
Demo’s Task – The Work That Needs To Be Done
import datetime
import random
import sys
import threading

# Assumed definition: the original listing uses logLock without showing it.
logLock = threading.Lock()

class Task:
    def __init__(self, id):
        self.id = id

    def run(self):
        self.log('Started')
        self.doWork()
        self.log('Ended')

    def log(self, msg, *args):
        # Serialize writes so lines from different workers do not interleave.
        with logLock:
            now = datetime.datetime.now()
            sys.stdout.write('{1:>02}:{2:>02}:{3:>02} #{4:>04} | {0}\n'.format(
                msg.format(*args), now.hour, now.minute, now.second, self.id))

    def doWork(self):
        total = 0
        for xx in range(0, random.randint(1, 100000)):
            total += xx
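The log method takes logLock around the write so that each message reaches the output as one complete line, even when several workers log at the same time. A minimal Python 3 sketch of that idea follows. It writes to an in-memory stream instead of sys.stdout so the result can be inspected, and the names log_lock, log_stream, and announce are assumptions made for the example.

```python
import io
import threading

log_lock = threading.Lock()
log_stream = io.StringIO()  # stands in for sys.stdout so the lines can be checked

def log(task_id, msg):
    """Write one complete line while holding the lock."""
    with log_lock:
        log_stream.write('#{0:>04} | {1}\n'.format(task_id, msg))

def announce(task_id):
    log(task_id, 'Started')
    log(task_id, 'Ended')

threads = [threading.Thread(target=announce, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every thread contributed exactly two intact lines; their relative order
# across threads is not deterministic.
lines = log_stream.getvalue().splitlines()
```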
Let’s see how to use the TaskWorkerPool in the producer thread.
The Producer Thread – Add Tasks With TaskWorkerPool
twp = TaskWorkerPool(10)  # <== A1
for ii in range(500):  # <== A2
    twp.add(Task(ii))
twp.wait()  # <== A3
print("Done with all tasks :)")
- In A1, we define a TaskWorkerPool with 10 worker threads; that is, this TaskWorkerPool creates 10 consumer threads.
- In A2, we add some tasks. Please note that the worker threads start executing the tasks while we are still adding them.
- In A3, we wait until all the tasks are completed.
Running the program produces output similar to the following:
Demo “Producer and Consumer Problem” – Output
10:00:00 #0000 | Started
10:00:00 #0001 | Started
...
10:00:00 #0009 | Started
10:00:00 #0000 | Ended
10:00:00 #0000 | Left 41 Tasks
10:00:00 #0010 | Started
10:00:00 #0010 | Ended
...
10:00:09 #0493 | Left 3 Tasks
10:00:09 #0491 | Left 3 Tasks
10:00:09 #0486 | Left 0 Tasks
...
10:00:09 #0499 | Ended
10:00:09 #0499 | Left 0 Tasks
Done with all tasks :)