Преглед на файлове

消息队列分级并设置动态消费者

luojiehua преди 2 дни
родител
ревизия
e148de25a3
променени са 3 файла, в които са добавени 137 реда и са изтрити 33 реда
  1. +36 -2
      BaseDataMaintenance/maintenance/dataflow.py
  2. +99 -29
      BaseDataMaintenance/maintenance/dataflow_mq.py
  3. +2 -2
      BaseDataMaintenance/maxcompute/documentMerge.py

+ 36 - 2
BaseDataMaintenance/maintenance/dataflow.py

@@ -5,6 +5,7 @@ from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 from BaseDataMaintenance.common.multiProcess import MultiHandler
 from queue import Queue
 from multiprocessing import Queue as PQueue
+from multiprocessing import Process
 
 from BaseDataMaintenance.model.ots.document_tmp import *
 from BaseDataMaintenance.model.ots.attachment import *
@@ -143,7 +144,40 @@ class Dataflow():
         self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
         self.current_path = os.path.dirname(__file__)
 
+    def dynamic_listener(self,sorted_names,process_count,target,comsumer_count):
+        from BaseDataMaintenance.java.MQInfo import getQueueSize
 
+        list_queue_dict = []
+        for _i in range(len(sorted_names)):
+            list_queue_dict.append({"name":sorted_names[_i],
+                                    "level":_i})
+        if len(list_queue_dict)==0:
+            return
+        last_name = ""
+        list_comsumer = []
+        while 1:
+            for _d in list_queue_dict:
+                _size = getQueueSize(_d.get("name"))
+                _d["queue_size"] = _size
+                if _size>1000:
+                    _d["size_level"] = 1
+                else:
+                    _d["size_level"] = 2
+
+            list_queue_dict.sort(key=lambda x:(x.get("size_level"),x.get("level")))
+            queue_name = list_queue_dict[0].get("name")
+            if last_name!=queue_name:
+                for _c in list_comsumer:
+                    _c.terminate()
+                    del _c
+                list_comsumer = []
+                for _i in range(process_count):
+                    listener_p = Process(target=target,args=(comsumer_count,queue_name))
+                    listener_p.start()
+                    list_comsumer.append(listener_p)
+
+                last_name = queue_name
+            time.sleep(120)
 
     def flow_init(self):
         def producer():
@@ -5274,7 +5308,7 @@ class Dataflow_dumplicate(Dataflow):
 
         if item:
             log("start dumplicate_comsumer_handle")
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=True)
             # self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
@@ -5514,7 +5548,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(656253771
+    df_dump.test_dumplicate(627326288
                             )
     # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
     # compare_dumplicate_check()

+ 99 - 29
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -81,13 +81,17 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 
         self.mq_attachment = "/queue/dataflow_attachment"
+        self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
+        self.mq_attachment_his = "/queue/dataflow_attachment_his"
         self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
         self.mq_extract = "/queue/dataflow_extract"
 
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
         self.comsumer_count = 20
-        self.comsumer_process_count = 5
+        self.comsumer_process_count = 2
+        self.comsumer_process_count_fix = 1
+        self.comsumer_process_count_his = 1
         self.retry_comsumer_count = 10
         self.retry_times = 5
         self.list_attachment_comsumer = []
@@ -105,28 +109,39 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.session = None
 
         for _ in range(self.comsumer_process_count):
-            listener_p = Process(target=self.start_attachment_listener)
+            listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment))
             listener_p.start()
+        for _ in range(self.comsumer_process_count_fix):
+            listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_fix))
+            listener_p.start()
+        for _ in range(self.comsumer_process_count_his):
+            listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_his))
+            listener_p.start()
+
+        listener_p = Process(target=self.dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],4,self.start_attachment_listener,5))
+        listener_p.start()
 
         # listener_p = Process(target=self.start_attachment_listener)
         # listener_p.start()
 
 
 
-    def start_attachment_listener(self):
-        for _i in range(self.comsumer_count):
+
+    def start_attachment_listener(self,comsumer_count,queue_name):
+        list_attachment_comsumer = []
+        for _i in range(comsumer_count):
             listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler ,_i)
-            createComsumer(listener_attachment,self.mq_attachment)
-            self.list_attachment_comsumer.append(listener_attachment)
+            createComsumer(listener_attachment,queue_name)
+            list_attachment_comsumer.append(listener_attachment)
 
         while 1:
-            for i in range(len(self.list_attachment_comsumer)):
-                if self.list_attachment_comsumer[i].conn.is_connected():
+            for i in range(len(list_attachment_comsumer)):
+                if list_attachment_comsumer[i].conn.is_connected():
                     continue
                 else:
                     listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
-                    createComsumer(listener,self.mq_attachment)
-                    self.list_attachment_comsumer[i] = listener
+                    createComsumer(listener,queue_name)
+                    list_attachment_comsumer[i] = listener
             time.sleep(5)
 
     def monitor_listener(self):
@@ -818,6 +833,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
 
         self.mq_extract = "/queue/dataflow_extract"
+        self.mq_extract_fix = "/queue/dataflow_extract_fix"
+        self.mq_extract_his = "/queue/dataflow_extract_his"
         self.mq_extract_ai = "/queue/dataflow_extract_AI"
         self.mq_extract_failed = "/queue/dataflow_extract_failed"
 
@@ -855,14 +872,25 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         # 提取listener
         if create_listener:
-            for ii in range(10):
-                listener_p = Process(target=self.start_extract_listener)
+            for ii in range(2):
+                listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract))
+                listener_p.start()
+            for ii in range(1):
+                listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_fix))
+                listener_p.start()
+            for ii in range(1):
+                listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract_his))
                 listener_p.start()
+            listener_p = Process(target=self.dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],6,self.start_extract_listener,5))
+            listener_p.start()
             listener_p_ai = Thread(target=self.start_extract_AI_listener)
             listener_p_ai.start()
 
 
 
+
+
+
     def start_extract_AI_listener(self,_count=8):
 
         self.list_extract_ai_comsumer = []
@@ -886,24 +914,24 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                 traceback.print_exc()
 
 
-    def start_extract_listener(self):
+    def start_extract_listener(self,comsumer_count,queue_name):
 
-        self.list_extract_comsumer = []
+        list_extract_comsumer = []
 
-        for _i in range(self.comsumer_count):
+        for _i in range(comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
-            createComsumer(listener_extract,self.mq_extract)
-            self.list_extract_comsumer.append(listener_extract)
+            createComsumer(listener_extract,queue_name)
+            list_extract_comsumer.append(listener_extract)
 
         while 1:
             try:
-                for _i in range(len(self.list_extract_comsumer)):
-                    if self.list_extract_comsumer[_i].conn.is_connected():
+                for _i in range(len(list_extract_comsumer)):
+                    if list_extract_comsumer[_i].conn.is_connected():
                         continue
                     else:
                         listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
-                        createComsumer(listener,self.mq_extract)
-                        self.list_extract_comsumer[_i] = listener
+                        createComsumer(listener,queue_name)
+                        list_extract_comsumer[_i] = listener
                 time.sleep(5)
             except Exception as e:
                 traceback.print_exc()
@@ -1930,7 +1958,11 @@ class Dataflow_init(Dataflow):
             self.begin_docid = None
             self.mq_init = "/queue/dataflow_init"
             self.mq_attachment = "/queue/dataflow_attachment"
+            self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
+            self.mq_attachment_his = "/queue/dataflow_attachment_his"
             self.mq_extract = "/queue/dataflow_extract"
+            self.mq_extract_fix = "/queue/dataflow_extract_fix"
+            self.mq_extract_his = "/queue/dataflow_extract_his"
             self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
 
         def on_error(self, headers):
@@ -1960,10 +1992,23 @@ class Dataflow_init(Dataflow):
                     body[document_original_docchannel] = body.get(document_docchannel)
                 page_attachments = body.get(document_tmp_attachment_path,"[]")
                 _uuid = body.get(document_tmp_uuid,"")
+                current_date = getCurrent_date(format='%Y-%m-%d')
+                last_7_date = timeAdd(current_date,7,format='%Y-%m-%d')
+                page_time = body.get(document_page_time,"")
+                if page_time<last_7_date:
+                    is_his = True
+                else:
+                    is_his = False
+
+
                 if page_attachments!="[]":
                     status = random.randint(1,10)
                     body[document_tmp_status] = status
-                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
+                    if not is_his:
+                        queue_name = self.mq_attachment
+                    else:
+                        queue_name = self.mq_attachment_his
+                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
                         log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                         ackMsg(self.conn,message_id)
                     else:
@@ -1971,7 +2016,11 @@ class Dataflow_init(Dataflow):
                 else:
                     status = random.randint(11,50)
                     body[document_tmp_status] = status
-                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
+                    if not is_his:
+                        queue_name = self.mq_extract
+                    else:
+                        queue_name = self.mq_extract_his
+                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
                         log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                         ackMsg(self.conn,message_id)
                     else:
@@ -1993,7 +2042,11 @@ class Dataflow_init(Dataflow):
         self.mq_init = "/queue/dataflow_init"
 
         self.mq_attachment = "/queue/dataflow_attachment"
+        self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
+        self.mq_attachment_his = "/queue/dataflow_attachment_his"
         self.mq_extract = "/queue/dataflow_extract"
+        self.mq_extract_fix = "/queue/dataflow_extract_fix"
+        self.mq_extract_his = "/queue/dataflow_extract_his"
         self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
@@ -2082,14 +2135,31 @@ class Dataflow_init(Dataflow):
                                 ots_dict["docid"] += self.base_shenpi_id
                                 ots_dict["partitionkey"] = ots_dict["docid"]%500+1
 
+                            current_date = getCurrent_date(format='%Y-%m-%d')
+                            last_7_date = timeAdd(current_date,7,format='%Y-%m-%d')
+                            page_time = ots_dict.get(document_page_time,"")
+                            if page_time<last_7_date:
+                                is_his = True
+                            else:
+                                is_his = False
+
+
                             if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
-                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
+                                if not is_his:
+                                    queue_name = self.mq_attachment
+                                else:
+                                    queue_name = self.mq_attachment_his
+                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
                                     self.max_shenpi_id = _id
                                 else:
                                     log("sent shenpi message to mq failed %s"%(_id))
                                     break
                             else:
-                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
+                                if not is_his:
+                                    queue_name = self.mq_extract
+                                else:
+                                    queue_name = self.mq_extract_his
+                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
                                     self.max_shenpi_id = _id
                                 else:
                                     log("sent shenpi message to mq failed %s"%(_id))
@@ -2235,11 +2305,11 @@ class Dataflow_init(Dataflow):
                 if page_attachments!="[]":
                     status = random.randint(1,10)
                     _data[document_tmp_status] = status
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
                 else:
                     status = random.randint(11,50)
                     _data[document_tmp_status] = status
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
                 if send_succeed:
                     _document.setValue(document_tmp_status,0,True)
                     _document.update_row(self.ots_client)
@@ -2285,11 +2355,11 @@ class Dataflow_init(Dataflow):
                 if page_attachments!="[]":
                     status = random.randint(1,10)
                     _data[document_tmp_status] = status
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
                 else:
                     status = random.randint(11,50)
                     _data[document_tmp_status] = status
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
                 if send_succeed:
                     _document.setValue(document_tmp_status,1,True)
                     _document.update_row(self.ots_client)

+ 2 - 2
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2637,8 +2637,8 @@ def check_project_codes_merge(list_code,list_code_to_merge,b_log):
     has_similar = False
     for _c in list_code[:100]:
         for _c1 in list_code_to_merge[:100]:
-            _c = str(_c).replace("【","[").replace("】","]")
-            _c1 = str(_c1).replace("【","[").replace("】","]")
+            _c = str(_c).replace("【","[").replace("】","]").replace("(","(").replace(")",")")
+            _c1 = str(_c1).replace("【","[").replace("】","]").replace("(","(").replace(")",")")
             _simi = getSimilarityOfString(_c,_c1,3)
             if _simi==1:
                 has_same = True