
Merge remote-tracking branch 'origin/master'

znj 3 months ago
parent
commit
285fa0d1ff

+ 1 - 1
BaseDataMaintenance/dataSource/setttings.py

@@ -68,7 +68,7 @@ activateMQ_pswd = "admin"
 
 activateMQ_ali_host = "172.16.160.72"
 # activateMQ_ali_host = "172.16.147.13"
-# activateMQ_ali_host = "116.62.167.43"
+# activateMQ_ali_host = "8.149.130.129"
 activateMQ_ali_port = 61613
 activateMQ_ali_user = "admin"
 activateMQ_ali_pswd = "admin"
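For context, a minimal sketch of how these broker settings are typically consumed, assuming stomp.py as the STOMP client (port 61613 is ActiveMQ's STOMP listener); the helper name below is hypothetical and not part of this commit:

import stomp

from BaseDataMaintenance.dataSource.setttings import (
    activateMQ_ali_host, activateMQ_ali_port,
    activateMQ_ali_user, activateMQ_ali_pswd)

def connect_activemq_ali():
    # Single broker endpoint; this commit only swaps a commented-out fallback host.
    conn = stomp.Connection([(activateMQ_ali_host, activateMQ_ali_port)])
    conn.connect(activateMQ_ali_user, activateMQ_ali_pswd, wait=True)
    return conn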

+ 73 - 51
BaseDataMaintenance/interface/project_merge_interface.py

@@ -1,4 +1,7 @@
 
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/../..")
 
 from BaseDataMaintenance.maintenance.dataflow import Dataflow_dumplicate,log
 
@@ -10,6 +13,7 @@ flow = Dataflow_dumplicate(start_delete_listener=False)
 from BaseDataMaintenance.common.Utils import uniform_package_name
 import json
 import re
+import traceback
 
 
 def merge_document_interface(item,b_log=False):
@@ -21,54 +25,67 @@ def merge_document_interface(item,b_log=False):
     :return:
     '''
     try:
-
-        _proj = {
-            "page_time":item.get("page_time"),
-            "project_codes":item.get("project_codes"),
-            "project_name":item.get("project_name"),
-            "tenderee":item.get("tenderee"),
-            "agency":item.get("agency"),
-            "product":item.get("product"),
-            "sub_project_name":item.get("sub_project_name"),
-            "bidding_budget":item.get("bidding_budget"),
-            "win_tenderer":item.get("win_tenderer"),
-            "win_bid_price":item.get("win_bid_price"),
-            "province":item.get("province"),
-            "city":item.get("city"),
-            "district":item.get("district"),
-            "zhao_biao_page_time":item.get("zhao_biao_page_time"),
-            "zhong_biao_page_time":item.get("zhong_biao_page_time"),
-            "enterprise":item.get("enterprise"),
-            "detail_link":item.get("detail_link"),
-            "doctitle":item.get("doctitle"),
-
-        }
-
-        if _proj.get("province"):
-            _proj["province"] = re.sub("省","",str(_proj["province"]))
-        if _proj.get("city"):
-            if len(str(_proj["city"]))>2:
-                _proj["city"] = re.sub("市","",str(_proj["city"]))
-        if _proj.get("district"):
-            if len(str(_proj["district"]))>2:
-                _proj["district"] = re.sub("区|县|镇","",str(_proj["district"]))
-
-        _proj["sub_project_name"] = uniform_package_name(_proj["sub_project_name"])
-
-        enterprise = _proj.get("enterprise","")
-        list_enterprise = enterprise.split(",") if enterprise else []
-        enterprise = {"nlp_enterprise":list_enterprise}
-        _proj["enterprise"] = json.dumps(enterprise,ensure_ascii= False)
-
-        list_projects = flow.merge_projects([_proj],b_log=b_log)
-        if len(list_projects)>0:
-            uuids = list_projects[0].get("uuid","")
-            if uuids:
-                l_uuid = uuids.split(",")
-                if l_uuid:
-                    return l_uuid[0]
+        if isinstance(item,dict):
+            log("merge_document_interface %s"%json.dumps(item,ensure_ascii=False))
+            _proj = {
+                "page_time":item.get("page_time",""),
+                "project_codes":item.get("project_codes",""),
+                "project_name":item.get("project_name",""),
+                "tenderee":item.get("tenderee",""),
+                "agency":item.get("agency",""),
+                "product":item.get("product",""),
+                "sub_project_name":item.get("sub_project_name",""),
+                "bidding_budget":item.get("bidding_budget","0"),
+                "win_tenderer":item.get("win_tenderer",""),
+                "win_bid_price":item.get("win_bid_price","0"),
+                "province":item.get("province",""),
+                "city":item.get("city",""),
+                "district":item.get("district",""),
+                "zhao_biao_page_time":item.get("zhao_biao_page_time",""),
+                "zhong_biao_page_time":item.get("zhong_biao_page_time",""),
+                "enterprise":item.get("enterprise",""),
+                "detail_link":item.get("detail_link",""),
+                "doctitle":item.get("doctitle",""),
+
+            }
+
+            if _proj.get("page_time")=="":
+                raise RuntimeError("page_time must not be empty")
+
+            if _proj.get("province"):
+                _proj["province"] = re.sub("省","",str(_proj["province"]))
+            if _proj.get("city"):
+                if len(str(_proj["city"]))>2:
+                    _proj["city"] = re.sub("市","",str(_proj["city"]))
+            if _proj.get("district"):
+                if len(str(_proj["district"]))>2:
+                    _proj["district"] = re.sub("区|县|镇","",str(_proj["district"]))
+
+            if _proj.get("bidding_budget"):
+                _proj["bidding_budget"] = float(_proj["bidding_budget"])
+
+            if _proj.get("win_bid_price"):
+                _proj["win_bid_price"] = float(_proj["win_bid_price"])
+
+            _proj["sub_project_name"] = uniform_package_name(_proj["sub_project_name"])
+
+            enterprise = _proj.get("enterprise","")
+            list_enterprise = enterprise.split(",") if enterprise else []
+            enterprise = {"nlp_enterprise":list_enterprise}
+            _proj["enterprise"] = json.dumps(enterprise,ensure_ascii= False)
+
+            list_projects = flow.merge_projects([_proj],b_log=b_log)
+            if len(list_projects)>0:
+                uuids = list_projects[0].get("uuid","")
+                if uuids:
+                    l_uuid = uuids.split(",")
+                    if l_uuid:
+                        return l_uuid[0]
+        else:
+            raise RuntimeError("the argument must be passed as a dict")
     except Exception as e:
-        raise RuntimeError("error on dumplicate")
+        traceback.print_exc()
+        raise RuntimeError("error: %s"%(str(e)))
 
 
 import os
@@ -89,6 +106,7 @@ def embedding():
             project_uuid = ""
         _r["project_uuid"] = project_uuid
     except Exception as e:
+        traceback.print_exc()
         _r["success"] = False
         _r["msg"] = str(e)
 
@@ -97,10 +115,7 @@ def embedding():
 def start_project_merge_server():
     app.run(host="0.0.0.0",port="15010",debug=False)
 
-if __name__ == '__main__':
-
-    # start_project_merge_server()
-
+def test():
     _proj = {
         "page_time":"2025-01-14",
         "project_codes":"SHX-ZB-2024-01013-07",
@@ -130,3 +145,10 @@ if __name__ == '__main__':
     # print(merge_document_interface(_proj,b_log=True))
 
 
+if __name__ == '__main__':
+
+    # start_project_merge_server()
+    test()
+
+
+
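A sketch of how the hardened interface now behaves, inferred from the diff above and not executed here (importing the module also constructs the module-level Dataflow_dumplicate):

from BaseDataMaintenance.interface.project_merge_interface import merge_document_interface

# Missing page_time now fails fast instead of merging silently.
try:
    merge_document_interface({"project_name": "demo"})
except RuntimeError as e:
    print(e)   # page_time is required

# Non-dict input is rejected outright.
try:
    merge_document_interface([("page_time", "2025-01-14")])
except RuntimeError as e:
    print(e)   # the argument must be passed as a dict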

+ 57 - 34
BaseDataMaintenance/maintenance/dataflow.py

@@ -3286,47 +3286,57 @@ class Dataflow_dumplicate(Dataflow):
 
         list_projects = []
         if len(list_docid)>0:
+
             list_docs = self.search_docs(list_docid)
+            print("search_docs(list_docid)")
             list_projects = self.generate_projects_from_document(list_docs)
-            list_projects = dumplicate_projects(list_projects)
+            print("generate_projects_from_document")
+            list_projects = dumplicate_projects(list_projects,max_count=20)
+            print("dumplicate_projects")
         list_projects.extend(list_delete_projects)
         project_json = to_project_json(list_projects)
         return project_json
 
 
     def delete_doc_handle(self,_dict,result_queue):
-        headers = _dict.get("frame")
-        conn = _dict.get("conn")
-
-        if headers is not None:
-            message_id = headers.headers["message-id"]
-            body = headers.body
-            item = json.loads(body)
-            docid = item.get("docid")
-            log("==========start delete docid:%s"%(str(docid)))
-            if docid is None:
-                ackMsg(conn,message_id)
-            delete_result = self.delete_projects_by_document(docid)
-
-            log("1")
-            _uuid = uuid4().hex
-            _d = {PROJECT_PROCESS_UUID:_uuid,
-                  PROJECT_PROCESS_CRTIME:1,
-                  PROJECT_PROCESS_PROJECTS:delete_result}
-            _pp = Project_process(_d)
-            log("2")
-            try:
-                if _pp.update_row(self.ots_client):
+
+        try:
+            headers = _dict.get("frame")
+            conn = _dict.get("conn")
+
+            if headers is not None:
+                message_id = headers.headers["message-id"]
+                body = headers.body
+                item = json.loads(body)
+                docid = item.get("docid")
+                log("==========start delete docid:%s"%(str(docid)))
+                if docid is None:
                     ackMsg(conn,message_id)
-            except Exception as e:
-                ackMsg(conn,message_id)
-            log("3")
-            # stop pushing results into the result queue; insert into the project_process table instead
-            # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
-            #     ackMsg(conn,message_id)
+                delete_result = self.delete_projects_by_document(docid)
+
+                log("1")
+                _uuid = uuid4().hex
+                _d = {PROJECT_PROCESS_UUID:_uuid,
+                      PROJECT_PROCESS_CRTIME:1,
+                      PROJECT_PROCESS_PROJECTS:delete_result}
+                _pp = Project_process(_d)
+                log("2")
+                try:
+                    if _pp.update_row(self.ots_client):
+                        ackMsg(conn,message_id)
+                except Exception as e:
+                    ackMsg(conn,message_id)
+                log("3")
+                # stop pushing results into the result queue; insert into the project_process table instead
+                # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
+                #     ackMsg(conn,message_id)
+                log("==========end delete docid:%s"%(str(docid)))
+            else:
+                log("has not headers")
+        except Exception as e:
+            traceback.print_exc()
+            ackMsg(conn,message_id)
             log("==========end delete docid:%s"%(str(docid)))
-        else:
-            log("has not headers")
 
     def generate_common_properties(self,list_docs):
         '''
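The rework above follows a common consumer pattern: never let one malformed message kill the listener thread, and acknowledge even on failure so the broker does not redeliver the message forever. A condensed sketch of that pattern with stand-in names (process and ackMsg are placeholders for this module's real work):

import json
import traceback

def handle_delete_message(_dict, process, ackMsg):
    conn = _dict.get("conn")
    message_id = None
    try:
        frame = _dict.get("frame")
        if frame is None:
            return                       # nothing to acknowledge
        message_id = frame.headers["message-id"]
        process(json.loads(frame.body))  # the actual delete/merge work
    except Exception:
        traceback.print_exc()
    finally:
        if message_id is not None:
            ackMsg(conn, message_id)     # always ack once an id is known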
@@ -4030,6 +4040,17 @@ class Dataflow_dumplicate(Dataflow):
             return True
         return False
 
+    def exists_normal_fingerprint(self,_fingerprint):
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",201,301),
+            TermQuery("fingerprint",_fingerprint)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                            SearchQuery(query,get_total_count=True,limit=1))
+        if total_count>0:
+            return True
+        return False
+
 
     def check_page_time(self,item):
         page_time = item.get(document_page_time,"")
@@ -4132,6 +4153,8 @@ class Dataflow_dumplicate(Dataflow):
             final_list = self.dumplicate_fianl_check(base_list,b_log)
 
             exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
+            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint))
+            # print("exist_normal_fingerprint",exist_normal_fingerprint)
             # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
             best_docid = self.get_best_docid(final_list)
 
@@ -4156,7 +4179,7 @@ class Dataflow_dumplicate(Dataflow):
             remove_list = []
 
 
-            if (self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
+            if (self.check_page_time(item) and not exist_normal_fingerprint  and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                 dtmp.setValue(document_tmp_save,1,True)
                 # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
@@ -4587,7 +4610,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=True)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4763,7 +4786,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(583564377
+    df_dump.test_dumplicate(400068967697
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061
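The behavioural core of this file's changes is the new fingerprint gate: a document whose fingerprint already belongs to a "normal" (status 201-300) document is no longer saved as the canonical copy. Paraphrased as a sketch, with plain string keys standing in for the document_tmp_* constants:

def should_save(self, item, final_list, best_docid):
    # Explicit updates bypass every check, as before.
    if item.get("update_document", "") == "true":
        return True
    # New in this commit: an already-published fingerprint blocks the save.
    if self.exists_normal_fingerprint(item.get("fingerprint")):
        return False
    return (self.check_page_time(item)
            and (len(final_list) == 0 or best_docid == item.get("docid")))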

+ 2 - 2
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2181,7 +2181,7 @@ def appendKeyvalueCount(list_projects,keys=[project_tenderee,project_agency,proj
         _proj["keyvaluecount"] = _count
 
 
-def dumplicate_projects(list_projects,b_log=False):
+def dumplicate_projects(list_projects,b_log=False,max_count=100):
     '''
     Deduplicate projects that span multiple lots (sub-packages)
     :return:
@@ -2189,7 +2189,7 @@ def dumplicate_projects(list_projects,b_log=False):
     appendKeyvalueCount(list_projects)
     list_projects.sort(key=lambda x:str(x.get(project_page_time,"")))
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
-    cluster_projects = list_projects[:100]
+    cluster_projects = list_projects[:max_count]
     _count = 10
     log("dumplicate projects rest %d"%len(cluster_projects))
     while _count>0:
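The new max_count parameter caps how many candidate projects enter the merge loop, presumably because pairwise comparison cost grows quickly with cluster size. The interface path in dataflow.py above now passes max_count=20 for latency, while existing batch callers keep the old cap through the default:

# Interactive/API path: trade recall for latency.
list_projects = dumplicate_projects(list_projects, b_log=False, max_count=20)

# Batch path: unchanged, still considers up to 100 candidate projects.
list_projects = dumplicate_projects(list_projects)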

+ 1 - 0
BaseDataMaintenance/start_project_merge_server.py

@@ -2,6 +2,7 @@
 
 import sys
 import os
+sys.path.append(os.path.dirname(__file__)+"/..")
 from BaseDataMaintenance.interface.project_merge_interface import start_project_merge_server
 
 if __name__ == '__main__':
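The added sys.path line makes the repo root importable when the script is launched by file path rather than as an installed package. A slightly more defensive variant of the same bootstrap (an assumption, not what this commit ships):

import os
import sys

# Resolve the repo root relative to this file and add it exactly once.
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if REPO_ROOT not in sys.path:
    sys.path.insert(0, REPO_ROOT)

from BaseDataMaintenance.interface.project_merge_interface import start_project_merge_server

if __name__ == '__main__':
    start_project_merge_server()   # Flask server on 0.0.0.0:15010 (per the diff)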