Prechádzať zdrojové kódy

将提取失败的钉钉发送放到监控中

luojiehua 11 mesiacov pred
rodič
commit
d836bb269c

+ 8 - 8
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -340,7 +340,11 @@ class BaseDataMonitor():
 
             total_count_todeal = getQueueSize("dataflow_extract")
 
-            if total_count_todeal>5000:
+            if total_count_todeal>1000:
+                _cmd = 'cat %s | grep "%s" | grep -c "要素提取失败:docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                log(_cmd)
+                process_failed_count = self.cmd_execute(_cmd)
+
                 _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                 log(_cmd)
                 process_count = self.cmd_execute(_cmd)
@@ -375,8 +379,8 @@ class BaseDataMonitor():
                 #                                                                              SearchQuery(query,None,True),
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s" % (
-                str(total_count_todeal), str(init_count), str(process_count), str(success_count), str(exists_count))
+                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,处理失败数:%s,查库免提取数:%s" % (
+                str(total_count_todeal), str(init_count), str(process_count), str(success_count), str(process_failed_count),str(exists_count))
                 log(_msg)
                 atAll = False
                 if success_count == 0:
@@ -697,13 +701,9 @@ class BaseDataMonitor():
         scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
         scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/11")
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_preproject,"cron",hour="8")
         scheduler.add_job(self.monitor_merge,"cron",hour="*/2")
-        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
-        scheduler.add_job(self.monitor_sychr, "cron", minute="*/10")
+        scheduler.add_job(self.monitor_sychr, "cron", minute="*/30")
         scheduler.add_job(self.monitor_preproject, "cron", hour="8")
-        scheduler.add_job(self.monitor_merge, "cron", minute="*/60")
         scheduler.add_job(self.monitor_init, "cron", hour="*/3")
         scheduler.start()
 

+ 4 - 2
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1087,7 +1087,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             _to_ack = True
             try:
                 if all_done!=1:
-                    sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
+                    # sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
+                    log("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                     if extract_times>=10:
                         #process as succeed
                         dtmp.setValue(document_tmp_dochtmlcon,"",False)
@@ -1156,7 +1157,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                 ackMsg(conn,message_id,subscription)
         except Exception as e:
             traceback.print_exc()
-            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
+            # sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
+            log("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
             log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))
             if extract_times>=10:
                 #process as succeed