multiThread.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import threading
  2. import queue
  3. import time
  4. import traceback
  5. import ctypes
  6. import inspect
  7. import sys
  8. from multiprocessing import Process,Queue
  9. import logging
  10. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  11. def _async_raise(tid, exctype):
  12. """raises the exception, performs cleanup if needed"""
  13. tid = ctypes.c_long(tid)
  14. if not inspect.isclass(exctype):
  15. exctype = type(exctype)
  16. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  17. if res == 0:
  18. raise ValueError("invalid thread id")
  19. elif res != 1:
  20. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  21. raise SystemError("PyThreadState_SetAsyncExc failed")
  22. def stop_thread(thread):
  23. _async_raise(thread.ident, SystemExit)
  24. class _taskHandler(threading.Thread):
  25. def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
  26. threading.Thread.__init__(self)
  27. # Process.__init__(self)
  28. self.task_queue = task_queue
  29. self.task_handler = task_handler
  30. self.result_queue = result_queue
  31. self.args = args
  32. self.kwargs = kwargs
  33. def run(self):
  34. while(True):
  35. try:
  36. # logging.info("task queue size is %d"%(self.task_queue.qsize()))
  37. item = self.task_queue.get(True,timeout=1)
  38. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  39. # self.task_queue.task_done()
  40. except queue.Empty as e:
  41. # logging.info("%s thread is done"%(self.name))
  42. if self.task_queue.empty():
  43. break
  44. except Exception as e:
  45. logging.info("error: %s"%(e))
  46. traceback.print_exc()
  47. class MultiThreadHandler(object):
  48. def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,restart=False,*args,**kwargs):
  49. self.task_queue = task_queue
  50. self.task_handler = task_handler
  51. self.result_queue = result_queue
  52. self.list_thread = []
  53. self.thread_count = thread_count
  54. self.process_count = process_count
  55. self.args = args
  56. self.kwargs = kwargs
  57. self.restart = restart
  58. def getThreadStatus(self):
  59. _count = 0
  60. restart = 0
  61. for _t in self.list_thread:
  62. if _t.is_alive():
  63. _count += 1
  64. else:
  65. if self.restart:
  66. _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  67. _t.start()
  68. restart += 1
  69. logging.info("thread status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_thread)))
  70. return _count,restart,len(self.list_thread)
  71. def run(self):
  72. self.list_thread = []
  73. for i in range(self.thread_count):
  74. th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  75. # th.setDaemon(True)
  76. self.list_thread.append(th)
  77. for th in self.list_thread:
  78. th.start()
  79. while(not self._check_all_done()):
  80. try:
  81. time.sleep(1)
  82. _,_restart,_count = self.getThreadStatus()
  83. if _count==0:
  84. if self.task_queue.qsize()==0:
  85. break
  86. else:
  87. self.restart = True
  88. _,_restart,_count = self.getThreadStatus()
  89. # if _count==_restart and self.task_queue.qsize()==0:
  90. # break
  91. # �����ֶ�ֹͣ
  92. # _quit = False
  93. # line = sys.stdin.readline()
  94. # if line.strip()=="quit":
  95. # _quit = True
  96. # if _quit:
  97. # break
  98. except KeyboardInterrupt:
  99. print("interrupted by keyboard")
  100. self.stop_all()
  101. break
  102. print("the whole task is done")
  103. def _check_all_done(self):
  104. bool_done = True
  105. for th in self.list_thread:
  106. if th.isAlive():
  107. bool_done = False
  108. return bool_done
  109. def stop_all(self):
  110. for th in self.list_thread:
  111. if th.isAlive:
  112. stop_thread(th)
  113. def test_handler(item,result_queue):
  114. print(item)
  115. if __name__=="__main__":
  116. task_queue = queue.Queue()
  117. result_queue = queue.Queue()
  118. for i in range(100):
  119. task_queue.put(i)
  120. a = MultiThreadHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,thread_count=3)
  121. a.run()