In my last project, I needed to synchronize a producer thread with several consumer threads. The producer thread was responsible for generating tasks, while the consumer threads were responsible for executing them. This classic producer-consumer problem can be solved easily with the Queue module (renamed queue in Python 3).
To solve the problem, let's define a few classes.
A Task can be any class that has a run method:

```python
class Task:
    def run(self):
        pass
```
The TaskWorker consumer thread is responsible for executing the tasks.

```python
import threading

class TaskWorker(threading.Thread):
    def __init__(self, source):
        threading.Thread.__init__(self)
        self.source = source
        self.daemon = True            # <== A1: must be set before start()
        self.start()                  # <== A2

    def run(self):
        while True:
            task = self.source.get()  # <== B1: blocks until a task is available
            try:
                task.run()            # <== B2
            except Exception:
                pass                  # a failing task must not kill the worker
            self.source.task_done()   # <== B3
```
The TaskWorker constructor takes the thread's task source, which can be an instance of any of the Queue, LifoQueue or PriorityQueue classes from the Queue module. The producer thread populates this source with tasks, and the worker uses it to retrieve the next task to execute.
The strategy for choosing the next task depends on the source class:
- Queue uses a FIFO (First In, First Out) strategy: the next task to be executed is the oldest task in the source.
- LifoQueue uses a LIFO (Last In, First Out) strategy: the next task to be executed is the newest task in the source.
- PriorityQueue uses a priority strategy: the next task to be executed is the lowest-valued task in the source. Note that the tasks must then support the < operator (the __lt__ method).
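The three strategies can be checked directly with a minimal sketch. It assumes Python 3, where the module is called queue (in Python 2 the same classes live in the Queue module); the drain helper and the PriorityTask class are hypothetical names used only for illustration.

```python
import queue


def drain(q):
    """Hypothetical helper: remove and return every queued item in retrieval order."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for n in (3, 1, 2):            # same insertion order for all three queues
    fifo.put(n)
    lifo.put(n)
    prio.put(n)

fifo_order = drain(fifo)       # oldest first
lifo_order = drain(lifo)       # newest first
prio_order = drain(prio)       # lowest value first
print(fifo_order, lifo_order, prio_order)  # [3, 1, 2] [2, 1, 3] [1, 2, 3]


class PriorityTask:
    """Hypothetical task ordered by its priority number (lower runs first)."""

    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):
        # PriorityQueue compares queued items with <, which calls __lt__.
        return self.priority < other.priority

    def run(self):
        print("running", self.name)


tasks = queue.PriorityQueue()
for task in (PriorityTask(5, "low"), PriorityTask(1, "urgent"), PriorityTask(3, "normal")):
    tasks.put(task)
task_order = [t.name for t in drain(tasks)]
print(task_order)  # ['urgent', 'normal', 'low']
```

A common alternative is to put (priority, task) tuples into the PriorityQueue so that the tuples are compared instead; note that ties on the priority then fall back to comparing the tasks themselves.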
The constructor also starts the thread automatically (A2), after first turning it into a daemon thread by setting its daemon attribute (A1). The order matters, because daemon can only be set before start() is called. Making the worker a daemon is important since its main activity is an infinite loop that never ends. As a daemon thread, it allows the Python interpreter to end the program, abruptly terminating the worker, once all non-daemon threads have finished their execution.
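Setting daemon (A1) before calling start() (A2) is not just a style choice: the attribute cannot be changed once the thread has been started. A small Python 3 sketch shows this; the idle function is only a placeholder activity.

```python
import threading
import time


def idle():
    # Placeholder thread activity: just sleep briefly.
    time.sleep(0.2)


t = threading.Thread(target=idle)
t.daemon = True                # fine: the thread has not been started yet
t.start()

try:
    t.daemon = False           # too late: the thread is already running
    late_change_error = None
except RuntimeError as exc:
    late_change_error = exc

print(t.daemon)                              # True, the late assignment was rejected
print(type(late_change_error).__name__)      # RuntimeError
t.join()
```

In Python 3 the flag can also be passed to the constructor, as in threading.Thread(target=idle, daemon=True), which makes it impossible to get the order wrong.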
There are two types of threads in Python, which the interpreter treats differently:
- Non-daemon threads are threads that the interpreter does not terminate abruptly. The interpreter waits until the last non-daemon thread terminates before ending the program.
- Daemon threads are threads that the interpreter may terminate abruptly. Once all non-daemon threads have finished their execution, the interpreter kills any remaining daemon threads and ends the program.
As a rule of thumb, the program ends when its last non-daemon thread terminates, even if some daemon threads are still running. The main thread itself is a non-daemon thread.
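Because this behaviour only shows up when the interpreter exits, a sketch has to run a separate Python process. The one below (Python 3.7+ for capture_output) starts a child whose only extra thread is a daemon thread looping forever, like TaskWorker.run; the child still exits as soon as its main thread finishes.

```python
import subprocess
import sys
import textwrap

# Child program: a daemon thread spins forever while the main thread returns at once.
CHILD = textwrap.dedent("""
    import threading
    import time

    def spin():
        while True:              # never ends, like TaskWorker.run
            time.sleep(0.05)

    threading.Thread(target=spin, daemon=True).start()
    print("main thread done")
""")

result = subprocess.run(
    [sys.executable, "-c", CHILD],
    capture_output=True,
    text=True,
    timeout=10,                  # guard: a non-daemon spinner would never exit
)
print(result.returncode, result.stdout.strip())  # 0 main thread done
```

Changing daemon=True to daemon=False in the child makes it hang after printing, so subprocess.run kills it and raises subprocess.TimeoutExpired after the 10-second timeout.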
In the run method (the thread's activity), we loop forever. In each iteration, we get a new task from the thread's task source (B1), run the task (B2) and notify the source that the task has completed (B3).
At B1, the get method suspends the thread when necessary: if no task is available in the source, the thread blocks, and it resumes only when a new task becomes available. This happens when another (producer) thread calls put to add a new task.
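The blocking behaviour of get can be observed with a sketch like the following (Python 3; the consumer function is a hypothetical stand-in for TaskWorker.run). The consumer stays suspended inside get until the main thread, acting as the producer, calls put. The sketch also shows get(timeout=...), which raises queue.Empty instead of waiting forever; the workers in this article do not use it.

```python
import queue
import threading
import time

source = queue.Queue()
received = []


def consumer():
    # Suspended here until some other thread puts a task into the source.
    received.append(source.get())
    source.task_done()


worker = threading.Thread(target=consumer)
worker.start()

time.sleep(0.2)                         # give the consumer time to block in get()
waiting_before_put = worker.is_alive()  # True: nothing to get yet
source.put("task-1")                    # the producer adds a task, waking the consumer
worker.join()
print(waiting_before_put, received)     # True ['task-1']

# With a timeout, get() gives up and raises queue.Empty on an empty source.
try:
    source.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True
print(timed_out)                        # True
```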
At B2, we wrap the call to run in a try/except construct so that an exception thrown by a task does not terminate the worker thread.
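A quick way to convince yourself that the try/except keeps the worker alive is to feed a single worker a failing task between two good ones. The sketch below (Python 3; FailingTask, RecordingTask and worker_loop are hypothetical names) uses the same loop shape as TaskWorker.run, except that it records the exception instead of discarding it and calls task_done in a finally clause.

```python
import queue
import threading


class FailingTask:
    def run(self):
        raise ValueError("boom")


class RecordingTask:
    def __init__(self, log, value):
        self.log = log
        self.value = value

    def run(self):
        self.log.append(self.value)


def worker_loop(source, errors):
    # Same shape as TaskWorker.run: loop forever over the source.
    while True:
        task = source.get()
        try:
            task.run()
        except Exception as exc:
            errors.append(exc)       # keep the worker alive, but remember the failure
        finally:
            source.task_done()       # counted even if run() raised


log, errors = [], []
source = queue.Queue()
threading.Thread(target=worker_loop, args=(source, errors), daemon=True).start()

source.put(RecordingTask(log, 1))
source.put(FailingTask())
source.put(RecordingTask(log, 2))
source.join()                        # all three tasks are marked done
print(log, errors)                   # the worker survived the ValueError
```

Calling task_done in finally means that a task raising an exception outside the Exception hierarchy, such as SystemExit from sys.exit(), is still counted as done, even though it ends that worker thread. With task_done after the try block, such a task would leave wait blocked forever.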
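At B3, we tell the source that the task has completed by calling task_done. The source uses these calls to determine whether all tasks have completed: it keeps a count of unfinished tasks, incremented by every put and decremented by every task_done. All tasks are done when the source is empty and every get call has been matched by a task_done call.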
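The bookkeeping behind task_done and join can be seen on a bare queue. In this Python 3 sketch, the queue is already empty after get, yet join keeps waiting until the matching task_done call arrives; one task_done call too many raises ValueError.

```python
import queue
import threading

source = queue.Queue()
source.put("job")          # one unfinished task
source.get()               # the queue is now empty...

# Run join() in a helper thread so the main thread can observe that it is blocked.
waiter = threading.Thread(target=source.join)
waiter.start()
waiter.join(timeout=0.2)
blocked_while_unfinished = waiter.is_alive()   # ...but join() is still waiting

source.task_done()         # matches the get() above: no unfinished tasks left
waiter.join()              # join() returns now
print(source.empty(), blocked_while_unfinished)  # True True

# Calling task_done() more often than put() is reported as an error.
try:
    source.task_done()
    extra_call_error = None
except ValueError as exc:
    extra_call_error = exc
print(type(extra_call_error).__name__)  # ValueError
```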
TaskWorkerPool is responsible for creating the TaskWorker threads. It also lets producer threads add tasks and wait until they are completed.
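```python
try:
    import Queue                # Python 2
except ImportError:
    import queue as Queue       # Python 3 renamed the module to queue

class TaskWorkerPool:
    def __init__(self, numWorkers):
        self.source = Queue.Queue()       # <== A1
        for _ in range(numWorkers):       # <== A2
            TaskWorker(self.source)

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()
```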
The TaskWorkerPool constructor takes the number of workers and creates them (A2). It also creates the source of the tasks (A1). As discussed, this source could be any of the Queue, LifoQueue or PriorityQueue classes from the Queue module.
The add method adds a new task. Note that if the queue is empty, adding a task resumes one of the worker threads that was suspended while trying to get a new task.
The wait method blocks until all the tasks have been executed: the calling thread is suspended until the source is empty and the number of get calls is equal to the number of task_done calls.
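The pool above always creates a Queue, even though any of the three queue classes would work as the source. A minimal Python 3 sketch of a more flexible variant passes the queue class to the constructor; the queue_class parameter and the PriorityTask class are hypothetical, not part of the original code. The usage at the end adds a few tasks and then calls wait, which returns only after every task has run.

```python
import queue
import threading


class TaskWorker(threading.Thread):
    def __init__(self, source):
        super().__init__(daemon=True)   # daemon must be set before start()
        self.source = source
        self.start()

    def run(self):
        while True:
            task = self.source.get()
            try:
                task.run()
            except Exception:
                pass                    # a failing task must not kill the worker
            self.source.task_done()


class TaskWorkerPool:
    # Hypothetical queue_class parameter: queue.Queue, queue.LifoQueue and
    # queue.PriorityQueue all provide put, get, task_done and join.
    def __init__(self, num_workers, queue_class=queue.Queue):
        self.source = queue_class()
        for _ in range(num_workers):
            TaskWorker(self.source)

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()


class PriorityTask:
    """Hypothetical task: records its priority when run (lower values first)."""

    def __init__(self, priority, results):
        self.priority = priority
        self.results = results

    def __lt__(self, other):
        return self.priority < other.priority

    def run(self):
        self.results.append(self.priority)


results = []
pool = TaskWorkerPool(2, queue_class=queue.PriorityQueue)
for p in (3, 1, 2):
    pool.add(PriorityTask(p, results))
pool.wait()                  # blocks until all three tasks have run
print(sorted(results))       # [1, 2, 3]
```

With two workers running concurrently, the order in which tasks finish is not guaranteed, which is why the results are sorted before printing. The priority only decides which queued task a free worker picks up next.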
In the next part, we will see how to use these classes in a demo program.