Explorar o código

附件识别流程增加公告附件分类

luojiehua hai 2 anos
pai
achega
7d2e0ba062

+ 2 - 1
BaseDataMaintenance/dataSource/interface.py

@@ -39,11 +39,12 @@ def getAttachDealInterface(_data,_type,path="",restry=1):
                 _result = json.loads(_resp.content.decode())
                 _html = "".join(_result.get("result_html",""))
                 swf_images = _result.get("swf_images",[])
+                classification = _result.get("classification","")
                 if _result["is_success"]==1:
                     _succeed = True
                     # print(_result)
                     _html = "".join(_result["result_html"])
-                    return _succeed,_html,swf_images
+                    return _succeed,_html,swf_images,classification
                 else:
                     pass
         except ConnectionError as e1:

+ 8 - 6
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -221,13 +221,14 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
             if not (local_exists or download_succeed):
                 _ots_attach = attachment(attach.getProperties_ots())
-                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
+                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                 log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))
                 if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="":
                     attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                     attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                     attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                     attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
+                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                     if attach.exists(self.attach_pool):
                         attach.update_row(self.attach_pool)
                     else:
@@ -279,7 +280,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                 # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                 # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
-                _success,_html,swf_images = getAttachDealInterface(None,_filetype,path=localpath)
+                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath)
                 log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                 if _success:
                     if len(_html)<5:
@@ -328,6 +329,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                 attach.setValue(attachment_recsize,len(_html),True)
                 attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
+                attach.setValue(attachment_classification,classification,True)
 
 
                 #更新ots
@@ -525,8 +527,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         self.industy_url = "http://127.0.0.1:15000/industry_extract"
 
-        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",17],
-                                   ["http://192.168.0.115:15030/content_extract",3]
+        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",15],
+                                   ["http://192.168.0.115:15030/content_extract",7]
                                    ]
 
 
@@ -546,7 +548,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.block_url = RLock()
         self.url_count = 0
 
-        self.comsumer_count = 20
+        self.comsumer_count = 30
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
             createComsumer(listener_extract,self.mq_extract)
@@ -621,7 +623,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.comsumer()
 
     def comsumer(self):
-        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,self.comsumer_count,1,True)
+        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
         mt.run()
 
 

+ 1 - 0
BaseDataMaintenance/model/ots/attachment.py

@@ -33,6 +33,7 @@ attachment_crtime = "crtime"
 attachment_attachmenthtml = "attachmenthtml"
 attachment_attachmentcon = "attachmentcon" #-1代表逻辑错误,-2接口挂了,-3文件格式错误
 attachment_process_time = "process_time"
+attachment_classification = "classification"
 attachment_page_time = "page_time"
 attachment_docids = "docids"
 attachment_filetype = "filetype"

+ 0 - 0
BaseDataMaintenance/model/ots_capacity/__init__.py


+ 137 - 0
BaseDataMaintenance/model/ots_capacity/credit_item_info.py

@@ -0,0 +1,137 @@
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from tablestore import *
+from BaseDataMaintenance.common.Utils import *
+from bs4 import BeautifulSoup
+
+
+
+class Credit_item_info(BaseModel):
+    """Row model bound to the ``credit_item_info`` table.
+
+    Every key/value pair of the dict given to the constructor is stored on
+    the instance through ``setValue``.
+    """
+
+    def __init__(self,_dict):
+        # _dict: mapping of column name -> value; each pair is passed to setValue(k, v, True).
+        BaseModel.__init__(self)
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "credit_item_info"
+        # NOTE(review): prefixs is not read anywhere in the visible code - confirm it is needed.
+        self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]
+
+    def getPrimary_keys(self):
+        """Return the list of primary-key column names (only ``record_id``)."""
+        return ["record_id"]
+
+    def getAll_columns(self):
+        """Return the list of column names declared by this model."""
+        return ["record_id","qylb"]
+
+
+
+def drop_credit_item():
+    """Rewrite the ``qylb`` column of selected ``credit_item_info`` rows.
+
+    A producer pages through the ``credit_item_info_index`` search index and
+    queues every returned row; once the producer has finished, the queued rows
+    are handed to ``_handle``, which removes a fixed text from ``qylb`` with
+    ``re.sub`` and calls ``update_row`` on the row.
+
+    Takes no arguments and has no explicit return value.
+    """
+    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
+    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+    import queue
+    from threading import Thread
+    # NOTE(review): json, the attachment_* names and Document_extract2 below are only
+    # referenced from commented-out code in this function.
+    import json
+    task_queue = queue.Queue()
+    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
+    ots_capacity = getConnect_ots_capacity()
+    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2
+
+    def producer(task_queue,ots_client):
+        # Query the search index and put every returned row into task_queue.
+
+
+        # Both conditions must hold: a term match on web_source and a wildcard match on qylb.
+        bool_query = BoolQuery(must_queries=[
+            TermQuery("web_source","青岛市建设市场监管与信用信息综合平台"),
+            BoolQuery(must_queries=[
+                # TermQuery("tenderee","山西利民工业有限责任公司"),
+                # MatchPhraseQuery("doctitle","中国电信"),
+                # MatchPhraseQuery("doctextcon","中国电信"),
+                # MatchPhraseQuery("attachmenttextcon","中国电信")]),
+                WildcardQuery("qylb","建筑行业-施工企业*")
+                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
+                #,TermQuery(document_docid,171146519)
+                ]
+            ),
+            # TermQuery("docid",228359000)
+        ],
+            # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
+        )
+
+        # First page: sorted by create_time descending, at most 100 rows, only the qylb column requested.
+        rows,next_token,total_count,is_all_succeed = ots_client.search("credit_item_info","credit_item_info_index",
+                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("create_time",SortOrder.DESC)]),limit=100,get_total_count=True),
+                                                                       columns_to_get=ColumnsToGet(["qylb"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        print(total_count)
+        _count = len(list_data)
+        for _data in list_data:
+            task_queue.put(_data)
+        # Keep requesting pages with the returned token until the search returns no next_token.
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = ots_client.search("credit_item_info","credit_item_info_index",
+                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(["qylb"],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            _count += len(list_data)
+            # Progress output: rows queued so far / total reported by the search.
+            print("%d/%d"%(_count,total_count))
+            for _data in list_data:
+                task_queue.put(_data)
+
+        # docids = [223820830,224445409]
+        # for docid in docids:
+        #     _dict = {document_docid:int(docid),
+        #              document_partitionkey:int(docid)%500+1,
+        #              }
+        #     task_queue.put(Document(_dict))
+        # import pandas as pd
+        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
+        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
+        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
+        #         # print(docid)
+        #         _dict = {document_docid:int(docid),
+        #                  document_partitionkey:int(docid)%500+1,
+        #                  }
+        #         task_queue.put(Document(_dict))
+        log("task_queue size:%d"%(task_queue.qsize()))
+
+    def _handle(item,result_queue,ots_client):
+        # Process one queued row: rewrite its qylb value and write the row back.
+        # NOTE(review): the update below uses the enclosing ots_capacity, not the ots_client argument.
+        #change attach value
+        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
+        # print("docid",item.getProperties().get(document_docid))
+        # for attach in list_attachment:
+        #
+        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
+        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
+        #
+        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
+        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
+        #     attach[document_attachment_path_fileTitle] = _file_title
+        #     attach[document_attachment_path_fileLink] = filelink
+        #
+        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
+        # item.all_columns.remove(document_dochtmlcon)
+
+        #change status
+        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
+        # item.setValue(document_status,random.randint(151,170),True)
+        # item.update_row(ots_client)
+        # log("update %d status done"%(item.getProperties().get(document_docid)))
+        # _dict = {}
+        # _dict.update(item)
+        # _dict.pop("status")
+        # _dict["status"] = 1
+        # print(_dict)
+        # _document = Document(_dict)
+        # _document.update_row(ots_client)
+        # _d_extract = Document_extract2(_dict)
+        # _d_extract.delete_row(ots_client)
+        _credit = Credit_item_info(item)
+        # re.sub removes every occurrence of the pattern text from qylb (empty string if qylb is absent).
+        # NOTE(review): `re` is not imported in this function; presumably supplied by the star imports - confirm.
+        _credit.setValue("qylb",re.sub("建筑行业-施工企业-","",_credit.getProperties().get("qylb","")),True)
+        print(_credit.getAttribute_turple())
+        _credit.update_row(ots_capacity)
+        pass
+
+
+    # The producer thread is joined right after start, so the queue is complete
+    # before the handler is run.
+    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_capacity})
+    t_producer.start()
+    t_producer.join()
+    # NOTE(review): 30 is presumably the worker thread count of MultiThreadHandler - confirm.
+    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_capacity)
+    mt.run()
+
+
+# Run the qylb rewrite when this module is executed as a script.
+if __name__=="__main__":
+    drop_credit_item()

+ 2 - 1
BaseDataMaintenance/model/postgres/attachment.py

@@ -33,6 +33,7 @@ attachment_crtime = "crtime"
 attachment_attachmenthtml = "attachmenthtml"
 attachment_attachmentcon = "attachmentcon" #-1代表逻辑错误,-2接口挂了,-3文件格式错误
 attachment_process_time = "process_time"
+attachment_classification = "classification"
 attachment_page_time = "page_time"
 attachment_docids = "docids"
 attachment_filetype = "filetype"
@@ -56,7 +57,7 @@ class Attachment_postgres(BaseModel):
     #filemd5,path,crtime,attachmenthtml,attachmentcon,process_time,page_time,docids,filetype,status,sourcelink,filetitle
 
     def __init__(self,_dict):
-        self.columns = set([attachment_filemd5,attachment_path,attachment_crtime,attachment_attachmenthtml,attachment_attachmentcon,attachment_process_time,attachment_docids,attachment_filetype,attachment_status,attachment_file_title,attachment_file_link,attachment_recsize,attachment_swfUrls,attachment_size,attachment_has_table])
+        self.columns = set([attachment_filemd5,attachment_path,attachment_crtime,attachment_attachmenthtml,attachment_attachmentcon,attachment_classification ,attachment_process_time,attachment_docids,attachment_filetype,attachment_status,attachment_file_title,attachment_file_link,attachment_recsize,attachment_swfUrls,attachment_size,attachment_has_table])
         for k,v in _dict.items():
             if k in self.columns:
                 # if k in ("attachmenthtml","attachmentcon") and v is not None: