Forráskód Böngészése

消息队列分级并设置动态消费者

luojiehua 2 napja
szülő
commit
d0450bd4ea
1 módosított fájl, 31 hozzáadás és 0 törlés
  1. 31 0
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 31 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -48,7 +48,38 @@ class ActiveMQListener():
         self.conn.disconnect()
 
 
+class DynamicProcess(Process):
 
+    def __init__(self,listener_cls,comsumer_handler,comsumer_count,queue_name):
+        self.listener_cls = listener_cls
+        self.comsumer_handler = comsumer_handler
+        self.comsumer_count = comsumer_count
+        self.queue_name = queue_name
+        super().__init__()
+
+    def run(self):
+
+        self.list_comsumer = []
+        for _i in range(self.comsumer_count):
+            listener_attachment = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler ,_i)
+            createComsumer(listener_attachment,self.queue_name)
+            self.list_comsumer.append(listener_attachment)
+
+        while 1:
+            for i in range(len(self.list_comsumer)):
+                if self.list_comsumer[i].conn.is_connected():
+                    continue
+                else:
+                    listener = self.listener_cls(getConnect_activateMQ(),self.comsumer_handler,i)
+                    createComsumer(listener,self.queue_name)
+                    self.list_comsumer[i] = listener
+            time.sleep(5)
+
+    def terminate(self) -> None:
+        for i in range(len(self.list_comsumer)):
+            _c = self.list_comsumer[i]
+            _c.conn.disconnect()
+        super().terminate()
 
 
 def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count):