
Update the data repair logic

luojiehua 2 years ago
parent
commit
1053b8e26d

+ 1 - 1
BaseDataMaintenance/common/multiProcess.py

@@ -518,7 +518,7 @@ class MultiHandler(object):
                 _count += 1
             else:
                 _t.terminate()
-                self.list_process[_i] = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,self.need_stop*self.args,**self.kwargs)
+                self.list_process[_i] = ProcessHandler(self.thread_count,self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
                 self.list_process[_i].start()
                 restart += 1
         logging.debug("process status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_process)))

+ 12 - 7
BaseDataMaintenance/common/multiThread.py

@@ -32,25 +32,26 @@ def stop_thread(thread):
 
 class _taskHandler(threading.Thread):
 
-    def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
+    def __init__(self,task_queue,task_handler,result_queue,need_stop=True,*args,**kwargs):
         threading.Thread.__init__(self)
         # Process.__init__(self)
         self.task_queue = task_queue
         self.task_handler = task_handler
         self.result_queue = result_queue
+        self.need_stop = need_stop
         self.args = args
         self.kwargs = kwargs
 
     def run(self):
         while(True):
             try:
-                # logging.info("task queue size is %d"%(self.task_queue.qsize()))
+                logging.info("handler task queue size is %d need_stop %s"%(self.task_queue.qsize(),str(self.need_stop)))
                 item = self.task_queue.get(True,timeout=1)
                 self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
                 # self.task_queue.task_done()
             except queue.Empty as e:
                 # logging.info("%s thread is done"%(self.name))
-                if self.task_queue.empty():
+                if self.need_stop and self.task_queue.empty():
                     break
             except Exception as e:
                 logging.info("error: %s"%(e))
@@ -58,7 +59,7 @@ class _taskHandler(threading.Thread):
 
 class MultiThreadHandler(object):
 
-    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,restart=False,*args,**kwargs):
+    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,restart=False,*args,**kwargs):
         self.task_queue = task_queue
         self.task_handler = task_handler
         self.result_queue = result_queue
@@ -68,6 +69,7 @@ class MultiThreadHandler(object):
         self.args = args
         self.kwargs = kwargs
         self.restart = restart
+        self.need_stop = need_stop
 
     def getThreadStatus(self):
         _count = 0
@@ -77,17 +79,17 @@ class MultiThreadHandler(object):
                 _count += 1
             else:
                 if self.restart:
-                    _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
+                    _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
                     _t.start()
                     restart += 1
-        logging.info("thread status alive:%d restart:%d total:%d"%(_count,restart,len(self.list_thread)))
+        logging.info("thread status alive:%d restart:%d total:%d need_stop %s"%(_count,restart,len(self.list_thread),str(self.need_stop)))
         return _count,restart,len(self.list_thread)
 
     def run(self):
 
         self.list_thread = []
         for i in range(self.thread_count):
-            th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
+            th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
             # th.setDaemon(True)
             self.list_thread.append(th)
 
@@ -122,6 +124,9 @@ class MultiThreadHandler(object):
 
 
     def _check_all_done(self):
+        if not self.need_stop:
+            return False
+
         bool_done = True
         for th in self.list_thread:
             if th.isAlive():
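
Note on need_stop: with need_stop=True (the default, preserving the old batch behavior) a worker thread exits once the task queue drains; with need_stop=False it keeps polling and _check_all_done always returns False, so the handler becomes a long-lived consumer. A minimal standalone sketch of the resulting worker loop, with handle() as a hypothetical stand-in for the real task handler:

    import queue, threading

    def handle(item):
        print("handled", item)  # hypothetical task handler

    def worker_loop(task_queue, need_stop):
        while True:
            try:
                handle(task_queue.get(True, timeout=1))
            except queue.Empty:
                if need_stop and task_queue.empty():
                    break  # batch mode: exit once the queue drains
                # need_stop=False: keep polling so the thread survives idle periods

    q = queue.Queue()
    threading.Thread(target=worker_loop, args=(q, False), daemon=True).start()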

+ 18 - 12
BaseDataMaintenance/maintenance/attachment/attachmentFix.py

@@ -319,7 +319,7 @@ def fixAttachmentOfDoc(docid,new_files):
         if not _attach.exists_row(ots_client):
             log("not exists %s"%current_filemd5)
             _old_status = _attach.getProperties().get(attachment_status)
-            _attach.setValue(attachment_status,10,True)
+            _attach.setValue(attachment_status,0,True)
             _attach.setValue("old_status",_old_status,False)
             _attach.update_row(ots_client)
             uploadFileByPath(bucket,filepath,_attach.getProperties().get(attachment_path))
@@ -342,8 +342,7 @@ def fixAttachmentOfDoc(docid,new_files):
     if _old_attachments=="":
         _old_attachments = "[]"
     _page_attachments = json.loads(_old_attachments)
-    print(_page_attachments)
-
+    print("fixing",docid,_page_attachments)
 
     for new_item in new_attachments:
         _exists = False
@@ -353,10 +352,12 @@ def fixAttachmentOfDoc(docid,new_files):
         if not _exists:
             _page_attachments.append(new_item)
     _document.setValue(document_attachment_path,json.dumps(_page_attachments,ensure_ascii=False),True)
+    if len(_page_attachments)>0:
+        _document.setValue(document_status,1,True)
     _document.update_row(ots_client)
 
 
-    makeFixing(_document)
+    # makeFixing(_document)  # this step is no longer needed in the new workflow
 
     for _attach in list_attach:
         if _attach.getProperties().get("old_status") is not None:
@@ -379,7 +380,12 @@ def extract_pageAttachments(_html):
         _url = _a.attrs.get("href","")
         if _url.find("http://www.bidizhaobiao.com")>=0:
             continue
+        if _url.find("detail-area-list-icon.png")>=0:
+            continue
         is_attach = False
+        if _url.find("http://zbtb.gd.gov.cn/platform/attach")>=0:
+            is_attach = True
+            file_type = ".pdf"
         for suf in fileSuffix:
             if _text.find(suf)>=0 or _url.find(suf)>=0:
                 is_attach = True
@@ -407,16 +413,17 @@ def fixDoc(docid,ots_client=None):
     :param ots_client:
     :return:
     '''
-    log("=1")
     if ots_client is None:
         ots_client = getConnect_ots()
+    capacity_client = getConnect_ots_capacity()
     partitionkey = docid%500+1
     _document = Document({document_partitionkey:partitionkey,document_docid:docid})
-    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status,document_dochtmlcon],True)
+    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
+    _document.fix_columns(capacity_client,[document_dochtmlcon],True)
     log("=1")
-    if _document.getProperties().get(document_attachment_extract_status,0)!=1:
+    if _document.getProperties().get(document_attachment_extract_status,0)!=1 or 1:
         log("=2")
-        if _document.getProperties().get(document_attachment_path,'[]')=="[]" or 1:
+        if _document.getProperties().get(document_attachment_path,'[]')=="[]":
             log("=3")
             new_attachments = extract_pageAttachments(_document.getProperties().get(document_dochtmlcon))
             log(str(new_attachments))
@@ -448,8 +455,8 @@ class FixDocument():
 
         bool_query = BoolQuery(must_queries=[
             # RangeQuery("crtime",'2021-08-10 00:00:00','2021-10-10 00:00:00'),
-            # RangeQuery("docid",174453795)
-            WildcardQuery("web_source_no","17147*"),
+            RangeQuery("docid",86363824),
+            WildcardQuery("web_source_no","00141-1*"),
             # should_docid
         ]
                                ,must_not_queries=[NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*"))])
@@ -458,7 +465,6 @@ class FixDocument():
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                             ColumnsToGet([],ColumnReturnType.NONE))
         log("total_count:%d"%total_count)
-        return
         dict_rows = getRow_ots(rows)
         for _row in dict_rows:
             self.task_queue.put(_row)
@@ -501,7 +507,7 @@ def start_docFix():
     fd.start()
 
 if __name__=="__main__":
-    # docs = [183047740]
+    # docs = [156668514]
     # for _doc in docs:
     #     fixDoc(_doc)
 

+ 36 - 23
BaseDataMaintenance/maintenance/dataflow.py

@@ -4,7 +4,7 @@ from BaseDataMaintenance.dataSource.source import getConnect_activateMQ_ali
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 from BaseDataMaintenance.common.multiProcess import MultiHandler
 from queue import Queue
-# from multiprocessing import Queue
+from multiprocessing import Queue as PQueue
 
 from BaseDataMaintenance.model.ots.document_tmp import *
 from BaseDataMaintenance.model.ots.attachment import *
@@ -107,7 +107,7 @@ class Dataflow():
         self.list_attachment_not_ocr = []
         self.queue_extract = Queue()
         self.list_extract = []
-        self.queue_dumplicate = Queue()
+        self.queue_dumplicate = PQueue()
         self.dumplicate_set = set()
         self.queue_merge = Queue()
         self.queue_syncho = Queue()
@@ -2531,7 +2531,7 @@ class Dataflow_dumplicate(Dataflow):
                     else:
                         if _docid!=item.get(document_tmp_docid):
                             _time1 = time.time()
-                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=True)
+                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
                             check_time+= time.time()-_time1
 
                             _dict["confidence"] = confidence
@@ -2799,8 +2799,10 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
         def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
-            # if self.queue_dumplicate.qsize()>flow_process_count//3:
-            #     return
+            q_size = self.queue_dumplicate.qsize()
+            log("dumplicate queue size %d"%(q_size))
+            if q_size>flow_process_count//3:
+                return
             bool_query = BoolQuery(must_queries=[
                 RangeQuery(document_tmp_status,*status_from,True,True),
                 # TermQuery("docid",271983871)
@@ -2811,10 +2813,10 @@ class Dataflow_dumplicate(Dataflow):
             log("flow_dumplicate producer total_count:%d"%total_count)
             list_dict = getRow_ots(rows)
             for _dict in list_dict:
-                # docid = _dict.get(document_tmp_docid)
-                # if docid in self.dumplicate_set:
-                #     continue
-                # self.dumplicate_set.add(docid)
+                docid = _dict.get(document_tmp_docid)
+                if docid in self.dumplicate_set:
+                    continue
+                self.dumplicate_set.add(docid)
                 self.queue_dumplicate.put(_dict)
             _count = len(list_dict)
             while next_token and _count<flow_process_count:
@@ -2823,25 +2825,39 @@ class Dataflow_dumplicate(Dataflow):
                                                                                     ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                 list_dict = getRow_ots(rows)
                 for _dict in list_dict:
-                    # docid = _dict.get(document_tmp_docid)
-                    # if docid in self.dumplicate_set:
-                    #     continue
-                    # self.dumplicate_set.add(docid)
+                    docid = _dict.get(document_tmp_docid)
+                    if docid in self.dumplicate_set:
+                        continue
+                    self.dumplicate_set.add(docid)
                     self.queue_dumplicate.put(_dict)
                 _count += len(list_dict)
+
             _l = list(self.dumplicate_set)
             _l.sort(key=lambda x:x,reverse=True)
-            # self.dumplicate_set = set(_l[:flow_process_count*2])
+            self.dumplicate_set = set(_l[:flow_process_count]) | set(_l[-flow_process_count:])
         def comsumer():
             mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
             mt.run()
 
         producer()
-        comsumer()
+        # comsumer()
 
     def flow_dumpcate_comsumer(self):
-        mt = MultiHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,30,2,need_stop=False,ots_client=self.ots_client)
-        mt.run()
+        from multiprocessing import Process
+        process_count = 2
+        thread_count = 30
+        list_process = []
+        def start_thread():
+            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,ots_client=self.ots_client)
+            mt.run()
+
+        for _ in range(process_count):
+            p = Process(target=start_thread)
+            list_process.append(p)
+        for p in list_process:
+            p.start()
+        for p in list_process:
+            p.join()
 
 
     def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
@@ -4015,8 +4031,8 @@ class Dataflow_dumplicate(Dataflow):
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
-        schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
-        # schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/10")
+        schedule.add_job(self.flow_dumplicate,"cron",second="*/40")
+        schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/10")
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
         schedule.start()
@@ -4056,12 +4072,9 @@ class Dataflow_dumplicate(Dataflow):
         list_docs_less = self.search_docs(list_docid_less)
         list_projects_less = self.generate_projects_from_document(list_docs_less)
 
-        print("======list_projects_less",list_projects_less)
         list_docs_greater = self.search_docs(list_docid_greater)
-        print("==list_docs_greater",[a.getProperties() for a in list_docs_greater])
         list_projects_greater = self.generate_projects_from_document(list_docs_greater)
 
-        print("=========list_projects_greater",list_projects_greater)
         list_projects_less.extend(list_projects_greater)
         list_projects = dumplicate_projects(list_projects_less,b_log=True)
         project_json = to_project_json(list_projects)
@@ -4139,7 +4152,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(328386698)
+    df_dump.test_dumplicate(330679217)
     # df_dump.test_merge([292315564],[287890754])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)

+ 3 - 2
BaseDataMaintenance/start_sychro_attachFix.py

@@ -3,9 +3,10 @@ import sys
 import os
 sys.path.append(os.path.dirname(__file__)+"/..")
 
-from BaseDataMaintenance.maintenance.attachment.attachmentFix import start_attachFix,fixDoc
+from BaseDataMaintenance.maintenance.attachment.attachmentFix import start_attachFix,fixDoc,start_docFix
 
 
 if __name__=="__main__":
     # start_attachFix()
-    pass
+    start_docFix()
+    # pass