Producer and Consumer Problem Made Simple [With 3 Python Classes]

In my last project, I needed to synchronize producer and consumer threads. The producer thread was responsible for generating tasks, while the consumer threads were responsible for executing them. This classic producer-consumer problem can be solved easily with Python’s queue module (named Queue in Python 2).

Task, TaskWorker, and TaskWorkerPool

To solve the problem, let’s define three classes:

The Task Class – The Job That Needs To Be Done

A Task can be any object that has a run method.

Task
class Task:
    def run(self):
        pass
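Because a Task is simply anything with a run method, an ordinary function can also be scheduled through a small adapter. The sketch below shows one way to do it; FunctionTask and its result attribute are hypothetical names, not part of the original code.

```python
class FunctionTask:
    """Hypothetical adapter: wraps any callable and its arguments as a Task."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = None  # filled in once run() has been called

    def run(self):
        # Workers only ever call run(); the return value is kept on the task.
        self.result = self.func(*self.args, **self.kwargs)


task = FunctionTask(sum, range(1, 11))
task.run()
print(task.result)  # → 55
```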

The TaskWorker Class – The Consumer Thread

The following TaskWorker consumer thread is responsible for executing the tasks.

TaskWorker Consumer Thread
import threading

class TaskWorker(threading.Thread):
    def __init__(self, source):
        super().__init__()
        self.source = source              # shared task queue
        self.daemon = True                # <== A1
        self.start()                      # <== A2

    def run(self):
        while True:
            task = self.source.get()      # <== B1 (blocks until a task is available)
            try:
                task.run()                # <== B2
            except Exception:
                pass                      # a failing task must not kill the worker
            finally:
                self.source.task_done()   # <== B3

The TaskWorker constructor takes the thread’s task source. The source can be an instance of the Queue, LifoQueue, or PriorityQueue class from the queue module (Queue in Python 2). The producer thread fills this source with tasks, and the worker uses it to retrieve the next task to execute.

About Daemon Threads

Python has two kinds of threads, and the interpreter treats them differently when the program exits:

  • Non-daemon (normal) threads are never terminated abruptly by the interpreter. Before ending the program, the interpreter waits until every non-daemon thread has finished.
  • Daemon threads may be terminated abruptly. Once all non-daemon threads have finished, the interpreter ends the program and stops any daemon threads that are still running, without giving them a chance to clean up (open files, for example, may not be released properly).

As a rule of thumb, the program ends when its last non-daemon thread terminates, even if some daemon threads are still running.
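The exit rule can be checked directly. The sketch below starts a child Python process whose only extra thread is a daemon thread looping forever; the child still exits as soon as its main thread finishes. CHILD and run_child are illustrative names.

```python
import subprocess
import sys
import textwrap

# A child program whose only extra thread is a daemon stuck in an infinite loop.
CHILD = textwrap.dedent("""
    import threading, time

    def spin():
        while True:          # never returns on its own
            time.sleep(0.1)

    t = threading.Thread(target=spin, daemon=True)
    t.start()
    print("main thread done")
""")


def run_child():
    # If spin() ran in a non-daemon thread, this call would hit the timeout.
    result = subprocess.run([sys.executable, "-c", CHILD],
                            capture_output=True, text=True, timeout=10)
    return result.returncode, result.stdout.strip()


if __name__ == "__main__":
    print(run_child())  # → (0, 'main thread done')
```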

Which task runs next depends on the class of the source:

  • Queue uses the FIFO (First In, First Out) strategy: the next task to be executed is the oldest one in the source.
  • LifoQueue uses the LIFO (Last In, First Out) strategy: the next task to be executed is the newest one in the source.
  • PriorityQueue uses a priority strategy: the next task to be executed is the lowest-valued task in the source. The tasks must therefore support the < operator (the __lt__ method).
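A quick way to see the three strategies is to put the same items into each queue class from a single thread and read them back. The sketch also includes a hypothetical PrioritizedTask that defines __lt__ so that PriorityQueue can compare tasks; drain, orderings, and priority_names are illustrative helpers.

```python
import queue


def drain(q):
    """Remove and return every item currently in q, in retrieval order."""
    items = []
    while not q.empty():          # safe here: only this thread touches q
        items.append(q.get())
    return items


def orderings():
    """Put 3, 1, 2 into each queue class and report the retrieval order."""
    results = {}
    for cls in (queue.Queue, queue.LifoQueue, queue.PriorityQueue):
        q = cls()
        for item in (3, 1, 2):
            q.put(item)
        results[cls.__name__] = drain(q)
    return results


class PrioritizedTask:
    """Hypothetical task type: a lower priority value runs first."""

    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):      # PriorityQueue orders items with <
        return self.priority < other.priority

    def run(self):
        print("running", self.name)


def priority_names():
    q = queue.PriorityQueue()
    for priority, name in ((2, "b"), (3, "c"), (1, "a")):
        q.put(PrioritizedTask(priority, name))
    return [task.name for task in drain(q)]


print(orderings())
# → {'Queue': [3, 1, 2], 'LifoQueue': [2, 1, 3], 'PriorityQueue': [1, 2, 3]}
print(priority_names())  # → ['a', 'b', 'c']
```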

The constructor marks the thread as a daemon thread by setting its daemon attribute (A1) and then starts it automatically (A2). Marking it as a daemon is essential: the thread’s main activity is an infinite loop that never ends, and as a daemon thread it lets the Python interpreter terminate the program once all non-daemon threads have finished. Note that the daemon attribute must be set before start() is called.
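The order of A1 and A2 matters: if the daemon flag is changed after start() has been called, threading raises RuntimeError. A small sketch of both cases (set_daemon_too_late is an illustrative helper):

```python
import threading
import time


def set_daemon_too_late():
    """Return True if marking an already-started thread as daemon raises RuntimeError."""
    t = threading.Thread(target=time.sleep, args=(0.1,))
    t.start()
    try:
        t.daemon = True           # too late: the thread has already been started
    except RuntimeError:
        return True
    finally:
        t.join()                  # always wait for the short-lived thread
    return False


# The safe pattern, equivalent to A1 followed by A2: set daemon first, then start().
ok = threading.Thread(target=time.sleep, args=(0,), daemon=True)
ok.start()
ok.join()

print(set_daemon_too_late())  # → True
print(ok.daemon)              # → True
```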

In the run method (the thread’s activity), we loop forever. In each iteration, we get a new task from the thread’s task source (B1), run the task (B2), and notify the source that the task has been completed (B3):

  • In B1, the get method blocks as needed: if no task is available in the source, the thread is suspended until another (producer) thread puts a new task into the source.
  • In B2, we run the task. The call is wrapped in a try-except block so that an exception raised by the task does not terminate the worker thread.
  • In B3, we tell the source that the task has been completed. The source keeps a count of unfinished tasks: every put call increments it and every task_done call decrements it. When the count drops to zero, all threads waiting in the source’s join method are released.
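The sketch below runs the same get / run / task_done loop as TaskWorker.run in a plain function. It shows that a failing task does not stop the worker, and that join() returns only after task_done() has been called once for every put(). FailingTask, RecordingTask, worker, and demo are illustrative names.

```python
import queue
import threading


class FailingTask:
    def run(self):
        raise ValueError("boom")


class RecordingTask:
    def __init__(self, results, value):
        self.results = results
        self.value = value

    def run(self):
        self.results.append(self.value)


def worker(source):
    # Same loop as TaskWorker.run: get (B1), run (B2), task_done (B3).
    while True:
        task = source.get()
        try:
            task.run()
        except Exception:
            pass                  # the worker survives the failing task
        finally:
            source.task_done()


def demo():
    source = queue.Queue()
    results = []
    threading.Thread(target=worker, args=(source,), daemon=True).start()
    source.put(FailingTask())
    source.put(RecordingTask(results, 1))
    source.put(RecordingTask(results, 2))
    source.join()                 # returns once all three tasks are marked done
    return results


print(demo())  # → [1, 2]
```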

The TaskWorkerPool Class – Allow Producer Thread To Add Tasks

The following TaskWorkerPool is responsible for creating the TaskWorker threads. It also lets producer threads add tasks and wait until they are completed.

TaskWorkerPool – Manage TaskWorker threads and Task
import queue

class TaskWorkerPool:
    def __init__(self, numWorkers):
        self.source = queue.Queue()        # <== A1
        for _ in range(numWorkers):        # <== A2
            TaskWorker(self.source)        # each worker starts itself

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()

  • The TaskWorkerPool constructor takes the number of workers and creates them (A2). It also creates the source of the tasks (A1). As discussed, this source can be a Queue, LifoQueue, or PriorityQueue from the queue module.
  • The add method adds a new task to the source. If worker threads are blocked in get because the source was empty, adding a task wakes one of them up.
  • The wait method blocks until all the tasks have been executed, that is, until task_done has been called once for every task that was put into the source.
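The pool above always creates a FIFO Queue, even though any of the three queue classes would work. Here is a sketch, assuming a hypothetical queue_class constructor parameter that is not part of the original code, of how the pool could accept the strategy. It repeats a condensed TaskWorker so that it runs on its own; PrioritizedTask is likewise a hypothetical task type.

```python
import queue
import threading
from collections import deque


class TaskWorker(threading.Thread):
    """Condensed copy of the TaskWorker consumer thread."""

    def __init__(self, source):
        super().__init__(daemon=True)
        self.source = source
        self.start()

    def run(self):
        while True:
            task = self.source.get()
            try:
                task.run()
            except Exception:
                pass
            finally:
                self.source.task_done()


class TaskWorkerPool:
    # Hypothetical variant: queue_class selects the scheduling strategy.
    def __init__(self, numWorkers, queue_class=queue.Queue):
        self.source = queue_class()
        for _ in range(numWorkers):
            TaskWorker(self.source)

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()


class PrioritizedTask:
    def __init__(self, priority, log):
        self.priority = priority
        self.log = log            # a deque: appends are thread-safe

    def __lt__(self, other):
        return self.priority < other.priority

    def run(self):
        self.log.append(self.priority)


if __name__ == "__main__":
    log = deque()
    pool = TaskWorkerPool(2, queue_class=queue.PriorityQueue)
    for priority in (5, 3, 8, 1):
        pool.add(PrioritizedTask(priority, log))
    pool.wait()
    print(sorted(log))  # → [1, 3, 5, 8]
```

Because the workers start pulling tasks as soon as the first one is added, a PriorityQueue only orders the tasks that are waiting in the source at the same time; the first tasks may run before higher-priority ones arrive.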

Producer and Consumer Problem in Action

Let’s see how we can use Task, TaskWorker, and TaskWorkerPool to easily create a producer thread that generates tasks and let the consumer threads execute them.

In the demo program, each task sums the integers from 0 up to, but not including, a random number between 1 and 100000. Before the work starts and after it completes, the task prints a message with its unique ID.

Demo’s Task – The Work That Needs To Be Done
import datetime
import random
import sys
import threading

logLock = threading.Lock()   # serializes writes to stdout across worker threads

class Task:
    def __init__(self, task_id):
        self.id = task_id

    def run(self):
        self.log('Started')
        self.doWork()
        self.log('Ended')

    def log(self, msg, *args):
        with logLock:
            now = datetime.datetime.now()
            sys.stdout.write('{1:02}:{2:02}:{3:02} #{4:04} | {0}\n'.format(
                msg.format(*args), now.hour, now.minute, now.second, self.id))

    def doWork(self):
        total = 0
        for xx in range(random.randint(1, 100000)):
            total += xx

Let’s see how to use the TaskWorkerPool in the producer thread.

The Producer Thread – Add Tasks With TaskWorkerPool
twp = TaskWorkerPool(10)   # <== A1
for ii in range(500):      # <== A2
    twp.add(Task(ii))

twp.wait()                 # <== A3
print("Done with all tasks :)")

  • In A1, we create a TaskWorkerPool with 10 worker threads, that is, this TaskWorkerPool creates 10 consumer threads.
  • In A2, we add 500 tasks. Note that the worker threads start executing tasks while we are still adding them.
  • In A3, we wait until all the tasks have completed.

Running the program produces output similar to the following (the "Left N Tasks" lines come from extra logging that is not part of the code shown above):

Demo “Producer and Consumer Problem” – Output
 10:00:00 #0000 | Started
 10:00:00 #0001 | Started
 ...
 10:00:00 #0009 | Started
 10:00:00 #0000 | Ended
 10:00:00 #0000 | Left 41 Tasks
 10:00:00 #0010 | Started
 10:00:00 #0010 | Ended
 ...
 10:00:09 #0493 | Left 3 Tasks
 10:00:09 #0491 | Left 3 Tasks
 10:00:09 #0486 | Left 0 Tasks
 ...
 10:00:09 #0499 | Ended
 10:00:09 #0499 | Left 0 Tasks
 Done with all tasks :)

