dataflow_mq.py

#coding:utf8
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process

from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
from BaseDataMaintenance.common.Utils import getUnifyMoney
from BaseDataMaintenance.maintenance.product.medical_product import MedicalProduct

class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
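
# Attachment dataflow: consumes documents from the attachment queue, recognizes
# their attachments through the recognition interface and forwards the enriched
# documents to the extract queue.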
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None
        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()
        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener
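
    # When the main attachment queue is nearly drained, re-consume the failed
    # queue until it is empty, then disconnect the temporary consumers.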
    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
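
    # Handler bound to each attachment listener: parses the message, resolves
    # the attachment records by filemd5 and runs the recognition step.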
    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
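
    # For each attachment, reuse the recognized html when it is already
    # processed (or marked too large); otherwise call the recognition
    # interface. Returns (all_succeeded, list_html, swf_urls).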
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # test: process everything
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)
            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment contents of one document.
        :param _dict: dict carrying the document item and its attachment list
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification,"")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)

            dtmp = Document_tmp(item)
            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after too many failures push to the failed queue; that queue
                # is reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
                # save the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
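
    # Download the attachment from oss (or reuse a local copy), call the
    # recognition interface, and persist the recognized html back to ots and
    # the redis cache. Returns True when no further retry is needed.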
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        if not _ots_exists:
                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # handle the interface result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                _reg_time = time.time()-start_time
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False

                # when re-running a swf, strip the "\" escapes from the stored swf_urls
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    else:
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)
                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)
                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client) # re-enable updates once online
                # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                start_time = time.time()
                if local_exists:
                    if not _ots_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                _upload_time = time.time()-start_time
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()

    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()
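
    # Recover an attachment's oss path from the document html by matching the
    # filemd5 against <a>/<img> tags.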
    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]
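
    # Redis cache of recognized attachments, keyed by "attach-<filemd5>" with a
    # 3-hour expiry.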
    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
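
    # Resolve attachment records for a list of filemd5s: first from the redis
    # cache, then from ots, finally from the path embedded in the html.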
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists",True,True)
                            list_attachment.append(_attach_pg)
                        else:
                            log("getAttachments status None find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists",True,True)
                            list_attachment.append(_attach_pg)
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)

    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()
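
    # Purge postgres attachment rows older than two days.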
    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()
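
# Extract dataflow: consumes documents from the extract queue, calls the
# content-extract interfaces (with a redis fingerprint cache in front) and
# routes selected documents on to the AI extract queue.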
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_ai = "/queue/dataflow_extract_AI"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
        init_nums = 1
        self.conn_mq = None
        if create_listener:
            self.conn_mq = getConnect_activateMQ()
        else:
            init_nums = 0
        self.pool_mq = ConnectorPool(init_nums,30,getConnect_activateMQ)
        self.block_url = RLock()
        self.url_count = 0
        self.session = None
        self.MP = MedicalProduct()

        self.list_extract_comsumer = []
        self.list_extract_ai_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)

        # extract listeners
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()
            listener_p_ai = Thread(target=self.start_extract_AI_listener)
            listener_p_ai.start()
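
    # Keep a pool of AI-extract consumers alive, replacing any whose connection
    # has dropped.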
    def start_extract_AI_listener(self,_count=8):
        self.list_extract_ai_comsumer = []
        for _i in range(_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
            createComsumer(listener_extract,self.mq_extract_ai)
            self.list_extract_ai_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_ai_comsumer)):
                    if self.list_extract_ai_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
                        createComsumer(listener,self.mq_extract_ai)
                        self.list_extract_ai_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()

    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_comsumer)):
                    if self.list_extract_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                        createComsumer(listener,self.mq_extract)
                        self.list_extract_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()

    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
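
    # Pick an extract interface by weighted random draw; the weights were
    # normalized into cumulative fractions in __init__.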
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))
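
    # When the main extract queue is nearly drained, drain the failed queue:
    # bump extract_times to 10 so each message gets one final attempt.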
    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()
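
    # Fingerprint-keyed caches of extract results: a postgres table (currently
    # disabled, see the commented pool_postgres in __init__) and a redis cache
    # with a 2-hour expiry.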
    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
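
    # Main extract handler: truncate oversized html, look up the fingerprint
    # cache, call the extract interface, persist the result and ack/requeue
    # according to extract_times.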
  834. def comsumer_handle(self,_dict,result_queue):
  835. try:
  836. log("start handle")
  837. data = {}
  838. frame = _dict["frame"]
  839. conn = _dict["conn"]
  840. message_id = frame.headers["message-id"]
  841. subscription = frame.headers.setdefault('subscription', None)
  842. item = json.loads(frame.body)
  843. for k,v in item.items():
  844. try:
  845. if isinstance(v,bytes):
  846. item[k] = v.decode("utf-8")
  847. except Exception as e:
  848. log("docid %d types bytes can not decode"%(item.get("docid")))
  849. item[k] = ""
  850. dtmp = Document_tmp(item)
  851. dhtml = Document_html({"partitionkey":item.get("partitionkey"),
  852. "docid":item.get("docid")})
  853. extract_times = item.get("extract_times",0)+1
  854. item["extract_times"] = extract_times
  855. _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
  856. html_len = len(_dochtmlcon) # html 文本长度
  857. limit_text_len = 50000 # 内容(或附件)正文限制文本长度
  858. if html_len > limit_text_len:
  859. log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
  860. try:
  861. _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
  862. _soup = BeautifulSoup(_dochtmlcon,"lxml")
  863. all_len = len(_soup.get_text()) # 全公告内容text长度
  864. _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
  865. attachment_len = len(_attachment.get_text()) if _attachment else 0 # 附件内容text长度
  866. main_text_len = all_len - attachment_len # 正文内容text长度
  867. if attachment_len>150000: # 附件内容过长删除(处理超时)
  868. if _attachment is not None:
  869. _attachment.decompose()
  870. attachment_len = 0
  871. # 正文或附件内容text长度大于limit_text_len才执行article_limit
  872. if main_text_len>limit_text_len or attachment_len>limit_text_len:
  873. _soup = article_limit(_soup,limit_text_len)
  874. _dochtmlcon = str(_soup)
  875. except Exception as e:
  876. traceback.print_exc()
  877. ackMsg(conn,message_id,subscription)
  878. return
  879. log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
  880. dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
  881. _extract = Document_extract({})
  882. _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
  883. _extract.setValue(document_extract2_docid,item.get(document_docid))
  884. all_done = 1
  885. for k,v in item.items():
  886. data[k] = v
  887. data["timeout"] = 440
  888. data["doc_id"] = data.get(document_tmp_docid,0)
  889. # if data["docid"]<298986054 and data["docid"]>0:
  890. # log("jump docid %s"%(str(data["docid"])))
  891. # ackMsg(conn,message_id,subscription)
  892. # return
  893. data["content"] = data.get(document_tmp_dochtmlcon,"")
  894. if document_tmp_dochtmlcon in data:
  895. data.pop(document_tmp_dochtmlcon)
  896. data["title"] = data.get(document_tmp_doctitle,"")
  897. data["web_source_no"] = item.get(document_tmp_web_source_no,"")
  898. data["web_source_name"] = item.get(document_tmp_web_source_name,"")
  899. data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
  900. data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
  901. _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
  902. to_ai = False
  903. if all_done>0:
  904. _time = time.time()
  905. # extract_json = self.getExtract_json_fromDB(_fingerprint)
  906. extract_json = self.getExtract_json_fromRedis(_fingerprint)
  907. log("get json from db takes %.4f"%(time.time()-_time))
  908. # extract_json = None
  909. _docid = int(data["doc_id"])
  910. if extract_json is not None:
  911. log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
  912. _extract.setValue(document_extract2_extract_json,extract_json,True)
  913. else:
  914. resp = self.request_extract_interface(json=data,headers=self.header)
  915. if (resp.status_code >=200 and resp.status_code<=213):
  916. extract_json = resp.content.decode("utf8")
  917. _extract.setValue(document_extract2_extract_json,extract_json,True)
  918. _time = time.time()
  919. # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
  920. self.putExtract_json_toRedis(_fingerprint,extract_json)
  921. log("get json to db takes %.4f"%(time.time()-_time))
  922. else:
  923. log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
  924. all_done = -2
  925. to_ai,_reason = self.should_to_extract_ai(extract_json)
  926. if to_ai:
  927. log("to_ai of docid:%s of reason:%s"%(str(_docid),str(_reason)))
  928. # if all_done>0:
  929. # resp = self.request_industry_interface(json=data,headers=self.header)
  930. # if (resp.status_code >=200 and resp.status_code<=213):
  931. # _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
  932. # else:
  933. # log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
  934. # all_done = -3
  935. # _to_ack = False
  936. # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
  937. # all_done = -4
  938. _extract.setValue(document_extract2_industry_json,"{}",True)
  939. _to_ack = True
  940. try:
  941. if all_done!=1:
                    # sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    log("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # after 10 attempts give up: mark the document as processed with an empty extraction
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace the extraction result with {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transform to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace the extraction result with {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure state
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        # debug hook for a single document
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
                if to_ai:
                    # send to the AI queue for a second-pass extraction
                    item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}"))
                    send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
            else:
                item["extract_times"] -= 1
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            # sentMsgToDD("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process docid:%s failed message_id:%s"%(data.get("doc_id"),message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace the extraction result with {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transform to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace the extraction result with {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # transform back to the extract queue
                # save the failure state
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
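
    # Routing heuristic for the AI second pass: returns (should_send, reason).
    # A 采招数据 document is sent to the AI queue when the regular extraction
    # looks suspicious: a lone entity that was never used as a role while a
    # tenderee/win_tenderer is missing on channels that require one, a
    # medical-product document, or a budget/win price outside the plausible
    # 100 .. 100,000,000 range.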
    def should_to_extract_ai(self,extract_json):
        _reason = ""
        # return False,_reason
        _extract = {}
        if extract_json is not None:
            try:
                _extract = json.loads(extract_json)
            except Exception as e:
                pass
        has_entity = False
        docchannel = _extract.get("docchannel",{}).get("docchannel","")
        doctype = _extract.get("docchannel",{}).get("doctype","")
        entity_len = len(_extract.get("dict_enterprise",{}).keys())
        products = str(_extract.get("product_attrs",""))
        demand_info = str(_extract.get("demand_info",""))
        project_name = str(_extract.get("name",""))
        class_name = _extract.get("industry",{}).get("class_name","")
        _entity = None
        if entity_len>0:
            has_entity = True
            if entity_len==1:
                _entity = list(_extract.get("dict_enterprise",{}).keys())[0]
        prem = _extract.get("prem",{})
        one_entity_used = False
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="agency":
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    if _role.get("role_text","")==_entity:
                        one_entity_used = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        if doctype=="采招数据":
            if has_entity and not one_entity_used:
                if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","验收合同"}:
                    return True,_reason
                if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","验收合同"}:
                    return True,_reason
            if class_name=="医疗设备" or self.MP.is_medical_product(products) or self.MP.is_medical_product(demand_info) or self.MP.is_medical_product(project_name):
                _reason = "medical product"
                return True,_reason
            if budget_unexpected or winprice_unexpected:
                return True,_reason
        return False,_reason
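
    # Illustrative use, mirroring the consumer above (a sketch rather than
    # the exact call site):
    #   to_ai,_reason = self.should_to_extract_ai(extract_json)
    #   if to_ai:
    #       item[document_extract2_extract_json] = json.loads(extract_json)
    #       send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)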
    def extract_ai_handle(self,_dict,result_queue):
        frame = _dict["frame"]
        conn = _dict["conn"]
        message_id = frame.headers["message-id"]
        subscription = frame.headers.setdefault('subscription', None)
        item = json.loads(frame.body)
        _extract_json = None
        if document_extract2_extract_json in item:
            _extract_json = item.get(document_extract2_extract_json)
            item.pop(document_extract2_extract_json)
        try:
            message_acknowledged = False
            dtmp = Document_tmp(item)
            _tomq = False
            if dtmp.fix_columns(self.ots_client,["status","save"],True):
                if dtmp.getProperties().get("status",0)>=71:
                    if dtmp.getProperties().get("save",1)==0:
                        # the document was dropped by dedup: ack and skip
                        message_acknowledged = True
                        log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
                        ackMsg(conn,message_id,subscription)
                        return
                else:
                    # dedup has not finished yet: postpone the AI pass
                    _tomq = True
            else:
                _tomq = True
            if _tomq:
                # requeue and wait at least 10 minutes before running the AI pass
                aitimes = item.get("aitimes")
                if aitimes is None:
                    aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                    item["aitimes"] = aitimes
                    if not message_acknowledged:
                        item[document_extract2_extract_json] = _extract_json
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                            message_acknowledged = True
                            ackMsg(conn,message_id,subscription)
                    time.sleep(1)
                    return
                else:
                    if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
                        if not message_acknowledged:
                            item[document_extract2_extract_json] = _extract_json
                            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
                                message_acknowledged = True
                                ackMsg(conn,message_id,subscription)
                        time.sleep(1)
                        return
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract_ai = {}
            _text = html2text_with_tablehtml(_dochtmlcon)
            # pick the model by content length: short texts go to the 32k-context
            # model, long ones are truncated to 100000 chars for the 256k model
            if len(_text)<25000:
                model_name = "ep-20250314164242-jd62g" #1.5pro 32k
            else:
                _text = _text[:100000]
                model_name = "ep-20250212111145-fflr7" #1.5pro 256k
            msg = get_prompt_extract_role(_text)
            result = chat_doubao(msg,model_name=model_name)
            _json = get_json_from_text(result)
            if _json is not None:
                try:
                    _extract_ai = json.loads(_json)
                except Exception as e:
                    pass
            if len(_extract_ai.keys())>0:
                _new_json,_changed = self.merge_json(_extract_json,_json)
                if _changed:
                    dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    if not dhtml.exists_row(self.ots_client):
                        dhtml.update_row(self.ots_client)
                    _extract = Document_extract({})
                    _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
                    _extract.setValue(document_extract2_docid,item.get(document_docid))
                    _extract.setValue(document_extract2_extract_json,_new_json,True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    log("extract_ai of docid:%d"%(item.get(document_docid)))
                else:
                    doc = Document({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid"),
                                    "extract_json_ai":_json
                                    })
                    if doc.exists_row(self.ots_client):
                        doc.update_row(self.ots_client)
                    log("extract_nochange_ai of docid:%d"%(item.get(document_docid)))
            if not message_acknowledged:
                message_acknowledged = True
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            if not message_acknowledged:
                ackMsg(conn,message_id,subscription)
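
    # merge_json folds the AI result (extract_ai_json) into the regular
    # extraction (extract_json). AI entities and money values are accepted
    # only when they can be verified against what the regular extractor
    # already found in the text (dict_enterprise / moneys), and every
    # AI-sourced role is tagged with from_ai=True so downstream consumers
    # can tell the two sources apart.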
    def merge_json(self,extract_json,extract_ai_json):
        def get_ai_money(_text):
            b = re.search(r'[\d,,\.]+[亿万元人民币]+',str(_text))
            if b is not None:
                return b.group()
        def clean_ai_entity(entity,set_entitys):
            if not isinstance(entity,str):
                return ""
            if re.search(r"(未|无)(明确|提及)|某(部|单位|医院|公司)|\*\*|XX|登录|详见|招标单位",entity) is not None:
                return ""
            if re.search(r"无|区|县|市|省|某|中心|部|公司",entity) is not None and len(entity)<=5:
                return ""
            # return entity
            # accept only a prefix that matches an entity already found in the text
            for _i in range(len(entity)-5):
                if entity[:len(entity)-_i] in set_entitys:
                    return entity[:len(entity)-_i]
            return ""
        def clean_ai_extract(_extract,_extract_ai):
            set_entitys = set()
            set_moneys = set()
            dict_enterprise = _extract.get("dict_enterprise",{})
            for k,v in dict_enterprise.items():
                set_entitys.add(str(k))
            _sum = 0
            for _money in _extract.get("moneys",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _sum = 0
            for _money in _extract.get("moneys_attachment",[]):
                _money = float(_money)
                set_moneys.add(str(_money))
                _sum += _money
            if _sum>0:
                set_moneys.add(str(float(_sum)))
            _tenderee_dict = _extract_ai.get("招标信息",{})
            _tenderee_ai = _tenderee_dict.get("招标人名称")
            if _tenderee_ai is not None:
                _tenderee_ai = clean_ai_entity(_tenderee_ai,set_entitys)
                _tenderee_dict["招标人名称"] = _tenderee_ai
                # if _tenderee_ai in set_entitys:
                #     _tenderee_dict["招标人名称"] = _tenderee_ai
                # else:
                #     _tenderee_dict["招标人名称"] = ""
            _budget = _tenderee_dict.get("项目预算")
            if _budget is not None:
                _budget = getUnifyMoney(str(_budget))
                _budget = str(float(_budget))
                if _budget in set_moneys:
                    _tenderee_dict["项目预算"] = _budget
                else:
                    _tenderee_dict["项目预算"] = ""
            list_win = _extract_ai.get("中标信息",[])
            for _win_dict_i in range(len(list_win)):
                _win_dict = list_win[_win_dict_i]
                _pack = _win_dict.get("标段号","")
                if _pack=="":
                    if len(list_win)>1:
                        _pack = "AI_%d"%(_win_dict_i)
                    else:
                        _pack = "Project"
                _win_money = _win_dict.get("中标金额")
                if str(_win_money).find("%")>=0:
                    _win_money = 0
                _win_money = getUnifyMoney(str(_win_money))
                _win_money = str(float(_win_money))
                if _win_money in set_moneys:
                    _win_dict["中标金额"] = _win_money
                else:
                    _win_dict["中标金额"] = ""
                _win_tenderer = _win_dict.get("中标人名称")
                _win_tenderer = clean_ai_entity(_win_tenderer,set_entitys)
                _win_dict["中标人名称"] = _win_tenderer
                # if _win_tenderer in set_entitys:
                #     _win_dict["中标人名称"] = _win_tenderer
                # else:
                #     _win_dict["中标人名称"] = ""
            list_product = _extract_ai.get("产品信息",[])
            for product_i in range(len(list_product)):
                product_dict = list_product[product_i]
                uni_price = product_dict.get("单价","")
                quantity = product_dict.get("数量","")
                total_price = product_dict.get("总价","")
                if uni_price is not None and uni_price!="":
                    uni_price = getUnifyMoney(str(uni_price))
                    uni_price = str(float(uni_price))
                    if uni_price in set_moneys:
                        product_dict["单价"] = uni_price
                    else:
                        product_dict["单价"] = ""
                if quantity is not None and quantity!="":
                    quantity = getUnifyMoney(str(quantity))
                    product_dict["数量"] = quantity
                if total_price is not None and total_price!="":
                    total_price = getUnifyMoney(str(total_price))
                    total_price = str(float(total_price))
                    if total_price in set_moneys:
                        product_dict["总价"] = total_price
                    else:
                        product_dict["总价"] = ""
        _extract = {}
        if extract_json is not None:
            try:
                if isinstance(extract_json,str):
                    _extract = json.loads(extract_json)
                else:
                    _extract = extract_json
            except Exception as e:
                pass
        if "extract_count" not in _extract:
            _extract["extract_count"] = 0
        _extract_ai = {}
        if extract_ai_json is not None:
            try:
                _extract_ai = json.loads(extract_ai_json)
            except Exception as e:
                pass
        clean_ai_extract(_extract,_extract_ai)
        _product = _extract.get("product",[])
        list_new_product = _extract_ai.get("产品信息",[])
        prem = _extract.get("prem")
        if prem is None:
            _extract["prem"] = {}
            prem = _extract["prem"]
        Project = prem.get("Project")
        if Project is None:
            prem["Project"] = {}
            Project = prem["Project"]
        Project_rolelist = Project.get("roleList")
        if Project_rolelist is None:
            Project["roleList"] = []
            Project_rolelist = Project["roleList"]
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<100:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<100:
                    budget_unexpected = True
        _changed = False
        prem_original = {}
        product_attrs_original = {}
        try:
            # deep-copy the originals so they can be stored alongside the merge
            prem_original = json.loads(json.dumps(_extract.get("prem",{})))
            product_attrs_original = json.loads(json.dumps(_extract.get("product_attrs",{})))
        except Exception as e:
            pass
        # only adopt the AI product list when it is substantially larger than
        # what the regular extractor found
        if len(list_new_product)>0 and len(list_new_product)>len(product_attrs_original.get("data",[]))//3:
            set_product = set(_product)
            product_attrs_new = {"data":[]}
            for product_i in range(len(list_new_product)):
                brand = list_new_product[product_i].get("品牌","")
                product = list_new_product[product_i].get("产品名称","")
                quantity = list_new_product[product_i].get("数量","")
                quantity_unit = list_new_product[product_i].get("数量单位","")
                specs = list_new_product[product_i].get("规格型号","")
                uni_price = list_new_product[product_i].get("单价","")
                total_price = list_new_product[product_i].get("总价","")
                pinmu_no = list_new_product[product_i].get("品目编号","")
                pinmu_name = list_new_product[product_i].get("品目名称","")
                if product=="":
                    continue
                product_attrs_new["data"].append({
                    "brand": str(brand),
                    "product": product,
                    "quantity": str(quantity),
                    "quantity_unit": quantity_unit,
                    "specs": str(specs),
                    "unitPrice": str(uni_price),
                    "parameter": "",
                    "total_price": str(total_price),
                    "pinmu_no": str(pinmu_no),
                    "pinmu_name": str(pinmu_name)
                })
                if product not in set_product:
                    set_product.add(product)
            _changed = True
            _extract["product"] = list(set_product)
            _extract["product_attrs"] = product_attrs_new
            _extract["extract_count"] += 3
        if not has_tenderee:
            _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称")
            _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[])
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            _linklist = []
            for _conta in _contacts:
                _person = _conta.get("联系人","")
                _phone = _conta.get("联系电话","")
                if _person!="" or _phone!="":
                    _linklist.append([_person,_phone])
            if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4:
                _role_dict = {
                    "role_name": "tenderee",
                    "role_text": _tenderee_ai,
                    "from_ai": True
                }
                if len(_linklist)>0:
                    _role_dict["linklist"] = _linklist
                Project_rolelist.append(_role_dict)
                _changed = True
                _extract["extract_count"] += 1
        if not has_budget or budget_unexpected:
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            if _budget is not None and _budget!="":
                _budget = getUnifyMoney(_budget)
                if _budget>0:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                    Project["tendereeMoney"] = str(float(_budget))
                    _changed = True
                    _extract["extract_count"] += 1
            else:
                if budget_unexpected:
                    if "tendereeMoney" in Project:
                        Project["tendereeMoney_original"] = Project["tendereeMoney"]
                    Project["tendereeMoney"] = "0"
        if not has_win_tenderer or winprice_unexpected:
            list_win = _extract_ai.get("中标信息",[])
            if len(list_win)>0:
                winprice_unexpected_new = False
                for _win_dict_i in range(len(list_win)):
                    _win_dict = list_win[_win_dict_i]
                    _pack = _win_dict.get("标段号","")
                    if _pack=="":
                        if len(list_win)>1:
                            _pack = "AI_%d"%(_win_dict_i)
                        else:
                            _pack = "Project"
                    _win_money = _win_dict.get("中标金额")
                    if _win_money is not None and _win_money!="":
                        _win_money = getUnifyMoney(_win_money)
                    else:
                        _win_money = 0
                    if _win_money>0:
                        if _win_money>100000000 or _win_money<100:
                            winprice_unexpected_new = True
                has_delete_win = False
                if winprice_unexpected and not winprice_unexpected_new:
                    # the old win price was implausible and the AI one is not:
                    # drop the existing win_tenderer roles before re-adding
                    has_delete_win = True
                    pop_packs = []
                    for _pack,_pack_value in prem.items():
                        _rolelist = _pack_value.get("roleList",[])
                        new_rolelist = []
                        for _role in _rolelist:
                            if _role.get("role_name","")!="win_tenderer":
                                new_rolelist.append(_role)
                        _pack_value["roleList"] = new_rolelist
                        if len(new_rolelist)==0 and _pack!="Project":
                            pop_packs.append(_pack)
                    for _pack in pop_packs:
                        prem.pop(_pack)
                if not has_win_tenderer or has_delete_win:
                    Project_rolelist = Project.get("roleList")
                    if Project_rolelist is None:
                        Project["roleList"] = []
                        Project_rolelist = Project["roleList"]
                    for _win_dict_i in range(len(list_win)):
                        _win_dict = list_win[_win_dict_i]
                        _pack = _win_dict.get("标段号","")
                        if _pack=="":
                            if len(list_win)>1:
                                _pack = "AI_%d"%(_win_dict_i)
                            else:
                                _pack = "Project"
                        _win_money = _win_dict.get("中标金额")
                        if _win_money is not None and _win_money!="":
                            _win_money = getUnifyMoney(_win_money)
                        else:
                            _win_money = 0
                        _win_tenderer = _win_dict.get("中标人名称")
                        if _win_tenderer!="" and len(_win_tenderer)>=4:
                            _role_dict = {
                                "role_name": "win_tenderer",
                                "role_text": _win_tenderer,
                                "winprice_unexpected": winprice_unexpected,
                                "from_ai": True
                            }
                            _role_dict["role_money"] = {
                                "money": str(float(_win_money))
                            }
                            _changed = True
                            _extract["extract_count"] += 2
                            if _pack=="Project":
                                Project_rolelist.append(_role_dict)
                            else:
                                prem[_pack] = {
                                    "roleList":[
                                        _role_dict
                                    ]
                                }
        if _changed:
            _extract["extract_ai"] = True
            _extract["prem_original"] = prem_original
            _extract["product_attrs_original"] = product_attrs_original
        return json.dumps(_extract,ensure_ascii=False),_changed
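
    # Example (exercised in __main__ below): given a regular extraction json
    # `a` and an AI result `b` shaped like
    #   {"招标信息":{"招标人名称":"...","项目预算":"","招标人联系方式":[{"联系人":"...","联系电话":"..."}]},
    #    "中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
    # merge_json(a,b) returns (new_json, changed).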
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
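
    # Scheduler entry point for the extract flow: the producer runs every 20
    # seconds and the failed-queue reprocessor every 5 minutes; the postgres
    # cleanup and listener-monitor jobs are currently disabled.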
    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()

from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    # reserve a block of `nums` docids from the MySQL serial table and return
    # the first id of the block; retried forever on connection errors
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
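
# Example: with serial_value=5000 and nums=1000 the call returns 5001 and
# advances the serial to 6000, so the caller may hand out docids 5001..6000
# locally before reserving a new block.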

# custom JSON encoder for MQ payloads
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64, Decimal)):
            return float(obj)
        elif isinstance(obj, str):
            return obj
        return json.JSONEncoder.default(self, obj)
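
# Usage: json.dumps(body,cls=MyEncoder) -- converts numpy arrays, numpy
# floats, Decimal and bytes into JSON-serializable values before a message
# is pushed to ActiveMQ.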

class Dataflow_init(Dataflow):
    class InitListener():
        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
        def on_error(self, headers):
            log('received an error %s' % headers.body)
        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0
        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid
        def on_message(self, headers):
            body = None
            message_id = None
            try:
                # parse the message first so a malformed body does not consume a docid
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                # requeue to the init queue so the message is not lost
                if body is not None and send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)
        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
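
    # The init flow: every message on /queue/dataflow_init is assigned a
    # fresh docid (block-allocated via generateRangeDocid) and a partitionkey
    # of docid%500+1, then routed to the attachment queue when it carries
    # attachments and straight to the extract queue otherwise.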
    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)
    def monitor_listener(self):
        # replace any consumer whose MQ connection has dropped
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enabled when deployed
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
    def shenpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
                    if self.max_shenpi_id<60383953:
                        self.max_shenpi_id = 60383953
            if self.max_shenpi_id is not None:
                # select data in order
                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
                if origin_max_shenpi_id is not None:
                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                        # send data to mq one by one with max_shenpi_id updated
                        for _data in list_data:
                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                            ots_dict = _data.getProperties_ots()
                            if ots_dict["docid"]<self.base_shenpi_id:
                                ots_dict["docid"] += self.base_shenpi_id
                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"")!='[]':
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
                            else:
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            log("shenpi error")
            traceback.print_exc()
            self.pool_oracle.decrease()
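
    # One-off repair job: re-imports t_shen_pi_xiang_mu rows (docchannel 302)
    # that never made it into the document table. The id range is sharded
    # over 15 threads; docids are offset by base_shenpi_id (400000000000) so
    # they do not collide with the regular docid space.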
    def fix_shenpi(self):
        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        begin_id = 0
        end_id = 64790010
        thread_num = 15
        step = (end_id-begin_id)//thread_num
        list_items = []
        for _i in range(thread_num):
            _begin = _i*step
            _end = (_i+1)*step-1
            if _i==thread_num-1:
                _end = end_id
            list_items.append((_begin,_end,_i))
        task_queue = Queue()
        for item in list_items:
            task_queue.put(item)
        fix_count_list = []
        def _handle(item,result_queue):
            conn_oracle = pool_oracle.getConnector()
            (begin_id,end_id,thread_id) = item
            _count = 0
            for _id_i in range(begin_id,end_id):
                try:
                    bool_query = BoolQuery(must_queries=[
                        TermQuery("docchannel",302),
                        TermQuery("original_id",_id_i)
                    ])
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                        SearchQuery(bool_query,get_total_count=True))
                    if total_count>0:
                        continue
                    # bool_query = BoolQuery(must_queries=[
                    #     TermQuery("id",_id_i),
                    # ])
                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                    #                                                                     SearchQuery(bool_query,get_total_count=True))
                    # if total_count>0:
                    #     continue
                    try:
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                    except Exception as e:
                        continue
                    # send data to mq one by one with max_shenpi_id updated
                    for _data in list_data:
                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                        ots_dict = _data.getProperties_ots()
                        if ots_dict["docid"]<self.base_shenpi_id:
                            ots_dict["docid"] += self.base_shenpi_id
                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                        ots_dict["status"] = 201
                        dict_1 = {}
                        dict_2 = {}
                        for k,v in ots_dict.items():
                            if k!="dochtmlcon":
                                dict_1[k] = v
                            if k in ('partitionkey',"docid","dochtmlcon"):
                                dict_2[k] = v
                        d_1 = Document(dict_1)
                        d_2 = Document(dict_2)
                        d_1.update_row(self.ots_client)
                        d_2.update_row(self.ots_capacity)
                        _count += 1
                except Exception as e:
                    traceback.print_exc()
                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
            fix_count_list.append(_count)
            pool_oracle.putConnector(conn_oracle)
        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
        mt.run()
        print(fix_count_list,sum(fix_count_list))
    def ots2mq(self):
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("ots2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break
            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()
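
    # Same back-pressure pattern as ots2mq (skip when either MQ queue holds
    # over 1000 messages), but replays document_tmp rows stuck at status=0.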
    def otstmp2mq(self):
        try:
            from BaseDataMaintenance.java.MQInfo import getQueueSize
            attachment_size = getQueueSize("dataflow_attachment")
            extract_size = getQueueSize("dataflow_extract")
            if attachment_size>1000 or extract_size>1000:
                log("otstmp2mq break because of long queue size")
                return
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)
        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
        from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
        from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin
        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
        schedule.start()
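
# One-off migration helper: copies attachment rows (crtime >= 2022-05-06 and
# status 10 or ATTACHMENT_TOOLARGE) from OTS into postgres, feeding a task
# queue consumed by 30 threads.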
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue
    task_queue = Queue()
    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()
    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()
    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)
        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
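
# Verifies data synchronization: checks that every uuid recorded in uuid.pk
# still exists in the document index and writes the missing rows to
# check1.xlsx.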
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load
    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()
    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")

current_path = os.path.abspath(os.path.dirname(__file__))
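
# Manual smoke test for the AI extraction path: runs the doubao prompt on a
# fixed procurement announcement and merges the result into a canned
# extract_json (relies on the module-level `de` created in __main__ below).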
def test_ai_extract():
    _dochtmlcon = '''
<div>
<div>
公告内容:
</div>
<div>
<p>宫腔镜</p>
<div>
<p> </p>
<p> <span>440101-2025-07530</span> </p>
</div>
<div></div>
<div>
<div>
<div>
<div>
<table>
<tbody>
<tr>
<td> 一、采购人: <a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a> </td>
</tr>
<tr>
<td> 二、采购计划编号: 440101-2025-07530 </td>
</tr>
<tr>
<td> 三、采购计划名称: 宫腔镜 </td>
</tr>
<tr>
<td> 四、采购品目名称: 医用内窥镜 </td>
</tr>
<tr>
<td> 五、采购预算金额(元): 1920000.00 </td>
</tr>
<tr>
<td> 六、需求时间: </td>
</tr>
<tr>
<td> 七、采购方式: 公开招标 </td>
</tr>
<tr>
<td> 八、备案时间: 2025-05-06 09:12:11 </td>
</tr>
</tbody>
</table>
<div>
<table>
<tbody>
<tr>
<td>发布人:<a target="_blank" class="markBlue" href="/bdqyhx/670312265132728320.html" style="color: #3083EB !important;text-decoration: underline;">广州医科大学附属妇女儿童医疗中心</a></td>
</tr>
<tr>
<td>发布时间: 2025年 05月 06日 </td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
'''
    _extract_json = '''
{ "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "公开招标", "candidate": "", "code": [ "440101-2025-07530" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.12, "kvtree": 0.03, "moneygrade": 0.0, "nerToken": 0.04, "outline": 0.03, "pb_extract": 0.16, "person": 0.0, "prem": 0.04, "preprocess": 0.11, "product": 0.02, "product_attrs": 0.01, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.01, "rule_channel": 0.03, "rule_channel2": 0.16, "tableToText": 0.03000166893005371, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "广州医科大学附属妇女儿童医疗中心": { "in_text": 1 } }, "district": { "area": "华南", "city": "广州", "district": "未知", "is_in_text": false, "province": "广东" }, "docchannel": { "docchannel": "招标预告", "doctype": "采招数据", "life_docchannel": "招标预告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "宫腔镜", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=27c02035e60e7cc1b7b8be65eedf92fd", "industry": { "class": "零售批发", "class_name": "医疗设备", "subclass": "专用设备" }, "is_deposit_project": false, "label_dic": {}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [ 1920000.0 ], "moneys_attachment": [], "moneysource": "", "name": "医用内窥镜", "nlp_enterprise": [ "广州医科大学附属妇女儿童医疗中心" ], "nlp_enterprise_attachment": [], "pb": { "industry": "教育及研究", "project_name_refind": "医用内窥镜", "project_property": "新建" }, "pb_project_name": "医用内窥镜", "person_review": [], "pinmu_name": "医用内窥镜", "policies": [], "prem": { "Project": { "code": "", "name": "医用内窥镜", "roleList": [ { "address": "", "linklist": [], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_prob": 0.9499999463558197, "role_text": "广州医科大学附属妇女儿童医疗中心", "serviceTime": "" } ], "tendereeMoney": "1920000.00", "tendereeMoneyUnit": "元", "uuid": "b1f25984-d473-4995-aec6-dbdba27fa898" } }, "process_time": "2025-05-06 09:24:32", "product": [ "宫腔镜", "医用内窥镜" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": { "医疗检测设备": [ [ "宫腔镜", 1 ] ] }, "核心字段": { "医疗检测设备": [ [ "宫腔镜", 2 ], [ "内窥镜", 2 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-05-06", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-23", "word_count": { "正文": 211, "附件": 0 } }
'''
    _text = html2text_with_tablehtml(_dochtmlcon)
    if len(_text)<25000:
        model_name = "ep-20250314164242-jd62g" #1.5pro 32k
    else:
        _text = _text[:100000]
        model_name = "ep-20250212111145-fflr7" #1.5pro 256k
    msg = get_prompt_extract_role(_text)
    result = chat_doubao(msg,model_name=model_name)
    _json = get_json_from_text(result)
    _extract_ai = {}
    if _json is not None:
        try:
            _extract_ai = json.loads(_json)
        except Exception as e:
            pass
    if len(_extract_ai.keys())>0:
        _new_json,_changed = de.merge_json(_extract_json,_json)
        print("new_json")
        print(_new_json)
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    de = Dataflow_ActivteMQ_extract(create_listener=False)
    # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
    a = '''
{ "addr_dic": {}, "aptitude": "", "attachmentTypes": "", "bid_score": [], "bidway": "询价", "candidate": "", "code": [ "20250309921" ], "code_investment": "", "cost_time": { "attrs": 0.07, "codename": 0.03, "deposit": 0.0, "district": 0.1, "kvtree": 0.05, "moneygrade": 0.0, "nerToken": 0.06, "outline": 0.02, "pb_extract": 0.14, "person": 0.0, "prem": 0.01, "preprocess": 0.1, "product": 0.03, "product_attrs": 0.0, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.02, "rule_channel": 0.02, "rule_channel2": 0.25, "tableToText": 0.04000095367431641, "tendereeRuleRecall": 0.0, "time": 0.0, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "dict_enterprise": { "五矿铜业(湖南)有限公司": { "credit_code": "91430482081376930E", "in_text": 1 } }, "district": { "area": "华中", "city": "未知", "district": "未知", "is_in_text": false, "province": "湖南" }, "docchannel": { "docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0 }, "docid": "", "doctitle_refine": "湖南有色铜业方闸", "exist_table": 1, "extract_count": 2, "fail_reason": "", "fingerprint": "md5=84d3541612f56fc9dc07b0cd7fa2c644", "industry": { "class": "零售批发", "class_name": "有色金属冶炼及压延产品", "subclass": "建筑建材" }, "is_deposit_project": false, "label_dic": { "is_direct_procurement": 1 }, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [], "moneys_attachment": [], "moneysource": "", "name": "湖南有色铜业方闸采购", "nlp_enterprise": [ "五矿铜业(湖南)有限公司" ], "nlp_enterprise_attachment": [], "pb": { "location": "湖南", "project_name_refind": "湖南有色铜业方闸采购", "project_property": "新建" }, "pb_project_name": "湖南有色铜业方闸采购", "person_review": [], "pinmu_name": "", "policies": [], "prem": {}, "process_time": "2025-04-16 14:58:20", "product": [ "铜业方闸", "手电不锈钢复合式速开方闸门配一体式启闭机" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "project_contacts": [], "project_label": { "标题": {}, "核心字段": { "门类": [ [ "闸门", 1 ] ] } }, "property_label": "", "proportion": "", "requirement": "", "serviceTime": { "service_days": 0, "service_end": "", "service_start": "" }, "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-04-02", "word_count": { "正文": 240, "附件": 0 } }
'''
    b = '''
{"招标信息":{"招标人名称":"五矿铜业(湖南)有限公司","项目预算":"","招标人联系方式":[{"联系人":"李锟","联系电话":"13575144492"}]},"中标信息":[{"中标人名称":"","中标金额":"","标段号":""}]}
'''
    # print(de.should_to_extract_ai(a))
    print(de.merge_json(a,b))
    print(test_ai_extract())
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")