
Proposed-construction: improve the completion-stage workflow; dedup: add stricter checks targeting a specific web source

luojiehua 2 years ago
parent
commit
5e30502e1d

+ 4 - 4
BaseDataMaintenance/dataSource/setttings.py

@@ -43,10 +43,10 @@ oracle_host = "192.168.0.150"
 oracle_port = 1522
 # oracle_user = "bxkc_data_readonly"
 # oracle_pass = "P7WUrgcz0@#j8pjg"
-# oracle_user = "bxkc_data"
-# oracle_pass = "Z0rTLHo@nIu5Zk1Z"
-oracle_user = "bxkc_db"
-oracle_pass = "TKVF#3idC4UQlDVy"
+oracle_user = "bxkc_write"
+oracle_pass = "aBrTKNl9SaPk@Yy3"
+# oracle_user = "bxkc_db"
+# oracle_pass = "TKVF#3idC4UQlDVy"
 oracle_db = "yanphone"
 
 
 ots_AccessKeyId = 'LTAI5tFuoxHm8Uxrr5nT8wTZ'

+ 13 - 4
BaseDataMaintenance/maintenance/dataflow.py

@@ -2230,7 +2230,7 @@ class Dataflow_dumplicate(Dataflow):
             if fingerprint_less==base_fingerprint:
                 _index = _i
                 continue
-            for _j in range(min(_i,5)):
+            for _j in range(min(_i,10)):
                 _dict2 = base_list[_j]
                 _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
                 # print("_prob:",_prob)
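For context, the widened loop above means each later entry of base_list is now compared against up to the first 10 entries rather than the first 5 before being treated as distinct. A minimal sketch of that comparison pattern, with a fingerprint-only stand-in in place of the real dumplicate_check (which scores on many fields):

```python
# Minimal sketch of the widened pairwise comparison window; the scoring
# function is illustrative, the real code calls self.dumplicate_check(...).

def pairwise_window_check(base_list, window=10):
    """Score each item against up to `window` leading items, keeping the best score."""
    results = []
    for _i, _dict1 in enumerate(base_list):
        best = 0.0
        for _j in range(min(_i, window)):  # widened from 5 to 10 in this commit
            _dict2 = base_list[_j]
            # stand-in for the full multi-field duplicate check
            _prob = 1.0 if _dict1.get("fingerprint") == _dict2.get("fingerprint") else 0.0
            best = max(best, _prob)
        results.append(best)
    return results

docs = [{"fingerprint": "a"}, {"fingerprint": "b"}, {"fingerprint": "a"}]
print(pairwise_window_check(docs))  # [0.0, 0.0, 1.0]
```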
@@ -2277,6 +2277,7 @@ class Dataflow_dumplicate(Dataflow):
         project_name_less = document_less["project_name"]
         fingerprint_less = document_less["fingerprint"]
         extract_count_less = document_less["extract_count"]
+        web_source_no_less = document_less.get("web_source_no")

         document_greater = _dict2
         docid_greater = _dict2["docid"]
@@ -2296,8 +2297,13 @@ class Dataflow_dumplicate(Dataflow):
         project_name_greater = document_greater["project_name"]
         fingerprint_greater = document_greater["fingerprint"]
         extract_count_greater = document_greater["extract_count"]
+        web_source_no_greater = document_greater.get("web_source_no")

-        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=b_log)
+        hard_level=1
+        if web_source_no_less==web_source_no_greater=="17397-3":
+            hard_level=2
+
+        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=b_log,hard_level=hard_level)
 
 
 
 
    def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
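The guard added above relies on Python's chained comparison: web_source_no_less == web_source_no_greater == "17397-3" holds only when both documents carry the same web_source_no and that value is "17397-3". A small sketch of the level selection (the source id comes from this diff; the helper name is made up):

```python
# Hedged sketch of the hard_level selection; only the id "17397-3" is from the diff.

def select_hard_level(web_source_no_less, web_source_no_greater):
    hard_level = 1
    # Chained comparison: equivalent to
    # web_source_no_less == web_source_no_greater and web_source_no_greater == "17397-3"
    if web_source_no_less == web_source_no_greater == "17397-3":
        hard_level = 2
    return hard_level

assert select_hard_level("17397-3", "17397-3") == 2  # stricter checks for this site
assert select_hard_level("17397-3", "00001-1") == 1  # ids differ
assert select_hard_level(None, None) == 1            # missing fields fall back to normal
```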
@@ -2531,7 +2537,10 @@ class Dataflow_dumplicate(Dataflow):
                             _dict["confidence"] = confidence
                             _dict["min_counts"] = total_count

-                            list_data.append(_dict)
+                            print("check====",item.get("docid"),_dict.get("docid"),confidence)
+
+                            if not confidence<0.1:
+                                list_data.append(_dict)
                 all_time = time.time()-_time
                 # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
                 return list_data
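The change above stops blindly appending every candidate: anything scored below 0.1 by the duplicate check is now discarded. A minimal sketch of the filter, assuming confidence is an ordinary float (the 0.1 threshold is from the diff; the sample data is invented):

```python
# Sketch of the low-confidence candidate filter added in this hunk.

CONF_THRESHOLD = 0.1  # from the diff

def filter_candidates(candidates):
    """Keep only duplicate candidates whose confidence clears the threshold."""
    kept = []
    for _dict in candidates:
        confidence = _dict.get("confidence", 0.0)
        if not confidence < CONF_THRESHOLD:  # i.e. confidence >= 0.1 for ordinary floats
            kept.append(_dict)
    return kept

print(filter_candidates([{"docid": 1, "confidence": 0.05},
                         {"docid": 2, "confidence": 0.80}]))
# -> [{'docid': 2, 'confidence': 0.8}]
```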
@@ -4130,7 +4139,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(324785921)
+    df_dump.test_dumplicate(326288275)
     # df_dump.test_merge([292315564],[287890754])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)

+ 60 - 3
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -146,6 +146,59 @@ class DataSynchronization():
                 _proposed.update_row(ots_client)
             _count += len(list_data)

+    def fix_progress(self):
+        ots_client = self.ots_client
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        bool_query = BoolQuery(must_queries=[RangeQuery("latest_service_time",range_to="1001"),
+                                             TermQuery("progress","竣工阶段")])
+        columns = ["latest_service_time","json_list_group"]
+        task_queue = queue.Queue()
+        rows, next_token, total_count, is_all_succeed = ots_client.search(self.designed_project_table, self.designed_project_table_index,
+                                                                          SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("crtime",SortOrder.DESC)]), limit=100, get_total_count=True),
+                                                                          ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+
+
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            if _data.get("latest_service_time") is not None:
+                _proposed = proposedBuilding_tmp(_data)
+                task_queue.put(_proposed,True)
+                break
+        print(list_data)
+        print("total_count",total_count)
+        while next_token:
+            rows, next_token, total_count, is_all_succeed = ots_client.search(self.designed_project_table, self.designed_project_table_index,
+                                                                              SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
+                                                                              ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                if _data.get("latest_service_time") is not None:
+                    _proposed = proposedBuilding_tmp(_data)
+                    task_queue.put(_proposed,True)
+        print("task_queue size %d"%(task_queue.qsize()))
+
+        def _handle(_proposed,result_queue):
+            try:
+                _time = time.time()
+                _project_dict = _proposed.toDesigned_project(ots_client)
+                log("toDesigned_project takes %.2fs"%(time.time()-_time))
+                _time = time.time()
+                if _project_dict is not None:
+                    # update data
+                    log("project_dict:"+json.dumps(_project_dict,ensure_ascii=False))
+                    _designed_project = designed_project(_project_dict)
+                    _designed_project.update_project(ots_client)
+                    log("update desined takes %.2fs"%(time.time()-_time))
+
+            except Exception as e:
+                log("comsumer failed cause of %s"%(str(e)))
+                log("proposed:%s"%(str(_proposed)))
+                log(traceback.format_exc())
+
+        mt=MultiThreadHandler(task_queue,_handle,None,30)
+        mt.run()
+
     def scheduler(self):
         _scheduler = BlockingScheduler()
         _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
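The new fix_progress method follows a token-paged scan: one initial search, then repeated calls with next_token until the token is exhausted, pushing qualifying rows onto a queue that MultiThreadHandler drains with 30 workers. A self-contained sketch of that shape, with a hypothetical in-memory page source standing in for ots_client.search and plain stdlib threads standing in for MultiThreadHandler:

```python
# Schematic of the next_token pagination + worker-queue pattern used above.
# `fake_search` is a made-up stand-in for the tablestore search call.

import queue
import threading

def fake_search(pages, token):
    """Return (rows, next_token); next_token is None when no pages remain."""
    rows = pages[token]
    next_token = token + 1 if token + 1 < len(pages) else None
    return rows, next_token

def scan_to_queue(pages):
    task_queue = queue.Queue()
    rows, next_token = fake_search(pages, 0)      # first page
    pending = list(rows)
    while next_token is not None:                 # follow tokens until exhausted
        rows, next_token = fake_search(pages, next_token)
        pending.extend(rows)
    for _data in pending:
        if _data.get("latest_service_time") is not None:  # same row filter as above
            task_queue.put(_data)
    return task_queue

def run_workers(task_queue, handler, n_threads=4):
    def worker():
        while True:
            try:
                item = task_queue.get_nowait()
            except queue.Empty:
                return
            handler(item)                         # mirrors _handle(_proposed, ...)
    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

pages = [[{"latest_service_time": "0915"}], [{"latest_service_time": None}]]
run_workers(scan_to_queue(pages), lambda item: print("handled", item))
```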
@@ -156,10 +209,14 @@ def startSychro():
     ds = DataSynchronization()
     ds.scheduler()

+
+
+
 if __name__=="__main__":
     ds = DataSynchronization()
-    # ds.scheduler()
-    # ds.maxcompute2ots()
-    ds.turn_stage()
+    # # ds.scheduler()
+    # # ds.maxcompute2ots()
+    # ds.turn_stage()
+    ds.fix_progress()
 
 
 
 

+ 3 - 1
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1028,7 +1028,7 @@ def check_time(json_time_less,json_time_greater):
                         return False
     return True

-def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False):
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False,hard_level=1):
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
         return 1
     if isinstance(project_codes_less,str):
@@ -1187,6 +1187,8 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         else:
             check_result["time"] = 1

+    if hard_level==2 and check_result["product"]<=1:
+        return 0
     if check_result.get("pass",0)==0:
         if b_log:
             logging.info(str(check_result))
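The hard_level parameter threaded through above acts as an early veto: at level 2 (currently only for web source "17397-3"), a product check scoring 1 or lower makes the rule return 0 (not a duplicate) before the usual pass/fail evaluation runs. A hedged sketch of just that gate (the check_result values are illustrative):

```python
# Sketch of the hard_level gate added to check_dumplicate_rule.

def apply_hard_level_gate(check_result, hard_level):
    """Return 0 (not a duplicate) when strict mode is on and product evidence
    is weak; return None to fall through to the normal rule evaluation."""
    if hard_level == 2 and check_result.get("product", 0) <= 1:
        return 0
    return None

assert apply_hard_level_gate({"product": 1}, hard_level=2) == 0     # vetoed under strict mode
assert apply_hard_level_gate({"product": 2}, hard_level=2) is None  # strong product match passes
assert apply_hard_level_gate({"product": 0}, hard_level=1) is None  # default level unaffected
```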

+ 1 - 0
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -1733,6 +1733,7 @@ def generate_projects(list_docs):
     project_dict = generate_common_properties(list_docs)

     list_package_properties = generate_packages_properties(list_docs)
+
 # generate package data
     for _pp in list_package_properties:
         _pp.update(project_dict)

File diff suppressed because it is too large
+ 4 - 0
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py


Some files were not shown because too many files have changed