Browse Source

消息队列分级并设置动态消费者

luojiehua 2 ngày trước cách đây
mục cha
commit
9e54b08f34
1 tập tin đã thay đổi với 37 bổ sung35 xóa
  1. 37 35
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 37 - 35
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -48,38 +48,7 @@ class ActiveMQListener():
         self.conn.disconnect()
 
 
-class DynamicProcess(Process):
 
-    def __init__(self,listener_cls,comsumer_handler,comsumer_count,queue_name):
-        self.listener_cls = listener_cls
-        self.comsumer_handler = comsumer_handler
-        self.comsumer_count = comsumer_count
-        self.queue_name = queue_name
-        super().__init__()
-
-    def run(self):
-
-        self.list_comsumer = []
-        for _i in range(self.comsumer_count):
-            listener_attachment = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler ,_i)
-            createComsumer(listener_attachment,self.queue_name)
-            self.list_comsumer.append(listener_attachment)
-
-        while 1:
-            for i in range(len(self.list_comsumer)):
-                if self.list_comsumer[i].conn.is_connected():
-                    continue
-                else:
-                    listener = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler,i)
-                    createComsumer(listener,self.queue_name)
-                    self.list_comsumer[i] = listener
-            time.sleep(5)
-
-    def terminate(self) -> None:
-        for i in range(len(self.list_comsumer)):
-            _c = self.list_comsumer[i]
-            _c.conn.disconnect()
-        super().terminate()
 
 
 def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count):
@@ -881,6 +850,39 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
+    class DynamicProcess(Process):
+
+        def __init__(self,listener_cls,comsumer_handler,comsumer_count,queue_name):
+            self.listener_cls = listener_cls
+            self.comsumer_handler = comsumer_handler
+            self.comsumer_count = comsumer_count
+            self.queue_name = queue_name
+            super().__init__()
+
+        def run(self):
+
+            self.list_comsumer = []
+            for _i in range(self.comsumer_count):
+                listener_attachment = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler ,_i)
+                createComsumer(listener_attachment,self.queue_name)
+                self.list_comsumer.append(listener_attachment)
+
+            while 1:
+                for i in range(len(self.list_comsumer)):
+                    if self.list_comsumer[i].conn.is_connected():
+                        continue
+                    else:
+                        listener = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler,i)
+                        createComsumer(listener,self.queue_name)
+                        self.list_comsumer[i] = listener
+                time.sleep(5)
+
+        def terminate(self) -> None:
+            for i in range(len(self.list_comsumer)):
+                _c = self.list_comsumer[i]
+                _c.conn.disconnect()
+            super().terminate()
+
     class ExtractListener():
 
         def __init__(self,conn,_func,_idx,*args,**kwargs):
@@ -956,17 +958,17 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         # 提取listener
         if create_listener:
-            for ii in range(2):
+            for ii in range(6):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract))
                 listener_p.start()
             for ii in range(1):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_fix))
                 listener_p.start()
-            for ii in range(1):
+            for ii in range(4):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_his))
                 listener_p.start()
-            listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,self.start_extract_listener,5))
-            listener_p.start()
+            # listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,self.start_extract_listener,5))
+            # listener_p.start()
             listener_p_ai = Thread(target=self.start_extract_AI_listener)
             listener_p_ai.start()