# dataflow_mq.py
#coding:utf8
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process
from BaseDataMaintenance.AIUtils.DoubaoUtils import chat_doubao,get_json_from_text
from BaseDataMaintenance.AIUtils.html2text import html2text_with_tablehtml
from BaseDataMaintenance.AIUtils.prompts import get_prompt_extract_role
from BaseDataMaintenance.common.Utils import getUnifyMoney
class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
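
# Usage sketch (assumptions: createComsumer() subscribes a listener object to
# the named queue and getConnect_activateMQ() returns a stomp-style connection;
# both come from the BaseDataMaintenance helpers imported above):
#
#     _queue = Queue()
#     listener = ActiveMQListener(getConnect_activateMQ(), _queue)
#     createComsumer(listener, "/queue/dataflow_attachment")
#     payload = _queue.get(True)   # {"frame": <frame>, "conn": <connection>}
#
# The listener only enqueues raw frames; acking is left to whoever drains the
# queue, so messages that were never acked are redelivered after a reconnect.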
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()
                pass

        def __del__(self):
            self.conn.disconnect()
    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"

        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []

        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None

        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()

        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()
    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)

        # replace any listener whose connection has dropped
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)
    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener
    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        # only drain the failed queue while the main queue is near-empty
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                    return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
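
    # Every frame is normalized into one dict before recognition (shape taken
    # from the handler above):
    #
    #     newitem = {
    #         "item": item,                # document row parsed from frame.body
    #         "list_attach": list_attach,  # Attachment_postgres objects, [] if none
    #         "message_id": message_id,    # needed to ack on the same connection
    #         "conn": conn,
    #     }
    #
    # The 20% random requeue at the top re-sends the message to mq_attachment
    # and acks the original, presumably to spread work across consumer processes.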
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # test: process all attachments
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
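
    # Return contract (as consumed by attachment_recognize below): a triple
    #     (all_succeeded, list_html, swf_urls)
    # where list_html is [{attachment_filemd5: md5, "html": str}, ...].
    # requests.ConnectionError is deliberately re-raised rather than swallowed
    # so the caller can requeue the message instead of marking the doc failed.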
    def attachment_recognize(self,_dict,result_queue):
        '''
        recognize attachment contents
        :param _dict: attachment payload
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification,"")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)

            dtmp = Document_tmp(item)
            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after five failures the message goes to the failed queue, whose
                # data is re-processed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
                # save the failure state
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
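
    # Ack policy above: _to_ack only becomes True after send_msg_toacmq has
    # durably re-enqueued the follow-up work (mq_attachment / mq_attachment_failed
    # on failure, mq_extract on success), so no message is acked before its
    # next hop exists.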
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status,ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        if not _ots_exists:
                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                _reg_time = time.time()-start_time
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False

                # when re-running a swf, strip the "\" escapes from the stored swf_urls
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                # swf_images arrives as a serialized list of base64-encoded page images
                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    else:
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)
                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)
                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # re-enable updates in production

                # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                start_time = time.time()
                if local_exists:
                    if not _ots_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                _upload_time = time.time()-start_time
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]
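
    # Example (hypothetical link, for illustration): given
    #     <a data="<filemd5>" href="http://www.bidizhaobiao.com/file/1234/doc.pdf">
    # in _dochtmlcon, getAttachPath returns "/1234/doc.pdf" (everything after
    # "/file"); getAttachments below strips the leading "/" and prefixes the
    # first four hex chars of the md5 to build attachment_path.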
    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
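
    # Cache round trip used by getAttachments (sketch; "attach-<md5>" keys
    # expire after 3 hours, see putAttach_json_toRedis above):
    #
    #     _json = self.getAttach_json_fromRedis(filemd5)
    #     if _json is not None:
    #         attach = Attachment_postgres(json.loads(_json))   # cache hit
    #     else:
    #         ...fall back to ots, then...
    #         self.putAttach_json_toRedis(filemd5, attach.getProperties())
    #
    # set-valued properties are dropped before json.dumps because sets are not
    # JSON-serializable.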
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))

            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists",True,True)
                            list_attachment.append(_attach_pg)
                        else:
                            log("getAttachments status None find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists",True,True)
                            list_attachment.append(_attach_pg)
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))

            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()
    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()
                pass

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()
    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_ai = "/queue/dataflow_extract_AI"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        # normalize the weights into cumulative thresholds in (0,1]
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ)
        self.block_url = RLock()
        self.url_count = 0
        self.session = None

        self.list_extract_comsumer = []
        self.list_extract_ai_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)

        # extract listeners
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()
            listener_p_ai = Thread(target=self.start_extract_AI_listener)
            listener_p_ai.start()
    def start_extract_AI_listener(self,_count=4):
        self.list_extract_ai_comsumer = []
        for _i in range(_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
            createComsumer(listener_extract,self.mq_extract_ai)
            self.list_extract_ai_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_ai_comsumer)):
                    if self.list_extract_ai_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.extract_ai_handle,_i)
                        createComsumer(listener,self.mq_extract_ai)
                        self.list_extract_ai_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()
    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_comsumer)):
                    if self.list_extract_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                        createComsumer(listener,self.mq_extract)
                        self.list_extract_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()
    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                # ExtractListener requires an _idx argument; pass the slot index
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]
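
    # Worked example with the weights from __init__ ([20, 10]): the cumulative
    # thresholds become [2/3, 1.0], so a uniform draw r in [0, 1) routes to the
    # local interface roughly two thirds of the time:
    #
    #     r = 0.50 -> r <= 0.666... -> "http://127.0.0.1:15030/content_extract"
    #     r = 0.90 -> r <= 1.0      -> "http://192.168.0.115:15030/content_extract"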
    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        # note: relies on self.pool_postgres, which is currently commented out in __init__
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)
    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            data = {}
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            for k,v in item.items():
                try:
                    if isinstance(v,bytes):
                        item[k] = v.decode("utf-8")
                except Exception as e:
                    log("docid %d bytes value can not be decoded"%(item.get("docid")))
                    item[k] = ""

            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)  # length of the raw html
            limit_text_len = 50000  # text-length limit for the main text (or attachments)
            if html_len > limit_text_len:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"lxml")
                    all_len = len(_soup.get_text())  # text length of the whole announcement
                    _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
                    attachment_len = len(_attachment.get_text()) if _attachment else 0  # text length of the attachments
                    main_text_len = all_len - attachment_len  # text length of the main text
                    if attachment_len>150000:  # drop overlong attachment content (recognition would time out)
                        if _attachment is not None:
                            _attachment.decompose()
                            attachment_len = 0
                    # only run article_limit when the main text or attachment text exceeds limit_text_len
                    if main_text_len>limit_text_len or attachment_len>limit_text_len:
                        _soup = article_limit(_soup,limit_text_len)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
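
            # Worked example: a 200k-char html whose richTextFetch div holds
            # 160k chars of text gets the div decomposed (>150000), leaving
            # ~40k of main text; that is under limit_text_len, so article_limit
            # is skipped and only the stripped html is kept.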
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
            data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
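
            # _fingerprint keys the Redis extract cache: a hash of title+content
            # plus original_docchannel, so identical announcements from the same
            # channel reuse one extraction result (getExtract_json_fromRedis /
            # putExtract_json_toRedis above, 2-hour TTL).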
            to_ai = False
            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code>=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2
                to_ai = self.should_to_extract_ai(extract_json)

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code>=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4
            _extract.setValue(document_extract2_industry_json,"{}",True)
            _to_ack = False
            try:
                if all_done!=1:
                    # sentMsgToDD("extract failed: docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    log("extract failed: docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # treat as succeeded after ten attempts
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace the extraction results with {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # move to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # treat as succeeded
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace the extraction results with {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure state
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # processing succeeded
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id,subscription)
                if to_ai:
                    # send to the AI extract queue
                    item[document_extract2_extract_json] = json.loads(_extract.getProperties().get(document_extract2_extract_json,"{}"))
                    send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
            else:
                item["extract_times"] -= 1
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
  995. except requests.ConnectionError as e1:
  996. item["extract_times"] -= 1
  997. if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
  998. ackMsg(conn,message_id,subscription)
  999. except Exception as e:
  1000. traceback.print_exc()
  1001. # sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
  1002. log("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
  1003. log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))
  1004. if extract_times>=10:
  1005. #process as succeed
  1006. dtmp.setValue(document_tmp_dochtmlcon,"",False)
  1007. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
  1008. dtmp.update_row(self.ots_client)
  1009. dhtml.update_row(self.ots_client)
  1010. #replace as {}
  1011. _extract.setValue(document_extract2_extract_json,"{}",True)
  1012. _extract.setValue(document_extract2_industry_json,"{}",True)
  1013. _extract.setValue(document_extract2_status,random.randint(1,50),True)
  1014. _extract.update_row(self.ots_client)
  1015. ackMsg(conn,message_id,subscription)
  1016. elif extract_times>5:
  1017. #transform to the extract_failed queue
  1018. if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
  1019. #process as succeed
  1020. dtmp.setValue(document_tmp_dochtmlcon,"",False)
  1021. dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
  1022. dtmp.update_row(self.ots_client)
  1023. dhtml.update_row(self.ots_client)
  1024. #replace as {}
  1025. _extract.setValue(document_extract2_extract_json,"{}",True)
  1026. _extract.setValue(document_extract2_industry_json,"{}",True)
  1027. _extract.setValue(document_extract2_status,random.randint(1,50),True)
  1028. _extract.update_row(self.ots_client)
  1029. ackMsg(conn,message_id,subscription)
  1030. else:
  1031. #transform to the extract queue
  1032. #失败保存
  1033. dtmp.setValue(document_tmp_dochtmlcon,"",False)
  1034. dtmp.setValue(document_tmp_status,60,True)
  1035. if not dtmp.exists_row(self.ots_client):
  1036. dtmp.update_row(self.ots_client)
  1037. dhtml.update_row(self.ots_client)
  1038. if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
  1039. ackMsg(conn,message_id,subscription)
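
    # should_to_extract_ai decides whether a document earns a second, AI-based
    # extraction pass. My reading of the rules below (a summary, not from the
    # source): entities were recognized but the tenderee/win_tenderer role
    # expected for the docchannel is missing, or a budget/win price falls
    # outside the plausible range (1,000 .. 100,000,000).
    # Usage sketch (mirrors the to_ai branch above):
    #   if self.should_to_extract_ai(extract_json):
    #       send_msg_toacmq(self.pool_mq, json.dumps(item, ensure_ascii=False), self.mq_extract_ai)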
    def should_to_extract_ai(self,extract_json):
        _extract = {}
        if extract_json is not None:
            try:
                _extract = json.loads(extract_json)
            except Exception as e:
                pass
        docchannel = _extract.get("docchannel",{}).get("docchannel","")
        has_entity = len(_extract.get("dict_enterprise",{}).keys())>0
        prem = _extract.get("prem",{})
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<1000:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<1000:
                    budget_unexpected = True
        if has_entity:
            # docchannel values are upstream data constants, kept in Chinese
            if not has_tenderee and docchannel in {"招标公告","中标信息","候选人公示","合同公告","开标记录","验收合同"}:
                return True
            if not has_win_tenderer and docchannel in {"中标信息","候选人公示","合同公告","开标记录","验收合同"}:
                return True
            if budget_unexpected or winprice_unexpected:
                return True
        return False
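
    # extractCount turns an extraction result into a rough quality score.
    # Weights as implemented below: long body text +3, a positive budget +1,
    # each role +1 (+1 more with money), a named win_tenderer +2 and its price
    # +2, project code/name +1 each, attachments and contact pairs add smaller
    # increments, and documents from login-only sources lose 3.
    # Usage sketch (argument names assumed from this module):
    #   score = self.extractCount(json.loads(extract_json), page_attachments, web_source_name)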
    def extractCount(self,extract_dict,page_attachments,web_source_name):
        # time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        if len(extract_dict.keys()):
            _extract = extract_dict
        else:
            _extract = {}
        dict_pack = _extract.get("prem",{})
        extract_count = 0
        list_code = _extract.get("code",[])
        word_count = _extract.get("word_count",{})
        if word_count.get("正文",0)>500:
            extract_count += 3
        if len(list_code)>0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name","")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        linklist_count = 0
        for _key in dict_pack.keys():
            if "tendereeMoney" in dict_pack[_key] and dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                if isinstance(_role,list):
                    extract_count += 1
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                    if _role[0]=="tenderee":
                        tenderee = _role[1]
                    if _role[0]=="win_tenderer":
                        if _role[1] is not None and _role[1]!="":
                            extract_count += 2
                            if win_tenderer=="":
                                win_tenderer = _role[1]
                        if _role[2]!='' and float(_role[2])>0:
                            extract_count += 2
                            if win_bid_price=="":
                                win_bid_price = str(float(_role[2]))
                    if _role[0]=="agency":
                        agency = _role[1]
                if isinstance(_role,dict):
                    extract_count += 1
                    if "role_money" in _role:
                        if str(_role["role_money"].get("money",""))!='' and float(_role["role_money"].get("money",""))>0:
                            extract_count += 1
                    if _role.get("role_name")=="tenderee":
                        tenderee = _role["role_text"]
                    if _role.get("role_name")=="win_tenderer":
                        if _role["role_text"] is not None and _role["role_text"]!="":
                            extract_count += 2
                            if win_tenderer=="":
                                win_tenderer = _role["role_text"]
                        if "role_money" in _role:
                            if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                                extract_count += 2
                                if win_bid_price=="":
                                    win_bid_price = str(float(_role["role_money"]["money"]))
                    if _role["role_name"]=="agency":
                        agency = _role["role_text"]
                    linklist = _role.get("linklist",[])
                    for link in linklist:
                        for l in link:
                            if l!="":
                                linklist_count += 1
        extract_count += linklist_count//2
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        if page_attachments is not None and page_attachments!='':
            try:
                _attachments = json.loads(page_attachments)
                set_md5 = set()
                has_zhaobiao = False
                has_qingdan = False
                if len(_attachments)>0:
                    for _atta in _attachments:
                        classification = _atta.get("classification","")
                        set_md5.add(_atta.get("fileMd5"))
                        # classification values are upstream data constants, kept in Chinese
                        if str(classification)=='招标文件':
                            has_zhaobiao = True
                        if str(classification)=='采购清单':
                            has_qingdan = True
                    extract_count += len(set_md5)//2+1
                    if has_zhaobiao:
                        extract_count += 2
                    if has_qingdan:
                        extract_count += 1
            except Exception as e:
                traceback.print_exc()
        list_approval_dict = _extract.get("approval",[])
        for _dict in list_approval_dict:
            for k,v in _dict.items():
                if v is not None and v!='' and v!="未知":
                    extract_count += 1
        punish_dict = _extract.get("punish",{})
        for k,v in punish_dict.items():
            if v is not None and v!='' and v!="未知":
                extract_count += 1
        if web_source_name in set_login_web:
            extract_count -= 3
        product = _extract.get("product","")
        extract_count += len(str(product).split(","))//5
        return extract_count
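
    # extract_ai_handle consumes the AI queue: it renders the stored HTML to
    # text, asks the doubao chat model for roles via get_prompt_extract_role,
    # parses the JSON out of the reply, and persists the merged result only
    # when merge_json reports a change. The message is acked on every path so
    # a poisonous document cannot block the queue.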
    def extract_ai_handle(self,_dict,result_queue):
        frame = _dict["frame"]
        conn = _dict["conn"]
        message_id = frame.headers["message-id"]
        subscription = frame.headers.setdefault('subscription', None)
        item = json.loads(frame.body)
        _extract_json = None
        if document_extract2_extract_json in item:
            _extract_json = item.get(document_extract2_extract_json)
            item.pop(document_extract2_extract_json)
        dtmp = Document_tmp(item)
        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                               "docid":item.get("docid")})
        _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
        dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
        extract_times = item.get("extract_times",0)+1
        item["extract_times"] = extract_times
        _extract_ai = {}
        try:
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            _text = html2text_with_tablehtml(_dochtmlcon)
            msg = get_prompt_extract_role(_text)
            result = chat_doubao(msg)
            _json = get_json_from_text(result)
            if _json is not None:
                try:
                    _extract_ai = json.loads(_json)
                except Exception as e:
                    pass
            if len(_extract_ai.keys())>0:
                _new_json,_changed = self.merge_json(_extract_json,_json)
                if _changed:
                    dtmp.setValue("extract_json_ai",json.dumps(_extract_ai,ensure_ascii=False))
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    if not dhtml.exists_row(self.ots_client):
                        dhtml.update_row(self.ots_client)
                    _extract = Document_extract({})
                    _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
                    _extract.setValue(document_extract2_docid,item.get(document_docid))
                    _extract.setValue(document_extract2_extract_json,_new_json,True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    log("extract_ai of docid:%d"%(item.get(document_docid)))
            ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            ackMsg(conn,message_id,subscription)
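
    # merge_json overlays the AI result onto the rule-based extraction and
    # reports whether anything changed. A minimal sketch of the contract
    # (hypothetical data; key names as used below):
    #   a = '{"prem":{},"extract_count":0}'
    #   b = '{"招标信息":{"招标人名称":"某某某医院","项目预算":"10万元"},"中标信息":[]}'
    #   new_json, changed = self.merge_json(a, b)  # adds the tenderee and budget, changed is True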
    def merge_json(self,extract_json,extract_ai_json):
        _extract = {}
        if extract_json is not None:
            try:
                if isinstance(extract_json,str):
                    _extract = json.loads(extract_json)
                else:
                    _extract = extract_json
            except Exception as e:
                pass
        if "extract_count" not in _extract:
            _extract["extract_count"] = 0
        _extract_ai = {}
        if extract_ai_json is not None:
            try:
                _extract_ai = json.loads(extract_ai_json)
            except Exception as e:
                pass
        prem = _extract.get("prem")
        if prem is None:
            _extract["prem"] = {}
            prem = _extract["prem"]
        Project = prem.get("Project")
        if Project is None:
            prem["Project"] = {}
            Project = prem["Project"]
        Project_rolelist = Project.get("roleList")
        if Project_rolelist is None:
            Project["roleList"] = []
            Project_rolelist = Project["roleList"]
        has_tenderee = False
        has_win_tenderer = False
        has_budget = False
        budget_unexpected = False
        winprice_unexpected = False
        for _pack,_pack_value in prem.items():
            _rolelist = _pack_value.get("roleList",[])
            for _role in _rolelist:
                if _role.get("role_name","")=="tenderee":
                    has_tenderee = True
                if _role.get("role_name","")=="win_tenderer":
                    has_win_tenderer = True
                    win_price = _role.get("role_money",{}).get("money",0)
                    try:
                        win_price = float(win_price)
                    except Exception as e:
                        win_price = 0
                    if win_price>0:
                        if win_price>100000000 or win_price<1000:
                            winprice_unexpected = True
            tendereeMoney = _pack_value.get("tendereeMoney",0)
            try:
                tendereeMoney = float(tendereeMoney)
            except Exception as e:
                tendereeMoney = 0
            if tendereeMoney>0:
                has_budget = True
                if tendereeMoney>100000000 or tendereeMoney<1000:
                    budget_unexpected = True
        _changed = False
        if not has_tenderee:
            _tenderee_ai = _extract_ai.get("招标信息",{}).get("招标人名称")
            _contacts = _extract_ai.get("招标信息",{}).get("招标人联系方式",[])
            _linklist = []
            for _conta in _contacts:
                _person = _conta.get("联系人","")
                _phone = _conta.get("联系电话","")
                if _person!="" or _phone!="":
                    _linklist.append([_person,_phone])
            if _tenderee_ai is not None and _tenderee_ai!="" and len(_tenderee_ai)>=4:
                _role_dict = {
                    "role_name": "tenderee",
                    "role_text": _tenderee_ai,
                }
                if len(_linklist)>0:
                    _role_dict["linklist"] = _linklist
                Project_rolelist.append(_role_dict)
                _changed = True
                _extract["extract_count"] += 1
        if not has_budget or budget_unexpected:
            _budget = _extract_ai.get("招标信息",{}).get("项目预算","")
            if _budget is not None and _budget!="":
                _budget = getUnifyMoney(_budget)
                if _budget>0:
                    Project["tendereeMoney"] = str(float(_budget))
                    _changed = True
                    _extract["extract_count"] += 1
        if not has_win_tenderer or winprice_unexpected:
            list_win = _extract_ai.get("中标信息",[])
            if len(list_win)>0:
                if winprice_unexpected:
                    # drop the suspicious win_tenderer roles before re-adding them from the AI result
                    for _pack,_pack_value in prem.items():
                        _rolelist = _pack_value.get("roleList",[])
                        new_rolelist = []
                        for _role in _rolelist:
                            if _role.get("role_name","")!="win_tenderer":
                                new_rolelist.append(_role)
                        _pack_value["roleList"] = new_rolelist
                for _win_dict_i in range(len(list_win)):
                    _win_dict = list_win[_win_dict_i]
                    _pack = _win_dict.get("标段号","")
                    if _pack=="":
                        if len(list_win)>1:
                            _pack = "AI_%d"%(_win_dict_i)
                        else:
                            _pack = "Project"
                    _win_money = _win_dict.get("中标金额")
                    if _win_money is not None and _win_money!="":
                        _win_money = getUnifyMoney(_win_money)
                    else:
                        _win_money = 0
                    _win_tenderer = _win_dict.get("中标人名称")
                    if _win_tenderer is not None and _win_tenderer!="" and len(_win_tenderer)>=4:
                        _role_dict = {
                            "role_name": "win_tenderer",
                            "role_text": _win_tenderer,
                        }
                        if _win_money>2000:
                            _role_dict["role_money"] = {
                                "money": str(float(_win_money))
                            }
                        _changed = True
                        _extract["extract_count"] += 2
                        if _pack=="Project":
                            Project_rolelist.append(_role_dict)
                        else:
                            prem[_pack] = {
                                "roleList":[
                                    _role_dict
                                ]
                            }
        return json.dumps(_extract,ensure_ascii=False),_changed
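
    # delete_document_extract trims the postgres mirror, keeping only the
    # newest save_count docids. With the default of 700,000 and a max docid of
    # 80,000,000 (hypothetical), rows below 79,300,000 are deleted.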
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()

from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None
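
# generateRangeDocid reserves a contiguous block of docids by advancing the
# DocumentIdSerial counter under a process-wide lock. Sketch (hypothetical
# state): if serial_value is 1000 and nums is 1000, the row is updated to 2000
# and 1001 is returned, so the caller owns docids 1001..2000.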
def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            # reconnect and retry on any mysql failure
            conn_mysql = getConnection_mysql()
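
# Usage note: pass this encoder to json.dumps wherever OTS rows may carry
# numpy, bytes or Decimal values, e.g. json.dumps(body, cls=MyEncoder) below.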
# custom JSON encoder for numpy types, bytes and Decimal
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64, Decimal)):
            return float(obj)
        elif isinstance(obj, str):
            return obj
        return json.JSONEncoder.default(self, obj)
class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid
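
        # on_message assigns the next docid from the reserved block, derives
        # partitionkey as docid%500+1, and routes the document to the
        # attachment queue when it carries attachments, otherwise straight to
        # the extract queue.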
        def on_message(self, headers):
            try:
                # bind message_id/body first so the except branch can requeue safely
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        # replace any init consumer whose connection has dropped
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable this when going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
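
    # shenpi2mq incrementally syncs t_shen_pi_xiang_mu (approval projects) from
    # oracle to the queues, driven by an id watermark. Shenpi docids are offset
    # by base_shenpi_id (4*10^11) so they cannot collide with ordinary docids;
    # the watermark only advances after the broker accepts the message.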
    def shenpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
                    if self.max_shenpi_id<60383953:
                        self.max_shenpi_id = 60383953
            if self.max_shenpi_id is not None:
                # select data in order
                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
                if origin_max_shenpi_id is not None:
                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                        # send data to mq one by one with max_shenpi_id updated
                        for _data in list_data:
                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                            ots_dict = _data.getProperties_ots()
                            if ots_dict["docid"]<self.base_shenpi_id:
                                ots_dict["docid"] += self.base_shenpi_id
                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"")!='[]':
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
                            else:
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            log("shenpi error")
            traceback.print_exc()
            self.pool_oracle.decrease()
    def fix_shenpi(self):
        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        begin_id = 0
        end_id = 64790010
        thread_num = 15
        step = (end_id-begin_id)//thread_num
        list_items = []
        for _i in range(thread_num):
            _begin = _i*step
            _end = (_i+1)*step-1
            if _i==thread_num-1:
                _end = end_id
            list_items.append((_begin,_end,_i))
        task_queue = Queue()
        for item in list_items:
            task_queue.put(item)
        fix_count_list = []
        def _handle(item,result_queue):
            conn_oracle = pool_oracle.getConnector()
            (begin_id,end_id,thread_id) = item
            _count = 0
            for _id_i in range(begin_id,end_id):
                try:
                    bool_query = BoolQuery(must_queries=[
                        TermQuery("docchannel",302),
                        TermQuery("original_id",_id_i)
                    ])
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                        SearchQuery(bool_query,get_total_count=True))
                    if total_count>0:
                        continue
                    # bool_query = BoolQuery(must_queries=[
                    #     TermQuery("id",_id_i),
                    # ])
                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                    #                                                                     SearchQuery(bool_query,get_total_count=True))
                    # if total_count>0:
                    #     continue
                    try:
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                    except Exception as e:
                        continue
                    # send data to mq one by one with max_shenpi_id updated
                    for _data in list_data:
                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                        ots_dict = _data.getProperties_ots()
                        if ots_dict["docid"]<self.base_shenpi_id:
                            ots_dict["docid"] += self.base_shenpi_id
                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                        ots_dict["status"] = 201
                        dict_1 = {}
                        dict_2 = {}
                        for k,v in ots_dict.items():
                            if k!="dochtmlcon":
                                dict_1[k] = v
                            if k in ('partitionkey',"docid","dochtmlcon"):
                                dict_2[k] = v
                        d_1 = Document(dict_1)
                        d_2 = Document(dict_2)
                        d_1.update_row(self.ots_client)
                        d_2.update_row(self.ots_capacity)
                        _count += 1
                except Exception as e:
                    traceback.print_exc()
                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
            fix_count_list.append(_count)
            pool_oracle.putConnector(conn_oracle)
        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
        mt.run()
        print(fix_count_list,sum(fix_count_list))
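
    # ots2mq re-queues documents stuck at status 1..50 in the document table,
    # restoring their html from the capacity instance before sending.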
    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break
            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()
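
    # otstmp2mq does the same repair for document_tmp rows still at status 0,
    # marking them status 1 once the broker accepts the message.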
    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)
        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
        from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
        from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin
        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
        schedule.start()
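
# transform_attachment is a one-off backfill: it scans recent attachment rows
# in OTS and inserts any missing ones into postgres with 30 consumer threads.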
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue
    task_queue = Queue()
    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()
    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()
    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)
        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load
    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()
    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
current_path = os.path.abspath(os.path.dirname(__file__))

if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    de = Dataflow_ActivteMQ_extract(create_listener=True)
    # print(getUnifyMoney('第1 - 5年承包费为1135元/年/亩,第6 - 10年承包费为1235元/年/亩'))
    # a = '''
    # {"addr_dic": {"addr_bidsend": "江西省九江市"}, "aptitude": "二、供应商资格条件,符合《中华人民共和国政府采购法》第二十二条规定,具有独立承担民事责任的能力,是中国境内注册的独立法人企业。1、是以上采购药品的生产企业或生产厂家授权的代理商,具有药品生产或经营许可证。2、具有良好的商业信誉和健全的财务会计制度。3、具有履行合同、提供优质服务的能力。4、提供的产品质量达标、性能可靠、技术合格。5、不接受联合体报名。6、供应商为非外资独资或外资控股企业。7、此次议价报名面向省标新平台有配送资质的配送商。", "attachmentTypes": "", "bid_score": [], "bidway": "", "candidate": "", "code": [], "code_investment": "", "cost_time": {"attrs": 0.14, "codename": 0.08, "deposit": 0.0, "district": 0.08, "kvtree": 0.08, "moneygrade": 0.0, "nerToken": 0.2, "outline": 0.05, "pb_extract": 0.18, "person": 0.01, "prem": 0.02, "preprocess": 0.42, "product": 0.08, "product_attrs": 0.03, "roleRuleFinal": 0.01, "rolegrade": 0.0, "rule": 0.03, "rule_channel": 0.03, "rule_channel2": 0.24, "tableToText": 0.13000357627868653, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0}, "demand_info": {"data": [], "header": [], "header_col": []}, "deposit_patment_way": "", "dict_enterprise": {"中华人民共和国": {"in_text": 1}, "中国政府": {"in_text": 1}}, "district": {"area": "华东", "city": "九江", "district": "未知", "is_in_text": false, "province": "江西"}, "docchannel": {"docchannel": "招标公告", "doctype": "采招数据", "life_docchannel": "招标公告", "use_original_docchannel": 0}, "docid": "", "doctitle_refine": "某院2025年断供药品议价", "exist_table": 1, "extract_count": 4, "fail_reason": "", "fingerprint": "md5=a4b79914622c8ba604835f7159fa3bf7", "industry": {"class": "零售批发", "class_name": "医药品", "subclass": "医药品"}, "is_deposit_project": false, "label_dic": {"is_direct_procurement": 1, "mode_of_partipation": 1, "need_qualification": 1}, "match_enterprise": [], "match_enterprise_type": 0, "moneys": [], "moneys_attachment": [], "moneysource": "", "name": "2025年断供药品议价", "nlp_enterprise": ["中华人民共和国", "中国政府"], "nlp_enterprise_attachment": [], "pb": {"projectDigest": "项目内容,我院就以下药品进行议价,欢迎贵单位参加报名。一、项目名称:2025年断供药品议价公告,二、供应商资格条件,符合《中华人民共和国政府采购法》第二十二条规定,具有独立承担民事责任的能力,是中国境内注册的独立法人企业。1、是以上采购药品的生产企业或生产厂家授权的代理商,具有药品生产或经营许可证", "project_name_refind": "某院2025年断供药品议价公告", "project_property": "新建"}, "pb_project_name": "某院2025年断供药品议价公告", "person_review": [], "pinmu_name": "", "policies": ["《中华人民共和国政府采购法》", "《中华人民共和国政府采购法》"], "prem": {"Project": {"roleList": [], "tendereeMoney": "0.000000"}}, "process_time": "2025-03-11 21:55:31", "product": ["断供药品", "药品", "葡醛内酯片", "对乙酰氨基酚干混悬剂", "氢化可的松注射液", "维生素B12注射液", "血脂康胶囊", "复方甘露醇注射液"], "product_attrs": {"data": [{"product": "葡醛内酯片"}, {"product": "对乙酰氨基酚干混悬剂"}, {"product": "氢化可的松注射液"}, {"product": "维生素B12注射液"}, {"product": "血脂康胶囊"}, {"product": "复方甘露醇注射液"}], "header": ["药品名称_____规格*包装数_____"], "header_col": ["序号_药品名称_剂型_规格_规格*包装数_生产厂家_平台价(元)_供应商报价_是否两票制_是否医保_是否可稳定供货_备注"]}, "project_contacts": [["卢女士", "0792-7166052"]], "project_label": {"标题": {}, "核心字段": {"试剂": [["甘露醇", 1]]}}, "property_label": "", "proportion": "", "requirement": "一、采购项目内容,我院就以下药品进行议价,欢迎贵单位参加报名。一、项目名称:2025年断供药品议价公告,二、供应商资格条件,符合《中华人民共和国政府采购法》第二十二条规定,具有独立承担民事责任的能力,是中国境内注册的独立法人企业。1、是以上采购药品的生产企业或生产厂家授权的代理商,具有药品生产或经营许可证。2、具有良好的商业信誉和健全的财务会计制度。3、具有履行合同、提供优质服务的能力。4、提供的产品质量达标、性能可靠、技术合格。5、不接受联合体报名。6、供应商为非外资独资或外资控股企业。7、此次议价报名面向省标新平台有配送资质的配送商。四、报名资料提交要求(复印件需加盖公司公章),(一)企业资料,1、报名企业营业执照(含统一社会信用代码,副本复印件(非三证合一企业须同时提供税务登记证、组织机构代码证,副本复印件)。2、报名企业谈判代表授权书,授权人、被授权人身份证复印件,并加盖公章。3、报名企业须提供军队采购网注册备案登记截图,4、报名企业须提供企业以及产品生产企业的中国政府采购网、军队采购网、信用中国网站信用查询截图。(二)产品资料,1、报名表(根据报名目录将产品信息填好),2、价格承诺书(附件一),3、报价单(现场报价),每个项目按以上顺序要求提供一份完整的报名资料装订,其中(二)产品资料中第3项“报价单”须单独装订,有厂家授权委托书的,一起密封在一个信封内,报价现场拆封,现场报价。其余报名资料必须在公示截止之前交到指定报名地点,五、报名文件递交时间、地点及方式,1、报名文件递交时间:2025年3月12日至2025年3月14日17:00(北京时间)。2、报名文件递交地点:江西省九江市,3、报名文件递交方式:指定专人递交报名文件,接受邮寄等其他方式。六、联系人及联系方式,联系人:卢女士联系电话:0792-7166052,", "serviceTime": {"service_days": 0, "service_end": "", "service_start": ""}, "success": true, "time_bidclose": "2025-03-14", "time_bidopen": "", "time_bidstart": "2025-03-12 17:00:00", "time_commencement": "", "time_completion": "", "time_contractEnd": "", "time_contractStart": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_listingEnd": "", "time_listingStart": "", "time_planned": "", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2025-03-11 16:44:00", "time_signContract": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "", "version_date": "2025-03-10", "word_count": {"正文": 1329, "附件": 577}}
    # '''
    # b = '''
    # {"招标信息": {"招标人名称": "某院", "项目预算": "0.000000万元(人民币)", "招标人联系方式": [{"联系人": "卢女士", "联系电话": "0792-7166052"}]}, "中标信息": []}
    # '''
    # print(de.merge_json(a,b))
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")