Просмотр исходного кода

调整附件监控,调整提取调用接口权重,调整公告附件入库规则

luojiehua 2 лет назад
Родитель
Сommit
0f3a75f5f0

+ 47 - 10
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -137,6 +137,7 @@ class BaseDataMonitor():
 
 
     def monitor_attachment(self):
+
         try:
             # query = BoolQuery(must_queries=[
             #     RangeQuery("status",0,11),
@@ -165,21 +166,57 @@ class BaseDataMonitor():
                 #                                                                            SearchQuery(query,None,True),
                 #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-                _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
-                log(_cmd)
-                process_count = self.cmd_execute(_cmd)
 
-                _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
-                log(_cmd)
-                process_succeed_count = self.cmd_execute(_cmd)
 
-                _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
-                log(_cmd)
-                init_count = self.cmd_execute(_cmd)
 
 
+                #通过命令行获取日志情况
+                # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # process_count = self.cmd_execute(_cmd)
+                #
+                # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # process_succeed_count = self.cmd_execute(_cmd)
+                #
+                # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # init_count = self.cmd_execute(_cmd)
+
+                # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+
+                #通过读取文件获取日志情况
+                dict_type = {}
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
+                with open(flow_attachment_log_path,"r",encoding="utf8") as f:
+                    while True:
+                        line = f.readline()
+                        if not line:
+                            break
+                        _match = re.search(_pattern,str(line))
+                        if _match is not None:
+                            _type = _match.groupdict().get("type")
+                            _result = _match.groupdict().get("result")
+                            _costtime = _match.groupdict().get("costtime")
+                            if _type not in dict_type:
+                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
+                            if _result=="True":
+                                dict_type[_type]["success"] += 1
+                                dict_type[_type]["success_costtime"] += int(_costtime)
+                            else:
+                                dict_type[_type]["fail"] += 1
+                                dict_type[_type]["fail_costtime"] += int(_costtime)
+
+                process_count = 0
+                process_succeed_count = 0
+                _msg_type = ""
+                for k,v in dict_type.items():
+                    process_count += v.get("success",0)+v.get("fail",0)
+                    process_succeed_count += v.get("success",0)
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%s秒/个,\n\t失败%s,消耗%s秒,%s秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),str(v.get("success_costtime")/max(1,v.get("success"))),str(v.get("fail")),str(v.get("fail_costtime")),str(v.get("fail_costtime")/max(1,v.get("fail"))))
+
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -1746,7 +1746,7 @@ class Dataflow_attachment(Dataflow):
 
     def process_comsumer(self):
         list_thread = []
-        thread_count = 40
+        thread_count = 60
 
         for i in range(thread_count):
             list_thread.append(Thread(target=self.process_comsumer_handle))

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -524,7 +524,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.industy_url = "http://127.0.0.1:15000/industry_extract"
 
         self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",15],
-                                   ["http://192.168.0.115:15030/content_extract",10]
+                                   ["http://192.168.0.115:15030/content_extract",5]
                                    ]
 
 

+ 14 - 6
BaseDataMaintenance/model/ots/document.py

@@ -82,19 +82,26 @@ class Document(BaseModel):
                 _dochtmlcon += _div
                 self.setValue(document_dochtmlcon,_dochtmlcon,True)
 
+    def getRichTextFetch(self,list_html):
+        _text = ""
+        for _ht in list_html:
+            if isinstance(_ht,str):
+                _text += "<div>%s</div>"%(_ht)
+            elif isinstance(_ht,dict):
+                _filemd5 = _ht.get("filemd5","")
+                _html = _ht.get("html","")
+                _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
+        return _text
 
     def updateAttachment(self,list_html):
         if len(list_html)>0:
             _dochtmlcon = self.getProperties().get(document_dochtmlcon,"")
+            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
             _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
             fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
 
-            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
-            if len(bytes(_text,encoding="utf8"))>fix_len:
-                list_t = []
-                for _html in list_html:
-                    list_t.append(BeautifulSoup(_html,"lxml").get_text())
-                _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(cut_str(list_html,list_t,fix_len)))
+            # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
+            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
 
 
 
@@ -105,6 +112,7 @@ class Document(BaseModel):
                     _node.decompose()
                 self.setValue(document_dochtmlcon,str(_soup)+_text,True)
 
+
     def getTitleFromHtml(self,filemd5,_html):
         _soup = BeautifulSoup(_html,"lxml")
 

+ 0 - 7
BaseDataMaintenance/model/ots/document_html.py

@@ -69,13 +69,6 @@ class Document_html(BaseModel):
 
             # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
             _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
-            if len(bytes(_text,encoding="utf8"))>fix_len:
-                list_t = []
-                for _html in list_html:
-                    list_t.append(BeautifulSoup(_html,"lxml").get_text())
-                _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(cut_str(list_html,list_t,fix_len)))
-
-
 
             if _dochtmlcon is not None:
                 _soup = BeautifulSoup(_dochtmlcon,"lxml")

+ 16 - 6
BaseDataMaintenance/model/ots/document_tmp.py

@@ -94,19 +94,26 @@ class Document_tmp(BaseModel):
                 _dochtmlcon += _div
                 self.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
 
+    def getRichTextFetch(self,list_html):
+        _text = ""
+        for _ht in list_html:
+            if isinstance(_ht,str):
+                _text += "<div>%s</div>"%(_ht)
+            elif isinstance(_ht,dict):
+                _filemd5 = _ht.get("filemd5","")
+                _html = _ht.get("html","")
+                _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
+        return _text
 
     def updateAttachment(self,list_html):
         if len(list_html)>0:
             _dochtmlcon = self.getProperties().get(document_tmp_dochtmlcon,"")
+            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
             _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
             fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
 
-            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
-            if len(bytes(_text,encoding="utf8"))>fix_len:
-                list_t = []
-                for _html in list_html:
-                    list_t.append(BeautifulSoup(_html,"lxml").get_text())
-                _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(cut_str(list_html,list_t,fix_len)))
+            # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
+            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
 
 
 
@@ -117,6 +124,7 @@ class Document_tmp(BaseModel):
                     _node.decompose()
                 self.setValue(document_tmp_dochtmlcon,str(_soup)+_text,True)
 
+
     def getTitleFromHtml(self,filemd5,_html):
         _soup = BeautifulSoup(_html,"lxml")
 
@@ -269,6 +277,7 @@ def turn_document_tmp_status():
                                                                        columns_to_get=ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
+        print(list_data)
         _count = len(list_data)
         for _data in list_data:
             _document = Document_tmp(_data)
@@ -323,6 +332,7 @@ def turn_document_tmp_status():
         # item.setValue(document_tmp_status,random.randint(151,171),True)
         # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
+        item.delete_row(ots_client)
         pass