#coding:utf8
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *

import sys
sys.setrecursionlimit(1000000)

from multiprocessing import Process

from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
from BaseDataMaintenance.common.Utils import getUnifyMoney
from BaseDataMaintenance.maintenance.product.medical_product import MedicalProduct


class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()


class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()
                pass

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"

        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []

        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

        self.session = None

        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()

        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)

        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize

        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()

    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # test: process every attachment
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # has process_time then jump
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        recognize the attachment content of one document
        :param _dict: dict holding the document item and its attachments
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")

            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification,"")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)

            dtmp = Document_tmp(item)

            _to_ack = False
            if not _succeed and _retry_times>=self.retry_times:
                send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save on failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)

    def request_attachment_interface(self,attach,_dochtmlcon):

        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)

        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))

                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True

                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
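                    # Descriptive note on the oversized branch that continues below: files larger
                    # than ATTACHMENT_LARGESIZE are never sent to the recognition interface; the
                    # status is set to ATTACHMENT_TOOLARGE, the row is written back to
                    # ots.attachment, and the JSON snapshot is cached in Redis via
                    # putAttach_json_toRedis (3h TTL), so later documents that reference the same
                    # filemd5 skip the download. A copy that only exists locally is uploaded to the
                    # bucket with uploadFileByPath before the local file is removed.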
_ots_attach.update_row(self.ots_client) # #更新postgres # if attach.exists(self.attach_pool): # attach.update_row(self.attach_pool) # else: # attach.insert_row(self.attach_pool) self.putAttach_json_toRedis(filemd5,attach.getProperties()) if local_exists: if not _ots_exists: upload_status = uploadFileByPath(self.bucket,localpath,objectPath) os.remove(localpath) return True time_download = time.time()-d_start_time #调用接口处理结果 start_time = time.time() _filetype = attach.getProperties().get(attachment_filetype) # _data_base64 = base64.b64encode(open(localpath,"rb").read()) # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype) _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session) _reg_time = time.time()-start_time if _success: if len(_html)<5: _html = "" else: if len(_html)>1: _html = "interface return error" else: # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html))) _html = "" return False # 重跑swf时,删除原来的swf_urls中的"\" if attach.getProperties().get(attachment_filetype) == "swf": swf_urls = attach.getProperties().get(attachment_swfUrls, "[]") swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]' swf_urls = json.loads(swf_urls) attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True) swf_images = eval(swf_images) if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0: swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]")) if len(swf_urls)==0: objectPath = attach.getProperties().get(attachment_path,"") swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex) if not os.path.exists(swf_dir): os.mkdir(swf_dir) for _i in range(len(swf_images)): _base = swf_images[_i] _base = base64.b64decode(_base) filename = "swf_page_%d.png"%(_i) filepath = os.path.join(swf_dir,filename) with open(filepath,"wb") as f: f.write(_base) swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir) if os.path.exists(swf_dir): os.rmdir(swf_dir) attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True) else: attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True) if re.search("=0: _path = filelink.split("/file") if len(_path)>1: return _path[1] def getAttach_json_fromRedis(self,filemd5): db = self.redis_pool.getConnector() try: _key = "attach-%s"%(filemd5) _attach_json = db.get(_key) return _attach_json except Exception as e: log("getAttach_json_fromRedis error %s"%(str(e))) finally: try: if db.connection.check_health(): self.redis_pool.putConnector(db) except Exception as e: pass return None def putAttach_json_toRedis(self,filemd5,extract_dict): db = self.redis_pool.getConnector() try: new_dict = {} for k,v in extract_dict.items(): if not isinstance(v,set): new_dict[k] = v _key = "attach-%s"%(filemd5) _extract_json = db.set(str(_key),json.dumps(new_dict)) db.expire(_key,3600*3) return _extract_json except Exception as e: log("putExtract_json_toRedis error%s"%(str(e))) traceback.print_exc() finally: try: if db.connection.check_health(): self.redis_pool.putConnector(db) except Exception as e: pass def getAttachments(self,list_filemd5,_dochtmlcon): conn = self.attach_pool.getConnector() #搜索postgres try: to_find_md5 = [] for _filemd5 in list_filemd5[:50]: if _filemd5 is not None: to_find_md5.append(_filemd5) conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))] 
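            # Lookup order for each filemd5 below: first the Redis cache written by
            # putAttach_json_toRedis (key "attach-<filemd5>"), then the ots.attachment table via
            # fix_columns, and finally, if neither knows the file, a fresh record built from the
            # path parsed out of the document html by getAttachPath. Only md5s that miss the
            # cache fall through to OTS, which keeps the per-document lookup cost low.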
list_attachment = [] set_md5 = set() # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions) # for _attach in list_attachment: # set_md5.add(_attach.getProperties().get(attachment_filemd5)) for _filemd5 in to_find_md5: _json = self.getAttach_json_fromRedis(_filemd5) if _json is not None: set_md5.add(_filemd5) list_attachment.append(Attachment_postgres(json.loads(_json))) log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5))) for _filemd5 in to_find_md5: if _filemd5 not in set_md5: _path = self.getAttachPath(_filemd5,_dochtmlcon) log("getAttachments search in ots:%s"%(_filemd5)) _attach = {attachment_filemd5:_filemd5} _attach_ots = attachment(_attach) if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True): if _attach_ots.getProperties().get(attachment_status) is not None: log("getAttachments find in ots:%s"%(_filemd5)) _attach_pg = Attachment_postgres(_attach_ots.getProperties()) _attach_pg.setValue("ots_exists",True,True) list_attachment.append(_attach_pg) else: log("getAttachments status None find in ots:%s"%(_filemd5)) _attach_pg = Attachment_postgres(_attach_ots.getProperties()) _attach_pg.setValue("ots_exists",True,True) list_attachment.append(_attach_pg) else: log("getAttachments search in path:%s"%(_filemd5)) if _path: log("getAttachments find in path:%s"%(_filemd5)) if _path[0]=="/": _path = _path[1:] _filetype = _path.split(".")[-1] _attach = {attachment_filemd5:_filemd5, attachment_filetype:_filetype, attachment_status:20, attachment_path:"%s/%s"%(_filemd5[:4],_path), attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")} list_attachment.append(Attachment_postgres(_attach)) return list_attachment except Exception as e: log("attachProcess comsumer error %s"%str(e)) log(str(to_find_md5)) traceback.print_exc() return [] finally: self.attach_pool.putConnector(conn) def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]): q_size = self.queue_attachment.qsize() qsize_ocr = self.queue_attachment_ocr.qsize() qsize_not_ocr = self.queue_attachment_not_ocr.qsize() log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr)) def flow_attachment_producer_comsumer(self): log("start flow_attachment comsumer") mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True) mt.run() def flow_attachment_process(self): self.process_comsumer() # p = Process(target = self.process_comsumer) # p.start() # p.join() def set_queue(self,_dict): list_attach = _dict.get("list_attach") to_ocr = False for attach in list_attach: if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]: to_ocr = True break if to_ocr: self.queue_attachment_ocr.put(_dict,True) else: self.queue_attachment_not_ocr.put(_dict,True) def comsumer_handle(self,_dict,result_queue): try: frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] item = json.loads(frame.body) page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]")) _dochtmlcon = item.get(document_tmp_dochtmlcon,"") if len(page_attachments)==0: self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn}) else: list_fileMd5 = [] for _atta in page_attachments: 
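                    # collect the filemd5 of every attachment referenced by the document; the
                    # resolved attachments are then handed to set_queue, which routes items whose
                    # filetype is an image/pdf/swf variant to queue_attachment_ocr and everything
                    # else to queue_attachment_not_ocr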
list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5)) list_attach = self.getAttachments(list_fileMd5,_dochtmlcon) self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}) except Exception as e: traceback.print_exc() def remove_attachment_postgres(self): current_date = getCurrent_date(format="%Y-%m-%d") last_date = timeAdd(current_date,-2,format="%Y-%m-%d") sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date) conn = self.attach_pool.getConnector() try: cursor = conn.cursor() cursor.execute(sql) conn.commit() self.attach_pool.putConnector(conn) except Exception as e: conn.close() def start_flow_attachment(self): schedule = BlockingScheduler() # schedule.add_job(self.flow_attachment_process,"cron",second="*/20") # schedule.add_job(self.flow_attachment,"cron",second="*/10") # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10") # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10") # schedule.add_job(self.monitor_listener,"cron",minute="*/1") # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10") # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6") schedule.add_job(self.process_failed_attachment,"cron",minute="*/10") schedule.start() class Dataflow_ActivteMQ_extract(Dataflow_extract): class ExtractListener(): def __init__(self,conn,_func,_idx,*args,**kwargs): self.conn = conn self._func = _func self._idx = _idx def on_message(self, headers): try: log("get message of idx:%d"%(self._idx)) message_id = headers.headers["message-id"] body = headers.body log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime",""))) self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None) except Exception as e: traceback.print_exc() pass def on_error(self, headers): log('received an error %s' % str(headers.body)) def __del__(self): self.conn.disconnect() def __init__(self,create_listener=True): Dataflow_extract.__init__(self) self.industy_url = "http://127.0.0.1:15000/industry_extract" self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20], ["http://192.168.0.115:15030/content_extract",10] ] self.mq_extract = "/queue/dataflow_extract" self.mq_extract_ai = "/queue/dataflow_extract_AI" self.mq_extract_failed = "/queue/dataflow_extract_failed" self.whole_weight = 0 for _url,weight in self.extract_interfaces: self.whole_weight+= weight current_weight = 0 for _i in range(len(self.extract_interfaces)): current_weight += self.extract_interfaces[_i][1] self.extract_interfaces[_i][1] = current_weight/self.whole_weight self.comsumer_count = 5 # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres) self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc) init_nums = 1 self.conn_mq = None if create_listener: self.conn_mq = getConnect_activateMQ() else: init_nums = 0 self.pool_mq = ConnectorPool(init_nums,30,getConnect_activateMQ) self.block_url = RLock() self.url_count = 0 self.session = None self.MP = MedicalProduct() self.list_extract_comsumer = [] self.list_extract_ai_comsumer = [] # for _i in range(self.comsumer_count): # listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) # createComsumer(listener_extract,self.mq_extract) # self.list_extract_comsumer.append(listener_extract) # 提取listener if create_listener: for ii in range(10): listener_p = Process(target=self.start_extract_listener) listener_p.start() listener_p_ai = 
Thread(target=self.start_extract_AI_listener) listener_p_ai.start() def start_extract_AI_listener(self,_count=6): self.list_extract_ai_comsumer = [] for _i in range(_count): listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i) createComsumer(listener_extract,self.mq_extract_ai) self.list_extract_ai_comsumer.append(listener_extract) while 1: try: for _i in range(len(self.list_extract_ai_comsumer)): if self.list_extract_ai_comsumer[_i].conn.is_connected(): continue else: listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i) createComsumer(listener,self.mq_extract_ai) self.list_extract_ai_comsumer[_i] = listener time.sleep(5) except Exception as e: traceback.print_exc() def start_extract_listener(self): self.list_extract_comsumer = [] for _i in range(self.comsumer_count): listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) createComsumer(listener_extract,self.mq_extract) self.list_extract_comsumer.append(listener_extract) while 1: try: for _i in range(len(self.list_extract_comsumer)): if self.list_extract_comsumer[_i].conn.is_connected(): continue else: listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) createComsumer(listener,self.mq_extract) self.list_extract_comsumer[_i] = listener time.sleep(5) except Exception as e: traceback.print_exc() def monitor_listener(self): for i in range(len(self.list_extract_comsumer)): if self.list_extract_comsumer[i].conn.is_connected(): continue else: listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle) createComsumer(listener,self.mq_extract) self.list_extract_comsumer[i] = listener def getExtract_url(self): # _url_num = 0 # with self.block_url: # self.url_count += 1 # self.url_count %= self.whole_weight # _url_num = self.url_count _r = random.random() # _r = _url_num/self.whole_weight for _i in range(len(self.extract_interfaces)): if _r<=self.extract_interfaces[_i][1]: return self.extract_interfaces[_i][0] def request_extract_interface(self,json,headers): # _i = random.randint(0,len(self.extract_interfaces)-1) # _i = 0 # _url = self.extract_interfaces[_i] _url = self.getExtract_url() log("extract_url:%s"%(str(_url))) with requests.Session() as session: resp = session.post(_url,json=json,headers=headers,timeout=10*60) return resp def request_industry_interface(self,json,headers): resp = requests.post(self.industy_url,json=json,headers=headers) return resp def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]): q_size = self.queue_extract.qsize() log("queue extract size:%d"%(q_size)) def process_extract_failed(self): def _handle(_dict,result_queue): frame = _dict.get("frame") message_id = frame.headers["message-id"] subscription = frame.headers.setdefault('subscription', None) conn = _dict.get("conn") body = frame.body if body is not None: item = json.loads(body) item["extract_times"] = 10 if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) from BaseDataMaintenance.java.MQInfo import getQueueSize try: extract_failed_size = getQueueSize("dataflow_extract_failed") extract_size = getQueueSize("dataflow_extract") log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size))) if extract_failed_size>0 and extract_size<100: failed_listener = 
self.ExtractListener(getConnect_activateMQ(),_handle,1) createComsumer(failed_listener,self.mq_extract_failed) while 1: extract_failed_size = getQueueSize("dataflow_extract_failed") if extract_failed_size==0: break time.sleep(10) failed_listener.conn.disconnect() except Exception as e: traceback.print_exc() def flow_extract(self,): self.comsumer() def comsumer(self): mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True) mt.run() def getExtract_json_fromDB(self,_fingerprint): conn = self.pool_postgres.getConnector() try: list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint]) if len(list_extract)>0: _extract = list_extract[0] return _extract.getProperties().get(document_extract_extract_json) except Exception as e: traceback.print_exc() finally: self.pool_postgres.putConnector(conn) return None def putExtract_json_toDB(self,fingerprint,docid,extract_json): _de = Document_extract_postgres({document_extract_fingerprint:fingerprint, document_extract_docid:docid, document_extract_extract_json:extract_json}) _de.insert_row(self.pool_postgres,1) def getExtract_json_fromRedis(self,_fingerprint): db = self.pool_redis_doc.getConnector() try: _extract_json = db.get(_fingerprint) return _extract_json except Exception as e: log("getExtract_json_fromRedis error %s"%(str(e))) finally: try: if db.connection.check_health(): self.pool_redis_doc.putConnector(db) except Exception as e: pass return None def putExtract_json_toRedis(self,fingerprint,extract_json): db = self.pool_redis_doc.getConnector() try: _extract_json = db.set(str(fingerprint),extract_json) db.expire(fingerprint,3600*2) return _extract_json except Exception as e: log("putExtract_json_toRedis error%s"%(str(e))) traceback.print_exc() finally: try: if db.connection.check_health(): self.pool_redis_doc.putConnector(db) except Exception as e: pass def comsumer_handle(self,_dict,result_queue): try: log("start handle") data = {} frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] subscription = frame.headers.setdefault('subscription', None) item = json.loads(frame.body) for k,v in item.items(): try: if isinstance(v,bytes): item[k] = v.decode("utf-8") except Exception as e: log("docid %d types bytes can not decode"%(item.get("docid"))) item[k] = "" dtmp = Document_tmp(item) dhtml = Document_html({"partitionkey":item.get("partitionkey"), "docid":item.get("docid")}) extract_times = item.get("extract_times",0)+1 item["extract_times"] = extract_times _dochtmlcon = item.get(document_tmp_dochtmlcon,"") html_len = len(_dochtmlcon) # html 文本长度 limit_text_len = 50000 # 内容(或附件)正文限制文本长度 if html_len > limit_text_len: log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len)) try: _dochtmlcon = re.sub("|||", "", _dochtmlcon) _soup = BeautifulSoup(_dochtmlcon,"lxml") all_len = len(_soup.get_text()) # 全公告内容text长度 _attachment = _soup.find("div", attrs={"class": "richTextFetch"}) attachment_len = len(_attachment.get_text()) if _attachment else 0 # 附件内容text长度 main_text_len = all_len - attachment_len # 正文内容text长度 if attachment_len>150000: # 附件内容过长删除(处理超时) if _attachment is not None: _attachment.decompose() attachment_len = 0 # 正文或附件内容text长度大于limit_text_len才执行article_limit if main_text_len>limit_text_len or attachment_len>limit_text_len: _soup = article_limit(_soup,limit_text_len) _dochtmlcon = str(_soup) except Exception as e: traceback.print_exc() ackMsg(conn,message_id,subscription) return log("docid %s len %d 
limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon))) dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True) _extract = Document_extract({}) _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey)) _extract.setValue(document_extract2_docid,item.get(document_docid)) all_done = 1 for k,v in item.items(): data[k] = v data["timeout"] = 440 data["doc_id"] = data.get(document_tmp_docid,0) # if data["docid"]<298986054 and data["docid"]>0: # log("jump docid %s"%(str(data["docid"]))) # ackMsg(conn,message_id,subscription) # return data["content"] = data.get(document_tmp_dochtmlcon,"") if document_tmp_dochtmlcon in data: data.pop(document_tmp_dochtmlcon) data["title"] = data.get(document_tmp_doctitle,"") data["web_source_no"] = item.get(document_tmp_web_source_no,"") data["web_source_name"] = item.get(document_tmp_web_source_name,"") data["original_docchannel"] = item.get(document_tmp_original_docchannel,"") data["page_attachments"] = item.get(document_tmp_attachment_path,"[]") _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"]) to_ai = False if all_done>0: _time = time.time() # extract_json = self.getExtract_json_fromDB(_fingerprint) extract_json = self.getExtract_json_fromRedis(_fingerprint) log("get json from db takes %.4f"%(time.time()-_time)) # extract_json = None _docid = int(data["doc_id"]) if extract_json is not None: log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid))) _extract.setValue(document_extract2_extract_json,extract_json,True) else: resp = self.request_extract_interface(json=data,headers=self.header) if (resp.status_code >=200 and resp.status_code<=213): extract_json = resp.content.decode("utf8") _extract.setValue(document_extract2_extract_json,extract_json,True) _time = time.time() # self.putExtract_json_toDB(_fingerprint,_docid,extract_json) self.putExtract_json_toRedis(_fingerprint,extract_json) log("get json to db takes %.4f"%(time.time()-_time)) else: log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8"))) all_done = -2 to_ai,_reason = self.should_to_extract_ai(extract_json) if to_ai: log("to_ai of docid:%s of reason:%s"%(str(_docid),str(_reason))) # if all_done>0: # resp = self.request_industry_interface(json=data,headers=self.header) # if (resp.status_code >=200 and resp.status_code<=213): # _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True) # else: # log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8"))) # all_done = -3 # _to_ack = False # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2: # all_done = -4 _extract.setValue(document_extract2_industry_json,"{}",True) _to_ack = True try: if all_done!=1: # sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done)) log("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done)) if extract_times>=10: #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True elif extract_times>5: #transform to the extract_failed queue if 
send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed): #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True else: send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract) #失败保存 dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,60,True) if not dtmp.exists_row(self.ots_client): dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) if send_succeed: _to_ack = True else: #process succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) if _docid==290816703: dtmp.setValue("test_json",extract_json,True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True except Exception: traceback.print_exc() if _to_ack: ackMsg(conn,message_id,subscription) if to_ai: #sent to ai item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}")) send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai) else: item["extract_times"] -= 1 send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract) ackMsg(conn,message_id,subscription) log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done))) except requests.ConnectionError as e1: item["extract_times"] -= 1 if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) except Exception as e: traceback.print_exc() # sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e))) log("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e))) log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id)) if extract_times>=10: #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) ackMsg(conn,message_id,subscription) elif extract_times>5: #transform to the extract_failed queue if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed): #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) ackMsg(conn,message_id,subscription) else: #transform to the 
extract queue #失败保存 dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,60,True) if not dtmp.exists_row(self.ots_client): dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) def should_to_extract_ai(self,extract_json): _reason = "" # return False,_reason _extract = {} if extract_json is not None: try: _extract = json.loads(extract_json) except Exception as e: pass has_entity = False docchannel = _extract.get("docchannel",{}).get("docchannel","") doctype = _extract.get("docchannel",{}).get("doctype","") entity_len = len(_extract.get("dict_enterprise",{}).keys()) product = str(_extract.get("product","")) class_name = _extract.get("industry",{}).get("class_name","") _entity = None if entity_len>0: has_entity = True if entity_len==1: _entity = list(_extract.get("dict_enterprise",{}).keys())[0] prem = _extract.get("prem",{}) one_entity_used = False has_tenderee = False has_win_tenderer = False has_budget = False budget_unexpected = False winprice_unexpected = False for _pack,_pack_value in prem.items(): _rolelist = _pack_value.get("roleList",[]) for _role in _rolelist: if _role.get("role_name","")=="tenderee": has_tenderee = True if _role.get("role_text","")==_entity: one_entity_used = True if _role.get("role_name","")=="agency": if _role.get("role_text","")==_entity: one_entity_used = True if _role.get("role_name","")=="win_tenderer": has_win_tenderer = True if _role.get("role_text","")==_entity: one_entity_used = True win_price = _role.get("role_money",{}).get("money",0) try: win_price = float(win_price) except Exception as e: win_price = 0 if win_price>0: if win_price>100000000 or win_price<100: winprice_unexpected = True tendereeMoney = _pack_value.get("tendereeMoney",0) try: tendereeMoney = float(tendereeMoney) except Exception as e: tendereeMoney = 0 if tendereeMoney>0: has_budget = True if tendereeMoney>100000000 or tendereeMoney<100: budget_unexpected = True if doctype=="采招数据": if has_entity and not one_entity_used: if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","验收合同"}: return True,_reason if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","验收合同"}: return True,_reason if class_name=="医疗设备" or self.MP.is_medical_product(product): _reason = "medical product" return True,_reason if budget_unexpected or winprice_unexpected: return True,_reason return False,_reason def extract_ai_handle(self,_dict,result_queue): frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] subscription = frame.headers.setdefault('subscription', None) item = json.loads(frame.body) _extract_json = None if document_extract2_extract_json in item: _extract_json = item.get(document_extract2_extract_json) item.pop(document_extract2_extract_json) try: message_acknowledged = False dtmp = Document_tmp(item) _tomq = False if dtmp.fix_columns(self.ots_client,["status","save"],True): if dtmp.getProperties().get("status",0)>=71: if dtmp.getProperties().get("save",1)==0: message_acknowledged = True log("extract_dump_ai of docid:%d"%(item.get(document_docid))) ackMsg(conn,message_id,subscription) return else: _tomq = True else: _tomq = True if _tomq: aitimes = item.get("aitimes") if aitimes is None: aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S") item["aitimes"] = aitimes if not message_acknowledged: item[document_extract2_extract_json] = _extract_json if 
send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai): message_acknowledged = True ackMsg(conn,message_id,subscription) time.sleep(1) return else: if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)0: _new_json,_changed = self.merge_json(_extract_json,_json) if _changed: dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False)) dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) if not dhtml.exists_row(self.ots_client): dhtml.update_row(self.ots_client) _extract = Document_extract({}) _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey)) _extract.setValue(document_extract2_docid,item.get(document_docid)) _extract.setValue(document_extract2_extract_json,_new_json,True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) log("extract_ai of docid:%d"%(item.get(document_docid))) else: doc = Document({"partitionkey":item.get("partitionkey"), "docid":item.get("docid"), "extract_json_ai":_json }) if doc.exists_row(self.ots_client): doc.update_row(self.ots_client) log("extract_nochange_ai of docid:%d"%(item.get(document_docid))) if not message_acknowledged: message_acknowledged = True ackMsg(conn,message_id,subscription) except Exception as e: traceback.print_exc() if not message_acknowledged: ackMsg(conn,message_id,subscription) def merge_json(self,extract_json,extract_ai_json): def get_ai_money(_text): b = re.search(r'[\d,,\.]+[亿万元人民币]+',str(_text)) if b is not None: return b.group() def clean_ai_entity(entity,set_entitys): if isinstance(entity,str): if re.search("(未|无)(明确|提及)|某(部|单位|医院|公司)|\*\*|XX|登录|详见|招标单位",entity) is not None: return "" if re.search("无|区|县|市|省|某|中心|部|公司",entity) is not None and len(entity)<=5: return "" # return entity for _i in range(len(entity)-5): if entity[:len(entity)-_i] in set_entitys: return entity[:len(entity)-_i] return "" def clean_ai_extract(_extract,_extract_ai): set_entitys = set() set_moneys = set() dict_enterprise = _extract.get("dict_enterprise",{}) for k,v in dict_enterprise.items(): set_entitys.add(str(k)) _sum = 0 for _money in _extract.get("moneys",[]): _money = float(_money) set_moneys.add(str(_money)) _sum += _money if _sum>0: set_moneys.add(str(float(_sum))) _sum = 0 for _money in _extract.get("moneys_attachment",[]): _money = float(_money) set_moneys.add(str(_money)) _sum += _money if _sum>0: set_moneys.add(str(float(_sum))) _tenderee_dict = _extract_ai.get("招标信息",{}) _tenderee_ai = _tenderee_dict.get("招标人名称") if _tenderee_ai is not None: _tenderee_ai = clean_ai_entity(_tenderee_ai,set_entitys) _tenderee_dict["招标人名称"] = _tenderee_ai # if _tenderee_ai in set_entitys: # _tenderee_dict["招标人名称"] = _tenderee_ai # else: # _tenderee_dict["招标人名称"] = "" _budget = _tenderee_dict.get("项目预算") if _budget is not None: _budget = getUnifyMoney(str(_budget)) _budget = str(float(_budget)) if _budget in set_moneys: _tenderee_dict["项目预算"] = _budget else: _tenderee_dict["项目预算"] = "" list_win = _extract_ai.get("中标信息",[]) for _win_dict_i in range(len(list_win)): _win_dict = list_win[_win_dict_i] _pack = _win_dict.get("标段号","") if _pack=="": if len(list_win)>1: _pack = "AI_%d"%(_win_dict_i) else: _pack = "Project" _win_money = _win_dict.get("中标金额") if str(_win_money).find("%")>=0: _win_money = 0 _win_money = getUnifyMoney(str(_win_money)) _win_money = 
str(float(_win_money)) if _win_money in set_moneys: _win_dict["中标金额"] = _win_money else: _win_dict["中标金额"] = "" _win_tenderer = _win_dict.get("中标人名称") _win_tenderer = clean_ai_entity(_win_tenderer,set_entitys) _win_dict["中标人名称"] = _win_tenderer # if _win_tenderer in set_entitys: # _win_dict["中标人名称"] = _win_tenderer # else: # _win_dict["中标人名称"] = "" list_product = _extract_ai.get("产品信息",[]) for product_i in range(len(list_product)): product_dict = list_product[product_i] uni_price = product_dict.get("单价","") quantity = product_dict.get("数量","") total_price = product_dict.get("总价","") if uni_price is not None and uni_price!="": uni_price = getUnifyMoney(str(uni_price)) uni_price = str(float(uni_price)) if uni_price in set_moneys: product_dict["单价"] = uni_price else: product_dict["单价"] = "" if quantity is not None and quantity!="": quantity = getUnifyMoney(str(quantity)) product_dict["数量"] = quantity if total_price is not None and total_price!="": total_price = getUnifyMoney(str(total_price)) total_price = str(float(total_price)) if total_price in set_moneys: product_dict["总价"] = total_price else: product_dict["总价"] = "" _extract = {} if extract_json is not None: try: if isinstance(extract_json,str): _extract = json.loads(extract_json) else: _extract = extract_json except Exception as e: pass if "extract_count" not in _extract: _extract["extract_count"] = 0 _extract_ai = {} if extract_ai_json is not None: try: _extract_ai = json.loads(extract_ai_json) except Exception as e: pass clean_ai_extract(_extract,_extract_ai) _product = _extract.get("product",[]) list_new_product = _extract_ai.get("产品信息",[]) prem = _extract.get("prem") if prem is None: _extract["prem"] = {} prem = _extract["prem"] Project = prem.get("Project") if Project is None: prem["Project"] = {} Project = prem["Project"] Project_rolelist = Project.get("roleList") if Project_rolelist is None: Project["roleList"] = [] Project_rolelist = Project["roleList"] has_tenderee = False has_win_tenderer = False has_budget = False budget_unexpected = False winprice_unexpected = False for _pack,_pack_value in prem.items(): _rolelist = _pack_value.get("roleList",[]) for _role in _rolelist: if _role.get("role_name","")=="tenderee": has_tenderee = True if _role.get("role_name","")=="win_tenderer": has_win_tenderer = True win_price = _role.get("role_money",{}).get("money",0) try: win_price = float(win_price) except Exception as e: win_price = 0 if win_price>0: if win_price>100000000 or win_price<100: winprice_unexpected = True tendereeMoney = _pack_value.get("tendereeMoney",0) try: tendereeMoney = float(tendereeMoney) except Exception as e: tendereeMoney = 0 if tendereeMoney>0: has_budget = True if tendereeMoney>100000000 or tendereeMoney<100: budget_unexpected = True _changed = False prem_original = {} product_attrs_original = {} try: prem_original = json.loads(json.dumps(_extract.get("prem",{}))) product_attrs_original = json.loads(json.dumps(_extract.get("product_attrs",{}))) except Exception as e: pass if len(list_new_product)>0 and len(list_new_product)>len(product_attrs_original.get("data",[]))//3: set_product = set(_product) product_attrs_new = {"data":[]} for product_i in range(len(list_new_product)): brand = list_new_product[product_i].get("品牌","") product = list_new_product[product_i].get("产品名称","") quantity = list_new_product[product_i].get("数量","") quantity_unit = list_new_product[product_i].get("数量单位","") specs = list_new_product[product_i].get("规格型号","") uni_price = list_new_product[product_i].get("单价","") total_price = 
list_new_product[product_i].get("总价","") pinmu_no = list_new_product[product_i].get("品目编号","") pinmu_name = list_new_product[product_i].get("品目名称","") product_attrs_new["data"].append({ "brand": str(brand), "product": product, "quantity": str(quantity), "quantity_unit": quantity_unit, "specs": str(specs), "unitPrice": str(uni_price), "parameter": "", "total_price": str(total_price), "pinmu_no": str(pinmu_no), "pinmu_name": str(pinmu_name) }) if product not in set_product: set_product.add(product) _changed = True _extract["product"] = list(set_product) _extract["product_attrs"] = product_attrs_new _extract["extract_count"] += 3 if not has_tenderee: _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称") _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[]) _budget = _extract_ai.get("招标信息",{}).get("项目预算","") _linklist = [] for _conta in _contacts: _person = _conta.get("联系人","") _phone = _conta.get("联系电话","") if _person!="" or _phone!="": _linklist.append([_person,_phone]) if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4: _role_dict = { "role_name": "tenderee", "role_text": _tenderee_ai, "from_ai":True } if len(_linklist)>0: _role_dict["linklist"] = _linklist Project_rolelist.append(_role_dict) _changed = True _extract["extract_count"] += 1 if not has_budget or budget_unexpected: _budget = _extract_ai.get("招标信息",{}).get("项目预算","") if _budget is not None and _budget!="": _budget = getUnifyMoney(_budget) if _budget>0: if "tendereeMoney" in Project: Project["tendereeMoney_original"] = Project["tendereeMoney"] Project["tendereeMoney"] = str(float(_budget)) _changed = True _extract["extract_count"] += 1 else: if budget_unexpected: if "tendereeMoney" in Project: Project["tendereeMoney_original"] = Project["tendereeMoney"] Project["tendereeMoney"] = "0" if not has_win_tenderer or winprice_unexpected: list_win = _extract_ai.get("中标信息",[]) if len(list_win)>0: winprice_unexpected_new = False for _win_dict_i in range(len(list_win)): _win_dict = list_win[_win_dict_i] _pack = _win_dict.get("标段号","") if _pack=="": if len(list_win)>1: _pack = "AI_%d"%(_win_dict_i) else: _pack = "Project" _win_money = _win_dict.get("中标金额") if _win_money is not None and _win_money!="": _win_money = getUnifyMoney(_win_money) else: _win_money = 0 if _win_money>0: if _win_money>100000000 or _win_money<100: winprice_unexpected_new = True has_delete_win = False if winprice_unexpected and not winprice_unexpected_new: has_delete_win = True pop_packs = [] for _pack,_pack_value in prem.items(): _rolelist = _pack_value.get("roleList",[]) new_rolelist = [] for _role in _rolelist: if _role.get("role_name","")!="win_tenderer": new_rolelist.append(_role) _pack_value["roleList"] = new_rolelist if len(new_rolelist)==0 and _pack!="Project": pop_packs.append(_pack) for _pack in pop_packs: prem.pop(_pack) if not has_win_tenderer or has_delete_win: Project_rolelist = Project.get("roleList") if Project_rolelist is None: Project["roleList"] = [] Project_rolelist = Project["roleList"] for _win_dict_i in range(len(list_win)): _win_dict = list_win[_win_dict_i] _pack = _win_dict.get("标段号","") if _pack=="": if len(list_win)>1: _pack = "AI_%d"%(_win_dict_i) else: _pack = "Project" _win_money = _win_dict.get("中标金额") if _win_money is not None and _win_money!="": _win_money = getUnifyMoney(_win_money) else: _win_money = 0 _win_tenderer = _win_dict.get("中标人名称") if _win_tenderer!="" and len(_win_tenderer)>=4: _role_dict = { "role_name": "win_tenderer", "role_text": _win_tenderer, "winprice_unexpected":winprice_unexpected, 
"from_ai":True } _role_dict["role_money"] = { "money": str(float(_win_money)) } _changed = True _extract["extract_count"] += 2 if _pack=="Project": Project_rolelist.append(_role_dict) else: prem[_pack] = { "roleList":[ _role_dict ] } if _changed: _extract["extract_ai"] = True _extract["prem_original"] = prem_original _extract["product_attrs_original"] = product_attrs_original return json.dumps(_extract,ensure_ascii=False),_changed def delete_document_extract(self,save_count=70*10000): conn = self.pool_postgres.getConnector() try: cursor = conn.cursor() sql = " select max(docid),min(docid) from document_extract " cursor.execute(sql) rows = cursor.fetchall() if len(rows)>0: maxdocid,mindocid = rows[0] d_mindocid = int(maxdocid)-save_count if mindocid=self.get_count: self.getRangeDocid() next_docid = self.begin_docid+self.count self.count += 1 return next_docid def on_message(self, headers): try: next_docid = int(self.getNextDocid()) partitionkey = int(next_docid%500+1) message_id = headers.headers["message-id"] body = json.loads(headers.body) body[document_tmp_partitionkey] = partitionkey body[document_tmp_docid] = next_docid if body.get(document_original_docchannel) is None: body[document_original_docchannel] = body.get(document_docchannel) page_attachments = body.get(document_tmp_attachment_path,"[]") _uuid = body.get(document_tmp_uuid,"") if page_attachments!="[]": status = random.randint(1,10) body[document_tmp_status] = status if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment): log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid))) ackMsg(self.conn,message_id) else: log("send_msg_error on init listener") else: status = random.randint(11,50) body[document_tmp_status] = status if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract): log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid))) ackMsg(self.conn,message_id) else: log("send_msg_error on init listener") except Exception as e: traceback.print_exc() if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init): log("init error") ackMsg(self.conn,message_id) def __del__(self): self.conn.disconnect() del self.pool_mq1 def __init__(self): Dataflow.__init__(self) self.max_shenpi_id = None self.base_shenpi_id = 400000000000 self.mq_init = "/queue/dataflow_init" self.mq_attachment = "/queue/dataflow_attachment" self.mq_extract = "/queue/dataflow_extract" self.pool_oracle = ConnectorPool(10,15,getConnection_oracle) self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ) self.ots_capacity = getConnect_ots_capacity() self.init_comsumer_counts = 2 self.list_init_comsumer = [] for i in range(self.init_comsumer_counts): listener = self.InitListener(getConnect_activateMQ()) createComsumer(listener,self.mq_init) self.list_init_comsumer.append(listener) def monitor_listener(self): for i in range(len(self.list_init_comsumer)): if self.list_init_comsumer[i].conn.is_connected(): continue else: listener = self.InitListener(getConnect_activateMQ()) createComsumer(listener,self.mq_init) self.list_init_comsumer[i] = listener def temp2mq(self,object): conn_oracle = self.pool_oracle.getConnector() try: list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[]) for _obj in list_obj: ots_dict = _obj.getProperties_ots() if len(ots_dict.get("dochtmlcon",""))>500000: _obj.delete_row(conn_oracle) log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon","")))) continue if 
send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init): #删除数据,上线放开 _obj.delete_row(conn_oracle) else: log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon","")))) self.pool_oracle.putConnector(conn_oracle) except Exception as e: traceback.print_exc() self.pool_oracle.decrease() def shenpi2mq(self): conn_oracle = self.pool_oracle.getConnector() try: if self.max_shenpi_id is None: # get the max_shenpi_id _query = BoolQuery(must_queries=[ExistsQuery("id")]) rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index", SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1)) list_data = getRow_ots(rows) if len(list_data)>0: max_shenpi_id = list_data[0].get("id") if max_shenpi_id>self.base_shenpi_id: max_shenpi_id -= self.base_shenpi_id self.max_shenpi_id = max_shenpi_id if self.max_shenpi_id<60383953: self.max_shenpi_id = 60383953 if self.max_shenpi_id is not None: # select data in order origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle) if origin_max_shenpi_id is not None: log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id)) for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1): list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i) # send data to mq one by one with max_shenpi_id updated for _data in list_data: _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID) ots_dict = _data.getProperties_ots() if ots_dict["docid"]0: continue # bool_query = BoolQuery(must_queries=[ # TermQuery("id",_id_i), # ]) # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index", # SearchQuery(bool_query,get_total_count=True)) # if total_count>0: # continue try: list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i) except Exception as e: continue # send data to mq one by one with max_shenpi_id updated for _data in list_data: _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID) ots_dict = _data.getProperties_ots() if ots_dict["docid"]1000 or extract_size>1000: log("ots2mq break because of long queue size") return bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)]) rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100), ColumnsToGet(return_type=ColumnReturnType.NONE)) list_data = getRow_ots(rows) task_queue = Queue() for _data in list_data: task_queue.put(_data) while next_token: rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index", SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100), ColumnsToGet(return_type=ColumnReturnType.NONE)) list_data = getRow_ots(rows) for _data in list_data: task_queue.put(_data) if task_queue.qsize()>=1000: break def _handle(_data,result_queue): _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey), document_tmp_docid:_data.get(document_tmp_docid), document_tmp_status:0} _document = Document(_d) _document.fix_columns(self.ots_client,None,True) _data = _document.getProperties() page_attachments = _data.get(document_tmp_attachment_path,"[]") _document_html = Document(_data) _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True) if page_attachments!="[]": status = random.randint(1,10) _data[document_tmp_status] = status send_succeed = 
    def ots2mq(self):
        try:
            # same backpressure check as otstmp2mq below: stop refilling while the downstream
            # attachment/extract queues are already long
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("ots2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break

            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")

            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()

    def otstmp2mq(self):
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("otstmp2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
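    # ots2mq above fans work out through MultiThreadHandler: fill a Queue with tasks, pass a
    # handler of the form handler(item,result_queue) and a thread count, then run. A minimal
    # usage sketch assuming the same (queue, handler, result_queue, threads) signature seen
    # above; the handler body is illustrative only and nothing in the flow calls this method.
    def _sketch_multithread_handle(self,list_data,thread_count=15):
        task_queue = Queue()
        for _data in list_data:
            task_queue.put(_data)

        def _handle(item,result_queue):
            # replace with the real per-document work
            log(str(item))

        if task_queue.qsize()>0:
            mt = MultiThreadHandler(task_queue,_handle,None,thread_count)
            mt.run()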
headers.headers["message-id"] body = headers.body self._queue.put(headers,True) ackMsg(self.conn,message_id) _queue = Queue() listener1 = TestDumpListener(getConnect_activateMQ(),_queue) listener2 = TestDumpListener(getConnect_activateMQ(),_queue) createComsumer(listener1,"/queue/dataflow_attachment") createComsumer(listener2,"/queue/dataflow_extract") time.sleep(10) list_item = [] list_docid = [] while 1: try: _item = _queue.get(timeout=2) list_item.append(_item) except Exception as e: break for item in list_item: _item = json.loads(item.body) list_docid.append(_item.get("docid")) log(list_docid[:10]) log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid)))) def start_dataflow_init(self): # self.test_dump_docid() from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin schedule = BlockingScheduler() schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10") schedule.add_job(self.ots2mq,"cron",second="*/10") schedule.add_job(self.otstmp2mq,"cron",second="*/10") schedule.add_job(self.monitor_listener,"cron",minute="*/1") schedule.add_job(self.shenpi2mq,"cron",minute="*/1") schedule.start() def transform_attachment(): from BaseDataMaintenance.model.ots.attachment import attachment from BaseDataMaintenance.model.postgres.attachment 
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))


def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
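# transform_attachment, del_test_doc and fixDoc_to_queue_extract below all walk an OTS index
# with the same search/next_token loop. A minimal sketch of that paging pattern as a generator;
# the "docid" sort field and the argument names are assumptions for illustration, and nothing
# in the flow calls this helper.
def iter_ots_rows(ots_client,table,index,bool_query,return_type=ColumnReturnType.NONE,limit=100):
    rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=limit),
                                                                   columns_to_get=ColumnsToGet(return_type=return_type))
    for _row in getRow_ots(rows):
        yield _row
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=limit),
                                                                       columns_to_get=ColumnsToGet(return_type=return_type))
        for _row in getRow_ots(rows):
            yield _row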
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()


def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")


current_path = os.path.abspath(os.path.dirname(__file__))


def test_ai_extract():
    _dochtmlcon = '''
公告内容:

宫腔镜

440101-2025-07530

一、采购人: 广州医科大学附属妇女儿童医疗中心
二、采购计划编号: 440101-2025-07530
三、采购计划名称: 宫腔镜
四、采购品目名称: 医用内窥镜
五、采购预算金额(元): 1920000.00
六、需求时间:
七、采购方式: 公开招标
八、备案时间: 2025-05-06 09:12:11
发布人:广州医科大学附属妇女儿童医疗中心
发布时间: 2025年 05月 06日
    '''
    _extract_json = '''
    { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "公开招标", "candidate": "", "code": [ "440101-2025-07530" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.12, "kvtree": 0.03, "moneygrade": 0.0, "nerToken": 0.04, "outline": 0.03, "pb_extract": 0.16, "person": 0.0, "prem": 0.04, "preprocess": 0.11, "product": 0.02, "product_attrs": 0.01, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.01, "rule_channel": 0.03, "rule_channel2": 0.16, "tableToText": 0.03000166893005371, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "广州医科大学附属妇女儿童医疗中心": { "in_text": 1 } }, "district": { "area": "华南", "city": "广州", "district": "未知", "is_in_text": false, "province": "广东" }, "docchannel": { "docchannel": "招标预告", "doctype": "采招数据", "life_docchannel": "招标预告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "宫腔镜", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=27c02035e60e7cc1b7b8be65eedf92fd", "industry": { "class": "零售批发", "class_name": "医疗设备", "subclass": "专用设备" }, "is_deposit_project": false, "label_dic": {}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 1920000.0 ], "moneys_attachment": [], "moneysource": "", "name": "医用内窥镜", "nlp_enterprise": [ "广州医科大学附属妇女儿童医疗中心" ], "nlp_enterprise_attachment": [], "pb": { "industry": "教育及研究", "project_name_refind": "医用内窥镜", "project_property": "新建" }, "pb_project_name": "医用内窥镜", "person_review": [], "pinmu_name": "医用内窥镜", "policies": [], "prem": { "Project": { "code": "", "name": "医用内窥镜", "roleList": [ { "address": "", "linklist": [], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_prob": 0.9499999463558197, "role_text": "广州医科大学附属妇女儿童医疗中心", "serviceTime": "" } ], "tendereeMoney": "1920000.00", "tendereeMoneyUnit": "元", "uuid": "b1f25984-d473-4995-aec6-dbdba27fa898" } }, "process_time": "2025-05-06 09:24:32", "product": [ "宫腔镜", "医用内窥镜" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": { "医疗检测设备": [ [ "宫腔镜", 1 ] ] }, "核心字段": { "医疗检测设备": [ [ "宫腔镜", 2 ], [ "内窥镜", 2 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-05-06", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-23", "word_count": { "正文": 211, "附件": 0 } }
    '''
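    # What follows mirrors the production AI-assist path: strip the html to plain text, pick a
    # doubao endpoint by text length (short texts use the 32k-context model, longer ones are
    # truncated to 100000 characters and sent to the 256k-context model), build the
    # role-extraction prompt, call the model, pull the JSON out of the reply, and merge it into
    # the regular extraction result with merge_json.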
    _text = html2text_with_tablehtml(_dochtmlcon)
    if len(_text)<25000:
        model_name = "ep-20250314164242-jd62g"  # doubao 1.5 pro, 32k context
    else:
        _text = _text[:100000]
        model_name = "ep-20250212111145-fflr7"  # doubao 1.5 pro, 256k context
    msg = get_prompt_extract_role(_text)
    # model_name = "ep-20250212111145-fflr7"  # 1.5pro 256k
    # model_name = "ep-20250314164242-jd62g"  # 1.5pro 32k
    result = chat_doubao(msg,model_name=model_name)
    _json = get_json_from_text(result)
    if _json is not None:
        # initialise so a failed parse does not raise NameError below
        _extract_ai = {}
        try:
            _extract_ai = json.loads(_json)
        except Exception as e:
            pass
        if len(_extract_ai.keys())>0:
            _new_json,_changed = de.merge_json(_extract_json,_json)
            print("new_json")
            print(_new_json)


if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    de = Dataflow_ActivteMQ_extract(create_listener=False)
    # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
    a = '''
    { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "询价", "candidate": "", "code": [ "20250309921" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.1, "kvtree": 0.05, "moneygrade": 0.0, "nerToken": 0.06, "outline": 0.02, "pb_extract": 0.14, "person": 0.0, "prem": 0.01, "preprocess": 0.1, "product": 0.03, "product_attrs": 0.0, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.02, "rule_channel": 0.02, "rule_channel2": 0.25, "tableToText": 0.04000095367431641, "tendereeRuleRecall": 0.0, "time": 0.0, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "五矿铜业(湖南)有限公司": { "credit_code": "91430482081376930E", "in_text": 1 } }, "district": { "area": "华中", "city": "未知", "district": "未知", "is_in_text": false, "province": "湖南" }, "docchannel": { "docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "湖南有色铜业方闸", "exist_table": 1, "extract_count": 2, "fail_reason": "", "fingerprint": "md5=84d3541612f56fc9dc07b0cd7fa2c644", "industry": { "class": "零售批发", "class_name": "有色金属冶炼及压延产品", "subclass": "建筑建材" }, "is_deposit_project": false, "label_dic": { "is_direct_procurement": 1 }, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [], "moneys_attachment": [], "moneysource": "", "name": "湖南有色铜业方闸采购", "nlp_enterprise": [ "五矿铜业(湖南)有限公司" ], "nlp_enterprise_attachment": [], "pb": { "location": "湖南", "project_name_refind": "湖南有色铜业方闸采购", "project_property": "新建" }, "pb_project_name": "湖南有色铜业方闸采购", "person_review": [], "pinmu_name": "", "policies": [], "prem": {}, "process_time": "2025-04-16 14:58:20", "product": [ "铜业方闸", "手电不锈钢复合式速开方闸门配一体式启闭机" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": {}, "核心字段": { "门类": [ [ "闸门", 1 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-02", "word_count": { "正文": 240, "附件": 0 } }
    '''
    b = '''
    {"招标信息":{"招标人名称":"五矿铜业(湖南)有限公司","项目预算":"","招标人联系方式":[{"联系人":"李锟","联系电话":"13575144492"}]},"中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
    '''
    # print(de.should_to_extract_ai(a))
    print(de.merge_json(a,b))
    print(test_ai_extract())
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")