MultiHandler.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from multiprocessing import Queue,Process
  2. import traceback
  3. import threading
  4. import time
  5. from queue import Empty
  6. from BiddingKG.dl.common.multiThread import MultiThreadHandler,stop_thread
  7. import sys
  8. class TaskHandler(threading.Thread):
  9. def __init__(self,task_queue,result_queue,*args,**kwargs):
  10. threading.Thread.__init__(self)
  11. self.task_queue = task_queue
  12. self.result_queue = result_queue
  13. self.args = args
  14. self.kwargs = kwargs
  15. def task_handler(self):
  16. raise NotImplementedError
  17. def run(self):
  18. while(True):
  19. try:
  20. if not self.task_queue.empty():
  21. print("task queue size is %d"%(self.task_queue.qsize()))
  22. item = self.task_queue.get(True,timeout=1)
  23. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  24. else:
  25. print("%s thread is done"%(self.name))
  26. break
  27. except Empty as e:
  28. print("%s thread is done"%(self.name))
  29. break
  30. except Exception as e:
  31. print("error: %s"%(e))
  32. print(traceback.format_exc())
  33. # class MultiThreadHandler(object):
  34. #
  35. #
  36. # def __init__(self,task_queue,Task_handler,result_queue,thread_count=1,*args,**kwargs):
  37. # self.task_queue = task_queue
  38. # self.Task_handler = Task_handler
  39. # self.result_queue = result_queue
  40. # self.list_thread = []
  41. # self.thread_count = thread_count
  42. #
  43. # def run(self):
  44. # for i in range(self.thread_count):
  45. # th = self.Task_handler(self.task_queue,self.task_handler,self.result_queue)
  46. # self.list_thread.append(th)
  47. #
  48. # for th in self.list_thread:
  49. # th.start()
  50. #
  51. # while(not self._check_all_done()):
  52. # try:
  53. # time.sleep(1)
  54. # except KeyboardInterrupt:
  55. # print("interrupted by keyboard")
  56. # self.stop_all()
  57. # break
  58. #
  59. #
  60. # def _check_all_done(self):
  61. # bool_done = True
  62. # for th in self.list_thread:
  63. # if th.isAlive():
  64. # bool_done = False
  65. # return bool_done
  66. #
  67. # def stop_all(self):
  68. # for th in self.list_thread:
  69. # th.stop()
  70. def test_handler(item,result_queue):
  71. item["change"] += 1
  72. result_queue.put(item)
  73. class MultiHandler():
  74. def __init__(self,task_queue,task_handler,result_queue,process_count=1,thread_count=1,*args,**kwargs):
  75. self.task_queue = task_queue
  76. self.task_handler = task_handler
  77. self.result_queue = result_queue
  78. self.process_count = process_count
  79. self.thread_count = thread_count
  80. def processHandler(self,processId,*args,**kwargs):
  81. threadHandler = MultiThreadHandler(self.task_queue,self.task_handler,self.result_queue,self.thread_count)
  82. threadHandler.run()
  83. print("process %s is done"%processId)
  84. def run(self):
  85. self.list_process = []
  86. for i in range(self.process_count):
  87. p = Process(target=self.processHandler,args=("process-%d"%(i),""))
  88. self.list_process.append(p)
  89. for p in self.list_process:
  90. p.start()
  91. while(not self._check_all_done()):
  92. try:
  93. time.sleep(1)
  94. _quit = False
  95. line = sys.stdin.readline()
  96. if line.strip()=="quit":
  97. _quit = True
  98. if _quit:
  99. break
  100. except KeyboardInterrupt:
  101. print("interrupted by keyboard")
  102. self.stop_all()
  103. break
  104. print("the whole process is done")
  105. def _check_all_done(self):
  106. bool_done = True
  107. for th in self.list_process:
  108. if th.is_alive():
  109. bool_done = False
  110. return bool_done
  111. def stop_all(self):
  112. for th in self.list_process:
  113. if th.is_alive:
  114. stop_thread(th)
  115. if __name__=="__main__":
  116. list_i = []
  117. task_queue = Queue()
  118. result_queue = Queue()
  119. for i in range(100):
  120. _dict = {"source":i,"change":i}
  121. list_i.append(_dict)
  122. task_queue.put(_dict)
  123. # a = MultiThreadHandler(task_queue,test_handler,result_queue,thread_count=3)
  124. a = MultiHandler(task_queue,test_handler,result_queue,process_count=3,thread_count=3)
  125. a.run()
  126. while(True):
  127. try:
  128. item = result_queue.get(block=True,timeout=1)
  129. print(item)
  130. except Exception as e:
  131. print(traceback.format_exc())
  132. break