Ver código fonte

实时项目合并日志修复、更新字段过滤空值;初始化时放入源类别

luojiehua 2 anos atrás
pai
commit
d5999fc111

+ 1 - 2
.idea/encodings.xml

@@ -1,12 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="Encoding">
-    <file url="file://$PROJECT_DIR$/BaseDataMaintenance/attachmentProcessTime.xlsx" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/attachmentProcessTime.xlsx" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/dataSource/searchPaddle.py" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/attachment/2022-01-18_183521_export11.xlsx" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/model/ots/2022-01-19_214304_export11.xlsx" charset="GBK" />
-    <file url="file://$PROJECT_DIR$/attachmentProcessTime2.xlsx" charset="GBK" />
+    <file url="file://$PROJECT_DIR$/BaseDataMaintenance/model/ots/2022-10-14_190838_数据导出.xlsx" charset="GBK" />
     <file url="file://$PROJECT_DIR$/attachmentProcessTime2.xlsx" charset="GBK" />
     <file url="file://$PROJECT_DIR$/common/Utils.py" charset="GBK" />
     <file url="file://$PROJECT_DIR$/common/multiThread.py" charset="GBK" />

+ 2 - 2
BaseDataMaintenance/dataSource/setttings.py

@@ -64,8 +64,8 @@ activateMQ_port = 61613
 activateMQ_user = "admin"
 activateMQ_pswd = "admin"
 
-# activateMQ_ali_host = "172.16.147.13"
-activateMQ_ali_host = "116.62.167.43"
+activateMQ_ali_host = "172.16.147.13"
+# activateMQ_ali_host = "116.62.167.43"
 activateMQ_ali_port = 61613
 activateMQ_ali_user = "admin"
 activateMQ_ali_pswd = "admin"

+ 52 - 16
BaseDataMaintenance/maintenance/dataflow.py

@@ -2166,7 +2166,7 @@ class Dataflow_dumplicate(Dataflow):
                 continue
             for _j in range(min(_i,5)):
                 _dict2 = base_list[_j]
-                _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=True)
+                _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
                 # print("_prob:",_prob)
                 if _prob<=0.1:
                     _pass = False
@@ -2406,7 +2406,7 @@ class Dataflow_dumplicate(Dataflow):
                     else:
                         if _docid!=item.get(document_tmp_docid):
                             _time1 = time.time()
-                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=True)
+                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                             check_time+= time.time()-_time1
 
                             _dict["confidence"] = confidence
@@ -2696,7 +2696,7 @@ class Dataflow_dumplicate(Dataflow):
         producer()
         comsumer()
 
-    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_fingerprint,document_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_life_docchannel,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]):
+    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]):
         '''
         根据docid查询公告内容,先查询document_tmp,再查询document
         :param list_docids:
@@ -2851,6 +2851,8 @@ class Dataflow_dumplicate(Dataflow):
         _dict = {}
         #更新公共属性
         for k,v in project_dict.items():
+            if v is None or v=="":
+                continue
             if k in (project_dynamics,project_product,project_project_codes,project_docids):
                 continue
             for _proj in projects:
@@ -2987,7 +2989,7 @@ class Dataflow_dumplicate(Dataflow):
                 _counts = 0
                 if win_tenderer!="" and bidding_budget!=0:
                     _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
-                dict_package[_key] = _pp
+                    dict_package[_key] = _pp
                 _counts += 1
                 if win_tenderer!="" and  win_bid_price!=0:
                     _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
@@ -3066,7 +3068,7 @@ class Dataflow_dumplicate(Dataflow):
         #计数法选择
         choose_dict = {}
         project_dict = {}
-        for _key in [document_bidway,document_area,document_province,document_city,document_district,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
+        for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
             for _doc in list_docs:
                 _value = _doc.getProperties().get(_key,"")
                 if _value!="":
@@ -3076,6 +3078,36 @@ class Dataflow_dumplicate(Dataflow):
                         choose_dict[_key][_value] = 0
                     choose_dict[_key][_value] += 1
 
+
+        _find = False
+        for _key in [document_district,document_city,document_province,document_area]:
+            area_dict = {}
+            for _doc in list_docs:
+                loc = _doc.getProperties().get(_key,"未知")
+                if loc not in ('全国','未知',"0"):
+                    if loc not in area_dict:
+                        area_dict[loc] = 0
+                    area_dict[loc] += 1
+            list_loc = []
+            for k,v in area_dict.items():
+                list_loc.append([k,v])
+            list_loc.sort(key=lambda x:x[1],reverse=True)
+            if len(list_loc)>0:
+                project_dict[document_district] = _doc.getProperties().get(document_district)
+                project_dict[document_city] = _doc.getProperties().get(document_city)
+                project_dict[document_province] = _doc.getProperties().get(document_province)
+                project_dict[document_area] = _doc.getProperties().get(document_area)
+                _find = True
+                break
+        if not _find:
+            if len(list_docs)>0:
+                project_dict[document_district] = list_docs[0].getProperties().get(document_district)
+                project_dict[document_city] = list_docs[0].getProperties().get(document_city)
+                project_dict[document_province] = list_docs[0].getProperties().get(document_province)
+                project_dict[document_area] = list_docs[0].getProperties().get(document_area)
+
+
+
         print("choose_dict",choose_dict)
         for _key,_value in choose_dict.items():
             _l = []
@@ -3103,7 +3135,7 @@ class Dataflow_dumplicate(Dataflow):
         for _doc in list_docs:
             table_name = _doc.getProperties().get("table_name")
             status = _doc.getProperties().get(document_status,0)
-            _save = _doc.getProperties().get(document_tmp_save,0)
+            _save = _doc.getProperties().get(document_tmp_save,1)
             doctitle = _doc.getProperties().get(document_doctitle,"")
             docchannel = _doc.getProperties().get(document_docchannel)
             page_time = _doc.getProperties().get(document_page_time,"")
@@ -3127,6 +3159,7 @@ class Dataflow_dumplicate(Dataflow):
             if p_page_time=="":
                 p_page_time = page_time
 
+            print("docid %s page_time:%s docchannel %s"%(str(_docid),str(page_time),str(_docchannel)))
             if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
                 zhao_biao_page_time = page_time
             if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
@@ -3148,7 +3181,7 @@ class Dataflow_dumplicate(Dataflow):
                     remove_docids.add(str(_docid))
             list_dynamics.append({document_docid:_docid,
                                      document_doctitle:doctitle,
-                                     document_docchannel:docchannel,
+                                     document_docchannel:_docchannel,
                                      document_bidway:_bidway,
                                      document_page_time:page_time,
                                      document_status:201 if is_visuable==1 else 401,
@@ -3160,8 +3193,10 @@ class Dataflow_dumplicate(Dataflow):
         project_dict[project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
         project_dict[project_docid_number] = docid_number
         project_dict[project_docids] = ",".join(list(set(visuable_docids)-remove_docids))
-        project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
-        project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
+        if zhao_biao_page_time !="":
+            project_dict[project_zhao_biao_page_time] = zhao_biao_page_time
+        if zhong_biao_page_time !="":
+            project_dict[project_zhong_biao_page_time] = zhong_biao_page_time
         project_dict[project_project_codes] = ",".join(list(set(list_codes)))
         project_dict[project_page_time] = p_page_time
         project_dict[project_product] = ",".join(list(set(list_product)))
@@ -3213,7 +3248,6 @@ class Dataflow_dumplicate(Dataflow):
             _pp.update(project_dict)
             list_projects.append(_pp)
 
-
         return list_projects
 
     def search_projects_with_document(self,list_docids):
@@ -3411,7 +3445,7 @@ class Dataflow_dumplicate(Dataflow):
             list_query.append([_query,1])
         return list_query
 
-    def check_merge_rule(self,_proj,_dict,b_log=True):
+    def check_merge_rule(self,_proj,_dict,b_log=False):
         page_time = _proj.get(project_page_time,"")
         project_codes = _proj.get(project_project_codes,"")
         project_name = _proj.get(project_project_name,"")
@@ -3568,7 +3602,7 @@ class Dataflow_dumplicate(Dataflow):
             #优先匹配招标金额相近的
             list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
             for _data in list_merge_data:
-                if self.check_merge_rule(_proj,_data,b_log=True):
+                if self.check_merge_rule(_proj,_data,b_log=False):
                     print("pass",_data)
                     self.update_projects_by_project(_data,[_proj])
 
@@ -3607,8 +3641,8 @@ class Dataflow_dumplicate(Dataflow):
                 _status = _d.get(document_status,201)
                 is_multipack = _d.get("is_multipack",True)
                 extract_count = _d.get(document_tmp_extract_count,0)
-                docchannel = _d.get(document_docchannel)
-                if _status>=201 and _status<=300:
+                docchannel = _d.get(document_docchannel,0)
+                if _status>=201 and _status<=300 and docchannel>0:
                     if docchannel in dict_channel_proj:
                         n_d = dict_channel_proj[docchannel]
                         n_docid = n_d.get(document_docid)
@@ -3880,8 +3914,10 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate()
     # df_dump.start_flow_dumplicate()
-    # df_dump.test_dumplicate(219352381)
-    df_dump.delete_projects_by_document(16288036)
+    a = time.time()
+    df_dump.test_dumplicate(272934158)
+    print("takes",time.time()-a)
+    # df_dump.delete_projects_by_document(16288036)
     # log("=======")
     # for i in range(3):
     #     time.sleep(20)

+ 1 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -957,6 +957,7 @@ class Dataflow_init(Dataflow):
             body = json.loads(headers.body)
             body[document_tmp_partitionkey] = partitionkey
             body[document_tmp_docid] = next_docid
+            body[document_original_docchannel] = body.get(document_docchannel)
             page_attachments = body.get(document_tmp_attachment_path,"[]")
             _uuid = body.get(document_tmp_uuid,"")
             if page_attachments!="[]":

+ 1 - 1
BaseDataMaintenance/model/ots/BaseModel.py

@@ -88,7 +88,7 @@ class BaseModel():
         try:
             consumed, return_row = ots_client.delete_row(self.table_name, row, None)
         except OTSClientError as e:
-            log("update row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
+            log("update row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
             return False
         # 服务端异常,一般为参数错误或者流控错误。
         except OTSServiceError as e:

+ 21 - 4
BaseDataMaintenance/model/ots/document.py

@@ -61,7 +61,7 @@ document_time_registration_end = "time_registration_end"
 document_time_registration_start = "time_registration_start"
 document_time_release = "time_release"
 
-
+document_info_source = "info_source"
 
 
 class Document(BaseModel):
@@ -76,8 +76,8 @@ class Document(BaseModel):
     def getPrimary_keys(self):
         return ["partitionkey","docid"]
 
-    def delete_row(self,ots_client):
-        raise NotImplementedError()
+    # def delete_row(self,ots_client):
+    #     raise NotImplementedError()
 
     def isLegalUrl(self,_url,_type):
         _flag = False
@@ -542,4 +542,21 @@ if __name__=="__main__":
     # turn_extract_status()
     # turn_document_status()
     # drop_extract2()
-    fixDocumentHtml()
+    # fixDocumentHtml()
+    from BaseDataMaintenance.dataSource.source import getConnect_ots
+    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
+    ots_client = getConnect_ots()
+    ots_capacity = getConnect_ots_capacity()
+    import pandas as pd
+    df = pd.read_excel("2022-10-14_190838_数据导出.xlsx")
+    _count = 0
+    for _docid in df["docid"]:
+        partitionkey = int(_docid)//500+1
+        _d = {document_partitionkey:partitionkey,
+              document_docid:int(_docid)}
+        _doc = Document(_d)
+        _doc.delete_row(ots_client)
+        _doc.delete_row(ots_capacity)
+        _count += 1
+        print(_docid)
+    print("delete count:%d"%_count)

+ 1 - 1
BaseDataMaintenance/model/ots/project.py

@@ -64,7 +64,7 @@ project_time_registration_start = "time_registration_start"
 project_time_release = "time_release"
 
 project_dup_docid = "dup_docid"
-
+project_info_source = "info_source"
 
 class Project(BaseModel):