"""Fan a queue of tasks out over several processes, each running several threads.

``MultiHandler`` starts ``process_count`` worker processes; every process
builds a ``MultiThreadHandler`` (imported from
``BiddingKG.dl.common.multiThread``) with ``thread_count`` threads that consume
the shared ``task_queue`` and push their output to ``result_queue``.
"""
from multiprocessing import Process, Queue
from queue import Empty
import sys
import threading
import time
import traceback

from BiddingKG.dl.common.multiThread import MultiThreadHandler, stop_thread


class TaskHandler(threading.Thread):
    """Worker thread that feeds items from ``task_queue`` to ``task_handler``.

    Subclasses override :meth:`task_handler`. Extra positional and keyword
    arguments given to the constructor are forwarded to every
    ``task_handler`` call.
    """

    def __init__(self, task_queue, result_queue, *args, **kwargs):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def task_handler(self, item, result_queue, *args, **kwargs):
        """Process one ``item``; must be overridden by subclasses.

        The signature mirrors the way :meth:`run` invokes this hook
        (``item``, ``result_queue``, then the constructor's extra arguments).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def run(self):
        """Consume tasks until the queue is observed empty.

        The loop ends when ``empty()`` reports no pending task or when
        ``get`` times out after one second. Any other exception raised while
        handling an item is printed with its traceback and the loop moves on
        to the next item.
        """
        while True:
            try:
                if not self.task_queue.empty():
                    print("task queue size is %d"%(self.task_queue.qsize()))
                    item = self.task_queue.get(True, timeout=1)
                    self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                else:
                    print("%s thread is done"%(self.name))
                    break
            except Empty:
                # Another consumer took the last item between empty() and get().
                print("%s thread is done"%(self.name))
                break
            except Exception as e:
                print("error: %s"%(e))
                print(traceback.format_exc())


# Legacy in-file implementation, kept for reference only; the active
# MultiThreadHandler is the one imported from BiddingKG.dl.common.multiThread.
#
# class MultiThreadHandler(object):
#
#
#     def __init__(self,task_queue,Task_handler,result_queue,thread_count=1,*args,**kwargs):
#         self.task_queue = task_queue
#         self.Task_handler = Task_handler
#         self.result_queue = result_queue
#         self.list_thread = []
#         self.thread_count = thread_count
#
#     def run(self):
#         for i in range(self.thread_count):
#             th = self.Task_handler(self.task_queue,self.task_handler,self.result_queue)
#             self.list_thread.append(th)
#
#         for th in self.list_thread:
#             th.start()
#
#         while(not self._check_all_done()):
#             try:
#                 time.sleep(1)
#             except KeyboardInterrupt:
#                 print("interrupted by keyboard")
#                 self.stop_all()
#                 break
#
#
#     def _check_all_done(self):
#         bool_done = True
#         for th in self.list_thread:
#             if th.isAlive():
#                 bool_done = False
#         return bool_done
#
#     def stop_all(self):
#         for th in self.list_thread:
#             th.stop()


def test_handler(item, result_queue):
    """Demo handler: increment ``item["change"]`` and put ``item`` on ``result_queue``."""
    item["change"] += 1
    result_queue.put(item)


class MultiHandler():
    """Run ``task_handler`` over ``task_queue`` with processes x threads workers.

    Args:
        task_queue: queue of pending items, shared by all worker processes.
        task_handler: callable handed to ``MultiThreadHandler`` in each process.
        result_queue: queue that receives the handlers' output.
        process_count: number of worker processes to start.
        thread_count: number of threads per worker process.
        *args, **kwargs: accepted for interface compatibility; not used.
    """

    def __init__(self, task_queue, task_handler, result_queue, process_count=1, thread_count=1, *args, **kwargs):
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.process_count = process_count
        self.thread_count = thread_count
        # Defined up front so _check_all_done()/stop_all() are safe to call
        # before run(); run() rebinds it to a fresh list.
        self.list_process = []

    def processHandler(self, processId, *args, **kwargs):
        """Entry point of one worker process: run the thread pool to completion.

        Args:
            processId: label used only in the completion message.
            *args, **kwargs: ignored.
        """
        threadHandler = MultiThreadHandler(self.task_queue, self.task_handler, self.result_queue, self.thread_count)
        threadHandler.run()
        print("process %s is done"%processId)

    def run(self):
        """Start all worker processes and wait until they finish or the user quits.

        While waiting, one line is read from stdin per iteration; a line equal
        to ``quit`` (after stripping) leaves the wait loop. Ctrl-C stops the
        workers via :meth:`stop_all`.
        """
        self.list_process = []
        for i in range(self.process_count):
            p = Process(target=self.processHandler, args=("process-%d"%(i), ""))
            self.list_process.append(p)

        for p in self.list_process:
            p.start()

        while not self._check_all_done():
            try:
                time.sleep(1)
                # readline() blocks until a line or EOF arrives, so completion
                # is only re-checked after each line of input.
                line = sys.stdin.readline()
                # NOTE(review): "quit" leaves the loop without stopping the
                # workers - confirm whether stop_all() should be called here.
                if line.strip() == "quit":
                    break
            except KeyboardInterrupt:
                print("interrupted by keyboard")
                self.stop_all()
                break

        print("the whole process is done")

    def _check_all_done(self):
        """Return True when no worker process is alive."""
        return not any(proc.is_alive() for proc in self.list_process)

    def stop_all(self):
        """Ask every worker process that is still alive to stop."""
        for proc in self.list_process:
            # is_alive is a method; the original tested the bound method
            # itself, which is always truthy.
            if proc.is_alive():
                # NOTE(review): stop_thread is imported from the threading
                # helper module but receives a multiprocessing.Process here -
                # confirm it supports processes, otherwise use proc.terminate().
                stop_thread(proc)


if __name__=="__main__":
    list_i = []
    task_queue = Queue()
    result_queue = Queue()
    for i in range(100):
        _dict = {"source":i,"change":i}
        list_i.append(_dict)
        task_queue.put(_dict)

    # a = MultiThreadHandler(task_queue,test_handler,result_queue,thread_count=3)
    a = MultiHandler(task_queue, test_handler, result_queue, process_count=3, thread_count=3)
    a.run()

    # Drain the results; a one-second timeout with no item means we are done.
    while True:
        try:
            item = result_queue.get(block=True, timeout=1)
            print(item)
        except Empty:
            break
        except Exception:
            print(traceback.format_exc())
            break