123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import threading
- import queue
- import time
- import traceback
- import ctypes
- import inspect
- import sys
def _async_raise(tid, exctype):
    """Ask the interpreter to raise ``exctype`` asynchronously in thread ``tid``.

    Args:
        tid: Identifier of the target thread (for example ``Thread.ident``).
        exctype: Exception class to raise. If an instance is given, its
            class is used instead.

    Raises:
        ValueError: The call reported 0, i.e. ``tid`` matched no thread.
        SystemError: The call reported something other than 0 or 1; the
            request is cancelled again before this is raised.
    """
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    thread_id = ctypes.c_long(tid)
    exc_class = exctype if inspect.isclass(exctype) else type(exctype)

    affected = set_async_exc(thread_id, ctypes.py_object(exc_class))
    if affected == 1:
        return
    if affected == 0:
        raise ValueError("invalid thread id")

    # Unexpected result: cancel the pending exception before reporting failure.
    set_async_exc(thread_id, None)
    raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Request that ``thread`` stop by raising ``SystemExit`` inside it.

    Args:
        thread: Thread object whose ``ident`` selects the target thread.
    """
    target_ident = thread.ident
    _async_raise(target_ident, SystemExit)
class _taskHandler(threading.Thread):
    """Worker thread that feeds items from a task queue to a handler callable.

    Each item is passed to ``task_handler(item, result_queue, *args, **kwargs)``.
    The worker exits once the task queue has stayed empty for
    ``idle_timeout`` seconds.
    """

    # Seconds to wait on an empty queue before treating the work as finished.
    # A bounded wait is required: an untimed blocking get() never raises
    # queue.Empty, so the worker would never terminate.
    idle_timeout = 1

    def __init__(self, task_queue, task_handler, result_queue, *args, **kwargs):
        """Store the queues, the handler and the extra handler arguments.

        Args:
            task_queue: Queue the worker takes items from.
            task_handler: Callable invoked once per item.
            result_queue: Queue handed to ``task_handler`` as second argument.
            *args: Extra positional arguments forwarded to ``task_handler``.
            **kwargs: Extra keyword arguments forwarded to ``task_handler``.
        """
        super().__init__()
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Process items until the queue stays empty for ``idle_timeout``.

        Exceptions raised while handling an item are printed with their
        traceback and the loop moves on to the next item.
        """
        while True:
            try:
                print("task queue size is %d" % (self.task_queue.qsize()))
                # Timed wait so queue.Empty can actually fire and end the loop.
                item = self.task_queue.get(True, self.idle_timeout)
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except queue.Empty:
                print("%s thread is done" % (self.name))
                break
            except Exception as e:
                print("error: %s" % (e))
                print(traceback.format_exc())
class MultiThreadHandler(object):
    """Run a task handler over a task queue using several worker threads."""

    def __init__(self, task_queue, task_handler, result_queue, thread_count=1, *args, **kwargs):
        """Store the configuration; no threads are created until ``run``.

        Args:
            task_queue: Queue of items to process.
            task_handler: Callable each worker invokes per item.
            result_queue: Queue passed through to ``task_handler``.
            thread_count: Number of worker threads ``run`` creates.
            *args: Extra positional arguments forwarded to the workers.
            **kwargs: Extra keyword arguments forwarded to the workers.
        """
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.list_thread = []
        self.thread_count = thread_count
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Create and start the workers, then wait until all have finished.

        The wait polls once per second. A ``KeyboardInterrupt`` during the
        wait asks every live worker to stop and ends the wait.
        """
        for _ in range(self.thread_count):
            th = _taskHandler(
                self.task_queue,
                self.task_handler,
                self.result_queue,
                *self.args,
                **self.kwargs
            )
            self.list_thread.append(th)
        for th in self.list_thread:
            th.start()
        # The try wraps the whole loop so an interrupt arriving during the
        # liveness check is handled the same way as one during the sleep.
        try:
            while not self._check_all_done():
                time.sleep(1)
        except KeyboardInterrupt:
            print("interrupted by keyboard")
            self.stop_all()
        print("the whole task is done")

    def _check_all_done(self):
        """Return True when no tracked thread is alive."""
        return not any(th.is_alive() for th in self.list_thread)

    def stop_all(self):
        """Ask every tracked thread that is still alive to stop."""
        for th in self.list_thread:
            if th.is_alive():
                try:
                    stop_thread(th)
                except ValueError:
                    # The thread finished between the liveness check and the
                    # stop request, so its id is no longer valid.
                    pass
def test_handler(item, result_queue):
    """Demo task handler: print the task item to standard output.

    Args:
        item: Task item to print.
        result_queue: Accepted to match the handler call signature; unused.
    """
    print(item)
if __name__ == "__main__":
    # Demo run: queue the integers 0..99 and print them from three workers.
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for task_id in range(100):
        task_queue.put(task_id)
    demo = MultiThreadHandler(
        task_queue=task_queue,
        task_handler=test_handler,
        result_queue=result_queue,
        thread_count=3,
    )
    demo.run()
|