# NOTE: a stray line-number gutter introduced by copy/paste was removed here.
import logging
import multiprocessing
import os
import sys
import threading
import traceback
import warnings
from time import sleep
try:
    # Fork-safety shims: a work-a-like of the classic "atfork" package,
    # inlined here so os.fork()/os.forkpty() can run registered callbacks
    # around the actual fork (the usual fix for logging-lock deadlocks
    # when mixing threads and fork-based multiprocessing).

    def monkeypatch_os_fork_functions():
        """
        Replace os.fork* with wrappers that use ForkSafeLock to acquire
        all locks before forking and release them afterwards.
        """
        # Only patch if the attribute is still the raw C builtin, i.e.
        # it has not already been wrapped by a Python function.
        builtin_function = type(''.join)
        if hasattr(os, 'fork') and isinstance(os.fork, builtin_function):
            global _orig_os_fork
            _orig_os_fork = os.fork
            os.fork = os_fork_wrapper
        if hasattr(os, 'forkpty') and isinstance(os.forkpty, builtin_function):
            global _orig_os_forkpty
            _orig_os_forkpty = os.forkpty
            os.forkpty = os_forkpty_wrapper

    # This lock protects all of the lists below.  It is also held across
    # the fork itself: acquired in prepare_to_fork_acquire() and released
    # by parent_after_fork_release() / child_after_fork_release().
    _fork_lock = threading.Lock()
    _prepare_call_list = []
    _prepare_call_exceptions = []
    _parent_call_list = []
    _child_call_list = []

    def atfork(prepare=None, parent=None, child=None):
        """A Python work-a-like of pthread_atfork.

        Any time a fork() is called from Python, all 'prepare' callables
        will be called in the order they were registered using this
        function.

        After the fork (successful or not), all 'parent' callables will be
        called in the parent process.  If the fork succeeded, all 'child'
        callables will be called in the child process.

        No exceptions may be raised from any of the registered callables.
        If so they will be printed to sys.stderr after the fork call once
        it is safe to do so.
        """
        # Asserts mirror the original atfork API; note they are stripped
        # under "python -O".
        assert not prepare or callable(prepare)
        assert not parent or callable(parent)
        assert not child or callable(child)
        with _fork_lock:
            if prepare:
                _prepare_call_list.append(prepare)
            if parent:
                _parent_call_list.append(parent)
            if child:
                _child_call_list.append(child)

    def _call_atfork_list(call_list):
        """
        Given a list of callables in call_list, call them all in order and
        save and return a list of sys.exc_info() tuples for each exception
        raised.
        """
        exception_list = []
        for func in call_list:
            try:
                func()
            # Deliberately catch *everything*: a callback must never be
            # able to abort the fork sequence; failures are reported later.
            except BaseException:
                exception_list.append(sys.exc_info())
        return exception_list

    def prepare_to_fork_acquire():
        """Acquire our lock and call all prepare callables."""
        # Intentionally no release here -- the lock stays held across the
        # fork and is released by the *_after_fork_release() functions.
        _fork_lock.acquire()
        _prepare_call_exceptions.extend(_call_atfork_list(_prepare_call_list))

    def parent_after_fork_release():
        """
        Call all parent after fork callables, release the lock and print
        all prepare and parent callback exceptions.
        """
        prepare_exceptions = list(_prepare_call_exceptions)
        del _prepare_call_exceptions[:]
        exceptions = _call_atfork_list(_parent_call_list)
        _fork_lock.release()
        # Report only after the lock is released, when it is safe to run
        # arbitrary code (e.g. logging) again.
        _print_exception_list(prepare_exceptions, 'before fork')
        _print_exception_list(exceptions, 'after fork from parent')

    def child_after_fork_release():
        """
        Call all child after fork callables, release the lock and print
        all child callback exceptions.
        """
        # Prepare-phase exceptions are reported by the parent only.
        del _prepare_call_exceptions[:]
        exceptions = _call_atfork_list(_child_call_list)
        _fork_lock.release()
        _print_exception_list(exceptions, 'after fork from child')

    def _print_exception_list(exceptions, message, output_file=None):
        """
        Given a list of sys.exc_info tuples, print them all using the
        traceback module preceded by a message and separated by a blank
        line.
        """
        output_file = output_file or sys.stderr
        message = 'Exception %s:\n' % message
        for exc_type, exc_value, exc_traceback in exceptions:
            output_file.write(message)
            traceback.print_exception(exc_type, exc_value, exc_traceback,
                                      file=output_file)
            output_file.write('\n')

    def os_fork_wrapper():
        """Wraps os.fork() to run atfork handlers."""
        pid = None
        prepare_to_fork_acquire()
        try:
            pid = _orig_os_fork()
        finally:
            if pid == 0:
                child_after_fork_release()
            else:
                # We call this regardless of fork success in order for
                # the program to be in a sane state afterwards.
                parent_after_fork_release()
        return pid

    def os_forkpty_wrapper():
        """Wraps os.forkpty() to run atfork handlers."""
        pid = None
        prepare_to_fork_acquire()
        try:
            pid, fd = _orig_os_forkpty()
        finally:
            if pid == 0:
                child_after_fork_release()
            else:
                parent_after_fork_release()
        # If _orig_os_forkpty() raised, that exception propagates out of
        # the finally block above and this return is never reached.
        return pid, fd

    class Error(Exception):
        """Base error for the atfork logging fix-up machinery."""

    def fix_logging_module():
        """Make the logging module's locks fork-safe via atfork().

        Must be applied before any logging handlers are configured;
        raises Error otherwise.
        """
        logging = sys.modules.get('logging')
        # Prevent fixing multiple times as that would cause a deadlock.
        if logging and getattr(logging, 'fixed_for_atfork', None):
            return
        # if logging:
        #     warnings.warn('logging module already imported before fixup.')
        import logging
        if logging.getLogger().handlers:
            # We could register each lock with atfork for these handlers
            # but if these exist, other loggers or not yet added handlers
            # could as well.  It's safer to insist that this fix is
            # applied before logging has been configured.
            raise Error('logging handlers already registered.')
        # NOTE(review): logging._acquireLock/_releaseLock are private and
        # were removed in Python 3.13 -- confirm the target version.
        logging._acquireLock()
        try:
            def fork_safe_createLock(self):
                self._orig_createLock()
                atfork(self.lock.acquire,
                       self.lock.release, self.lock.release)

            # Fix the logging.Handler lock (a major source of deadlocks).
            logging.Handler._orig_createLock = logging.Handler.createLock
            logging.Handler.createLock = fork_safe_createLock
            # Fix the module level lock.
            atfork(logging._acquireLock,
                   logging._releaseLock, logging._releaseLock)
            logging.fixed_for_atfork = True
        finally:
            logging._releaseLock()

    # Activation is left disabled for this demo; uncomment to apply.
    # monkeypatch_os_fork_functions()
    # fix_logging_module()
except Exception:
    # Best-effort setup: report the failure but never let shim
    # installation break program startup.
    traceback.print_exc()
- # multiprocessing.freeze_support()
class CustomStreamHandler(logging.StreamHandler):
    """A StreamHandler that pauses before emitting each record.

    The one-second delay widens the window during which this handler's
    lock is held, making the fork/threading logging deadlock that this
    script demonstrates easy to reproduce.
    """

    def emit(self, record):
        # Deliberate delay -- see the class docstring.
        sleep(1)
        super(CustomStreamHandler, self).emit(record)
# Route every log call through the deliberately slow handler so the
# deadlock window is easy to hit.
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = CustomStreamHandler(sys.stdout)
root.addHandler(handler)
def f():
    """Child-process target: refresh handler locks, then log twice.

    Calling createLock() on every root handler replaces each handler's
    lock with a fresh one -- presumably to discard locks inherited over
    fork that may still be held by parent threads (the workaround this
    demo exercises; confirm against the deadlock being reproduced).
    """
    for handler in logging.getLogger().handlers:
        handler.createLock()
    logging.info(1)
    logging.info(1)
def g():
    """Sibling-thread target: log twice from the parent process."""
    for _ in range(2):
        logging.info(2)
if __name__ == '__main__':
    # Deliberately no join(): the demo relies on the child process (f),
    # the sibling thread (g) and this main-thread log call racing each
    # other through the slow handler.
    # (Removed a redundant nested "import threading"; the module is
    # already imported at the top of the file.)
    p = multiprocessing.Process(target=f)
    t = threading.Thread(target=g)
    t.start()
    p.start()
    logging.info("===")