123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import threading
- import queue
- import time
- import traceback
- import ctypes
- import inspect
- import sys,os
- from multiprocessing import Process,Queue
- import logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` inside the thread identified by ``tid``.

    Args:
        tid: Integer thread identifier (e.g. ``threading.Thread.ident``).
        exctype: Exception class to raise; if an instance is given, its
            class is used instead.

    Raises:
        ValueError: The interpreter reported that no thread state matched
            ``tid`` (the C call returned 0).
        SystemError: The C call reported more than one affected thread
            state; the pending exception is cleared before raising.
    """
    exc_class = exctype if inspect.isclass(exctype) else type(exctype)
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    thread_id = ctypes.c_long(tid)

    affected = set_async_exc(thread_id, ctypes.py_object(exc_class))
    if affected == 1:
        return
    if affected == 0:
        raise ValueError("invalid thread id")

    # Any other return value: undo the request by passing NULL, then report.
    set_async_exc(thread_id, None)
    raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Request termination of ``thread`` by raising ``SystemExit`` inside it.

    Delegates to ``_async_raise`` using the thread's ``ident``; any error
    raised there propagates to the caller.
    """
    target_id = thread.ident
    _async_raise(target_id, SystemExit)
class _taskHandler(threading.Thread):
    """Worker thread that feeds items from a task queue to a handler callable.

    Each loop iteration takes one item from ``task_queue`` and calls
    ``task_handler(item, result_queue, *args, **kwargs)``. Exceptions raised
    by the handler are logged and the worker carries on with the next item.

    Args:
        task_queue: Queue to consume; must support ``get``, ``empty`` and
            ``qsize``.
        task_handler: Callable invoked once per item.
        result_queue: Passed through to ``task_handler`` as second argument.
        need_stop: When true, the worker exits once a poll times out and the
            queue is empty. When false, it keeps polling forever.
        timeout: Stored on the instance but not used by ``run``; retained
            for interface compatibility.
        *args: Extra positional arguments forwarded to ``task_handler``.
        **kwargs: Extra keyword arguments forwarded to ``task_handler``.
    """

    # Seconds a worker blocks on the queue before re-checking its stop
    # condition. Override on the class or an instance to tune responsiveness.
    poll_timeout = 5

    def __init__(self, task_queue, task_handler, result_queue, need_stop=True,
                 timeout=60, *args, **kwargs):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.need_stop = need_stop
        self.args = args
        self.kwargs = kwargs
        self.timeout = timeout

    def _log_state(self):
        """Log the worker's current state without ever raising.

        The status line is emitted on every loop pass before the queue is
        read; if building it raised, the worker would spin in its error
        handler and never consume an item. Handlers lacking ``__name__``
        (e.g. ``functools.partial``) and queues whose ``qsize`` is not
        implemented are therefore tolerated here.
        """
        handler_name = getattr(self.task_handler, "__name__",
                               repr(self.task_handler))
        try:
            pending = self.task_queue.qsize()
        except NotImplementedError:
            pending = -1  # size unknown on this queue implementation
        logging.info(
            "%s - handler task queue size is %d need_stop %s thread_id:%d-%d",
            handler_name, pending, str(self.need_stop),
            os.getpid(), threading.get_ident())

    def run(self):
        """Consume the queue until it is drained (``need_stop``) or forever."""
        while True:
            self._log_state()
            try:
                item = self.task_queue.get(True, timeout=self.poll_timeout)
                self.task_handler(item, self.result_queue,
                                  *self.args, **self.kwargs)
            except queue.Empty:
                # A poll timed out; only quit if we are allowed to stop and
                # nothing arrived in the meantime.
                if self.need_stop and self.task_queue.empty():
                    break
            except Exception as e:
                # A failing item must not kill the worker: log with the
                # traceback and move on to the next item.
                logging.exception("error: %s", e)
class MultiThreadHandler(object):
    """Process every item of a task queue with a pool of ``_taskHandler`` threads.

    Args:
        task_queue: Queue of work items shared by all workers.
        task_handler: Callable invoked by the workers for each item.
        result_queue: Queue handed to ``task_handler`` for its results.
        thread_count: Number of worker threads started by ``run``.
        process_count: Stored on the instance but not used by this class.
        need_stop: When true, workers exit once the queue is drained and
            ``run`` returns after all of them finished. When false,
            ``run`` keeps supervising until interrupted or until no worker
            is alive and the queue is empty.
        restart: When true, ``getThreadStatus`` replaces dead workers.
        timeout: Forwarded to every worker.
        *args: Extra positional arguments forwarded to ``task_handler``.
        **kwargs: Extra keyword arguments forwarded to ``task_handler``.
    """

    def __init__(self, task_queue, task_handler, result_queue, thread_count=1,
                 process_count=1, need_stop=True, restart=False, timeout=60,
                 *args, **kwargs):
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.list_thread = []
        self.thread_count = thread_count
        self.process_count = process_count
        self.args = args
        self.kwargs = kwargs
        self.restart = restart
        self.need_stop = need_stop
        self.timeout = timeout

    def _new_worker(self):
        """Build (without starting) a worker configured like this pool."""
        # ``timeout`` must be passed explicitly; otherwise the first extra
        # positional argument would be consumed as the worker's timeout.
        return _taskHandler(self.task_queue, self.task_handler,
                            self.result_queue, self.need_stop, self.timeout,
                            *self.args, **self.kwargs)

    def _should_restart(self):
        """Tell whether a dead worker ought to be replaced right now."""
        # With need_stop set, a worker that exited on an empty queue has
        # finished its job; reviving it would keep the pool alive forever.
        return self.restart and (not self.need_stop
                                 or not self.task_queue.empty())

    def getThreadStatus(self):
        """Count live workers, replacing dead ones when restarting is enabled.

        Returns:
            Tuple ``(alive, restarted, total)``: workers found alive, workers
            replaced during this call, and the size of the pool.
        """
        alive = 0
        restarted = 0
        for index, worker in enumerate(self.list_thread):
            if worker.is_alive():
                alive += 1
            elif self._should_restart():
                replacement = self._new_worker()
                replacement.start()
                # Track the replacement so the dead slot is not restarted
                # again on every poll and so stop_all/_check_all_done see it.
                self.list_thread[index] = replacement
                restarted += 1
        logging.debug(
            "thread status alive:%d restart:%d total:%d need_stop %s",
            alive, restarted, len(self.list_thread), str(self.need_stop))
        return alive, restarted, len(self.list_thread)

    def run(self):
        """Start the workers and supervise them until the work is finished.

        Polls once per second. If no worker is alive while items remain in
        the queue, restarting is switched on and the workers are revived.
        ``KeyboardInterrupt`` stops all workers and ends the supervision.
        """
        self.list_thread = [self._new_worker()
                            for _ in range(self.thread_count)]
        for worker in self.list_thread:
            worker.start()

        while not self._check_all_done():
            try:
                time.sleep(1)
                alive, _restarted, _total = self.getThreadStatus()
                if alive == 0:
                    if self.task_queue.empty():
                        break
                    # Work is left but nobody is processing it: revive.
                    self.restart = True
                    self.getThreadStatus()
            except KeyboardInterrupt:
                print("interrupted by keyboard")
                self.stop_all()
                break
        print("the whole task is done")

    def _check_all_done(self):
        """Return True when stopping is allowed and no worker is alive."""
        if not self.need_stop:
            return False
        return not any(worker.is_alive() for worker in self.list_thread)

    def stop_all(self):
        """Ask every live worker to terminate via ``stop_thread``."""
        for worker in self.list_thread:
            if worker.is_alive():
                try:
                    stop_thread(worker)
                except ValueError:
                    # The worker finished between the liveness check and the
                    # stop request; keep going so the others are stopped.
                    logging.debug("thread %s already finished", worker.name)
def test_handler(item, result_queue):
    """Demo task handler: print the task item to standard output.

    ``result_queue`` is accepted to match the handler call signature but is
    not used.
    """
    print(item)
if __name__ == "__main__":
    # Demo run: queue the integers 0..99 and print them using three workers.
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for number in range(100):
        task_queue.put(number)

    handler = MultiThreadHandler(
        task_queue=task_queue,
        task_handler=test_handler,
        result_queue=result_queue,
        thread_count=3,
    )
    handler.run()
|