
Fix insufficient consumption in attachment recognition and element extraction

luojiehua 1 year ago
parent
commit
a67d8bdcef
2 changed files with 130 additions and 22 deletions
  1. +11 -9
      BaseDataMaintenance/maintenance/dataflow.py
  2. +119 -13
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 11 - 9
BaseDataMaintenance/maintenance/dataflow.py

@@ -1818,15 +1818,17 @@ class Dataflow_attachment(Dataflow):
             for t in self.process_list_thread:
                 t.start()
 
-        failed_count = 0
-        for _i in range(len(self.process_list_thread)):
-            t = self.process_list_thread[_i]
-            if not t.is_alive():
-                failed_count += 1
-                self.prcess_list_thread[_i] = Thread(target=self.process_comsumer_handle)
-                self.prcess_list_thread[_i].start()
-        if failed_count>0:
-            log("attachment failed %d"%(failed_count))
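+        # watchdog: poll the consumer threads every 5s and restart any that have died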
+        while 1:
+            failed_count = 0
+            for _i in range(len(self.process_list_thread)):
+                t = self.process_list_thread[_i]
+                if not t.is_alive():
+                    failed_count += 1
+                    self.process_list_thread[_i] = Thread(target=self.process_comsumer_handle)
+                    self.process_list_thread[_i].start()
+            if failed_count>0:
+                log("attachment failed %d"%(failed_count))
+            time.sleep(5)
 
 
     def process_comsumer_handle(self):
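The dataflow.py change turns a one-shot liveness check into a persistent watchdog. A minimal standalone sketch of that pattern, using illustrative names (worker, run_with_watchdog) and the same 5-second poll as the diff:

    import time
    import random
    from threading import Thread

    def worker():
        # stand-in job; the real consumers block on a message queue
        time.sleep(random.uniform(1, 3))

    def run_with_watchdog(n_threads=4, interval=5):
        threads = [Thread(target=worker) for _ in range(n_threads)]
        for t in threads:
            t.start()
        while True:                        # supervise forever
            failed = 0
            for i, t in enumerate(threads):
                if not t.is_alive():       # finished or crashed
                    threads[i] = Thread(target=worker)
                    threads[i].start()
                    failed += 1
            if failed:
                print("restarted %d worker threads" % failed)
            time.sleep(interval)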

+ 119 - 13
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -18,6 +18,7 @@ from BaseDataMaintenance.model.postgres.document_extract import *
 import sys
 sys.setrecursionlimit(1000000)
 
+from multiprocessing import Process
 
 class ActiveMQListener():
 
@@ -40,6 +41,32 @@ class ActiveMQListener():
 
 class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
+    class AttachmentMQListener():
+
+        def __init__(self,conn,_func,_idx,*args,**kwargs):
+            self.conn = conn
+            self._func = _func
+            self._idx = _idx
+
+        def on_error(self, headers):
+            log("===============")
+            log('received an error %s' % str(headers.body))
+
+        def on_message(self, headers):
+            try:
+                log("get message of idx:%s"%(str(self._idx)))
+                message_id = headers.headers["message-id"]
+                body = headers.body
+                _dict = {"frame":headers,"conn":self.conn}
+                self._func(_dict=_dict)
+            except Exception as e:
+                traceback.print_exc()
+
+        def __del__(self):
+            self.conn.disconnect()
+
     def __init__(self):
         Dataflow_attachment.__init__(self)
 
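AttachmentMQListener hands each frame straight to a handler function instead of parking it on an internal queue, so frames are consumed as fast as the handler runs. A minimal self-contained sketch of the same delegating pattern, assuming stomp.py 8.x (callbacks receive a single Frame) and a local broker on port 61613; the queue name matches the diff, everything else is illustrative:

    import time
    import stomp

    class DelegatingListener(stomp.ConnectionListener):
        def __init__(self, handler, idx):
            self.handler = handler          # called once per frame
            self.idx = idx

        def on_error(self, frame):
            print("broker error: %s" % frame.body)

        def on_message(self, frame):
            print("consumer %d got message-id %s" % (self.idx, frame.headers["message-id"]))
            self.handler({"frame": frame})

    def handle(_dict):
        print("processing body: %s" % _dict["frame"].body)

    conn = stomp.Connection([("localhost", 61613)])
    conn.set_listener("", DelegatingListener(handle, 0))
    conn.connect(wait=True)
    conn.subscribe(destination="/queue/dataflow_attachment", id=1, ack="auto")
    time.sleep(60)  # keep the process alive while frames arrive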
@@ -47,20 +74,46 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.mq_attachment = "/queue/dataflow_attachment"
         self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
         self.mq_extract = "/queue/dataflow_extract"
+
+        self.queue_attachment_ocr = Queue()
+        self.queue_attachment_not_ocr = Queue()
         self.comsumer_count = 120
         self.retry_comsumer_count = 10
         self.retry_times = 5
         self.list_attachment_comsumer = []
-        for _i in range(self.comsumer_count):
-            listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
-            createComsumer(listener_attachment,self.mq_attachment)
-            self.list_attachment_comsumer.append(listener_attachment)
+
+        # for _i in range(self.comsumer_count):
+        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
+        #     createComsumer(listener_attachment,self.mq_attachment)
+        #     self.list_attachment_comsumer.append(listener_attachment)
+
         self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
         self.session = requests.Session()
 
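+        # run the MQ listeners in a separate process so consumption is not starved by work in this one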
+        listener_p = Process(target=self.start_attachment_listener)
+        listener_p.start()
+
+
+
+    def start_attachment_listener(self):
+        for _i in range(self.comsumer_count):
+            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
+            createComsumer(listener_attachment,self.mq_attachment)
+            self.list_attachment_comsumer.append(listener_attachment)
+
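+        # supervise the consumers: rebuild any whose broker connection has dropped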
+        while 1:
+            for i in range(len(self.list_attachment_comsumer)):
+                if self.list_attachment_comsumer[i].conn.is_connected():
+                    continue
+                else:
+                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
+                    createComsumer(listener,self.mq_attachment)
+                    self.list_attachment_comsumer[i] = listener
+            time.sleep(5)
+
     def monitor_listener(self):
         for i in range(len(self.list_attachment_comsumer)):
             if self.list_attachment_comsumer[i].conn.is_connected():
@@ -90,7 +143,34 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             for _c in list_comsumer:
                 _c.conn.disconnect()
 
+    def attachment_listener_handler(self,_dict):
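+        # one call per MQ frame: parse the document, resolve its attachments, run recognition inline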
+        try:
+            frame = _dict["frame"]
+            conn = _dict["conn"]
+            message_id = frame.headers["message-id"]
+            item = json.loads(frame.body)
+            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
+            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
+
+            if len(page_attachments)==0:
+                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
+            else:
+                list_fileMd5 = []
+                for _atta in page_attachments:
+                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
+
+                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
+
+                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}
+
+            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
+            self.attachment_recognize(newitem,None)
 
+            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
+
+        except Exception as e:
+            traceback.print_exc()
 
     def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
         try:
@@ -173,7 +253,6 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 
 
-
             _to_ack = False
             if not _succeed and _retry_times<self.retry_times:
                 item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
@@ -490,6 +569,14 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
         mt.run()
 
+    def flow_attachment_process(self):
+        self.process_comsumer()
+
+        # p = Process(target = self.process_comsumer)
+        # p.start()
+        # p.join()
+
+
     def set_queue(self,_dict):
         list_attach = _dict.get("list_attach")
         to_ocr = False
@@ -537,14 +624,16 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
     def start_flow_attachment(self):
         schedule = BlockingScheduler()
-        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
+        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
         # schedule.add_job(self.flow_attachment,"cron",second="*/10")
-        schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
-        schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
-        schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
+        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
+        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
+        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
+
+
+        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
         schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
         schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
-        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
         schedule.start()
 
 class Dataflow_ActivteMQ_extract(Dataflow_extract):
@@ -606,13 +695,30 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
 
         self.list_extract_comsumer = []
+
+        # for _i in range(self.comsumer_count):
+        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
+        #     createComsumer(listener_extract,self.mq_extract)
+        #     self.list_extract_comsumer.append(listener_extract)
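+        # as with attachments, run the extract listeners in a child process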
+        listener_p = Process(target=self.start_extract_listener)
+        listener_p.start()
+
+    def start_extract_listener(self):
+
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
             createComsumer(listener_extract,self.mq_extract)
             self.list_extract_comsumer.append(listener_extract)
 
-
-
+        while 1:
+            for i in range(len(self.list_extract_comsumer)):
+                if self.list_extract_comsumer[i].conn.is_connected():
+                    continue
+                else:
+                    listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
+                    createComsumer(listener,self.mq_extract)
+                    self.list_extract_comsumer[i] = listener
+            time.sleep(5)
 
     def monitor_listener(self):
         for i in range(len(self.list_extract_comsumer)):
@@ -977,7 +1083,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
         schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
         schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
-        schedule.add_job(self.monitor_listener,"cron",minute="*/5")
+        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
         schedule.start()
 
 from multiprocessing import RLock
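Both new listener loops rely on the same supervision idiom: poll conn.is_connected() every few seconds and rebuild any subscriber whose broker connection has dropped. A compact sketch of that loop, where make_consumer is an illustrative factory that connects and subscribes a fresh listener:

    import time

    def supervise(consumers, make_consumer, interval=5):
        # consumers: listener objects exposing .conn (a stomp.py Connection)
        while True:
            for i, c in enumerate(consumers):
                if not c.conn.is_connected():        # broker link dropped
                    consumers[i] = make_consumer(i)  # reconnect and resubscribe
            time.sleep(interval)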