In my last project, I needed to synchronize producer and consumer threads. The producer thread was responsible for generating the tasks, while the consumer threads were responsible for executing them. This classic producer-consumer problem can be solved easily with the Queue module.

Task, TaskWorker and TaskWorkerPool

To solve the problem, let’s define some classes:

Task

A Task can be any class that has a run method:

Task
class Task:
    def run(self):
        pass
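
Since a worker only ever calls run, anything with that method qualifies as a task. The following sketch illustrates this with two hypothetical classes (SquareTask and FunctionTask are names invented here, not part of the original code):

```python
class SquareTask(object):
    """Hypothetical task: computes a square and keeps the result."""
    def __init__(self, value):
        self.value = value
        self.result = None

    def run(self):
        self.result = self.value * self.value


class FunctionTask(object):
    """Hypothetical adapter: turns a plain function call into a task."""
    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.result = None

    def run(self):
        self.result = self.func(*self.args)


tasks = [SquareTask(7), FunctionTask(max, 3, 9, 4)]
for task in tasks:
    task.run()          # a worker only ever calls run()
results = [task.result for task in tasks]
```

Storing the result on the task object is just one possible design; the worker shown later ignores whatever run returns.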

TaskWorker

The following TaskWorker consumer thread is responsible for executing the tasks:

TaskWorker consumer thread
import threading

class TaskWorker(threading.Thread):
    def __init__(self, source):
        threading.Thread.__init__(self)
        self.source = source              # queue of pending tasks
        self.daemon = True                # <== A1
        self.start()                      # <== A2
    def run(self):
        while True:
            task = self.source.get()      # <== B1
            try:
                task.run()                # <== B2
            except Exception:
                pass                      # a failing task must not kill the worker
            finally:
                self.source.task_done()   # <== B3

The TaskWorker constructor takes the thread’s task source. The source can be any of the Queue, LifoQueue and PriorityQueue classes from the Queue module. The producer thread populates this source with tasks, and the worker retrieves the next task to execute from it.

Which task comes next depends on the source class:

  • Queue uses the FIFO (First In, First Out) strategy. Thus, the next task to be executed is the oldest task in the source.
  • LifoQueue uses the LIFO (Last In, First Out) strategy. Thus, the next task to be executed is the newest task in the source.
  • PriorityQueue uses a priority strategy. Thus, the next task to be executed is the lowest-valued task in the source. Please note that the tasks must be comparable, so the task class should implement operator < (the __lt__ method).
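
The three strategies can be compared with a small single-threaded sketch. The drain_order helper and the PrioritizedTask class are hypothetical names used only for illustration, and the import fallback is an assumption so that the snippet also runs on Python 3, where the Queue module is named queue:

```python
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name, as used in this article


def drain_order(source, items):
    """Put all items into source, then return them in retrieval order."""
    for item in items:
        source.put(item)
    order = []
    while not source.empty():   # safe here: only one thread uses the queue
        order.append(source.get())
    return order


class PrioritizedTask(object):
    """Hypothetical task that a PriorityQueue can order through __lt__."""
    def __init__(self, priority, name):
        self.priority = priority
        self.name = name

    def __lt__(self, other):
        return self.priority < other.priority

    def run(self):
        pass


fifo_order = drain_order(queue.Queue(), [3, 1, 2])          # [3, 1, 2]
lifo_order = drain_order(queue.LifoQueue(), [3, 1, 2])      # [2, 1, 3]
prio_order = drain_order(queue.PriorityQueue(), [3, 1, 2])  # [1, 2, 3]

tasks = [PrioritizedTask(5, "low"),
         PrioritizedTask(1, "urgent"),
         PrioritizedTask(3, "normal")]
task_order = [t.name for t in drain_order(queue.PriorityQueue(), tasks)]
# task_order is ['urgent', 'normal', 'low']
```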

The constructor also starts the thread automatically (A2). Before that, it turns the thread into a daemon thread by setting its daemon attribute (A1). This step is important because the thread’s main activity is an infinite loop that never ends. Because it is a daemon thread, the Python interpreter can end the program by abruptly terminating it once all non-daemon threads have finished executing.

About daemon threads

There are two types of threads in Python, which the interpreter treats differently:

  • Normal threads, or non-daemon threads, do not allow the interpreter to terminate them abruptly. Thus, they force the interpreter to wait until the last non-daemon thread terminates before it ends the program.
  • Daemon threads allow the interpreter to terminate them abruptly. Thus, when all non-daemon threads have finished executing, the interpreter abruptly terminates the daemon threads and ends the program.

As a rule of thumb, the program ends when its last non-daemon thread terminates, even if some daemon threads are still running.
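
A minimal sketch of this behavior, assuming a hypothetical loop_forever function that stands in for a worker’s endless loop:

```python
import threading
import time


def loop_forever():
    # Stand-in for a worker's infinite loop.
    while True:
        time.sleep(0.1)


worker = threading.Thread(target=loop_forever)
worker.daemon = True    # must be set before start()
worker.start()

# Only non-daemon threads keep the interpreter alive.
non_daemon = [t for t in threading.enumerate() if not t.daemon]
print("worker alive: %s, daemon: %s" % (worker.is_alive(), worker.daemon))
# The script can end here even though loop_forever never returns.
```

If the daemon line is removed, the same script keeps running, because the interpreter then waits for the worker thread.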

In the run method (the thread’s activity), we loop forever. In each iteration, we get a new task from the thread’s task source (B1), run the task (B2), and notify the source that the task has completed (B3):

  • In B1, the get method suspends the thread when necessary. If no task is available in the source, the thread is suspended, and it is resumed only when another (producer) thread puts a new task into the source.
  • We enclose the task’s run call in a try/except construct so that an exception thrown by a task does not terminate the thread.
  • In B3, we notify the source that the task has completed. The source uses this to determine whether all tasks have completed: its join method unblocks once every task that was put has been matched by a task_done call.
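
The interplay between get, task_done and join can be observed with a small sketch. The waiter function and the all_done event are hypothetical helpers, and the import fallback is an assumption so that the snippet also runs on Python 3:

```python
import threading
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name, as used in this article

source = queue.Queue()
source.put("job")           # one put means one unfinished task

all_done = threading.Event()


def waiter():
    source.join()           # blocks until every put item is marked done
    all_done.set()


t = threading.Thread(target=waiter)
t.daemon = True
t.start()

item = source.get()                         # the queue is now empty ...
done_after_get = all_done.wait(0.2)         # ... but join() is still blocked
source.task_done()                          # the last unfinished task is done
done_after_task_done = all_done.wait(2.0)   # join() has returned by now
```

An empty queue alone is therefore not enough for join to return; the matching task_done call is what releases the waiting thread.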

TaskWorkerPool

The following TaskWorkerPool is responsible for creating the TaskWorker threads. It also lets the producer threads add tasks and wait until they are completed.

TaskWorkerPool - manages TaskWorker threads and tasks
import Queue

class TaskWorkerPool:
    def __init__(self, numWorkers):
        self.source = Queue.Queue()        # <== A1
        for _ in range(numWorkers):        # <== A2
            TaskWorker(self.source)        # each worker starts itself
    def add(self, task):
        self.source.put(task)
    def wait(self):
        self.source.join()

  • The TaskWorkerPool constructor creates the source of the tasks (A1). As discussed, this source can be any of the Queue, LifoQueue and PriorityQueue classes from the Queue module. The constructor also takes the number of workers and creates them (A2).
  • The add method adds a new task. Please note that if worker threads are suspended while waiting for a task, adding one resumes one of them.
  • The wait method blocks until all the tasks have been executed. The calling thread is suspended until every task that was put into the source has been matched by a task_done call.
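
The pool above always uses a FIFO Queue. One possible variation, sketched here under the assumption of a hypothetical sourceClass parameter (not part of the original class), lets the caller choose the strategy. PriorityTask and the gate event are also invented for the illustration; the gate keeps the single worker busy until all tasks are queued, so the execution order is predictable:

```python
import threading
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name, as used in this article


class TaskWorker(threading.Thread):
    def __init__(self, source):
        threading.Thread.__init__(self)
        self.source = source
        self.daemon = True
        self.start()

    def run(self):
        while True:
            task = self.source.get()
            try:
                task.run()
            except Exception:
                pass
            finally:
                self.source.task_done()


class TaskWorkerPool(object):
    # sourceClass is a hypothetical extra parameter, not in the original.
    def __init__(self, numWorkers, sourceClass=queue.Queue):
        self.source = sourceClass()
        for _ in range(numWorkers):
            TaskWorker(self.source)

    def add(self, task):
        self.source.put(task)

    def wait(self):
        self.source.join()


executed = []
gate = threading.Event()


class PriorityTask(object):
    """Hypothetical task ordered by its priority (lowest runs first)."""
    def __init__(self, priority, wait_for=None):
        self.priority = priority
        self.wait_for = wait_for

    def __lt__(self, other):
        return self.priority < other.priority

    def run(self):
        if self.wait_for is not None:
            self.wait_for.wait()        # hold the worker until released
        executed.append(self.priority)


twp = TaskWorkerPool(1, sourceClass=queue.PriorityQueue)
twp.add(PriorityTask(0, wait_for=gate))  # occupies the single worker
for priority in (3, 1, 2):
    twp.add(PriorityTask(priority))
gate.set()                               # let the worker proceed
twp.wait()
# executed is [0, 1, 2, 3]
```

With more than one worker, tasks are still taken from the source in priority order, but they may finish in a different order.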

The demo program

In the demo program, each task does some random work.

Sample - The Task
import datetime
import random
import sys
import threading

logLock = threading.Lock()   # assumed definition; not shown in the original listing

class Task:
    def __init__(self, id):
        self.id = id
    def run(self):
        self.log('Started')
        self.doWork()
        self.log('Ended')
    def log(self, msg, *args):
        with logLock:            # serialize writes; released even on error
            now = datetime.datetime.now()
            sys.stdout.write('{1:>02}:{2:>02}:{3:>02} #{4:>04} | {0}\n'.format(
                msg.format(*args), now.hour, now.minute, now.second, self.id))
    def doWork(self):
        total = 0
        for xx in range(random.randint(1, 100000)):
            total += xx

  • It prints a message when the task starts and when it ends, together with the task’s unique id.
  • It does some random work: summing the integers from 0 up to a random limit.
  • The full demo also prints the number of tasks left in the queue after a task has completed (the "Left N Tasks" lines in the sample output). That part is not shown in this listing.

Sample - TaskWorkerPool
twp = TaskWorkerPool(10)   # <== A1
for ii in range(500):      # <== A2
    twp.add( Task(ii) )
twp.wait()                 # <== A3
print("Done with all tasks :)")

The producer thread is the main thread.

  • In A1, we create a TaskWorkerPool with 10 worker threads.
  • In A2, we add 500 tasks. Please note that the worker threads start executing tasks while we are still adding them.
  • In A3, we wait until all the tasks have completed.

Running the program produces output similar to the following:

Sample - output
10:00:00 #0000 | Started
10:00:00 #0001 | Started
...
10:00:00 #0009 | Started
10:00:00 #0000 | Ended
10:00:00 #0000 | Left 41 Tasks
10:00:00 #0010 | Started
10:00:00 #0010 | Ended
...
10:00:09 #0493 | Left 3 Tasks
10:00:09 #0491 | Left 3 Tasks
10:00:09 #0486 | Left 0 Tasks
...
10:00:09 #0499 | Ended
10:00:09 #0499 | Left 0 Tasks
Done with all tasks :)