
Improve the efficiency of online data merging and fix the problem of duplicate announcements during merging

luojiehua 2 years ago
parent
commit
118ae0d95e

+ 7 - 4
BaseDataMaintenance/common/multiProcess.py

@@ -352,13 +352,14 @@ class ThreadHandler(threading.Thread):
 
 class ProcessHandler(Process):
 
-    def __init__(self,thread_count,task_queue,task_handler,result_queue,*args,**kwargs):
+    def __init__(self,thread_count,task_queue,task_handler,result_queue,need_stop=True,*args,**kwargs):
         # threading.Thread.__init__(self)
         Process.__init__(self)
         self.thread_count = thread_count
         self.task_queue = task_queue
         self.task_handler = task_handler
         self.result_queue = result_queue
+        self.need_stop = need_stop
         self.args = args
         self.kwargs = kwargs
         self.thread_queue = Queue(100)
@@ -389,7 +390,8 @@ class ProcessHandler(Process):
                 except queue.Empty as e:
                     if self.task_queue.qsize()==0:
                         logging.debug("%s main of process %s is done"%(self.name,self.pid))
-                        break
+                        if self.need_stop:
+                            break
                     else:
                         logging.debug("%s of process %s get item failed"%(self.name,self.pid))
                 except Exception as e:
@@ -494,12 +496,13 @@ class ProcessHandler(Process):
 
 class MultiHandler(object):
 
-    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,*args,**kwargs):
+    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,*args,**kwargs):
         self.task_queue = task_queue
         self.task_handler = task_handler
         self.result_queue = result_queue
         self.thread_count = thread_count
         self.process_count = process_count
+        self.need_stop = need_stop
         self.args = args
         self.kwargs = kwargs
         self.list_process = []
@@ -515,7 +518,7 @@ class MultiHandler(object):
                 _count += 1
             else:
                 _t.terminate()
-                self.list_process[_i] = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
+                self.list_process[_i] = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
                 self.list_process[_i].start()
                 restart += 1
         logging.debug("process status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_process)))
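
The need_stop flag added above controls whether a worker exits once the task queue drains. A minimal sketch of that behaviour, using a hypothetical worker_loop rather than the real ProcessHandler:

    import queue
    import logging

    def worker_loop(task_queue, handle, need_stop=True, poll_timeout=1):
        # Pull tasks until the queue drains; with need_stop=True the worker
        # exits (old behaviour), with need_stop=False it stays alive and keeps
        # polling so the same process can serve later batches.
        while True:
            try:
                item = task_queue.get(timeout=poll_timeout)
            except queue.Empty:
                if task_queue.qsize() == 0:
                    logging.debug("queue drained")
                    if need_stop:
                        break
                continue
            handle(item)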

+ 47 - 19
BaseDataMaintenance/maintenance/dataflow.py

@@ -2,7 +2,9 @@
 
 from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
-from queue import Queue
+from BaseDataMaintenance.common.multiProcess import MultiHandler
+# from queue import Queue
+from multiprocessing import Queue
 
 from BaseDataMaintenance.model.ots.document_tmp import *
 from BaseDataMaintenance.model.ots.attachment import *
@@ -103,6 +105,7 @@ class Dataflow():
         self.queue_extract = Queue()
         self.list_extract = []
         self.queue_dumplicate = Queue()
+        self.dumplicate_set = set()
         self.queue_merge = Queue()
         self.queue_syncho = Queue()
         self.queue_remove = Queue()
@@ -2691,6 +2694,8 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
         def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
+            if self.queue_dumplicate.qsize()>flow_process_count//3:
+                return
             bool_query = BoolQuery(must_queries=[
                 RangeQuery(document_tmp_status,*status_from,True,True),
                 # TermQuery("docid",271983871)
@@ -2701,6 +2706,10 @@ class Dataflow_dumplicate(Dataflow):
             log("flow_dumplicate producer total_count:%d"%total_count)
             list_dict = getRow_ots(rows)
             for _dict in list_dict:
+                # docid = _dict.get(document_tmp_docid)
+                # if docid in self.dumplicate_set:
+                #     continue
+                # self.dumplicate_set.add(docid)
                 self.queue_dumplicate.put(_dict)
             _count = len(list_dict)
             while next_token and _count<flow_process_count:
@@ -2709,15 +2718,27 @@ class Dataflow_dumplicate(Dataflow):
                                                                                     ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                 list_dict = getRow_ots(rows)
                 for _dict in list_dict:
+                    # docid = _dict.get(document_tmp_docid)
+                    # if docid in self.dumplicate_set:
+                    #     continue
+                    # self.dumplicate_set.add(docid)
                     self.queue_dumplicate.put(_dict)
                 _count += len(list_dict)
+            _l = list(self.dumplicate_set)
+            _l.sort(key=lambda x:x,reverse=True)
+            # self.dumplicate_set = set(_l[:flow_process_count*2])
         def comsumer():
-            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,50,1,ots_client=self.ots_client)
+            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
             mt.run()
 
         producer()
         comsumer()
 
+    def flow_dumpcate_comsumer(self):
+        mt = MultiHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,30,2,need_stop=False,ots_client=self.ots_client)
+        mt.run()
+
+
     def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
         '''
         Query the announcement content by docid: query document_tmp first, then document
@@ -2800,7 +2821,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
-    def update_projects_by_document(self,docid,projects):
+    def update_projects_by_document(self,docid,save,projects):
         '''
         Update the attributes of the corresponding document in projects
         :param docid:
@@ -2841,7 +2862,13 @@ class Dataflow_dumplicate(Dataflow):
             _docids = _proj.get(project_docids,"")
             _codes = _proj.get(project_project_codes,"")
             _product = _proj.get(project_product,"")
-            set_docid = set_docid | set(_docids.split(","))
+
+            set_docid = set(_docids.split(","))
+            if save==1:
+                set_docid.add(str(docid))
+            else:
+                if str(docid) in set_docid:
+                    set_docid.remove(str(docid))
             set_code = set_code | set(_codes.split(","))
             set_product = set_product | set(_product.split(","))
             try:
@@ -2849,7 +2876,7 @@ class Dataflow_dumplicate(Dataflow):
                 set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
             except Exception as e:
                 pass
-            set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
+
             set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
             set_product = set_product | set(project_dict.get(project_product,"").split(","))
 
@@ -3585,7 +3612,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
-    def merge_document_real(self,item,dup_docid,table_name,status_to=None,b_log=False):
+    def merge_document_real(self,item,dup_docid,table_name,save,status_to=None,b_log=False):
         '''
         Real-time project merging
         :param item:
@@ -3602,32 +3629,32 @@ class Dataflow_dumplicate(Dataflow):
 
         _time = time.time()
         list_projects = self.search_projects_with_document(list_docids)
-        log("search projects takes:%.3f"%(time.time()-_time))
+        # log("search projects takes:%.3f"%(time.time()-_time))
         if len(list_projects)==0:
-            _time = time.time()
+            # _time = time.time()
             list_docs = self.search_docs(list_docids)
-            log("search document takes:%.3f"%(time.time()-_time))
-            _time = time.time()
+            # log("search document takes:%.3f"%(time.time()-_time))
+            # _time = time.time()
             list_projects = self.generate_projects_from_document(list_docs)
-            log("generate projects takes:%.3f"%(time.time()-_time))
+            # log("generate projects takes:%.3f"%(time.time()-_time))
         else:
             _time = time.time()
-            self.update_projects_by_document(_docid,list_projects)
-            log("update projects takes:%.3f"%(time.time()-_time))
+            self.update_projects_by_document(_docid,save,list_projects)
+            # log("update projects takes:%.3f"%(time.time()-_time))
         _time = time.time()
         list_projects = dumplicate_projects(list_projects)
-        log("dumplicate projects takes:%.3f"%(time.time()-_time))
+        # log("dumplicate projects takes:%.3f"%(time.time()-_time))
         _time = time.time()
         list_projects = self.merge_projects(list_projects,b_log)
-        log("merge projects takes:%.3f"%(time.time()-_time))
+        # log("merge projects takes:%.3f"%(time.time()-_time))
 
         _time = time.time()
         dumplicate_document_in_merge(list_projects)
-        log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
+        # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
         _time = time.time()
         project_json = to_project_json(list_projects)
-        log("json projects takes:%.3f"%(time.time()-_time))
+        # log("json projects takes:%.3f"%(time.time()-_time))
         if b_log:
             log("project_json:%s"%project_json)
         return project_json
@@ -3717,7 +3744,7 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = list(dup_docid)
             list_docids.append(best_docid)
             b_log = False if upgrade else True
-            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,flow_dumplicate_status_to,b_log),True)
+            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
 
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
             if upgrade:
@@ -3799,6 +3826,7 @@ class Dataflow_dumplicate(Dataflow):
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
+        # schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/10")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
         schedule.start()
 
@@ -3902,7 +3930,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(278818571)
+    df_dump.test_dumplicate(288272156)
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)
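
The save argument threaded through merge_document_real and update_projects_by_document decides whether the current document joins or leaves a project's docid set. A simplified sketch of that rule (the helper name and plain-string fields are assumptions, not the actual model API):

    def apply_save_to_docids(docids_str, docid, save):
        # save == 1: the document is kept, so add it to the project's docids;
        # otherwise it was dropped as a duplicate, so remove it if present.
        set_docid = set(docids_str.split(",")) if docids_str else set()
        if save == 1:
            set_docid.add(str(docid))
        else:
            set_docid.discard(str(docid))
        return ",".join(sorted(set_docid))

    # e.g. apply_save_to_docids("288133788,288157306", 288256566, 1)
    #      -> "288133788,288157306,288256566"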

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_settings.py

@@ -1,6 +1,6 @@
 
 
-flow_process_count = 1000
+flow_process_count = 3000
 
 flow_attachment_status_from = [0,10]
 flow_attachment_status_failed_to = [0,0]
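
Raising flow_process_count from 1000 to 3000 works together with the new early return in the flow_dumplicate producer, which skips fetching while the queue still holds more than a third of a batch. A rough sketch of that throttle (the function name is illustrative only):

    flow_process_count = 3000

    def should_fetch(queue_size):
        # Skip a producer cycle while the backlog exceeds one third of a batch
        # (1000 items here), letting consumers catch up before more rows are read.
        return queue_size <= flow_process_count // 3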

+ 3 - 2
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -731,8 +731,9 @@ def check_money(bidding_budget_less,bidding_budget_greater,
         price_greater = float(win_bid_price_greater)
         if price_less!=price_greater:
 
-            if max(price_less,price_greater)/min(price_less,price_greater)==10000:
-                price_is_same = True
+            if min(price_less,price_greater)>0:
+                if max(price_less,price_greater)/min(price_less,price_greater)==10000:
+                    price_is_same = True
             if price_less>10000 and price_greater>10000 and round(price_less/10000,2)==round(price_greater/10000,2):
                 price_is_same = True
             if price_is_same=="":
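
The added min(...) > 0 guard prevents a ZeroDivisionError when either side reports a zero price; the 10000x ratio test itself treats amounts quoted in yuan versus wan-yuan (万元) as the same value. A standalone sketch of the comparison (not the full check_money, names are illustrative):

    def prices_equivalent(price_less, price_greater):
        # Equal prices, or prices differing only by the 10000x yuan/wan-yuan
        # unit mismatch, count as the same amount.
        if price_less == price_greater:
            return True
        lo, hi = sorted([price_less, price_greater])
        if lo > 0 and hi / lo == 10000:
            return True   # e.g. 5000000.0 yuan vs 500.0 wan-yuan
        if lo > 10000 and round(price_less / 10000, 2) == round(price_greater / 10000, 2):
            return True
        return False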

+ 9 - 4
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2801,14 +2801,14 @@ def dumplicate_document_in_merge(list_projects):
             _time = time.time()
             for _d in list_dynamics:
                 docid = _d.get(document_docid)
-                if docid not in set_docids:
+                if str(docid) not in set_docids:
                     continue
                 _status = _d.get(document_status,201)
                 is_multipack = _d.get("is_multipack",True)
                 extract_count = _d.get(document_tmp_extract_count,0)
                 docchannel = _d.get(document_docchannel,0)
                 page_time = _d.get(document_page_time,"")
-                if _status>=201 and _status<=300 and docchannel>0:
+                if docchannel>0:
                     if docchannel in dict_channel_proj:
                         n_d = dict_channel_proj[docchannel]
                         n_docid = n_d.get(document_docid)
@@ -2819,31 +2819,36 @@ def dumplicate_document_in_merge(list_projects):
                             continue
                         if not check_page_time_dup(page_time,n_page_time):
                             continue
+
                         if extract_count>n_extract_count:
+                            n_d[document_status] = 401
                             set_dup_docid.add(str(n_docid))
                             dict_channel_proj[docchannel] = _d
                         elif extract_count==n_extract_count:
                             if int(n_docid)>int(docid):
+                                n_d[document_status] = 401
                                 set_dup_docid.add(str(n_docid))
                                 dict_channel_proj[docchannel] = _d
                             elif int(n_docid)<int(docid):
+                                _d[document_status] = 401
                                 set_dup_docid.add(str(docid))
                         else:
+                            _d[document_status] = 401
                             set_dup_docid.add(str(docid))
                         if not is_multipack and not n_is_multipack:
                             pass
                     else:
                         dict_channel_proj[docchannel] = _d
 
-
             set_docids = set_docids-set_dup_docid
             if len(set_docids)==0:
                 log("projects set_docids length is zero %s"%(docids))
             else:
                 _proj[project_docids] = ",".join(list(set_docids))
+            _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
             _proj[project_docid_number] = len(set_docids)
             _proj[project_dup_docid] = ",".join(list(set_dup_docid))
-            log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
+            # log("dumplicate_document docid%s dynamic %d takes%.3f"%(str(docid),len(list_dynamics),time.time()-_time))
         except Exception as e:
             traceback.print_exc()
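
dumplicate_document_in_merge now also writes status 401 onto the losing document dict, so the rewritten project_project_dynamics carries the duplicate marker, and docids are compared as strings consistently. The tie-breaking rule itself, sketched in isolation (dict keys are simplified placeholders):

    def pick_winner(current, challenger):
        # Within one docchannel keep the document with the higher extract_count;
        # on a tie keep the smaller docid. Returns (winner, loser).
        if challenger["extract_count"] > current["extract_count"]:
            return challenger, current
        if challenger["extract_count"] == current["extract_count"] and \
                int(challenger["docid"]) < int(current["docid"]):
            return challenger, current
        return current, challenger

    winner, loser = pick_winner({"docid": 10, "extract_count": 2},
                                {"docid": 11, "extract_count": 2})
    loser["status"] = 401   # docid 11 loses the tie to the smaller docid 10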
 

+ 15 - 4
BaseDataMaintenance/model/redis/enterprise.py

@@ -47,12 +47,23 @@ def set_enterprise_to_redis(filepath):
     else:
         log("enterprise filepath not exists")
 
-def remove_enterprise_key(_keys):
+def remove_enterprise_key(filename,_keys):
     from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
     db = getConnect_redis_baseline()
-    for _key in _keys.split(","):
-        _v = db.delete(_key)
-        print("delete key %s %s"%(_key,str(_v)))
+    if filename is not None:
+        with open(filename,"r",encoding="utf8") as f:
+            while 1:
+                _line = f.readline()
+                if not _line:
+                    break
+                _key = _line.strip()
+                _v = db.delete(_key)
+                print("delete key %s %s"%(_key,str(_v)))
+
+    if _keys is not None:
+        for _key in _keys.split(","):
+            _v = db.delete(_key)
+            print("delete key %s %s"%(_key,str(_v)))
     del db
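
remove_enterprise_key now accepts a filename (one key per line) in addition to a comma-separated key string; either argument may be None. A usage sketch (the file name and key names are only examples):

    from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key

    # Keys read from a file, one per line.
    remove_enterprise_key("keys_to_delete.txt", None)

    # Keys passed as a comma-separated string, as start_main.py does with args.keys.
    remove_enterprise_key(None, "enterprise_a,enterprise_b")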
 
 

+ 2 - 2
BaseDataMaintenance/start_main.py

@@ -21,8 +21,8 @@ def main(args=None):
             set_enterprise_to_redis(args.filename)
     if args.deleteEnterpriseKey:
         from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key
-        if args.keys:
-            remove_enterprise_key(args.keys)
+        if args.keys or args.filename:
+            remove_enterprise_key(args.filename,args.keys)
 
 
 if __name__ == '__main__':

+ 10 - 2
BaseDataMaintenance/test/ab.py

@@ -1,4 +1,12 @@
 
 
-a = {1:1,2:1}
-print(a)
+from BaseDataMaintenance.model.ots.project import *
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+ots_client = getConnect_ots()
+_d = {project_uuid:"f9afc516-7ae1-4ddf-83f3-395f294b288a",
+      project_docids:"288256566,288133788,288157306,288137729",
+      project_docid_number:4}
+_p = Project(_d)
+_p.update_row(ots_client)