|
@@ -3018,7 +3018,7 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
def flow_dumpcate_comsumer(self):
|
|
|
from multiprocessing import Process
|
|
|
process_count = 6
|
|
|
- thread_count = 3
|
|
|
+ thread_count = 12
|
|
|
list_process = []
|
|
|
def start_thread():
|
|
|
mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
|
|
@@ -3376,7 +3376,7 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
|
|
|
set_docid = set()
|
|
|
list_delete_projects = []
|
|
|
- list_projects = self.search_projects_with_document([docid],"project2","project2_index")
|
|
|
+ list_projects = self.search_projects_with_document([docid],project_table="project2",project_table_index="project2_index_formerge")
|
|
|
for _proj in list_projects:
|
|
|
_p = {}
|
|
|
_docids = _proj.get(project_docids,"")
|
|
@@ -4017,7 +4017,7 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
_time = time.time()
|
|
|
_check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
|
|
|
if b_log:
|
|
|
- log(str(_check))
|
|
|
+ log("merge rule res: "+str(_check)+" prob: "+str(_prob))
|
|
|
projects_check_rule_time += time.time()-_time
|
|
|
if _check:
|
|
|
list_check_data.append([_data,_prob])
|
|
@@ -4173,7 +4173,7 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
self.update_projects_by_document(_docid,save,list_projects,document_name=document_name)
|
|
|
# log("update projects takes:%.3f"%(time.time()-_time))
|
|
|
_time = time.time()
|
|
|
- list_projects = dumplicate_projects(list_projects)
|
|
|
+ list_projects = dumplicate_projects(list_projects,max_count=20)
|
|
|
|
|
|
|
|
|
# log("dumplicate projects takes:%.3f"%(time.time()-_time))
|