# dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
import sys

sys.setrecursionlimit(1000000)

class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
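
# Attachment stage: consumes /queue/dataflow_attachment, runs attachment
# recognition, then forwards the enriched document to /queue/dataflow_extract;
# messages that exhaust their retries are parked on
# /queue/dataflow_attachment_failed and drained later by process_failed_attachment.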
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    def __init__(self):
        Dataflow_attachment.__init__(self)
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.comsumer_count = 120
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        for _i in range(self.comsumer_count):
            listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()

    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: process everything
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10:
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        recognize the attachment content
        :param _dict: the attachment task
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dtmp = Document_tmp(item)

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after retry_times failures, move the message to the failed queue;
                # it will be reprocessed once when the system is idle
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
                # persist the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                ackMsg(conn,message_id)
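
    # Lookup order in request_attachment_interface: reuse a local copy under
    # /FileInfo if present, otherwise download the object from the OSS bucket;
    # if neither works, fall back to the ots attachment table, either reusing
    # its cached attachmenthtml or retrying the download with the path stored
    # there. The recognition interface is only called once a readable file is
    # available locally.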
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:]
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath[5:]))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath[5:]))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath[5:]))
            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)
                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # update postgres
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True
                time_download = time.time()-d_start_time

                # call the recognition interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                    _html = ""
                    return False
                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                    attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)
                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)
                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)
                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable this update in production
                # update postgres
                if attach.exists(self.attach_pool):
                    attach.update_row(self.attach_pool)
                else:
                    attach.insert_row(self.attach_pool)
                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()

    def flow_attachment(self):
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"data":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        _path = filelink.split("/file")
        if len(_path)>1:
            return _path[1]
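
    # getAttachPath derives the OSS-relative path from the <a data="filemd5">
    # (or <img data="filemd5">) link embedded in dochtmlcon: everything after
    # the "/file" segment of the href/src is treated as the relative path.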
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # look up postgres first
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            log("select localpath database %d/%d"%(len(list_attachment),len(to_find_md5)))
            set_md5 = set()
            for _attach in list_attachment:
                set_md5.add(_attach.getProperties().get(attachment_filemd5))
            list_not_in_md5 = []
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    list_not_in_md5.append(_filemd5)
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    if _path is not None:
                        if _path[0]=="/":
                            _path = _path[1:]
                        _filetype = _path.split(".")[-1]
                        _attach = {attachment_filemd5:_filemd5,
                                   attachment_filetype:_filetype,
                                   attachment_status:20,
                                   attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                   attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                        list_attachment.append(Attachment_postgres(_attach))
                    else:
                        log("getAttachments search in ots:%s"%(_filemd5))
                        _attach = {attachment_filemd5:_filemd5}
                        _attach_ots = attachment(_attach)
                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True)
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)

    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,restart=True)
        mt.run()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
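
    # comsumer_handle parses one MQ frame, loads attachment metadata for every
    # filemd5 listed in page_attachments, and routes the work item via
    # set_queue (image/pdf/swf types go to the OCR queue, everything else to
    # the non-OCR queue).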
    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = getConnection_postgres()
        cursor = conn.cursor()
        cursor.execute(sql)
        # commit on the connection the delete was executed on
        conn.commit()
        conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
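
# Extract stage: consumes /queue/dataflow_extract, calls the content_extract
# interface (with a Redis cache keyed by a fingerprint of title+content),
# writes the result back to ots, and pushes repeatedly failing messages to
# /queue/dataflow_extract_failed.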
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func

        def on_message(self, headers):
            try:
                log("get message")
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_extract.__init__(self)
        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"
        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight
        self.comsumer_count = 40
        self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.block_url = RLock()
        self.url_count = 0
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)

    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
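
    # getExtract_url picks an interface by weighted round robin: __init__
    # rewrote extract_interfaces[i][1] into cumulative fractions of
    # whole_weight (for weights 20 and 10 the thresholds become 20/30≈0.667
    # and 30/30=1.0), and a counter cycling over 0..whole_weight-1 is mapped
    # onto those thresholds, so roughly two thirds of requests hit the first url.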
    def getExtract_url(self):
        _url_num = 0
        with self.block_url:
            self.url_count += 1
            self.url_count %= self.whole_weight
            _url_num = self.url_count
        # _r = random.random()
        _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        resp = requests.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))
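
    # process_extract_failed drains /queue/dataflow_extract_failed when the
    # main extract queue is nearly empty: each failed message is re-sent to
    # /queue/dataflow_extract with extract_times forced to 10, so the next
    # attempt either succeeds or is finalized with an empty extract_json
    # instead of being retried further.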
    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        extract_failed_size = getQueueSize("dataflow_extract_failed")
        extract_size = getQueueSize("dataflow_extract")
        log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
        if extract_failed_size>0 and extract_size<100:
            failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle)
            createComsumer(failed_listener,self.mq_extract_failed)
            while 1:
                extract_failed_size = getQueueSize("dataflow_extract_failed")
                if extract_failed_size==0:
                    break
                time.sleep(10)
            failed_listener.conn.disconnect()

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error%s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
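
    # comsumer_handle caches extract results in Redis keyed by the
    # title+content fingerprint (2h TTL via putExtract_json_toRedis) so
    # duplicate documents skip the extract interface. Retry policy: up to 5
    # attempts requeue to the extract queue, attempts 6-9 move the message to
    # the failed queue, and from the 10th attempt on the document is written
    # out with an empty extract_json.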
    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(_dochtmlcon)>200000:
                try:
                    _soup = BeautifulSoup(_dochtmlcon,"lxml")
                    _soup = article_limit(_soup,200000)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1
            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))
            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2
            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3
            _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4
            _extract.setValue(document_extract2_industry_json,"{}",True)
            try:
                if all_done!=1:
                    sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # process as succeed
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transform to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # persist the failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transform to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # transform back to the extract queue
                # persist the failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()

from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None
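
# generateRangeDocid reserves a block of `nums` docids from the mysql counter
# b2c_serial_no (serial_name='DocumentIdSerial') under docid_lock and returns
# the first id of the block, e.g. generateRangeDocid(1000) advances the
# counter by 1000 and hands back current+1.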
def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()

# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)

class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid
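
        # on_message assigns the next docid from the reserved block, derives
        # the ots partition key as docid % 500 + 1, then routes the message:
        # documents with page_attachments go to /queue/dataflow_attachment
        # (status 1-10), the rest go straight to /queue/dataflow_extract
        # (status 11-50).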
        def on_message(self, headers):
            next_docid = int(self.getNextDocid())
            partitionkey = int(next_docid%500+1)
            message_id = headers.headers["message-id"]
            body = json.loads(headers.body)
            body[document_tmp_partitionkey] = partitionkey
            body[document_tmp_docid] = next_docid
            if body.get(document_original_docchannel) is None:
                body[document_original_docchannel] = body.get(document_docchannel)
            page_attachments = body.get(document_tmp_attachment_path,"[]")
            _uuid = body.get(document_tmp_uuid,"")
            if page_attachments!="[]":
                status = random.randint(1,10)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")
            else:
                status = random.randint(11,50)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
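
    # temp2mq moves up to 1000 rows per run from an oracle temp table into
    # /queue/dataflow_init, deleting each row once its message has been sent;
    # rows whose dochtmlcon exceeds 500000 characters are dropped (deleted and
    # logged) rather than sent.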
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable when going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()

    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()

    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()

    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()

def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
def del_test_doc():
    ots_client = getConnect_ots()
    # test documents carry docid <= 0
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    # delete each temp document and its html row if present
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        # channel-114 documents created after 2022-05-31 are re-sent to the extract queue
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            # pull the full html from the capacity instance before pushing to MQ
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = r"delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        # a uuid with no hit in the document index has not been synchronized yet
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

    # collect the rows missing from OTS and write them out for review
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
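
# Illustrative only: a minimal stand-in for the pickled "uuid.pk" input, matching the column
# access in check_data_synchronization (values are hypothetical placeholders for a dry run).
def _example_check_input():
    return {"uuid": ["<uuid-1>", "<uuid-2>"],
            "tablename": ["SOME_TABLE_TEMP", "OTHER_TABLE_TEMP"]}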
current_path = os.path.abspath(os.path.dirname(__file__))
def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots

    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    # docchannel is not copied back; the remaining columns are shared by insert and select
    dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
        # only rows that are missing from OTS and not oversized are copied back into the temp table
        if _exists==0 and _toolong==0:
            _count += 1
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log("%d:%s"%(_count,sql))
    conn.commit()
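
# Illustrative only: shape of the statement built in fixDoc_to_queue_init for one row
# (table, column list and id below are hypothetical placeholders, not real schema names).
def _example_init_sql(tablename="SOME_TABLE_TEMP",row_name="col1,col2",uuid="<uuid>"):
    _source = str(tablename).replace("_TEMP","")
    return " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)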
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")