Selaa lähdekoodia

消息队列分级并设置动态消费者

luojiehua 2 päivää sitten
vanhempi
commit
ceecdcf7bd
1 muutettua tiedostoa jossa 5 lisäystä ja 34 poistoa
  1. 5 34
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 5 - 34
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -189,7 +189,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_his))
             listener_p.start()
 
-        listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],6,self.AttachmentMQListener,self.start_attachment_listener,5))
+        listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],6,self.AttachmentMQListener,lambda comsumer_count,queue_name:self.start_attachment_listener(comsumer_count,queue_name),5))
         listener_p.start()
 
         # listener_p = Process(target=self.start_attachment_listener)
@@ -881,38 +881,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
-    class DynamicProcess(Process):
 
-        def __init__(self,listener_cls,comsumer_handler,comsumer_count,queue_name):
-            self.listener_cls = listener_cls
-            self.comsumer_handler = comsumer_handler
-            self.comsumer_count = comsumer_count
-            self.queue_name = queue_name
-            super().__init__()
-
-        def run(self):
-
-            self.list_comsumer = []
-            for _i in range(self.comsumer_count):
-                listener_attachment = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler ,_i)
-                createComsumer(listener_attachment,self.queue_name)
-                self.list_comsumer.append(listener_attachment)
-
-            while 1:
-                for i in range(len(self.list_comsumer)):
-                    if self.list_comsumer[i].conn.is_connected():
-                        continue
-                    else:
-                        listener = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler,i)
-                        createComsumer(listener,self.queue_name)
-                        self.list_comsumer[i] = listener
-                time.sleep(5)
-
-        def terminate(self) -> None:
-            for i in range(len(self.list_comsumer)):
-                _c = self.list_comsumer[i]
-                _c.conn.disconnect()
-            super().terminate()
 
     class ExtractListener():
 
@@ -938,6 +907,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         def __del__(self):
             self.conn.disconnect()
 
+
+
     def __init__(self,create_listener=True):
         Dataflow_extract.__init__(self)
 
@@ -998,8 +969,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             for ii in range(4):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_his))
                 listener_p.start()
-            # listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,self.start_extract_listener,5))
-            # listener_p.start()
+            listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,lambda comsumer_count,queue_name:self.start_extract_listener(comsumer_count,queue_name),5))
+            listener_p.start()
             listener_p_ai = Thread(target=self.start_extract_AI_listener)
             listener_p_ai.start()