ac.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. import logging
  2. import multiprocessing
  3. import sys,os
  4. from time import sleep
  5. import threading
  6. import traceback
  7. import warnings
  8. try:
  def monkeypatch_os_fork_functions():
      """
      Install the atfork-aware wrappers over os.fork and os.forkpty.

      For each of the two functions that exists on this platform and is
      still the C-implemented builtin, the original is stashed in the
      module global _orig_os_fork / _orig_os_forkpty and the os attribute
      is rebound to os_fork_wrapper / os_forkpty_wrapper.
      """
      global _orig_os_fork, _orig_os_forkpty

      # Type of C-implemented callables.  An os.fork that is already a
      # Python-level function fails the isinstance test and is left as is.
      native_callable = type(''.join)

      current_fork = getattr(os, 'fork', None)
      if isinstance(current_fork, native_callable):
          _orig_os_fork = current_fork
          os.fork = os_fork_wrapper

      current_forkpty = getattr(os, 'forkpty', None)
      if isinstance(current_forkpty, native_callable):
          _orig_os_forkpty = current_forkpty
          os.forkpty = os_forkpty_wrapper
  # Registries of fork hooks, plus the exceptions collected while running
  # the 'prepare' hooks.  _fork_lock protects every list defined here.
  _fork_lock = threading.Lock()
  _prepare_call_list, _parent_call_list, _child_call_list = [], [], []
  _prepare_call_exceptions = []
  def atfork(prepare=None, parent=None, child=None):
      """Register fork hooks, in the spirit of pthread_atfork.

      Each argument is optional; a truthy argument must be callable and is
      appended to the matching registry while _fork_lock is held:

        prepare -- run by prepare_to_fork_acquire() before the fork.
        parent  -- run by parent_after_fork_release() in the parent,
                   whether or not the fork succeeded.
        child   -- run by child_after_fork_release() in the child.

      Hooks run in registration order.  They should not raise; anything
      they do raise is captured and printed to sys.stderr after the fork
      call, once it is safe to do so.
      """
      for hook in (prepare, parent, child):
          assert not hook or callable(hook)

      with _fork_lock:
          pairs = (
              (prepare, _prepare_call_list),
              (parent, _parent_call_list),
              (child, _child_call_list),
          )
          for hook, registry in pairs:
              if hook:
                  registry.append(hook)
  def _call_atfork_list(call_list):
      """
      Invoke every callable in call_list, in order, trapping whatever it raises.

      Returns a list with one sys.exc_info() tuple per callable that raised;
      the list is empty when every callable completed normally.
      """
      failures = []
      for hook in call_list:
          try:
              hook()
          except:  # deliberately bare: no exception may escape a hook
              failures.append(sys.exc_info())
      return failures
  def prepare_to_fork_acquire():
      """Take _fork_lock, then run the 'prepare' hooks and record their failures.

      The lock is left held on return; parent_after_fork_release() or
      child_after_fork_release() releases it.
      """
      _fork_lock.acquire()
      raised = _call_atfork_list(_prepare_call_list)
      _prepare_call_exceptions.extend(raised)
  def parent_after_fork_release():
      """
      Parent-side cleanup after a fork attempt.

      Runs the 'parent' hooks, releases _fork_lock, and only then prints the
      exceptions raised by the 'prepare' hooks and by the 'parent' hooks.
      """
      # Snapshot and empty the shared list while the lock is still held.
      prepare_failures = _prepare_call_exceptions[:]
      _prepare_call_exceptions[:] = []
      parent_failures = _call_atfork_list(_parent_call_list)
      _fork_lock.release()
      reports = (
          (prepare_failures, 'before fork'),
          (parent_failures, 'after fork from parent'),
      )
      for failures, phase in reports:
          _print_exception_list(failures, phase)
  def child_after_fork_release():
      """
      Child-side cleanup after a successful fork.

      Discards the recorded 'prepare' exceptions without printing them, runs
      the 'child' hooks, releases _fork_lock, and then prints any exceptions
      the 'child' hooks raised.
      """
      _prepare_call_exceptions[:] = []
      child_failures = _call_atfork_list(_child_call_list)
      _fork_lock.release()
      _print_exception_list(child_failures, 'after fork from child')
  def _print_exception_list(exceptions, message, output_file=None):
      """
      Write each sys.exc_info() tuple in exceptions as a formatted traceback.

      Every entry is preceded by the line 'Exception <message>:' and followed
      by a blank line.  Output goes to output_file, or to sys.stderr when
      output_file is omitted (or falsy).
      """
      sink = output_file or sys.stderr
      header = 'Exception %s:\n' % message
      for kind, value, tb in exceptions:
          sink.write(header)
          traceback.print_exception(kind, value, tb, file=sink)
          sink.write('\n')
  def os_fork_wrapper():
      """Call the original os.fork() with the atfork hooks run around it.

      Returns whatever the original fork returned.  If the fork raises, the
      parent-side cleanup still runs and the exception then propagates.
      """
      prepare_to_fork_acquire()
      pid = None
      try:
          pid = _orig_os_fork()
      finally:
          # pid is still None when the fork itself failed.  That case takes
          # the parent path too, so the lock is released and the program is
          # left in a sane state.
          if pid != 0:
              parent_after_fork_release()
          else:
              child_after_fork_release()
      return pid
  def os_forkpty_wrapper():
      """Call the original os.forkpty() with the atfork hooks run around it.

      Returns the (pid, fd) pair produced by the original call.  If the call
      raises, the parent-side cleanup still runs and the exception then
      propagates.
      """
      prepare_to_fork_acquire()
      pid = None
      try:
          pid, fd = _orig_os_forkpty()
      finally:
          # A failed forkpty leaves pid as None and is cleaned up as parent.
          if pid != 0:
              parent_after_fork_release()
          else:
              child_after_fork_release()
      return pid, fd
  class Error(Exception):
      """Raised by fix_logging_module() when the logging fix cannot be applied."""
  def fix_logging_module():
      """Register the logging module's locks with atfork.

      After this call, every lock created through logging.Handler.createLock
      is registered so that it is acquired before a fork and released in both
      the parent and the child.  The module-level logging lock is registered
      the same way.  The module is then marked with fixed_for_atfork = True
      and later calls return immediately.

      Raises:
          Error: if the root logger already has handlers attached.
      """
      loaded = sys.modules.get('logging')
      # Applying the fix twice would register the locks twice and deadlock.
      if loaded and getattr(loaded, 'fixed_for_atfork', None):
          return

      import logging
      if logging.getLogger().handlers:
          # Locks of already existing handlers could be registered one by
          # one, but if these exist, other loggers or not-yet-added handlers
          # may exist as well.  It is safer to insist that the fix is applied
          # before logging has been configured.
          raise Error('logging handlers already registered.')

      def fork_safe_createLock(self):
          # Build the lock the normal way, then hook it into the fork cycle.
          self._orig_createLock()
          atfork(self.lock.acquire, self.lock.release, self.lock.release)

      logging._acquireLock()
      try:
          # The per-handler lock (a major source of deadlocks).
          logging.Handler._orig_createLock = logging.Handler.createLock
          logging.Handler.createLock = fork_safe_createLock
          # The module-level lock.
          atfork(logging._acquireLock,
                 logging._releaseLock, logging._releaseLock)
          logging.fixed_for_atfork = True
      finally:
          logging._releaseLock()
  158. # monkeypatch_os_fork_functions()
  159. # fix_logging_module()
  160. except Exception as e:
  161. traceback.print_exc()
  162. pass
  163. # multiprocessing.freeze_support()
  class CustomStreamHandler(logging.StreamHandler):
      """StreamHandler that pauses before writing each record.

      emit() runs with the handler lock held when reached through
      Handler.handle(), so the pause keeps that lock held for longer.

      Args:
          stream: Stream to write to, passed through to StreamHandler.
          delay: Seconds to sleep before each record is written.  Defaults
              to 1, the value that used to be hard-coded.
      """

      def __init__(self, stream=None, delay=1):
          super(CustomStreamHandler, self).__init__(stream)
          self.delay = delay

      def emit(self, record):
          """Sleep for self.delay seconds, then emit record as StreamHandler does."""
          sleep(self.delay)
          super(CustomStreamHandler, self).emit(record)
  # Root logger: let every level through and send records to stdout via
  # the slow handler defined above.
  root = logging.getLogger()
  root.addHandler(CustomStreamHandler(sys.stdout))
  root.setLevel(logging.DEBUG)
  def f():
      """Re-create the lock of every root-logger handler, then log 1 twice.

      Used as the target of the child process in the __main__ section.
      """
      for handler in logging.getLogger().handlers:
          handler.createLock()
      for _ in range(2):
          logging.info(1)
  def g():
      """Log 2 twice at INFO level; target of the thread in the __main__ section."""
      for _ in range(2):
          logging.info(2)
  if __name__ == '__main__':
      import threading

      # One logging thread and one logging child process, plus a record
      # logged from the main thread itself.
      t = threading.Thread(target=g)
      p = multiprocessing.Process(target=f)

      # The thread is started before the process.
      # NOTE(review): presumably so the thread is already logging (and
      # holding the handler lock during the handler's sleep) when the child
      # is created -- confirm the intent.
      t.start()
      p.start()
      logging.info("===")