# dataflow_mq.py
#coding:utf8
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process
from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
from BaseDataMaintenance.common.Utils import getUnifyMoney
from BaseDataMaintenance.maintenance.product.medical_product import MedicalProduct

class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
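
# A minimal sketch of how a listener is wired to a queue (createComsumer comes in
# via BaseDataMaintenance.common.activateMQUtils above; the queue name here is
# illustrative):
#
#   _queue = Queue()
#   listener = ActiveMQListener(getConnect_activateMQ(),_queue)
#   createComsumer(listener,"/queue/dataflow_attachment")
#   # each delivery is then available on _queue as {"frame":headers,"conn":conn}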

class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()

        def __del__(self):
            self.conn.disconnect()
    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"

        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []

        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

        self.session = None

        # each process runs its own batch of consumers (see start_attachment_listener)
        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()
    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)

        # keep the consumers alive: replace any listener whose connection has dropped
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)
    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener
    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")

        # only retry failed attachments while the main queue is nearly idle
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            # randomly requeue ~20% of messages ("jump by random") instead of processing them now
            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: run them all
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time and is neither INIT nor in the failed status range, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # stored swf_urls may contain escaped slashes; strip "\" before parsing
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content.
        :param _dict: the attachment payload
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")

            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times",0)

            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification,"")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)

            dtmp = Document_tmp(item)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after retry_times failures the message moves to the failed queue;
                # that queue is reprocessed once while the system is idle
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)

        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)

        not_failed_flag = True
        try:
            d_start_time = time.time()
            download_succeed = False
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                # the file is neither local nor on oss: fall back to ots.attachment
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))

                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # postgres write disabled; cache the attachment json in redis instead
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True

                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))
            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # postgres write disabled; cache the attachment json in redis instead
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        if not _ots_exists:
                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True
                time_download = time.time()-d_start_time

                # call the recognition interface; the file is passed by path rather than base64
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                _reg_time = time.time()-start_time

                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                # when re-running a swf, strip the "\" escapes from the stored swf_urls first
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                swf_images = eval(swf_images)  # the interface returns the page images as a python-literal list string
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        # dump the decoded pages to a temp dir and upload them as images
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    else:
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # re-enable this update in production

                # postgres write disabled; cache the attachment json in redis instead
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                start_time = time.time()
                if local_exists:
                    if not _ots_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                _upload_time = time.time()-start_time

                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()
    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]
    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None
    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            # sets are not json-serializable; drop them before caching
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)  # cache for 3 hours
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
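
    # A minimal sketch of the cache round-trip above (assumes a reachable redis
    # from getConnect_redis_doc; "demo_md5" and its fields are hypothetical):
    #
    #   flow = Dataflow_ActivteMQ_attachment()
    #   if flow.getAttach_json_fromRedis("demo_md5") is None:
    #       flow.putAttach_json_toRedis("demo_md5",{attachment_filemd5:"demo_md5",attachment_status:20})
    #   _attach = Attachment_postgres(json.loads(flow.getAttach_json_fromRedis("demo_md5")))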
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = []
            set_md5 = set()
            # first try the redis cache
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))

            # cache misses: look up ots, then fall back to the path embedded in the html
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                        else:
                            log("getAttachments status None find in ots:%s"%(_filemd5))
                        _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                        _attach_pg.setValue("ots_exists",True,True)
                        list_attachment.append(_attach_pg)
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()
    def set_queue(self,_dict):
        # route to the ocr queue if any attachment is an image-like filetype
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()
    def remove_attachment_postgres(self):
        # purge attachment rows older than two days
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()
    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()

class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()
    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"

        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_ai = "/queue/dataflow_extract_AI"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        # normalize the interface weights into cumulative probabilities,
        # so getExtract_url can route with a single random draw
        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)

        init_nums = 1
        self.conn_mq = None
        if create_listener:
            self.conn_mq = getConnect_activateMQ()
        else:
            init_nums = 0
        self.pool_mq = ConnectorPool(init_nums,30,getConnect_activateMQ)

        self.block_url = RLock()
        self.url_count = 0

        self.session = None
        self.MP = MedicalProduct()

        self.list_extract_comsumer = []
        self.list_extract_ai_comsumer = []

        # extract listeners
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()
            listener_p_ai = Thread(target=self.start_extract_AI_listener)
            listener_p_ai.start()
    def start_extract_AI_listener(self,_count=4):
        self.list_extract_ai_comsumer = []
        for _i in range(_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
            createComsumer(listener_extract,self.mq_extract_ai)
            self.list_extract_ai_comsumer.append(listener_extract)

        while 1:
            try:
                for _i in range(len(self.list_extract_ai_comsumer)):
                    if self.list_extract_ai_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
                        createComsumer(listener,self.mq_extract_ai)
                        self.list_extract_ai_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()
    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)

        while 1:
            try:
                for _i in range(len(self.list_extract_comsumer)):
                    if self.list_extract_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                        createComsumer(listener,self.mq_extract)
                        self.list_extract_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()
    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        # extract_interfaces[i][1] holds cumulative probabilities (see __init__),
        # so the first entry whose bound covers _r wins
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]
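
    # A standalone sketch of the same weighted routing (urls are illustrative):
    #
    #   interfaces = [["http://a/content_extract",20],["http://b/content_extract",10]]
    #   whole = sum(w for _,w in interfaces)              # 30
    #   acc = 0
    #   for pair in interfaces:
    #       acc += pair[1]
    #       pair[1] = acc/whole                           # -> 2/3, then 1.0
    #   _r = random.random()                              # uniform in [0,1)
    #   url = next(u for u,p in interfaces if _r<=p)      # "http://a..." ~2/3 of the time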
    def request_extract_interface(self,json,headers):
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))
    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            # drain the failed queue back into the extract queue while the latter is nearly idle
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()
    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)
    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None
    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)  # cache for 2 hours
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
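
    # Sketch of the fingerprint-keyed dedup used in comsumer_handle below: documents
    # with identical title+content+channel share one redis key, so repeats skip the
    # extract interface entirely (the data fields here are illustrative):
    #
    #   _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
    #   extract_json = self.getExtract_json_fromRedis(_fingerprint)
    #   if extract_json is None:
    #       resp = self.request_extract_interface(json=data,headers=self.header)
    #       extract_json = resp.content.decode("utf8")
    #       self.putExtract_json_toRedis(_fingerprint,extract_json)   # expires after 2h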
    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            data = {}
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)

            for k,v in item.items():
                try:
                    if isinstance(v,bytes):
                        item[k] = v.decode("utf-8")
                except Exception as e:
                    log("docid %d types bytes can not decode"%(item.get("docid")))
                    item[k] = ""

            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)  # length of the raw html
            limit_text_len = 50000  # text-length limit for the body (and for attachments)
            if html_len > limit_text_len:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"lxml")
                    all_len = len(_soup.get_text())  # text length of the whole announcement
                    _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
                    attachment_len = len(_attachment.get_text()) if _attachment else 0  # text length of the attachments
                    main_text_len = all_len - attachment_len  # text length of the main body
                    if attachment_len>150000:  # drop over-long attachment content (it causes processing timeouts)
                        if _attachment is not None:
                            _attachment.decompose()
                            attachment_len = 0
                    # only run article_limit when the body or attachment text exceeds limit_text_len
                    if main_text_len>limit_text_len or attachment_len>limit_text_len:
                        _soup = article_limit(_soup,limit_text_len)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))

            all_done = 1
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
            data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")

            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])

            to_ai = False
            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code>=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("put json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2
                to_ai,_reason = self.should_to_extract_ai(extract_json)
                if to_ai:
                    log("to_ai of docid:%s of reason:%s"%(str(_docid),str(_reason)))
            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code>=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)

            # every success path below sets _to_ack explicitly, so start pessimistic
            _to_ack = False
            try:
                if all_done!=1:
                    # sentMsgToDD("extraction failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    log("extraction failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))

                    if extract_times>=10:
                        # treat as succeeded after 10 attempts
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace the extraction with {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # move to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # treat as succeeded
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace the extraction with {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeeded
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)

                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id,subscription)
                if to_ai:
                    # send to ai
                    item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}"))
                    send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
            else:
                item["extract_times"] -= 1
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            # sentMsgToDD("extraction failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("extraction failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))

            if extract_times>=10:
                # treat as succeeded after 10 attempts
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace the extraction with {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # move to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # treat as succeeded
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace the extraction with {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # requeue to the extract queue and save the failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
    def should_to_extract_ai(self,extract_json):
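        """
        Decide whether a document's extraction result should be double-checked by the
        AI extractor. Returns a (bool, reason) tuple.

        Heuristics (as implemented below): for procurement documents ("采招数据"),
        re-extract when a known enterprise entity was never used in any role, when the
        expected tenderee/win_tenderer role is missing for the given docchannel, when a
        budget or winning price falls outside the plausible range (100 to 100,000,000),
        or when the document concerns medical products.
        """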
        _reason = ""
        # return False,_reason
        _extract = {}
        if extract_json is not None:
            try:
                _extract = json.loads(extract_json)
            except Exception as e:
                pass
        has_entity = False
        docchannel = _extract.get("docchannel",{}).get("docchannel","")
        doctype = _extract.get("docchannel",{}).get("doctype","")
        entity_len = len(_extract.get("dict_enterprise",{}).keys())
        product = str(_extract.get("product",""))
        class_name = _extract.get("industry",{}).get("class_name","")
        _entity = None
        if entity_len>0:
            has_entity = True
            if entity_len==1:
                _entity = list(_extract.get("dict_enterprise",{}).keys())[0]
        prem = _extract.get("prem",{})
        one_entity_used = False
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="agency":
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        if doctype=="采招数据":
            if has_entity and not one_entity_used:
                if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","验收合同"}:
                    return True,_reason
                if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","验收合同"}:
                    return True,_reason
            if budget_unexpected or winprice_unexpected:
                return True,_reason
            if class_name=="医疗设备" or self.MP.is_medical_product(product):
                _reason = "medical product"
                return True,_reason
        return False,_reason

    def extract_ai_handle(self,_dict,result_queue):
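        """
        MQ consumer callback for the AI-extract queue.

        Skips documents already dumped (status>=71 and save==0). Messages younger than
        10 minutes (tracked via the "aitimes" field) are re-queued to delay processing.
        Otherwise the document HTML is converted to text, sent to the Doubao chat model
        (32k or 256k context depending on text length), and the returned JSON is merged
        into the original extraction via merge_json; changed results are written back
        to OTS, unchanged ones are only recorded on the document row.
        """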
        frame = _dict["frame"]
        conn = _dict["conn"]
        message_id = frame.headers["message-id"]
        subscription = frame.headers.setdefault('subscription', None)
        item = json.loads(frame.body)
        _extract_json = None
        if document_extract2_extract_json in item:
            _extract_json = item.get(document_extract2_extract_json)
            item.pop(document_extract2_extract_json)
        try:
            message_acknowledged = False
            dtmp = Document_tmp(item)
            _tomq = False
            if dtmp.fix_columns(self.ots_client,["status","save"],True):
                if dtmp.getProperties().get("status",0)>=71:
                    if dtmp.getProperties().get("save",1)==0:
                        message_acknowledged = True
                        log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
                        ackMsg(conn,message_id,subscription)
                        return
                    else:
                        _tomq = True
                else:
                    _tomq = True
            if _tomq:
                aitimes = item.get("aitimes")
                if aitimes is None:
                    aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                    item["aitimes"] = aitimes
                    if not message_acknowledged:
                        item[document_extract2_extract_json] = _extract_json
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                            message_acknowledged = True
                            ackMsg(conn,message_id,subscription)
                    time.sleep(1)
                    return
                else:
                    # delay: re-queue messages that are less than 10 minutes old
                    if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
                        if not message_acknowledged:
                            item[document_extract2_extract_json] = _extract_json
                            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                                message_acknowledged = True
                                ackMsg(conn,message_id,subscription)
                        time.sleep(1)
                        return
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract_ai = {}
            _text = html2text_with_tablehtml(_dochtmlcon)
            if len(_text)<25000:
                model_name = "ep-20250314164242-jd62g" #1.5pro 32k
            else:
                _text = _text[:100000]
                model_name = "ep-20250212111145-fflr7" #1.5pro 256k
            msg = get_prompt_extract_role(_text)
            result = chat_doubao(msg,model_name=model_name)
            _json = get_json_from_text(result)
            if _json is not None:
                try:
                    _extract_ai = json.loads(_json)
                except Exception as e:
                    pass
            if len(_extract_ai.keys())>0:
                _new_json,_changed = self.merge_json(_extract_json,_json)
                if _changed:
                    dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    if not dhtml.exists_row(self.ots_client):
                        dhtml.update_row(self.ots_client)
                    _extract = Document_extract({})
                    _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
                    _extract.setValue(document_extract2_docid,item.get(document_docid))
                    _extract.setValue(document_extract2_extract_json,_new_json,True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    log("extract_ai of docid:%d"%(item.get(document_docid)))
                else:
                    doc = Document({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid"),
                                    "extract_json_ai":_json
                                    })
                    if doc.exists_row(self.ots_client):
                        doc.update_row(self.ots_client)
                    log("extract_nochange_ai of docid:%d"%(item.get(document_docid)))
            if not message_acknowledged:
                message_acknowledged = True
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            if not message_acknowledged:
                ackMsg(conn,message_id,subscription)

    def merge_json(self,extract_json,extract_ai_json):
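        """
        Merge the AI extraction result into the original extraction JSON.

        AI-supplied entities and amounts are first validated against the entities and
        moneys found by the rule-based extractor (clean_ai_extract). Validated values
        are then used to fill a missing tenderee, budget, winning bidder and product
        list; the pre-merge "prem"/"product_attrs" are preserved under *_original keys.

        Returns (merged_json_string, changed_flag).
        """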
        def get_ai_money(_text):
            b = re.search(r'[\d,,\.]+[亿万元人民币]+',str(_text))
            if b is not None:
                return b.group()

        def clean_ai_entity(entity,set_entitys):
            if isinstance(entity,str):
                if re.search(r"(未|无)(明确|提及)|某(部|单位|医院|公司)|\*\*|XX|登录|详见|招标单位",entity) is not None:
                    return ""
                if re.search(r"无|区|县|市|省|某|中心|部|公司",entity) is not None and len(entity)<=5:
                    return ""
                # return entity
                for _i in range(len(entity)-5):
                    if entity[:len(entity)-_i] in set_entitys:
                        return entity[:len(entity)-_i]
            return ""

        def clean_ai_extract(_extract,_extract_ai):
            set_entitys = set()
            set_moneys = set()
            dict_enterprise = _extract.get("dict_enterprise",{})
            for k,v in dict_enterprise.items():
                set_entitys.add(str(k))
            _sum = 0
            for _money in _extract.get("moneys",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _sum = 0
            for _money in _extract.get("moneys_attachment",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _tenderee_dict = _extract_ai.get("招标信息",{})
            _tenderee_ai = _tenderee_dict.get("招标人名称")
            if _tenderee_ai is not None:
                _tenderee_ai = clean_ai_entity(_tenderee_ai,set_entitys)
                _tenderee_dict["招标人名称"] = _tenderee_ai
            # if _tenderee_ai in set_entitys:
            #     _tenderee_dict["招标人名称"] = _tenderee_ai
            # else:
            #     _tenderee_dict["招标人名称"] = ""
            _budget = _tenderee_dict.get("项目预算")
            if _budget is not None:
                _budget = getUnifyMoney(str(_budget))
                _budget = str(float(_budget))
                if _budget in set_moneys:
                    _tenderee_dict["项目预算"] = _budget
                else:
                    _tenderee_dict["项目预算"] = ""
            list_win = _extract_ai.get("中标信息",[])
            for _win_dict_i in range(len(list_win)):
                _win_dict = list_win[_win_dict_i]
                _pack = _win_dict.get("标段号","")
                if _pack=="":
                    if len(list_win)>1:
                        _pack = "AI_%d"%(_win_dict_i)
                    else:
                        _pack = "Project"
                _win_money = _win_dict.get("中标金额")
                if str(_win_money).find("%")>=0:
                    _win_money = 0
                _win_money = getUnifyMoney(str(_win_money))
                _win_money = str(float(_win_money))
                if _win_money in set_moneys:
                    _win_dict["中标金额"] = _win_money
                else:
                    _win_dict["中标金额"] = ""
                _win_tenderer = _win_dict.get("中标人名称")
                _win_tenderer = clean_ai_entity(_win_tenderer,set_entitys)
                _win_dict["中标人名称"] = _win_tenderer
                # if _win_tenderer in set_entitys:
                #     _win_dict["中标人名称"] = _win_tenderer
                # else:
                #     _win_dict["中标人名称"] = ""
            list_product = _extract_ai.get("产品信息",[])
            for product_i in range(len(list_product)):
                product_dict = list_product[product_i]
                uni_price = product_dict.get("单价","")
                quantity = product_dict.get("数量","")
                total_price = product_dict.get("总价","")
                if uni_price is not None and uni_price!="":
                    uni_price = getUnifyMoney(str(uni_price))
                    uni_price = str(float(uni_price))
                    if uni_price in set_moneys:
                        product_dict["单价"] = uni_price
                    else:
                        product_dict["单价"] = ""
                if quantity is not None and quantity!="":
                    quantity = getUnifyMoney(str(quantity))
                    product_dict["数量"] = quantity
                if total_price is not None and total_price!="":
                    total_price = getUnifyMoney(str(total_price))
                    total_price = str(float(total_price))
                    if total_price in set_moneys:
                        product_dict["总价"] = total_price
                    else:
                        product_dict["总价"] = ""
        _extract = {}
        if extract_json is not None:
            try:
                if isinstance(extract_json,str):
                    _extract = json.loads(extract_json)
                else:
                    _extract = extract_json
            except Exception as e:
                pass
        if "extract_count" not in _extract:
            _extract["extract_count"] = 0
        _extract_ai = {}
        if extract_ai_json is not None:
            try:
                _extract_ai = json.loads(extract_ai_json)
            except Exception as e:
                pass
        clean_ai_extract(_extract,_extract_ai)
        _product = _extract.get("product",[])
        list_new_product = _extract_ai.get("产品信息",[])
        prem = _extract.get("prem")
        if prem is None:
            _extract["prem"] = {}
            prem = _extract["prem"]
        Project = prem.get("Project")
        if Project is None:
            prem["Project"] = {}
            Project = prem["Project"]
        Project_rolelist = Project.get("roleList")
        if Project_rolelist is None:
            Project["roleList"] = []
            Project_rolelist = Project["roleList"]
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        _changed = False
        prem_original = {}
        product_attrs_original = {}
        try:
            prem_original = json.loads(json.dumps(_extract.get("prem",{})))
            product_attrs_original = json.loads(json.dumps(_extract.get("product_attrs",{})))
        except Exception as e:
            pass
        if len(list_new_product)>0 and len(list_new_product)>len(product_attrs_original.get("data",[]))//3:
            set_product = set(_product)
            product_attrs_new = {"data":[]}
            for product_i in range(len(list_new_product)):
                brand = list_new_product[product_i].get("品牌","")
                product = list_new_product[product_i].get("产品名称","")
                quantity = list_new_product[product_i].get("数量","")
                quantity_unit = list_new_product[product_i].get("数量单位","")
                specs = list_new_product[product_i].get("规格型号","")
                uni_price = list_new_product[product_i].get("单价","")
                total_price = list_new_product[product_i].get("总价","")
                pinmu_no = list_new_product[product_i].get("品目编号","")
                pinmu_name = list_new_product[product_i].get("品目名称","")
                product_attrs_new["data"].append({
                    "brand": str(brand),
                    "product": product,
                    "quantity": str(quantity),
                    "quantity_unit": quantity_unit,
                    "specs": str(specs),
                    "unitPrice": str(uni_price),
                    "parameter": "",
                    "total_price": str(total_price),
                    "pinmu_no": str(pinmu_no),
                    "pinmu_name": str(pinmu_name)
                })
                if product not in set_product:
                    set_product.add(product)
            _changed = True
            _extract["product"] = list(set_product)
            _extract["product_attrs"] = product_attrs_new
            _extract["extract_count"] += 3
        if not has_tenderee:
            _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称")
            _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[])
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            _linklist = []
            for _conta in _contacts:
                _person = _conta.get("联系人","")
                _phone = _conta.get("联系电话","")
                if _person!="" or _phone!="":
                    _linklist.append([_person,_phone])
            if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4:
                _role_dict = {
                    "role_name": "tenderee",
                    "role_text": _tenderee_ai,
                    "from_ai":True
                }
                if len(_linklist)>0:
                    _role_dict["linklist"] = _linklist
                Project_rolelist.append(_role_dict)
                _changed = True
                _extract["extract_count"] += 1
        if not has_budget or budget_unexpected:
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            if _budget is not None and _budget!="":
                _budget = getUnifyMoney(_budget)
                if _budget>0:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                    Project["tendereeMoney"] = str(float(_budget))
                    _changed = True
                    _extract["extract_count"] += 1
            else:
                if budget_unexpected:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                    Project["tendereeMoney"] = "0"
        if not has_win_tenderer or winprice_unexpected:
            list_win = _extract_ai.get("中标信息",[])
            if len(list_win)>0:
                winprice_unexpected_new = False
                for _win_dict_i in range(len(list_win)):
                    _win_dict = list_win[_win_dict_i]
                    _pack = _win_dict.get("标段号","")
                    if _pack=="":
                        if len(list_win)>1:
                            _pack = "AI_%d"%(_win_dict_i)
                        else:
                            _pack = "Project"
                    _win_money = _win_dict.get("中标金额")
                    if _win_money is not None and _win_money!="":
                        _win_money = getUnifyMoney(_win_money)
                    else:
                        _win_money = 0
                    if _win_money>0:
                        if _win_money>100000000 or _win_money<100:
                            winprice_unexpected_new = True
                has_delete_win = False
                if winprice_unexpected and not winprice_unexpected_new:
                    has_delete_win = True
                    pop_packs = []
                    for _pack,_pack_value in prem.items():
                        _rolelist = _pack_value.get("roleList",[])
                        new_rolelist = []
                        for _role in _rolelist:
                            if _role.get("role_name","")!="win_tenderer":
                                new_rolelist.append(_role)
                        _pack_value["roleList"] = new_rolelist
                        if len(new_rolelist)==0 and _pack!="Project":
                            pop_packs.append(_pack)
                    for _pack in pop_packs:
                        prem.pop(_pack)
                if not has_win_tenderer or has_delete_win:
                    Project_rolelist = Project.get("roleList")
                    if Project_rolelist is None:
                        Project["roleList"] = []
                        Project_rolelist = Project["roleList"]
                    for _win_dict_i in range(len(list_win)):
                        _win_dict = list_win[_win_dict_i]
                        _pack = _win_dict.get("标段号","")
                        if _pack=="":
                            if len(list_win)>1:
                                _pack = "AI_%d"%(_win_dict_i)
                            else:
                                _pack = "Project"
                        _win_money = _win_dict.get("中标金额")
                        if _win_money is not None and _win_money!="":
                            _win_money = getUnifyMoney(_win_money)
                        else:
                            _win_money = 0
                        _win_tenderer = _win_dict.get("中标人名称")
                        if _win_tenderer!="" and len(_win_tenderer)>=4:
                            _role_dict = {
                                "role_name": "win_tenderer",
                                "role_text": _win_tenderer,
                                "winprice_unexpected":winprice_unexpected,
                                "from_ai":True
                            }
                            _role_dict["role_money"] = {
                                "money": str(float(_win_money))
                            }
                            _changed = True
                            _extract["extract_count"] += 2
                            if _pack=="Project":
                                Project_rolelist.append(_role_dict)
                            else:
                                prem[_pack] = {
                                    "roleList":[
                                        _role_dict
                                    ]
                                }
        if _changed:
            _extract["extract_ai"] = True
            _extract["prem_original"] = prem_original
            _extract["product_attrs_original"] = product_attrs_original
        return json.dumps(_extract,ensure_ascii=False),_changed

    def delete_document_extract(self,save_count=70*10000):
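        """
        Trim the postgres document_extract table, keeping only the newest `save_count`
        docids: rows with docid below max(docid)-save_count are deleted.
        """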
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()


from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
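    """
    Atomically allocate a block of `nums` consecutive docids from the MySQL serial
    table b2c_serial_no and return the first id of the block. docid_lock serializes
    allocations within this process; on failure a fresh connection is opened and the
    allocation is retried.
    """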
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()

# custom JSON encoder: handles numpy arrays/floats, bytes and Decimal
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
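
# Illustrative usage (a sketch, not part of the pipeline): MyEncoder lets numpy,
# bytes and Decimal values survive json.dumps, e.g.
#   json.dumps({"score": np.float32(0.5), "ids": np.array([1, 2])}, cls=MyEncoder)
# returns '{"score": 0.5, "ids": [1, 2]}' instead of raising TypeError.
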
class Dataflow_init(Dataflow):

    class InitListener():
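        """
        STOMP listener for the dataflow_init queue: assigns each incoming document a
        new docid (allocated in blocks via generateRangeDocid) and a partitionkey,
        then routes it to the attachment queue (if it has attachments) or straight to
        the extract queue.
        """
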
        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
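            """
            Handle one init message: stamp docid/partitionkey, set a random status and
            forward to dataflow_attachment or dataflow_extract; on failure the message
            is pushed back to dataflow_init before being acknowledged.
            """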
            try:
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener

    def temp2mq(self,object):
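        """
        Drain one Oracle temp table into the dataflow_init MQ queue: rows are
        serialized with MyEncoder and deleted from Oracle once queued; oversized
        bodies (dochtmlcon > 500000 chars) are dropped with a log entry.
        """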
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row once queued; enable before going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()

    def shenpi2mq(self):
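        """
        Incrementally sync approval-project rows (t_shen_pi_xiang_mu) from Oracle
        into MQ. The high-water mark self.max_shenpi_id is bootstrapped from the OTS
        index, then rows above it are fetched in id order and routed to the attachment
        or extract queue; the mark only advances when the send succeeds.
        """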
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
                    if self.max_shenpi_id<60383953:
                        self.max_shenpi_id = 60383953
            if self.max_shenpi_id is not None:
                # select data in order
                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
                if origin_max_shenpi_id is not None:
                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                        # send data to mq one by one with max_shenpi_id updated
                        for _data in list_data:
                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                            ots_dict = _data.getProperties_ots()
                            if ots_dict["docid"]<self.base_shenpi_id:
                                ots_dict["docid"] += self.base_shenpi_id
                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
                            else:
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            log("shenpi error")
            traceback.print_exc()
            self.pool_oracle.decrease()

    def fix_shenpi(self):
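        """
        One-off repair job: scan approval ids [0, 64790010) across 15 threads, and for
        any id missing from the OTS document table (docchannel 302), re-read it from
        Oracle and write it directly to OTS with status 201.
        """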
        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        begin_id = 0
        end_id = 64790010
        thread_num = 15
        step = (end_id-begin_id)//thread_num
        list_items = []
        for _i in range(thread_num):
            _begin = _i*step
            _end = (_i+1)*step-1
            if _i==thread_num-1:
                _end = end_id
            list_items.append((_begin,_end,_i))
        task_queue = Queue()
        for item in list_items:
            task_queue.put(item)
        fix_count_list = []

        def _handle(item,result_queue):
            conn_oracle = pool_oracle.getConnector()
            (begin_id,end_id,thread_id) = item
            _count = 0
            for _id_i in range(begin_id,end_id):
                try:
                    bool_query = BoolQuery(must_queries=[
                        TermQuery("docchannel",302),
                        TermQuery("original_id",_id_i)
                    ])
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                        SearchQuery(bool_query,get_total_count=True))
                    if total_count>0:
                        continue
                    # bool_query = BoolQuery(must_queries=[
                    #     TermQuery("id",_id_i),
                    # ])
                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                    #                                                                     SearchQuery(bool_query,get_total_count=True))
                    # if total_count>0:
                    #     continue
                    try:
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                    except Exception as e:
                        continue
                    # send data to mq one by one with max_shenpi_id updated
                    for _data in list_data:
                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                        ots_dict = _data.getProperties_ots()
                        if ots_dict["docid"]<self.base_shenpi_id:
                            ots_dict["docid"] += self.base_shenpi_id
                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                        ots_dict["status"] = 201
                        dict_1 = {}
                        dict_2 = {}
                        for k,v in ots_dict.items():
                            if k!="dochtmlcon":
                                dict_1[k] = v
                            if k in ('partitionkey',"docid","dochtmlcon"):
                                dict_2[k] = v
                        d_1 = Document(dict_1)
                        d_2 = Document(dict_2)
                        d_1.update_row(self.ots_client)
                        d_2.update_row(self.ots_capacity)
                        _count += 1
                except Exception as e:
                    traceback.print_exc()
                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
            fix_count_list.append(_count)
            pool_oracle.putConnector(conn_oracle)

        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
        mt.run()
        print(fix_count_list,sum(fix_count_list))

    def ots2mq(self):
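        """
        Re-feed documents stuck in OTS (status 1..50) back into the attachment/extract
        MQ queues, at most ~1000 at a time, skipping the run entirely when either queue
        already holds more than 1000 pending messages.
        """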
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("ots2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break

            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")

            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()

    def otstmp2mq(self):
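        """
        Same re-feed as ots2mq but for rows sitting in document_tmp with status 0;
        rows successfully queued are marked status 1.
        """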
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("otstmp2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()

    def test_dump_docid(self):

        class TestDumpListener(ActiveMQListener):

            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
        from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
        from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin
        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
        schedule.start()


def transform_attachment():
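    """
    One-off migration: copy attachment rows (status 10 or TOOLARGE, created after
    2022-05-06) from OTS into postgres, using 30 consumer threads fed from a shared
    queue while the main thread pages through the OTS index.
    """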
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue
    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))


def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)


def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()


def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load
    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")


current_path = os.path.abspath(os.path.dirname(__file__))

def test_ai_extract():
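    """
    Manual smoke test for the AI extraction path: runs a sample announcement through
    html2text_with_tablehtml/chat_doubao and merges the result with a canned
    extraction JSON. Note: relies on the module-level `de` instance created in the
    __main__ block below.
    """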
    _dochtmlcon = '''
<div>
<div>
公告内容:
</div>
<div>
<p>宫腔镜</p>
<div>
<p> </p>
<p> <span>440101-2025-07530</span> </p>
</div>
<div></div>
<div>
<div>
<div>
<div>
<table>
<tbody>
<tr>
<td> 一、采购人: <a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a> </td>
</tr>
<tr>
<td> 二、采购计划编号: 440101-2025-07530 </td>
</tr>
<tr>
<td> 三、采购计划名称: 宫腔镜 </td>
</tr>
<tr>
<td> 四、采购品目名称: 医用内窥镜 </td>
</tr>
<tr>
<td> 五、采购预算金额(元): 1920000.00 </td>
</tr>
<tr>
<td> 六、需求时间: </td>
</tr>
<tr>
<td> 七、采购方式: 公开招标 </td>
</tr>
<tr>
<td> 八、备案时间: 2025-05-06 09:12:11 </td>
</tr>
</tbody>
</table>
<div>
<table>
<tbody>
<tr>
<td>发布人:<a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a></td>
</tr>
<tr>
<td>发布时间: 2025年 05月 06日 </td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
'''
    _extract_json = '''
    { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "公开招标", "candidate": "", "code": [ "440101-2025-07530" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.12, "kvtree": 0.03, "moneygrade": 0.0, "nerToken": 0.04, "outline": 0.03, "pb_extract": 0.16, "person": 0.0, "prem": 0.04, "preprocess": 0.11, "product": 0.02, "product_attrs": 0.01, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.01, "rule_channel": 0.03, "rule_channel2": 0.16, "tableToText": 0.03000166893005371, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "广州医科大学附属妇女儿童医疗中心": { "in_text": 1 } }, "district": { "area": "华南", "city": "广州", "district": "未知", "is_in_text": false, "province": "广东" }, "docchannel": { "docchannel": "招标预告", "doctype": "采招数据", "life_docchannel": "招标预告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "宫腔镜", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=27c02035e60e7cc1b7b8be65eedf92fd", "industry": { "class": "零售批发", "class_name": "医疗设备", "subclass": "专用设备" }, "is_deposit_project": false, "label_dic": {}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 1920000.0 ], "moneys_attachment": [], "moneysource": "", "name": "医用内窥镜", "nlp_enterprise": [ "广州医科大学附属妇女儿童医疗中心" ], "nlp_enterprise_attachment": [], "pb": { "industry": "教育及研究", "project_name_refind": "医用内窥镜", "project_property": "新建" }, "pb_project_name": "医用内窥镜", "person_review": [], "pinmu_name": "医用内窥镜", "policies": [], "prem": { "Project": { "code": "", "name": "医用内窥镜", "roleList": [ { "address": "", "linklist": [], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_prob": 0.9499999463558197, "role_text": "广州医科大学附属妇女儿童医疗中心", "serviceTime": "" } ], "tendereeMoney": "1920000.00", "tendereeMoneyUnit": "元", "uuid": "b1f25984-d473-4995-aec6-dbdba27fa898" } }, "process_time": "2025-05-06 09:24:32", "product": [ "宫腔镜", "医用内窥镜" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": { "医疗检测设备": [ [ "宫腔镜", 1 ] ] }, "核心字段": { "医疗检测设备": [ [ "宫腔镜", 2 ], [ "内窥镜", 2 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-05-06", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-23", "word_count": { "正文": 211, "附件": 0 } }
    '''
    _text = html2text_with_tablehtml(_dochtmlcon)
    if len(_text)<25000:
        model_name = "ep-20250314164242-jd62g" #1.5pro 32k
    else:
        _text = _text[:100000]
        model_name = "ep-20250212111145-fflr7" #1.5pro 256k
    msg = get_prompt_extract_role(_text)
    result = chat_doubao(msg,model_name=model_name)
    _json = get_json_from_text(result)
    _extract_ai = {}
    if _json is not None:
        try:
            _extract_ai = json.loads(_json)
        except Exception as e:
            pass
    if len(_extract_ai.keys())>0:
        _new_json,_changed = de.merge_json(_extract_json,_json)
        print("new_json")
        print(_new_json)


if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    de = Dataflow_ActivteMQ_extract(create_listener=False)
    # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
    a = '''
    { "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "询价", "candidate": "", "code": [ "20250309921" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.1, "kvtree": 0.05, "moneygrade": 0.0, "nerToken": 0.06, "outline": 0.02, "pb_extract": 0.14, "person": 0.0, "prem": 0.01, "preprocess": 0.1, "product": 0.03, "product_attrs": 0.0, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.02, "rule_channel": 0.02, "rule_channel2": 0.25, "tableToText": 0.04000095367431641, "tendereeRuleRecall": 0.0, "time": 0.0, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "五矿铜业(湖南)有限公司": { "credit_code": "91430482081376930E", "in_text": 1 } }, "district": { "area": "华中", "city": "未知", "district": "未知", "is_in_text": false, "province": "湖南" }, "docchannel": { "docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "湖南有色铜业方闸", "exist_table": 1, "extract_count": 2, "fail_reason": "", "fingerprint": "md5=84d3541612f56fc9dc07b0cd7fa2c644", "industry": { "class": "零售批发", "class_name": "有色金属冶炼及压延产品", "subclass": "建筑建材" }, "is_deposit_project": false, "label_dic": { "is_direct_procurement": 1 }, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [], "moneys_attachment": [], "moneysource": "", "name": "湖南有色铜业方闸采购", "nlp_enterprise": [ "五矿铜业(湖南)有限公司" ], "nlp_enterprise_attachment": [], "pb": { "location": "湖南", "project_name_refind": "湖南有色铜业方闸采购", "project_property": "新建" }, "pb_project_name": "湖南有色铜业方闸采购", "person_review": [], "pinmu_name": "", "policies": [], "prem": {}, "process_time": "2025-04-16 14:58:20", "product": [ "铜业方闸", "手电不锈钢复合式速开方闸门配一体式启闭机" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": {}, "核心字段": { "门类": [ [ "闸门", 1 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-02", "word_count": { "正文": 240, "附件": 0 } }
    '''
    b = '''
    {"招标信息":{"招标人名称":"五矿铜业(湖南)有限公司","项目预算":"","招标人联系方式":[{"联系人":"李锟","联系电话":"13575144492"}]},"中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
    '''
    # print(de.should_to_extract_ai(a))
    print(de.merge_json(a,b))
    test_ai_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")