diff --git a/baukit/workerpool.py b/baukit/workerpool.py index e9d2bde..f2169d7 100644 --- a/baukit/workerpool.py +++ b/baukit/workerpool.py @@ -58,14 +58,16 @@ class WorkerBase(Process): # Do the work until None is dequeued while True: try: - work_batch = self.queue.get() + work = self.queue.get() except (KeyboardInterrupt, SystemExit): print('Exiting...') break - if work_batch is None: + if work is None: self.queue.put(None) # for another worker return - self.work(*work_batch) + else: + work_batch, work_kw = work + self.work(*work_batch, **work_kw) def setup(self, **initargs): ''' @@ -74,7 +76,7 @@ class WorkerBase(Process): ''' pass - def work(self, *args): + def work(self, *args, **kwargs): ''' Override this method for one-time initialization. Args are passed from WorkerPool.add() arguments. @@ -113,16 +115,16 @@ class WorkerPool(object): # The main process should handle ctrl-C. Restore this now. signal.signal(signal.SIGINT, original_sigint_handler) - def add(self, *work_batch): + def add(self, *work_batch, **work_kw): if self.queue is None: if hasattr(self, 'worker'): - self.worker.work(*work_batch) + self.worker.work(*work_batch, **work_kw) else: print('WorkerPool shutting down.', file=sys.stderr) else: try: # The queue can block if the work is so slow it gets full. - self.queue.put(work_batch) + self.queue.put([work_batch, work_kw]) except (KeyboardInterrupt, SystemExit): # Handle ctrl-C if done while waiting for the queue. self.early_terminate()