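"""
dataflow_mq.py

ActiveMQ-driven dataflow maintenance: listeners that consume /queue/dataflow_init,
/queue/dataflow_attachment and /queue/dataflow_extract, recognize attachment content,
call the content-extract interfaces, and write results back to OTS/postgres, plus a set
of one-off repair utilities (transform_attachment, del_test_doc, fixDoc_to_queue_extract,
check_data_synchronization, fixDoc_to_queue_init).
"""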
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

import os

from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit

class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
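
# Note: ActiveMQListener only parks the incoming frame and its connection on a local
# queue; acking is deferred to the attachment flow (see attachment_recognize), which
# acks only after the message has been processed or safely re-queued.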

class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.comsumer_count = 80
        self.retry_comsumer_count = 10
        self.retry_times = 5
        for _i in range(self.comsumer_count):
            listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
            createComsumer(listener_attachment,self.mq_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(600)
            for _c in list_comsumer:
                _c.conn.disconnect()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: run through every attachment
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                    if not _succeed:
                        _not_failed = False
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the content of the attachments of one document.
        :param _dict: the message item together with its attachments
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            dtmp = Document_tmp(item)

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after retry_times failures the message goes to the failed queue,
                # which is re-consumed once when the main queue is idle
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save the failed state
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,0,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)

                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),"==".join([str(a)[:10] for a in list_html])))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)

                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                ackMsg(conn,message_id)
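    # The message is acked only after it has either been re-queued (failure path) or its
    # result has been pushed to /queue/dataflow_extract (success path), so a crash between
    # steps leaves the message pending on the broker rather than lost.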
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:]
        localpath = "/FileInfo/%s"%(relative_path)

        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)

        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath[5:]))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))

                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="":
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True

                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath[5:]))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath[5:]))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath[5:]))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)

                    # update postgres
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)

                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images = getAttachDealInterface(None,_filetype,path=localpath)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))

                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                    _html = ""
                    return False

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                    attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable the update in production

                # update postgres
                if attach.exists(self.attach_pool):
                    attach.update_row(self.attach_pool)
                else:
                    attach.insert_row(self.attach_pool)

                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    def flow_attachment(self):
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"data":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        _path = filelink.split("/file")
        if len(_path)>1:
            return _path[1]
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # look up postgres first
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            log("select localpath database %d/%d"%(len(list_attachment),len(to_find_md5)))

            set_md5 = set()
            for _attach in list_attachment:
                set_md5.add(_attach.getProperties().get(attachment_filemd5))

            list_not_in_md5 = []
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    list_not_in_md5.append(_filemd5)
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    if _path is not None:
                        _filetype = _path.split(".")[-1]
                        _attach = {attachment_filemd5:_filemd5,
                                   attachment_filetype:_filetype,
                                   attachment_status:20,
                                   attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                   attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                        list_attachment.append(Attachment_postgres(_attach))
                    else:
                        log("getAttachments search in ots:%s"%(_filemd5))
                        _attach = {attachment_filemd5:_filemd5}
                        _attach_ots = attachment(_attach)
                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls],True)
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))

            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = getConnection_postgres()
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
        conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()

class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body)["crtime"]))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                pass

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_extract.__init__(self)
        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",15],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        whole_weight = 0
        for _url,weight in self.extract_interfaces:
            whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/whole_weight

        self.comsumer_count = 20
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
            createComsumer(listener_extract,self.mq_extract)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

    def getExtract_url(self):
        _r = random.random()
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]
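    # getExtract_url picks an interface by cumulative weight: with the configured weights
    # [15,10] above, __init__ rewrites the list to [[url1,0.6],[url2,1.0]], so a uniform
    # random r selects url1 when r<=0.6 and url2 otherwise (roughly a 60/40 split).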
    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        resp = requests.post(_url,json=json,headers=headers)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,50,1,True)
        mt.run()

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)

            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(_dochtmlcon)>200000:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                _soup = article_limit(_soup,200000)
                _dochtmlcon = str(_soup)
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))

            all_done = 1
            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid)
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")

            if all_done>0:
                resp = self.request_extract_interface(json=data,headers=self.header)
                if (resp.status_code >=200 and resp.status_code<=213):
                    _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
                else:
                    log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                    all_done = -2

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)

            # make sure _to_ack is defined on every path
            _to_ack = False
            try:
                if all_done!=1:
                    sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>2:
                        #transform to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            #process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            #replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failed state
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    #process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>2:
                #transform to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    #process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    #replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                #transform to the extract queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.start()

from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    with docid_lock:
        if conn_mysql is None:
            conn_mysql = getConnection_mysql()
        cursor = conn_mysql.cursor()
        sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
        cursor.execute(sql)
        rows = cursor.fetchall()
        current_docid = rows[0][0]
        next_docid = current_docid+1
        update_docid = current_docid+nums
        sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
        cursor.execute(sql)
        conn_mysql.commit()
        return next_docid
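# Worked example of the allocation above (illustrative numbers): if serial_value is
# currently 1000, generateRangeDocid(1000) bumps the row to 2000 and returns 1001, so
# the caller can hand out docids 1001..2000 locally before touching MySQL again
# (see InitListener.getNextDocid below).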
# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)

class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
            next_docid = int(self.getNextDocid())
            partitionkey = int(next_docid%500+1)
            message_id = headers.headers["message-id"]
            body = json.loads(headers.body)
            body[document_tmp_partitionkey] = partitionkey
            body[document_tmp_docid] = next_docid
            page_attachments = body.get(document_tmp_attachment_path,"[]")
            _uuid = body.get(document_tmp_uuid,"")
            if page_attachments!="[]":
                status = random.randint(1,10)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")
            else:
                status = random.randint(11,50)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
    def __init__(self):
        Dataflow.__init__(self)
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)

    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable in production
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_oracle.putConnector(conn_oracle)
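    # temp2mq drains the Oracle *_TEMP tables into /queue/dataflow_init, deleting a source
    # row only after a successful send; ots2mq below re-queues documents whose status in
    # ots.document still falls inside RangeQuery("status",1,51), routing them to the
    # attachment or extract queue depending on whether page_attachments is present.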
    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")

            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.start()

def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()

    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))

def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)

def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))

        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()

def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict.get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    import pandas as pd
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")

current_path = os.path.abspath(os.path.dirname(__file__))

def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    for uuid,tablename,_exists in zip(df["uuid"],df["tablename"],df["exists"]):
        if _exists==0:
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log(sql)
    conn.commit()

if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    fixDoc_to_queue_init()
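
# Typical entry points (a minimal sketch; each call blocks on its BlockingScheduler, so
# they are normally launched as separate long-lived processes, mirroring the commented
# lines in __main__ above):
#
#   Dataflow_init().start_dataflow_init()                      # oracle temp tables / ots -> MQ
#   Dataflow_ActivteMQ_attachment().start_flow_attachment()    # attachment recognition flow
#   Dataflow_ActivteMQ_extract().start_flow_extract()          # content extract flow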