@@ -2214,7 +2214,7 @@ class Dataflow_dumplicate(Dataflow):
         _dict["project_name"] = _extract.get("name","")
         _dict["dict_time"] = self.get_dict_time(_extract)
 
-    def dumplicate_fianl_check(self,base_list):
+    def dumplicate_fianl_check(self,base_list,b_log=False):
         the_group = base_list
         the_group.sort(key=lambda x:x["confidence"],reverse=True)
 
@@ -2232,17 +2232,16 @@ class Dataflow_dumplicate(Dataflow):
                 continue
             for _j in range(min(_i,10)):
                 _dict2 = base_list[_j]
-                _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=False)
-                # print("_prob:",_prob)
+                _prob = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
+                print("_prob:",_prob)
                 if _prob<=0.1:
                     _pass = False
                     break
-            log("checking index:%d"%(_i))
+            log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
             _index = _i
             if not _pass:
                 _index -= 1
                 break
-
         if _index>=1:
             # # deduplicate records that were repeatedly saved to the database
             # _l = the_group[:_index+1]
@@ -2258,7 +2257,8 @@ class Dataflow_dumplicate(Dataflow):
             return the_group[:_index+1]
         return []
 
-    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=True):
+    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
+        b_log=True  # note: forces logging on, overriding the b_log argument passed by callers
         document_less = _dict1
         docid_less = _dict1["docid"]
         docchannel_less = document_less["docchannel"]
@@ -2370,7 +2370,7 @@ class Dataflow_dumplicate(Dataflow):
             same_count += 1
         if getLength(project_name_less)>0 and project_name_less==project_name_greater:
             same_count += 1
-        if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
+        if getLength(doctitle_refine_less)>0 and (doctitle_refine_less==doctitle_refine_greater or doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
             same_count += 1
         base_prob = 0
         if min_counts<3:
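A minimal sketch of the broadened title comparison above, assuming getLength simply measures the length of the refined title: containment in either direction now counts as a match, not only exact equality.

    def doctitle_refine_match(less, greater):
        # empty titles never match; otherwise accept equality or substring containment either way
        if len(less) == 0:
            return False
        return less == greater or less in greater or greater in less

    print(doctitle_refine_match("设备采购项目", "设备采购项目(重新招标)"))  # True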
@@ -2849,7 +2849,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 2
+        process_count = 3
         thread_count = 20
         list_process = []
         def start_thread():
@@ -3543,7 +3543,7 @@ class Dataflow_dumplicate(Dataflow):
         if tenderee!="" and len(list_product)>0:
             _query = [TermQuery(project_tenderee,tenderee),
                       should_q_product]
-            list_query.append([_query,2])
+            list_query.append([_query,1])
 
         if tenderee!="" and project_name!="":
             _query = [TermQuery(project_tenderee,tenderee),
@@ -3553,7 +3553,7 @@ class Dataflow_dumplicate(Dataflow):
         if tenderee!="" and agency!="":
             _query = [TermQuery(project_tenderee,tenderee),
                       TermQuery(project_agency,agency)]
-            list_query.append([_query,1])
+            list_query.append([_query,0])
 
         if tenderee!="" and float(bidding_budget)>0:
             _query = [TermQuery(project_tenderee,tenderee),
@@ -3574,12 +3574,12 @@ class Dataflow_dumplicate(Dataflow):
         if agency!="" and win_tenderer!="":
             _query = [TermQuery(project_agency,agency),
                       TermQuery(project_win_tenderer,win_tenderer)]
-            list_query.append([_query,2])
+            list_query.append([_query,0])
 
         if agency!="" and len(list_product)>0:
             _query = [TermQuery(project_agency,agency),
                       should_q_product]
-            list_query.append([_query,2])
+            list_query.append([_query,1])
 
         if win_tenderer!="" and len(list_code)>0:
             _query = [TermQuery(project_win_tenderer,win_tenderer),
@@ -3608,7 +3608,7 @@ class Dataflow_dumplicate(Dataflow):
         if len(list_code)>0:
             _query = [
                 should_q_code]
-            list_query.append([_query,1])
+            list_query.append([_query,2])
 
             _query = [
                 should_q_cod]
@@ -3623,11 +3623,11 @@ class Dataflow_dumplicate(Dataflow):
         if len(list_product)>0 and should_q_area is not None:
             _query = [should_q_area,
                       should_q_product]
-            list_query.append([_query,1])
+            list_query.append([_query,0])
 
         generate_time = time.time()-_time
         whole_time = time.time()-whole_time_start
-        log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
+        # log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
         return list_query
 
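The integer paired with each rule here appears to act as a weight: the consumer loop further down unpacks it as _count and bumps the OTS search limit by _count*5 on top of the base limit, so rules given a higher number may return more candidate rows. A minimal sketch of that budget, assuming the base limit of 10 used below:

    def bumped_limit(rule_weight, base_limit=10):
        # the search limit grows with the rule weight: base + weight*5
        return base_limit + rule_weight * 5

    print(bumped_limit(2))  # 20 rows for a weight-2 rule such as the code-only query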
@@ -3649,6 +3649,7 @@ class Dataflow_dumplicate(Dataflow):
         must_not_q = []
         for _uuid in list(set_uuid):
             must_not_q.append(TermQuery("uuid",_uuid))
+            print("must_not_q uuid:%s"%(_uuid))
 
 
         projects_merge_count = 0
@@ -3675,13 +3676,25 @@ class Dataflow_dumplicate(Dataflow):
             bidding_budget = _proj.get(project_bidding_budget,-1)
             win_tenderer = _proj.get(project_win_tenderer,"")
             win_bid_price = _proj.get(project_win_bid_price,-1)
+            _dynamic = _proj.get(project_project_dynamics,"[]")
+            is_yanshou = False
+            list_dynamic = json.loads(_dynamic)
+            for _d in list_dynamic:
+                _title = _d.get("doctitle","")
+                if re.search("验收公[示告]",_title) is not None:
+                    is_yanshou = True
+                    break
 
             province = _proj.get(project_province,"")
             city = _proj.get(project_city,"")
             district = _proj.get(project_district,"")
 
-            page_time_less = timeAdd(page_time,-150)
-            page_time_greater = timeAdd(page_time,120)
+            if is_yanshou:
+                page_time_less = timeAdd(page_time,-750)
+                page_time_greater = timeAdd(page_time,720)
+            else:
+                page_time_less = timeAdd(page_time,-450)
+                page_time_greater = timeAdd(page_time,420)
             sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
             _time = time.time()
list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
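The merge window around page_time is now chosen per project: roughly 450 days back and 420 days forward by default, widened to 750/720 days when any linked document title looks like an acceptance notice (验收公示/验收公告). A minimal sketch of that selection, with a stand-in for timeAdd, which is assumed to shift a YYYY-MM-DD string by a number of days:

    import re
    from datetime import datetime, timedelta

    def shift_days(page_time, days):
        # stand-in for timeAdd(page_time, days)
        return (datetime.strptime(page_time, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

    def merge_window(page_time, dynamic_titles):
        is_yanshou = any(re.search("验收公[示告]", t) for t in dynamic_titles)
        days_back, days_forward = (-750, 720) if is_yanshou else (-450, 420)
        return shift_days(page_time, days_back), shift_days(page_time, days_forward)

    print(merge_window("2023-05-01", ["某项目验收公示"]))  # uses the wider 750/720-day window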
@@ -3693,14 +3706,14 @@ class Dataflow_dumplicate(Dataflow):
             search_table_index = "project2_index_formerge"
             project_cls = Project
 
-            print("page_time,min_date",page_time,min_date)
-            if page_time>=min_date:
-                search_table = "project2_tmp"
-                search_table_index = "project2_tmp_index"
-                project_cls = Project_tmp
+            # print("page_time,min_date",page_time,min_date)
+            # if page_time>=min_date:
+            #     search_table = "project2_tmp"
+            #     search_table_index = "project2_tmp_index"
+            #     project_cls = Project_tmp
 
 
-            _step = 4
+            _step = 2
             _begin = 0
             must_queries = []
 
@@ -3709,22 +3722,26 @@ class Dataflow_dumplicate(Dataflow):
                 must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                                 ]
 
-            print("page_time_less,page_time_greater",page_time,page_time_less,page_time_greater)
             # sub_project_name is not a required condition
             # if sub_project_q is not None:
             #     must_queries.append(sub_project_q)
 
             projects_prepare_time += time.time()-_time
             _time = time.time()
+            sort_type = SortOrder.DESC
             while _begin<len(list_must_query):
+                if sort_type==SortOrder.DESC:
+                    sort_type=SortOrder.ASC
+                elif sort_type==SortOrder.ASC:
+                    sort_type=SortOrder.DESC
                 list_should_q = []
-                _limit = 20
+                _limit = 10
                 for must_q,_count in list_must_query[_begin:_begin+_step]:
                     must_q1 = list(must_q)
                     must_q1.extend(must_queries)
                     list_should_q.append(BoolQuery(must_queries=must_q1))
 
-                # _limit += _count*5
+                _limit += _count*5
                 _query = BoolQuery(
                     should_queries=list_should_q,
                     must_not_queries=must_not_q[:100]
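With the toggle above, successive batches of merge rules are meant to query OTS with alternating page_time ordering, so candidate projects are drawn from both the newest and the oldest end of the window instead of always the same end. A tiny sketch of that alternation, using strings in place of the SDK's SortOrder constants:

    def sort_orders(num_batches):
        # starts from DESC and flips once per batch, so batches see ASC, DESC, ASC, ...
        order, out = "DESC", []
        for _ in range(num_batches):
            order = "ASC" if order == "DESC" else "DESC"
            out.append(order)
        return out

    print(sort_orders(4))  # ['ASC', 'DESC', 'ASC', 'DESC']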
@@ -3734,7 +3751,7 @@ class Dataflow_dumplicate(Dataflow):
# columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
 
                 rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search(search_table,search_table_index,
-                                                                                          SearchQuery(_query,limit=_limit),
+                                                                                          SearchQuery(_query,sort=Sort(sorters=[FieldSort(project_page_time,sort_type)]),limit=_limit),
                                                                                           columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
 
@@ -3829,8 +3846,9 @@ class Dataflow_dumplicate(Dataflow):
             list_projects = self.merge_projects(list_projects,b_log)
             # log("merge projects takes:%.3f"%(time.time()-_time))
 
+
             _time = time.time()
-            dumplicate_document_in_merge(list_projects)
+            list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
             # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
             _time = time.time()
@@ -3838,7 +3856,7 @@ class Dataflow_dumplicate(Dataflow):
             # log("json projects takes:%.3f"%(time.time()-_time))
             if b_log:
                 log("project_json:%s"%project_json)
-            return project_json
+            return project_json,list_merge_dump
         except Exception as e:
             raise RuntimeError("error on dumplicate")
 
|
@@ -3858,13 +3876,29 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
else:
|
|
|
if _save==1:
|
|
|
set_fingerprint.add(fingerprint_less)
|
|
|
- print("_fingerprint",_fingerprint)
|
|
|
- print(set_fingerprint)
|
|
|
if _fingerprint in set_fingerprint:
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
|
|
|
+ def check_page_time(self,item):
|
|
|
+ page_time = item.get(document_page_time,"")
|
|
|
+ has_before = False
|
|
|
+ has_after = False
|
|
|
+ if len(page_time)>0:
|
|
|
+ l_page_time = timeAdd(page_time,days=-90)
|
|
|
+ dict_time = item.get("dict_time",{})
|
|
|
+ for k,v in dict_time.items():
|
|
|
+ if v is not None and len(v)>0:
|
|
|
+ if l_page_time>v:
|
|
|
+ has_before = True
|
|
|
+ if v>page_time:
|
|
|
+ has_after = True
|
|
|
+ if not has_after and has_before:
|
|
|
+ log("check page_time false %s==%s-%s"%(l_page_time,k,v))
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
 
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
         try:
@@ -3901,9 +3935,10 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
+            b_log = False if upgrade else True
             _time = time.time()
             # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
-            final_list = self.dumplicate_fianl_check(base_list)
+            final_list = self.dumplicate_fianl_check(base_list,b_log)
 
             exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
             # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
@@ -3929,7 +3964,7 @@ class Dataflow_dumplicate(Dataflow):
             remove_list = []
 
 
-            if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
+            if self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid)):
                 dtmp.setValue(document_tmp_save,1,True)
                 # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
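check_page_time (defined above) now gates the save decision: a document is treated as stale when every extracted date in its dict_time lies more than 90 days before page_time and none lies after it, and such documents can no longer keep save=1 here. A small, self-contained illustration of that rule on ISO date strings, where plain string comparison is enough:

    from datetime import datetime, timedelta

    def page_time_ok(page_time, dict_time):
        # stale: some date more than 90 days before page_time, and no date after page_time
        lower = (datetime.strptime(page_time, "%Y-%m-%d") - timedelta(days=90)).strftime("%Y-%m-%d")
        has_before = any(v and lower > v for v in dict_time.values())
        has_after = any(v and v > page_time for v in dict_time.values())
        return not (has_before and not has_after)

    print(page_time_ok("2023-05-01", {"time_release": "2022-01-01"}))  # False: the only date is long past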
@@ -3953,26 +3988,25 @@ class Dataflow_dumplicate(Dataflow):
 
                 list_docids = list(dup_docid)
                 list_docids.append(best_docid)
-                b_log = False if upgrade else True
 
                 if item.get(document_update_document)=="true":
                     dtmp.setValue(document_tmp_save,1,True)
 
+                list_merge_dump = []
                 if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
                     log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                     dtmp.setValue(document_tmp_projects,"[]",True)
                 else:
-                    dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
+                    project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
+                    dtmp.setValue(document_tmp_projects,project_json,True)
log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
 
                 if upgrade:
-                    if table_name=="document_tmp":
-                        self.changeSaveStatus(remove_list)
-
                     # print(dtmp.getProperties())
                     dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                     dtmp.setValue(document_tmp_best_docid,best_docid,True)
                     _flag = dtmp.update_row(self.ots_client)
+
                     if not _flag:
                         for i in range(10):
                             list_proj_json = dtmp.getProperties().get(document_tmp_projects)
@@ -3981,6 +4015,11 @@ class Dataflow_dumplicate(Dataflow):
                                 dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                                 if dtmp.update_row(self.ots_client):
                                     break
+                    if table_name=="document_tmp":
+                        self.changeSaveStatus(remove_list)
+                        self.changeSaveStatus(list_merge_dump)
+
+
 
 
             # log("dump takes %.2f"%(time.time()-start_time))
@@ -4053,7 +4092,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
         schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
-        schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
+        schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/15")
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
@@ -4061,13 +4100,25 @@ class Dataflow_dumplicate(Dataflow):
 
     def changeSaveStatus(self,list_dict):
         for _dict in list_dict:
-            if _dict.get(document_tmp_save,1)==1:
-                _d = {"partitionkey":_dict["partitionkey"],
-                      "docid":_dict["docid"],
+            if isinstance(_dict,dict):
+                if _dict.get(document_tmp_save,1)==1:
+                    _d = {"partitionkey":_dict["partitionkey"],
+                          "docid":_dict["docid"],
+                          document_tmp_save:0
+                          }
+                    _d_tmp = Document_tmp(_d)
+                    if _d_tmp.exists_row(self.ots_client):
+                        _d_tmp.update_row(self.ots_client)
+            elif isinstance(_dict,int):
+                _d = {"partitionkey":_dict%500+1,
+                      "docid":_dict,
                       document_tmp_save:0
                       }
                 _d_tmp = Document_tmp(_d)
-                _d_tmp.update_row(self.ots_client)
+                if _d_tmp.fix_columns(self.ots_client,["status"],True):
+                    if _d_tmp.getProperties().get("status")==1:
+                        _d_tmp.setValue("status",0,True)
+                        _d_tmp.update_row(self.ots_client)
 
 
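changeSaveStatus now accepts either the original row dicts or bare integer docids; for a bare docid the partition key is re-derived instead of being read from the row. A one-line sketch of that derivation, assuming the same 500-partition scheme implied by the code above:

    def docid_to_partitionkey(docid: int) -> int:
        # docids are spread across 500 partitions numbered from 1
        return docid % 500 + 1

    print(docid_to_partitionkey(380763870))  # 371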
@@ -4175,8 +4226,10 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(339737931)
-    # df_dump.test_merge([292315564],[287890754])
+    df_dump.test_dumplicate(380763870
+                            )
+    # df_dump.test_merge([372841008
+    #                     ],[370595571])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()