소스 검색

消息队列分级并设置动态消费者

luojiehua 1 일 전
부모
커밋
629e21b8ca
2개의 변경된 파일51개의 추가작업 그리고 16개의 파일을 삭제
  1. 2 0
      BaseDataMaintenance/common/activateMQUtils.py
  2. 49 16
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 2 - 0
BaseDataMaintenance/common/activateMQUtils.py

@@ -52,6 +52,8 @@ def createComsumer(listener,dest,ack="client-individual",*args,**kwargs):
     # conn.subscribe(destination=dest, ack=ack, id="")
     conn.subscribe(destination=dest,ack=ack,id="", headers={'activemq.prefetchSize': 1})
 
+    return substription
+
 
 
 def test():

+ 49 - 16
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -86,7 +86,7 @@ class DynamicProcess(Process):
             _c = self.list_comsumer[i]
             _c.conn.disconnect()
 
-def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count):
+def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count,min_count=5000):
     from BaseDataMaintenance.java.MQInfo import getQueueSize
 
     list_queue_dict = []
@@ -97,32 +97,56 @@ def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,co
         return
     last_name = ""
     list_comsumer = []
-    while 1:
+    try:
         for _d in list_queue_dict:
             _size = getQueueSize(_d.get("name"))
             _d["queue_size"] = _size
-            if _size>6900:
+            if _size>min_count:
                 _d["size_level"] = 1
             else:
                 _d["size_level"] = 2
 
         list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
         queue_name = list_queue_dict[0].get("name")
+        queue_size = list_queue_dict[0].get("queue_size")
         log("dynamic_listener last_name:%s queue_name:%s %s"%(last_name,queue_name,str(list_queue_dict)))
-        queue_name = "dataflow_attachment_his"
+
+        if last_name==queue_name:
+            last_name = list_queue_dict[1].get("name")
+
+        # 存在于一个进程
         if last_name!=queue_name:
-            for _c in list_comsumer:
-                _c.kill()
-                _c.join()
+            for listener_attachment,listener_name in list_comsumer:
+                listener_attachment.conn.remove_listener(listener_name)
+                listener_attachment.conn.disconnect()
             log("dynamic_listener terminate")
             list_comsumer.clear()
-            for _i in range(process_count):
-                listener_p = DynamicProcess(listener_cls,comsumer_handler,comsumer_count,queue_name=queue_name)
-                listener_p.start()
-                list_comsumer.append(listener_p)
-
+            for i in range(comsumer_count):
+                listener_attachment = listener_cls(getConnect_activateMQ(),comsumer_handler ,_i)
+                listener_name = createComsumer(listener_attachment,queue_name)
+                list_comsumer.append((listener_attachment,listener_name))
             last_name = queue_name
-        time.sleep(120)
+        time.sleep(600)
+        for listener_attachment,listener_name in list_comsumer:
+            listener_attachment.conn.remove_listener(listener_name)
+            listener_attachment.conn.disconnect()
+
+        # # 用进程启动
+        # if last_name!=queue_name:
+        #     for _c in list_comsumer:
+        #         _c.kill()
+        #         _c.join()
+        #     log("dynamic_listener terminate")
+        #     list_comsumer.clear()
+        #     for _i in range(process_count):
+        #         listener_p = DynamicProcess(listener_cls,comsumer_handler,comsumer_count,queue_name=queue_name)
+        #         listener_p.start()
+        #         list_comsumer.append(listener_p)
+        #
+        #     last_name = queue_name
+        # time.sleep(120)
+    except Exception as e:
+        traceback.print_exc()
 
 class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
@@ -231,6 +255,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 createComsumer(listener,self.mq_attachment)
                 self.list_attachment_comsumer[i] = listener
 
+    def dynamic_listener_attachment(self):
+        dynamic_listener(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],1,self.AttachmentMQListener,lambda _dict: self.attachment_listener_handler(_dict),5)
+
 
     def process_failed_attachment(self):
 
@@ -887,6 +914,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
         # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
+        schedule.add_job(self.dynamic_listener_attachment,"cron",second="*/10")
         schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
         schedule.start()
 
@@ -978,12 +1006,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                 listener_p = Process(target=self.start_extract_listener,args=(2,self.mq_extract_fix))
                 listener_p.start()
             for ii in range(2):
-                listener_p = Process(target=self.start_extract_listener,args=(2,self.mq_extract_his))
+                listener_p = Process(target=self.start_extract_listener,args=(4,self.mq_extract_his))
                 listener_p.start()
             # listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),2))
             # listener_p.start()
-            listener_p_ai = Thread(target=self.start_extract_AI_listener)
-            listener_p_ai.start()
+            # listener_p_ai = Thread(target=self.start_extract_AI_listener)
+            # listener_p_ai.start()
+            pass
 
 
 
@@ -1078,6 +1107,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         q_size = self.queue_extract.qsize()
         log("queue extract size:%d"%(q_size))
 
+    def dynamic_listener_extract(self):
+        dynamic_listener(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],1,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),5)
+
     def process_extract_failed(self):
         def _handle(_dict,result_queue):
             frame = _dict.get("frame")
@@ -1998,6 +2030,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
     def start_flow_extract(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
+        schedule.add_job(self.dynamic_listener_extract,"cron",second="*/10")
         schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
         # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
         # schedule.add_job(self.monitor_listener,"cron",minute="*/5")