# dataflow_mq.py

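"""
Message-queue driven dataflow maintenance.

Three stages are wired together over ActiveMQ:
  * Dataflow_init reads new documents from Oracle/OTS, assigns docids and routes them to
    /queue/dataflow_attachment or /queue/dataflow_extract;
  * Dataflow_ActivteMQ_attachment downloads and recognizes attachments, then forwards the
    enriched documents to /queue/dataflow_extract;
  * Dataflow_ActivteMQ_extract calls the content-extract interface and writes the results
    back to OTS; failed messages are parked on the *_failed queues and retried when the
    main queues are nearly empty.
"""
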
from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process

class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()

class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 90
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []

        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None

        listener_p = Process(target=self.start_attachment_listener)
        listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)

        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    # rebuild the consumer whose connection dropped
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()

    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()

    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # during testing every attachment is processed
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # already has a process_time and is neither INIT nor in the failed range: skip
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content of one document.
        :param _dict: the document item together with its attachment records
        :param result_queue: unused
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            dtmp = Document_tmp(item)

            start_time = time.time()
            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after retry_times failures the message goes to the failed queue,
                # which is re-processed once when the system is idle
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save the failure state on the first attempt
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
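
    # request_attachment_interface: for one attachment record, make sure the raw file is
    # available locally (download from OSS, or fall back to the copy cached in ots.attachment),
    # call the external recognition interface, convert swf pages to images when needed, then
    # persist the html/text result to OTS and the redis cache before cleaning up the local file.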
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True

                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface to process the file
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                    return False

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                    attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable the update once in production
                # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()

    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")

        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]

    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass

    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))

            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
                    else:
                        if _path:
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))

            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)

    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()

class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"

        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 50
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

        self.block_url = RLock()
        self.url_count = 0

        self.session = None

        self.list_extract_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)

        if create_listener:
            listener_p = Process(target=self.start_extract_listener)
            listener_p.start()

    def start_extract_listener(self):
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)

        while 1:
            for _i in range(len(self.list_extract_comsumer)):
                if self.list_extract_comsumer[_i].conn.is_connected():
                    continue
                else:
                    listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                    createComsumer(listener,self.mq_extract)
                    self.list_extract_comsumer[_i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
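
    # getExtract_url: the interface weights were normalized into cumulative fractions in
    # __init__, so drawing a uniform random number in [0,1) and returning the first endpoint
    # whose cumulative fraction covers it yields weighted load balancing across the interfaces.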
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error%s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass

    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)

            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)
            if html_len>50000:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"lxml")
                    if len(_dochtmlcon)>200000:
                        _find = _soup.find("div",attrs={"class":"richTextFetch"})
                        if _find is not None:
                            _find.decompose()
                    else:
                        _soup = article_limit(_soup,50000)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1

            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")

            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))

            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)

            _to_ack = False
            try:
                if all_done!=1:
                    sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))

                    if extract_times>=10:
                        # process as succeed
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transfer to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save on failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)

                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transfer to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # re-queue to the extract queue
                # save on failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()

from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None
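
# generateRangeDocid reserves a block of `nums` consecutive docids by reading and advancing
# the DocumentIdSerial counter in the MySQL table b2c_serial_no; docid_lock serializes
# concurrent callers in this process, and the connection is rebuilt and the read retried on failure.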
def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()

# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)

class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid
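
        # on_message: assign a fresh docid and its partition key (docid % 500 + 1), then route
        # the document to the attachment queue if it has attachments, otherwise straight to the
        # extract queue; on any exception the message is re-queued to mq_init before acking.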
        def on_message(self, headers):
            try:
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

        self.ots_capacity = getConnect_ots_capacity()

        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener

    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable once in production
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
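
    # Same routing as ots2mq but for the "document_tmp" table (status 0);
    # successfully queued rows are marked with status 1.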
    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
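
    # Debug helper: drain the attachment and extract queues for a short while
    # and report how many docids were seen and how many of them are distinct.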
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
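
    # Schedule temp2mq for every Oracle temp table plus ots2mq/otstmp2mq every
    # 10 seconds, with monitor_listener running once a minute.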
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
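
# Copy attachment rows created since 2022-05-06 with status 10 or
# ATTACHMENT_TOOLARGE from OTS into Postgres using a pool of 30 worker threads.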
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
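
# Remove test documents (negative docids) from document_tmp together with their
# document_html rows.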
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
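
# Re-push documents of docchannel 114 created since 2022-05-31 onto the extract
# queue, restoring dochtmlcon from the capacity instance before sending.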
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
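
# Verify that every uuid recorded in uuid.pk still exists in the OTS "document"
# table and write the missing ones to check1.xlsx.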
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
current_path = os.path.abspath(os.path.dirname(__file__))
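
# For each uuid in the Excel report with exists==0 and tolong==0, copy the row
# from its source table back into the corresponding _TEMP table so the init
# flow picks it up again.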
def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots

    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    if "docchannel" in dict_oracle2ots:
        dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
        if _exists==0 and _toolong==0:
            _count += 1
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log("%d:%s"%(_count,sql))
    conn.commit()
    conn.close()
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
    current_date = getCurrent_date(format="%Y-%m-%d")
    last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
    sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
    print(sql)