|
@@ -9,6 +9,7 @@ from multiprocessing import Queue as PQueue
|
|
|
from multiprocessing import Process
|
|
|
from BaseDataMaintenance.model.ots.document_product import *
|
|
|
from BaseDataMaintenance.model.ots.attachment import *
|
|
|
+from BaseDataMaintenance.model.ots.document import *
|
|
|
from BaseDataMaintenance.common.Utils import *
|
|
|
from BaseDataMaintenance.common.ossUtils import *
|
|
|
from BaseDataMaintenance.maintenance.product.htmlparser import *
|
|
@@ -23,6 +24,9 @@ parameter_status_process_failed = 2
|
|
|
parameter_status_process_jump = 3
|
|
|
parameter_status_not_found = 4
|
|
|
|
|
|
+parameter_status_to_process_his = 100
|
|
|
+
|
|
|
+
|
|
|
import redis
|
|
|
|
|
|
from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
|
|
@@ -32,6 +36,7 @@ class Product_Attachment_Processor():
|
|
|
def __init__(self,):
|
|
|
self.ots_client = getConnect_ots()
|
|
|
self.product_attachment_queue = PQueue()
|
|
|
+ self.product_attachment_processed_queue = PQueue()
|
|
|
self.product_attachment_queue_size = 50
|
|
|
self.set_product_attachment = set()
|
|
|
self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
|
|
@@ -49,21 +54,55 @@ class Product_Attachment_Processor():
|
|
|
self.download_path = "%s/%s"%(self.current_path,"download")
|
|
|
self.test_url="http://192.168.2.102:15011/convert"
|
|
|
|
|
|
+
|
|
|
+ def getAttachments(self,docid):
|
|
|
+
|
|
|
+ list_atta = []
|
|
|
+ partitionkey = docid%500+1
|
|
|
+ doc = Document({document_partitionkey:partitionkey,
|
|
|
+ document_docid:docid})
|
|
|
+ doc.fix_columns(self.ots_client,[document_attachment_path],True)
|
|
|
+
|
|
|
+ page_attachments = doc.getProperties().get(document_attachment_path)
|
|
|
+ if page_attachments is not None and page_attachments!="":
|
|
|
+ attachments = json.loads(page_attachments)
|
|
|
+ for _a in attachments:
|
|
|
+ _filemd5 = _a.get(document_attachment_path_filemd5)
|
|
|
+ _da = {attachment_filemd5:_filemd5}
|
|
|
+ _attach = attachment(_da)
|
|
|
+ if _attach.fix_columns(ots_client,[attachment_classification,attachment_filetype],True):
|
|
|
+ _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
|
|
|
+ _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
|
|
|
+ list_atta.append(_da)
|
|
|
+ return json.dumps(list_atta,ensure_ascii=False)
|
|
|
+
|
|
|
+
|
|
|
def process_parameters_producer(self,):
|
|
|
attachment_size = getQueueSize("dataflow_attachment")
|
|
|
if attachment_size<100:
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ _id = self.product_attachment_processed_queue.get(False)
|
|
|
+ if _id in self.set_product_attachment:
|
|
|
+ self.set_product_attachment.remove(_id)
|
|
|
+ except Exception as e:
|
|
|
+ break
|
|
|
|
|
|
_qsize = self.product_attachment_queue.qsize()
|
|
|
log("product_attachment_queue %d"%(_qsize))
|
|
|
if _qsize>self.product_attachment_queue_size/3:
|
|
|
return
|
|
|
- bool_query = BoolQuery(must_queries=[
|
|
|
- TermQuery("parameter_status",parameter_status_to_process)
|
|
|
+ bool_query = BoolQuery(should_queries=[
|
|
|
+ # TermQuery(DOCUMENT_PRODUCT_DOCID,305253400)
|
|
|
+ TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process),
|
|
|
+ TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process_his),
|
|
|
+ BoolQuery(must_not_queries=[ExistsQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS)])
|
|
|
+
|
|
|
])
|
|
|
list_id = []
|
|
|
dict_docid_list = {}
|
|
|
rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
|
|
|
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
|
|
|
+ SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",sort_order=SortOrder.DESC)]),limit=100,get_total_count=True),
|
|
|
ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
|
|
|
|
|
|
list_data = getRow_ots(rows)
|
|
@@ -73,6 +112,7 @@ class Product_Attachment_Processor():
|
|
|
list_id.append(_id)
|
|
|
if _id in self.set_product_attachment:
|
|
|
continue
|
|
|
+ self.set_product_attachment.add(_id)
|
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
|
if docid not in dict_docid_list:
|
|
|
dict_docid_list[docid] = []
|
|
@@ -91,6 +131,7 @@ class Product_Attachment_Processor():
|
|
|
list_id.append(_id)
|
|
|
if _id in self.set_product_attachment:
|
|
|
continue
|
|
|
+ self.set_product_attachment.add(_id)
|
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
|
if docid not in dict_docid_list:
|
|
|
dict_docid_list[docid] = []
|
|
@@ -98,10 +139,14 @@ class Product_Attachment_Processor():
|
|
|
|
|
|
_count += 1
|
|
|
for k,v in dict_docid_list.items():
|
|
|
+ if v[0].get(DOCUMENT_PRODUCT_ATTACHMENTS) is None:
|
|
|
+ _attachments = self.getAttachments(v[0].get(DOCUMENT_PRODUCT_DOCID))
|
|
|
+ for _v in v:
|
|
|
+ _v[DOCUMENT_PRODUCT_ATTACHMENTS] = _attachments
|
|
|
self.product_attachment_queue.put(v)
|
|
|
_qsize = self.product_attachment_queue.qsize()
|
|
|
log("after product_attachment_queue %d"%(_qsize))
|
|
|
- self.set_product_attachment = set(list_id)
|
|
|
+ # self.set_product_attachment = set(list_id[-20000:])
|
|
|
|
|
|
def get_whole_html(self,_filemd5):
|
|
|
atta = attachment({attachment_filemd5:_filemd5})
|
|
@@ -185,70 +230,90 @@ class Product_Attachment_Processor():
|
|
|
|
|
|
def process_parameters_handler(self,list_item,result_queue):
|
|
|
for item in list_item:
|
|
|
- attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
|
|
|
- product_name = item.get(DOCUMENT_PRODUCT_NAME)
|
|
|
- product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
|
|
|
- list_product = []
|
|
|
- log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
|
|
|
- if product_original_name is not None:
|
|
|
- _l = product_original_name.split("_")
|
|
|
- _l.reverse()
|
|
|
- list_product.extend(_l)
|
|
|
- if product_name is not None:
|
|
|
- list_product.append(product_name)
|
|
|
- list_product = list(set(list_product))
|
|
|
- dp = Document_product(item)
|
|
|
- if attachments is None or attachments=="" or len(list_product)==0:
|
|
|
- dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
|
|
|
- dp.update_row(self.ots_client)
|
|
|
- return
|
|
|
- list_attachment = json.loads(attachments)
|
|
|
- list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
|
|
|
- list_filemd5 = [a.get("filemd5","") for a in list_attachment]
|
|
|
- _find = False
|
|
|
- _success = False
|
|
|
- list_text = []
|
|
|
- for _filemd5 in list_filemd5:
|
|
|
- _html = self.get_whole_html(_filemd5)
|
|
|
- if len(_html)>5:
|
|
|
-
|
|
|
- pd = ParseDocument(_html,True)
|
|
|
- for _product in list_product:
|
|
|
- pd.fix_tree(_product)
|
|
|
- list_data = pd.tree
|
|
|
- _text,_count = extract_product_parameters(list_data,_product)
|
|
|
- if _count>0:
|
|
|
- _find = True
|
|
|
- if _text is not None:
|
|
|
- list_text.append(_text)
|
|
|
-
|
|
|
- pd = ParseDocument(_html,False)
|
|
|
- for _product in list_product:
|
|
|
- pd.fix_tree(_product)
|
|
|
- list_data = pd.tree
|
|
|
- _text,_count = extract_product_parameters(list_data,_product)
|
|
|
- if _count>0:
|
|
|
- _find = True
|
|
|
- if _text is not None:
|
|
|
- list_text.append(_text)
|
|
|
- else:
|
|
|
- log("product attachment process filemd5 %s has no content"%(_filemd5))
|
|
|
- if len(list_text)>0:
|
|
|
- _text = getBestProductText(list_text,'',[])
|
|
|
- logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
|
|
|
- dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
|
|
|
- dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
|
|
|
- dp.update_row(self.ots_client)
|
|
|
- _success = True
|
|
|
- break
|
|
|
-
|
|
|
- if not _success:
|
|
|
- if not _find:
|
|
|
- dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
|
|
|
+ try:
|
|
|
+ attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
|
|
|
+ product_name = item.get(DOCUMENT_PRODUCT_NAME)
|
|
|
+ product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
|
|
|
+ list_product = []
|
|
|
+ log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
|
|
|
+ if product_original_name is not None:
|
|
|
+ _l = product_original_name.split("_")
|
|
|
+ _l.reverse()
|
|
|
+ list_product.extend(_l)
|
|
|
+ if product_name is not None:
|
|
|
+ list_product.append(product_name)
|
|
|
+ list_product = list(set(list_product))
|
|
|
+ dp = Document_product(item)
|
|
|
+ if attachments is None or attachments=="" or len(list_product)==0:
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
|
|
|
dp.update_row(self.ots_client)
|
|
|
+ return
|
|
|
+ if isinstance(attachments,str):
|
|
|
+ list_attachment = json.loads(attachments)
|
|
|
+ elif isinstance(attachments,list):
|
|
|
+ list_attachment = attachments
|
|
|
else:
|
|
|
- dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
|
|
|
+ log("attachment type error")
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
|
|
|
dp.update_row(self.ots_client)
|
|
|
+ return
|
|
|
+ list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
|
|
|
+ list_filemd5 = [a.get("filemd5","") for a in list_attachment]
|
|
|
+ _find = False
|
|
|
+ _success = False
|
|
|
+ list_text = []
|
|
|
+ for _filemd5 in list_filemd5:
|
|
|
+ _html = self.get_whole_html(_filemd5)
|
|
|
+ if len(_html)>5:
|
|
|
+
|
|
|
+ pd = ParseDocument(_html,True)
|
|
|
+ for _product in list_product:
|
|
|
+ pd.fix_tree(_product)
|
|
|
+ list_data = pd.tree
|
|
|
+ _text,_count = extract_product_parameters(list_data,_product)
|
|
|
+ if _count>0:
|
|
|
+ _find = True
|
|
|
+ if _text is not None:
|
|
|
+ list_text.append(_text)
|
|
|
+
|
|
|
+ pd = ParseDocument(_html,False)
|
|
|
+ for _product in list_product:
|
|
|
+ pd.fix_tree(_product)
|
|
|
+ list_data = pd.tree
|
|
|
+ _text,_count = extract_product_parameters(list_data,_product)
|
|
|
+ if _count>0:
|
|
|
+ _find = True
|
|
|
+ if _text is not None:
|
|
|
+ list_text.append(_text)
|
|
|
+ else:
|
|
|
+ log("product attachment process filemd5 %s has no content"%(_filemd5))
|
|
|
+ if len(list_text)>0:
|
|
|
+ _html = getBestProductText(list_text,'',[])
|
|
|
+ _html = format_text(_html)
|
|
|
+ _soup = BeautifulSoup(_html,"lxml")
|
|
|
+ _text = _soup.get_text()
|
|
|
+
|
|
|
+ logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_HTML,_html,True)
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_IS_PARAMETER,1,True)
|
|
|
+ dp.update_row(self.ots_client)
|
|
|
+ _success = True
|
|
|
+ break
|
|
|
+
|
|
|
+ if not _success:
|
|
|
+ if not _find:
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
|
|
|
+ dp.update_row(self.ots_client)
|
|
|
+ else:
|
|
|
+ dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
|
|
|
+ dp.update_row(self.ots_client)
|
|
|
+ except Exception as e:
|
|
|
+ traceback.print_exc()
|
|
|
+ finally:
|
|
|
+ self.product_attachment_processed_queue.put(item.get(DOCUMENT_PRODUCT_ID))
|
|
|
+
|
|
|
|
|
|
def start_process(self):
|
|
|
self.process_parameters_producer()
|
|
@@ -318,5 +383,12 @@ def change_parameters_status():
|
|
|
mt.run()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- start_process_parameters()
|
|
|
- # change_parameters_status()
|
|
|
+ # start_process_parameters()
|
|
|
+ # change_parameters_status()
|
|
|
+ ots_client = getConnect_ots()
|
|
|
+ a = Document_product({DOCUMENT_PRODUCT_ID:"00000d8f94ba32d840c21fc9343ce4fb"})
|
|
|
+ a.fix_columns(ots_client,[DOCUMENT_PRODUCT_PARAMETER,DOCUMENT_PRODUCT_IS_PARAMETER],True)
|
|
|
+ with open("a.html","w",encoding="utf8") as f:
|
|
|
+ f.write(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
|
|
|
+ print(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
|
|
|
+
|