@@ -2213,6 +2213,24 @@ class Dataflow_dumplicate(Dataflow):

         return dict_time

+    def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
+
+        bool_query = BoolQuery(must_queries=[
+            TermQuery("docid",docid)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        log("flow_dumplicate producer total_count:%d"%total_count)
+        if total_count==0:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_dict = getRow_ots(rows)
+        if len(list_dict)>0:
+            return self.post_extract(list_dict[0])
+
+
     def post_extract(self,_dict):
         win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
         _dict["win_tenderer"] = win_tenderer
@@ -2240,6 +2258,7 @@ class Dataflow_dumplicate(Dataflow):
         _dict["dict_time"] = self.get_dict_time(_extract)
         _dict["punish"] = _extract.get("punish",{})
         _dict["approval"] = _extract.get("approval",[])
+        return _dict

     def dumplicate_fianl_check(self,base_list,b_log=False):
         the_group = base_list
@@ -2299,7 +2318,7 @@ class Dataflow_dumplicate(Dataflow):
             district_less = document_less.get("district")
             moneys_less = document_less.get("moneys")
             moneys_attachment_less = document_less.get("moneys_attachment")
-            page_attachments_less = document_less.get(document_tmp_attachment_path,"[]")
+            page_attachments_less = document_less.get("page_attachments","[]")
             punish_less = document_less.get("punish",{})
             approval_less = document_less.get("approval",[])
             source_type_less = document_less.get("source_type")
@@ -2330,7 +2349,7 @@ class Dataflow_dumplicate(Dataflow):

             moneys_greater = document_greater.get("moneys")
             moneys_attachment_greater = document_greater.get("moneys_attachment")
-            page_attachments_greater = document_greater.get(document_tmp_attachment_path,"[]")
+            page_attachments_greater = document_greater.get("page_attachments","[]")

             punish_greater = document_greater.get("punish",{})
             approval_greater = document_greater.get("approval",[])
@@ -2341,7 +2360,7 @@ class Dataflow_dumplicate(Dataflow):
                 hard_level=2

             if self.check_rule==1:
-                _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less=punish_less,punish_greater=punish_greater,approval_less=approval_less,approval_greater=approval_greater,source_type_less=source_type_less,source_type_greater=source_type_greater)
+                _prob = check_dumplicate_rule(document_less,document_greater,min_counts,b_log=b_log,hard_level=hard_level)
             else:
                 _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)

@@ -3937,10 +3956,89 @@ class Dataflow_dumplicate(Dataflow):
            assert 1==2




+    def dumplicate_document_in_merge(self,list_projects,dup_docid,_docid,_docchannel,b_log=False):
+        '''
+        deduplicate documents while merging projects
+        :param list_projects: merged project dicts whose document lists are checked for duplicates
+        :return: (best_docid, list of docids marked as duplicates during the merge)
+        '''
+
+        dup_docid = set([str(a) for a in dup_docid])
+        set_dup_total = set()
+        docid_item = self.get_attrs_before_dump(_docid)
+        best_docid = None
+        for _proj in list_projects:
+            try:
+                docids = _proj.get(project_docids,"")
+                set_docids = set([a for a in docids.split(",") if a!=""])
+                _project_dynamics = _proj.get(project_project_dynamics,"[]")
+                list_dynamics = json.loads(_project_dynamics)
+                set_dup_docid = set()
+                list_dup_result = [(_docid,docid_item.get("extract_count"))]
+                log("=========%s---%s"%(str(set_docids),str(_docid)))
+                if str(_docid) in set_docids:
+                    list_to_dup_docid = []
+                    for _d in list_dynamics:
+                        docid = _d.get(document_docid)
+                        doctitle = _d.get(document_doctitle,"")
+                        docchannel = _d.get(document_docchannel,0)
+                        status = _d.get(document_status,0)
+                        if status>=401:
+                            continue
+                        if str(docid) not in set_docids:
+                            continue
+                        if str(docid) in dup_docid:
+                            continue
+                        if docchannel!=_docchannel:
+                            continue
+                        if docid==_docid:
+                            continue
+                        list_to_dup_docid.append(_d)
+                    for _d in list_to_dup_docid:
+                        docid = _d.get(document_docid)
+                        _item = self.get_attrs_before_dump(docid)
+                        _prob = check_dumplicate_rule(docid_item,_item,5,b_log=b_log)
+                        log("dumplicate_document_in_merge %s-%s prob %.2f"%(str(_docid),str(docid),_prob))
+                        if _prob>0.4:
+                            docid = int(docid)
+                            _d = {"partitionkey":docid%500+1,
+                                  "docid":docid,
+                                  }
+                            _doc = Document(_d)
+
+                            if _doc.fix_columns(self.ots_client,[document_page_time,document_update_document],True):
+                                if _doc.getProperties().get(document_update_document,"")!="true":
+                                    list_dup_result.append((docid,_item.get("extract_count")))
+
+                    list_dup_result.sort(key=lambda x:x[0])
+                    list_dup_result.sort(key=lambda x:x[1],reverse=True)
+                    if len(list_dup_result)>0:
+                        best_docid = list_dup_result[0][0]
+                        for _d in list_dup_result[1:]:
+                            set_dup_docid.add(str(_d[0]))
+                    for _dynamic in list_dynamics:
+                        if _dynamic.get(document_docid) in set_dup_docid:
+                            _dynamic[document_status] = 401
+
+                    set_docids = set_docids-set_dup_docid-dup_docid
+                    set_dup_total |= set_dup_docid
+                    if len(set_docids)==0:
+                        print(set_dup_docid,dup_docid)
+                        log("projects set_docids length is zero %s"%(docids))
+                        return None,None
+                    else:
+                        _proj[project_docids] = ",".join(list(set_docids))
+                        _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
+                        _proj[project_docid_number] = len(set_docids)
+                        _proj[project_dup_docid] = ",".join(list(set_dup_docid))
+                # log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
+            except Exception as e:
+                traceback.print_exc()
+
+        return best_docid,list(set_dup_total)




    def merge_document_real(self,item,dup_docid,table_name,save,status_to=None,b_log=False):
@@ -3955,6 +4053,7 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = []
             _docid = item.get(document_tmp_docid)
             list_docids.append(_docid)
+            print("dup_docid",dup_docid)
             if save==0:
                 dup_docid.insert(0,_docid)
             if isinstance(dup_docid,list):
@@ -3984,7 +4083,7 @@ class Dataflow_dumplicate(Dataflow):
             # log("merge projects takes:%.3f"%(time.time()-_time))

             _time = time.time()
-            list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
+            best_docid,list_merge_dump = self.dumplicate_document_in_merge(list_projects,dup_docid,_docid,item.get(document_docchannel),b_log=b_log)
             # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))

             if list_merge_dump is None:
@@ -3992,31 +4091,11 @@ class Dataflow_dumplicate(Dataflow):

             _time = time.time()

-            projects = list_projects
-            for _proj in projects:
-                dup_docid = _proj.get(project_dup_docid,"")
-                list_dup_docid = dup_docid.split(",")
-                new_dup_docid = []
-                for _docid in list_dup_docid:
-                    if _docid=="":
-                        continue
-                    docid = int(_docid)
-                    _d = {"partitionkey":docid%500+1,
-                          "docid":docid,
-                          }
-                    _doc = Document(_d)
-
-                    if _doc.fix_columns(self.ots_client,[document_update_document],True):
-                        if _doc.getProperties().get(document_update_document,"")!="true":
-                            new_dup_docid.append(str(docid))
-                _proj[project_dup_docid] = ",".join(new_dup_docid)
-            list_projects = projects
-
             project_json = to_project_json(list_projects)
             # log("json projects takes:%.3f"%(time.time()-_time))
             if b_log:
                 log("project_json:%s"%project_json)
-            return project_json,list_merge_dump
+            return project_json,best_docid,list_merge_dump
         except Exception as e:
             raise RuntimeError("error on dumplicate")

@@ -4040,11 +4119,15 @@ class Dataflow_dumplicate(Dataflow):
                 return True
         return False

-    def exists_normal_fingerprint(self,_fingerprint):
+    def exists_normal_fingerprint(self,_fingerprint,docid):
         query = BoolQuery(must_queries=[
             RangeQuery("status",201,301),
             TermQuery("fingerprint",_fingerprint)
-        ])
+        ],
+            must_not_queries=[
+                TermQuery("docid",docid)
+            ]
+        )
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                            SearchQuery(query,get_total_count=True,limit=1))
         if total_count>0:
@@ -4085,21 +4168,22 @@ class Dataflow_dumplicate(Dataflow):
            if web_source_name in set_web_source and bidclose_time<page_time:
                return False

-        log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
+        log("%s check page_time has_before %s has_after %s"%(str(item.get(document_docid)),str(has_before),str(has_after)))
        if has_before:
            _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
                               must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
            if not has_after:
-                log("check page_time false %s==%s-%s"%(l_page_time,k,v))

                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                    SearchQuery(_query,get_total_count=True,limit=1))
                if total_count>0:
+                    log("%s check page_time false %s==%s-%s"%(str(item.get(document_docid)),l_page_time,k,v))
                    return False
            if item.get(document_web_source_name,"")=="中国政府采购网":
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                    SearchQuery(_query,get_total_count=True,limit=1))
                if total_count>0:
+                    log("%s check 中国政府采购网 false "%(str(item.get(document_docid))))
                    return False

        return True
@@ -4153,7 +4237,7 @@ class Dataflow_dumplicate(Dataflow):
            final_list = self.dumplicate_fianl_check(base_list,b_log)

            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
-            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint))
+            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint),item.get(document_tmp_docid))
            # print("exist_normal_fingerprint",exist_normal_fingerprint)
            # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
            best_docid = self.get_best_docid(final_list)
@@ -4187,6 +4271,8 @@ class Dataflow_dumplicate(Dataflow):
                    if _dict.get(document_tmp_docid) in dup_docid:
                        remove_list.append(_dict)
            else:
+                if exist_normal_fingerprint:
+                    log("%s has exist_normal_fingerprint"%(str(item.get(document_docid))))
                dtmp.setValue(document_tmp_save,0,True)
                if best_docid in dup_docid:
                    dup_docid.remove(best_docid)
@@ -4214,11 +4300,26 @@ class Dataflow_dumplicate(Dataflow):
                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                dtmp.setValue(document_tmp_projects,"[]",True)
            else:
-                project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
+                project_json,merge_best_docid,list_merge_dump = self.merge_document_real(item,list_docids[:-1],table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
+                if merge_best_docid is not None and (best_docid is None or best_docid==item.get(document_tmp_docid)):
+                    best_docid = merge_best_docid

                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
                    dtmp.setValue(document_tmp_save,0,True)
+
+                if dmp_docid=="":
+                    if best_docid is not None:
+                        dmp_docid = "%s,%s"%(str(best_docid),",".join([str(a) for a in list_merge_dump]))
+                    else:
+                        dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_merge_dump]))
+                elif dtmp.getProperties().get(document_tmp_save)==1:
+                    if dmp_docid=="":
+                        dmp_docid = "%s"%(",".join([str(a) for a in list_docids]))
+                    else:
+                        dmp_docid = "%s,%s"%(dmp_docid,",".join([str(a) for a in list_docids]))
+
+
                dtmp.setValue(document_tmp_projects,project_json,True)
            log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))


@@ -4414,21 +4515,11 @@ class Dataflow_dumplicate(Dataflow):
    def test_dumplicate(self,docid):
        # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]
-        bool_query = BoolQuery(must_queries=[
-            TermQuery("docid",docid)
-        ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
-                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
-        log("flow_dumplicate producer total_count:%d"%total_count)
-        if total_count==0:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
-                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
-        list_dict = getRow_ots(rows)
+        item = self.get_attrs_before_dump(docid,columns)

-        for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+        if item:
+            log("start dumplicate_comsumer_handle")
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
        return

    def test_merge(self,list_docid_less,list_docid_greater):
@@ -4604,7 +4695,7 @@ if __name__ == '__main__':
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(400068967697
+    df_dump.test_dumplicate(597760933
                            )
    # compare_dumplicate_check()
    # df_dump.test_merge([391898061