dataflow_mq.py 134 KB

#coding:utf8
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys
sys.setrecursionlimit(1000000)
from multiprocessing import Process
from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
from BaseDataMaintenance.common.Utils import getUnifyMoney
from BaseDataMaintenance.maintenance.product.medical_product import MedicalProduct

class ActiveMQListener():

    def __init__(self, conn, _queue, *args, **kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame": headers, "conn": self.conn}, True)

    def __del__(self):
        self.conn.disconnect()

class DynamicProcess(Process):

    def __init__(self, listener_cls, comsumer_handler, comsumer_count, queue_name):
        super().__init__()  # Process.__init__ must run before start()
        self.listener_cls = listener_cls
        self.comsumer_handler = comsumer_handler
        self.comsumer_count = comsumer_count
        self.queue_name = queue_name
        # initialized here as well so terminate() does not raise AttributeError
        # when called from the parent process before/without run()
        self.list_comsumer = []

    def run(self):
        self.list_comsumer = []
        for _i in range(self.comsumer_count):
            listener_attachment = self.listener_cls(getConnect_activateMQ(), self.comsumer_handler, _i)
            createComsumer(listener_attachment, self.queue_name)
            self.list_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(self.list_comsumer)):
                if self.list_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.listener_cls(getConnect_activateMQ(), self.comsumer_handler, i)
                    createComsumer(listener, self.queue_name)
                    self.list_comsumer[i] = listener
            time.sleep(5)

    def terminate(self) -> None:
        for i in range(len(self.list_comsumer)):
            _c = self.list_comsumer[i]
            _c.conn.disconnect()
        super().terminate()
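
# dynamic_listener polls queue depths and points all worker processes at the
# highest-priority queue: any queue backed up past 1000 messages gets
# size_level 1 (otherwise 2), and ties are broken by the queue's position in
# sorted_names. Whenever the winning queue changes, the old DynamicProcess
# workers are terminated and a fresh set is started against the new queue.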
def dynamic_listener(sorted_names, process_count, listener_cls, comsumer_handler, comsumer_count):
    from BaseDataMaintenance.java.MQInfo import getQueueSize
    list_queue_dict = []
    for _i in range(len(sorted_names)):
        list_queue_dict.append({"name": sorted_names[_i],
                                "level": _i})
    if len(list_queue_dict)==0:
        return
    last_name = ""
    list_comsumer = []
    while 1:
        for _d in list_queue_dict:
            _size = getQueueSize(_d.get("name"))
            _d["queue_size"] = _size
            if _size>1000:
                _d["size_level"] = 1
            else:
                _d["size_level"] = 2
        list_queue_dict.sort(key=lambda x: (x.get("size_level"), x.get("level")))
        queue_name = list_queue_dict[0].get("name")
        if last_name!=queue_name:
            for _c in list_comsumer:
                _c.terminate()
                del _c
            list_comsumer = []
            for _i in range(process_count):
                listener_p = DynamicProcess(listener_cls, comsumer_handler, comsumer_count, queue_name=queue_name)
                listener_p.start()
                list_comsumer.append(listener_p)
            last_name = queue_name
        time.sleep(120)
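
# Dataflow_ActivteMQ_attachment consumes documents from the attachment queues
# (dataflow_attachment plus its _fix/_his variants), recognizes their attachment
# files, and forwards the enriched documents to the extract queues; recent
# documents go to dataflow_extract and older ones (page_time more than 7 days
# back) to dataflow_extract_his. Messages that exhaust their retries are parked
# in dataflow_attachment_failed and replayed when the main queue is quiet.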
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self, conn, _func, _idx, *args, **kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame": headers, "conn": self.conn, "idx": self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()

        def __del__(self):
            self.conn.disconnect()
    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
        self.mq_attachment_his = "/queue/dataflow_attachment_his"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_his = "/queue/dataflow_extract_his"

        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 2
        self.comsumer_process_count_fix = 1
        self.comsumer_process_count_his = 1
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10, 30, getConnection_postgres)
        self.redis_pool = ConnectorPool(10, 30, getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10, 30, getConnect_activateMQ)
        self.session = None

        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener, args=(5, self.mq_attachment))
            listener_p.start()
        for _ in range(self.comsumer_process_count_fix):
            listener_p = Process(target=self.start_attachment_listener, args=(5, self.mq_attachment_fix))
            listener_p.start()
        for _ in range(self.comsumer_process_count_his):
            listener_p = Process(target=self.start_attachment_listener, args=(5, self.mq_attachment_his))
            listener_p.start()

        listener_p = Process(target=dynamic_listener,
                             args=(["dataflow_attachment", "dataflow_attachment_fix", "dataflow_attachment_his"],
                                   6, self.AttachmentMQListener, self.start_attachment_listener, 5))
        listener_p.start()
        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()
    def start_attachment_listener(self, comsumer_count, queue_name):
        list_attachment_comsumer = []
        for _i in range(comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(), self.attachment_listener_handler, _i)
            createComsumer(listener_attachment, queue_name)
            list_attachment_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(list_attachment_comsumer)):
                if list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(), self.attachment_listener_handler, i)
                    createComsumer(listener, queue_name)
                    list_attachment_comsumer[i] = listener
            time.sleep(5)
    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(), self.queue_attachment)
                createComsumer(listener, self.mq_attachment)
                self.list_attachment_comsumer[i] = listener
    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(), self.attachment_listener_handler, _i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment, self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
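
    # attachment_listener_handler pulls a document off the queue, looks up its
    # attachments (redis cache, then OTS), and runs recognition. Roughly 20% of
    # messages are randomly re-queued instead of being processed ("jump by
    # random"), which presumably spreads load and reshuffles message order
    # across the worker processes.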
    def attachment_listener_handler(self, _dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx", 1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path, "[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon, "")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq, frame.body, self.mq_attachment):
                    ackMsg(conn, message_id)
                    return

            if len(page_attachments)==0:
                newitem = {"item": item, "list_attach": [], "message_id": message_id, "conn": conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5, _dochtmlcon)
                newitem = {"item": item, "list_attach": list_attach, "message_id": message_id, "conn": conn}

            log("attachment get doc:%s"%(str(newitem.get("item", {}).get("docid"))))
            self.attachment_recognize(newitem, None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item", {}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
    def rec_attachments_by_interface(self, list_attach, _dochtmlcon, save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # test: run everything
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED, ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml, "")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5: _filemd5,
                                      "html": _html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time, "")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml, "")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5: _filemd5,
                                          "html": _html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach, _dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml, "")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5: _filemd5,
                                          "html": _html})
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)
            if not _not_failed:
                return False, list_html, swf_urls
            return True, list_html, swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False, list_html, swf_urls
    def attachment_recognize(self, _dict, result_queue):
        '''
        Recognize the attachment content.
        :param _dict: attachment payload
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times", 0)
            dhtml = Document_html({"partitionkey": item.get("partitionkey"),
                                   "docid": item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon, "")
            dhtml.setValue(document_tmp_dochtmlcon, _dochtmlcon, True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed, list_html, swf_urls = self.rec_attachments_by_interface(list_attach, _dochtmlcon, save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path, "[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5, "")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5, "")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification, "")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments, ensure_ascii=False)

            dtmp = Document_tmp(item)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after five or more failures the message goes to the failed queue,
                # which is reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq, json.dumps(item, cls=MyEncoder, ensure_ascii=False), self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq, json.dumps(item, cls=MyEncoder, ensure_ascii=False), self.mq_attachment)
                # save the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon, "", False)
                    dtmp.setValue(document_tmp_status, 0, True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid), _retry_times, str(swf_urls), str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status, 1, True)
                    dtmp.setValue(document_tmp_dochtmlcon, dhtml.getProperties().get(document_tmp_dochtmlcon), True)

                    page_time = item.get(document_page_time, "")
                    current_date = getCurrent_date(format='%Y-%m-%d')
                    last_7_date = timeAdd(current_date, -7, format='%Y-%m-%d')
                    if page_time<last_7_date:
                        is_his = True
                    else:
                        is_his = False
                    if not is_his:
                        queue_name = self.mq_extract
                    else:
                        queue_name = self.mq_extract_his
                    send_succeed = send_msg_toacmq(self.pool_mq, json.dumps(dtmp.getProperties(), cls=MyEncoder), queue_name)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn, message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"), str(_succeed), str(_to_ack), _retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
            if send_msg_toacmq(self.pool_mq, json.dumps(item, cls=MyEncoder, ensure_ascii=False), self.mq_attachment):
                ackMsg(conn, message_id)
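
    # request_attachment_interface resolves the attachment file through a
    # fallback chain: local /FileInfo cache -> OSS download -> ots.attachment
    # lookup. Resolved files are sent to the OCR/convert interface, the result
    # is written back to OTS and cached in redis, and files over
    # ATTACHMENT_LARGESIZE are short-circuited as ATTACHMENT_TOOLARGE.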
    def request_attachment_interface(self, attach, _dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4], _uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//", "/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5, objectPath))
                try:
                    download_succeed = downloadFile(self.bucket, objectPath, localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5, objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client, [attachment_attachmenthtml, attachment_classification, attachment_attachmentcon, attachment_path, attachment_status, attachment_filetype], True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5, objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml, "")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml, _ots_attach.getProperties().get(attachment_attachmenthtml, ""))
                    attach.setValue(attachment_attachmentcon, _ots_attach.getProperties().get(attachment_attachmentcon, ""))
                    attach.setValue(attachment_status, _ots_attach.getProperties().get(attachment_status, ""))
                    attach.setValue(attachment_filetype, _ots_attach.getProperties().get(attachment_filetype, ""))
                    attach.setValue(attachment_classification, _ots_attach.getProperties().get(attachment_classification, ""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5, attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket, objectPath, localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5, objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5, objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5, objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size, _size, True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE, True)
                    log("attachment :%s of path:%s to large"%(filemd5, _path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5, attach.getProperties())
                    if local_exists:
                        if not _ots_exists:
                            upload_status = uploadFileByPath(self.bucket, localpath, objectPath)
                            os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                if str(_filetype)=="wps":
                    _filetype = "doc"
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success, _html, swf_images, classification = getAttachDealInterface(None, _filetype, path=localpath, session=self.session)
                _reg_time = time.time()-start_time

                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False

                # when rerunning a swf, strip the "\" characters from the stored swf_urls
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                swf_images = eval(swf_images)  # parse the string-encoded page-image list returned by the interface
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls, "[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path, "")
                        swf_dir = os.path.join(self.current_path, "swf_images", uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir, filename)
                            with open(filepath, "wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket, self.attachment_hub_url, objectPath, None, swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)
                    else:
                        attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                if re.search("<td", _html) is not None:
                    attach.setValue(attachment_has_table, 1, True)

                _file_title = self.getTitleFromHtml(filemd5, _dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5, _dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title, _file_title, True)
                if filelink!="":
                    attach.setValue(attachment_file_link, filelink, True)

                attach.setValue(attachment_attachmenthtml, _html, True)
                attach.setValue(attachment_attachmentcon, BeautifulSoup(_html, "lxml").get_text(), True)
                attach.setValue(attachment_status, ATTACHMENT_PROCESSED, True)
                attach.setValue(attachment_recsize, len(_html), True)
                attach.setValue(attachment_process_time, getCurrent_date(format="%Y-%m-%d %H:%M:%S"), True)
                attach.setValue(attachment_classification, classification, True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable updates again once live

                # # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5, attach.getProperties())

                start_time = time.time()
                if local_exists:
                    if not _ots_exists:
                        upload_status = uploadFileByPath(self.bucket, localpath, objectPath)
                        try:
                            if upload_status and os.path.exists(localpath):
                                os.remove(localpath)
                        except Exception as e:
                            pass
                _upload_time = time.time()-start_time
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5, str(_success), _filetype, round(_size/1024/1024, 4), time_download, _reg_time, _upload_time, str(_ots_exists), len(_html)))
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()

    def getAttachPath(self, filemd5, _dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon, "lxml")
        list_mark = ["data", "filelink"]
        for _mark in list_mark:
            _find = _soup.find("a", attrs={_mark: filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img", attrs={_mark: filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src", "")
            else:
                filelink = _find.attrs.get("href", "")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]
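
    # The two helpers below cache attachment records in redis under the key
    # "attach-<filemd5>" with a 3-hour TTL, so repeated documents that share
    # an attachment can skip the postgres/OTS lookups.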
    def getAttach_json_fromRedis(self, filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self, filemd5, extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k, v in extract_dict.items():
                if not isinstance(v, set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key), json.dumps(new_dict))
            db.expire(_key, 3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
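
    # getAttachments resolves each filemd5 to an attachment record: first the
    # redis cache, then ots.attachment, and finally, for never-seen files, a
    # new record built from the file link embedded in the document html.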
    def getAttachments(self, list_filemd5, _dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5), len(to_find_md5)))

            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5, _dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5: _filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client, [attachment_status, attachment_path, attachment_attachmenthtml, attachment_attachmentcon, attachment_filetype, attachment_swfUrls, attachment_process_time, attachment_classification], True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists", True, True)
                            list_attachment.append(_attach_pg)
                        else:
                            log("getAttachments status None find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists", True, True)
                            list_attachment.append(_attach_pg)
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5: _filemd5,
                                       attachment_filetype: _filetype,
                                       attachment_status: 20,
                                       attachment_path: "%s/%s"%(_filemd5[:4], _path),
                                       attachment_crtime: getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self, columns=[document_tmp_attachment_path, document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size, qsize_ocr, qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment, self.comsumer_handle, None, 10, 1, need_stop=False, restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self, _dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp", "jpeg", "jpg", "png", "swf", "pdf", "tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict, True)
        else:
            self.queue_attachment_not_ocr.put(_dict, True)
    def comsumer_handle(self, _dict, result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path, "[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon, "")
            if len(page_attachments)==0:
                self.set_queue({"item": item, "list_attach": [], "message_id": message_id, "conn": conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5, _dochtmlcon)
                self.set_queue({"item": item, "list_attach": list_attach, "message_id": message_id, "conn": conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date, -2, format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()
    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment, "cron", minute="*/10")
        schedule.start()

class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self, conn, _func, _idx, *args, **kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id, json.loads(body).get("crtime", "")))
                self._func(_dict={"frame": headers, "conn": self.conn}, result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self, create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract", 20],
                                   ["http://192.168.0.115:15030/content_extract", 10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_fix = "/queue/dataflow_extract_fix"
        self.mq_extract_his = "/queue/dataflow_extract_his"
        self.mq_extract_ai = "/queue/dataflow_extract_AI"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        # normalize the interface weights into cumulative probabilities
        self.whole_weight = 0
        for _url, weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1, self.comsumer_count, getConnect_redis_doc)

        init_nums = 1
        self.conn_mq = None
        if create_listener:
            self.conn_mq = getConnect_activateMQ()
        else:
            init_nums = 0
        self.pool_mq = ConnectorPool(init_nums, 30, getConnect_activateMQ)

        self.block_url = RLock()
        self.url_count = 0
        self.session = None
        self.MP = MedicalProduct()

        self.list_extract_comsumer = []
        self.list_extract_ai_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)

        # extract listeners
        if create_listener:
            for ii in range(2):
                listener_p = Process(target=self.start_extract_listener, args=(5, self.mq_extract))
                listener_p.start()
            for ii in range(1):
                listener_p = Process(target=self.start_extract_listener, args=(5, self.mq_extract_fix))
                listener_p.start()
            for ii in range(1):
                listener_p = Process(target=self.start_extract_listener, args=(5, self.mq_extract_his))
                listener_p.start()
            listener_p = Process(target=dynamic_listener,
                                 args=(["dataflow_extract", "dataflow_extract_fix", "dataflow_extract_his"],
                                       8, self.ExtractListener, self.start_extract_listener, 5))
            listener_p.start()
            listener_p_ai = Thread(target=self.start_extract_AI_listener)
            listener_p_ai.start()
    def start_extract_AI_listener(self, _count=8):
        self.list_extract_ai_comsumer = []
        for _i in range(_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(), self.extract_ai_handle, _i)
            createComsumer(listener_extract, self.mq_extract_ai)
            self.list_extract_ai_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_ai_comsumer)):
                    if self.list_extract_ai_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(), self.extract_ai_handle, _i)
                        createComsumer(listener, self.mq_extract_ai)
                        self.list_extract_ai_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()

    def start_extract_listener(self, comsumer_count, queue_name):
        list_extract_comsumer = []
        for _i in range(comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(), self.comsumer_handle, _i)
            createComsumer(listener_extract, queue_name)
            list_extract_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(list_extract_comsumer)):
                    if list_extract_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(), self.comsumer_handle, _i)
                        createComsumer(listener, queue_name)
                        list_extract_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()
    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(), self.comsumer_handle, i)
                createComsumer(listener, self.mq_extract)
                self.list_extract_comsumer[i] = listener
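
    # getExtract_url draws a uniform random number and returns the first
    # interface whose cumulative weight (precomputed in __init__) covers it,
    # i.e. weighted random load balancing across the extract endpoints.
    # With the configured weights [20, 10] the cumulative values are
    # [2/3, 1.0], so the first endpoint gets roughly two thirds of requests.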
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self, json, headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url, json=json, headers=headers, timeout=10*60)
        return resp

    def request_industry_interface(self, json, headers):
        resp = requests.post(self.industy_url, json=json, headers=headers)
        return resp
    def flow_extract_producer(self, columns=[document_tmp_page_time, document_tmp_doctitle, document_tmp_docchannel, document_tmp_status, document_tmp_original_docchannel, document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def process_extract_failed(self):
        def _handle(_dict, result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq, json.dumps(item, ensure_ascii=False), self.mq_extract):
                    ackMsg(conn, message_id, subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size), str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(), _handle, 1)
                createComsumer(failed_listener, self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract, self.comsumer_handle, None, 20, 1, True)
        mt.run()

    def getExtract_json_fromDB(self, _fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn, Document_extract_postgres, "document_extract", [" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self, fingerprint, docid, extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint: fingerprint,
                                         document_extract_docid: docid,
                                         document_extract_extract_json: extract_json})
        _de.insert_row(self.pool_postgres, 1)
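
    # The redis variants below cache extract results keyed by the document
    # fingerprint (title + content + original_docchannel) with a 2-hour TTL,
    # so exact duplicates arriving close together reuse the same extraction
    # instead of calling the interface again.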
    def getExtract_json_fromRedis(self, _fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self, fingerprint, extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint), extract_json)
            db.expire(fingerprint, 3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
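
    # comsumer_handle is the main extract worker: it decodes the message,
    # trims over-long html via article_limit, then either reuses a cached
    # extraction (looked up by fingerprint) or calls the extract interface,
    # and finally routes the document by outcome: success, retry on the main
    # queue, the failed queue after repeated errors, or the AI queue.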
  928. def comsumer_handle(self,_dict,result_queue):
  929. try:
  930. log("start handle")
  931. data = {}
  932. frame = _dict["frame"]
  933. conn = _dict["conn"]
  934. message_id = frame.headers["message-id"]
  935. subscription = frame.headers.setdefault('subscription', None)
  936. item = json.loads(frame.body)
  937. for k,v in item.items():
  938. try:
  939. if isinstance(v,bytes):
  940. item[k] = v.decode("utf-8")
  941. except Exception as e:
  942. log("docid %d types bytes can not decode"%(item.get("docid")))
  943. item[k] = ""
  944. dtmp = Document_tmp(item)
  945. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  946. "docid":item.get("docid")})
  947. extract_times = item.get("extract_times",0)+1
  948. item["extract_times"] = extract_times
  949. _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
  950. html_len = len(_dochtmlcon) # html 文本长度
  951. limit_text_len = 50000 # 内容(或附件)正文限制文本长度
  952. if html_len > limit_text_len:
  953. log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
  954. try:
  955. _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
  956. _soup = BeautifulSoup(_dochtmlcon,"lxml")
  957. all_len = len(_soup.get_text()) # 全公告内容text长度
  958. _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
  959. attachment_len = len(_attachment.get_text()) if _attachment else 0 # 附件内容text长度
  960. main_text_len = all_len - attachment_len # 正文内容text长度
  961. # if attachment_len>150000: # 附件内容过长删除(处理超时)
  962. # if _attachment is not None:
  963. # _attachment.decompose()
  964. # attachment_len = 0
  965. # 正文或附件内容text长度大于limit_text_len才执行article_limit
  966. if main_text_len>limit_text_len or attachment_len>limit_text_len:
  967. _soup = article_limit(_soup,limit_text_len)
  968. _dochtmlcon = str(_soup)
  969. except Exception as e:
  970. traceback.print_exc()
  971. ackMsg(conn,message_id,subscription)
  972. return
  973. log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
  974. dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
  975. _extract = Document_extract({})
  976. _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
  977. _extract.setValue(document_extract2_docid,item.get(document_docid))
  978. all_done = 1
  979. for k,v in item.items():
  980. data[k] = v
  981. data["timeout"] = 440
  982. data["doc_id"] = data.get(document_tmp_docid,0)
  983. # if data["docid"]<298986054 and data["docid"]>0:
  984. # log("jump docid %s"%(str(data["docid"])))
  985. # ackMsg(conn,message_id,subscription)
  986. # return
  987. data["content"] = data.get(document_tmp_dochtmlcon,"")
  988. if document_tmp_dochtmlcon in data:
  989. data.pop(document_tmp_dochtmlcon)
  990. data["title"] = data.get(document_tmp_doctitle,"")
  991. data["web_source_no"] = item.get(document_tmp_web_source_no,"")
  992. data["web_source_name"] = item.get(document_tmp_web_source_name,"")
  993. data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
  994. data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
  995. _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
            to_ai = False
            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from redis takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if 200<=resp.status_code<=213:
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("put json to redis takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2
                to_ai,_reason = self.should_to_extract_ai(extract_json)
                if to_ai:
                    log("to_ai of docid:%s of reason:%s"%(str(_docid),str(_reason)))
            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if 200<=resp.status_code<=213:
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3
            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4
            _extract.setValue(document_extract2_industry_json,"{}",True)
            _to_ack = True
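            # Ack/retry bookkeeping: all_done stays 1 on success and becomes -2 when the extract
            # interface answers with an error status; extract_times then drives the retry policy
            # below (>5 attempts -> extract_failed queue, >=10 -> give up with an empty extract).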
            try:
                if all_done!=1:
                    # sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    log("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # process as succeed
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transform to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # persist the failure state
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        # debug hook for one specific docid
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
                if to_ai:
                    # send on to the AI extract queue
                    item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}"))
                    send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
            else:
                item["extract_times"] -= 1
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            # sentMsgToDD("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process docid:%s failed message_id:%s"%(str(data.get("doc_id")),message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transform to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # transform back to the extract queue
                # persist the failure state
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
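    # Decide whether a rule-based extract should additionally go through the AI extract
    # queue: a single extracted entity that fills no role, a missing tenderee/win_tenderer
    # on channels that require one, medical products, or a budget/win price outside the
    # expected range (100 to 1e8) all qualify.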
    def should_to_extract_ai(self,extract_json):
        _reason = ""
        # return False,_reason
        _extract = {}
        if extract_json is not None:
            try:
                _extract = json.loads(extract_json)
            except Exception as e:
                pass
        has_entity = False
        docchannel = _extract.get("docchannel",{}).get("docchannel","")
        doctype = _extract.get("docchannel",{}).get("doctype","")
        entity_len = len(_extract.get("dict_enterprise",{}).keys())
        products = str(_extract.get("product_attrs",""))
        demand_info = str(_extract.get("demand_info",""))
        project_name = str(_extract.get("name",""))
        class_name = _extract.get("industry",{}).get("class_name","")
        _entity = None
        if entity_len>0:
            has_entity = True
        if entity_len==1:
            _entity = list(_extract.get("dict_enterprise",{}).keys())[0]
        prem = _extract.get("prem",{})
        one_entity_used = False
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="agency":
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        if doctype=="采招数据":
            if has_entity and not one_entity_used:
                if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","验收合同"}:
                    _reason = "missing tenderee"
                    return True,_reason
                if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","验收合同"}:
                    _reason = "missing win_tenderer"
                    return True,_reason
            if class_name=="医疗设备" or self.MP.is_medical_product(products) or self.MP.is_medical_product(demand_info) or self.MP.is_medical_product(project_name):
                _reason = "medical product"
                return True,_reason
            if budget_unexpected or winprice_unexpected:
                _reason = "money out of expected range"
                return True,_reason
        return False,_reason
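    # Consumer of the AI extract queue: requeues the message (with a 10-minute aitimes
    # backoff) until the document has reached status>=71, skips documents apparently
    # dropped as duplicates (save==0), then asks a doubao chat model (32k or 256k context,
    # chosen by text length) for roles and merges the answer in via merge_json.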
    def extract_ai_handle(self,_dict,result_queue):
        frame = _dict["frame"]
        conn = _dict["conn"]
        message_id = frame.headers["message-id"]
        subscription = frame.headers.setdefault('subscription', None)
        item = json.loads(frame.body)
        try:
            _extract_json = None
            if document_extract2_extract_json in item:
                _extract_json = item.get(document_extract2_extract_json)
                item.pop(document_extract2_extract_json)
            message_acknowledged = False
            dtmp = Document_tmp(item)
            _tomq = False
            if dtmp.fix_columns(self.ots_client,["status","save"],True):
                if dtmp.getProperties().get("status",0)>=71:
                    if dtmp.getProperties().get("save",1)==0:
                        message_acknowledged = True
                        log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
                        ackMsg(conn,message_id,subscription)
                        return
                else:
                    _tomq = True
            else:
                _tomq = True
            if _tomq:
                aitimes = item.get("aitimes")
                if aitimes is None:
                    aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                    item["aitimes"] = aitimes
                    if not message_acknowledged:
                        item[document_extract2_extract_json] = _extract_json
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                            message_acknowledged = True
                            ackMsg(conn,message_id,subscription)
                    time.sleep(1)
                    return
                else:
                    if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
                        if not message_acknowledged:
                            item[document_extract2_extract_json] = _extract_json
                            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                                message_acknowledged = True
                                ackMsg(conn,message_id,subscription)
                        time.sleep(1)
                        return
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract_ai = {}
            _text = html2text_with_tablehtml(_dochtmlcon)
            if len(_text)<25000:
                model_name = "ep-20250314164242-jd62g"  # 1.5pro 32k
            else:
                _text = _text[:100000]
                model_name = "ep-20250212111145-fflr7"  # 1.5pro 256k
            msg = get_prompt_extract_role(_text)
            # model_name = "ep-20250212111145-fflr7"  # 1.5pro 256k
            # model_name = "ep-20250314164242-jd62g"  # 1.5pro 32k
            result = chat_doubao(msg,model_name=model_name)
            _json = get_json_from_text(result)
            if _json is not None:
                try:
                    _extract_ai = json.loads(_json)
                except Exception as e:
                    pass
            if len(_extract_ai.keys())>0:
                _new_json,_changed = self.merge_json(_extract_json,_json)
                if _changed:
                    dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    if not dhtml.exists_row(self.ots_client):
                        dhtml.update_row(self.ots_client)
                    _extract = Document_extract({})
                    _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
                    _extract.setValue(document_extract2_docid,item.get(document_docid))
                    _extract.setValue(document_extract2_extract_json,_new_json,True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    log("extract_ai of docid:%d"%(item.get(document_docid)))
                else:
                    doc = Document({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid"),
                                    "extract_json_ai":_json
                                    })
                    if doc.exists_row(self.ots_client):
                        doc.update_row(self.ots_client)
                    log("extract_nochange_ai of docid:%d"%(item.get(document_docid)))
            if not message_acknowledged:
                message_acknowledged = True
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            if not message_acknowledged:
                ackMsg(conn,message_id,subscription)
            else:
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
                ackMsg(conn,message_id,subscription)
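    # Merge the AI answer into the rule-based extract. AI values are only accepted when
    # they can be verified against the rule-based result: entity names must (prefix-)match
    # dict_enterprise, and money values must already occur in moneys/moneys_attachment
    # (or in their sums).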
    def merge_json(self,extract_json,extract_ai_json):
        def get_ai_money(_text):
            b = re.search(r'[\d,,\.]+[亿万元人民币]+',str(_text))
            if b is not None:
                return b.group()
        def clean_ai_entity(entity,set_entitys):
            if isinstance(entity,str):
                if re.search("(未|无)(明确|提及)|某(部|单位|医院|公司)|\*\*|XX|登录|详见|招标单位",entity) is not None:
                    return ""
                if re.search("无|区|县|市|省|某|中心|部|公司",entity) is not None and len(entity)<=5:
                    return ""
                # return entity
                for _i in range(len(entity)-5):
                    if entity[:len(entity)-_i] in set_entitys:
                        return entity[:len(entity)-_i]
            return ""
        def clean_ai_extract(_extract,_extract_ai):
            set_entitys = set()
            set_moneys = set()
            dict_enterprise = _extract.get("dict_enterprise",{})
            for k,v in dict_enterprise.items():
                set_entitys.add(str(k))
            _sum = 0
            for _money in _extract.get("moneys",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _sum = 0
            for _money in _extract.get("moneys_attachment",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _tenderee_dict = _extract_ai.get("招标信息",{})
            _tenderee_ai = _tenderee_dict.get("招标人名称")
            if _tenderee_ai is not None:
                _tenderee_ai = clean_ai_entity(_tenderee_ai,set_entitys)
                _tenderee_dict["招标人名称"] = _tenderee_ai
                # if _tenderee_ai in set_entitys:
                #     _tenderee_dict["招标人名称"] = _tenderee_ai
                # else:
                #     _tenderee_dict["招标人名称"] = ""
            _budget = _tenderee_dict.get("项目预算")
            if _budget is not None:
                _budget = getUnifyMoney(str(_budget))
                _budget = str(float(_budget))
                if _budget in set_moneys:
                    _tenderee_dict["项目预算"] = _budget
                else:
                    _tenderee_dict["项目预算"] = ""
            list_win = _extract_ai.get("中标信息",[])
            for _win_dict_i in range(len(list_win)):
                _win_dict = list_win[_win_dict_i]
                _pack = _win_dict.get("标段号","")
                if _pack=="":
                    if len(list_win)>1:
                        _pack = "AI_%d"%(_win_dict_i)
                    else:
                        _pack = "Project"
                _win_money = _win_dict.get("中标金额")
                if str(_win_money).find("%")>=0:
                    _win_money = 0
                _win_money = getUnifyMoney(str(_win_money))
                _win_money = str(float(_win_money))
                if _win_money in set_moneys:
                    _win_dict["中标金额"] = _win_money
                else:
                    _win_dict["中标金额"] = ""
                _win_tenderer = _win_dict.get("中标人名称")
                _win_tenderer = clean_ai_entity(_win_tenderer,set_entitys)
                _win_dict["中标人名称"] = _win_tenderer
                # if _win_tenderer in set_entitys:
                #     _win_dict["中标人名称"] = _win_tenderer
                # else:
                #     _win_dict["中标人名称"] = ""
            list_product = _extract_ai.get("产品信息",[])
            for product_i in range(len(list_product)):
                product_dict = list_product[product_i]
                uni_price = product_dict.get("单价","")
                quantity = product_dict.get("数量","")
                total_price = product_dict.get("总价","")
                if uni_price is not None and uni_price!="":
                    uni_price = getUnifyMoney(str(uni_price))
                    uni_price = str(float(uni_price))
                    if uni_price in set_moneys:
                        product_dict["单价"] = uni_price
                    else:
                        product_dict["单价"] = ""
                if quantity is not None and quantity!="":
                    quantity = getUnifyMoney(str(quantity))
                    product_dict["数量"] = quantity
                if total_price is not None and total_price!="":
                    total_price = getUnifyMoney(str(total_price))
                    total_price = str(float(total_price))
                    if total_price in set_moneys:
                        product_dict["总价"] = total_price
                    else:
                        product_dict["总价"] = ""
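        # Main merge flow: parse both JSON inputs, sanitize the AI values with
        # clean_ai_extract, then fill in products, tenderee, budget and win_tenderer only
        # where the rule-based extract has gaps or out-of-range amounts, keeping the
        # originals in prem_original/product_attrs_original.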
        _extract = {}
        if extract_json is not None:
            try:
                if isinstance(extract_json,str):
                    _extract = json.loads(extract_json)
                else:
                    _extract = extract_json
            except Exception as e:
                pass
        if "extract_count" not in _extract:
            _extract["extract_count"] = 0
        _extract_ai = {}
        if extract_ai_json is not None:
            try:
                _extract_ai = json.loads(extract_ai_json)
            except Exception as e:
                pass
        clean_ai_extract(_extract,_extract_ai)
        _product = _extract.get("product",[])
        list_new_product = _extract_ai.get("产品信息",[])
        prem = _extract.get("prem")
        if prem is None:
            _extract["prem"] = {}
            prem = _extract["prem"]
        Project = prem.get("Project")
        if Project is None:
            prem["Project"] = {}
            Project = prem["Project"]
        Project_rolelist = Project.get("roleList")
        if Project_rolelist is None:
            Project["roleList"] = []
            Project_rolelist = Project["roleList"]
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        _changed = False
        prem_original = {}
        product_attrs_original = {}
        try:
            prem_original = json.loads(json.dumps(_extract.get("prem",{})))
            product_attrs_original = json.loads(json.dumps(_extract.get("product_attrs",{})))
        except Exception as e:
            pass
        if len(list_new_product)>0 and len(list_new_product)>len(product_attrs_original.get("data",[]))//3:
            set_product = set(_product)
            product_attrs_new = {"data":[]}
            for product_i in range(len(list_new_product)):
                brand = list_new_product[product_i].get("品牌","")
                product = list_new_product[product_i].get("产品名称","")
                quantity = list_new_product[product_i].get("数量","")
                quantity_unit = list_new_product[product_i].get("数量单位","")
                specs = list_new_product[product_i].get("规格型号","")
                uni_price = list_new_product[product_i].get("单价","")
                total_price = list_new_product[product_i].get("总价","")
                pinmu_no = list_new_product[product_i].get("品目编号","")
                pinmu_name = list_new_product[product_i].get("品目名称","")
                if product=="":
                    continue
                product_attrs_new["data"].append({
                    "brand": str(brand),
                    "product": product,
                    "quantity": str(quantity),
                    "quantity_unit": quantity_unit,
                    "specs": str(specs),
                    "unitPrice": str(uni_price),
                    "parameter": "",
                    "total_price": str(total_price),
                    "pinmu_no": str(pinmu_no),
                    "pinmu_name": str(pinmu_name)
                })
                if product not in set_product:
                    set_product.add(product)
                    _changed = True
            _extract["product"] = list(set_product)
            _extract["product_attrs"] = product_attrs_new
            _extract["extract_count"] += 3
        if not has_tenderee:
            _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称")
            _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[])
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            _linklist = []
            for _conta in _contacts:
                _person = _conta.get("联系人","")
                _phone = _conta.get("联系电话","")
                if _person!="" or _phone!="":
                    _linklist.append([_person,_phone])
            if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4:
                _role_dict = {
                    "role_name": "tenderee",
                    "role_text": _tenderee_ai,
                    "from_ai": True
                }
                if len(_linklist)>0:
                    _role_dict["linklist"] = _linklist
                Project_rolelist.append(_role_dict)
                _changed = True
                _extract["extract_count"] += 1
        if not has_budget or budget_unexpected:
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            if _budget is not None and _budget!="":
                _budget = getUnifyMoney(_budget)
                if _budget>0:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                    Project["tendereeMoney"] = str(float(_budget))
                    _changed = True
                    _extract["extract_count"] += 1
            else:
                if budget_unexpected:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                        Project["tendereeMoney"] = "0"
        if not has_win_tenderer or winprice_unexpected:
            list_win = _extract_ai.get("中标信息",[])
            if len(list_win)>0:
                winprice_unexpected_new = False
                for _win_dict_i in range(len(list_win)):
                    _win_dict = list_win[_win_dict_i]
                    _pack = _win_dict.get("标段号","")
                    if _pack=="":
                        if len(list_win)>1:
                            _pack = "AI_%d"%(_win_dict_i)
                        else:
                            _pack = "Project"
                    _win_money = _win_dict.get("中标金额")
                    if _win_money is not None and _win_money!="":
                        _win_money = getUnifyMoney(_win_money)
                    else:
                        _win_money = 0
                    if _win_money>0:
                        if _win_money>100000000 or _win_money<100:
                            winprice_unexpected_new = True
                has_delete_win = False
                if winprice_unexpected and not winprice_unexpected_new:
                    has_delete_win = True
                    pop_packs = []
                    for _pack,_pack_value in prem.items():
                        _rolelist = _pack_value.get("roleList",[])
                        new_rolelist = []
                        for _role in _rolelist:
                            if _role.get("role_name","")!="win_tenderer":
                                new_rolelist.append(_role)
                        _pack_value["roleList"] = new_rolelist
                        if len(new_rolelist)==0 and _pack!="Project":
                            pop_packs.append(_pack)
                    for _pack in pop_packs:
                        prem.pop(_pack)
                if not has_win_tenderer or has_delete_win:
                    Project_rolelist = Project.get("roleList")
                    if Project_rolelist is None:
                        Project["roleList"] = []
                        Project_rolelist = Project["roleList"]
                    for _win_dict_i in range(len(list_win)):
                        _win_dict = list_win[_win_dict_i]
                        _pack = _win_dict.get("标段号","")
                        if _pack=="":
                            if len(list_win)>1:
                                _pack = "AI_%d"%(_win_dict_i)
                            else:
                                _pack = "Project"
                        _win_money = _win_dict.get("中标金额")
                        if _win_money is not None and _win_money!="":
                            _win_money = getUnifyMoney(_win_money)
                        else:
                            _win_money = 0
                        _win_tenderer = _win_dict.get("中标人名称")
                        if _win_tenderer!="" and len(_win_tenderer)>=4:
                            _role_dict = {
                                "role_name": "win_tenderer",
                                "role_text": _win_tenderer,
                                "winprice_unexpected": winprice_unexpected,
                                "from_ai": True
                            }
                            _role_dict["role_money"] = {
                                "money": str(float(_win_money))
                            }
                            _changed = True
                            _extract["extract_count"] += 2
                            if _pack=="Project":
                                Project_rolelist.append(_role_dict)
                            else:
                                prem[_pack] = {
                                    "roleList":[
                                        _role_dict
                                    ]
                                }
        if _changed:
            _extract["extract_ai"] = True
            _extract["prem_original"] = prem_original
            _extract["product_attrs_original"] = product_attrs_original
        return json.dumps(_extract,ensure_ascii=False),_changed
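    # Retention of the postgres extract cache: keep roughly the newest save_count rows
    # (by docid) of document_extract and delete everything older.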
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()
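# Docid allocation (below): a process-wide RLock serializes access to the MySQL serial
# table b2c_serial_no; each call reserves a contiguous block of `nums` ids and returns
# the first id of that block.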
from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
# custom JSON encoder for numpy / bytes / Decimal values
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64, Decimal)):
            return float(obj)
        elif isinstance(obj, str):
            return obj
        return json.JSONEncoder.default(self, obj)
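# Entry stage of the pipeline: InitListener assigns each incoming message a fresh
# docid/partitionkey and routes it to the attachment or extract queue; the *_his
# variants take announcements whose page_time is older than seven days.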
class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
            self.mq_attachment_his = "/queue/dataflow_attachment_his"
            self.mq_extract = "/queue/dataflow_extract"
            self.mq_extract_fix = "/queue/dataflow_extract_fix"
            self.mq_extract_his = "/queue/dataflow_extract_his"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
            try:
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                current_date = getCurrent_date(format='%Y-%m-%d')
                last_7_date = timeAdd(current_date,-7,format='%Y-%m-%d')
                page_time = body.get(document_page_time,"")
                if page_time<last_7_date:
                    is_his = True
                else:
                    is_his = False
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if not is_his:
                        queue_name = self.mq_attachment
                    else:
                        queue_name = self.mq_attachment_his
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if not is_his:
                        queue_name = self.mq_extract
                    else:
                        queue_name = self.mq_extract_his
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
        self.mq_attachment_his = "/queue/dataflow_attachment_his"
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_fix = "/queue/dataflow_extract_fix"
        self.mq_extract_his = "/queue/dataflow_extract_his"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enabled once the flow is live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
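    # Incremental sync of the approval-project table (t_shen_pi_xiang_mu): walk ids from
    # the last id seen in OTS up to the current max id in Oracle, offset docids by
    # base_shenpi_id, and route each row to the attachment or extract queue.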
    def shenpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
                    if self.max_shenpi_id<60383953:
                        self.max_shenpi_id = 60383953
            if self.max_shenpi_id is not None:
                # select data in order
                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
                if origin_max_shenpi_id is not None:
                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                        # send data to mq one by one with max_shenpi_id updated
                        for _data in list_data:
                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                            ots_dict = _data.getProperties_ots()
                            if ots_dict["docid"]<self.base_shenpi_id:
                                ots_dict["docid"] += self.base_shenpi_id
                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                            current_date = getCurrent_date(format='%Y-%m-%d')
                            last_7_date = timeAdd(current_date,-7,format='%Y-%m-%d')
                            page_time = ots_dict.get(document_page_time,"")
                            if page_time<last_7_date:
                                is_his = True
                            else:
                                is_his = False
                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"")!='[]':
                                if not is_his:
                                    queue_name = self.mq_attachment
                                else:
                                    queue_name = self.mq_attachment_his
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
                            else:
                                if not is_his:
                                    queue_name = self.mq_extract
                                else:
                                    queue_name = self.mq_extract_his
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            log("shenpi error")
            traceback.print_exc()
            self.pool_oracle.decrease()
    def fix_shenpi(self):
        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        begin_id = 0
        end_id = 64790010
        thread_num = 15
        step = (end_id-begin_id)//thread_num
        list_items = []
        for _i in range(thread_num):
            _begin = _i*step
            _end = (_i+1)*step-1
            if _i==thread_num-1:
                _end = end_id
            list_items.append((_begin,_end,_i))
        task_queue = Queue()
        for item in list_items:
            task_queue.put(item)
        fix_count_list = []
        def _handle(item,result_queue):
            conn_oracle = pool_oracle.getConnector()
            (begin_id,end_id,thread_id) = item
            _count = 0
            for _id_i in range(begin_id,end_id):
                try:
                    bool_query = BoolQuery(must_queries=[
                        TermQuery("docchannel",302),
                        TermQuery("original_id",_id_i)
                    ])
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                        SearchQuery(bool_query,get_total_count=True))
                    if total_count>0:
                        continue
                    # bool_query = BoolQuery(must_queries=[
                    #     TermQuery("id",_id_i),
                    # ])
                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                    #                                                                     SearchQuery(bool_query,get_total_count=True))
                    # if total_count>0:
                    #     continue
                    try:
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                    except Exception as e:
                        continue
                    # send data to mq one by one with max_shenpi_id updated
                    for _data in list_data:
                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                        ots_dict = _data.getProperties_ots()
                        if ots_dict["docid"]<self.base_shenpi_id:
                            ots_dict["docid"] += self.base_shenpi_id
                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                        ots_dict["status"] = 201
                        dict_1 = {}
                        dict_2 = {}
                        for k,v in ots_dict.items():
                            if k!="dochtmlcon":
                                dict_1[k] = v
                            if k in ('partitionkey',"docid","dochtmlcon"):
                                dict_2[k] = v
                        d_1 = Document(dict_1)
                        d_2 = Document(dict_2)
                        d_1.update_row(self.ots_client)
                        d_2.update_row(self.ots_capacity)
                        _count += 1
                except Exception as e:
                    traceback.print_exc()
                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
            fix_count_list.append(_count)
            pool_oracle.putConnector(conn_oracle)
        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
        mt.run()
        print(fix_count_list,sum(fix_count_list))
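    # Requeue documents stuck in the initial status range (1..50) from the OTS document
    # table, but only while both MQ queues are short enough (backpressure check via
    # getQueueSize).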
    def ots2mq(self):
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("ots2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break
            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()
    def otstmp2mq(self):
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("otstmp2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)
        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
        from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
        from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin
        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
        schedule.start()
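# ---- one-off maintenance utilities below (run manually, not part of the scheduled flows) ----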
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)
        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict.get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load
    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()
    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
current_path = os.path.abspath(os.path.dirname(__file__))
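# manual test: run the doubao-based AI extraction on a sample announcement and
# merge the result into the rule-based extract via merge_json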
def test_ai_extract():
    _dochtmlcon = '''
<div>
<div>
公告内容:
</div>
<div>
<p>宫腔镜</p>
<div>
<p> </p>
<p> <span>440101-2025-07530</span> </p>
</div>
<div></div>
<div>
<div>
<div>
<div>
<table>
<tbody>
<tr>
<td> 一、采购人: <a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a> </td>
</tr>
<tr>
<td> 二、采购计划编号: 440101-2025-07530 </td>
</tr>
<tr>
<td> 三、采购计划名称: 宫腔镜 </td>
</tr>
<tr>
<td> 四、采购品目名称: 医用内窥镜 </td>
</tr>
<tr>
<td> 五、采购预算金额(元): 1920000.00 </td>
</tr>
<tr>
<td> 六、需求时间: </td>
</tr>
<tr>
<td> 七、采购方式: 公开招标 </td>
</tr>
<tr>
<td> 八、备案时间: 2025-05-06 09:12:11 </td>
</tr>
</tbody>
</table>
<div>
<table>
<tbody>
<tr>
<td>发布人:<a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a></td>
</tr>
<tr>
<td>发布时间: 2025年 05月 06日 </td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
'''
    _extract_json = '''
{ "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "公开招标", "candidate": "", "code": [ "440101-2025-07530" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.12, "kvtree": 0.03, "moneygrade": 0.0, "nerToken": 0.04, "outline": 0.03, "pb_extract": 0.16, "person": 0.0, "prem": 0.04, "preprocess": 0.11, "product": 0.02, "product_attrs": 0.01, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.01, "rule_channel": 0.03, "rule_channel2": 0.16, "tableToText": 0.03000166893005371, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "广州医科大学附属妇女儿童医疗中心": { "in_text": 1 } }, "district": { "area": "华南", "city": "广州", "district": "未知", "is_in_text": false, "province": "广东" }, "docchannel": { "docchannel": "招标预告", "doctype": "采招数据", "life_docchannel": "招标预告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "宫腔镜", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=27c02035e60e7cc1b7b8be65eedf92fd", "industry": { "class": "零售批发", "class_name": "医疗设备", "subclass": "专用设备" }, "is_deposit_project": false, "label_dic": {}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 1920000.0 ], "moneys_attachment": [], "moneysource": "", "name": "医用内窥镜", "nlp_enterprise": [ "广州医科大学附属妇女儿童医疗中心" ], "nlp_enterprise_attachment": [], "pb": { "industry": "教育及研究", "project_name_refind": "医用内窥镜", "project_property": "新建" }, "pb_project_name": "医用内窥镜", "person_review": [], "pinmu_name": "医用内窥镜", "policies": [], "prem": { "Project": { "code": "", "name": "医用内窥镜", "roleList": [ { "address": "", "linklist": [], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_prob": 0.9499999463558197, "role_text": "广州医科大学附属妇女儿童医疗中心", "serviceTime": "" } ], "tendereeMoney": "1920000.00", "tendereeMoneyUnit": "元", "uuid": "b1f25984-d473-4995-aec6-dbdba27fa898" } }, "process_time": "2025-05-06 09:24:32", "product": [ "宫腔镜", "医用内窥镜" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": { "医疗检测设备": [ [ "宫腔镜", 1 ] ] }, "核心字段": { "医疗检测设备": [ [ "宫腔镜", 2 ], [ "内窥镜", 2 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-05-06", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-23", "word_count": { "正文": 211, "附件": 0 } }
'''
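    # flatten the html (tables included) to plain text and pick a model endpoint
    # whose context window fits the text length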
    _text = html2text_with_tablehtml(_dochtmlcon)
    if len(_text)<25000:
        model_name = "ep-20250314164242-jd62g"  # 1.5pro 32k
    else:
        _text = _text[:100000]
        model_name = "ep-20250212111145-fflr7"  # 1.5pro 256k
    msg = get_prompt_extract_role(_text)
    result = chat_doubao(msg,model_name=model_name)
    _json = get_json_from_text(result)
    if _json is not None:
        # default to an empty dict so a failed parse below cannot raise a NameError
        _extract_ai = {}
        try:
            _extract_ai = json.loads(_json)
        except Exception as e:
            pass
        if len(_extract_ai.keys())>0:
            # "de" is the Dataflow_ActivteMQ_extract instance created in __main__
            _new_json,_changed = de.merge_json(_extract_json,_json)
            print("new_json")
            print(_new_json)
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    de = Dataflow_ActivteMQ_extract(create_listener=False)
    # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
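    # a: a full rule-based extract result; b: an AI extract fragment.
    # sample payloads for should_to_extract_ai and the commented-out merge_json call below.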
    a = '''
'{ "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "", "candidate": "", "code": [], "code_investment": "", "cost_time": { "attrs": 0.4, "codename": 0.37, "deposit": 0.0, "district": 0.08, "kvtree": 0.57, "moneygrade": 0.0, "nerToken": 0.95, "outline": 0.13, "pb_extract": 0.45, "person": 0.04, "prem": 0.03, "preprocess": 1.77, "product": 0.39, "product_attrs": 0.12, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.03, "rule_channel": 0.15, "rule_channel2": 0.38, "tableToText": 0.4500255107879639, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "中国建设银行股份有限公司烟台西南河支行": { "credit_code": "91370600865039107D", "in_text": 1 }, "烟台东泰物流有限公司": { "credit_code": "91370600088903828T", "in_text": 1 } }, "district": { "area": "华东", "city": "烟台", "district": "未知", "is_in_text": true, "province": "山东" }, "docchannel": { "docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "通风管道设备:东泰物流任务", "exist_table": 1, "extract_count": 6, "fail_reason": "", "fingerprint": "md5=39779a8d2c8700c767f8cc77794f4466", "industry": { "class": "零售批发", "class_name": "机械设备", "subclass": "通用设备" }, "is_deposit_project": false, "label_dic": { "need_qualification": 1 }, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 580000.0 ], "moneys_attachment": [], "moneysource": "", "name": "通风管道设备", "nlp_enterprise": [ "烟台东泰物流有限公司", "中国建设银行股份有限公司烟台西南河支行" ], "nlp_enterprise_attachment": [], "pb": { "project_name_refind": "通风管道设备东泰物流任务", "project_property": "新建" }, "pb_project_name": "通风管道设备东泰物流任务", "person_review": [], "pinmu_name": "", "policies": [], "prem": { "Project": { "code": "", "name": "通风管道设备", "roleList": [], "tendereeMoney": "580000", "tendereeMoneyUnit": "万", "uuid": "e16168ae-3222-4bff-9b65-f8daf985da6e" } }, "process_time": "2025-05-22 12:06:34", "product": [ "通风管道设备", "镀锌钢板风管", "单层百叶风口", "多叶送风口", "网式送风口", "矩形排烟防火阀", "矩形280°C防火阀", "矩形280°C排烟防火阀", "圆形280°C防火阀", "矩形70°C防火阀", "圆形70°C防火阀", "风管止回阀", "圆形电动对开多叶调节阀", "矩形手动对开多叶调节阀", "防火软连接", "阻抗复合式消声器", "轴流式排烟风机", "混流风机", "排烟风机", "排风机", "送风机", "换气扇" ], "product_attrs": { "data": [ { "product": "镀锌钢板风管", "quantity": "800", "quantity_unit": "平方米", "specs": "δ 1.5mm" }, { "product": "单层百叶风口", "quantity": "1", "quantity_unit": "个", "specs": "1800*1800" }, { "product": "多叶送风口", "quantity": "8", "quantity_unit": "个", "specs": "630*(1000+250)" }, { "product": "网式送风口", "quantity": "1", "quantity_unit": "个", "specs": "800*500" }, { "product": "矩形排烟防火阀", "quantity": "6", "quantity_unit": "个", "specs": "2000*320" }, { "product": "矩形280°C防火阀", "quantity": "6", "quantity_unit": "个", "specs": "2000*250 2000*320" }, { "product": "矩形280°C排烟防火阀", "quantity": "6", "quantity_unit": "个", "specs": "1000*1000" }, { "product": "圆形280°C防火阀", "quantity": "8", "quantity_unit": "个", "specs": "Φ670" }, { "product": "矩形70°C防火阀", "quantity": "4", "quantity_unit": "个", "specs": "1000*320" }, { "product": "圆形70°C防火阀", "quantity": "16", "quantity_unit": "个", "specs": "Φ750" }, { "product": "风管止回阀", "quantity": "6", "quantity_unit": "个", "specs": "1000*1000" }, { "product": "圆形电动对开多叶调节阀", "quantity": "4", "quantity_unit": "个", "specs": "Φ400" }, { "product": "矩形手动对开多叶调节阀", "quantity": "1", "quantity_unit": "个", "specs": "1250*320" }, { "product": "防火软连接", "quantity": "90", "quantity_unit": "m2", "specs": "A级不燃" }, { "product": "阻抗复合式消声器", "quantity": "6", 
"quantity_unit": "台", "specs": "2000*320*900" }, { "brand": "格瑞德、亚太、中大空调", "product": "轴流式排烟风机", "quantity": "8", "quantity_unit": "台", "specs": "12693m3/h,全压536pa,风机效率为0.72,功率4.0kw,转速2900rpm,噪声86dB,(A)ws=0.24" }, { "brand": "格瑞德、亚太、中大空调", "product": "混流风机", "quantity": "8", "quantity_unit": "台", "specs": "16260/10732m3/h,全压504/222pa风机效率为0.72,功率4.7/1.5KW,转1440/940rpm,噪声82/73dB(A,Ws=0.11" }, { "brand": "格瑞德、亚太、中大空调", "product": "排烟风机", "quantity": "6", "quantity_unit": "台", "specs": "消防排烟风机:37800/25500m3/h,风压837/374pa,功率15.5/5.5,转速730/500r/min,重量585kg,噪声73/70dB(A)" }, { "brand": "格瑞德、亚太、中大空调", "product": "排风机", "quantity": "2", "quantity_unit": "台", "specs": "8900m3/h,压力262pa,功率1.5kw,重量150kg" }, { "brand": "格瑞德、亚太、中大空调", "product": "送风机", "quantity": "2", "quantity_unit": "台", "specs": "8121m3/h,压力325pa,功率1.5kw,重量150kg" }, { "product": "换气扇", "quantity": "300", "quantity_unit": "个", "specs": "APB15A排气扇,风量:288m3/h功率24W" } ], "header": [ "清单名称_数量_单位__品牌/厂家_规格/项目特征_____", "清单名称_数量_单位__品牌/厂家_规格/项目特征_____" ], "header_col": [ "序号_清单名称_规格/项目特征_品牌/厂家_单位_数量_备 注_备 注_备 注", "序号_清单名称_规格/项目特征_品牌/厂家_单位_数量_实际验收标准_技术标\n联系人\n及电话_其它\n说明" ] }, "project_contacts": [], "project_label": { "标题": { "管道工程": [ [ "通风管道", 1 ] ], "通风散热配件": [ [ "风管", 1 ] ] }, "核心字段": { "管道工程": [ [ "通风管道", 3 ] ], "通风散热设备": [ [ "风机", 5 ] ], "通风散热配件": [ [ "风管", 5 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-05-15", "word_count": { "正文": 10439, "附件": 0 } }'
'''
    b = '''
{"招标信息":{"招标人名称":"五矿铜业(湖南)有限公司","项目预算":"","招标人联系方式":[{"联系人":"李锟","联系电话":"13575144492"}]},"中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
'''
    print(de.should_to_extract_ai(a))
    # print(de.merge_json(a,b))
    # print(test_ai_extract())
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")