123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
import ctypes
import inspect
import multiprocessing
import os
import queue
import sys
import threading
import time
import traceback
from multiprocessing import Process, Queue
- # try:
- # def monkeypatch_os_fork_functions():
- # """
- # Replace os.fork* with wrappers that use ForkSafeLock to acquire
- # all locks before forking and release them afterwards.
- # """
- # builtin_function = type(''.join)
- # if hasattr(os, 'fork') and isinstance(os.fork, builtin_function):
- # global _orig_os_fork
- # _orig_os_fork = os.fork
- # os.fork = os_fork_wrapper
- # if hasattr(os, 'forkpty') and isinstance(os.forkpty, builtin_function):
- # global _orig_os_forkpty
- # _orig_os_forkpty = os.forkpty
- # os.forkpty = os_forkpty_wrapper
- #
- #
- # # This lock protects all of the lists below.
- # _fork_lock = threading.Lock()
- # _prepare_call_list = []
- # _prepare_call_exceptions = []
- # _parent_call_list = []
- # _child_call_list = []
- #
- #
- # def atfork(prepare=None, parent=None, child=None):
- # """A Python work-a-like of pthread_atfork.
- #
- # Any time a fork() is called from Python, all 'prepare' callables will
- # be called in the order they were registered using this function.
- # After the fork (successful or not), all 'parent' callables will be called in
- # the parent process. If the fork succeeded, all 'child' callables will be
- # called in the child process.
- # No exceptions may be raised from any of the registered callables. If so
- # they will be printed to sys.stderr after the fork call once it is safe
- # to do so.
- # """
- # assert not prepare or callable(prepare)
- # assert not parent or callable(parent)
- # assert not child or callable(child)
- # _fork_lock.acquire()
- # try:
- # if prepare:
- # _prepare_call_list.append(prepare)
- # if parent:
- # _parent_call_list.append(parent)
- # if child:
- # _child_call_list.append(child)
- # finally:
- # _fork_lock.release()
- #
- #
- # def _call_atfork_list(call_list):
- # """
- # Given a list of callables in call_list, call them all in order and save
- # and return a list of sys.exc_info() tuples for each exception raised.
- # """
- # exception_list = []
- # for func in call_list:
- # try:
- # func()
- # except:
- # exception_list.append(sys.exc_info())
- # return exception_list
- #
- #
- # def prepare_to_fork_acquire():
- # """Acquire our lock and call all prepare callables."""
- # _fork_lock.acquire()
- # _prepare_call_exceptions.extend(_call_atfork_list(_prepare_call_list))
- #
- #
- # def parent_after_fork_release():
- # """
- # Call all parent after fork callables, release the lock and print
- # all prepare and parent callback exceptions.
- # """
- # prepare_exceptions = list(_prepare_call_exceptions)
- # del _prepare_call_exceptions[:]
- # exceptions = _call_atfork_list(_parent_call_list)
- # _fork_lock.release()
- # _print_exception_list(prepare_exceptions, 'before fork')
- # _print_exception_list(exceptions, 'after fork from parent')
- #
- #
- # def child_after_fork_release():
- # """
- # Call all child after fork callables, release lock and print all
- # child callback exceptions.
- # """
- # del _prepare_call_exceptions[:]
- # exceptions = _call_atfork_list(_child_call_list)
- # _fork_lock.release()
- # _print_exception_list(exceptions, 'after fork from child')
- #
- #
- # def _print_exception_list(exceptions, message, output_file=None):
- # """
- # Given a list of sys.exc_info tuples, print them all using the traceback
- # module preceded by a message and separated by a blank line.
- # """
- # output_file = output_file or sys.stderr
- # message = 'Exception %s:\n' % message
- # for exc_type, exc_value, exc_traceback in exceptions:
- # output_file.write(message)
- # traceback.print_exception(exc_type, exc_value, exc_traceback,
- # file=output_file)
- # output_file.write('\n')
- #
- #
- # def os_fork_wrapper():
- # """Wraps os.fork() to run atfork handlers."""
- # pid = None
- # prepare_to_fork_acquire()
- # try:
- # pid = _orig_os_fork()
- # finally:
- # if pid == 0:
- # child_after_fork_release()
- # else:
- # # We call this regardless of fork success in order for
- # # the program to be in a sane state afterwards.
- # parent_after_fork_release()
- # return pid
- #
- #
- # def os_forkpty_wrapper():
- # """Wraps os.forkpty() to run atfork handlers."""
- # pid = None
- # prepare_to_fork_acquire()
- # try:
- # pid, fd = _orig_os_forkpty()
- # finally:
- # if pid == 0:
- # child_after_fork_release()
- # else:
- # parent_after_fork_release()
- # return pid, fd
- # class Error(Exception):
- # pass
- # def fix_logging_module():
- # logging = sys.modules.get('logging')
- # # Prevent fixing multiple times as that would cause a deadlock.
- # if logging and getattr(logging, 'fixed_for_atfork', None):
- # return
- # # if logging:
- # # warnings.warn('logging module already imported before fixup.')
- # import logging
- # if logging.getLogger().handlers:
- # # We could register each lock with atfork for these handlers but if
- # # these exist, other loggers or not yet added handlers could as well.
- # It's safer to insist that this fix is applied before logging has been
- # # configured.
- # raise Error('logging handlers already registered.')
- #
- # logging._acquireLock()
- # try:
- # def fork_safe_createLock(self):
- # self._orig_createLock()
- # atfork(self.lock.acquire,
- # self.lock.release, self.lock.release)
- #
- # # Fix the logging.Handler lock (a major source of deadlocks).
- # logging.Handler._orig_createLock = logging.Handler.createLock
- # logging.Handler.createLock = fork_safe_createLock
- #
- # # Fix the module level lock.
- # atfork(logging._acquireLock,
- # logging._releaseLock, logging._releaseLock)
- #
- # logging.fixed_for_atfork = True
- # finally:
- # logging._releaseLock()
- # monkeypatch_os_fork_functions()
- # fix_logging_module()
- # except Exception as e:
- # traceback.print_exc()
- # pass
import logging

# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and level name in front of every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` inside the thread identified by ``tid``.

    Args:
        tid: Thread identifier (for example ``threading.Thread.ident``).
        exctype: Exception class to raise. If an instance is given, its class
            is used instead.

    Raises:
        ValueError: No thread matched ``tid``.
        SystemError: The call reported more than one modified thread state;
            the pending exception is cleared again before raising.
    """
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    # PyThreadState_SetAsyncExc takes the thread id as an ``unsigned long``.
    thread_id = ctypes.c_ulong(tid)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    if res != 1:
        # Undo the request: passing None clears the pending exception.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Try to stop ``thread`` by injecting ``SystemExit`` into it.

    This is best-effort: any exception raised while doing so is printed with
    its traceback and swallowed, so the function always returns ``None``.

    Args:
        thread: Object exposing an ``ident`` attribute (a thread id).
    """
    try:
        _async_raise(thread.ident, SystemExit)
    except Exception:
        traceback.print_exc()
class _taskHandler(Process):
    """Worker process that feeds items from a queue to a handler callable.

    The handler is called as
    ``task_handler(item, result_queue, *args, **kwargs)``.
    """

    def __init__(self, task_queue, task_handler, result_queue, *args, **kwargs):
        """Store the queues, the handler and its extra arguments.

        Args:
            task_queue: Queue the worker reads items from.
            task_handler: Callable invoked for every item.
            result_queue: Passed through to ``task_handler`` unchanged.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        super().__init__()
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Process items until the process is terminated from outside.

        The loop has no exit condition of its own: when the queue stays empty
        for one second the worker sleeps one more second and polls again.
        Any ``Exception`` is logged, printed, and the loop continues.
        """
        while True:
            try:
                item = self.task_queue.get(True, timeout=1)
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except queue.Empty:
                # Nothing to do right now; back off before polling again.
                time.sleep(1)
            except Exception as e:
                logging.info("error: %s" % (e))
                traceback.print_exc()
class MultiProcessHandler(object):
    """Run ``process_count`` ``_taskHandler`` worker processes over one queue."""

    def __init__(self, task_queue, task_handler, result_queue, process_count=1, *args, **kwargs):
        """Store the configuration; no process is created until ``run``.

        Args:
            task_queue: Queue the workers read items from.
            task_handler: Callable invoked by the workers for every item.
            result_queue: Passed through to ``task_handler``.
            process_count: Number of worker processes to start.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.list_thread = []
        self.process_count = process_count
        self.args = args
        self.kwargs = kwargs
        # When True, getThreadStatus replaces dead workers with new ones.
        self.restart = False

    def getThreadStatus(self):
        """Count live workers and, if ``self.restart`` is set, replace dead ones.

        Returns:
            Tuple ``(alive, restarted, total)``: workers that were alive when
            inspected, workers replaced during this call, and the size of the
            worker list.
        """
        alive_count = 0
        restart_count = 0
        for index, worker in enumerate(self.list_thread):
            if worker.is_alive():
                alive_count += 1
            elif self.restart:
                replacement = _taskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwargs)
                replacement.start()
                # Keep the replacement in the list so it is tracked and can be
                # stopped later; otherwise the dead entry is restarted forever.
                self.list_thread[index] = replacement
                restart_count += 1
        logging.info("thread status alive:%d restart:%d total:%d" % (alive_count, restart_count, len(self.list_thread)))
        return alive_count, restart_count, len(self.list_thread)

    def run(self):
        """Start the workers and supervise them until all have exited.

        Polls once per second. If no worker is alive and the queue is empty the
        loop ends; if items remain, restarting is enabled and dead workers are
        replaced. ``KeyboardInterrupt`` terminates all workers.
        """
        for _ in range(self.process_count):
            self.list_thread.append(
                _taskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwargs))
        for worker in self.list_thread:
            worker.start()
        while not self._check_all_done():
            try:
                time.sleep(1)
                alive_count, _restarted, _total = self.getThreadStatus()
                if alive_count == 0:
                    if self.task_queue.qsize() == 0:
                        break
                    self.restart = True
                    self.getThreadStatus()
                # A manual stop (reading "quit" from stdin) used to live here
                # and is disabled.
            except KeyboardInterrupt:
                print("interrupted by keyboard")
                self.stop_all()
                break
        print("the whole task is done")

    def _check_all_done(self):
        """Return True when no worker in ``list_thread`` is alive."""
        return not any(worker.is_alive() for worker in self.list_thread)

    def stop_all(self):
        """Terminate every worker that is still alive."""
        for worker in self.list_thread:
            if worker.is_alive():
                worker.terminate()
class ThreadHandler(threading.Thread):
    """Worker thread that feeds queued items to a handler until the queue is empty.

    The handler is called as
    ``task_handler(item, result_queue, *args, **kwargs)``.
    """

    # Seconds to wait for an item before treating the queue as empty.
    poll_timeout = 1

    def __init__(self, task_queue, task_handler, result_queue, *args, **kwargs):
        """Store the queues, the handler and its extra arguments.

        Args:
            task_queue: Queue the thread reads items from.
            task_handler: Callable invoked for every item.
            result_queue: Passed through to ``task_handler`` unchanged.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs
        # Id of the process that created this thread; used in log messages.
        self.pid = os.getpid()

    def run(self):
        """Process items until a get times out and the queue reports size 0.

        Any ``Exception`` other than ``queue.Empty`` is logged, printed, and
        the loop continues with the next item.
        """
        while True:
            try:
                logging.info("thread of process:%d name: %s task queue size is %d" % (self.pid, self.name, self.task_queue.qsize()))
                # Block for the timeout: with block=False the timeout is
                # ignored and a non-empty queue whose get fails makes the
                # loop spin.
                item = self.task_queue.get(True, timeout=self.poll_timeout)
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except queue.Empty:
                if self.task_queue.qsize() == 0:
                    logging.info("%s thread of process %s is done" % (self.name, self.pid))
                    break
                else:
                    logging.info("%s of process %s get item failed" % (self.name, self.pid))
            except Exception as e:
                logging.info("error: %s" % (e))
                traceback.print_exc()
class ProcessHandler(Process):
    """Worker process that handles queued items itself or via ``ThreadHandler`` threads.

    With ``thread_count > 0`` the process moves items from ``task_queue`` into
    an internal ``thread_queue`` that its threads consume. With
    ``thread_count <= 0`` the process calls the handler directly.
    """

    # Seconds to wait for an item in the direct (thread-less) consume loop.
    poll_timeout = 1

    def __init__(self, thread_count, task_queue, task_handler, result_queue, need_stop=True, *args, **kwargs):
        """Store the configuration and create the internal thread queue.

        Args:
            thread_count: Number of ``ThreadHandler`` threads; ``0`` or less
                means the process consumes ``task_queue`` itself.
            task_queue: Queue the process reads items from.
            task_handler: Callable invoked as
                ``task_handler(item, result_queue, *args, **kwargs)``.
            result_queue: Passed through to ``task_handler``.
            need_stop: In direct mode, leave the loop once the queue is empty.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        Process.__init__(self)
        self.thread_count = thread_count
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.need_stop = need_stop
        self.args = args
        self.kwargs = kwargs
        # Bounded hand-off queue between this process and its threads.
        self.thread_queue = Queue(100)
        self.has_run = False
        self.list_thread = []

    def run(self):
        """Process entry point: run in threaded or direct mode, then ``stop_all``."""
        # Replace the logging module lock and every root handler lock.
        # NOTE(review): presumably so a forked child does not inherit locks
        # held in the parent -- confirm.
        logging._lock = threading.RLock()
        for root_handler in logging.getLogger().handlers:
            root_handler.createLock()
        if self.thread_count > 0:
            try:
                self.init_thread()
                self.start_thread()
                self.setQueue()
            except Exception:
                traceback.print_exc()
        else:
            # os.getpid() is used instead of self.pid: self.pid is None when
            # this object is not the current process, and "%d" % None raises.
            pid = os.getpid()
            while True:
                try:
                    logging.debug("main of process:%d name: %s task queue size is %d" % (pid, self.name, self.task_queue.qsize()))
                    # Blocking get: with block=False the timeout is ignored
                    # and need_stop=False turns an empty queue into a busy loop.
                    item = self.task_queue.get(True, timeout=self.poll_timeout)
                    self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                except queue.Empty:
                    if self.task_queue.qsize() == 0:
                        logging.debug("%s main of process %s is done" % (self.name, pid))
                        if self.need_stop:
                            break
                    else:
                        logging.debug("%s of process %s get item failed" % (self.name, pid))
                except Exception as e:
                    logging.info("error: %s" % (e))
                    traceback.print_exc()
        logging.debug("process %d is done" % (os.getpid()))
        self.stop_all()

    def init_thread(self):
        """Create ``thread_count`` fresh, unstarted worker threads."""
        logging.debug("process %d init thread" % (os.getpid()))
        self.list_thread = [
            ThreadHandler(self.thread_queue, self.task_handler, self.result_queue, *self.args, **self.kwargs)
            for _ in range(self.thread_count)
        ]

    def start_thread(self):
        """Start every thread in ``list_thread``."""
        logging.debug("process %d start thread" % (os.getpid()))
        for worker in self.list_thread:
            worker.start()

    def setQueue(self):
        """Move items from ``task_queue`` to ``thread_queue`` until the work is drained.

        Threads are checked (and dead ones restarted) after every tenth item
        and whenever a get or put fails. The loop ends when a get fails,
        ``task_queue`` reports size 0 and ``checkThread`` returns True.
        """
        logging.debug("process %d start setQueue" % (os.getpid()))
        fetched = 0
        while True:
            logging.debug("setQueue from task_queue")
            item = None
            try:
                item = self.task_queue.get(True, timeout=0.1)
                fetched = (fetched + 1) % 10
                if fetched == 0:
                    self.checkThread()
            except Exception as error:
                logging.debug("process %d setQueue failed with task_queue size:%d and is_all_done:%s error:%s" % (os.getpid(), self.task_queue.qsize(), str(self.is_all_done()), str(error)))
                drained = self.checkThread()
                if self.task_queue.qsize() == 0 and drained:
                    break
                time.sleep(1)
            # Retry the hand-off until the item fits into thread_queue.
            while item is not None:
                logging.debug("setQueue to thread_queue")
                try:
                    self.thread_queue.put(item, True, timeout=1)
                    break
                except Exception:
                    self.checkThread()
                    logging.debug("process %d setQueue failed with thread_queue size:%d and is_all_done:%s" % (os.getpid(), self.thread_queue.qsize(), str(self.is_all_done())))
                    time.sleep(2)

    def is_all_done(self):
        """Return True when no thread in ``list_thread`` is alive."""
        return not any(worker.is_alive() for worker in self.list_thread)

    def checkThread(self):
        """Restart dead threads and report whether the thread queue looks drained.

        Returns:
            True when at least one thread had to be restarted and
            ``thread_queue`` reports size 0, otherwise False.
        """
        alive_count = 0
        restart_count = 0
        list_name = []
        for index, worker in enumerate(self.list_thread):
            if worker.is_alive():
                alive_count += 1
                list_name.append(worker.name)
            else:
                replacement = ThreadHandler(self.thread_queue, self.task_handler, self.result_queue, *self.args, **self.kwargs)
                self.list_thread[index] = replacement
                replacement.start()
                list_name.append(replacement.name)
                restart_count += 1
        logging.debug("process id:%d %s queue_count:%d thread_count:%d alive_count:%d restart_count:%d" % (os.getpid(), str(list_name), self.thread_queue.qsize(), len(self.list_thread), alive_count, restart_count))
        return self.thread_queue.qsize() == 0 and restart_count > 0

    def getStatus(self):
        """Call ``checkThread`` every five seconds until it returns True."""
        while True:
            time.sleep(5)
            if self.checkThread():
                break

    def stop_all(self):
        """Inspect the threads without stopping them.

        The calls that would stop a thread are disabled in the original code,
        so this only queries ``is_alive`` and prints any exception raised.
        """
        try:
            for worker in self.list_thread:
                worker.is_alive()
        except Exception:
            traceback.print_exc()
class MultiHandler(object):
    """Run and supervise ``process_count`` ``ProcessHandler`` processes over one queue."""

    def __init__(self, task_queue, task_handler, result_queue, thread_count=1, process_count=1, need_stop=True, *args, **kwargs):
        """Store the configuration; no process is created until ``run``.

        Args:
            task_queue: Queue the processes read items from.
            task_handler: Callable invoked for every item.
            result_queue: Passed through to ``task_handler``.
            thread_count: Threads per process, forwarded to ``ProcessHandler``.
            process_count: Number of processes to start.
            need_stop: Forwarded to ``ProcessHandler`` as ``need_stop``.
            *args: Extra positional arguments for ``task_handler``.
            **kwargs: Extra keyword arguments for ``task_handler``.
        """
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.thread_count = thread_count
        self.process_count = process_count
        self.need_stop = need_stop
        self.args = args
        self.kwargs = kwargs
        self.list_process = []
        self.logger = logging.getLogger()

    def _new_process(self):
        """Build one unstarted ``ProcessHandler`` from the stored configuration."""
        # need_stop is passed explicitly so the first extra positional
        # argument is never mistaken for it.
        return ProcessHandler(self.thread_count, self.task_queue, self.task_handler, self.result_queue,
                              self.need_stop, *self.args, **self.kwargs)

    def getThreadStatus(self):
        """Count live processes and replace every dead one with a started new one.

        Returns:
            Tuple ``(alive, restarted, total)``: processes that were alive when
            inspected, processes replaced during this call, and the size of
            the process list.
        """
        alive_count = 0
        restart_count = 0
        for index, proc in enumerate(self.list_process):
            if proc.is_alive():
                alive_count += 1
            else:
                proc.terminate()
                self.list_process[index] = self._new_process()
                self.list_process[index].start()
                restart_count += 1
        logging.debug("process status alive:%d restart:%d total:%d" % (alive_count, restart_count, len(self.list_process)))
        return alive_count, restart_count, len(self.list_process)

    def run(self):
        """Start the processes and supervise them, then terminate what is left.

        Polls every five seconds. The loop ends when all processes are dead,
        when a poll had to restart a process while the task queue reports
        size 0, or on ``KeyboardInterrupt``.
        """
        for _ in range(self.process_count):
            self.list_process.append(self._new_process())
        for proc in self.list_process:
            proc.start()
        while not self._check_all_done():
            try:
                _alive, restarted, _total = self.getThreadStatus()
                time.sleep(5)
                if restarted > 0 and self.task_queue.qsize() == 0:
                    break
                # A manual stop (reading "quit" from stdin) used to live here
                # and is disabled.
            except KeyboardInterrupt:
                logging.info("interrupted by keyboard")
                self.stop_all()
                break
        self.stop_all()
        logging.debug("the whole task is done")

    def _check_all_done(self):
        """Return True when no process in ``list_process`` is alive."""
        return not any(proc.is_alive() for proc in self.list_process)

    def stop_all(self):
        """Terminate every live process; exceptions are printed, not raised."""
        try:
            for proc in self.list_process:
                if proc.is_alive():
                    proc.terminate()
        except Exception:
            traceback.print_exc()
def test_handler(item, result_queue):
    """Demo task handler: print ``item``; ``result_queue`` is not used."""
    print(item)
- from apscheduler.schedulers.blocking import BlockingScheduler
class Test():
    """Demo wiring a scheduled producer and a ``MultiHandler`` consumer together."""

    def __init__(self):
        """Create a task queue bounded to 500 items, a result queue and a round counter."""
        self.task_queue = Queue(500)
        self.result_queue = Queue()
        # Number of producer rounds that still enqueue items.
        self.count = 5

    def producer(self):
        """Wait ten seconds, then enqueue 0 and 1 while rounds remain."""
        time.sleep(10)
        if self.count > 0:
            self.count -= 1
            for i in range(2):
                self.task_queue.put(i)

    def comsumer(self):
        """Consume the task queue with two thread-less processes running ``test_handler``."""
        a = MultiHandler(task_queue=self.task_queue, task_handler=test_handler, result_queue=self.result_queue, thread_count=0, process_count=2)
        a.run()

    def start(self):
        """Schedule producer and consumer with cron ``second="*/5"`` and block in the scheduler."""
        scheduler = BlockingScheduler()
        scheduler.add_job(self.producer, "cron", second="*/5")
        scheduler.add_job(self.comsumer, "cron", second="*/5")
        scheduler.start()
if __name__ == "__main__":
    # Demo: enqueue 100 integers and print them from two worker processes.
    task_queue = Queue()
    result_queue = Queue()
    for i in range(100):
        task_queue.put(i)
    a = MultiProcessHandler(task_queue=task_queue, task_handler=test_handler, result_queue=result_queue, process_count=2)
    a.run()
    # Alternative demo driven by the scheduler:
    # test = Test()
    # test.start()
    # # a = ProcessHandler(2,None,lambda x:x,None)
    # # print(a.is_alive())
    # from multiprocessing import RLock
    # import os
    # os.fork()
|