multiThread.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import threading
  2. import queue
  3. import time
  4. import traceback
  5. import ctypes
  6. import inspect
  7. import sys,os
  8. from multiprocessing import Process,Queue
  9. import logging
  10. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  11. def _async_raise(tid, exctype):
  12. """raises the exception, performs cleanup if needed"""
  13. tid = ctypes.c_long(tid)
  14. if not inspect.isclass(exctype):
  15. exctype = type(exctype)
  16. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  17. if res == 0:
  18. raise ValueError("invalid thread id")
  19. elif res != 1:
  20. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  21. raise SystemError("PyThreadState_SetAsyncExc failed")
  22. def stop_thread(thread):
  23. _async_raise(thread.ident, SystemExit)
  24. class _taskHandler(threading.Thread):
  25. def __init__(self,task_queue,task_handler,result_queue,need_stop=True,timeout=60,*args,**kwargs):
  26. threading.Thread.__init__(self)
  27. # Process.__init__(self)
  28. self.task_queue = task_queue
  29. self.task_handler = task_handler
  30. self.result_queue = result_queue
  31. self.need_stop = need_stop
  32. self.args = args
  33. self.kwargs = kwargs
  34. self.timeout= timeout
  35. def run(self):
  36. while(True):
  37. try:
  38. logging.info("%s - handler task queue size is %d need_stop %s thread_id:%d-%d"%(self.task_handler.__name__, self.task_queue.qsize(),str(self.need_stop),os.getpid(),threading.get_ident()))
  39. item = self.task_queue.get(True,timeout=5)
  40. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  41. # t = threading.Thread(target=self.task_handler,args=(item,self.result_queue,*self.args,self.kwargs))
  42. # t.start()
  43. # start_time = time.time()
  44. # while 1:
  45. # if not t.is_alive():
  46. # break
  47. # if time.time()-start_time>self.timeout:
  48. # logging.info("thread %d run to long time ,killed"%(threading.get_ident()))
  49. # stop_thread(t)
  50. # break
  51. # time.sleep(0.1)
  52. # self.task_queue.task_done()
  53. except queue.Empty as e:
  54. # logging.info("%s thread is done"%(self.name))
  55. if self.need_stop and self.task_queue.empty():
  56. break
  57. except Exception as e:
  58. logging.info("error: %s"%(e))
  59. traceback.print_exc()
  60. class MultiThreadHandler(object):
  61. def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,restart=False,timeout=60,*args,**kwargs):
  62. self.task_queue = task_queue
  63. self.task_handler = task_handler
  64. self.result_queue = result_queue
  65. self.list_thread = []
  66. self.thread_count = thread_count
  67. self.process_count = process_count
  68. self.args = args
  69. self.kwargs = kwargs
  70. self.restart = restart
  71. self.need_stop = need_stop
  72. self.timeout = timeout
  73. def getThreadStatus(self):
  74. _count = 0
  75. restart = 0
  76. for _t in self.list_thread:
  77. if _t.is_alive():
  78. _count += 1
  79. else:
  80. if self.restart:
  81. _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,self.timeout,*self.args,**self.kwargs)
  82. _t.start()
  83. restart += 1
  84. logging.debug("thread status alive:%d restart:%d total:%d need_stop %s"%(_count,restart,len(self.list_thread),str(self.need_stop)))
  85. return _count,restart,len(self.list_thread)
  86. def run(self):
  87. self.list_thread = []
  88. for i in range(self.thread_count):
  89. th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
  90. # th.setDaemon(True)
  91. self.list_thread.append(th)
  92. for th in self.list_thread:
  93. th.start()
  94. while(not self._check_all_done()):
  95. try:
  96. time.sleep(1)
  97. _,_restart,_count = self.getThreadStatus()
  98. if _count==0:
  99. if self.task_queue.qsize()==0:
  100. break
  101. else:
  102. self.restart = True
  103. _,_restart,_count = self.getThreadStatus()
  104. # if _count==_restart and self.task_queue.qsize()==0:
  105. # break
  106. # �����ֶ�ֹͣ
  107. # _quit = False
  108. # line = sys.stdin.readline()
  109. # if line.strip()=="quit":
  110. # _quit = True
  111. # if _quit:
  112. # break
  113. except KeyboardInterrupt:
  114. print("interrupted by keyboard")
  115. self.stop_all()
  116. break
  117. print("the whole task is done")
  118. def _check_all_done(self):
  119. if not self.need_stop:
  120. return False
  121. bool_done = True
  122. for th in self.list_thread:
  123. if th.isAlive():
  124. bool_done = False
  125. return bool_done
  126. def stop_all(self):
  127. for th in self.list_thread:
  128. if th.isAlive:
  129. stop_thread(th)
  130. def test_handler(item,result_queue):
  131. print(item)
  132. if __name__=="__main__":
  133. task_queue = queue.Queue()
  134. result_queue = queue.Queue()
  135. for i in range(100):
  136. task_queue.put(i)
  137. a = MultiThreadHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,thread_count=3)
  138. a.run()