# dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
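
# --- Illustrative sketch (not part of the pipeline) ---
# A minimal, self-contained example of how ActiveMQListener hands frames over to a worker
# queue: the listener only enqueues {"frame", "conn"} pairs, and consumers such as
# comsumer_handle pull them off later. StubFrame and StubConn are hypothetical stand-ins
# for the real stomp frame and connection objects.
def _example_listener_flow():
    from queue import Queue

    class StubFrame:
        def __init__(self):
            self.headers = {"message-id": "ID:example-1"}
            self.body = '{"docid": 1}'

    class StubConn:
        def disconnect(self):
            pass

    _queue = Queue()
    listener = ActiveMQListener(StubConn(), _queue)
    listener.on_message(StubFrame())
    _dict = _queue.get(True)
    # the consumer side reads the frame and the connection it should ack on
    return _dict["frame"].headers["message-id"], _dict["conn"]
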
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.comsumer_count = 120
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        for _i in range(self.comsumer_count):
            listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # test: process everything
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10:
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content
        :param _dict: the attachment payload
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            dtmp = Document_tmp(item)

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after the retry limit the message goes to the failed queue,
                # whose entries are reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save on failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                ackMsg(conn,message_id)
    def request_attachment_interface(self,attach,_dochtmlcon):

        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:]
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath[5:]))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="":
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath[5:]))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath[5:]))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath[5:]))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)

                    # update postgres
                    if attach.exists(self.attach_pool):
                        attach.update_row(self.attach_pool)
                    else:
                        attach.insert_row(self.attach_pool)
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                    _html = ""
                    return False

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable updates in production

                # update postgres
                if attach.exists(self.attach_pool):
                    attach.update_row(self.attach_pool)
                else:
                    attach.insert_row(self.attach_pool)

                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    def flow_attachment(self):
        self.flow_attachment_producer()
        self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"data":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        _path = filelink.split("/file")
        if len(_path)>1:
            return _path[1]
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()

        # search postgres first
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            log("select localpath database %d/%d"%(len(list_attachment),len(to_find_md5)))
            set_md5 = set()
            for _attach in list_attachment:
                set_md5.add(_attach.getProperties().get(attachment_filemd5))
            list_not_in_md5 = []
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    list_not_in_md5.append(_filemd5)
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    if _path is not None:
                        _filetype = _path.split(".")[-1]
                        _attach = {attachment_filemd5:_filemd5,
                                   attachment_filetype:_filetype,
                                   attachment_status:20,
                                   attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                   attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                        list_attachment.append(Attachment_postgres(_attach))
                    else:
                        log("getAttachments search in ots:%s"%(_filemd5))
                        _attach = {attachment_filemd5:_filemd5}
                        _attach_ots = attachment(_attach)
                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True)
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1)
        mt.run()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()
    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = getConnection_postgres()
        cursor = conn.cursor()
        cursor.execute(sql)
        conn.commit()
        conn.close()
    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        schedule.add_job(self.flow_attachment,"cron",second="*/10")
        schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
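
# --- Illustrative sketch (not part of the pipeline) ---
# A standalone example of the link-parsing convention used by getAttachPath and
# getAttachments above: an attachment is referenced in the page html by an <a> (or <img>)
# tag whose "data" attribute is the file md5, the part of the link after "/file" is kept
# as the relative path, and the object path is that remainder prefixed with the first
# four characters of the md5. The html snippet and md5 value here are made up.
def _example_attach_path():
    from bs4 import BeautifulSoup

    _filemd5 = "abcd1234ef567890abcd1234ef567890"
    _dochtmlcon = '<div><a data="%s" href="http://example.com/file/2022-05/report.pdf">report</a></div>'%(_filemd5)

    _soup = BeautifulSoup(_dochtmlcon,"lxml")
    _find = _soup.find("a",attrs={"data":_filemd5})
    filelink = _find.attrs.get("href","") if _find is not None else ""
    _path = filelink.split("/file")
    relative = _path[1] if len(_path)>1 else None   # everything after the first "/file"
    object_path = "%s/%s"%(_filemd5[:4],relative)   # md5 prefix directory, as in getAttachments
    return object_path
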
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,*args,**kwargs):
            self.conn = conn
            self._func = _func

        def on_message(self, headers):
            try:
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body)["crtime"]))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                pass

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"

        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"
        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 40
        self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.block_url = RLock()
        self.url_count = 0

        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener

    def getExtract_url(self):
        _url_num = 0
        with self.block_url:
            self.url_count += 1
            self.url_count %= self.whole_weight
            _url_num = self.url_count
        # _r = random.random()
        _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        resp = requests.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))
    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        extract_failed_size = getQueueSize("dataflow_extract_failed")
        extract_size = getQueueSize("dataflow_extract")
        log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
        if extract_failed_size>0 and extract_size<100:
            failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle)
            createComsumer(failed_listener,self.mq_extract_failed)
            while 1:
                extract_failed_size = getQueueSize("dataflow_extract_failed")
                if extract_failed_size==0:
                    break
                time.sleep(10)
            failed_listener.conn.disconnect()
    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            dtmp = Document_tmp(item)

            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(_dochtmlcon)>200000:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                _soup = article_limit(_soup,200000)
                _dochtmlcon = str(_soup)
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1

            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")

            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))

            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)
            try:
                if all_done!=1:
                    sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))

                    if extract_times>=10:
                        # treat as succeeded
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transfer to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # treat as succeeded
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save on failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # processed successfully
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process docid:%s failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>=10:
                # treat as succeeded
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transfer to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # treat as succeeded
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # put back on the extract queue
                # save on failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()
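
# --- Illustrative sketch (not part of the pipeline) ---
# A self-contained demonstration of the weighted round-robin used by
# Dataflow_ActivteMQ_extract.getExtract_url: the integer weights are converted to
# cumulative fractions of the total weight, then a counter taken modulo the total weight
# is mapped onto those fractions, so an interface with weight 20 out of 30 receives
# roughly two thirds of the requests. The urls here are placeholders.
def _example_weighted_round_robin():
    interfaces = [["http://interface-a/content_extract",20],
                  ["http://interface-b/content_extract",10]]

    whole_weight = sum(weight for _url,weight in interfaces)
    current_weight = 0
    for _i in range(len(interfaces)):
        current_weight += interfaces[_i][1]
        interfaces[_i][1] = current_weight/whole_weight    # cumulative fraction

    counts = {}
    url_count = 0
    for _ in range(whole_weight*10):
        url_count = (url_count+1)%whole_weight
        _r = url_count/whole_weight
        for _url,cumulative in interfaces:
            if _r<=cumulative:
                counts[_url] = counts.get(_url,0)+1
                break
    return counts   # interface-a is chosen about twice as often as interface-b
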
from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
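
# --- Illustrative sketch (not part of the pipeline) ---
# generateRangeDocid reserves a whole block of `nums` docids with a single database round
# trip; Dataflow_init.InitListener (below) then hands them out one by one and only goes
# back to the database once the block is exhausted. This sketch replaces the
# b2c_serial_no table with a local counter and shows the partitionkey convention docid%500+1.
def _example_docid_block_allocation(get_count=1000):
    serial = {"value": 500000}                  # stand-in for the serial_value column

    def _generate_range(nums):
        next_docid = serial["value"]+1
        serial["value"] += nums
        return next_docid

    begin_docid = _generate_range(get_count)    # one "database" call per block
    allocated = []
    for count in range(3):                      # hand out a few docids locally
        next_docid = begin_docid+count
        partitionkey = int(next_docid%500+1)
        allocated.append((next_docid,partitionkey))
    return allocated
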
# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
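
# --- Illustrative sketch (not part of the pipeline) ---
# MyEncoder is passed as cls= to json.dumps throughout this module so that rows pulled
# from OTS/postgres, which may contain numpy arrays, numpy floats, Decimal values or raw
# bytes, can be serialized onto the MQ. A minimal round trip with made-up values:
def _example_myencoder():
    import json as _json
    import numpy as _np
    from decimal import Decimal as _Decimal

    item = {"docid": 1,
            "score": _np.float32(0.75),
            "price": _Decimal("12.30"),
            "vector": _np.array([1, 2, 3]),
            "raw": b"hello"}
    return _json.dumps(item, cls=MyEncoder, ensure_ascii=False)
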
class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
            next_docid = int(self.getNextDocid())
            partitionkey = int(next_docid%500+1)
            message_id = headers.headers["message-id"]
            body = json.loads(headers.body)
            body[document_tmp_partitionkey] = partitionkey
            body[document_tmp_docid] = next_docid
            if body.get(document_original_docchannel) is None:
                body[document_original_docchannel] = body.get(document_docchannel)
            page_attachments = body.get(document_tmp_attachment_path,"[]")
            _uuid = body.get(document_tmp_uuid,"")
            if page_attachments!="[]":
                status = random.randint(1,10)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")
            else:
                status = random.randint(11,50)
                body[document_tmp_status] = status
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                    ackMsg(self.conn,message_id)
                else:
                    log("send_msg_error on init listener")

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.mq_init = "/queue/dataflow_init"

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)

        self.ots_capacity = getConnect_ots_capacity()

        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()

        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable when going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):

            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
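
# --- Illustrative sketch (not part of the pipeline) ---
# ots2mq, otstmp2mq and the helper functions below all follow the same Tablestore paging
# pattern: issue one sorted search, then keep calling search with the returned next_token
# until it comes back empty. A generic version of that loop, assuming an already-connected
# ots_client and any bool_query built as in the methods above:
def _example_iter_ots_rows(ots_client,table,index,bool_query,limit=100):
    rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=limit),
                                                                   ColumnsToGet(return_type=ColumnReturnType.ALL))
    for _row in getRow_ots(rows):
        yield _row
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=limit),
                                                                       ColumnsToGet(return_type=ColumnReturnType.ALL))
        for _row in getRow_ots(rows):
            yield _row
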
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()

    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])

    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
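
# Removes test documents (negative docid) from document_tmp along with their
# document_html rows.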
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
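
# Re-publishes documents of docchannel 114 created on or after 2022-05-31 to
# /queue/dataflow_extract, re-fetching dochtmlcon from the capacity instance
# before sending.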
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))

        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
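
# Checks Oracle->OTS synchronization: for every (uuid, tablename) pair loaded
# from uuid.pk, counts matching rows in the document index and writes the
# pairs with no match to check1.xlsx.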
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")

current_path = os.path.abspath(os.path.dirname(__file__))
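
# Re-seeds the init flow: for every row marked exists==0 in the check
# spreadsheet, copies the record from its Oracle source table back into the
# corresponding *_TEMP table so temp2mq will pick it up again.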
def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots

    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists in zip(df["uuid"],df["tablename"],df["exists"]):
        if _exists==0:
            _count += 1
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log("%d:%s"%(_count,sql))
    conn.commit()
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log_check.xlsx")