# dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys
sys.setrecursionlimit(1000000)
from multiprocessing import Process
class ActiveMQListener():
    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue
    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))
    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)
    def __del__(self):
        self.conn.disconnect()
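# A listener is attached to a queue with createComsumer, as done throughout this module.
# A minimal sketch (Queue and createComsumer are presumably provided by the wildcard imports above):
#
#   _queue = Queue()
#   listener = ActiveMQListener(getConnect_activateMQ(), _queue)
#   createComsumer(listener, "/queue/dataflow_attachment")
#
# on_message then pushes each frame onto _queue for a separate consumer to handle and ack.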
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
    class AttachmentMQListener():
        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx
        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))
        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()
                pass
        def __del__(self):
            self.conn.disconnect()
    def __init__(self):
        Dataflow_attachment.__init__(self)
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 90
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None
        # for _ in range(self.comsumer_process_count):
        #     listener_p = Process(target=self.start_attachment_listener)
        #     listener_p.start()
        listener_p = Process(target=self.start_attachment_listener)
        listener_p.start()
    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)
    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener
    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}
            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: process every attachment
                _filemd5 = _attach.getProperties().get(attachment_filemd5)
                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if it already has a process_time, skip it
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                if _attach.getProperties().get(attachment_filetype)=="swf":
                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content.
        :param _dict: dict holding the document item and its attachments
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()
            dtmp = Document_tmp(item)
            start_time = time.time()
            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after 5 failures the message goes to the failed queue, which is reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
                # save the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))
            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)
                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True
                time_download = time.time()-d_start_time
                # call the interface and handle the result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False
                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)
                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)
                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)
                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # re-enable this update in production
                # # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())
                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()
    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]
    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None
    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
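    # The two methods above form a simple Redis cache for processed attachments:
    # putAttach_json_toRedis stores the attachment properties under "attach-<filemd5>" with a
    # 3-hour expiry, and getAttach_json_fromRedis returns the raw JSON string (or None on a miss),
    # which getAttachments below loads back into Attachment_postgres objects.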
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))
    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()
    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()
    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)
    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()
    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()
    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()
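# Hedged usage sketch (the module's actual entry point is not shown in this excerpt):
# constructing Dataflow_ActivteMQ_attachment starts the listener process from __init__, and
# start_flow_attachment() then schedules the failed-queue retry job, e.g.:
#
#   flow_attachment = Dataflow_ActivteMQ_attachment()
#   flow_attachment.start_flow_attachment()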
class Dataflow_ActivteMQ_extract(Dataflow_extract):
    class ExtractListener():
        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx
        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()
                pass
        def on_error(self, headers):
            log('received an error %s' % str(headers.body))
        def __del__(self):
            self.conn.disconnect()
    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)
        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]
        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"
        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight
        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ)
        self.block_url = RLock()
        self.url_count = 0
        self.session = None
        self.list_extract_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()
    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
        while 1:
            for _i in range(len(self.list_extract_comsumer)):
                if self.list_extract_comsumer[_i].conn.is_connected():
                    continue
                else:
                    listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                    createComsumer(listener,self.mq_extract)
                    self.list_extract_comsumer[_i] = listener
            time.sleep(5)
    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener
    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]
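    # getExtract_url picks an interface by weighted random choice: __init__ converts the raw
    # weights into cumulative fractions, so with weights [20, 10] the thresholds become roughly
    # [0.667, 1.0]; a random number below 0.667 selects the first URL, otherwise the second.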
    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp
    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp
    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))
    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()
    def flow_extract(self,):
        self.comsumer()
    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()
    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None
    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)
    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None
    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)
            if html_len>50000:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"html5lib")
                    _soup = article_limit(_soup,10000)
                    # if len(_dochtmlcon)>200000:
                    #     _find = _soup.find("div",attrs={"class":"richTextFetch"})
                    #     if _find is not None:
                    #         _find.decompose()
                    # else:
                    #     _soup = article_limit(_soup,50000)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1
            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))
            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2
            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3
            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4
            _extract.setValue(document_extract2_industry_json,"{}",True)
            try:
                if all_done!=1:
                    sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # process as succeed
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transfer to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transfer to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # put back on the extract queue
                # save the failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()
from multiprocessing import RLock
docid_lock = RLock()
conn_mysql = None
def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
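# generateRangeDocid reserves a contiguous block of docids by bumping the DocumentIdSerial row
# in MySQL by `nums` under a process-wide lock. For example, a call with nums=1000 when the
# serial is 5000 returns 5001 and leaves the serial at 6000; InitListener then hands out
# 5001..6000 locally via getNextDocid() before asking for another block.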
# custom JSON encoder
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
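# MyEncoder is passed to json.dumps throughout this module so that numpy arrays, numpy floats,
# Decimal and bytes values survive serialization, e.g.:
#
#   json.dumps(item, cls=MyEncoder, ensure_ascii=False)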
class Dataflow_init(Dataflow):
    class InitListener():
        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
        def on_error(self, headers):
            log('received an error %s' % headers.body)
        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0
        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid
        def on_message(self, headers):
            try:
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)
        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)
    def monitor_listener(self):
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable in production
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()

    def shengpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()

        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
            if self.max_shenpi_id is not None:
                # select data in order
                list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,self.max_shenpi_id)

                # send data to mq one by one with max_shenpi_id updated
                for _data in list_data:
                    _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                    ots_dict = _data.getProperties_ots()
                    if ots_dict["docid"]<self.base_shenpi_id:
                        ots_dict["docid"] += self.base_shenpi_id

                    if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                            self.max_shenpi_id = _id
                        else:
                            log("sent shenpi message to mq failed %s"%(_id))
                            break
                    else:
                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                            self.max_shenpi_id = _id
                        else:
                            log("sent shenpi message to mq failed %s"%(_id))
                            break
            # return the connection to the pool
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
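
    # shengpi2mq keeps a cursor (max_shenpi_id) over the approval table and maps
    # source ids into a dedicated docid range. Worked example of the offset above:
    # a row with id 123 is published with docid 123 + 400000000000 = 400000000123,
    # so shenpi documents never collide with ordinary docids.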

    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")

                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)

                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")

                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)

                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
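
    # ots2mq picks up document rows with status in the 1-50 range, routes them to
    # the attachment or extract queue (with a randomized status written into the
    # outgoing payload), and resets the OTS row's status to 0 once the send
    # succeeds; the next_token loop pages through the remaining matches.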

    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])

            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")

                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)

                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")

                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)

                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
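
    # otstmp2mq does the same routing for document_tmp rows still at status 0,
    # except that the row is flipped to status 1 after a successful publish, so
    # the same temp row is not re-sent on the next pass.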

    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
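
    # test_dump_docid is a debugging helper: it drains both downstream queues for
    # a short window and compares len(list_docid) with len(set(list_docid)) to
    # spot duplicate docids being pushed into the flow.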

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
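
    # A minimal way to run this flow (mirroring the commented-out lines in
    # __main__ below; assumes Oracle/OTS/ActiveMQ connectivity is configured):
    #   flow = Dataflow_init()
    #   flow.start_dataflow_init()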

def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()

    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
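
# transform_attachment back-fills Postgres with attachment rows read from OTS:
# 30 consumer threads insert rows that do not yet exist while the main thread
# pages through the attachment index and feeds the shared task_queue.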

def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])

    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
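
# del_test_doc removes test records in the below-zero docid range from
# document_tmp, together with their document_html counterparts if present.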

def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
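
# fixDoc_to_queue_extract is a one-off repair job: it re-reads documents of
# docchannel 114 created after 2022-05-31, restores dochtmlcon from the
# capacity instance, and re-publishes them to /queue/dataflow_extract with
# 30 worker threads.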

def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])

        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    import pandas as pd
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
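
# check_data_synchronization cross-checks uuids recorded on the Oracle side
# against the OTS document index and writes the ones that are missing
# (exists == 0) to check1.xlsx for follow-up.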

current_path = os.path.abspath(os.path.dirname(__file__))

def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots

    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    if "docchannel" in dict_oracle2ots:
        dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
        if _exists==0 and _toolong==0:
            _count += 1
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log("%d:%s"%(_count,sql))
    conn.commit()
    conn.close()
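
# fixDoc_to_queue_init re-inserts the missing, non-oversized rows listed in the
# spreadsheet back into their *_TEMP tables (copying from the corresponding
# source table), from where temp2mq will pick them up again.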

if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")

    current_date = getCurrent_date(format="%Y-%m-%d")
    last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
    sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
    print(sql)
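    # Note: the delete statement above is only printed for inspection; nothing in
    # this block executes it against the database.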