multiThread.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import threading
  2. import queue
  3. import time
  4. import traceback
  5. import ctypes
  6. import inspect
  7. import sys
  8. def _async_raise(tid, exctype):
  9. """raises the exception, performs cleanup if needed"""
  10. tid = ctypes.c_long(tid)
  11. if not inspect.isclass(exctype):
  12. exctype = type(exctype)
  13. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  14. if res == 0:
  15. raise ValueError("invalid thread id")
  16. elif res != 1:
  17. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  18. raise SystemError("PyThreadState_SetAsyncExc failed")
  19. def stop_thread(thread):
  20. _async_raise(thread.ident, SystemExit)
  21. class _taskHandler(threading.Thread):
  22. def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
  23. threading.Thread.__init__(self)
  24. self.task_queue = task_queue
  25. self.task_handler = task_handler
  26. self.result_queue = result_queue
  27. self.args = args
  28. self.kwargs = kwargs
  29. def run(self):
  30. while(True):
  31. try:
  32. print("task queue size is %d"%(self.task_queue.qsize()))
  33. item = self.task_queue.get(False)
  34. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  35. # self.task_queue.task_done()
  36. except queue.Empty as e:
  37. print("%s thread is done"%(self.name))
  38. break
  39. except Exception as e:
  40. print("error: %s"%(e))
  41. print(traceback.format_exc())
  42. class MultiThreadHandler(object):
  43. def __init__(self,task_queue,task_handler,result_queue,thread_count=1,*args,**kwargs):
  44. self.task_queue = task_queue
  45. self.task_handler = task_handler
  46. self.result_queue = result_queue
  47. self.list_thread = []
  48. self.thread_count = thread_count
  49. self.args = args
  50. self.kwargs = kwargs
  51. def run(self):
  52. for i in range(self.thread_count):
  53. th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  54. th.setDaemon(True)
  55. self.list_thread.append(th)
  56. for th in self.list_thread:
  57. th.start()
  58. while(not self._check_all_done()):
  59. try:
  60. time.sleep(1)
  61. # _quit = False
  62. # line = sys.stdin.readline()
  63. # if line.strip()=="quit":
  64. # _quit = True
  65. # if _quit:
  66. # break
  67. except KeyboardInterrupt:
  68. print("interrupted by keyboard")
  69. self.stop_all()
  70. break
  71. print("the whole task is done")
  72. def _check_all_done(self):
  73. bool_done = True
  74. for th in self.list_thread:
  75. if th.isAlive():
  76. bool_done = False
  77. return bool_done
  78. def stop_all(self):
  79. for th in self.list_thread:
  80. if th.isAlive:
  81. stop_thread(th)
  82. def test_handler(item,result_queue):
  83. print(item)
  84. if __name__=="__main__":
  85. task_queue = queue.Queue()
  86. result_queue = queue.Queue()
  87. for i in range(100):
  88. task_queue.put(i)
  89. a = MultiThreadHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,thread_count=3)
  90. a.run()