Преглед на файлове

数据遗漏检查报警修复

luojiehua преди 10 месеца
родител
ревизия
a6b9ed6cad

+ 12 - 1
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -120,7 +120,18 @@ class BaseDataMonitor():
                                                                            SearchQuery(bool_query,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-            _item["exists"] = total_count
+            if total_count>0:
+                _item["exists"] = total_count
+            else:
+                bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+                rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                               SearchQuery(bool_query,get_total_count=True),
+                                                                               columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+                _item["exists"] = total_count
+
+
         try:
             current_date = getCurrent_date("%Y-%m-%d")
 

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -2231,7 +2231,7 @@ class Dataflow_dumplicate(Dataflow):
         _dict["moneys_attachment"] = set(_extract.get("moneys_attachment",[]))
         _dict["nlp_enterprise"] = json.dumps({"indoctextcon":_extract.get("nlp_enterprise",[]),
                                        "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])},ensure_ascii=False)
-        _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
+        _dict["extract_count"] = _extract.get("extract_count",0)
         _dict["package"] = self.c_f_get_package.evaluate(extract_json)
         _dict["project_name"] = _extract.get("name","")
         _dict["dict_time"] = self.get_dict_time(_extract)

+ 20 - 5
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -9,7 +9,7 @@ from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
 import os
 from BaseDataMaintenance.common.ossUtils import *
 from BaseDataMaintenance.dataSource.pool import ConnectorPool
-from BaseDataMaintenance.model.ots.document import Document
+from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
 
 from BaseDataMaintenance.common.Utils import article_limit
 from BaseDataMaintenance.common.documentFingerprint import getFingerprint
@@ -264,20 +264,33 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             if "retry_times" not in item:
                 item["retry_times"] = 5
             _retry_times = item.get("retry_times",0)
+
+
             dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid")})
 
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
             dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
             dhtml.delete_bidi_a()
-            dtmp = Document_tmp(item)
-
-
 
             #调用识别接口
             _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
 
+            # 将附件分类写回document
+            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
+            if len(page_attachments)>0:
+                for _attachment in page_attachments:
+                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
+                    classification = None
+                    for _attach in list_attach:
+                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
+                            classification = _attach.getProperties().get(attachment_classification,"")
+                            break
+                    if classification is not None:
+                        _attachment[attachment_classification] = classification
+                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)
 
+            dtmp = Document_tmp(item)
 
             _to_ack = False
             if not _succeed and _retry_times<self.retry_times:
@@ -305,6 +318,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     dhtml.updateSWFImages(swf_urls)
                     dhtml.updateAttachment(list_html)
 
+
                     dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                     dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                     send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
@@ -634,7 +648,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     log("getAttachments search in ots:%s"%(_filemd5))
                     _attach = {attachment_filemd5:_filemd5}
                     _attach_ots = attachment(_attach)
-                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
+                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
                         if _attach_ots.getProperties().get(attachment_status) is not None:
                             log("getAttachments find in ots:%s"%(_filemd5))
                             _attach_pg = Attachment_postgres(_attach_ots.getProperties())
@@ -1051,6 +1065,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             data["web_source_no"] = item.get(document_tmp_web_source_no,"")
             data["web_source_name"] = item.get(document_tmp_web_source_name,"")
             data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
+            data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
 
             _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
 

+ 2 - 2
BaseDataMaintenance/maintenance/product/extract_data.py

@@ -37,7 +37,7 @@ def extract_parameters(_html):
                     childs = get_childs([_data])
                     for c in childs:
                         requirement_text += c["text"]+"\n"
-                    _data_i += len(childs)
+                    _data_i += len(childs)-1
     _data_i = -1
     while _data_i<len(list_data)-1:
         _data_i += 1
@@ -53,7 +53,7 @@ def extract_parameters(_html):
 
                     for c in childs:
                         aptitude_text += c["text"]+"\n"
-                    _data_i += len(childs)
+                    _data_i += len(childs)-1
 
         if _type=="table":
             list_table = _data["list_table"]

Файловите разлики са ограничени, защото са твърде много
+ 21 - 1
BaseDataMaintenance/maxcompute/1.py


+ 1 - 0
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -237,6 +237,7 @@ class f_get_extractCount(object):
     def evaluate(self, extractjson):
         if extractjson is not None:
             _extract = json.loads(extractjson)
+            return _extract.get("extract_count",0)
         else:
             _extract = {}
         dict_pack = _extract.get("prem",{})

+ 2 - 2
BaseDataMaintenance/model/ots/document.py

@@ -405,9 +405,9 @@ def turn_document_status():
         # item.setValue(document_province,"广东",True)
         # item.setValue(document_city,"珠海",True)
         # item.setValue(document_district,"金湾区",True)
-        item.setValue(document_status,66,True)
+        # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        item.update_row(ots_client)
+        # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass
 

Някои файлове не бяха показани, защото твърде много файлове са променени