Explorar o código

消息队列分级并设置动态消费者

luojiehua hai 2 días
pai
achega
bc1d99ac88
Modificáronse 1 ficheiros con 27 adicións e 16 borrados
  1. 27 16
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 27 - 16
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -81,6 +81,10 @@ class DynamicProcess(Process):
             _c.conn.disconnect()
         super().terminate()
 
+    def __del__(self):
+        for i in range(len(self.list_comsumer)):
+            _c = self.list_comsumer[i]
+            _c.conn.disconnect()
 
 def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,comsumer_count):
     from BaseDataMaintenance.java.MQInfo import getQueueSize
@@ -97,18 +101,21 @@ def dynamic_listener(sorted_names,process_count,listener_cls,comsumer_handler,co
         for _d in list_queue_dict:
             _size = getQueueSize(_d.get("name"))
             _d["queue_size"] = _size
-            if _size>1000:
+            if _size>6900:
                 _d["size_level"] = 1
             else:
                 _d["size_level"] = 2
 
         list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
         queue_name = list_queue_dict[0].get("name")
+        log("dynamic_listener last_name:%s queue_name:%s %s"%(last_name,queue_name,str(list_queue_dict)))
+        queue_name = "dataflow_attachment_his"
         if last_name!=queue_name:
             for _c in list_comsumer:
-                _c.terminate()
-                del _c
-            list_comsumer = []
+                _c.kill()
+                _c.join()
+            log("dynamic_listener terminate")
+            list_comsumer.clear()
             for _i in range(process_count):
                 listener_p = DynamicProcess(listener_cls,comsumer_handler,comsumer_count,queue_name=queue_name)
                 listener_p.start()
@@ -160,7 +167,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
         self.comsumer_count = 20
-        self.comsumer_process_count = 2
+        self.comsumer_process_count = 5
         self.comsumer_process_count_fix = 1
         self.comsumer_process_count_his = 1
         self.retry_comsumer_count = 10
@@ -180,7 +187,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.session = None
 
         for _ in range(self.comsumer_process_count):
-            listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment))
+            listener_p = Process(target=self.start_attachment_listener,args=(10,self.mq_attachment))
             listener_p.start()
         for _ in range(self.comsumer_process_count_fix):
             listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_fix))
@@ -189,8 +196,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_his))
             listener_p.start()
 
-        listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],6,self.AttachmentMQListener,lambda _dict:self.attachment_listener_handler(_dict),5))
-        listener_p.start()
+        # listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],1,self.AttachmentMQListener,lambda _dict: self.attachment_listener_handler(_dict),2))
+        # listener_p.start()
 
         # listener_p = Process(target=self.start_attachment_listener)
         # listener_p.start()
@@ -251,7 +258,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             message_id = frame.headers["message-id"]
             item = json.loads(frame.body)
             _idx = _dict.get("idx",1)
-            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
+            try:
+                page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
+            except Exception as e:
+                page_attachments = []
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
             if random.random()<0.2:
@@ -280,6 +290,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         except Exception as e:
             traceback.print_exc()
 
+
     def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
         try:
             list_html = []
@@ -750,10 +761,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                 if _filemd5 not in set_md5:
 
+                    log("getAttachments search in ots:%s"%(_filemd5))
                     _path = self.getAttachPath(_filemd5,_dochtmlcon)
 
 
-                    log("getAttachments search in ots:%s"%(_filemd5))
                     _attach = {attachment_filemd5:_filemd5}
                     _attach_ots = attachment(_attach)
                     if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
@@ -960,17 +971,17 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         # 提取listener
         if create_listener:
-            for ii in range(2):
+            for ii in range(8):
                 listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract))
                 listener_p.start()
             for ii in range(1):
-                listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_fix))
+                listener_p = Process(target=self.start_extract_listener,args=(2,self.mq_extract_fix))
                 listener_p.start()
-            for ii in range(1):
-                listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_his))
+            for ii in range(2):
+                listener_p = Process(target=self.start_extract_listener,args=(2,self.mq_extract_his))
                 listener_p.start()
-            listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),5))
-            listener_p.start()
+            # listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),2))
+            # listener_p.start()
             listener_p_ai = Thread(target=self.start_extract_AI_listener)
             listener_p_ai.start()