from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process


class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()


class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()
                pass

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []

        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None

        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()

        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)

        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if
self.list_attachment_comsumer[i].conn.is_connected(): continue else: listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i) createComsumer(listener,self.mq_attachment) self.list_attachment_comsumer[i] = listener time.sleep(5) def monitor_listener(self): for i in range(len(self.list_attachment_comsumer)): if self.list_attachment_comsumer[i].conn.is_connected(): continue else: listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment) createComsumer(listener,self.mq_attachment) self.list_attachment_comsumer[i] = listener def process_failed_attachment(self): from BaseDataMaintenance.java.MQInfo import getQueueSize attachment_size = getQueueSize("dataflow_attachment") failed_attachment_size = getQueueSize("dataflow_attachment_failed") if attachment_size<100 and failed_attachment_size>0: list_comsumer = [] for _i in range(self.retry_comsumer_count): listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i) list_comsumer.append(listener_attachment) createComsumer(listener_attachment,self.mq_attachment_failed) while 1: failed_attachment_size = getQueueSize("dataflow_attachment_failed") if failed_attachment_size==0: break time.sleep(10) for _c in list_comsumer: _c.conn.disconnect() def attachment_listener_handler(self,_dict): try: frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] item = json.loads(frame.body) _idx = _dict.get("idx",1) page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]")) _dochtmlcon = item.get(document_tmp_dochtmlcon,"") if random.random()<0.2: log("jump by random") if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment): ackMsg(conn,message_id) return if len(page_attachments)==0: newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn} else: list_fileMd5 = [] for _atta in page_attachments: list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5)) list_attach = self.getAttachments(list_fileMd5,_dochtmlcon) newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn} log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid")))) self.attachment_recognize(newitem,None) log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid")))) except Exception as e: traceback.print_exc() def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True): try: list_html = [] swf_urls = [] _not_failed = True for _attach in list_attach: #测试全跑 _filemd5 = _attach.getProperties().get(attachment_filemd5) if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE): log("%s has processed or toolarge"%(_filemd5)) _html = _attach.getProperties().get(attachment_attachmenthtml,"") if _html is None: _html = "" list_html.append({attachment_filemd5:_filemd5, "html":_html}) else: #has process_time then jump if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO): log("%s has process_time jump"%(_filemd5)) _html = _attach.getProperties().get(attachment_attachmenthtml,"") if _html is None: _html = "" list_html.append({attachment_filemd5:_filemd5, "html":_html}) else: log("%s requesting interface"%(_filemd5)) _succeed = self.request_attachment_interface(_attach,_dochtmlcon) if not 
_succeed: _not_failed = False _html = _attach.getProperties().get(attachment_attachmenthtml,"") if _html is None: _html = "" list_html.append({attachment_filemd5:_filemd5, "html":_html}) if _attach.getProperties().get(attachment_filetype)=="swf": # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]"))) _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]") if _swf_urls: _swf_urls = _swf_urls.replace('\\', '') else: _swf_urls = '[]' _swf_urls = json.loads(_swf_urls) swf_urls.extend(_swf_urls) if not _not_failed: return False,list_html,swf_urls return True,list_html,swf_urls except requests.ConnectionError as e1: raise e1 except Exception as e: return False,list_html,swf_urls def attachment_recognize(self,_dict,result_queue): ''' 识别附件内容 :param _dict: 附件内容 :param result_queue: :return: ''' try: start_time = time.time() item = _dict.get("item") list_attach = _dict.get("list_attach") conn = _dict["conn"] message_id = _dict.get("message_id") if "retry_times" not in item: item["retry_times"] = 5 _retry_times = item.get("retry_times",0) dhtml = Document_html({"partitionkey":item.get("partitionkey"), "docid":item.get("docid")}) _dochtmlcon = item.get(document_tmp_dochtmlcon,"") dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True) dhtml.delete_bidi_a() #调用识别接口 _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True) # 将附件分类写回document page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]")) if len(page_attachments)>0: for _attachment in page_attachments: filemd5 = _attachment.get(document_attachment_path_filemd5,"") classification = None for _attach in list_attach: if _attach.getProperties().get(attachment_filemd5,"")==filemd5: classification = _attach.getProperties().get(attachment_classification,"") break if classification is not None: _attachment[attachment_classification] = classification item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False) dtmp = Document_tmp(item) _to_ack = False if not _succeed and _retry_times=self.retry_times: send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed) send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment) #失败保存 if _retry_times==0: dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,0,True) if not dtmp.exists_row(self.ots_client): dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) if send_succeed: _to_ack = True else: try: log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html)))) dhtml.updateSWFImages(swf_urls) dhtml.updateAttachment(list_html) dtmp.setValue(document_tmp_attachment_extract_status,1,True) dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True) send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract) if send_succeed: _to_ack = True except Exception as e: traceback.print_exc() if _to_ack: ackMsg(conn,message_id) log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times)) except Exception as e: traceback.print_exc() if time.time()-start_time<10: item["retry_times"] -= 1 if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment): ackMsg(conn,message_id) def 
request_attachment_interface(self,attach,_dochtmlcon): filemd5 = attach.getProperties().get(attachment_filemd5) _status = attach.getProperties().get(attachment_status) _filetype = attach.getProperties().get(attachment_filetype) _path = attach.getProperties().get(attachment_path) _uuid = uuid4() objectPath = attach.getProperties().get(attachment_path) docids = attach.getProperties().get(attachment_docids) _ots_exists = attach.getProperties().get("ots_exists") if objectPath is None: relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex) else: relative_path = objectPath[5:].replace("//","/") localpath = "/FileInfo/%s"%(relative_path) if not os.path.exists(localpath): if not os.path.exists(os.path.dirname(localpath)): os.makedirs(os.path.dirname(localpath)) local_exists = False else: local_exists = True _size = os.path.getsize(localpath) not_failed_flag = True try: d_start_time = time.time() if not local_exists: log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath)) try: download_succeed = downloadFile(self.bucket,objectPath,localpath) except Exception as e: download_succeed = False else: log("md5:%s path:%s exists"%(filemd5,objectPath[5:])) if not (local_exists or download_succeed): _ots_attach = attachment(attach.getProperties_ots()) _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True) log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath)) if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT): attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,"")) attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,"")) attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,"")) attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,"")) attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,"")) # if attach.exists(self.attach_pool): # attach.update_row(self.attach_pool) # else: # attach.insert_row(self.attach_pool) self.putAttach_json_toRedis(filemd5,attach.getProperties()) try: if os.exists(localpath): os.remove(localpath) except Exception as e: pass return True if _ots_exists: objectPath = attach.getProperties().get(attachment_path) download_succeed = downloadFile(self.bucket,objectPath,localpath) if download_succeed: log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath)) else: log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath)) else: log("md5:%s path:%s not found in ots"%(filemd5,objectPath)) if local_exists or download_succeed: _size = os.path.getsize(localpath) attach.setValue(attachment_size,_size,True) if _size>ATTACHMENT_LARGESIZE: attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True) log("attachment :%s of path:%s to large"%(filemd5,_path)) _ots_attach = attachment(attach.getProperties_ots()) _ots_attach.update_row(self.ots_client) # #更新postgres # if attach.exists(self.attach_pool): # attach.update_row(self.attach_pool) # else: # attach.insert_row(self.attach_pool) self.putAttach_json_toRedis(filemd5,attach.getProperties()) if local_exists: if not _ots_exists: upload_status = uploadFileByPath(self.bucket,localpath,objectPath) os.remove(localpath) return True time_download 
= time.time()-d_start_time #调用接口处理结果 start_time = time.time() _filetype = attach.getProperties().get(attachment_filetype) # _data_base64 = base64.b64encode(open(localpath,"rb").read()) # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype) _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session) _reg_time = time.time()-start_time if _success: if len(_html)<5: _html = "" else: if len(_html)>1: _html = "interface return error" else: # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html))) _html = "" return False # 重跑swf时,删除原来的swf_urls中的"\" if attach.getProperties().get(attachment_filetype) == "swf": swf_urls = attach.getProperties().get(attachment_swfUrls, "[]") swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]' swf_urls = json.loads(swf_urls) attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True) swf_images = eval(swf_images) if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0: swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]")) if len(swf_urls)==0: objectPath = attach.getProperties().get(attachment_path,"") swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex) if not os.path.exists(swf_dir): os.mkdir(swf_dir) for _i in range(len(swf_images)): _base = swf_images[_i] _base = base64.b64decode(_base) filename = "swf_page_%d.png"%(_i) filepath = os.path.join(swf_dir,filename) with open(filepath,"wb") as f: f.write(_base) swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir) if os.path.exists(swf_dir): os.rmdir(swf_dir) attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True) else: attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True) if re.search("=0: _path = filelink.split("/file") if len(_path)>1: return _path[1] def getAttach_json_fromRedis(self,filemd5): db = self.redis_pool.getConnector() try: _key = "attach-%s"%(filemd5) _attach_json = db.get(_key) return _attach_json except Exception as e: log("getAttach_json_fromRedis error %s"%(str(e))) finally: try: if db.connection.check_health(): self.redis_pool.putConnector(db) except Exception as e: pass return None def putAttach_json_toRedis(self,filemd5,extract_dict): db = self.redis_pool.getConnector() try: new_dict = {} for k,v in extract_dict.items(): if not isinstance(v,set): new_dict[k] = v _key = "attach-%s"%(filemd5) _extract_json = db.set(str(_key),json.dumps(new_dict)) db.expire(_key,3600*3) return _extract_json except Exception as e: log("putExtract_json_toRedis error%s"%(str(e))) traceback.print_exc() finally: try: if db.connection.check_health(): self.redis_pool.putConnector(db) except Exception as e: pass def getAttachments(self,list_filemd5,_dochtmlcon): conn = self.attach_pool.getConnector() #搜索postgres try: to_find_md5 = [] for _filemd5 in list_filemd5[:50]: if _filemd5 is not None: to_find_md5.append(_filemd5) conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))] list_attachment = [] set_md5 = set() # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions) # for _attach in list_attachment: # set_md5.add(_attach.getProperties().get(attachment_filemd5)) for _filemd5 in to_find_md5: _json = self.getAttach_json_fromRedis(_filemd5) if _json is not None: set_md5.add(_filemd5) 
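                    # Cache note: a Redis hit here short-circuits the heavier lookups below.
                    # putAttach_json_toRedis stores the parsed attachment properties under the
                    # key "attach-<filemd5>" with a 3-hour TTL, so recently processed filemd5s
                    # are rebuilt as Attachment_postgres objects directly from the cached JSON;
                    # misses fall through to ots.attachment and, failing that, to the path
                    # scraped from the document html.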
list_attachment.append(Attachment_postgres(json.loads(_json))) log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5))) for _filemd5 in to_find_md5: if _filemd5 not in set_md5: _path = self.getAttachPath(_filemd5,_dochtmlcon) log("getAttachments search in ots:%s"%(_filemd5)) _attach = {attachment_filemd5:_filemd5} _attach_ots = attachment(_attach) if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True): if _attach_ots.getProperties().get(attachment_status) is not None: log("getAttachments find in ots:%s"%(_filemd5)) _attach_pg = Attachment_postgres(_attach_ots.getProperties()) _attach_pg.setValue("ots_exists",True,True) list_attachment.append(_attach_pg) else: log("getAttachments search in path:%s"%(_filemd5)) if _path: log("getAttachments find in path:%s"%(_filemd5)) if _path[0]=="/": _path = _path[1:] _filetype = _path.split(".")[-1] _attach = {attachment_filemd5:_filemd5, attachment_filetype:_filetype, attachment_status:20, attachment_path:"%s/%s"%(_filemd5[:4],_path), attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")} list_attachment.append(Attachment_postgres(_attach)) return list_attachment except Exception as e: log("attachProcess comsumer error %s"%str(e)) log(str(to_find_md5)) traceback.print_exc() return [] finally: self.attach_pool.putConnector(conn) def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]): q_size = self.queue_attachment.qsize() qsize_ocr = self.queue_attachment_ocr.qsize() qsize_not_ocr = self.queue_attachment_not_ocr.qsize() log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr)) def flow_attachment_producer_comsumer(self): log("start flow_attachment comsumer") mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True) mt.run() def flow_attachment_process(self): self.process_comsumer() # p = Process(target = self.process_comsumer) # p.start() # p.join() def set_queue(self,_dict): list_attach = _dict.get("list_attach") to_ocr = False for attach in list_attach: if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]: to_ocr = True break if to_ocr: self.queue_attachment_ocr.put(_dict,True) else: self.queue_attachment_not_ocr.put(_dict,True) def comsumer_handle(self,_dict,result_queue): try: frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] item = json.loads(frame.body) page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]")) _dochtmlcon = item.get(document_tmp_dochtmlcon,"") if len(page_attachments)==0: self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn}) else: list_fileMd5 = [] for _atta in page_attachments: list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5)) list_attach = self.getAttachments(list_fileMd5,_dochtmlcon) self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}) except Exception as e: traceback.print_exc() def remove_attachment_postgres(self): current_date = getCurrent_date(format="%Y-%m-%d") last_date = timeAdd(current_date,-2,format="%Y-%m-%d") sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date) conn = self.attach_pool.getConnector() try: cursor = conn.cursor() cursor.execute(sql) conn.commit() 
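            # Retention note: this job drops postgres attachment rows whose crtime is more
            # than two days old (see last_date above); on failure the connection is closed
            # instead of being returned to attach_pool, so a broken connection is not reused.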
self.attach_pool.putConnector(conn) except Exception as e: conn.close() def start_flow_attachment(self): schedule = BlockingScheduler() # schedule.add_job(self.flow_attachment_process,"cron",second="*/20") # schedule.add_job(self.flow_attachment,"cron",second="*/10") # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10") # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10") # schedule.add_job(self.monitor_listener,"cron",minute="*/1") # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10") # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6") schedule.add_job(self.process_failed_attachment,"cron",minute="*/10") schedule.start() class Dataflow_ActivteMQ_extract(Dataflow_extract): class ExtractListener(): def __init__(self,conn,_func,_idx,*args,**kwargs): self.conn = conn self._func = _func self._idx = _idx def on_message(self, headers): try: log("get message of idx:%d"%(self._idx)) message_id = headers.headers["message-id"] body = headers.body log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime",""))) self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None) except Exception as e: traceback.print_exc() pass def on_error(self, headers): log('received an error %s' % str(headers.body)) def __del__(self): self.conn.disconnect() def __init__(self,create_listener=True): Dataflow_extract.__init__(self) self.industy_url = "http://127.0.0.1:15000/industry_extract" self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20], ["http://192.168.0.115:15030/content_extract",10] ] self.mq_extract = "/queue/dataflow_extract" self.mq_extract_failed = "/queue/dataflow_extract_failed" self.whole_weight = 0 for _url,weight in self.extract_interfaces: self.whole_weight+= weight current_weight = 0 for _i in range(len(self.extract_interfaces)): current_weight += self.extract_interfaces[_i][1] self.extract_interfaces[_i][1] = current_weight/self.whole_weight self.comsumer_count = 5 # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres) self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc) self.conn_mq = getConnect_activateMQ() self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ) self.block_url = RLock() self.url_count = 0 self.session = None self.list_extract_comsumer = [] # for _i in range(self.comsumer_count): # listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) # createComsumer(listener_extract,self.mq_extract) # self.list_extract_comsumer.append(listener_extract) if create_listener: for ii in range(10): listener_p = Process(target=self.start_extract_listener) listener_p.start() def start_extract_listener(self): self.list_extract_comsumer = [] for _i in range(self.comsumer_count): listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) createComsumer(listener_extract,self.mq_extract) self.list_extract_comsumer.append(listener_extract) while 1: try: for _i in range(len(self.list_extract_comsumer)): if self.list_extract_comsumer[_i].conn.is_connected(): continue else: listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i) createComsumer(listener,self.mq_extract) self.list_extract_comsumer[_i] = listener time.sleep(5) except Exception as e: traceback.print_exc() def monitor_listener(self): for i in range(len(self.list_extract_comsumer)): if self.list_extract_comsumer[i].conn.is_connected(): continue else: listener = 
self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle) createComsumer(listener,self.mq_extract) self.list_extract_comsumer[i] = listener def getExtract_url(self): # _url_num = 0 # with self.block_url: # self.url_count += 1 # self.url_count %= self.whole_weight # _url_num = self.url_count _r = random.random() # _r = _url_num/self.whole_weight for _i in range(len(self.extract_interfaces)): if _r<=self.extract_interfaces[_i][1]: return self.extract_interfaces[_i][0] def request_extract_interface(self,json,headers): # _i = random.randint(0,len(self.extract_interfaces)-1) # _i = 0 # _url = self.extract_interfaces[_i] _url = self.getExtract_url() log("extract_url:%s"%(str(_url))) with requests.Session() as session: resp = session.post(_url,json=json,headers=headers,timeout=10*60) return resp def request_industry_interface(self,json,headers): resp = requests.post(self.industy_url,json=json,headers=headers) return resp def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]): q_size = self.queue_extract.qsize() log("queue extract size:%d"%(q_size)) def process_extract_failed(self): def _handle(_dict,result_queue): frame = _dict.get("frame") message_id = frame.headers["message-id"] subscription = frame.headers.setdefault('subscription', None) conn = _dict.get("conn") body = frame.body if body is not None: item = json.loads(body) item["extract_times"] = 10 if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) from BaseDataMaintenance.java.MQInfo import getQueueSize try: extract_failed_size = getQueueSize("dataflow_extract_failed") extract_size = getQueueSize("dataflow_extract") log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size))) if extract_failed_size>0 and extract_size<100: failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1) createComsumer(failed_listener,self.mq_extract_failed) while 1: extract_failed_size = getQueueSize("dataflow_extract_failed") if extract_failed_size==0: break time.sleep(10) failed_listener.conn.disconnect() except Exception as e: traceback.print_exc() def flow_extract(self,): self.comsumer() def comsumer(self): mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True) mt.run() def getExtract_json_fromDB(self,_fingerprint): conn = self.pool_postgres.getConnector() try: list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint]) if len(list_extract)>0: _extract = list_extract[0] return _extract.getProperties().get(document_extract_extract_json) except Exception as e: traceback.print_exc() finally: self.pool_postgres.putConnector(conn) return None def putExtract_json_toDB(self,fingerprint,docid,extract_json): _de = Document_extract_postgres({document_extract_fingerprint:fingerprint, document_extract_docid:docid, document_extract_extract_json:extract_json}) _de.insert_row(self.pool_postgres,1) def getExtract_json_fromRedis(self,_fingerprint): db = self.pool_redis_doc.getConnector() try: _extract_json = db.get(_fingerprint) return _extract_json except Exception as e: log("getExtract_json_fromRedis error %s"%(str(e))) finally: try: if db.connection.check_health(): self.pool_redis_doc.putConnector(db) except Exception as e: pass return None def putExtract_json_toRedis(self,fingerprint,extract_json): db = 
self.pool_redis_doc.getConnector() try: _extract_json = db.set(str(fingerprint),extract_json) db.expire(fingerprint,3600*2) return _extract_json except Exception as e: log("putExtract_json_toRedis error%s"%(str(e))) traceback.print_exc() finally: try: if db.connection.check_health(): self.pool_redis_doc.putConnector(db) except Exception as e: pass def comsumer_handle(self,_dict,result_queue): try: log("start handle") data = {} frame = _dict["frame"] conn = _dict["conn"] message_id = frame.headers["message-id"] subscription = frame.headers.setdefault('subscription', None) item = json.loads(frame.body) dtmp = Document_tmp(item) dhtml = Document_html({"partitionkey":item.get("partitionkey"), "docid":item.get("docid")}) extract_times = item.get("extract_times",0)+1 item["extract_times"] = extract_times _dochtmlcon = item.get(document_tmp_dochtmlcon,"") html_len = len(_dochtmlcon) # html 文本长度 limit_text_len = 50000 # 内容(或附件)正文限制文本长度 if html_len > limit_text_len: log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len)) try: _dochtmlcon = re.sub("|||", "", _dochtmlcon) _soup = BeautifulSoup(_dochtmlcon,"lxml") all_len = len(_soup.get_text()) # 全公告内容text长度 _attachment = _soup.find("div", attrs={"class": "richTextFetch"}) attachment_len = len(_attachment.get_text()) if _attachment else 0 # 附件内容text长度 main_text_len = all_len - attachment_len # 正文内容text长度 if attachment_len>150000: # 附件内容过长删除(处理超时) if _attachment is not None: _attachment.decompose() attachment_len = 0 # 正文或附件内容text长度大于limit_text_len才执行article_limit if main_text_len>limit_text_len or attachment_len>limit_text_len: _soup = article_limit(_soup,limit_text_len) _dochtmlcon = str(_soup) except Exception as e: traceback.print_exc() ackMsg(conn,message_id,subscription) return log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon))) dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True) _extract = Document_extract({}) _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey)) _extract.setValue(document_extract2_docid,item.get(document_docid)) all_done = 1 for k,v in item.items(): data[k] = v data["timeout"] = 440 data["doc_id"] = data.get(document_tmp_docid,0) # if data["docid"]<298986054 and data["docid"]>0: # log("jump docid %s"%(str(data["docid"]))) # ackMsg(conn,message_id,subscription) # return data["content"] = data.get(document_tmp_dochtmlcon,"") if document_tmp_dochtmlcon in data: data.pop(document_tmp_dochtmlcon) data["title"] = data.get(document_tmp_doctitle,"") data["web_source_no"] = item.get(document_tmp_web_source_no,"") data["web_source_name"] = item.get(document_tmp_web_source_name,"") data["original_docchannel"] = item.get(document_tmp_original_docchannel,"") data["page_attachments"] = item.get(document_tmp_attachment_path,"[]") _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"]) if all_done>0: _time = time.time() # extract_json = self.getExtract_json_fromDB(_fingerprint) extract_json = self.getExtract_json_fromRedis(_fingerprint) log("get json from db takes %.4f"%(time.time()-_time)) # extract_json = None _docid = int(data["doc_id"]) if extract_json is not None: log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid))) _extract.setValue(document_extract2_extract_json,extract_json,True) else: resp = self.request_extract_interface(json=data,headers=self.header) if (resp.status_code >=200 and resp.status_code<=213): extract_json = resp.content.decode("utf8") 
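                        # Cache note: a fresh interface result is written back to Redis keyed by
                        # the content fingerprint (getFingerprint(title+content)+original_docchannel,
                        # 2-hour TTL in putExtract_json_toRedis), so duplicate documents arriving
                        # shortly afterwards reuse this extract_json instead of calling the
                        # extraction service again.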
_extract.setValue(document_extract2_extract_json,extract_json,True) _time = time.time() # self.putExtract_json_toDB(_fingerprint,_docid,extract_json) self.putExtract_json_toRedis(_fingerprint,extract_json) log("get json to db takes %.4f"%(time.time()-_time)) else: log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8"))) all_done = -2 # if all_done>0: # resp = self.request_industry_interface(json=data,headers=self.header) # if (resp.status_code >=200 and resp.status_code<=213): # _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True) # else: # log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8"))) # all_done = -3 # _to_ack = False # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2: # all_done = -4 _extract.setValue(document_extract2_industry_json,"{}",True) _to_ack = True try: if all_done!=1: # sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done)) log("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done)) if extract_times>=10: #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True elif extract_times>5: #transform to the extract_failed queue if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed): #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True else: send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract) #失败保存 dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,60,True) if not dtmp.exists_row(self.ots_client): dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) if send_succeed: _to_ack = True else: #process succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) if _docid==290816703: dtmp.setValue("test_json",extract_json,True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) _to_ack = True except Exception: traceback.print_exc() if _to_ack: ackMsg(conn,message_id,subscription) else: item["extract_times"] -= 1 send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract) ackMsg(conn,message_id,subscription) log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done))) except requests.ConnectionError as e1: item["extract_times"] -= 1 if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) except Exception as e: traceback.print_exc() # sentMsgToDD("要素提取失败:docid:%d with 
result:%s"%(item.get(document_tmp_docid),str(e))) log("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e))) log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id)) if extract_times>=10: #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) ackMsg(conn,message_id,subscription) elif extract_times>5: #transform to the extract_failed queue if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed): #process as succeed dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True) dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) #replace as {} _extract.setValue(document_extract2_extract_json,"{}",True) _extract.setValue(document_extract2_industry_json,"{}",True) _extract.setValue(document_extract2_status,random.randint(1,50),True) _extract.update_row(self.ots_client) ackMsg(conn,message_id,subscription) else: #transform to the extract queue #失败保存 dtmp.setValue(document_tmp_dochtmlcon,"",False) dtmp.setValue(document_tmp_status,60,True) if not dtmp.exists_row(self.ots_client): dtmp.update_row(self.ots_client) dhtml.update_row(self.ots_client) if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract): ackMsg(conn,message_id,subscription) def delete_document_extract(self,save_count=70*10000): conn = self.pool_postgres.getConnector() try: cursor = conn.cursor() sql = " select max(docid),min(docid) from document_extract " cursor.execute(sql) rows = cursor.fetchall() if len(rows)>0: maxdocid,mindocid = rows[0] d_mindocid = int(maxdocid)-save_count if mindocid=self.get_count: self.getRangeDocid() next_docid = self.begin_docid+self.count self.count += 1 return next_docid def on_message(self, headers): try: next_docid = int(self.getNextDocid()) partitionkey = int(next_docid%500+1) message_id = headers.headers["message-id"] body = json.loads(headers.body) body[document_tmp_partitionkey] = partitionkey body[document_tmp_docid] = next_docid if body.get(document_original_docchannel) is None: body[document_original_docchannel] = body.get(document_docchannel) page_attachments = body.get(document_tmp_attachment_path,"[]") _uuid = body.get(document_tmp_uuid,"") if page_attachments!="[]": status = random.randint(1,10) body[document_tmp_status] = status if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment): log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid))) ackMsg(self.conn,message_id) else: log("send_msg_error on init listener") else: status = random.randint(11,50) body[document_tmp_status] = status if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract): log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid))) ackMsg(self.conn,message_id) else: log("send_msg_error on init listener") except Exception as e: traceback.print_exc() if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init): log("init error") ackMsg(self.conn,message_id) def __del__(self): self.conn.disconnect() del self.pool_mq1 def __init__(self): Dataflow.__init__(self) 
self.max_shenpi_id = None self.base_shenpi_id = 400000000000 self.mq_init = "/queue/dataflow_init" self.mq_attachment = "/queue/dataflow_attachment" self.mq_extract = "/queue/dataflow_extract" self.pool_oracle = ConnectorPool(10,15,getConnection_oracle) self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ) self.ots_capacity = getConnect_ots_capacity() self.init_comsumer_counts = 2 self.list_init_comsumer = [] for i in range(self.init_comsumer_counts): listener = self.InitListener(getConnect_activateMQ()) createComsumer(listener,self.mq_init) self.list_init_comsumer.append(listener) def monitor_listener(self): for i in range(len(self.list_init_comsumer)): if self.list_init_comsumer[i].conn.is_connected(): continue else: listener = self.InitListener(getConnect_activateMQ()) createComsumer(listener,self.mq_init) self.list_init_comsumer[i] = listener def temp2mq(self,object): conn_oracle = self.pool_oracle.getConnector() try: list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[]) for _obj in list_obj: ots_dict = _obj.getProperties_ots() if len(ots_dict.get("dochtmlcon",""))>500000: _obj.delete_row(conn_oracle) log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon","")))) continue if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init): #删除数据,上线放开 _obj.delete_row(conn_oracle) else: log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon","")))) self.pool_oracle.putConnector(conn_oracle) except Exception as e: traceback.print_exc() self.pool_oracle.decrease() def shenpi2mq(self): conn_oracle = self.pool_oracle.getConnector() try: if self.max_shenpi_id is None: # get the max_shenpi_id _query = BoolQuery(must_queries=[ExistsQuery("id")]) rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index", SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1)) list_data = getRow_ots(rows) if len(list_data)>0: max_shenpi_id = list_data[0].get("id") if max_shenpi_id>self.base_shenpi_id: max_shenpi_id -= self.base_shenpi_id self.max_shenpi_id = max_shenpi_id if self.max_shenpi_id<60383953: self.max_shenpi_id = 60383953 if self.max_shenpi_id is not None: # select data in order origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle) if origin_max_shenpi_id is not None: log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id)) for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1): list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i) # send data to mq one by one with max_shenpi_id updated for _data in list_data: _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID) ots_dict = _data.getProperties_ots() if ots_dict["docid"]0: continue # bool_query = BoolQuery(must_queries=[ # TermQuery("id",_id_i), # ]) # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index", # SearchQuery(bool_query,get_total_count=True)) # if total_count>0: # continue try: list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i) except Exception as e: continue # send data to mq one by one with max_shenpi_id updated for _data in list_data: _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID) ots_dict = _data.getProperties_ots() if ots_dict["docid"]=1000: break def _handle(_data,result_queue): _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey), document_tmp_docid:_data.get(document_tmp_docid), document_tmp_status:0} 
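            # Routing note: the partial row is re-read from OTS (fix_columns) and its dochtmlcon
            # is fetched from the capacity instance before the full document is republished —
            # rows with page_attachments go to /queue/dataflow_attachment, the rest go straight
            # to /queue/dataflow_extract, and the document status is reset only when the send
            # succeeds.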
_document = Document(_d) _document.fix_columns(self.ots_client,None,True) _data = _document.getProperties() page_attachments = _data.get(document_tmp_attachment_path,"[]") _document_html = Document(_data) _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True) if page_attachments!="[]": status = random.randint(1,10) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment) else: status = random.randint(11,50) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract) if send_succeed: _document.setValue(document_tmp_status,0,True) _document.update_row(self.ots_client) else: log("send_msg_error2222") if task_queue.qsize()>0: mt = MultiThreadHandler(task_queue,_handle,None,15) mt.run() except Exception as e: traceback.print_exc() def otstmp2mq(self): try: bool_query = BoolQuery(must_queries=[TermQuery("status",0)]) rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100), ColumnsToGet(return_type=ColumnReturnType.ALL)) list_data = getRow_ots(rows) for _data in list_data: _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey), document_tmp_docid:_data.get(document_tmp_docid), document_tmp_status:0} _document = Document_tmp(_d) page_attachments = _data.get(document_tmp_attachment_path,"[]") log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid)))) _document_html = Document_html(_data) _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True) if page_attachments!="[]": status = random.randint(1,10) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment) else: status = random.randint(11,50) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract) if send_succeed: _document.setValue(document_tmp_status,1,True) _document.update_row(self.ots_client) else: log("send_msg_error2222") while next_token: rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index", SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100), ColumnsToGet(return_type=ColumnReturnType.ALL)) list_data = getRow_ots(rows) for _data in list_data: _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey), document_tmp_docid:_data.get(document_tmp_docid), document_tmp_status:0} _document = Document_tmp(_d) page_attachments = _data.get(document_tmp_attachment_path,"[]") _document_html = Document_html(_data) _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True) if page_attachments!="[]": status = random.randint(1,10) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment) else: status = random.randint(11,50) _data[document_tmp_status] = status send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract) if send_succeed: _document.setValue(document_tmp_status,1,True) _document.update_row(self.ots_client) else: log("send_msg_error2222") except Exception as e: traceback.print_exc() def 
test_dump_docid(self): class TestDumpListener(ActiveMQListener): def on_message(self, headers): message_id = headers.headers["message-id"] body = headers.body self._queue.put(headers,True) ackMsg(self.conn,message_id) _queue = Queue() listener1 = TestDumpListener(getConnect_activateMQ(),_queue) listener2 = TestDumpListener(getConnect_activateMQ(),_queue) createComsumer(listener1,"/queue/dataflow_attachment") createComsumer(listener2,"/queue/dataflow_extract") time.sleep(10) list_item = [] list_docid = [] while 1: try: _item = _queue.get(timeout=2) list_item.append(_item) except Exception as e: break for item in list_item: _item = json.loads(item.body) list_docid.append(_item.get("docid")) log(list_docid[:10]) log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid)))) def start_dataflow_init(self): # self.test_dump_docid() from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin schedule = BlockingScheduler() schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10") schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10") schedule.add_job(self.ots2mq,"cron",second="*/10") schedule.add_job(self.otstmp2mq,"cron",second="*/10") schedule.add_job(self.monitor_listener,"cron",minute="*/1") schedule.add_job(self.shenpi2mq,"cron",minute="*/1") schedule.start() def transform_attachment(): from 
BaseDataMaintenance.model.ots.attachment import attachment from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots from threading import Thread from queue import Queue task_queue = Queue() def comsumer(task_queue,pool_postgres): while 1: _dict = task_queue.get(True) try: attach = Attachment_postgres(_dict) if not attach.exists(pool_postgres): attach.insert_row(pool_postgres) except Exception as e: traceback.print_exc() def start_comsumer(task_queue): pool_postgres = ConnectorPool(10,30,getConnection_postgres) comsumer_count = 30 list_thread = [] for i in range(comsumer_count): _t = Thread(target=comsumer,args=(task_queue,pool_postgres)) list_thread.append(_t) for _t in list_thread: _t.start() ots_client = getConnect_ots() _thread = Thread(target=start_comsumer,args=(task_queue,)) _thread.start() bool_query = BoolQuery(must_queries=[ RangeQuery(attachment_crtime,"2022-05-06"), BoolQuery(should_queries=[TermQuery(attachment_status,10), TermQuery(attachment_status,ATTACHMENT_TOOLARGE)]) ]) rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL)) list_dict = getRow_ots(rows) for _dict in list_dict: task_queue.put(_dict,True) _count = len(list_dict) while next_token: rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index", SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL)) list_dict = getRow_ots(rows) for _dict in list_dict: task_queue.put(_dict,True) _count += len(list_dict) print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize())) def del_test_doc(): ots_client = getConnect_ots() bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)]) list_data = [] rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE)) print(total_count) list_row = getRow_ots(rows) list_data.extend(list_row) while next_token: rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index", SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE)) list_row = getRow_ots(rows) list_data.extend(list_row) for _row in list_data: _doc = Document_tmp(_row) _doc.delete_row(ots_client) _html = Document_html(_row) if _html.exists_row(ots_client): _html.delete_row(ots_client) def fixDoc_to_queue_extract(): pool_mq = ConnectorPool(10,20,getConnect_activateMQ) try: ots_client = getConnect_ots() ots_capacity = getConnect_ots_capacity() bool_query = BoolQuery(must_queries=[ RangeQuery("crtime","2022-05-31"), TermQuery("docchannel",114) ]) list_data = [] rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL)) print(total_count) list_row = getRow_ots(rows) list_data.extend(list_row) _count = len(list_row) while next_token: 
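            # Pagination note: OTS search results are paged with next_token — each iteration
            # fetches the next batch of up to 100 rows and extends list_data until next_token
            # comes back empty, mirroring the first search call above.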
rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index", SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL)) list_row = getRow_ots(rows) list_data.extend(list_row) _count = len(list_data) print("%d/%d"%(_count,total_count)) task_queue = Queue() for _row in list_data: if "all_columns" in _row: _row.pop("all_columns") _html = Document(_row) task_queue.put(_html) def _handle(item,result_queue): _html = item _html.fix_columns(ots_capacity,["dochtmlcon"],True) print(_html.getProperties().get(document_tmp_docid)) send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract") mt = MultiThreadHandler(task_queue,_handle,None,30) mt.run() except Exception as e: traceback.print_exc() finally: pool_mq.destory() def check_data_synchronization(): # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log" # list_uuid = [] # _regrex = "delete\s+(?P[^\s]+)\s+.*ID='(?P.+)'" # with open(filepath,"r",encoding="utf8") as f: # while 1: # _line = f.readline() # if not _line: # break # _match = re.search(_regrex,_line) # if _match is not None: # _uuid = _match.groupdict().get("uuid") # tablename = _match.groupdict.get("tablename") # if _uuid is not None: # list_uuid.append({"uuid":_uuid,"tablename":tablename}) # print("total_count:",len(list_uuid)) import pandas as pd from BaseDataMaintenance.common.Utils import load task_queue = Queue() list_data = [] df_data = load("uuid.pk") # df_data = pd.read_excel("check.xlsx") for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]): _dict = {"uuid":uuid, "tablename":tablename} list_data.append(_dict) task_queue.put(_dict) print("qsize:",task_queue.qsize()) ots_client = getConnect_ots() def _handle(_item,result_queue): bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))]) rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index", SearchQuery(bool_query,get_total_count=True), columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE)) _item["exists"] = total_count mt = MultiThreadHandler(task_queue,_handle,None,30) mt.run() df_data = {"uuid":[], "tablename":[], "exists":[]} for _data in list_data: if _data["exists"]==0: for k,v in df_data.items(): v.append(_data.get(k)) import pandas as pd df2 = pd.DataFrame(df_data) df2.to_excel("check1.xlsx") current_path = os.path.abspath(os.path.dirname(__file__)) def fixDoc_to_queue_init(filename=""): import pandas as pd from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots from BaseDataMaintenance.model.oracle.TouSuTemp import dict_oracle2ots as dict_oracle2ots_tousu if filename=="": filename = os.path.join(current_path,"check.xlsx") df = pd.read_excel(filename) if "docchannel" in dict_oracle2ots: dict_oracle2ots.pop("docchannel") row_name = ",".join(list(dict_oracle2ots.keys())) list_tousu_keys = [] for k,v in dict_oracle2ots_tousu.items(): if str(k).isupper(): list_tousu_keys.append(k) row_name_tousu = ",".join(list(list_tousu_keys)) conn = getConnection_oracle() cursor = conn.cursor() _count = 0 for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]): if _exists==0 and _toolong==0: _count += 1 is_tousu = False if tablename in ('bxkc.t_wei_fa_ji_lu_temp','bxkc.t_tou_su_chu_li_temp','bxkc.t_qi_ta_shi_xin_temp'): is_tousu = True _source = str(tablename).replace("_TEMP","") if is_tousu: _source = str(tablename).replace("_temp","") _rowname = row_name_tousu if 
is_tousu else row_name
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,_rowname,_rowname,_source,uuid)
            log("%d:%s"%(_count,sql))
            cursor.execute(sql)
            conn.commit()
    conn.close()
    return _count


if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
    current_date = getCurrent_date(format="%Y-%m-%d")
    last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
    sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
    print(sql)
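

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): the extract flow picks an
# endpoint in getExtract_url() by turning the configured [url, weight] pairs into
# cumulative fractions of the total weight and drawing a single random.random()
# value. The standalone helper below reproduces that selection logic so it can be
# read and tested in isolation; it is a reading aid, not a drop-in replacement.
# ---------------------------------------------------------------------------
def _sketch_pick_weighted_url(interfaces):
    """Pick a url from [[url, weight], ...] proportionally to its weight.

    Mirrors the cumulative-weight normalisation done in
    Dataflow_ActivteMQ_extract.__init__ / getExtract_url; `interfaces` is not modified.
    """
    import random as _random  # local import keeps the sketch self-contained
    whole_weight = sum(weight for _url, weight in interfaces)
    _r = _random.random()
    current = 0
    for _url, weight in interfaces:
        current += weight
        if _r <= current / whole_weight:
            return _url
    # Guard against floating-point edge cases by falling back to the last endpoint.
    return interfaces[-1][0]

# Usage example with the weights configured above (20 vs 10): roughly two thirds of
# calls should return the first interface.
# _sketch_pick_weighted_url([["http://127.0.0.1:15030/content_extract", 20],
#                            ["http://192.168.0.115:15030/content_extract", 10]])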
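

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): both the attachment cache
# ("attach-<filemd5>", 3h TTL) and the extract cache (fingerprint key, 2h TTL) use
# the same pattern against the redis connector — set a JSON string, give it an
# expiry, and treat any error as a cache miss. The helpers below show that shape
# with a duck-typed client exposing get/set/expire (as the connectors returned by
# getConnect_redis_doc do in the methods above); they are a reading aid only.
# ---------------------------------------------------------------------------
def _sketch_cache_put(client, key, value_dict, ttl_seconds):
    """Store a JSON-serialisable dict under key with a TTL; swallow cache errors."""
    import json as _json            # local imports keep the sketch self-contained
    import traceback as _traceback
    try:
        client.set(key, _json.dumps(value_dict))
        client.expire(key, ttl_seconds)
        return True
    except Exception:
        _traceback.print_exc()
        return False


def _sketch_cache_get(client, key):
    """Return the cached JSON string, or None on a miss or error."""
    try:
        return client.get(key)
    except Exception:
        return None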