Эх сурвалжийг харах

增加漏数据及修复数据;附件处理时如果历史已经处理过,则跳过

luojiehua 2 жил өмнө
parent
commit
690b52f0c3

+ 15 - 4
BaseDataMaintenance/maintenance/dataflow.py

@@ -1764,17 +1764,24 @@ class Dataflow_attachment(Dataflow):
     def flow_attachment_process(self):
         self.process_comsumer()
 
+    def monitor_attachment_process(self):
+        alive_count = 0
+        for _t in self.process_list_thread:
+            if _t.is_alive():
+                alive_count += 1
+        log("attachment_process alive:%d total:%d"%(alive_count,len(self.process_list_thread)))
+
     def process_comsumer(self):
-        list_thread = []
+        self.process_list_thread = []
         thread_count = 60
 
         for i in range(thread_count):
-            list_thread.append(Thread(target=self.process_comsumer_handle))
+            self.process_list_thread.append(Thread(target=self.process_comsumer_handle))
 
-        for t in list_thread:
+        for t in self.process_list_thread:
             t.start()
 
-        for t in list_thread:
+        for t in self.process_list_thread:
             t.join()
 
 
@@ -1783,13 +1790,17 @@ class Dataflow_attachment(Dataflow):
             _flag = False
             try:
                 item = self.queue_attachment_ocr.get(True,timeout=0.2)
+                log("attachment get doc:%s"%(str(item.get("docid"))))
                 self.attachment_recognize(item,None)
+                log("attachment get doc:%s succeed"%(str(item.get("docid"))))
             except Exception as e:
                 _flag = True
                 pass
             try:
                 item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
+                log("attachment get doc:%s"%(str(item.get("docid"))))
                 self.attachment_recognize(item,None)
+                log("attachment get doc:%s succeed"%(str(item.get("docid"))))
             except Exception as e:
                 _flag = True and _flag
                 pass

+ 1 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -518,6 +518,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
         schedule.add_job(self.flow_attachment,"cron",second="*/10")
+        schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
         schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
         schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
         schedule.add_job(self.monitor_listener,"cron",minute="*/1")