@@ -1273,6 +1273,20 @@ class Dataflow():
                dict_source_count[_web_source].add(_fingerprint)

                if len(dict_source_count[_web_source])>=2:
                    to_reverse=True

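+        # One special-bond notice can list several bonds as comma-separated ids in the
+        # "bondId=" query parameter of detail_link; the duplicate carrying the most ids
+        # is presumably the most complete copy, so its docid wins below.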
+        # special bonds (专项债)
+        if len(base_list)>0 and base_list[0].get("is_special_bonds")==1:
+            for _item in base_list:
+                detail_link = _item.get("detail_link")
+                detail_link = detail_link.strip() if detail_link else ""
+                if "bondId=" in detail_link:
+                    bondId = detail_link.split("bondId=")[1]
+                    bondId = bondId.split(",") if bondId else []
+                else:
+                    bondId = []
+                _item['bondId_num'] = len(bondId)
+            # print([i.get("bondId_num") for i in base_list])
+            base_list.sort(key=lambda x:x["bondId_num"],reverse=True)
+            return base_list[0]["docid"]
        if len(base_list)>0:
            base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
            base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
@@ -2209,7 +2223,9 @@ class Dataflow_dumplicate(Dataflow):
    def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart"]):
        dict_time = {}
        for k in keys:
-            dict_time[k] = _extract.get(k)
+            _time = _extract.get(k)
+            _time = _time[:10] if _time else ""
+            dict_time[k] = _time
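+            # The [:10] slice assumes "YYYY-MM-DD hh:mm:ss"-style values and keeps only
+            # the date part, so time-of-day differences cannot break equality comparisons.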
        return dict_time

@@ -2258,6 +2274,15 @@ class Dataflow_dumplicate(Dataflow):
        _dict["dict_time"] = self.get_dict_time(_extract)
        _dict["punish"] = _extract.get("punish",{})
        _dict["approval"] = _extract.get("approval",[])
+
+        # special-bond fields (专项债字段)
+        issue_details = _extract.get("debt_dic",{}).get("issue_details",[])
+        _dict["is_special_bonds"] = 1 if _dict.get(document_tmp_docchannel)==302 and _dict.get(document_tmp_web_source_name)=='专项债券信息网' and issue_details else 0
+        # procurement-intention fields (采购意向字段)
+        if _dict.get("docchannel")==114:
+            _dict["demand_info"] = _extract.get("demand_info",{}).get("data",[])
+        else:
+            _dict["demand_info"] = []
        return _dict

    def dumplicate_fianl_check(self,base_list,b_log=False):
@@ -2371,11 +2396,14 @@ class Dataflow_dumplicate(Dataflow):
        pagetime_stamp_greater = getTimeStamp(page_time_greater)

        day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
-        if day_dis>7:
-            _prob = 0
-        elif day_dis>3:
-            if _prob<0.4:
+        if document_less.get("is_special_bonds",0)==document_greater.get("is_special_bonds",0)==1:
+            pass
+        else:
+            if day_dis>7:
                _prob = 0
+            elif day_dis>3:
+                if _prob<0.4:
+                    _prob = 0
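+        # Special-bond pairs skip the day-distance penalty: the same bond batch is
+        # presumably announced over a wide date range, so far-apart page_times can
+        # still be genuine duplicates.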

        return _prob,day_dis
@@ -2661,7 +2689,7 @@ class Dataflow_dumplicate(Dataflow):

        if table_name in {"document_tmp","document"}:

-            if page_time>=timeAdd(current_date,-7):
+            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
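+            # Special-bond docs never take the recent document_tmp route; they presumably
+            # fall through to the long-lived table so duplicates older than seven days
+            # can still be matched.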
table_name = "document_tmp"
|
|
table_name = "document_tmp"
|
|
table_index = "document_tmp_index"
|
|
table_index = "document_tmp_index"
|
|
base_dict = {
|
|
base_dict = {
|
|
@@ -2891,6 +2919,17 @@ class Dataflow_dumplicate(Dataflow):
        confidence=80
        _dict = {doctitle_refine_name:doctitle_refine}
        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)

+        # special bonds (专项债)
+        if item.get("is_special_bonds")==1:
+            confidence = 90
+            _dict = {doctitle_refine_name: doctitle_refine,
+                     document_tmp_web_source_name:"专项债券信息网"}
+            tmp_base_dict = {
+                "docchannel": item["docchannel"],
+                "status": [201, 450],
+                # "page_time": [timeAdd(page_time, -365), timeAdd(page_time, 365)]
+            }
+            self.appendRule(list_rules, _dict, tmp_base_dict, must_not_dict, confidence, item, b_log=to_log)
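+            # With the page_time window left commented out, same-title notices from
+            # '专项债券信息网' match regardless of publication date; confidence 90
+            # outranks the generic title rule at 80.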

        confidence=70
@@ -2900,7 +2939,7 @@ class Dataflow_dumplicate(Dataflow):

        return list_rules,table_name,table_index

-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link"]):
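+        # "detail_link" is fetched here so the dedup step can count the bondId values
+        # embedded in it (see the special-bonds selection logic above).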
        q_size = self.queue_dumplicate.qsize()
        log("dumplicate queue size %d"%(q_size))
@@ -4424,7 +4463,7 @@ class Dataflow_dumplicate(Dataflow):
                singleNum_keys = _rule["singleNum_keys"]
                contain_keys = _rule["contain_keys"]
                multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link'],b_log=b_log)
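+                # Candidate documents now also carry web_source_name and detail_link,
+                # which the special-bonds rules need when re-checking duplicates.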
            _i += step
@@ -4874,12 +4913,13 @@ class Dataflow_dumplicate(Dataflow):

    def test_dumplicate(self,docid):
        # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link']
+        # print('columns',columns)
        item = self.get_attrs_before_dump(docid,columns)

        if item:
            log("start dumplicate_comsumer_handle")
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=True)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
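+            # upgrade=False presumably keeps this a read-only test run that finds
+            # duplicates without writing merge results back.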
        return

    def test_merge(self,list_docid_less,list_docid_greater):
@@ -5118,7 +5158,7 @@ if __name__ == '__main__':
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(613075691
+    df_dump.test_dumplicate(400075415256
    )
    # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
    # compare_dumplicate_check()