from apscheduler.schedulers.blocking import BlockingScheduler
from tablestore import *
from BaseDataMaintenance.dataSource.source import getConnect_ots,getAuth,is_internal
from BaseDataMaintenance.dataSource.interface import *
from multiprocessing import Queue as PQueue
from multiprocessing import Process
from BaseDataMaintenance.model.ots.document_product import *
from BaseDataMaintenance.model.ots.attachment import *
from BaseDataMaintenance.model.ots.document import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.maintenance.product.htmlparser import *
from BaseDataMaintenance.maintenance.product.productUtils import pool_product
from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize

import oss2
import redis

# stdlib modules used below; the wildcard imports above may already re-export
# some of them, but importing explicitly keeps this module self-contained
import base64
import json
import os
import time
import traceback

from bs4 import BeautifulSoup  # used in process_parameters_handler

# values written to document_product.parameter_status
parameter_status_no_bidfile = -1       # no bid attachment to read parameters from
parameter_status_to_process = 0        # waiting to be processed
parameter_status_process_succeed = 1   # parameter text extracted and saved
parameter_status_process_failed = 2    # parameters were seen but no usable text was produced
parameter_status_process_jump = 3      # skipped (e.g. unsupported filetype)
parameter_status_not_found = 4         # attachments processed, no parameters found
parameter_status_to_process_his = 100  # historical rows waiting to be processed
class Product_Attachment_Processor():
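    """Extracts product parameter text from bid attachments.

    A producer scans the document_product table for rows whose
    parameter_status marks them as pending, groups them by docid and puts
    them on a queue; handler threads download and convert the attachments
    (with a redis cache keyed by filemd5), extract parameter text, and
    write the result plus a new parameter_status back to OTS.
    """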
    def __init__(self):
        self.ots_client = getConnect_ots()
        self.product_attachment_queue = PQueue()
        self.product_attachment_processed_queue = PQueue()
        self.product_attachment_queue_size = 50
        self.set_product_attachment = set()
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        # prefer the internal OSS endpoint when running inside the VPC
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        self.current_path = os.path.dirname(__file__)
        self.download_path = "%s/%s"%(self.current_path,"download")
        self.test_url = "http://192.168.2.102:15011/convert"
    def getAttachments(self,docid):
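        """Look up a document's attachment list and return it as a JSON string of
        {filemd5, classification, filetype} entries."""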
        list_atta = []
        partitionkey = docid%500+1
        doc = Document({document_partitionkey:partitionkey,
                        document_docid:docid})
        doc.fix_columns(self.ots_client,[document_attachment_path],True)
        page_attachments = doc.getProperties().get(document_attachment_path)
        if page_attachments is not None and page_attachments!="":
            attachments = json.loads(page_attachments)
            for _a in attachments:
                _filemd5 = _a.get(document_attachment_path_filemd5)
                _da = {attachment_filemd5:_filemd5}
                _attach = attachment(_da)
                # was the undefined name `ots_client`; use the instance client
                if _attach.fix_columns(self.ots_client,[attachment_classification,attachment_filetype],True):
                    _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
                    _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
                list_atta.append(_da)
        return json.dumps(list_atta,ensure_ascii=False)
    def process_parameters_producer(self):
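        """Fill product_attachment_queue with pending document_product rows, grouped by docid.

        A run is skipped when the downstream dataflow_attachment queue is busy
        or the local queue is still reasonably full; ids already in flight are
        tracked in set_product_attachment and released again via
        product_attachment_processed_queue.
        """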
        # only produce when the downstream attachment dataflow queue has room
        attachment_size = getQueueSize("dataflow_attachment")
        if attachment_size<100:
            # drain ids that finished processing so they can be scheduled again
            while 1:
                try:
                    _id = self.product_attachment_processed_queue.get(False)
                    if _id in self.set_product_attachment:
                        self.set_product_attachment.remove(_id)
                except Exception as e:
                    break
            _qsize = self.product_attachment_queue.qsize()
            log("product_attachment_queue %d"%(_qsize))
            if _qsize>self.product_attachment_queue_size/3:
                return
            bool_query = BoolQuery(should_queries=[
                # TermQuery(DOCUMENT_PRODUCT_DOCID,305253400)
                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process),
                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process_his),
                BoolQuery(must_not_queries=[ExistsQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS)])
            ])
            list_id = []
            dict_docid_list = {}
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",sort_order=SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                                ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count = 0
            for data in list_data:
                _id = data.get(DOCUMENT_PRODUCT_ID)
                list_id.append(_id)
                if _id in self.set_product_attachment:
                    continue
                self.set_product_attachment.add(_id)
                docid = data.get(DOCUMENT_PRODUCT_DOCID)
                if docid not in dict_docid_list:
                    dict_docid_list[docid] = []
                dict_docid_list[docid].append(data)
                _count += 1
            while next_token:
                if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
                    break
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
                list_data = getRow_ots(rows)
                for data in list_data:
                    _id = data.get(DOCUMENT_PRODUCT_ID)
                    list_id.append(_id)
                    if _id in self.set_product_attachment:
                        continue
                    self.set_product_attachment.add(_id)
                    docid = data.get(DOCUMENT_PRODUCT_DOCID)
                    if docid not in dict_docid_list:
                        dict_docid_list[docid] = []
                    dict_docid_list[docid].append(data)
                    _count += 1
            # products from the same docid share attachments; fetch them once per group
            for k,v in dict_docid_list.items():
                if v[0].get(DOCUMENT_PRODUCT_ATTACHMENTS) is None:
                    _attachments = self.getAttachments(v[0].get(DOCUMENT_PRODUCT_DOCID))
                    for _v in v:
                        _v[DOCUMENT_PRODUCT_ATTACHMENTS] = _attachments
                self.product_attachment_queue.put(v)
            _qsize = self.product_attachment_queue.qsize()
            log("after product_attachment_queue %d"%(_qsize))
            # self.set_product_attachment = set(list_id[-20000:])
    def get_whole_html(self,_filemd5):
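        """Return the converted HTML for an attachment, using a redis cache keyed by
        filemd5 and falling back to an OSS download plus the conversion interface."""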
        atta = attachment({attachment_filemd5:_filemd5})
        _html = ""
        db = redis.Redis(connection_pool=pool_product)
        _key = "filemd5:%s"%(_filemd5)
        _cache_html = None
        try:
            _cache_html = db.get(_key)
        except Exception as e:
            logger.info("get redis cache html error")

        if _cache_html is not None:
            # redis returns bytes unless the pool sets decode_responses
            _html = _cache_html.decode("utf8") if isinstance(_cache_html,bytes) else _cache_html
        else:
            if atta.fix_columns(self.ots_client,[attachment_path,attachment_filetype,attachment_size],True):
                objectPath = atta.getProperties().get(attachment_path)
                _filetype = atta.getProperties().get(attachment_filetype)
                _size = atta.getProperties().get(attachment_size,0)
                # skip empty files and anything of 20MB or more
                if _size<=0 or _size>=20*1024*1024:
                    return _html
                # not supported on windows
                # if _filetype in ("doc","xls"):
                #     if len(list_filemd5)==1:
                #         dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_jump,True)
                #         dp.update_row(self.ots_client)
                #         return
                #     else:
                #         continue
                localpath = "%s/%s.%s"%(self.download_path,_filemd5,_filetype)
                localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
                download_succeed = False
                try:
                    if not os.path.exists(localpath):
                        download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    else:
                        download_succeed = True
                except Exception as e:
                    download_succeed = False
                if download_succeed:
                    try:
                        start_time = time.time()
                        _success = False
                        if os.path.exists(localhtml):
                            # reuse html converted by a previous run
                            with open(localhtml,"r",encoding="utf8") as f:
                                _html = f.read()
                        if len(_html)>10:
                            _success = True
                        else:
                            # convert the downloaded file through the attachment interface
                            with open(localpath,"rb") as f:
                                _data_base64 = base64.b64encode(f.read())
                            _success,_html,swf_images,classification = getAttachDealInterface(_data_base64,_filetype,kwargs={'page_no': '1,-1',"max_bytes":"-1","timeout":6000},timeout=6000)
                        if _success:
                            # cache the converted html for a day
                            db.set(_key,_html,24*60*60)
                        # save for debug
                        # localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
                        # with open(localhtml,"w",encoding="utf8") as f:
                        #     f.write(_html)
                    except ConnectionError as e1:
                        # a conversion that ran this long is unlikely to improve on
                        # retry; cache whatever we have instead of raising
                        if time.time()-start_time>5000:
                            db.set(_key,_html,24*60*60)
                        else:
                            raise e1
                    except Exception as e:
                        traceback.print_exc()
                    finally:
                        # the raw download is no longer needed once converted
                        try:
                            if os.path.exists(localpath):
                                os.remove(localpath)
                        except Exception as e:
                            pass
            else:
                log("attachment %s not exists"%_filemd5)
        return _html
    def process_parameters_handler(self,list_item,result_queue):
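        """Extract parameter text for a group of products sharing one docid.

        Attachments are tried in order of classification (bid documents first);
        the first attachment that yields text decides the result. Every item is
        reported to product_attachment_processed_queue, whatever the outcome.
        """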
        for item in list_item:
            try:
                attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
                product_name = item.get(DOCUMENT_PRODUCT_NAME)
                product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
                list_product = []
                log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
                if product_original_name is not None:
                    _l = product_original_name.split("_")
                    _l.reverse()
                    list_product.extend(_l)
                if product_name is not None:
                    list_product.append(product_name)
                list_product = list(set(list_product))
                dp = Document_product(item)
                if attachments is None or attachments=="" or len(list_product)==0:
                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                    dp.update_row(self.ots_client)
                    # was `return`, which silently skipped the remaining items in the group
                    continue
                if isinstance(attachments,str):
                    list_attachment = json.loads(attachments)
                elif isinstance(attachments,list):
                    list_attachment = attachments
                else:
                    log("attachment type error")
                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                    dp.update_row(self.ots_client)
                    continue
                # try bid documents first, then procurement lists, then everything else
                list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
                list_filemd5 = [a.get("filemd5","") for a in list_attachment]
                _find = False
                _success = False
                list_text = []
                for _filemd5 in list_filemd5:
                    _html = self.get_whole_html(_filemd5)
                    if len(_html)>5:
                        # parse twice, toggling the parser's second flag, and pool the candidates
                        for _parse_flag in (True,False):
                            pd = ParseDocument(_html,_parse_flag)
                            for _product in list_product:
                                pd.fix_tree(_product)
                                list_data = pd.tree
                                _text,_count = extract_product_parameters(list_data,_product)
                                if _count>0:
                                    _find = True
                                if _text is not None:
                                    list_text.append(_text)
                    else:
                        log("product attachment process filemd5 %s has no content"%(_filemd5))
                    if len(list_text)>0:
                        _html = getBestProductText(list_text,'',[])
                        _html = format_text(_html)
                        _soup = BeautifulSoup(_html,"lxml")
                        _text = _soup.get_text()
                        logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_HTML,_html,True)
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
                        dp.setValue(DOCUMENT_PRODUCT_IS_PARAMETER,1,True)
                        dp.update_row(self.ots_client)
                        _success = True
                        break
                if not _success:
                    if not _find:
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
                    else:
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
                    dp.update_row(self.ots_client)
            except Exception as e:
                traceback.print_exc()
            finally:
                # always release the id so the producer can schedule it again
                self.product_attachment_processed_queue.put(item.get(DOCUMENT_PRODUCT_ID))
    def start_process(self):
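        """Run one producer pass, then drain the queue with a pool of handler threads."""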
        self.process_parameters_producer()
        thread_count = 7
        mt = MultiThreadHandler(self.product_attachment_queue,self.process_parameters_handler,None,thread_count,need_stop=False,restart=True)
        mt.run()
    def process_parameters_consumer(self):
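        """Consume the queue in this process; a multiprocessing variant is kept below for reference."""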
        # process_count = 3
        # list_process = []
        # for i in range(process_count):
        #     p = Process(target=self.start_process)
        #     list_process.append(p)
        # for p in list_process:
        #     p.start()
        # for p in list_process:
        #     p.join()
        self.start_process()
    def start_process_parameters(self):
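        """Block forever, running the producer every 20s and the consumer every 30s."""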
        scheduler = BlockingScheduler()
        scheduler.add_job(self.process_parameters_producer,"cron",second="*/20")
        scheduler.add_job(self.process_parameters_consumer,"cron",second="*/30")
        scheduler.start()
def start_process_parameters():
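    """Module-level entry point: build a processor and start its schedulers."""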
    pap = Product_Attachment_Processor()
    pap.start_process_parameters()
def change_parameters_status():
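    """Maintenance helper: push rows stuck in a retryable parameter_status
    (e.g. process_failed) back to to_process and clear their stored parameter text."""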
    ots_client = getConnect_ots()
    bool_query = BoolQuery(
        must_queries=[
            RangeQuery("parameter_status",-1)
        ],
        must_not_queries=[
            TermQuery("parameter_status",parameter_status_to_process),
            TermQuery("parameter_status",parameter_status_process_succeed),
            TermQuery("parameter_status",parameter_status_process_jump),
            TermQuery("parameter_status",parameter_status_no_bidfile),
            TermQuery("parameter_status",parameter_status_not_found),
        ])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("parameter_status")]),limit=100,get_total_count=True),
                                                                   ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
    list_data.extend(getRow_ots(rows))
    print("total_count",total_count)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))
    from queue import Queue
    task_queue = Queue()
    for data in list_data:
        task_queue.put(data)
    def _handle(data,result_queue):
        dp = Document_product(data)
        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process,True)
        dp.setValue(DOCUMENT_PRODUCT_PARAMETER,"",True)
        dp.update_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
if __name__ == '__main__':
    # start_process_parameters()
    # change_parameters_status()
    ots_client = getConnect_ots()
    a = Document_product({DOCUMENT_PRODUCT_ID:"00000d8f94ba32d840c21fc9343ce4fb"})
    a.fix_columns(ots_client,[DOCUMENT_PRODUCT_PARAMETER,DOCUMENT_PRODUCT_IS_PARAMETER],True)
    with open("a.html","w",encoding="utf8") as f:
        f.write(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
    print(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))