multiProcess.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. import threading
  2. import queue
  3. import time
  4. import traceback
  5. import ctypes
  6. import inspect
  7. import sys,os
  8. from multiprocessing import Process,Queue
  9. import multiprocessing
  10. # try:
  11. # def monkeypatch_os_fork_functions():
  12. # """
  13. # Replace os.fork* with wrappers that use ForkSafeLock to acquire
  14. # all locks before forking and release them afterwards.
  15. # """
  16. # builtin_function = type(''.join)
  17. # if hasattr(os, 'fork') and isinstance(os.fork, builtin_function):
  18. # global _orig_os_fork
  19. # _orig_os_fork = os.fork
  20. # os.fork = os_fork_wrapper
  21. # if hasattr(os, 'forkpty') and isinstance(os.forkpty, builtin_function):
  22. # global _orig_os_forkpty
  23. # _orig_os_forkpty = os.forkpty
  24. # os.forkpty = os_forkpty_wrapper
  25. #
  26. #
  27. # # This lock protects all of the lists below.
  28. # _fork_lock = threading.Lock()
  29. # _prepare_call_list = []
  30. # _prepare_call_exceptions = []
  31. # _parent_call_list = []
  32. # _child_call_list = []
  33. #
  34. #
  35. # def atfork(prepare=None, parent=None, child=None):
  36. # """A Python work-a-like of pthread_atfork.
  37. #
  38. # Any time a fork() is called from Python, all 'prepare' callables will
  39. # be called in the order they were registered using this function.
  40. # After the fork (successful or not), all 'parent' callables will be called in
  41. # the parent process. If the fork succeeded, all 'child' callables will be
  42. # called in the child process.
  43. # No exceptions may be raised from any of the registered callables. If so
  44. # they will be printed to sys.stderr after the fork call once it is safe
  45. # to do so.
  46. # """
  47. # assert not prepare or callable(prepare)
  48. # assert not parent or callable(parent)
  49. # assert not child or callable(child)
  50. # _fork_lock.acquire()
  51. # try:
  52. # if prepare:
  53. # _prepare_call_list.append(prepare)
  54. # if parent:
  55. # _parent_call_list.append(parent)
  56. # if child:
  57. # _child_call_list.append(child)
  58. # finally:
  59. # _fork_lock.release()
  60. #
  61. #
  62. # def _call_atfork_list(call_list):
  63. # """
  64. # Given a list of callables in call_list, call them all in order and save
  65. # and return a list of sys.exc_info() tuples for each exception raised.
  66. # """
  67. # exception_list = []
  68. # for func in call_list:
  69. # try:
  70. # func()
  71. # except:
  72. # exception_list.append(sys.exc_info())
  73. # return exception_list
  74. #
  75. #
  76. # def prepare_to_fork_acquire():
  77. # """Acquire our lock and call all prepare callables."""
  78. # _fork_lock.acquire()
  79. # _prepare_call_exceptions.extend(_call_atfork_list(_prepare_call_list))
  80. #
  81. #
  82. # def parent_after_fork_release():
  83. # """
  84. # Call all parent after fork callables, release the lock and print
  85. # all prepare and parent callback exceptions.
  86. # """
  87. # prepare_exceptions = list(_prepare_call_exceptions)
  88. # del _prepare_call_exceptions[:]
  89. # exceptions = _call_atfork_list(_parent_call_list)
  90. # _fork_lock.release()
  91. # _print_exception_list(prepare_exceptions, 'before fork')
  92. # _print_exception_list(exceptions, 'after fork from parent')
  93. #
  94. #
  95. # def child_after_fork_release():
  96. # """
  97. # Call all child after fork callables, release the lock and print
  98. # all child callback exceptions.
  99. # """
  100. # del _prepare_call_exceptions[:]
  101. # exceptions = _call_atfork_list(_child_call_list)
  102. # _fork_lock.release()
  103. # _print_exception_list(exceptions, 'after fork from child')
  104. #
  105. #
  106. # def _print_exception_list(exceptions, message, output_file=None):
  107. # """
  108. # Given a list of sys.exc_info tuples, print them all using the traceback
  109. # module preceded by a message and separated by a blank line.
  110. # """
  111. # output_file = output_file or sys.stderr
  112. # message = 'Exception %s:\n' % message
  113. # for exc_type, exc_value, exc_traceback in exceptions:
  114. # output_file.write(message)
  115. # traceback.print_exception(exc_type, exc_value, exc_traceback,
  116. # file=output_file)
  117. # output_file.write('\n')
  118. #
  119. #
  120. # def os_fork_wrapper():
  121. # """Wraps os.fork() to run atfork handlers."""
  122. # pid = None
  123. # prepare_to_fork_acquire()
  124. # try:
  125. # pid = _orig_os_fork()
  126. # finally:
  127. # if pid == 0:
  128. # child_after_fork_release()
  129. # else:
  130. # # We call this regardless of fork success in order for
  131. # # the program to be in a sane state afterwards.
  132. # parent_after_fork_release()
  133. # return pid
  134. #
  135. #
  136. # def os_forkpty_wrapper():
  137. # """Wraps os.forkpty() to run atfork handlers."""
  138. # pid = None
  139. # prepare_to_fork_acquire()
  140. # try:
  141. # pid, fd = _orig_os_forkpty()
  142. # finally:
  143. # if pid == 0:
  144. # child_after_fork_release()
  145. # else:
  146. # parent_after_fork_release()
  147. # return pid, fd
  148. # class Error(Exception):
  149. # pass
  150. # def fix_logging_module():
  151. # logging = sys.modules.get('logging')
  152. # # Prevent fixing multiple times as that would cause a deadlock.
  153. # if logging and getattr(logging, 'fixed_for_atfork', None):
  154. # return
  155. # # if logging:
  156. # # warnings.warn('logging module already imported before fixup.')
  157. # import logging
  158. # if logging.getLogger().handlers:
  159. # # We could register each lock with atfork for these handlers but if
  160. # # these exist, other loggers or not yet added handlers could as well.
  161. # # Its safer to insist that this fix is applied before logging has been
  162. # # configured.
  163. # raise Error('logging handlers already registered.')
  164. #
  165. # logging._acquireLock()
  166. # try:
  167. # def fork_safe_createLock(self):
  168. # self._orig_createLock()
  169. # atfork(self.lock.acquire,
  170. # self.lock.release, self.lock.release)
  171. #
  172. # # Fix the logging.Handler lock (a major source of deadlocks).
  173. # logging.Handler._orig_createLock = logging.Handler.createLock
  174. # logging.Handler.createLock = fork_safe_createLock
  175. #
  176. # # Fix the module level lock.
  177. # atfork(logging._acquireLock,
  178. # logging._releaseLock, logging._releaseLock)
  179. #
  180. # logging.fixed_for_atfork = True
  181. # finally:
  182. # logging._releaseLock()
  183. # monkeypatch_os_fork_functions()
  184. # fix_logging_module()
  185. # except Exception as e:
  186. # traceback.print_exc()
  187. # pass
  188. import logging
  189. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  190. def _async_raise(tid, exctype):
  191. """raises the exception, performs cleanup if needed"""
  192. tid = ctypes.c_long(tid)
  193. if not inspect.isclass(exctype):
  194. exctype = type(exctype)
  195. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  196. if res == 0:
  197. raise ValueError("invalid thread id")
  198. elif res != 1:
  199. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  200. raise SystemError("PyThreadState_SetAsyncExc failed")
  201. def stop_thread(thread):
  202. try:
  203. _async_raise(thread.ident, SystemExit)
  204. except Exception as e:
  205. traceback.print_exc()
  206. class _taskHandler(Process):
  207. def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
  208. Process.__init__(self)
  209. # Process.__init__(self)
  210. self.task_queue = task_queue
  211. self.task_handler = task_handler
  212. self.result_queue = result_queue
  213. self.args = args
  214. self.kwargs = kwargs
  215. def run(self):
  216. while(True):
  217. try:
  218. # logging.info("task queue size is %d"%(self.task_queue.qsize()))
  219. item = self.task_queue.get(True,timeout=1)
  220. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  221. # self.task_queue.task_done()
  222. except queue.Empty as e:
  223. # logging.info("%s thread is done"%(self.name))
  224. # if self.task_queue.empty():
  225. # break
  226. time.sleep(1)
  227. except Exception as e:
  228. logging.info("error: %s"%(e))
  229. traceback.print_exc()
  230. class MultiProcessHandler(object):
  231. def __init__(self,task_queue,task_handler,result_queue,process_count=1,*args,**kwargs):
  232. self.task_queue = task_queue
  233. self.task_handler = task_handler
  234. self.result_queue = result_queue
  235. self.list_thread = []
  236. self.process_count = process_count
  237. self.args = args
  238. self.kwargs = kwargs
  239. self.restart = False
  240. def getThreadStatus(self):
  241. _count = 0
  242. restart = 0
  243. for _t in self.list_thread:
  244. if _t.is_alive():
  245. _count += 1
  246. else:
  247. if self.restart:
  248. _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  249. _t.start()
  250. restart += 1
  251. logging.info("thread status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_thread)))
  252. return _count,restart,len(self.list_thread)
  253. def run(self):
  254. for i in range(self.process_count):
  255. th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  256. # th.setDaemon(True)
  257. self.list_thread.append(th)
  258. for th in self.list_thread:
  259. th.start()
  260. while(not self._check_all_done()):
  261. try:
  262. time.sleep(1)
  263. _,_restart,_count = self.getThreadStatus()
  264. if _count==0:
  265. if self.task_queue.qsize()==0:
  266. break
  267. else:
  268. self.restart = True
  269. _,_restart,_count = self.getThreadStatus()
  270. # if _count==_restart and self.task_queue.qsize()==0:
  271. # break
  272. # �����ֶ�ֹͣ
  273. # _quit = False
  274. # line = sys.stdin.readline()
  275. # if line.strip()=="quit":
  276. # _quit = True
  277. # if _quit:
  278. # break
  279. except KeyboardInterrupt:
  280. print("interrupted by keyboard")
  281. self.stop_all()
  282. break
  283. print("the whole task is done")
  284. def _check_all_done(self):
  285. bool_done = True
  286. for th in self.list_thread:
  287. if th.is_alive():
  288. bool_done = False
  289. return bool_done
  290. def stop_all(self):
  291. for th in self.list_thread:
  292. if th.isAlive:
  293. th.terminate()
  294. class ThreadHandler(threading.Thread):
  295. def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
  296. threading.Thread.__init__(self)
  297. # Process.__init__(self)
  298. self.task_queue = task_queue
  299. self.task_handler = task_handler
  300. self.result_queue = result_queue
  301. self.args = args
  302. self.kwargs = kwargs
  303. self.pid = os.getpid()
  304. def run(self):
  305. while(True):
  306. try:
  307. # logging.info("thread of process:%d name:%s getting item..."%(self.pid,self.name))
  308. logging.info("thread of process:%d name: %s task queue size is %d"%(self.pid,self.name,self.task_queue.qsize()))
  309. item = self.task_queue.get(False,timeout=1)
  310. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  311. # self.task_queue.task_done()
  312. except queue.Empty as e:
  313. if self.task_queue.qsize()==0:
  314. logging.info("%s thread of process %s is done"%(self.name,self.pid))
  315. break
  316. else:
  317. logging.info("%s of process %s get item failed"%(self.name,self.pid))
  318. except Exception as e:
  319. logging.info("error: %s"%(e))
  320. traceback.print_exc()
  321. class ProcessHandler(Process):
  322. def __init__(self,thread_count,task_queue,task_handler,result_queue,need_stop=True,*args,**kwargs):
  323. # threading.Thread.__init__(self)
  324. Process.__init__(self)
  325. self.thread_count = thread_count
  326. self.task_queue = task_queue
  327. self.task_handler = task_handler
  328. self.result_queue = result_queue
  329. self.need_stop = need_stop
  330. self.args = args
  331. self.kwargs = kwargs
  332. self.thread_queue = Queue(100)
  333. self.has_run = False
  334. self.list_thread = []
  335. def run(self):
  336. logging._lock = threading.RLock()
  337. for logging_h in logging.getLogger().handlers:
  338. logging_h.createLock()
  339. if self.thread_count>0:
  340. try:
  341. self.init_thread()
  342. self.start_thread()
  343. self.setQueue()
  344. except Exception as e:
  345. traceback.print_exc()
  346. else:
  347. while(True):
  348. try:
  349. # logging.info("thread of process:%d name:%s getting item..."%(self.pid,self.name))
  350. logging.debug("main of process:%d name: %s task queue size is %d"%(self.pid,self.name,self.task_queue.qsize()))
  351. item = self.task_queue.get(False,timeout=1)
  352. self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
  353. # self.task_queue.task_done()
  354. except queue.Empty as e:
  355. if self.task_queue.qsize()==0:
  356. logging.debug("%s main of process %s is done"%(self.name,self.pid))
  357. if self.need_stop:
  358. break
  359. else:
  360. logging.debug("%s of process %s get item failed"%(self.name,self.pid))
  361. except Exception as e:
  362. logging.info("error: %s"%(e))
  363. traceback.print_exc()
  364. logging.debug("process %d is done"%(os.getpid()))
  365. self.stop_all()
  366. def init_thread(self):
  367. logging.debug("process %d init thread"%(os.getpid()))
  368. self.list_thread = []
  369. for _i in range(self.thread_count):
  370. self.list_thread.append(ThreadHandler(self.thread_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs))
  371. def start_thread(self):
  372. logging.debug("process %d start thread"%(os.getpid()))
  373. for th in self.list_thread:
  374. th.start()
  375. def setQueue(self):
  376. logging.debug("process %d start setQueue"%(os.getpid()))
  377. _count = 0
  378. while True:
  379. logging.debug("setQueue from task_queue")
  380. item = None
  381. try:
  382. item = self.task_queue.get(False,timeout=0.1)
  383. _count = (_count+1)%10
  384. if _count==0:
  385. self.checkThread()
  386. except Exception as empty:
  387. logging.debug("process %d setQueue failed with task_queue size:%d and is_all_done:%s error:%s"%(os.getpid(),self.task_queue.qsize(),str(self.is_all_done()),str(empty)))
  388. _flag = self.checkThread()
  389. if self.task_queue.qsize()==0 and _flag:
  390. break
  391. time.sleep(1)
  392. while item is not None:
  393. logging.debug("setQueue to thread_queue")
  394. try:
  395. self.thread_queue.put(item,False,timeout=1)
  396. break
  397. except Exception as empty:
  398. self.checkThread()
  399. logging.debug("process %d setQueue failed with thread_queue size:%d and is_all_done:%s"%(os.getpid(),self.thread_queue.qsize(),str(self.is_all_done())))
  400. time.sleep(2)
  401. def is_all_done(self):
  402. all_done = True
  403. for _i in range(len(self.list_thread)):
  404. th = self.list_thread[_i]
  405. if th.is_alive():
  406. all_done = False
  407. break
  408. return all_done
  409. def checkThread(self):
  410. alive_count = 0
  411. restart_count = 0
  412. list_name = []
  413. for _i in range(len(self.list_thread)):
  414. th = self.list_thread[_i]
  415. if th.is_alive():
  416. alive_count += 1
  417. list_name.append(th.name)
  418. else:
  419. # stop_thread(th)
  420. self.list_thread[_i] = ThreadHandler(self.thread_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  421. self.list_thread[_i].start()
  422. list_name.append(self.list_thread[_i].name)
  423. restart_count += 1
  424. logging.debug("process id:%d %s queue_count:%d thread_count:%d alive_count:%d restart_count:%d"%(os.getpid(),str(list_name),self.thread_queue.qsize(),len(self.list_thread),alive_count,restart_count))
  425. if self.thread_queue.qsize()==0 and restart_count>0:
  426. return True
  427. return False
  428. def getStatus(self):
  429. while True:
  430. time.sleep(5)
  431. if self.checkThread():
  432. break
  433. def stop_all(self):
  434. try:
  435. for th in self.list_thread:
  436. if th.is_alive():
  437. # th.terminate()
  438. # stop_thread(th)
  439. # th._stop()
  440. pass
  441. except Exception as e:
  442. traceback.print_exc()
  443. class MultiHandler(object):
  444. def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,*args,**kwargs):
  445. self.task_queue = task_queue
  446. self.task_handler = task_handler
  447. self.result_queue = result_queue
  448. self.thread_count = thread_count
  449. self.process_count = process_count
  450. self.need_stop = need_stop
  451. self.args = args
  452. self.kwargs = kwargs
  453. self.list_process = []
  454. self.logger = logging.getLogger()
  455. def getThreadStatus(self):
  456. _count = 0
  457. restart = 0
  458. for _i in range(len(self.list_process)):
  459. _t = self.list_process[_i]
  460. if _t.is_alive():
  461. _count += 1
  462. else:
  463. _t.terminate()
  464. self.list_process[_i] = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,self.need_stop*self.args,**self.kwargs)
  465. self.list_process[_i].start()
  466. restart += 1
  467. logging.debug("process status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_process)))
  468. return _count,restart,len(self.list_process)
  469. def run(self):
  470. for _ in range(self.process_count):
  471. pr = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  472. self.list_process.append(pr)
  473. # for i in range(self.thread_count):
  474. # th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
  475. # # th.setDaemon(True)
  476. # self.list_thread.append(th)
  477. for th in self.list_process:
  478. th.start()
  479. while(not self._check_all_done()):
  480. try:
  481. _,_restart,_count = self.getThreadStatus()
  482. time.sleep(5)
  483. if _restart>0 and self.task_queue.qsize()==0:
  484. break
  485. # �����ֶ�ֹͣ
  486. # _quit = False
  487. # line = sys.stdin.readline()
  488. # if line.strip()=="quit":
  489. # _quit = True
  490. # if _quit:
  491. # break
  492. except KeyboardInterrupt:
  493. logging.info("interrupted by keyboard")
  494. self.stop_all()
  495. break
  496. self.stop_all()
  497. logging.debug("the whole task is done")
  498. def _check_all_done(self):
  499. bool_done = True
  500. for th in self.list_process:
  501. if th.is_alive():
  502. bool_done = False
  503. return bool_done
  504. def stop_all(self):
  505. try:
  506. for th in self.list_process:
  507. if th.is_alive():
  508. th.terminate()
  509. # stop_thread(th)
  510. pass
  511. except Exception as e:
  512. traceback.print_exc()
  513. def test_handler(item,result_queue):
  514. print(item)
  515. from apscheduler.schedulers.blocking import BlockingScheduler
  516. class Test():
  517. def __init__(self):
  518. self.task_queue = Queue(500)
  519. self.result_queue = Queue()
  520. self.count = 5
  521. def producer(self):
  522. time.sleep(10)
  523. if self.count>0:
  524. self.count -= 1
  525. for i in range(2):
  526. self.task_queue.put(i)
  527. def comsumer(self):
  528. a = MultiHandler(task_queue=self.task_queue,task_handler=test_handler,result_queue=self.result_queue,thread_count=0,process_count=2)
  529. a.run()
  530. def start(self):
  531. scheduler = BlockingScheduler()
  532. scheduler.add_job(self.producer,"cron",second="*/5")
  533. scheduler.add_job(self.comsumer,"cron",second="*/5")
  534. scheduler.start()
  535. if __name__=="__main__":
  536. task_queue = Queue()
  537. result_queue = Queue()
  538. for i in range(100):
  539. task_queue.put(i)
  540. a = MultiProcessHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,process_count=2)
  541. a.run()
  542. # test = Test()
  543. # test.start()
  544. # # a = ProcessHandler(2,None,lambda x:x,None)
  545. # # print(a.is_alive())
  546. # from multiprocessing import RLock
  547. # import os
  548. # os.fork()