Browse Source

消息队列分级并设置动态消费者

luojiehua 22 hours ago
parent
commit
2ae333f0cf
1 changed files with 32 additions and 32 deletions
  1. 32 32
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 32 - 32
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -98,38 +98,38 @@ def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,co
     last_name = ""
     list_comsumer = []
     try:
-        for _d in list_queue_dict:
-            _size = getQueueSize(_d.get("name"))
-            _d["queue_size"] = _size
-            if _size>min_count:
-                _d["size_level"] = 1
-            else:
-                _d["size_level"] = 2
-
-        list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
-        queue_name = list_queue_dict[0].get("name")
-        queue_size = list_queue_dict[0].get("queue_size")
-        log("dynamic_listener last_name:%s queue_name:%s %s"%(last_name,queue_name,str(list_queue_dict)))
-
-        if last_name==queue_name:
-            last_name = list_queue_dict[1].get("name")
-
-        # 存在于一个进程
-        if last_name!=queue_name:
-            for listener_attachment,listener_name in list_comsumer:
-                listener_attachment.conn.remove_listener(listener_name)
-                listener_attachment.conn.disconnect()
-            log("dynamic_listener terminate")
-            list_comsumer.clear()
-            for i in range(comsumer_count):
-                listener_attachment = listener_cls(getConnect_activateMQ(),comsumer_handler ,_i)
-                listener_name = createComsumer(listener_attachment,queue_name)
-                list_comsumer.append((listener_attachment,listener_name))
-            last_name = queue_name
-        time.sleep(600)
-        for listener_attachment,listener_name in list_comsumer:
-            listener_attachment.conn.remove_listener(listener_name)
-            listener_attachment.conn.disconnect()
+        while 1:
+            for _d in list_queue_dict:
+                _size = getQueueSize(_d.get("name"))
+                _d["queue_size"] = _size
+                if _size>min_count:
+                    _d["size_level"] = 1
+                else:
+                    _d["size_level"] = 2
+
+            list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
+            queue_name = list_queue_dict[0].get("name")
+            queue_size = list_queue_dict[0].get("queue_size")
+            log("dynamic_listener last_name:%s queue_name:%s %s"%(last_name,queue_name,str(list_queue_dict)))
+
+            # 存在于一个进程
+
+            if last_name=="":
+                for i in range(comsumer_count):
+                    listener_attachment = listener_cls(getConnect_activateMQ(),comsumer_handler ,_i)
+                    listener_name = createComsumer(listener_attachment,queue_name)
+                    list_comsumer.append((listener_attachment,listener_name))
+                last_name = queue_name
+            elif last_name!=queue_name:
+                log("dynamic_listener terminate")
+                for listener_attachment,listener_name in list_comsumer:
+                    listener_attachment.conn.remove_listener(listener_name)
+                    listener_attachment.conn.disconnect()
+                list_comsumer.clear()
+                break
+
+            time.sleep(60)
+
 
         # # 用进程启动
         # if last_name!=queue_name: