فهرست منبع

消息队列分级并设置动态消费者

luojiehua 2 روز پیش
والد
کامیت
a054466d16
2فایلهای تغییر یافته به همراه71 افزوده شده و 36 حذف شده
  1. 0 34
      BaseDataMaintenance/maintenance/dataflow.py
  2. 71 2
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 0 - 34
BaseDataMaintenance/maintenance/dataflow.py

@@ -144,40 +144,6 @@ class Dataflow():
         self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
         self.current_path = os.path.dirname(__file__)
 
-    def dynamic_listener(self,sorted_names,process_count,target,comsumer_count):
-        from BaseDataMaintenance.java.MQInfo import getQueueSize
-
-        list_queue_dict = []
-        for _i in range(len(sorted_names)):
-            list_queue_dict.append({"name":sorted_names[_i],
-                                    "level":_i})
-        if len(list_queue_dict)==0:
-            return
-        last_name = ""
-        list_comsumer = []
-        while 1:
-            for _d in list_queue_dict:
-                _size = getQueueSize(_d.get("name"))
-                _d["queue_size"] = _size
-                if _size>1000:
-                    _d["size_level"] = 1
-                else:
-                    _d["size_level"] = 2
-
-            list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
-            queue_name = list_queue_dict[0].get("name")
-            if last_name!=queue_name:
-                for _c in list_comsumer:
-                    _c.terminate()
-                    del _c
-                list_comsumer = []
-                for _i in range(process_count):
-                    listener_p = Process(target=target,args=(comsumer_count,queue_name))
-                    listener_p.start()
-                    list_comsumer.append(listener_p)
-
-                last_name = queue_name
-            time.sleep(120)
 
     def flow_init(self):
         def producer():

+ 71 - 2
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -47,6 +47,75 @@ class ActiveMQListener():
     def __del__(self):
         self.conn.disconnect()
 
+
+class DynamicProcess(Process):
+
+    def __init__(self,listener_cls,comsumer_handler,comsumer_count,queue_name):
+        self.listener_cls = listener_cls
+        self.comsumer_handler = comsumer_handler
+        self.comsumer_count = comsumer_count
+        self.queue_name = queue_name
+
+    def run(self):
+
+        self.list_comsumer = []
+        for _i in range(self.comsumer_count):
+            listener_attachment = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler ,_i)
+            createComsumer(listener_attachment,self.queue_name)
+            self.list_comsumer.append(listener_attachment)
+
+        while 1:
+            for i in range(len(self.list_comsumer)):
+                if self.list_comsumer[i].conn.is_connected():
+                    continue
+                else:
+                    listener = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler,i)
+                    createComsumer(listener,self.queue_name)
+                    self.list_comsumer[i] = listener
+            time.sleep(5)
+
+    def terminate(self) -> None:
+        for i in range(len(self.list_comsumer)):
+            _c = self.list_comsumer
+            _c.conn.disconnect()
+        super().terminate()
+
+
+def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count):
+    from BaseDataMaintenance.java.MQInfo import getQueueSize
+
+    list_queue_dict = []
+    for _i in range(len(sorted_names)):
+        list_queue_dict.append({"name":sorted_names[_i],
+                                "level":_i})
+    if len(list_queue_dict)==0:
+        return
+    last_name = ""
+    list_comsumer = []
+    while 1:
+        for _d in list_queue_dict:
+            _size = getQueueSize(_d.get("name"))
+            _d["queue_size"] = _size
+            if _size>1000:
+                _d["size_level"] = 1
+            else:
+                _d["size_level"] = 2
+
+        list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
+        queue_name = list_queue_dict[0].get("name")
+        if last_name!=queue_name:
+            for _c in list_comsumer:
+                _c.terminate()
+                del _c
+            list_comsumer = []
+            for _i in range(process_count):
+                listener_p = DynamicProcess(listener_cls,comsumer_handler,comsumer_count,queue_name=queue_name)
+                listener_p.start()
+                list_comsumer.append(listener_p)
+
+            last_name = queue_name
+        time.sleep(120)
+
 class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
     class AttachmentMQListener():
@@ -119,7 +188,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_his))
             listener_p.start()
 
-        listener_p = Process(target=self.dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],6,self.start_attachment_listener,5))
+        listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],6,self.AttachmentMQListener,self.start_attachment_listener,5))
         listener_p.start()
 
         # listener_p = Process(target=self.start_attachment_listener)
@@ -895,7 +964,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             for ii in range(1):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_his))
                 listener_p.start()
-            listener_p = Process(target=self.dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.start_extract_listener,5))
+            listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,self.start_extract_listener,5))
             listener_p.start()
             listener_p_ai = Thread(target=self.start_extract_AI_listener)
             listener_p_ai.start()