
Improve the efficiency of configuration extraction; fix attachments not being recognized when the interface is restarted

luojiehua 1 year ago
parent
commit
f7e579cf7e

+ 3 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -164,6 +164,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             dtmp = Document_tmp(item)
 
 
+            start_time = time.time()
             #call the recognition interface
             _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
 
@@ -208,6 +209,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 ackMsg(conn,message_id)
             log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
         except Exception as e:
+            if time.time()-start_time<10:
+                item["retry_times"] -= 1
             if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                 ackMsg(conn,message_id)
 

+ 3 - 1
BaseDataMaintenance/maintenance/product/htmlparser.py

@@ -7,7 +7,7 @@ import logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.INFO)
 
 
 from bs4 import BeautifulSoup
@@ -448,6 +448,8 @@ class ParseDocument():
                 _table = _soup.find("table")
                 if _table is not None:
                     list_table = getTable(_table)
+                    if len(list_table)==0:
+                        continue
                     table_columns = len(list_table[0])
 
                     if auto_merge_table:
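The new length check in ParseDocument skips tables for which getTable returns no rows; without it, list_table[0] would raise an IndexError on such tables. A minimal sketch of the guard, with get_table standing in for getTable:

    def table_column_count(soup_table, get_table):
        # Sketch: guard against an empty parse result before indexing the first row.
        list_table = get_table(soup_table)
        if len(list_table) == 0:     # e.g. a decorative or malformed <table>
            return None              # caller should skip this table (the diff uses `continue`)
        return len(list_table[0])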

+ 136 - 100
BaseDataMaintenance/maintenance/product/product_attachment.py → BaseDataMaintenance/maintenance/product/product_parameter.py

@@ -24,12 +24,14 @@ parameter_status_not_found = 4
 
 import redis
 
+from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+
 class Product_Attachment_Processor():
 
     def __init__(self,):
         self.ots_client = getConnect_ots()
         self.product_attachment_queue = PQueue()
-        self.product_attachment_queue_size = 100
+        self.product_attachment_queue_size = 50
         self.set_product_attachment = set()
         self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
         self.auth = getAuth()
@@ -47,38 +49,53 @@ class Product_Attachment_Processor():
         self.test_url="http://192.168.2.102:15011/convert"
 
     def process_parameters_producer(self,):
-
-        if self.product_attachment_queue.qsize()>self.product_attachment_queue_size/3:
-            return
-        bool_query = BoolQuery(must_queries=[
-            TermQuery("parameter_status",parameter_status_to_process)
-        ])
-        list_id = []
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
-                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("parameter_status")]),limit=100,get_total_count=True),
-                                                                            ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME],return_type=ColumnReturnType.SPECIFIED))
-
-        list_data = getRow_ots(rows)
-        for data in list_data:
-            _id = data.get(DOCUMENT_PRODUCT_ID)
-            if _id in self.set_product_attachment:
-                continue
-            self.product_attachment_queue.put(data)
-            list_id.append(_id)
-        while next_token:
-            if self.product_attachment_queue.qsize()>=self.product_attachment_queue_size:
-                break
+        attachment_size = getQueueSize("dataflow_attachment")
+        if attachment_size<100:
+
+            if self.product_attachment_queue.qsize()>self.product_attachment_queue_size/3:
+                return
+            bool_query = BoolQuery(must_queries=[
+                TermQuery("parameter_status",parameter_status_to_process)
+            ])
+            list_id = []
+            dict_docid_list = {}
             rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
-                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME],return_type=ColumnReturnType.SPECIFIED))
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                                ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
+
             list_data = getRow_ots(rows)
+            _count = 0
             for data in list_data:
                 _id = data.get(DOCUMENT_PRODUCT_ID)
                 if _id in self.set_product_attachment:
                     continue
-                self.product_attachment_queue.put(data)
+                docid = data.get(DOCUMENT_PRODUCT_DOCID)
+                if docid not in dict_docid_list:
+                    dict_docid_list[docid] = []
+                dict_docid_list[docid].append(data)
+
                 list_id.append(_id)
-        self.set_product_attachment =  set(list_id)
+                _count += 1
+            while next_token:
+                if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
+                    break
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                    ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
+                list_data = getRow_ots(rows)
+                for data in list_data:
+                    _id = data.get(DOCUMENT_PRODUCT_ID)
+                    if _id in self.set_product_attachment:
+                        continue
+                    docid = data.get(DOCUMENT_PRODUCT_DOCID)
+                    if docid not in dict_docid_list:
+                        dict_docid_list[docid] = []
+                    dict_docid_list[docid].append(data)
+                    list_id.append(_id)
+                    _count += 1
+            for k,v in dict_docid_list.items():
+                self.product_attachment_queue.put(v)
+            self.set_product_attachment =  set(list_id)
 
     def get_whole_html(self,_filemd5):
         atta = attachment({attachment_filemd5:_filemd5})
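The rewritten producer above only tops up its queue when getQueueSize reports fewer than 100 pending messages on dataflow_attachment, and it now groups the OTS rows by docid (the search is sorted on that field) before enqueuing, so one queue entry carries every product extracted from the same document and the handler can parse that document's attachments once for all of them; the number of distinct docids is capped by product_attachment_queue_size (now 50). A minimal sketch of the grouping step, where rows stands in for the OTS result rows and the literal "docid"/"id" keys stand in for DOCUMENT_PRODUCT_DOCID and DOCUMENT_PRODUCT_ID:

    from collections import defaultdict

    def enqueue_grouped_by_docid(rows, queue, max_docids=50):
        # Sketch: group product rows by docid and enqueue one list per document,
        # so attachments shared by several products are only converted once.
        dict_docid_list = defaultdict(list)
        seen_ids = []
        for row in rows:
            if len(dict_docid_list) >= max_docids:
                break
            dict_docid_list[row["docid"]].append(row)
            seen_ids.append(row["id"])
        for _docid, items in dict_docid_list.items():
            queue.put(items)          # the handler now receives a list of items
        return set(seen_ids)          # remembered to avoid re-queuing the same products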
@@ -96,9 +113,12 @@ class Product_Attachment_Processor():
         if _cache_html is not None:
             _html = _cache_html
         else:
-            if atta.fix_columns(self.ots_client,[attachment_path,attachment_filetype],True):
+            if atta.fix_columns(self.ots_client,[attachment_path,attachment_filetype,attachment_size],True):
                 objectPath = atta.getProperties().get(attachment_path)
                 _filetype = atta.getProperties().get(attachment_filetype)
+                _size = atta.getProperties().get(attachment_size,0)
+                if _size<=0 or _size>=20*1024*1024:
+                    return _html
 
                 # not supported on windows
                 # if _filetype in ("doc","xls"):
@@ -121,7 +141,7 @@ class Product_Attachment_Processor():
                     download_succeed = False
                 if download_succeed:
                     try:
-
+                        start_time = time.time()
                         if os.path.exists(localhtml):
                             _html = open(localhtml,"r",encoding="utf8").read()
                             _success = True
@@ -129,7 +149,8 @@ class Product_Attachment_Processor():
                             _success = True
                         else:
                             _data_base64 = base64.b64encode(open(localpath,"rb").read())
-                            _success,_html,swf_images,classification = getAttachDealInterface(_data_base64,_filetype,kwargs={'page_no': '1,-1',"max_bytes":"-1"},timeout=6000)
+
+                            _success,_html,swf_images,classification = getAttachDealInterface(_data_base64,_filetype,kwargs={'page_no': '1,-1',"max_bytes":"-1","timeout":6000},timeout=6000)
 
                             if _success:
                                 db.set(_key,_html,24*60*60)
@@ -138,6 +159,11 @@ class Product_Attachment_Processor():
                                 # with open(localhtml,"w",encoding="utf8") as f:
                                 #     f.write(_html)
 
+                    except ConnectionError as e1:
+                        if time.time()-start_time>5000:
+                            db.set(_key,_html,24*60*60)
+                        else:
+                            raise e1
                     except Exception as e:
                         traceback.print_exc()
                     finally:
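These hunks harden get_whole_html in three ways: the attachment's size is fetched together with its path and type, and conversion is skipped when the size is missing or 20 MB and above; the interface call is timed and its timeout is also passed through the kwargs; and a ConnectionError that only arrives after the call has run past the 5000 threshold caches the (still empty) result for 24 hours, so an attachment that reliably times out is not retried on every pass, while a ConnectionError that comes back quickly is re-raised as transient. A compressed sketch of the resulting flow, where fetch_meta, download and convert are hypothetical stand-ins for the OTS lookup, the file download and getAttachDealInterface, redis_db is assumed to follow the redis-py get/set(key, value, ex) signature, and the cache key format is made up:

    import base64
    import time

    def get_whole_html_sketch(filemd5, redis_db, fetch_meta, download, convert,
                              max_bytes=20 * 1024 * 1024, long_call=5000, ttl=24 * 60 * 60):
        # Sketch: cache lookup, size guard, timed conversion, and negative caching
        # when the interface drops a very long call.
        cache_key = "whole_html_" + filemd5          # hypothetical key format
        cached = redis_db.get(cache_key)
        if cached is not None:
            return cached
        html = ""
        meta = fetch_meta(filemd5)                   # -> (path, filetype, size) or None
        if meta is None:
            return html                              # attachment row not found
        path, filetype, size = meta
        if size <= 0 or size >= max_bytes:           # skip empty or oversized attachments
            return html
        localpath = download(path)
        if localpath is None:
            return html
        start_time = time.time()
        try:
            data_base64 = base64.b64encode(open(localpath, "rb").read())
            success, html = convert(data_base64, filetype)
            if success:
                redis_db.set(cache_key, html, ttl)   # cache converted html for 24 h
        except ConnectionError:
            if time.time() - start_time > long_call:
                redis_db.set(cache_key, html, ttl)   # long call: cache the empty result
            else:
                raise                                # quick failure: treat as transient
        return html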
@@ -147,89 +173,99 @@ class Product_Attachment_Processor():
                             pass
                         except Exception as e:
                             pass
+            else:
+                log("attachment %s not exists"%_filemd5)
         return _html
 
-    def process_parameters_handler(self,item,result_queue):
-        attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
-        product_name = item.get(DOCUMENT_PRODUCT_NAME)
-        product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
-        list_product = []
-        if product_original_name is not None:
-            _l = product_original_name.split("_")
-            _l.reverse()
-            list_product.extend(_l)
-        if product_name is not None:
-            list_product.append(product_name)
-        list_product = list(set(list_product))
-        dp = Document_product(item)
-        if attachments is None or attachments=="" or len(list_product)==0:
-            dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile)
-            dp.update_row(self.ots_client)
-            return
-        list_attachment = json.loads(attachments)
-        list_filemd5 = [a.get("filemd5","") for a in list_attachment]
-        _find = False
-        _success = False
-        list_text = []
-        for _filemd5 in list_filemd5:
-            _html = self.get_whole_html(_filemd5)
-            if len(_html)>5:
-
-                pd = ParseDocument(_html,True)
-                for _product in list_product:
-                    pd.fix_tree(_product)
-                    list_data = pd.tree
-                    _text,_count = extract_product_parameters(list_data,_product)
-                    if _count>0:
-                        _find = True
-                    if _text is not None:
-                        list_text.append(_text)
-
-                pd = ParseDocument(_html,False)
-                for _product in list_product:
-                    pd.fix_tree(_product)
-                    list_data = pd.tree
-                    _text,_count = extract_product_parameters(list_data,_product)
-                    if _count>0:
-                        _find = True
-                    if _text is not None:
-                        list_text.append(_text)
-            else:
-                log("product attachment process filemd5 %s has no content"%(_filemd5))
-
-        if len(list_text)>0:
-            _text = getBestProductText(list_text,'',[])
-            logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
-            dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
-            dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
-            dp.update_row(self.ots_client)
-        else:
-            if not _find:
-                dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
-                dp.update_row(self.ots_client)
-            else:
-                dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
+    def process_parameters_handler(self,list_item,result_queue):
+        for item in list_item:
+            attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
+            product_name = item.get(DOCUMENT_PRODUCT_NAME)
+            product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
+            list_product = []
+            log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
+            if product_original_name is not None:
+                _l = product_original_name.split("_")
+                _l.reverse()
+                list_product.extend(_l)
+            if product_name is not None:
+                list_product.append(product_name)
+            list_product = list(set(list_product))
+            dp = Document_product(item)
+            if attachments is None or attachments=="" or len(list_product)==0:
+                dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile)
                 dp.update_row(self.ots_client)
+                return
+            list_attachment = json.loads(attachments)
+            list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
+            list_filemd5 = [a.get("filemd5","") for a in list_attachment]
+            _find = False
+            _success = False
+            list_text = []
+            for _filemd5 in list_filemd5:
+                _html = self.get_whole_html(_filemd5)
+                if len(_html)>5:
+
+                    pd = ParseDocument(_html,True)
+                    for _product in list_product:
+                        pd.fix_tree(_product)
+                        list_data = pd.tree
+                        _text,_count = extract_product_parameters(list_data,_product)
+                        if _count>0:
+                            _find = True
+                        if _text is not None:
+                            list_text.append(_text)
+
+                    pd = ParseDocument(_html,False)
+                    for _product in list_product:
+                        pd.fix_tree(_product)
+                        list_data = pd.tree
+                        _text,_count = extract_product_parameters(list_data,_product)
+                        if _count>0:
+                            _find = True
+                        if _text is not None:
+                            list_text.append(_text)
+                else:
+                    log("product attachment process filemd5 %s has no content"%(_filemd5))
+                if len(list_text)>0:
+                    _text = getBestProductText(list_text,'',[])
+                    logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
+                    dp.update_row(self.ots_client)
+                    _success = True
+                    break
+
+            if not _success:
+                if not _find:
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
+                    dp.update_row(self.ots_client)
+                else:
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
+                    dp.update_row(self.ots_client)
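process_parameters_handler now receives the whole list of products that share one docid, logs what it is about to process, and prefers bid files: the attachment list is sorted so that entries classified as "招标文件" come first and "采购清单" second, and the loop stops as soon as one attachment yields parameter text; only if none does is the status set to not-found or failed. A minimal sketch of that ordering and early exit, where extract(filemd5, products) stands in for the ParseDocument/extract_product_parameters pipeline:

    CLASSIFICATION_ORDER = {"招标文件": 0, "采购清单": 1}   # everything else sorts last

    def first_parameter_text(list_attachment, list_product, extract):
        # Sketch: try bid documents first and stop at the first attachment that
        # yields parameter text for any of the product names.
        ordered = sorted(list_attachment,
                         key=lambda a: CLASSIFICATION_ORDER.get(a.get("classification"), 2))
        for attach in ordered:
            text = extract(attach.get("filemd5", ""), list_product)
            if text:
                return text        # the committed code breaks out of its loop here
        return None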
 
     def start_process(self):
-        mt = MultiThreadHandler(self.product_attachment_queue,self.process_parameters_handler,None,2,need_stop=False,restart=True)
+        self.process_parameters_producer()
+        thread_count = 7
+        mt = MultiThreadHandler(self.product_attachment_queue,self.process_parameters_handler,None,thread_count,need_stop=False,restart=True)
         mt.run()
 
     def process_parameters_comsumer(self,):
-        process_count = 2
-        list_process = []
-        for i in range(process_count):
-            p = Process(target=self.start_process)
-            list_process.append(p)
-        for p in list_process:
-            p.start()
-        for p in list_process:
-            p.join()
+        # process_count = 2
+        # list_process = []
+        # for i in range(process_count):
+        #     p = Process(target=self.start_process)
+        #     list_process.append(p)
+        # for p in list_process:
+        #     p.start()
+        # for p in list_process:
+        #     p.join()
+        self.start_process()
 
 
     def start_process_parameters(self):
         scheduler = BlockingScheduler()
-        scheduler.add_job(self.process_parameters_producer,"cron",second="*/10")
+        scheduler.add_job(self.process_parameters_producer,"cron",second="*/20")
         scheduler.add_job(self.process_parameters_comsumer,"cron",second="*/30")
         scheduler.start()
 

+ 4 - 0
BaseDataMaintenance/start_product.py

@@ -11,6 +11,7 @@ def main(args=None):
     parser.add_argument("--search_similar",dest="search_similar",action="store_true",help="start product_dict_synchonize process")
     parser.add_argument("--start_process_product",dest="start_process_product",action="store_true",help="start product_dict_synchonize process")
     parser.add_argument("--test",dest="test",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--start_extract_parameter",dest="start_extract_parameter",action="store_true",help="start extract_parameter")
 
     args = parser.parse_args(args)
     if args.product_dict_synchonize:
@@ -28,6 +29,9 @@ def main(args=None):
     if args.test:
         from BaseDataMaintenance.maintenance.product.products import test
         test()
+    if args.start_extract_parameter:
+        from BaseDataMaintenance.maintenance.product.product_parameter import start_process_parameters
+        start_process_parameters()
 
 
 if __name__ == '__main__':
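With the new flag wired into start_product.py, the parameter-extraction scheduler can be launched via --start_extract_parameter; since main forwards its args list to parse_args, the same entry point can also be driven programmatically, for example:

    from BaseDataMaintenance.start_product import main

    # assumed to be equivalent to running the script with --start_extract_parameter
    main(["--start_extract_parameter"])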