123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- from multiprocessing import Queue,Process
- import traceback
- import threading
- import time
- from queue import Empty
- from BiddingKG.dl.common.multiThread import MultiThreadHandler,stop_thread
- import sys
class TaskHandler(threading.Thread):
    """Worker thread that drains ``task_queue`` one item at a time.

    Each item is passed to :meth:`task_handler` together with
    ``result_queue`` and the extra positional/keyword arguments given to
    the constructor.  Subclasses must override :meth:`task_handler`.

    Args:
        task_queue: Queue-like object providing ``empty()``, ``qsize()``
            and ``get(block, timeout=...)``.
        result_queue: Object handed to ``task_handler`` for results.
        *args: Extra positional arguments forwarded to ``task_handler``.
        **kwargs: Extra keyword arguments forwarded to ``task_handler``.
    """

    def __init__(self, task_queue, result_queue, *args, **kwargs):
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def task_handler(self, *args, **kwargs):
        """Process one task; must be overridden by subclasses.

        ``run`` calls this as
        ``task_handler(item, result_queue, *self.args, **self.kwargs)``.
        The base implementation accepts any arguments so that calling it
        the way ``run`` does raises ``NotImplementedError`` rather than a
        signature ``TypeError``.

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError

    def run(self):
        """Consume tasks until the queue is empty, then return.

        Errors raised by ``task_handler`` are printed with their traceback
        and the loop moves on to the next item.  Errors raised by the
        queue itself (other than ``Empty``) propagate, so a broken queue
        cannot cause an endless retry loop.
        """
        while True:
            if self.task_queue.empty():
                print("%s thread is done" % (self.name))
                break

            # qsize() is informational only; some queue implementations
            # raise NotImplementedError for it, which must not stop (or
            # endlessly retry) the worker.
            try:
                print("task queue size is %d" % (self.task_queue.qsize()))
            except NotImplementedError:
                pass

            try:
                item = self.task_queue.get(True, timeout=1)
            except Empty:
                # Another consumer took the last item between the empty()
                # check and the get().
                print("%s thread is done" % (self.name))
                break

            try:
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except Exception as e:
                # Best effort: report the failure and keep processing.
                print("error: %s" % (e))
                print(traceback.format_exc())
- # class MultiThreadHandler(object):
- #
- #
- # def __init__(self,task_queue,Task_handler,result_queue,thread_count=1,*args,**kwargs):
- # self.task_queue = task_queue
- # self.Task_handler = Task_handler
- # self.result_queue = result_queue
- # self.list_thread = []
- # self.thread_count = thread_count
- #
- # def run(self):
- # for i in range(self.thread_count):
- # th = self.Task_handler(self.task_queue,self.task_handler,self.result_queue)
- # self.list_thread.append(th)
- #
- # for th in self.list_thread:
- # th.start()
- #
- # while(not self._check_all_done()):
- # try:
- # time.sleep(1)
- # except KeyboardInterrupt:
- # print("interrupted by keyboard")
- # self.stop_all()
- # break
- #
- #
- # def _check_all_done(self):
- # bool_done = True
- # for th in self.list_thread:
- # if th.isAlive():
- # bool_done = False
- # return bool_done
- #
- # def stop_all(self):
- # for th in self.list_thread:
- # th.stop()
- def test_handler(item,result_queue):
- item["change"] += 1
- result_queue.put(item)
class MultiHandler():
    """Run a task handler across several processes, each with several threads.

    Every worker process builds a ``MultiThreadHandler`` over the shared
    task/result queues and runs it with ``thread_count`` threads.

    Args:
        task_queue: Queue of pending tasks, shared with the worker processes.
        task_handler: Callable handed to ``MultiThreadHandler`` to process
            each task.
        result_queue: Queue handed to ``MultiThreadHandler`` for results.
        process_count: Number of worker processes to start.
        thread_count: Number of threads per worker process.
        *args: Accepted for signature compatibility; currently unused.
        **kwargs: Accepted for signature compatibility; currently unused.
    """

    def __init__(self, task_queue, task_handler, result_queue, process_count=1, thread_count=1, *args, **kwargs):
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.process_count = process_count
        self.thread_count = thread_count
        # Defined up front so _check_all_done()/stop_all() are safe to call
        # before run() has created any process.
        self.list_process = []

    def processHandler(self, processId, *args, **kwargs):
        """Entry point of one worker process.

        Args:
            processId: Label used only in the completion message.
            *args: Ignored.
            **kwargs: Ignored.
        """
        threadHandler = MultiThreadHandler(self.task_queue, self.task_handler, self.result_queue, self.thread_count)
        threadHandler.run()
        print("process %s is done" % processId)

    def run(self):
        """Start all worker processes and wait until they have finished.

        While waiting, a line reading ``quit`` on stdin ends the wait
        early, and Ctrl-C terminates the workers via :meth:`stop_all`.
        """
        self.list_process = [
            Process(target=self.processHandler, args=("process-%d" % (i), ""))
            for i in range(self.process_count)
        ]
        for p in self.list_process:
            p.start()

        while not self._check_all_done():
            try:
                time.sleep(1)
                # NOTE(review): readline() blocks until a line or EOF is
                # available, so worker completion is only re-checked after
                # input arrives.
                line = sys.stdin.readline()
                if line.strip() == "quit":
                    # NOTE(review): "quit" only stops waiting; workers that
                    # are still alive are left running - confirm intended.
                    break
            except KeyboardInterrupt:
                print("interrupted by keyboard")
                self.stop_all()
                break
        print("the whole process is done")

    def _check_all_done(self):
        """Return True when no worker process is alive."""
        return not any(p.is_alive() for p in self.list_process)

    def stop_all(self):
        """Terminate every worker process that is still alive."""
        for p in self.list_process:
            # is_alive must be *called*; the bare attribute is always truthy.
            if p.is_alive():
                # These are processes, not threads, so use the Process API.
                p.terminate()
if __name__ == "__main__":
    # Demo: push 100 small dict tasks through MultiHandler, then print
    # whatever ends up in the result queue.
    list_i = []
    task_queue = Queue()
    result_queue = Queue()
    for index in range(100):
        task = {"source": index, "change": index}
        list_i.append(task)
        task_queue.put(task)

    # Thread-only alternative:
    # a = MultiThreadHandler(task_queue,test_handler,result_queue,thread_count=3)
    a = MultiHandler(task_queue, test_handler, result_queue, process_count=3, thread_count=3)
    a.run()

    # Drain results; the first failed get() (normally a one-second timeout
    # on an empty queue) prints its traceback and ends the loop.
    while True:
        try:
            result = result_queue.get(block=True, timeout=1)
            print(result)
        except Exception:
            print(traceback.format_exc())
            break
|