- #coding:utf8
- from BaseDataMaintenance.maintenance.dataflow import *
- from BaseDataMaintenance.common.activateMQUtils import *
- from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
- from BaseDataMaintenance.dataSource.setttings import *
- from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
- import os
- from BaseDataMaintenance.common.ossUtils import *
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
- from BaseDataMaintenance.common.Utils import article_limit
- from BaseDataMaintenance.common.documentFingerprint import getFingerprint
- from BaseDataMaintenance.model.postgres.document_extract import *
- from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
- import sys
- sys.setrecursionlimit(1000000)
- from multiprocessing import Process
- from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
- from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
- from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
- from BaseDataMaintenance.common.Utils import getUnifyMoney
- from BaseDataMaintenance.maintenance.product.medical_product import MedicalProduct
- class ActiveMQListener():
- def __init__(self,conn,_queue,*args,**kwargs):
- self.conn = conn
- self._queue = _queue
- def on_error(self, headers):
- log("===============")
- log('received an error %s' % str(headers.body))
- def on_message(self, headers):
- log("====message====")
- message_id = headers.headers["message-id"]
- body = headers.body
- self._queue.put({"frame":headers,"conn":self.conn},True)
- def __del__(self):
- self.conn.disconnect()
- class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
- class AttachmentMQListener():
- def __init__(self,conn,_func,_idx,*args,**kwargs):
- self.conn = conn
- self._func = _func
- self._idx = _idx
- def on_error(self, headers):
- log("===============")
- log('received an error %s' % str(headers.body))
- def on_message(self, headers):
- try:
- log("get message of idx:%s"%(str(self._idx)))
- message_id = headers.headers["message-id"]
- body = headers.body
- _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
- self._func(_dict=_dict)
- except Exception as e:
- traceback.print_exc()
- pass
- def __del__(self):
- self.conn.disconnect()
- def __init__(self):
- Dataflow_attachment.__init__(self)
- self.mq_attachment = "/queue/dataflow_attachment"
- self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
- self.mq_extract = "/queue/dataflow_extract"
- self.queue_attachment_ocr = Queue()
- self.queue_attachment_not_ocr = Queue()
- self.comsumer_count = 20
- self.comsumer_process_count = 5
- self.retry_comsumer_count = 10
- self.retry_times = 5
- self.list_attachment_comsumer = []
- # for _i in range(self.comsumer_count):
- # listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
- # createComsumer(listener_attachment,self.mq_attachment)
- # self.list_attachment_comsumer.append(listener_attachment)
- self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
- self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
- self.conn_mq = getConnect_activateMQ()
- self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
- self.session = None
- for _ in range(self.comsumer_process_count):
- listener_p = Process(target=self.start_attachment_listener)
- listener_p.start()
- # listener_p = Process(target=self.start_attachment_listener)
- # listener_p.start()
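- # each worker process creates comsumer_count MQ listeners on the attachment queue and then keeps re-creating any listener whose connection drops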
- def start_attachment_listener(self):
- for _i in range(self.comsumer_count):
- listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler ,_i)
- createComsumer(listener_attachment,self.mq_attachment)
- self.list_attachment_comsumer.append(listener_attachment)
- while 1:
- for i in range(len(self.list_attachment_comsumer)):
- if self.list_attachment_comsumer[i].conn.is_connected():
- continue
- else:
- listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
- createComsumer(listener,self.mq_attachment)
- self.list_attachment_comsumer[i] = listener
- time.sleep(5)
- def monitor_listener(self):
- for i in range(len(self.list_attachment_comsumer)):
- if self.list_attachment_comsumer[i].conn.is_connected():
- continue
- else:
- listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
- createComsumer(listener,self.mq_attachment)
- self.list_attachment_comsumer[i] = listener
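- # when the main attachment queue is nearly empty, temporarily consume the failed queue so failed messages get retried during idle time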
- def process_failed_attachment(self):
- from BaseDataMaintenance.java.MQInfo import getQueueSize
- attachment_size = getQueueSize("dataflow_attachment")
- failed_attachment_size = getQueueSize("dataflow_attachment_failed")
- if attachment_size<100 and failed_attachment_size>0:
- list_comsumer = []
- for _i in range(self.retry_comsumer_count):
- listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
- list_comsumer.append(listener_attachment)
- createComsumer(listener_attachment,self.mq_attachment_failed)
- while 1:
- failed_attachment_size = getQueueSize("dataflow_attachment_failed")
- if failed_attachment_size==0:
- break
- time.sleep(10)
- for _c in list_comsumer:
- _c.conn.disconnect()
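- # MQ callback: parse the message, load the referenced attachments by filemd5, then run recognition; a random ~20% of messages are re-queued instead of being handled here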
- def attachment_listener_handler(self,_dict):
- try:
- frame = _dict["frame"]
- conn = _dict["conn"]
- message_id = frame.headers["message-id"]
- item = json.loads(frame.body)
- _idx = _dict.get("idx",1)
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- if random.random()<0.2:
- log("jump by random")
- if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
- ackMsg(conn,message_id)
- return
- if len(page_attachments)==0:
- newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
- newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}
- log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
- self.attachment_recognize(newitem,None)
- log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
- except Exception as e:
- traceback.print_exc()
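- # run every attachment through the recognition interface, reusing already-processed html where possible; returns (all_succeeded, list_html, swf_urls)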
- def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
- try:
- list_html = []
- swf_urls = []
- _not_failed = True
- for _attach in list_attach:
- # test: run everything
- _filemd5 = _attach.getProperties().get(attachment_filemd5)
- if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
- log("%s has processed or toolarge"%(_filemd5))
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append({attachment_filemd5:_filemd5,
- "html":_html})
- else:
- # already has process_time, skip
- if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
- log("%s has process_time jump"%(_filemd5))
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append({attachment_filemd5:_filemd5,
- "html":_html})
- else:
- log("%s requesting interface"%(_filemd5))
- _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
- if not _succeed:
- _not_failed = False
- _html = _attach.getProperties().get(attachment_attachmenthtml,"")
- if _html is None:
- _html = ""
- list_html.append({attachment_filemd5:_filemd5,
- "html":_html})
- if _attach.getProperties().get(attachment_filetype)=="swf":
- # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
- _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
- if _swf_urls:
- _swf_urls = _swf_urls.replace('\\', '')
- else:
- _swf_urls = '[]'
- _swf_urls = json.loads(_swf_urls)
- swf_urls.extend(_swf_urls)
- if not _not_failed:
- return False,list_html,swf_urls
- return True,list_html,swf_urls
- except requests.ConnectionError as e1:
- raise e1
- except Exception as e:
- return False,list_html,swf_urls
- def attachment_recognize(self,_dict,result_queue):
- '''
- recognize attachment content
- :param _dict: attachment payload
- :param result_queue:
- :return:
- '''
- try:
- start_time = time.time()
- item = _dict.get("item")
- list_attach = _dict.get("list_attach")
- conn = _dict["conn"]
- message_id = _dict.get("message_id")
- if "retry_times" not in item:
- item["retry_times"] = 5
- _retry_times = item.get("retry_times",0)
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
- dhtml.delete_bidi_a()
- # call the recognition interface
- _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
- # write attachment classifications back to the document
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- if len(page_attachments)>0:
- for _attachment in page_attachments:
- filemd5 = _attachment.get(document_attachment_path_filemd5,"")
- classification = None
- for _attach in list_attach:
- if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
- classification = _attach.getProperties().get(attachment_classification,"")
- break
- if classification is not None:
- _attachment[attachment_classification] = classification
- item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)
- dtmp = Document_tmp(item)
- _to_ack = False
- if not _succeed and _retry_times<self.retry_times:
- item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
- item["retry_times"] = _retry_times+1
- # after 5 failures the message goes to the failed queue, whose contents are reprocessed once during idle time
- if item["retry_times"]>=self.retry_times:
- send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
- # save on failure
- if _retry_times==0:
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,0,True)
- if not dtmp.exists_row(self.ots_client):
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- if send_succeed:
- _to_ack = True
- else:
- try:
- log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
- dhtml.updateSWFImages(swf_urls)
- dhtml.updateAttachment(list_html)
- dtmp.setValue(document_tmp_attachment_extract_status,1,True)
- dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
- if send_succeed:
- _to_ack = True
- except Exception as e:
- traceback.print_exc()
- if _to_ack:
- ackMsg(conn,message_id)
- log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
- except Exception as e:
- traceback.print_exc()
- if time.time()-start_time<10:
- item["retry_times"] -= 1
- if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
- ackMsg(conn,message_id)
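- # process one attachment: look for the file locally, in oss and in ots, call getAttachDealInterface on it, then write the result back to ots and the redis cache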
- def request_attachment_interface(self,attach,_dochtmlcon):
- filemd5 = attach.getProperties().get(attachment_filemd5)
- _status = attach.getProperties().get(attachment_status)
- _filetype = attach.getProperties().get(attachment_filetype)
- _path = attach.getProperties().get(attachment_path)
- _uuid = uuid4()
- objectPath = attach.getProperties().get(attachment_path)
- docids = attach.getProperties().get(attachment_docids)
- _ots_exists = attach.getProperties().get("ots_exists")
- if objectPath is None:
- relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
- else:
- relative_path = objectPath[5:].replace("//","/")
- localpath = "/FileInfo/%s"%(relative_path)
- if not os.path.exists(localpath):
- if not os.path.exists(os.path.dirname(localpath)):
- os.makedirs(os.path.dirname(localpath))
- local_exists = False
- else:
- local_exists = True
- _size = os.path.getsize(localpath)
- not_failed_flag = True
- try:
- d_start_time = time.time()
- if not local_exists:
- log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
- try:
- download_succeed = downloadFile(self.bucket,objectPath,localpath)
- except Exception as e:
- download_succeed = False
- else:
- log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
- if not (local_exists or download_succeed):
- _ots_attach = attachment(attach.getProperties_ots())
- _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
- log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
- if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
- attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
- attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
- attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
- attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
- attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
- # if attach.exists(self.attach_pool):
- # attach.update_row(self.attach_pool)
- # else:
- # attach.insert_row(self.attach_pool)
- self.putAttach_json_toRedis(filemd5,attach.getProperties())
- try:
- if os.path.exists(localpath):
- os.remove(localpath)
- except Exception as e:
- pass
- return True
- if _ots_exists:
- objectPath = attach.getProperties().get(attachment_path)
- download_succeed = downloadFile(self.bucket,objectPath,localpath)
- if download_succeed:
- log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
- else:
- log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
- else:
- log("md5:%s path:%s not found in ots"%(filemd5,objectPath))
- if local_exists or download_succeed:
- _size = os.path.getsize(localpath)
- attach.setValue(attachment_size,_size,True)
- if _size>ATTACHMENT_LARGESIZE:
- attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
- log("attachment :%s of path:%s to large"%(filemd5,_path))
- _ots_attach = attachment(attach.getProperties_ots())
- _ots_attach.update_row(self.ots_client)
- # # update postgres
- # if attach.exists(self.attach_pool):
- # attach.update_row(self.attach_pool)
- # else:
- # attach.insert_row(self.attach_pool)
- self.putAttach_json_toRedis(filemd5,attach.getProperties())
- if local_exists:
- if not _ots_exists:
- upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
- os.remove(localpath)
- return True
- time_download = time.time()-d_start_time
- # call the interface and process the result
- start_time = time.time()
- _filetype = attach.getProperties().get(attachment_filetype)
- # _data_base64 = base64.b64encode(open(localpath,"rb").read())
- # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
- _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
- _reg_time = time.time()-start_time
- if _success:
- if len(_html)<5:
- _html = ""
- else:
- if len(_html)>1:
- _html = "interface return error"
- else:
- # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
- _html = ""
- return False
- # when re-running swf, strip "\" from the original swf_urls
- if attach.getProperties().get(attachment_filetype) == "swf":
- swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
- swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
- swf_urls = json.loads(swf_urls)
- attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)
- swf_images = eval(swf_images)
- if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
- swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
- if len(swf_urls)==0:
- objectPath = attach.getProperties().get(attachment_path,"")
- swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
- if not os.path.exists(swf_dir):
- os.mkdir(swf_dir)
- for _i in range(len(swf_images)):
- _base = swf_images[_i]
- _base = base64.b64decode(_base)
- filename = "swf_page_%d.png"%(_i)
- filepath = os.path.join(swf_dir,filename)
- with open(filepath,"wb") as f:
- f.write(_base)
- swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
- if os.path.exists(swf_dir):
- os.rmdir(swf_dir)
- attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
- else:
- attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
- if re.search("<td",_html) is not None:
- attach.setValue(attachment_has_table,1,True)
- _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
- filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
- if _file_title!="":
- attach.setValue(attachment_file_title,_file_title,True)
- if filelink!="":
- attach.setValue(attachment_file_link,filelink,True)
- attach.setValue(attachment_attachmenthtml,_html,True)
- attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
- attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
- attach.setValue(attachment_recsize,len(_html),True)
- attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
- attach.setValue(attachment_classification,classification,True)
- # update ots
- _ots_attach = attachment(attach.getProperties_ots())
- _ots_attach.update_row(self.ots_client) # re-enable updates in production
- # # update postgres
- # if attach.exists(self.attach_pool):
- # attach.update_row(self.attach_pool)
- # else:
- # attach.insert_row(self.attach_pool)
- self.putAttach_json_toRedis(filemd5,attach.getProperties())
- start_time = time.time()
- if local_exists:
- if not _ots_exists:
- upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
- try:
- if upload_status and os.path.exists(localpath):
- os.remove(localpath)
- except Exception as e:
- pass
- _upload_time = time.time()-start_time
- log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
- return True
- else:
- return True
- except requests.ConnectionError as e1:
- raise e1
- except oss2.exceptions.NotFound as e:
- return True
- except Exception as e:
- traceback.print_exc()
- # def flow_attachment(self):
- # self.flow_attachment_producer()
- # self.flow_attachment_producer_comsumer()
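- # recover the oss path of an attachment from the <a>/<img> tag in the document html that carries its filemd5 (only for bidizhaobiao links)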
- def getAttachPath(self,filemd5,_dochtmlcon):
- _soup = BeautifulSoup(_dochtmlcon,"lxml")
- list_mark = ["data","filelink"]
- for _mark in list_mark:
- _find = _soup.find("a",attrs={_mark:filemd5})
- filelink = ""
- if _find is None:
- _find = _soup.find("img",attrs={_mark:filemd5})
- if _find is not None:
- filelink = _find.attrs.get("src","")
- else:
- filelink = _find.attrs.get("href","")
- if filelink.find("bidizhaobiao")>=0:
- _path = filelink.split("/file")
- if len(_path)>1:
- return _path[1]
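- # the two methods below cache attachment properties in redis, keyed by "attach-<filemd5>", with a three hour ttl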
- def getAttach_json_fromRedis(self,filemd5):
- db = self.redis_pool.getConnector()
- try:
- _key = "attach-%s"%(filemd5)
- _attach_json = db.get(_key)
- return _attach_json
- except Exception as e:
- log("getAttach_json_fromRedis error %s"%(str(e)))
- finally:
- try:
- if db.connection.check_health():
- self.redis_pool.putConnector(db)
- except Exception as e:
- pass
- return None
- def putAttach_json_toRedis(self,filemd5,extract_dict):
- db = self.redis_pool.getConnector()
- try:
- new_dict = {}
- for k,v in extract_dict.items():
- if not isinstance(v,set):
- new_dict[k] = v
- _key = "attach-%s"%(filemd5)
- _extract_json = db.set(str(_key),json.dumps(new_dict))
- db.expire(_key,3600*3)
- return _extract_json
- except Exception as e:
- log("putExtract_json_toRedis error%s"%(str(e)))
- traceback.print_exc()
- finally:
- try:
- if db.connection.check_health():
- self.redis_pool.putConnector(db)
- except Exception as e:
- pass
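- # build Attachment_postgres objects for the given filemd5 list: first from the redis cache, then from ots, and finally from the path embedded in the html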
- def getAttachments(self,list_filemd5,_dochtmlcon):
- conn = self.attach_pool.getConnector()
- # search postgres
- try:
- to_find_md5 = []
- for _filemd5 in list_filemd5[:50]:
- if _filemd5 is not None:
- to_find_md5.append(_filemd5)
- conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
- list_attachment = []
- set_md5 = set()
- # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
- # for _attach in list_attachment:
- # set_md5.add(_attach.getProperties().get(attachment_filemd5))
- for _filemd5 in to_find_md5:
- _json = self.getAttach_json_fromRedis(_filemd5)
- if _json is not None:
- set_md5.add(_filemd5)
- list_attachment.append(Attachment_postgres(json.loads(_json)))
- log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))
- for _filemd5 in to_find_md5:
- if _filemd5 not in set_md5:
- _path = self.getAttachPath(_filemd5,_dochtmlcon)
- log("getAttachments search in ots:%s"%(_filemd5))
- _attach = {attachment_filemd5:_filemd5}
- _attach_ots = attachment(_attach)
- if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
- if _attach_ots.getProperties().get(attachment_status) is not None:
- log("getAttachments find in ots:%s"%(_filemd5))
- _attach_pg = Attachment_postgres(_attach_ots.getProperties())
- _attach_pg.setValue("ots_exists",True,True)
- list_attachment.append(_attach_pg)
- else:
- log("getAttachments status None find in ots:%s"%(_filemd5))
- _attach_pg = Attachment_postgres(_attach_ots.getProperties())
- _attach_pg.setValue("ots_exists",True,True)
- list_attachment.append(_attach_pg)
- else:
- log("getAttachments search in path:%s"%(_filemd5))
- if _path:
- log("getAttachments find in path:%s"%(_filemd5))
- if _path[0]=="/":
- _path = _path[1:]
- _filetype = _path.split(".")[-1]
- _attach = {attachment_filemd5:_filemd5,
- attachment_filetype:_filetype,
- attachment_status:20,
- attachment_path:"%s/%s"%(_filemd5[:4],_path),
- attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
- list_attachment.append(Attachment_postgres(_attach))
- return list_attachment
- except Exception as e:
- log("attachProcess comsumer error %s"%str(e))
- log(str(to_find_md5))
- traceback.print_exc()
- return []
- finally:
- self.attach_pool.putConnector(conn)
- def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
- q_size = self.queue_attachment.qsize()
- qsize_ocr = self.queue_attachment_ocr.qsize()
- qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
- log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))
- def flow_attachment_producer_comsumer(self):
- log("start flow_attachment comsumer")
- mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
- mt.run()
- def flow_attachment_process(self):
- self.process_comsumer()
- # p = Process(target = self.process_comsumer)
- # p.start()
- # p.join()
- def set_queue(self,_dict):
- list_attach = _dict.get("list_attach")
- to_ocr = False
- for attach in list_attach:
- if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
- to_ocr = True
- break
- if to_ocr:
- self.queue_attachment_ocr.put(_dict,True)
- else:
- self.queue_attachment_not_ocr.put(_dict,True)
- def comsumer_handle(self,_dict,result_queue):
- try:
- frame = _dict["frame"]
- conn = _dict["conn"]
- message_id = frame.headers["message-id"]
- item = json.loads(frame.body)
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- if len(page_attachments)==0:
- self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
- else:
- list_fileMd5 = []
- for _atta in page_attachments:
- list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
- list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
- self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
- except Exception as e:
- traceback.print_exc()
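- # housekeeping: drop attachment rows older than two days from postgres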
- def remove_attachment_postgres(self):
- current_date = getCurrent_date(format="%Y-%m-%d")
- last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
- sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
- conn = self.attach_pool.getConnector()
- try:
- cursor = conn.cursor()
- cursor.execute(sql)
- conn.commit()
- self.attach_pool.putConnector(conn)
- except Exception as e:
- conn.close()
- def start_flow_attachment(self):
- schedule = BlockingScheduler()
- # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
- # schedule.add_job(self.flow_attachment,"cron",second="*/10")
- # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
- # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
- # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
- # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
- # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
- schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
- schedule.start()
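- # second stage of the dataflow: consumes /queue/dataflow_extract, calls the local extract interfaces and optionally forwards documents to the AI extraction queue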
- class Dataflow_ActivteMQ_extract(Dataflow_extract):
- class ExtractListener():
- def __init__(self,conn,_func,_idx,*args,**kwargs):
- self.conn = conn
- self._func = _func
- self._idx = _idx
- def on_message(self, headers):
- try:
- log("get message of idx:%d"%(self._idx))
- message_id = headers.headers["message-id"]
- body = headers.body
- log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
- self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
- except Exception as e:
- traceback.print_exc()
- pass
- def on_error(self, headers):
- log('received an error %s' % str(headers.body))
- def __del__(self):
- self.conn.disconnect()
- def __init__(self,create_listener=True):
- Dataflow_extract.__init__(self)
- self.industy_url = "http://127.0.0.1:15000/industry_extract"
- self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
- ["http://192.168.0.115:15030/content_extract",10]
- ]
- self.mq_extract = "/queue/dataflow_extract"
- self.mq_extract_ai = "/queue/dataflow_extract_AI"
- self.mq_extract_failed = "/queue/dataflow_extract_failed"
- self.whole_weight = 0
- for _url,weight in self.extract_interfaces:
- self.whole_weight+= weight
- current_weight = 0
- for _i in range(len(self.extract_interfaces)):
- current_weight += self.extract_interfaces[_i][1]
- self.extract_interfaces[_i][1] = current_weight/self.whole_weight
- self.comsumer_count = 5
- # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
- self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
- init_nums = 1
- self.conn_mq = None
- if create_listener:
- self.conn_mq = getConnect_activateMQ()
- else:
- init_nums = 0
- self.pool_mq = ConnectorPool(init_nums,30,getConnect_activateMQ)
- self.block_url = RLock()
- self.url_count = 0
- self.session = None
- self.MP = MedicalProduct()
- self.list_extract_comsumer = []
- self.list_extract_ai_comsumer = []
- # for _i in range(self.comsumer_count):
- # listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
- # createComsumer(listener_extract,self.mq_extract)
- # self.list_extract_comsumer.append(listener_extract)
- # extract listeners
- if create_listener:
- for ii in range(10):
- listener_p = Process(target=self.start_extract_listener)
- listener_p.start()
- listener_p_ai = Thread(target=self.start_extract_AI_listener)
- listener_p_ai.start()
- def start_extract_AI_listener(self,_count=6):
- self.list_extract_ai_comsumer = []
- for _i in range(_count):
- listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
- createComsumer(listener_extract,self.mq_extract_ai)
- self.list_extract_ai_comsumer.append(listener_extract)
- while 1:
- try:
- for _i in range(len(self.list_extract_ai_comsumer)):
- if self.list_extract_ai_comsumer[_i].conn.is_connected():
- continue
- else:
- listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
- createComsumer(listener,self.mq_extract_ai)
- self.list_extract_ai_comsumer[_i] = listener
- time.sleep(5)
- except Exception as e:
- traceback.print_exc()
- def start_extract_listener(self):
- self.list_extract_comsumer = []
- for _i in range(self.comsumer_count):
- listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
- createComsumer(listener_extract,self.mq_extract)
- self.list_extract_comsumer.append(listener_extract)
- while 1:
- try:
- for _i in range(len(self.list_extract_comsumer)):
- if self.list_extract_comsumer[_i].conn.is_connected():
- continue
- else:
- listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
- createComsumer(listener,self.mq_extract)
- self.list_extract_comsumer[_i] = listener
- time.sleep(5)
- except Exception as e:
- traceback.print_exc()
- def monitor_listener(self):
- for i in range(len(self.list_extract_comsumer)):
- if self.list_extract_comsumer[i].conn.is_connected():
- continue
- else:
- listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
- createComsumer(listener,self.mq_extract)
- self.list_extract_comsumer[i] = listener
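- # pick an extract interface by weighted random choice; extract_interfaces holds cumulative weights normalised to 1 in __init__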
- def getExtract_url(self):
- # _url_num = 0
- # with self.block_url:
- # self.url_count += 1
- # self.url_count %= self.whole_weight
- # _url_num = self.url_count
- _r = random.random()
- # _r = _url_num/self.whole_weight
- for _i in range(len(self.extract_interfaces)):
- if _r<=self.extract_interfaces[_i][1]:
- return self.extract_interfaces[_i][0]
- def request_extract_interface(self,json,headers):
- # _i = random.randint(0,len(self.extract_interfaces)-1)
- # _i = 0
- # _url = self.extract_interfaces[_i]
- _url = self.getExtract_url()
- log("extract_url:%s"%(str(_url)))
- with requests.Session() as session:
- resp = session.post(_url,json=json,headers=headers,timeout=10*60)
- return resp
- def request_industry_interface(self,json,headers):
- resp = requests.post(self.industy_url,json=json,headers=headers)
- return resp
- def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
- q_size = self.queue_extract.qsize()
- log("queue extract size:%d"%(q_size))
- def process_extract_failed(self):
- def _handle(_dict,result_queue):
- frame = _dict.get("frame")
- message_id = frame.headers["message-id"]
- subscription = frame.headers.setdefault('subscription', None)
- conn = _dict.get("conn")
- body = frame.body
- if body is not None:
- item = json.loads(body)
- item["extract_times"] = 10
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
- ackMsg(conn,message_id,subscription)
- from BaseDataMaintenance.java.MQInfo import getQueueSize
- try:
- extract_failed_size = getQueueSize("dataflow_extract_failed")
- extract_size = getQueueSize("dataflow_extract")
- log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
- if extract_failed_size>0 and extract_size<100:
- failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
- createComsumer(failed_listener,self.mq_extract_failed)
- while 1:
- extract_failed_size = getQueueSize("dataflow_extract_failed")
- if extract_failed_size==0:
- break
- time.sleep(10)
- failed_listener.conn.disconnect()
- except Exception as e:
- traceback.print_exc()
- def flow_extract(self,):
- self.comsumer()
- def comsumer(self):
- mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
- mt.run()
- def getExtract_json_fromDB(self,_fingerprint):
- conn = self.pool_postgres.getConnector()
- try:
- list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
- if len(list_extract)>0:
- _extract = list_extract[0]
- return _extract.getProperties().get(document_extract_extract_json)
- except Exception as e:
- traceback.print_exc()
- finally:
- self.pool_postgres.putConnector(conn)
- return None
- def putExtract_json_toDB(self,fingerprint,docid,extract_json):
- _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
- document_extract_docid:docid,
- document_extract_extract_json:extract_json})
- _de.insert_row(self.pool_postgres,1)
- def getExtract_json_fromRedis(self,_fingerprint):
- db = self.pool_redis_doc.getConnector()
- try:
- _extract_json = db.get(_fingerprint)
- return _extract_json
- except Exception as e:
- log("getExtract_json_fromRedis error %s"%(str(e)))
- finally:
- try:
- if db.connection.check_health():
- self.pool_redis_doc.putConnector(db)
- except Exception as e:
- pass
- return None
- def putExtract_json_toRedis(self,fingerprint,extract_json):
- db = self.pool_redis_doc.getConnector()
- try:
- _extract_json = db.set(str(fingerprint),extract_json)
- db.expire(fingerprint,3600*2)
- return _extract_json
- except Exception as e:
- log("putExtract_json_toRedis error%s"%(str(e)))
- traceback.print_exc()
- finally:
- try:
- if db.connection.check_health():
- self.pool_redis_doc.putConnector(db)
- except Exception as e:
- pass
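- # main extract consumer: truncate over-long html, reuse a cached extraction by fingerprint or call the extract interface, persist the result and decide whether to forward the message to the AI queue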
- def comsumer_handle(self,_dict,result_queue):
- try:
- log("start handle")
- data = {}
- frame = _dict["frame"]
- conn = _dict["conn"]
- message_id = frame.headers["message-id"]
- subscription = frame.headers.setdefault('subscription', None)
- item = json.loads(frame.body)
- for k,v in item.items():
- try:
- if isinstance(v,bytes):
- item[k] = v.decode("utf-8")
- except Exception as e:
- log("docid %d types bytes can not decode"%(item.get("docid")))
- item[k] = ""
- dtmp = Document_tmp(item)
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- extract_times = item.get("extract_times",0)+1
- item["extract_times"] = extract_times
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- html_len = len(_dochtmlcon) # length of the html text
- limit_text_len = 50000 # text length limit for the main content (or attachment)
- if html_len > limit_text_len:
- log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
- try:
- _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
- _soup = BeautifulSoup(_dochtmlcon,"lxml")
- all_len = len(_soup.get_text()) # text length of the whole announcement
- _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
- attachment_len = len(_attachment.get_text()) if _attachment else 0 # text length of the attachment content
- main_text_len = all_len - attachment_len # text length of the main body
- if attachment_len>150000: # drop overly long attachment content (avoids processing timeouts)
- if _attachment is not None:
- _attachment.decompose()
- attachment_len = 0
- # only run article_limit when the main body or attachment text exceeds limit_text_len
- if main_text_len>limit_text_len or attachment_len>limit_text_len:
- _soup = article_limit(_soup,limit_text_len)
- _dochtmlcon = str(_soup)
- except Exception as e:
- traceback.print_exc()
- ackMsg(conn,message_id,subscription)
- return
- log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
- dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
- _extract = Document_extract({})
- _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
- _extract.setValue(document_extract2_docid,item.get(document_docid))
- all_done = 1
- for k,v in item.items():
- data[k] = v
- data["timeout"] = 440
- data["doc_id"] = data.get(document_tmp_docid,0)
- # if data["docid"]<298986054 and data["docid"]>0:
- # log("jump docid %s"%(str(data["docid"])))
- # ackMsg(conn,message_id,subscription)
- # return
- data["content"] = data.get(document_tmp_dochtmlcon,"")
- if document_tmp_dochtmlcon in data:
- data.pop(document_tmp_dochtmlcon)
- data["title"] = data.get(document_tmp_doctitle,"")
- data["web_source_no"] = item.get(document_tmp_web_source_no,"")
- data["web_source_name"] = item.get(document_tmp_web_source_name,"")
- data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
- data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
- _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
- to_ai = False
- if all_done>0:
- _time = time.time()
- # extract_json = self.getExtract_json_fromDB(_fingerprint)
- extract_json = self.getExtract_json_fromRedis(_fingerprint)
- log("get json from db takes %.4f"%(time.time()-_time))
- # extract_json = None
- _docid = int(data["doc_id"])
- if extract_json is not None:
- log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
- _extract.setValue(document_extract2_extract_json,extract_json,True)
- else:
- resp = self.request_extract_interface(json=data,headers=self.header)
- if (resp.status_code >=200 and resp.status_code<=213):
- extract_json = resp.content.decode("utf8")
- _extract.setValue(document_extract2_extract_json,extract_json,True)
- _time = time.time()
- # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
- self.putExtract_json_toRedis(_fingerprint,extract_json)
- log("get json to db takes %.4f"%(time.time()-_time))
- else:
- log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
- all_done = -2
- to_ai,_reason = self.should_to_extract_ai(extract_json)
- if to_ai:
- log("to_ai of docid:%s of reason:%s"%(str(_docid),str(_reason)))
- # if all_done>0:
- # resp = self.request_industry_interface(json=data,headers=self.header)
- # if (resp.status_code >=200 and resp.status_code<=213):
- # _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
- # else:
- # log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
- # all_done = -3
- # _to_ack = False
- # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
- # all_done = -4
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _to_ack = True
- try:
- if all_done!=1:
- # sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
- log("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
- if extract_times>=10:
- #process as succeed
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- #replace as {}
- _extract.setValue(document_extract2_extract_json,"{}",True)
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- _to_ack = True
- elif extract_times>5:
- #transform to the extract_failed queue
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
- #process as succeed
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- #replace as {}
- _extract.setValue(document_extract2_extract_json,"{}",True)
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- _to_ack = True
- else:
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
- # save on failure
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,60,True)
- if not dtmp.exists_row(self.ots_client):
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- if send_succeed:
- _to_ack = True
- else:
- #process succeed
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- if _docid==290816703:
- dtmp.setValue("test_json",extract_json,True)
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- _to_ack = True
- except Exception:
- traceback.print_exc()
- if _to_ack:
- ackMsg(conn,message_id,subscription)
- if to_ai:
- # send to ai
- item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}"))
- send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
- else:
- item["extract_times"] -= 1
- send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
- ackMsg(conn,message_id,subscription)
- log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
- except requests.ConnectionError as e1:
- item["extract_times"] -= 1
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
- ackMsg(conn,message_id,subscription)
- except Exception as e:
- traceback.print_exc()
- # sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
- log("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
- log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))
- if extract_times>=10:
- #process as succeed
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- #replace as {}
- _extract.setValue(document_extract2_extract_json,"{}",True)
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- ackMsg(conn,message_id,subscription)
- elif extract_times>5:
- #transform to the extract_failed queue
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
- #process as succeed
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- #replace as {}
- _extract.setValue(document_extract2_extract_json,"{}",True)
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- ackMsg(conn,message_id,subscription)
- else:
- #transform to the extract queue
- # save on failure
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,60,True)
- if not dtmp.exists_row(self.ots_client):
- dtmp.update_row(self.ots_client)
- dhtml.update_row(self.ots_client)
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
- ackMsg(conn,message_id,subscription)
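- # heuristics for routing a document to the AI extractor: missing tenderee/winner roles on relevant channels, medical products, or budget/win price outside the expected range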
- def should_to_extract_ai(self,extract_json):
- _reason = ""
- # return False,_reason
- _extract = {}
- if extract_json is not None:
- try:
- _extract = json.loads(extract_json)
- except Exception as e:
- pass
- has_entity = False
- docchannel = _extract.get("docchannel",{}).get("docchannel","")
- doctype = _extract.get("docchannel",{}).get("doctype","")
- entity_len = len(_extract.get("dict_enterprise",{}).keys())
- product = str(_extract.get("product",""))
- class_name = _extract.get("industry",{}).get("class_name","")
- _entity = None
- if entity_len>0:
- has_entity = True
- if entity_len==1:
- _entity = list(_extract.get("dict_enterprise",{}).keys())[0]
- prem = _extract.get("prem",{})
- one_entity_used = False
- has_tenderee = False
- has_win_tenderer = False
- has_budget = False
- budget_unexpected = False
- winprice_unexpected = False
- for _pack,_pack_value in prem.items():
- _rolelist = _pack_value.get("roleList",[])
- for _role in _rolelist:
- if _role.get("role_name","")=="tenderee":
- has_tenderee = True
- if _role.get("role_text","")==_entity:
- one_entity_used = True
- if _role.get("role_name","")=="agency":
- if _role.get("role_text","")==_entity:
- one_entity_used = True
- if _role.get("role_name","")=="win_tenderer":
- has_win_tenderer = True
- if _role.get("role_text","")==_entity:
- one_entity_used = True
- win_price = _role.get("role_money",{}).get("money",0)
- try:
- win_price = float(win_price)
- except Exception as e:
- win_price = 0
- if win_price>0:
- if win_price>100000000 or win_price<100:
- winprice_unexpected = True
- tendereeMoney = _pack_value.get("tendereeMoney",0)
- try:
- tendereeMoney = float(tendereeMoney)
- except Exception as e:
- tendereeMoney = 0
- if tendereeMoney>0:
- has_budget = True
- if tendereeMoney>100000000 or tendereeMoney<100:
- budget_unexpected = True
- if doctype=="采招数据":
- if has_entity and not one_entity_used:
- if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","验收合同"}:
- return True,_reason
- if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","验收合同"}:
- return True,_reason
- if class_name=="医疗设备" or self.MP.is_medical_product(product):
- _reason = "medical product"
- return True,_reason
- if budget_unexpected or winprice_unexpected:
- return True,_reason
- return False,_reason
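- # AI consumer: skip documents already saved as filtered out (status>=71 and save==0), hold each message until ten minutes after its aitimes, then run the doubao role-extraction prompt and merge the result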
- def extract_ai_handle(self,_dict,result_queue):
- frame = _dict["frame"]
- conn = _dict["conn"]
- message_id = frame.headers["message-id"]
- subscription = frame.headers.setdefault('subscription', None)
- item = json.loads(frame.body)
- _extract_json = None
- if document_extract2_extract_json in item:
- _extract_json = item.get(document_extract2_extract_json)
- item.pop(document_extract2_extract_json)
- try:
- message_acknowledged = False
- dtmp = Document_tmp(item)
- _tomq = False
- if dtmp.fix_columns(self.ots_client,["status","save"],True):
- if dtmp.getProperties().get("status",0)>=71:
- if dtmp.getProperties().get("save",1)==0:
- message_acknowledged = True
- log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
- ackMsg(conn,message_id,subscription)
- return
- else:
- _tomq = True
- else:
- _tomq = True
- if _tomq:
- aitimes = item.get("aitimes")
- if aitimes is None:
- aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- item["aitimes"] = aitimes
- if not message_acknowledged:
- item[document_extract2_extract_json] = _extract_json
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
- message_acknowledged = True
- ackMsg(conn,message_id,subscription)
- time.sleep(1)
- return
- else:
- # keep requeueing until 10 minutes have passed since the first AI attempt
- if timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)>=getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
- if not message_acknowledged:
- item[document_extract2_extract_json] = _extract_json
- if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
- message_acknowledged = True
- ackMsg(conn,message_id,subscription)
- time.sleep(1)
- return
- dhtml = Document_html({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid")})
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
- _extract_ai = {}
- _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
- _text = html2text_with_tablehtml(_dochtmlcon)
- if len(_text)<25000:
- model_name = "ep-20250314164242-jd62g" #1.5pro 32k
- else:
- _text = _text[:100000]
- model_name = "ep-20250212111145-fflr7" #1.5pro 256k
- msg = get_prompt_extract_role(_text)
- #model_name = "ep-20250212111145-fflr7" #1.5pro 256k
- #model_name = "ep-20250314164242-jd62g" #1.5pro 32k
- result = chat_doubao(msg,model_name=model_name)
- _json = get_json_from_text(result)
- if _json is not None:
- try:
- _extract_ai = json.loads(_json)
- except Exception as e:
- pass
- if len(_extract_ai.keys())>0:
- _new_json,_changed = self.merge_json(_extract_json,_json)
- if _changed:
- dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
- dtmp.setValue(document_tmp_dochtmlcon,"",False)
- dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
- dtmp.update_row(self.ots_client)
- if not dhtml.exists_row(self.ots_client):
- dhtml.update_row(self.ots_client)
- _extract = Document_extract({})
- _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
- _extract.setValue(document_extract2_docid,item.get(document_docid))
- _extract.setValue(document_extract2_extract_json,_new_json,True)
- _extract.setValue(document_extract2_industry_json,"{}",True)
- _extract.setValue(document_extract2_status,random.randint(1,50),True)
- _extract.update_row(self.ots_client)
- log("extract_ai of docid:%d"%(item.get(document_docid)))
- else:
- doc = Document({"partitionkey":item.get("partitionkey"),
- "docid":item.get("docid"),
- "extract_json_ai":_json
- })
- if doc.exists_row(self.ots_client):
- doc.update_row(self.ots_client)
- log("extract_nochange_ai of docid:%d"%(item.get(document_docid)))
- if not message_acknowledged:
- message_acknowledged = True
- ackMsg(conn,message_id,subscription)
- except Exception as e:
- traceback.print_exc()
- if not message_acknowledged:
- ackMsg(conn,message_id,subscription)
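- # Requeue sketch for the handler above (illustrative): each message carries a first-seen
- # "aitimes" stamp and is bounced back to mq_extract_ai until 10 minutes have elapsed
- # (e.g. aitimes "2025-05-06 09:00:00" -> requeued until about 09:10:00); only then is the LLM
- # called and its result merged via merge_json. Documents already finished with save==0 are
- # acknowledged and dropped without an AI call.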
- def merge_json(self,extract_json,extract_ai_json):
- def get_ai_money(_text):
- b = re.search(r'[\d,,\.]+[亿万元人民币]+',str(_text))
- if b is not None:
- return b.group()
- def clean_ai_entity(entity,set_entitys):
- if isinstance(entity,str):
- if re.search("(未|无)(明确|提及)|某(部|单位|医院|公司)|\*\*|XX|登录|详见|招标单位",entity) is not None:
- return ""
- if re.search("无|区|县|市|省|某|中心|部|公司",entity) is not None and len(entity)<=5:
- return ""
- # return entity
- for _i in range(len(entity)-5):
- if entity[:len(entity)-_i] in set_entitys:
- return entity[:len(entity)-_i]
- return ""
- def clean_ai_extract(_extract,_extract_ai):
- set_entitys = set()
- set_moneys = set()
- dict_enterprise = _extract.get("dict_enterprise",{})
- for k,v in dict_enterprise.items():
- set_entitys.add(str(k))
- _sum = 0
- for _money in _extract.get("moneys",[]):
- _money = float(_money)
- set_moneys.add(str(_money))
- _sum += _money
- if _sum>0:
- set_moneys.add(str(float(_sum)))
- _sum = 0
- for _money in _extract.get("moneys_attachment",[]):
- _money = float(_money)
- set_moneys.add(str(_money))
- _sum += _money
- if _sum>0:
- set_moneys.add(str(float(_sum)))
- _tenderee_dict = _extract_ai.get("招标信息",{})
- _tenderee_ai = _tenderee_dict.get("招标人名称")
- if _tenderee_ai is not None:
- _tenderee_ai = clean_ai_entity(_tenderee_ai,set_entitys)
- _tenderee_dict["招标人名称"] = _tenderee_ai
- # if _tenderee_ai in set_entitys:
- # _tenderee_dict["招标人名称"] = _tenderee_ai
- # else:
- # _tenderee_dict["招标人名称"] = ""
- _budget = _tenderee_dict.get("项目预算")
- if _budget is not None:
- _budget = getUnifyMoney(str(_budget))
- _budget = str(float(_budget))
- if _budget in set_moneys:
- _tenderee_dict["项目预算"] = _budget
- else:
- _tenderee_dict["项目预算"] = ""
- list_win = _extract_ai.get("中标信息",[])
- for _win_dict_i in range(len(list_win)):
- _win_dict = list_win[_win_dict_i]
- _pack = _win_dict.get("标段号","")
- if _pack=="":
- if len(list_win)>1:
- _pack = "AI_%d"%(_win_dict_i)
- else:
- _pack = "Project"
- _win_money = _win_dict.get("中标金额")
- if str(_win_money).find("%")>=0:
- _win_money = 0
- _win_money = getUnifyMoney(str(_win_money))
- _win_money = str(float(_win_money))
- if _win_money in set_moneys:
- _win_dict["中标金额"] = _win_money
- else:
- _win_dict["中标金额"] = ""
- _win_tenderer = _win_dict.get("中标人名称")
- _win_tenderer = clean_ai_entity(_win_tenderer,set_entitys)
- _win_dict["中标人名称"] = _win_tenderer
- # if _win_tenderer in set_entitys:
- # _win_dict["中标人名称"] = _win_tenderer
- # else:
- # _win_dict["中标人名称"] = ""
- list_product = _extract_ai.get("产品信息",[])
- for product_i in range(len(list_product)):
- product_dict = list_product[product_i]
- uni_price = product_dict.get("单价","")
- quantity = product_dict.get("数量","")
- total_price = product_dict.get("总价","")
- if uni_price is not None and uni_price!="":
- uni_price = getUnifyMoney(str(uni_price))
- uni_price = str(float(uni_price))
- if uni_price in set_moneys:
- product_dict["单价"] = uni_price
- else:
- product_dict["单价"] = ""
- if quantity is not None and quantity!="":
- quantity = getUnifyMoney(str(quantity))
- product_dict["数量"] = quantity
- if total_price is not None and total_price!="":
- total_price = getUnifyMoney(str(total_price))
- total_price = str(float(total_price))
- if total_price in set_moneys:
- product_dict["总价"] = total_price
- else:
- product_dict["总价"] = ""
- _extract = {}
- if extract_json is not None:
- try:
- if isinstance(extract_json,str):
- _extract = json.loads(extract_json)
- else:
- _extract = extract_json
- except Exception as e:
- pass
- if "extract_count" not in _extract:
- _extract["extract_count"] = 0
- _extract_ai = {}
- if extract_ai_json is not None:
- try:
- _extract_ai = json.loads(extract_ai_json)
- except Exception as e:
- pass
- clean_ai_extract(_extract,_extract_ai)
- _product = _extract.get("product",[])
- list_new_product = _extract_ai.get("产品信息",[])
- prem = _extract.get("prem")
- if prem is None:
- _extract["prem"] = {}
- prem = _extract["prem"]
- Project = prem.get("Project")
- if Project is None:
- prem["Project"] = {}
- Project = prem["Project"]
- Project_rolelist = Project.get("roleList")
- if Project_rolelist is None:
- Project["roleList"] = []
- Project_rolelist = Project["roleList"]
- has_tenderee = False
- has_win_tenderer = False
- has_budget = False
- budget_unexpected = False
- winprice_unexpected = False
- for _pack,_pack_value in prem.items():
- _rolelist = _pack_value.get("roleList",[])
- for _role in _rolelist:
- if _role.get("role_name","")=="tenderee":
- has_tenderee = True
- if _role.get("role_name","")=="win_tenderer":
- has_win_tenderer = True
- win_price = _role.get("role_money",{}).get("money",0)
- try:
- win_price = float(win_price)
- except Exception as e:
- win_price = 0
- if win_price>0:
- if win_price>100000000 or win_price<100:
- winprice_unexpected = True
- tendereeMoney = _pack_value.get("tendereeMoney",0)
- try:
- tendereeMoney = float(tendereeMoney)
- except Exception as e:
- tendereeMoney = 0
- if tendereeMoney>0:
- has_budget = True
- if tendereeMoney>100000000 or tendereeMoney<100:
- budget_unexpected = True
- _changed = False
- prem_original = {}
- product_attrs_original = {}
- try:
- prem_original = json.loads(json.dumps(_extract.get("prem",{})))
- product_attrs_original = json.loads(json.dumps(_extract.get("product_attrs",{})))
- except Exception as e:
- pass
- if len(list_new_product)>0 and len(list_new_product)>len(product_attrs_original.get("data",[]))//3:
- set_product = set(_product)
- product_attrs_new = {"data":[]}
- for product_i in range(len(list_new_product)):
- brand = list_new_product[product_i].get("品牌","")
- product = list_new_product[product_i].get("产品名称","")
- quantity = list_new_product[product_i].get("数量","")
- quantity_unit = list_new_product[product_i].get("数量单位","")
- specs = list_new_product[product_i].get("规格型号","")
- uni_price = list_new_product[product_i].get("单价","")
- total_price = list_new_product[product_i].get("总价","")
- pinmu_no = list_new_product[product_i].get("品目编号","")
- pinmu_name = list_new_product[product_i].get("品目名称","")
- product_attrs_new["data"].append({
- "brand": str(brand),
- "product": product,
- "quantity": str(quantity),
- "quantity_unit": quantity_unit,
- "specs": str(specs),
- "unitPrice": str(uni_price),
- "parameter": "",
- "total_price": str(total_price),
- "pinmu_no": str(pinmu_no),
- "pinmu_name": str(pinmu_name)
- })
- if product not in set_product:
- set_product.add(product)
- _changed = True
- _extract["product"] = list(set_product)
- _extract["product_attrs"] = product_attrs_new
- _extract["extract_count"] += 3
- if not has_tenderee:
- _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称")
- _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[])
- _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
- _linklist = []
- for _conta in _contacts:
- _person = _conta.get("联系人","")
- _phone = _conta.get("联系电话","")
- if _person!="" or _phone!="":
- _linklist.append([_person,_phone])
- if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4:
- _role_dict = {
- "role_name": "tenderee",
- "role_text": _tenderee_ai,
- "from_ai":True
- }
- if len(_linklist)>0:
- _role_dict["linklist"] = _linklist
- Project_rolelist.append(_role_dict)
- _changed = True
- _extract["extract_count"] += 1
- if not has_budget or budget_unexpected:
- _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
- if _budget is not None and _budget!="":
- _budget = getUnifyMoney(_budget)
- if _budget>0:
- if "tendereeMoney" in Project:
- Project["tendereeMoney_original"] = Project["tendereeMoney"]
- Project["tendereeMoney"] = str(float(_budget))
- _changed = True
- _extract["extract_count"] += 1
- else:
- if budget_unexpected:
- if "tendereeMoney" in Project:
- Project["tendereeMoney_original"] = Project["tendereeMoney"]
- Project["tendereeMoney"] = "0"
- if not has_win_tenderer or winprice_unexpected:
- list_win = _extract_ai.get("中标信息",[])
- if len(list_win)>0:
- winprice_unexpected_new = False
- for _win_dict_i in range(len(list_win)):
- _win_dict = list_win[_win_dict_i]
- _pack = _win_dict.get("标段号","")
- if _pack=="":
- if len(list_win)>1:
- _pack = "AI_%d"%(_win_dict_i)
- else:
- _pack = "Project"
- _win_money = _win_dict.get("中标金额")
- if _win_money is not None and _win_money!="":
- _win_money = getUnifyMoney(_win_money)
- else:
- _win_money = 0
- if _win_money>0:
- if _win_money>100000000 or _win_money<100:
- winprice_unexpected_new = True
- has_delete_win = False
- if winprice_unexpected and not winprice_unexpected_new:
- has_delete_win = True
- pop_packs = []
- for _pack,_pack_value in prem.items():
- _rolelist = _pack_value.get("roleList",[])
- new_rolelist = []
- for _role in _rolelist:
- if _role.get("role_name","")!="win_tenderer":
- new_rolelist.append(_role)
- _pack_value["roleList"] = new_rolelist
- if len(new_rolelist)==0 and _pack!="Project":
- pop_packs.append(_pack)
- for _pack in pop_packs:
- prem.pop(_pack)
- if not has_win_tenderer or has_delete_win:
- Project_rolelist = Project.get("roleList")
- if Project_rolelist is None:
- Project["roleList"] = []
- Project_rolelist = Project["roleList"]
- for _win_dict_i in range(len(list_win)):
- _win_dict = list_win[_win_dict_i]
- _pack = _win_dict.get("标段号","")
- if _pack=="":
- if len(list_win)>1:
- _pack = "AI_%d"%(_win_dict_i)
- else:
- _pack = "Project"
- _win_money = _win_dict.get("中标金额")
- if _win_money is not None and _win_money!="":
- _win_money = getUnifyMoney(_win_money)
- else:
- _win_money = 0
- _win_tenderer = _win_dict.get("中标人名称")
- if _win_tenderer is not None and _win_tenderer!="" and len(_win_tenderer)>=4:
- _role_dict = {
- "role_name": "win_tenderer",
- "role_text": _win_tenderer,
- "winprice_unexpected":winprice_unexpected,
- "from_ai":True
- }
- _role_dict["role_money"] = {
- "money": str(float(_win_money))
- }
- _changed = True
- _extract["extract_count"] += 2
- if _pack=="Project":
- Project_rolelist.append(_role_dict)
- else:
- prem[_pack] = {
- "roleList":[
- _role_dict
- ]
- }
- if _changed:
- _extract["extract_ai"] = True
- _extract["prem_original"] = prem_original
- _extract["product_attrs_original"] = product_attrs_original
- return json.dumps(_extract,ensure_ascii=False),_changed
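- # Hedged example of merge_json's contract (names taken from the code above): extract_json is
- # the rule-based result ("prem", "product", "moneys", ...), extract_ai_json is the model output
- # keyed by "招标信息"/"中标信息"/"产品信息". AI entities and moneys are only accepted when they already
- # appear in the rule-based dict_enterprise/moneys sets, and every role added this way is
- # flagged with "from_ai": True.
- # >>> new_json,changed = self.merge_json(extract_json,
- # ...     '{"招标信息":{"招标人名称":"某某医院","项目预算":""},"中标信息":[],"产品信息":[]}')
- # >>> changed  # True only when a tenderee/budget/winner/product was actually filled in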
- def delete_document_extract(self,save_count=70*10000):
- conn = self.pool_postgres.getConnector()
- try:
- cursor = conn.cursor()
- sql = " select max(docid),min(docid) from document_extract "
- cursor.execute(sql)
- rows = cursor.fetchall()
- if len(rows)>0:
- maxdocid,mindocid = rows[0]
- d_mindocid = int(maxdocid)-save_count
- if mindocid<d_mindocid:
- sql = " delete from document_extract where docid<%d"%d_mindocid
- cursor.execute(sql)
- conn.commit()
- except Exception as e:
- traceback.print_exc()
- finally:
- self.pool_postgres.putConnector(conn)
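- # Retention sketch: with the default save_count of 70*10000 the postgres table keeps roughly
- # the newest 700,000 docids, e.g. max(docid)=1,500,000 -> rows with docid<800,000 are deleted.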
- def start_flow_extract(self):
- schedule = BlockingScheduler()
- schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
- schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
- # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
- # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
- schedule.start()
- from multiprocessing import RLock
- docid_lock = RLock()
- conn_mysql = None
- def generateRangeDocid(nums):
- global conn_mysql
- while 1:
- try:
- with docid_lock:
- if conn_mysql is None:
- conn_mysql = getConnection_mysql()
- cursor = conn_mysql.cursor()
- sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
- cursor.execute(sql)
- rows = cursor.fetchall()
- current_docid = rows[0][0]
- next_docid = current_docid+1
- update_docid = current_docid+nums
- sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
- cursor.execute(sql)
- conn_mysql.commit()
- return next_docid
- except Exception as e:
- conn_mysql = getConnection_mysql()
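- # Allocation sketch for generateRangeDocid: each call reserves `nums` consecutive docids by
- # advancing the DocumentIdSerial counter under docid_lock, e.g. with the counter at 1000 and
- # nums=1000 the caller receives 1001 and may use 1001..2000 while the counter moves to 2000.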
- # custom JSON encoder: converts numpy arrays, bytes and Decimal values for json.dumps
- class MyEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, np.ndarray):
- return obj.tolist()
- elif isinstance(obj, bytes):
- return str(obj, encoding='utf-8')
- elif isinstance(obj, (np.float_, np.float16, np.float32,
- np.float64,Decimal)):
- return float(obj)
- elif isinstance(obj,str):
- return obj
- return json.JSONEncoder.default(self, obj)
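- # Hedged usage example for MyEncoder (values are illustrative):
- # >>> json.dumps({"vec": np.array([1, 2]), "price": Decimal("3.5"), "raw": b"ok"}, cls=MyEncoder)
- # '{"vec": [1, 2], "price": 3.5, "raw": "ok"}'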
- class Dataflow_init(Dataflow):
- class InitListener():
- def __init__(self,conn,*args,**kwargs):
- self.conn = conn
- self.get_count = 1000
- self.count = self.get_count
- self.begin_docid = None
- self.mq_init = "/queue/dataflow_init"
- self.mq_attachment = "/queue/dataflow_attachment"
- self.mq_extract = "/queue/dataflow_extract"
- self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
- def on_error(self, headers):
- log('received an error %s' % headers.body)
- def getRangeDocid(self):
- begin_docid = generateRangeDocid(self.get_count)
- self.begin_docid = begin_docid
- self.count = 0
- def getNextDocid(self):
- if self.count>=self.get_count:
- self.getRangeDocid()
- next_docid = self.begin_docid+self.count
- self.count += 1
- return next_docid
- def on_message(self, headers):
- try:
- # read the message id and body first so the except branch can still requeue and ack on failure
- message_id = headers.headers["message-id"]
- body = json.loads(headers.body)
- next_docid = int(self.getNextDocid())
- partitionkey = int(next_docid%500+1)
- body[document_tmp_partitionkey] = partitionkey
- body[document_tmp_docid] = next_docid
- if body.get(document_original_docchannel) is None:
- body[document_original_docchannel] = body.get(document_docchannel)
- page_attachments = body.get(document_tmp_attachment_path,"[]")
- _uuid = body.get(document_tmp_uuid,"")
- if page_attachments!="[]":
- status = random.randint(1,10)
- body[document_tmp_status] = status
- if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
- log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
- ackMsg(self.conn,message_id)
- else:
- log("send_msg_error on init listener")
- else:
- status = random.randint(11,50)
- body[document_tmp_status] = status
- if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
- log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
- ackMsg(self.conn,message_id)
- else:
- log("send_msg_error on init listener")
- except Exception as e:
- traceback.print_exc()
- if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
- log("init error")
- ackMsg(self.conn,message_id)
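- # Routing note for the listener above (illustrative): every init message gets a fresh docid and
- # partitionkey = docid % 500 + 1 (e.g. docid 123456 -> partitionkey 457); messages carrying
- # attachments are sent to dataflow_attachment with status 1-10, all others go straight to
- # dataflow_extract with status 11-50, and failures fall back onto dataflow_init.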
- def __del__(self):
- self.conn.disconnect()
- del self.pool_mq1
- def __init__(self):
- Dataflow.__init__(self)
- self.max_shenpi_id = None
- self.base_shenpi_id = 400000000000
- self.mq_init = "/queue/dataflow_init"
- self.mq_attachment = "/queue/dataflow_attachment"
- self.mq_extract = "/queue/dataflow_extract"
- self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
- self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
- self.ots_capacity = getConnect_ots_capacity()
- self.init_comsumer_counts = 2
- self.list_init_comsumer = []
- for i in range(self.init_comsumer_counts):
- listener = self.InitListener(getConnect_activateMQ())
- createComsumer(listener,self.mq_init)
- self.list_init_comsumer.append(listener)
- def monitor_listener(self):
- for i in range(len(self.list_init_comsumer)):
- if self.list_init_comsumer[i].conn.is_connected():
- continue
- else:
- listener = self.InitListener(getConnect_activateMQ())
- createComsumer(listener,self.mq_init)
- self.list_init_comsumer[i] = listener
- def temp2mq(self,object):
- conn_oracle = self.pool_oracle.getConnector()
- try:
- list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
- for _obj in list_obj:
- ots_dict = _obj.getProperties_ots()
- if len(ots_dict.get("dochtmlcon",""))>500000:
- _obj.delete_row(conn_oracle)
- log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
- continue
- if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
- # delete the source row after a successful send (enabled for production)
- _obj.delete_row(conn_oracle)
- else:
- log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
- self.pool_oracle.putConnector(conn_oracle)
- except Exception as e:
- traceback.print_exc()
- self.pool_oracle.decrease()
- def shenpi2mq(self):
- conn_oracle = self.pool_oracle.getConnector()
- try:
- if self.max_shenpi_id is None:
- # get the max_shenpi_id
- _query = BoolQuery(must_queries=[ExistsQuery("id")])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- max_shenpi_id = list_data[0].get("id")
- if max_shenpi_id>self.base_shenpi_id:
- max_shenpi_id -= self.base_shenpi_id
- self.max_shenpi_id = max_shenpi_id
- if self.max_shenpi_id<60383953:
- self.max_shenpi_id = 60383953
- if self.max_shenpi_id is not None:
- # select data in order
- origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
- if origin_max_shenpi_id is not None:
- log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
- for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
- list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
- # send data to mq one by one with max_shenpi_id updated
- for _data in list_data:
- _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
- ots_dict = _data.getProperties_ots()
- if ots_dict["docid"]<self.base_shenpi_id:
- ots_dict["docid"] += self.base_shenpi_id
- ots_dict["partitionkey"] = ots_dict["docid"]%500+1
- if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
- if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
- self.max_shenpi_id = _id
- else:
- log("sent shenpi message to mq failed %s"%(_id))
- break
- else:
- if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
- self.max_shenpi_id = _id
- else:
- log("sent shenpi message to mq failed %s"%(_id))
- break
- self.pool_oracle.putConnector(conn_oracle)
- except Exception as e:
- log("shenpi error")
- traceback.print_exc()
- self.pool_oracle.decrease()
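- # Id-space sketch for shenpi2mq: oracle shenpi ids are shifted into the document id space by
- # adding base_shenpi_id (e.g. id 60400000 -> docid 400060400000), and max_shenpi_id only
- # advances after a successful publish, so a failed send is retried on the next scheduler run.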
- def fix_shenpi(self):
- pool_oracle = ConnectorPool(10,15,getConnection_oracle)
- begin_id = 0
- end_id = 64790010
- thread_num = 15
- step = (end_id-begin_id)//thread_num
- list_items = []
- for _i in range(thread_num):
- _begin = _i*step
- _end = (_i+1)*step-1
- if _i==thread_num-1:
- _end = end_id
- list_items.append((_begin,_end,_i))
- task_queue = Queue()
- for item in list_items:
- task_queue.put(item)
- fix_count_list = []
- def _handle(item,result_queue):
- conn_oracle = pool_oracle.getConnector()
- (begin_id,end_id,thread_id) = item
- _count = 0
- for _id_i in range(begin_id,end_id):
- try:
- bool_query = BoolQuery(must_queries=[
- TermQuery("docchannel",302),
- TermQuery("original_id",_id_i)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,get_total_count=True))
- if total_count>0:
- continue
- # bool_query = BoolQuery(must_queries=[
- # TermQuery("id",_id_i),
- # ])
- # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
- # SearchQuery(bool_query,get_total_count=True))
- # if total_count>0:
- # continue
- try:
- list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
- except Exception as e:
- continue
- # send data to mq one by one with max_shenpi_id updated
- for _data in list_data:
- _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
- ots_dict = _data.getProperties_ots()
- if ots_dict["docid"]<self.base_shenpi_id:
- ots_dict["docid"] += self.base_shenpi_id
- ots_dict["partitionkey"] = ots_dict["docid"]%500+1
- ots_dict["status"] = 201
- dict_1 = {}
- dict_2 = {}
- for k,v in ots_dict.items():
- if k!="dochtmlcon":
- dict_1[k] = v
- if k in ('partitionkey',"docid","dochtmlcon"):
- dict_2[k] = v
- d_1 = Document(dict_1)
- d_2 = Document(dict_2)
- d_1.update_row(self.ots_client)
- d_2.update_row(self.ots_capacity)
- _count += 1
- except Exception as e:
- traceback.print_exc()
- log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
- fix_count_list.append(_count)
- pool_oracle.putConnector(conn_oracle)
- mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
- mt.run()
- print(fix_count_list,sum(fix_count_list))
- def ots2mq(self):
- try:
- from BaseDataMaintenance.java.MQInfo import getQueueSize
- attachment_size = getQueueSize("dataflow_attachment")
- extract_size = getQueueSize("dataflow_extract")
- if attachment_size>1000 or extract_size>1000:
- log("ots2mq break because of long queue size")
- return
- bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- task_queue = Queue()
- for _data in list_data:
- task_queue.put(_data)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- if task_queue.qsize()>=1000:
- break
- def _handle(_data,result_queue):
- _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
- document_tmp_docid:_data.get(document_tmp_docid),
- document_tmp_status:0}
- _document = Document(_d)
- _document.fix_columns(self.ots_client,None,True)
- _data = _document.getProperties()
- page_attachments = _data.get(document_tmp_attachment_path,"[]")
- _document_html = Document(_data)
- _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
- if page_attachments!="[]":
- status = random.randint(1,10)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
- else:
- status = random.randint(11,50)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
- if send_succeed:
- _document.setValue(document_tmp_status,0,True)
- _document.update_row(self.ots_client)
- else:
- log("send_msg_error2222")
- if task_queue.qsize()>0:
- mt = MultiThreadHandler(task_queue,_handle,None,15)
- mt.run()
- except Exception as e:
- traceback.print_exc()
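- # Backpressure note (illustrative): ots2mq and otstmp2mq skip a cycle whenever either MQ queue
- # already holds more than 1000 messages, so re-dispatching stalled rows never piles extra work
- # onto consumers that are already behind.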
- def otstmp2mq(self):
- try:
- from BaseDataMaintenance.java.MQInfo import getQueueSize
- attachment_size = getQueueSize("dataflow_attachment")
- extract_size = getQueueSize("dataflow_extract")
- if attachment_size>1000 or extract_size>1000:
- log("otstmp2mq break because of long queue size")
- return
- bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _data in list_data:
- _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
- document_tmp_docid:_data.get(document_tmp_docid),
- document_tmp_status:0}
- _document = Document_tmp(_d)
- page_attachments = _data.get(document_tmp_attachment_path,"[]")
- log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
- _document_html = Document_html(_data)
- _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
- if page_attachments!="[]":
- status = random.randint(1,10)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
- else:
- status = random.randint(11,50)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
- if send_succeed:
- _document.setValue(document_tmp_status,1,True)
- _document.update_row(self.ots_client)
- else:
- log("send_msg_error2222")
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _data in list_data:
- _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
- document_tmp_docid:_data.get(document_tmp_docid),
- document_tmp_status:0}
- _document = Document_tmp(_d)
- page_attachments = _data.get(document_tmp_attachment_path,"[]")
- _document_html = Document_html(_data)
- _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
- if page_attachments!="[]":
- status = random.randint(1,10)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
- else:
- status = random.randint(11,50)
- _data[document_tmp_status] = status
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
- if send_succeed:
- _document.setValue(document_tmp_status,1,True)
- _document.update_row(self.ots_client)
- else:
- log("send_msg_error2222")
- except Exception as e:
- traceback.print_exc()
- def test_dump_docid(self):
- class TestDumpListener(ActiveMQListener):
- def on_message(self, headers):
- message_id = headers.headers["message-id"]
- body = headers.body
- self._queue.put(headers,True)
- ackMsg(self.conn,message_id)
- _queue = Queue()
- listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
- listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
- createComsumer(listener1,"/queue/dataflow_attachment")
- createComsumer(listener2,"/queue/dataflow_extract")
- time.sleep(10)
- list_item = []
- list_docid = []
- while 1:
- try:
- _item = _queue.get(timeout=2)
- list_item.append(_item)
- except Exception as e:
- break
- for item in list_item:
- _item = json.loads(item.body)
- list_docid.append(_item.get("docid"))
- log(list_docid[:10])
- log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
- def start_dataflow_init(self):
- # self.test_dump_docid()
- from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
- from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
- from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
- from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
- from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
- from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
- from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
- from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
- from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
- from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
- from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
- from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
- from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
- from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
- from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin
- schedule = BlockingScheduler()
- schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
- schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
- schedule.add_job(self.ots2mq,"cron",second="*/10")
- schedule.add_job(self.otstmp2mq,"cron",second="*/10")
- schedule.add_job(self.monitor_listener,"cron",minute="*/1")
- schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
- schedule.start()
- def transform_attachment():
- from BaseDataMaintenance.model.ots.attachment import attachment
- from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
- from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
- from threading import Thread
- from queue import Queue
- task_queue = Queue()
- def comsumer(task_queue,pool_postgres):
- while 1:
- _dict = task_queue.get(True)
- try:
- attach = Attachment_postgres(_dict)
- if not attach.exists(pool_postgres):
- attach.insert_row(pool_postgres)
- except Exception as e:
- traceback.print_exc()
- def start_comsumer(task_queue):
- pool_postgres = ConnectorPool(10,30,getConnection_postgres)
- comsumer_count = 30
- list_thread = []
- for i in range(comsumer_count):
- _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
- list_thread.append(_t)
- for _t in list_thread:
- _t.start()
- ots_client = getConnect_ots()
- _thread = Thread(target=start_comsumer,args=(task_queue,))
- _thread.start()
- bool_query = BoolQuery(must_queries=[
- RangeQuery(attachment_crtime,"2022-05-06"),
- BoolQuery(should_queries=[TermQuery(attachment_status,10),
- TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- task_queue.put(_dict,True)
- _count = len(list_dict)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- task_queue.put(_dict,True)
- _count += len(list_dict)
- print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
- def del_test_doc():
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
- list_data = []
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- print(total_count)
- list_row = getRow_ots(rows)
- list_data.extend(list_row)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_row = getRow_ots(rows)
- list_data.extend(list_row)
- for _row in list_data:
- _doc = Document_tmp(_row)
- _doc.delete_row(ots_client)
- _html = Document_html(_row)
- if _html.exists_row(ots_client):
- _html.delete_row(ots_client)
- def fixDoc_to_queue_extract():
- pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
- try:
- ots_client = getConnect_ots()
- ots_capacity = getConnect_ots_capacity()
- bool_query = BoolQuery(must_queries=[
- RangeQuery("crtime","2022-05-31"),
- TermQuery("docchannel",114)
- ])
- list_data = []
- rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- print(total_count)
- list_row = getRow_ots(rows)
- list_data.extend(list_row)
- _count = len(list_row)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_row = getRow_ots(rows)
- list_data.extend(list_row)
- _count = len(list_data)
- print("%d/%d"%(_count,total_count))
- task_queue = Queue()
- for _row in list_data:
- if "all_columns" in _row:
- _row.pop("all_columns")
- _html = Document(_row)
- task_queue.put(_html)
- def _handle(item,result_queue):
- _html = item
- _html.fix_columns(ots_capacity,["dochtmlcon"],True)
- print(_html.getProperties().get(document_tmp_docid))
- send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- except Exception as e:
- traceback.print_exc()
- finally:
- pool_mq.destory()
- def check_data_synchronization():
- # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
- # list_uuid = []
- # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
- # with open(filepath,"r",encoding="utf8") as f:
- # while 1:
- # _line = f.readline()
- # if not _line:
- # break
- # _match = re.search(_regrex,_line)
- # if _match is not None:
- # _uuid = _match.groupdict().get("uuid")
- # tablename = _match.groupdict.get("tablename")
- # if _uuid is not None:
- # list_uuid.append({"uuid":_uuid,"tablename":tablename})
- # print("total_count:",len(list_uuid))
- import pandas as pd
- from BaseDataMaintenance.common.Utils import load
- task_queue = Queue()
- list_data = []
- df_data = load("uuid.pk")
- # df_data = pd.read_excel("check.xlsx")
- for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
- _dict = {"uuid":uuid,
- "tablename":tablename}
- list_data.append(_dict)
- task_queue.put(_dict)
- print("qsize:",task_queue.qsize())
- ots_client = getConnect_ots()
- def _handle(_item,result_queue):
- bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
- rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
- SearchQuery(bool_query,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- _item["exists"] = total_count
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- df_data = {"uuid":[],
- "tablename":[],
- "exists":[]}
- for _data in list_data:
- if _data["exists"]==0:
- for k,v in df_data.items():
- v.append(_data.get(k))
- import pandas as pd
- df2 = pd.DataFrame(df_data)
- df2.to_excel("check1.xlsx")
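- # Summary (illustrative): check_data_synchronization loads (uuid, tablename) pairs, checks each
- # uuid against the document index, and writes the rows with exists==0 to check1.xlsx for
- # manual follow-up.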
- current_path = os.path.abspath(os.path.dirname(__file__))
- def test_ai_extract():
- _dochtmlcon = '''
- <div>
- <div>
- 公告内容:
- </div>
- <div>
- <p>宫腔镜</p>
- <div>
- <p> </p>
- <p> <span>440101-2025-07530</span> </p>
- </div>
- <div></div>
- <div>
- <div>
- <div>
- <div>
- <table>
- <tbody>
- <tr>
- <td> 一、采购人: <a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a> </td>
- </tr>
- <tr>
- <td> 二、采购计划编号: 440101-2025-07530 </td>
- </tr>
- <tr>
- <td> 三、采购计划名称: 宫腔镜 </td>
- </tr>
- <tr>
- <td> 四、采购品目名称: 医用内窥镜 </td>
- </tr>
- <tr>
- <td> 五、采购预算金额(元): 1920000.00 </td>
- </tr>
- <tr>
- <td> 六、需求时间: </td>
- </tr>
- <tr>
- <td> 七、采购方式: 公开招标 </td>
- </tr>
- <tr>
- <td> 八、备案时间: 2025-05-06 09:12:11 </td>
- </tr>
- </tbody>
- </table>
- <div>
- <table>
- <tbody>
- <tr>
- <td>发布人:<a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a></td>
- </tr>
- <tr>
- <td>发布时间: 2025年 05月 06日 </td>
- </tr>
- </tbody>
- </table>
- </div>
- </div>
- </div>
- </div>
- </div>
- </div>
- </div>
- '''
- _extract_json = '''
- { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "公开招标", "candidate": "", "code": [ "440101-2025-07530" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.12, "kvtree": 0.03, "moneygrade": 0.0, "nerToken": 0.04, "outline": 0.03, "pb_extract": 0.16, "person": 0.0, "prem": 0.04, "preprocess": 0.11, "product": 0.02, "product_attrs": 0.01, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.01, "rule_channel": 0.03, "rule_channel2": 0.16, "tableToText": 0.03000166893005371, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "广州医科大学附属妇女儿童医疗中心": { "in_text": 1 } }, "district": { "area": "华南", "city": "广州", "district": "未知", "is_in_text": false, "province": "广东" }, "docchannel": { "docchannel": "招标预告", "doctype": "采招数据", "life_docchannel": "招标预告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "宫腔镜", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=27c02035e60e7cc1b7b8be65eedf92fd", "industry": { "class": "零售批发", "class_name": "医疗设备", "subclass": "专用设备" }, "is_deposit_project": false, "label_dic": {}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 1920000.0 ], "moneys_attachment": [], "moneysource": "", "name": "医用内窥镜", "nlp_enterprise": [ "广州医科大学附属妇女儿童医疗中心" ], "nlp_enterprise_attachment": [], "pb": { "industry": "教育及研究", "project_name_refind": "医用内窥镜", "project_property": "新建" }, "pb_project_name": "医用内窥镜", "person_review": [], "pinmu_name": "医用内窥镜", "policies": [], "prem": { "Project": { "code": "", "name": "医用内窥镜", "roleList": [ { "address": "", "linklist": [], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_prob": 0.9499999463558197, "role_text": "广州医科大学附属妇女儿童医疗中心", "serviceTime": "" } ], "tendereeMoney": "1920000.00", "tendereeMoneyUnit": "元", "uuid": "b1f25984-d473-4995-aec6-dbdba27fa898" } }, "process_time": "2025-05-06 09:24:32", "product": [ "宫腔镜", "医用内窥镜" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": { "医疗检测设备": [ [ "宫腔镜", 1 ] ] }, "核心字段": { "医疗检测设备": [ [ "宫腔镜", 2 ], [ "内窥镜", 2 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-05-06", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-23", "word_count": { "正文": 211, "附件": 0 } }
- '''
- _text = html2text_with_tablehtml(_dochtmlcon)
- if len(_text)<25000:
- model_name = "ep-20250314164242-jd62g" #1.5pro 32k
- else:
- _text = _text[:100000]
- model_name = "ep-20250212111145-fflr7" #1.5pro 256k
- msg = get_prompt_extract_role(_text)
- #model_name = "ep-20250212111145-fflr7" #1.5pro 256k
- #model_name = "ep-20250314164242-jd62g" #1.5pro 32k
- result = chat_doubao(msg,model_name=model_name)
- _json = get_json_from_text(result)
- _extract_ai = {}
- if _json is not None:
- try:
- _extract_ai = json.loads(_json)
- except Exception as e:
- pass
- if len(_extract_ai.keys())>0:
- _new_json,_changed = de.merge_json(_extract_json,_json)
- print("new_json")
- print(_new_json)
- if __name__ == '__main__':
- # di = Dataflow_init()
- # di.start_dataflow_init()
- # transform_attachment()
- # del_test_doc()
- de = Dataflow_ActivteMQ_extract(create_listener=False)
- # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
- a = '''
- { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "询价", "candidate": "", "code": [ "20250309921" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.1, "kvtree": 0.05, "moneygrade": 0.0, "nerToken": 0.06, "outline": 0.02, "pb_extract": 0.14, "person": 0.0, "prem": 0.01, "preprocess": 0.1, "product": 0.03, "product_attrs": 0.0, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.02, "rule_channel": 0.02, "rule_channel2": 0.25, "tableToText": 0.04000095367431641, "tendereeRuleRecall": 0.0, "time": 0.0, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "五矿铜业(湖南)有限公司": { "credit_code": "91430482081376930E", "in_text": 1 } }, "district": { "area": "华中", "city": "未知", "district": "未知", "is_in_text": false, "province": "湖南" }, "docchannel": { "docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "湖南有色铜业方闸", "exist_table": 1, "extract_count": 2, "fail_reason": "", "fingerprint": "md5=84d3541612f56fc9dc07b0cd7fa2c644", "industry": { "class": "零售批发", "class_name": "有色金属冶炼及压延产品", "subclass": "建筑建材" }, "is_deposit_project": false, "label_dic": { "is_direct_procurement": 1 }, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [], "moneys_attachment": [], "moneysource": "", "name": "湖南有色铜业方闸采购", "nlp_enterprise": [ "五矿铜业(湖南)有限公司" ], "nlp_enterprise_attachment": [], "pb": { "location": "湖南", "project_name_refind": "湖南有色铜业方闸采购", "project_property": "新建" }, "pb_project_name": "湖南有色铜业方闸采购", "person_review": [], "pinmu_name": "", "policies": [], "prem": {}, "process_time": "2025-04-16 14:58:20", "product": [ "铜业方闸", "手电不锈钢复合式速开方闸门配一体式启闭机" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": {}, "核心字段": { "门类": [ [ "闸门", 1 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-02", "word_count": { "正文": 240, "附件": 0 } }
- '''
- b = '''
- {"招标信息":{"招标人名称":"五矿铜业(湖南)有限公司","项目预算":"","招标人联系方式":[{"联系人":"李锟","联系电话":"13575144492"}]},"中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
- '''
- # print(de.should_to_extract_ai(a))
- print(de.merge_json(a,b))
- print(test_ai_extract())
- # de.start_flow_extract()
- # fixDoc_to_queue_extract()
- # check_data_synchronization()
- # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
|