Преглед изворног кода

修复附件识别和要素提取消费不足问题,项目合并标段问题解决

luojiehua пре 1 година
родитељ
комит
4f0bc3ecb0

+ 16 - 4
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -490,19 +490,31 @@ class BaseDataMonitor():
                                                                             SearchQuery(query,None,True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
+
         if total_count>=1000:
             _cmd = 'cat %s | grep -c "%s.*merge_project whole_time"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
             process_count = self.cmd_execute(_cmd)
             atAll = False
             if process_count=="":
                 process_count = 0
+
+            query = BoolQuery(must_queries=[
+                RangeQuery("status",flow_dumplicate_status_from[1]),
+                RangeQuery("opertime",self.get_last_tenmin_time())
+            ])
+
+            rows,next_token,total_count_oper,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                SearchQuery(query,None,True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
             if int(process_count)==0:
-                atAll = True
+                if total_count_oper==0:
+                    atAll = True
                 _cmd = "echo `tail %s -c 10000k` > %s"%(flow_dumplicate_log_path,flow_dumplicate_log_path)
                 self.cmd_execute(_cmd)
-            if int(process_count)>0 and int(process_count)<100:
-                self.cmd_execute("ps -ef | grep dumplicate | grep -v grep|cut -c 9-15|xargs kill -9")
-            _msg = "数据流报警:待去重公告数为:%d,最近十分钟去重数为:%s"%(total_count,str(process_count))
+            # if int(process_count)>0 and int(process_count)<100:
+            #     self.cmd_execute("ps -ef | grep dumplicate | grep -v grep|cut -c 9-15|xargs kill -9")
+            _msg = "数据流报警:待去重公告数为:%d,最近十分钟日志去重数为:%s,ots去重数为:%s"%(total_count,str(process_count),str(total_count_oper))
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 

+ 4 - 4
BaseDataMaintenance/maintenance/dataflow.py

@@ -4100,7 +4100,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
-        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
+        schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
         schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.flow_remove,"cron",hour="20")
@@ -4236,10 +4236,10 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(380763870
+    df_dump.test_dumplicate(386161571
                             )
-    # df_dump.test_merge([372841008
-    #                     ],[370595571])
+    # df_dump.test_merge([385521167
+    #                     ],[385521113])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -77,7 +77,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
-        self.comsumer_count = 120
+        self.comsumer_count = 90
         self.retry_comsumer_count = 10
         self.retry_times = 5
         self.list_attachment_comsumer = []

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_settings.py

@@ -1,6 +1,6 @@
 
 
-flow_process_count = 3000
+flow_process_count = 600
 
 flow_attachment_status_from = [0,10]
 flow_attachment_status_failed_to = [0,0]

+ 45 - 4
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2170,7 +2170,7 @@ def update_projects_by_project(project_dict,projects):
     _dict = {}
     #更新公共属性
     for k,v in project_dict.items():
-        if k in (project_project_dynamics,project_page_time,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment):
+        if k in (project_project_dynamics,project_page_time,project_sub_project_name,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment):
             continue
         for _proj in projects:
             if k not in _proj:
@@ -2184,10 +2184,15 @@ def update_projects_by_project(project_dict,projects):
                     elif isinstance(_v,(int,float)):
                         if _v==0:
                             _dict[k] = v
+
     for _proj in projects:
         _proj.update(_dict)
         if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
             _proj[project_page_time] = project_dict.get(project_page_time,"")
+        if project_dict.get(project_sub_project_name) is not None and project_dict.get(project_sub_project_name) not in {"","Project"}:
+            if not (_proj.get(project_sub_project_name) is not None and _proj.get(project_sub_project_name) not in {"","Project"}):
+                _proj[project_sub_project_name] = project_dict.get(project_sub_project_name)
+
 
     #拼接属性
     append_dict = {}
@@ -2398,7 +2403,7 @@ def check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zha
         return -1
     return 1
 
-def check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_log):
+def check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,project_dynamics,project_dynamics_to_merge,b_log,package_number_pattern = re.compile("((包|标[段号的包]|分?包|包组|项目)编?号?[::]?[\((]?[0-9A-Za-z一二三四五六七八九十]{1,4})|(第?[0-9A-Za-z一二三四五六七八九十]{1,4}(包号|标[段号的包]|分?包))")):
     #check sub_project_name
     sub_project_name = str(sub_project_name).replace("Project","")
     sub_project_name_to_merge = str(sub_project_name_to_merge).replace("Project","")
@@ -2409,6 +2414,42 @@ def check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_lo
                 log("check sub_project_name failed %s===%s"%(str(sub_project_name),str(sub_project_name_to_merge)))
             return -1
         return 1
+    if project_dynamics is not None and project_dynamics_to_merge is not None:
+        try:
+            project_dynamics = json.loads(project_dynamics)
+            project_dynamics_to_merge = json.loads(project_dynamics_to_merge)
+            set_title_name = set()
+            set_title_name_to_merge = set()
+            for _d in project_dynamics:
+                _title1 = _d.get(document_doctitle,"")
+                _title_name = None
+                _title_name_search = re.search(package_number_pattern,_title1)
+                if _title_name_search is not None:
+                    _title_name = _title_name_search.group()
+                    _title_name = re.sub("[^0-9A-Za-z一二三四五六七八九十]",'',_title_name)
+                    if _title_name!="":
+                        set_title_name.add(_title_name)
+
+            for _dm in project_dynamics_to_merge:
+
+                _title2 = _dm.get(document_doctitle,"")
+                _title_name = None
+                _title_name_search = re.search(package_number_pattern,_title2)
+                if _title_name_search is not None:
+                    _title_name = _title_name_search.group()
+                    _title_name = re.sub("[^0-9A-Za-z一二三四五六七八九十]",'',_title_name)
+                    if _title_name!="":
+                        set_title_name_to_merge.add(_title_name)
+            if len(set_title_name)>0 and len(set_title_name_to_merge)>0:
+                if len(set_title_name&set_title_name_to_merge)==0:
+                    if b_log:
+                        log("check sub_project_name title set failed %s===%s"%(str(set_title_name),str(set_title_name_to_merge)))
+                    return -1
+                else:
+                    return 1
+        except Exception as e:
+            traceback.print_exc()
+
     return 0
 
 def check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,agency,agency_to_merge,win_tenderer,win_tenderer_to_merge,b_log):
@@ -2681,13 +2722,13 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*300,return_prob=Fa
     check_dict[_time_check] += 1
 
     #时间判断-分包编号
-    _sub_project_name_check = check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_log)
+    _sub_project_name_check = check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,project_dynamics,project_dynamics_to_merge,b_log)
     if docids==docids_to_merge and _sub_project_name_check==-1:
         if return_prob:
             return False,0
         return False
     check_dict[_sub_project_name_check] += 1
-    prob_count += _sub_project_name_check
+    prob_count += _sub_project_name_check*3
 
     #时间判断-发布时间
     _page_time_check = check_page_time_merge(page_time,page_time_to_merge,b_log,time_limit)