import threading import queue import time import traceback import ctypes import inspect import sys def _async_raise(tid, exctype): """raises the exception, performs cleanup if needed""" tid = ctypes.c_long(tid) if not inspect.isclass(exctype): exctype = type(exctype) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("PyThreadState_SetAsyncExc failed") def stop_thread(thread): _async_raise(thread.ident, SystemExit) class _taskHandler(threading.Thread): def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs): threading.Thread.__init__(self) self.task_queue = task_queue self.task_handler = task_handler self.result_queue = result_queue self.args = args self.kwargs = kwargs def run(self): while(True): try: print("task queue size is %d"%(self.task_queue.qsize())) item = self.task_queue.get(True) self.task_handler(item,self.result_queue,*self.args,**self.kwargs) # self.task_queue.task_done() except queue.Empty as e: print("%s thread is done"%(self.name)) break except Exception as e: print("error: %s"%(e)) print(traceback.format_exc()) class MultiThreadHandler(object): def __init__(self,task_queue,task_handler,result_queue,thread_count=1,*args,**kwargs): self.task_queue = task_queue self.task_handler = task_handler self.result_queue = result_queue self.list_thread = [] self.thread_count = thread_count self.args = args self.kwargs = kwargs def run(self): for i in range(self.thread_count): th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs) # th.setDaemon(True) self.list_thread.append(th) for th in self.list_thread: th.start() while(not self._check_all_done()): try: time.sleep(1) # _quit = False # line = sys.stdin.readline() # if line.strip()=="quit": # _quit = True # if _quit: # break except KeyboardInterrupt: print("interrupted by keyboard") self.stop_all() break print("the whole task is done") def _check_all_done(self): bool_done = True for th in self.list_thread: if th.isAlive(): bool_done = False return bool_done def stop_all(self): for th in self.list_thread: if th.isAlive: stop_thread(th) def test_handler(item,result_queue): print(item) if __name__=="__main__": task_queue = queue.Queue() result_queue = queue.Queue() for i in range(100): task_queue.put(i) a = MultiThreadHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,thread_count=3) a.run()