"""Drain a task queue with a small pool of worker threads.

``MultiThreadHandler`` starts ``thread_count`` ``_taskHandler`` workers; each
worker repeatedly pulls an item from ``task_queue`` and calls
``task_handler(item, result_queue, *args, **kwargs)``.
"""
import threading
import queue
import time
import traceback
import ctypes
import inspect
import sys
import os
from multiprocessing import Process, Queue
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` in the thread whose identifier is ``tid``.

    Args:
        tid: Thread identifier (``threading.Thread.ident``).
        exctype: Exception class to raise; an exception instance is accepted
            and replaced by its type.

    Raises:
        ValueError: No thread with that identifier was found.
        SystemError: More than one thread state was modified; the pending
            exception is cleared again before raising.
    """
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # Undo the request: it touched more than the one intended thread.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    """Request termination of ``thread`` by raising ``SystemExit`` inside it."""
    _async_raise(thread.ident, SystemExit)


class _taskHandler(threading.Thread):
    """Worker thread: pull items from ``task_queue`` and pass them to ``task_handler``."""

    # Seconds a worker blocks on an empty queue before re-checking whether to stop.
    QUEUE_WAIT_SECONDS = 5

    def __init__(self, task_queue, task_handler, result_queue, need_stop=True, timeout=60, *args, **kwargs):
        """Store the queues, the handler and the extra handler arguments.

        Args:
            task_queue: Queue the worker reads items from.
            task_handler: Callable invoked as
                ``task_handler(item, result_queue, *args, **kwargs)``.
            result_queue: Passed through to ``task_handler`` unchanged.
            need_stop: When true the worker exits once the queue is found empty.
            timeout: Stored on the instance; NOTE: it is not enforced anywhere
                in this module at present.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.need_stop = need_stop
        self.args = args
        self.kwargs = kwargs
        self.timeout = timeout

    def run(self):
        """Process queue items until the queue is empty (only when ``need_stop``)."""
        # Resolved once, outside the try: a handler without __name__ (e.g. a
        # functools.partial) must not raise on every iteration and starve the loop.
        handler_name = getattr(self.task_handler, "__name__", repr(self.task_handler))
        while True:
            try:
                logging.info(
                    "%s - handler task queue size is %d need_stop %s thread_id:%d-%d",
                    handler_name,
                    self.task_queue.qsize(),
                    str(self.need_stop),
                    os.getpid(),
                    threading.get_ident(),
                )
                item = self.task_queue.get(True, timeout=self.QUEUE_WAIT_SECONDS)
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except queue.Empty:
                if self.need_stop and self.task_queue.empty():
                    break
            except Exception as e:
                # A failing task must not kill the worker: report and carry on.
                logging.info("error: %s", e)
                traceback.print_exc()


class MultiThreadHandler(object):
    """Run ``task_handler`` over every item of ``task_queue`` using worker threads."""

    # Seconds between two supervision passes in ``run``.
    POLL_SECONDS = 1

    def __init__(self, task_queue, task_handler, result_queue, thread_count=1, process_count=1, need_stop=True, restart=False, timeout=60, *args, **kwargs):
        """Remember the configuration; no thread is started until ``run``.

        Args:
            task_queue: Queue of items to process.
            task_handler: Callable invoked as
                ``task_handler(item, result_queue, *args, **kwargs)``.
            result_queue: Passed through to ``task_handler``.
            thread_count: Number of worker threads started by ``run``.
            process_count: Stored only; not used by the code in this module.
            need_stop: When true workers exit on an empty queue and ``run``
                returns once all of them have finished.
            restart: When true ``getThreadStatus`` replaces dead workers.
            timeout: Forwarded to every worker (stored there, not enforced).
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.list_thread = []
        self.thread_count = thread_count
        self.process_count = process_count
        self.args = args
        self.kwargs = kwargs
        self.restart = restart
        self.need_stop = need_stop
        self.timeout = timeout

    def _new_worker(self):
        """Build (but do not start) a worker carrying this handler's full configuration."""
        # Single construction point so ``timeout`` is always passed explicitly
        # and the first extra positional argument is never mistaken for it.
        return _taskHandler(
            self.task_queue,
            self.task_handler,
            self.result_queue,
            self.need_stop,
            self.timeout,
            *self.args,
            **self.kwargs
        )

    def getThreadStatus(self):
        """Count live workers and, when ``restart`` is set, replace dead ones.

        Returns:
            Tuple ``(alive, restarted, total)``; ``alive`` does not include the
            workers restarted by this call.
        """
        alive = 0
        restarted = 0
        # A worker that stopped because the queue drained must stay stopped,
        # otherwise ``run`` could never observe "all workers done".
        revive = self.restart and not (self.need_stop and self.task_queue.empty())
        for idx, worker in enumerate(self.list_thread):
            if worker.is_alive():
                alive += 1
            elif revive:
                replacement = self._new_worker()
                # Track the replacement so the dead entry is not restarted again
                # on every subsequent call.
                self.list_thread[idx] = replacement
                replacement.start()
                restarted += 1
        logging.debug("thread status alive:%d restart:%d total:%d need_stop %s", alive, restarted, len(self.list_thread), str(self.need_stop))
        return alive, restarted, len(self.list_thread)

    def run(self):
        """Start the workers and supervise them until the work is finished.

        Returns once every worker has exited (``need_stop``), once no worker is
        running and the queue is empty, or after a ``KeyboardInterrupt`` (in
        which case the remaining workers are asked to stop).
        """
        self.list_thread = [self._new_worker() for _ in range(self.thread_count)]
        for th in self.list_thread:
            th.start()
        while not self._check_all_done():
            try:
                time.sleep(self.POLL_SECONDS)
                alive, restarted, _total = self.getThreadStatus()
                if alive + restarted == 0:
                    if self.task_queue.qsize() == 0:
                        break
                    # Work is left but nobody is running: enable restarts and
                    # bring the workers back.
                    self.restart = True
                    self.getThreadStatus()
            except KeyboardInterrupt:
                print("interrupted by keyboard")
                self.stop_all()
                break
        print("the whole task is done")

    def _check_all_done(self):
        """Return True when ``need_stop`` is set and no tracked worker is alive."""
        if not self.need_stop:
            return False
        return not any(th.is_alive() for th in self.list_thread)

    def stop_all(self):
        """Ask every live worker to terminate."""
        for th in self.list_thread:
            if th.is_alive():
                try:
                    stop_thread(th)
                except ValueError:
                    # The worker finished between the liveness check and the
                    # stop request; nothing left to stop.
                    pass


def test_handler(item, result_queue):
    """Demo task handler: print the item; ``result_queue`` is unused."""
    print(item)


# Despite its name this is a sample handler, not a unit test; keep test
# collectors from picking it up when the module is star-imported.
test_handler.__test__ = False


if __name__ == "__main__":
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for i in range(100):
        task_queue.put(i)
    a = MultiThreadHandler(task_queue=task_queue, task_handler=test_handler, result_queue=result_queue, thread_count=3)
    a.run()