# dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

import os

from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
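
# Listener used by the attachment flow: each frame received from ActiveMQ is put
# onto a shared queue together with its connection, so worker threads can process
# the message and acknowledge it later.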
class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
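
# Attachment dataflow driven by ActiveMQ: consumes /queue/dataflow_attachment,
# recognizes attachment content, persists the result to OTS and postgres, and
# forwards the document to /queue/dataflow_extract. Messages that keep failing are
# retried up to retry_times and then parked in /queue/dataflow_attachment_failed,
# which process_failed_attachment drains when the main queue is almost empty.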
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.comsumer_count = 80
        self.retry_comsumer_count = 10
        self.retry_times = 5
        for _i in range(self.comsumer_count):
            listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
            createComsumer(listener_attachment,self.mq_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(600)
            for _c in list_comsumer:
                _c.conn.disconnect()

    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: process every attachment
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append(_html)
                else:
                    _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                    if not _succeed:
                        _not_failed = False
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append(_html)
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        recognize the attachment content
        :param _dict: the attachment payload
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dtmp = Document_tmp(item)

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # once the retry limit is reached, park the message in the failed queue;
                # that queue is reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
                # save the failure state
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,0,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),"==".join([a[:10] for a in list_html])))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                ackMsg(conn,message_id)
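
    # Recognize a single attachment: make sure a local copy of the file exists
    # (local cache, OSS download, or the ots.attachment fallback), short-circuit
    # oversized files, call getAttachDealInterface on the file, convert swf pages
    # to images when needed, and persist the recognized html/text to OTS and postgres.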
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:]
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath[5:]))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="":
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath[5:]))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath[5:]))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath[5:]))
            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # update postgres
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images = getAttachDealInterface(None,_filetype,path=localpath)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                    _html = ""
                    return False

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)
                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)
                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable this update once live

                # update postgres
                if attach.exists(self.attach_pool):
                    attach.update_row(self.attach_pool)
                else:
                    attach.insert_row(self.attach_pool)
                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()

    def flow_attachment(self):
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"data":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        _path = filelink.split("/file")
        if len(_path)>1:
            return _path[1]

    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # look up postgres first
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            log("select localpath database %d/%d"%(len(list_attachment),len(to_find_md5)))
            set_md5 = set()
            for _attach in list_attachment:
                set_md5.add(_attach.getProperties().get(attachment_filemd5))
            list_not_in_md5 = []
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    list_not_in_md5.append(_filemd5)
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    if _path is not None:
                        _filetype = _path.split(".")[-1]
                        _attach = {attachment_filemd5:_filemd5,
                                   attachment_filetype:_filetype,
                                   attachment_status:20,
                                   attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                   attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                        list_attachment.append(Attachment_postgres(_attach))
                    else:
                        log("getAttachments search in ots:%s"%(_filemd5))
                        _attach = {attachment_filemd5:_filemd5}
                        _attach_ots = attachment(_attach)
                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls],True)
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)

    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = getConnection_postgres()
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()
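
# Extraction dataflow driven by ActiveMQ: consumes /queue/dataflow_extract, posts
# the document content to one of the content_extract interfaces, writes the
# extraction result to OTS, and re-queues the message when the call fails.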
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body)["crtime"]))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                pass

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_extract.__init__(self)
        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",11],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]
        self.mq_extract = "/queue/dataflow_extract"
        whole_weight = 0
        for _url,weight in self.extract_interfaces:
            whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/whole_weight
        self.comsumer_count = 20
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
            createComsumer(listener_extract,self.mq_extract)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
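
    # Weighted interface selection: __init__ rewrites the weight column into
    # cumulative fractions of the total weight, so drawing a uniform random number
    # and returning the first interface whose cumulative bound reaches it picks
    # each URL with probability proportional to its original weight.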
    def getExtract_url(self):
        _r = random.random()
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        resp = requests.post(_url,json=json,headers=headers)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,50,1,True)
        mt.run()

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(_dochtmlcon)>200000:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                _soup = article_limit(_soup,200000)
                _dochtmlcon = str(_soup)
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))

            all_done = 1
            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid)
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")

            if all_done>0:
                resp = self.request_extract_interface(json=data,headers=self.header)
                if (resp.status_code>=200 and resp.status_code<=213):
                    _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
                else:
                    log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                    all_done = -2
            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code>=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)
            _to_ack = False
            try:
                if all_done!=1:
                    sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    send_succeed = send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract)
                    # save the failure state
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,60,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                    if send_succeed:
                        _to_ack = True
                else:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except Exception as e:
            traceback.print_exc()
            log("process docid:%s failed message_id:%s"%(str(data.get("doc_id")),message_id))
            if send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract):
                ackMsg(conn,message_id,subscription)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.start()
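
# Docid allocation: a block of `nums` ids is reserved by bumping the DocumentIdSerial
# counter in MySQL under a process-wide lock; the first id of the reserved block is returned.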
from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    with docid_lock:
        if conn_mysql is None:
            conn_mysql = getConnection_mysql()
        cursor = conn_mysql.cursor()
        sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
        cursor.execute(sql)
        rows = cursor.fetchall()
        current_docid = rows[0][0]
        next_docid = current_docid+1
        update_docid = current_docid+nums
        sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
        cursor.execute(sql)
        conn_mysql.commit()
        return next_docid

# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
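
# Init dataflow: temp2mq moves rows from the Oracle temp tables into /queue/dataflow_init,
# and InitListener assigns each message a docid and partition key before routing it
# to the attachment queue or the extract queue.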
class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
            next_docid = int(self.getNextDocid())
            partitionkey = int(next_docid%500+1)
            message_id = headers.headers["message-id"]
            body = json.loads(headers.body)
            body[document_tmp_partitionkey] = partitionkey
            body[document_tmp_docid] = next_docid
            page_attachments = body.get(document_tmp_attachment_path,"[]")
            _uuid = body.get(document_tmp_uuid,"")
            if page_attachments!="[]":
                status = random.randint(1,10)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")
            else:
                status = random.randint(11,50)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)

    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enabled once live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_oracle.putConnector(conn_oracle)

    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
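
    # Test helper: drain both downstream queues for a short while and report how many
    # distinct docids were seen, to check for duplicated docid allocation.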
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):

            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.start()
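
# One-off backfill: scan ots.attachment for rows matching the query below and insert
# the ones missing from postgres, using a pool of consumer threads fed by a queue.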
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()

    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
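
# Cleanup helper: delete test documents (docid below 0) from document_tmp together
# with their document_html rows.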
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
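
# Repair helper: re-send the documents matched by the crtime/docchannel query to
# /queue/dataflow_extract after re-fetching their dochtmlcon from the capacity instance.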
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
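
# Consistency check: for every uuid/tablename pair loaded from uuid.pk, count matching
# rows in the OTS document index and export the pairs with no match to check1.xlsx.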
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    import pandas as pd
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")

current_path = os.path.abspath(os.path.dirname(__file__))
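
# Repair helper: for rows flagged as missing (exists==0) in check.xlsx, copy the record
# from the formal Oracle table back into its corresponding _TEMP table so the init flow
# picks it up again.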
def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    for uuid,tablename,_exists in zip(df["uuid"],df["tablename"],df["exists"]):
        if _exists==0:
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log(sql)
    conn.commit()

if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    fixDoc_to_queue_init()