dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

import os

from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document,document_attachment_path_filemd5
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *

import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process


class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()


class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 20
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)

        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None

        for _ in range(self.comsumer_process_count):
            listener_p = Process(target=self.start_attachment_listener)
            listener_p.start()
        # listener_p = Process(target=self.start_attachment_listener)
        # listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()

    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                    return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()

    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: process every attachment
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # if a process_time is already set, skip reprocessing
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls

    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content.
        :param _dict: the attachment payload
        :param result_queue:
        :return:
        '''
        try:
            start_time = time.time()
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")

            if "retry_times" not in item:
                item["retry_times"] = 5
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            # write the attachment classification back to the document
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            if len(page_attachments)>0:
                for _attachment in page_attachments:
                    filemd5 = _attachment.get(document_attachment_path_filemd5,"")
                    classification = None
                    for _attach in list_attach:
                        if _attach.getProperties().get(attachment_filemd5,"")==filemd5:
                            classification = _attach.getProperties().get(attachment_classification,"")
                            break
                    if classification is not None:
                        _attachment[attachment_classification] = classification
                item[document_tmp_attachment_path] = json.dumps(page_attachments,ensure_ascii=False)

            dtmp = Document_tmp(item)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # once the retry limit (5) is reached the message goes to the failed queue,
                # whose contents are reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save the failure state
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                ackMsg(conn,message_id)

    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        _ots_exists = attach.getProperties().get("ots_exists")

        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)

        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))

            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True

                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        if not _ots_exists:
                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the recognition interface on the downloaded file
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                _reg_time = time.time()-start_time

                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False

                # when re-running a swf, strip the "\" escapes from the stored swf_urls
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    else:
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # re-enable this update in production
                # # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                start_time = time.time()
                if local_exists:
                    upload_status = False
                    if not _ots_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    try:
                        if upload_status and os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                _upload_time = time.time()-start_time

                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()

    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]

    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putAttach_json_toRedis error %s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
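
    # NOTE: getAttach_json_fromRedis/putAttach_json_toRedis cache the attachment record as
    # JSON under the key "attach-<filemd5>" with a 3-hour TTL, so documents that reference
    # an already-seen filemd5 can skip the postgres/ots lookups below. A minimal sketch of
    # the round trip, assuming a plain redis.Redis client with the same get/set/expire API
    # as the pooled connector used above:
    #
    #   import json, redis
    #   r = redis.Redis()
    #   r.set("attach-abc123", json.dumps({"filemd5": "abc123", "status": 10}))
    #   r.expire("attach-abc123", 3600*3)
    #   cached = r.get("attach-abc123")
    #   attach_dict = json.loads(cached) if cached is not None else None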

    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # look in the redis cache first, then fall back to ots / the path in the html
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]

            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))

            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
                            _attach_pg.setValue("ots_exists",True,True)
                            list_attachment.append(_attach_pg)
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))

            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)

    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()


class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"
        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        # turn the configured weights into cumulative, normalized thresholds
        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ)

        self.block_url = RLock()
        self.url_count = 0
        self.session = None

        self.list_extract_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()

    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
        while 1:
            try:
                for _i in range(len(self.list_extract_comsumer)):
                    if self.list_extract_comsumer[_i].conn.is_connected():
                        continue
                    else:
                        listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                        createComsumer(listener,self.mq_extract)
                        self.list_extract_comsumer[_i] = listener
                time.sleep(5)
            except Exception as e:
                traceback.print_exc()

    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                # pass the slot index as _idx (required by ExtractListener.__init__)
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,i)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener

    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]
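
    # NOTE: __init__ rewrites self.extract_interfaces into cumulative normalized weights,
    # so getExtract_url is a weighted random choice. With the configured weights [20, 10]
    # the thresholds become [20/30, 30/30] = [0.667, 1.0]: a uniform _r in [0,1) falls
    # below 0.667 about two thirds of the time, so roughly 2/3 of requests go to the first
    # interface and 1/3 to the second. A minimal standalone sketch of the same idea:
    #
    #   import random
    #   interfaces = [["http://127.0.0.1:15030/content_extract",20],
    #                 ["http://192.168.0.115:15030/content_extract",10]]
    #   total = sum(w for _,w in interfaces)
    #   acc = 0
    #   cumulative = []
    #   for url,w in interfaces:
    #       acc += w
    #       cumulative.append((url,acc/total))
    #   r = random.random()
    #   chosen = next(url for url,t in cumulative if r <= t)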

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error%s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
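
    # NOTE: extraction results are cached in redis keyed by a fingerprint of
    # title + content + original_docchannel (built in comsumer_handle below) with a
    # 2-hour TTL, so identical announcements ingested twice within that window reuse the
    # first result instead of calling the extract interface again. Illustrative key
    # construction, using the same helpers as the code below:
    #
    #   _fingerprint = getFingerprint(str(title)+str(content))+str(original_docchannel)
    #   self.putExtract_json_toRedis(_fingerprint, extract_json)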

    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            data = {}
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)

            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})

            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)  # length of the html text
            limit_text_len = 50000  # text-length limit for the body (or attachment) content
            if html_len > limit_text_len:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"lxml")

                    all_len = len(_soup.get_text())  # text length of the whole announcement
                    _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
                    attachment_len = len(_attachment.get_text()) if _attachment else 0  # text length of the attachment content
                    main_text_len = all_len - attachment_len  # text length of the main body

                    if attachment_len>150000:  # drop over-long attachment content (it causes timeouts)
                        if _attachment is not None:
                            _attachment.decompose()
                            attachment_len = 0

                    # only run article_limit when the body or attachment text exceeds limit_text_len
                    if main_text_len>limit_text_len or attachment_len>limit_text_len:
                        _soup = article_limit(_soup,limit_text_len)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))

            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1

            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
            data["page_attachments"] = item.get(document_tmp_attachment_path,"[]")
            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])

            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)
            _to_ack = True
            try:
                if all_done!=1:
                    # sentMsgToDD("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    log("extract failed:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # treat as succeeded after 10 attempts
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace the extraction result with {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # move the message to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # treat as succeeded
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace the extraction result with {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure state
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # processed successfully
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()

            if _to_ack:
                ackMsg(conn,message_id,subscription)
            else:
                item["extract_times"] -= 1
                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            # sentMsgToDD("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("extract failed:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))
            if extract_times>=10:
                # treat as succeeded after 10 attempts
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace the extraction result with {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # move the message to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # treat as succeeded
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace the extraction result with {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # push the message back to the extract queue and save the failure state
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()


from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
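
# NOTE: generateRangeDocid reserves a block of `nums` docids in one round trip by advancing
# the DocumentIdSerial counter in mysql and returning the first id of the block. For example,
# if serial_value is currently 5000 and nums=1000, the counter is updated to 6000 and the call
# returns 5001; the caller (InitListener below) then hands out 5001..6000 locally via
# getNextDocid() before asking for another block. A sketch of the consumption pattern
# (illustrative only):
#
#   begin = generateRangeDocid(1000)              # e.g. returns 5001, counter now 6000
#   docids = [begin + i for i in range(1000)]     # 5001..6000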

# custom JSON encoder for numpy / bytes / Decimal values
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
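
# NOTE: MyEncoder lets json.dumps handle the numpy / bytes / Decimal values that appear in
# the ots dicts pushed to ActiveMQ below. A minimal usage sketch, assuming numpy is
# available as np and Decimal is imported (as in the surrounding code):
#
#   payload = {"scores": np.array([0.1, 0.2]), "price": Decimal("12.5"), "raw": b"abc"}
#   json.dumps(payload, cls=MyEncoder)
#   # -> '{"scores": [0.1, 0.2], "price": 12.5, "raw": "abc"}'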
  1055. class Dataflow_init(Dataflow):
  1056. class InitListener():
  1057. def __init__(self,conn,*args,**kwargs):
  1058. self.conn = conn
  1059. self.get_count = 1000
  1060. self.count = self.get_count
  1061. self.begin_docid = None
  1062. self.mq_init = "/queue/dataflow_init"
  1063. self.mq_attachment = "/queue/dataflow_attachment"
  1064. self.mq_extract = "/queue/dataflow_extract"
  1065. self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
  1066. def on_error(self, headers):
  1067. log('received an error %s' % headers.body)
  1068. def getRangeDocid(self):
  1069. begin_docid = generateRangeDocid(self.get_count)
  1070. self.begin_docid = begin_docid
  1071. self.count = 0
  1072. def getNextDocid(self):
  1073. if self.count>=self.get_count:
  1074. self.getRangeDocid()
  1075. next_docid = self.begin_docid+self.count
  1076. self.count += 1
  1077. return next_docid

        def on_message(self, headers):
            # read the message id up front so the error branch can always ack/requeue
            message_id = headers.headers["message-id"]
            body = None
            try:
                body = json.loads(headers.body)
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    # documents with attachments go to the attachment queue (status 1..10)
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    # documents without attachments go straight to extraction (status 11..50)
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                # requeue the message to the init queue so it is not lost, then ack;
                # fall back to the raw body if it could not even be parsed
                _msg = json.dumps(body,cls=MyEncoder) if body is not None else headers.body
                if send_msg_toacmq(self.pool_mq1,_msg,self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1

    def __init__(self):
        Dataflow.__init__(self)
        self.max_shenpi_id = None
        self.base_shenpi_id = 400000000000
        self.mq_init = "/queue/dataflow_init"
        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_extract = "/queue/dataflow_extract"
        self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.ots_capacity = getConnect_ots_capacity()
        self.init_comsumer_counts = 2
        self.list_init_comsumer = []
        for i in range(self.init_comsumer_counts):
            listener = self.InitListener(getConnect_activateMQ())
            createComsumer(listener,self.mq_init)
            self.list_init_comsumer.append(listener)

    def monitor_listener(self):
        # replace any init consumer whose connection has dropped
        for i in range(len(self.list_init_comsumer)):
            if self.list_init_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.InitListener(getConnect_activateMQ())
                createComsumer(listener,self.mq_init)
                self.list_init_comsumer[i] = listener

    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[])
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row once it has been queued; enable when going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()

    def shenpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
                    if self.max_shenpi_id<60383953:
                        self.max_shenpi_id = 60383953
            if self.max_shenpi_id is not None:
                # select data in order
                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
                if origin_max_shenpi_id is not None:
                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                        # send data to mq one by one with max_shenpi_id updated
                        for _data in list_data:
                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                            ots_dict = _data.getProperties_ots()
                            if ots_dict["docid"]<self.base_shenpi_id:
                                ots_dict["docid"] += self.base_shenpi_id
                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
                            else:
                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                                    self.max_shenpi_id = _id
                                else:
                                    log("sent shenpi message to mq failed %s"%(_id))
                                    break
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            log("shenpi error")
            traceback.print_exc()
            self.pool_oracle.decrease()

    def fix_shenpi(self):
        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
        begin_id = 0
        end_id = 64790010
        thread_num = 15
        step = (end_id-begin_id)//thread_num
        list_items = []
        for _i in range(thread_num):
            _begin = _i*step
            _end = (_i+1)*step-1
            if _i==thread_num-1:
                _end = end_id
            list_items.append((_begin,_end,_i))
        task_queue = Queue()
        for item in list_items:
            task_queue.put(item)
        fix_count_list = []

        def _handle(item,result_queue):
            conn_oracle = pool_oracle.getConnector()
            (begin_id,end_id,thread_id) = item
            _count = 0
            for _id_i in range(begin_id,end_id):
                try:
                    bool_query = BoolQuery(must_queries=[
                        TermQuery("docchannel",302),
                        TermQuery("original_id",_id_i)
                    ])
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                        SearchQuery(bool_query,get_total_count=True))
                    if total_count>0:
                        continue
                    # bool_query = BoolQuery(must_queries=[
                    #     TermQuery("id",_id_i),
                    # ])
                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                    #                                                                     SearchQuery(bool_query,get_total_count=True))
                    # if total_count>0:
                    #     continue
                    try:
                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
                    except Exception as e:
                        continue
                    # send data to mq one by one with max_shenpi_id updated
                    for _data in list_data:
                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                        ots_dict = _data.getProperties_ots()
                        if ots_dict["docid"]<self.base_shenpi_id:
                            ots_dict["docid"] += self.base_shenpi_id
                        ots_dict["partitionkey"] = ots_dict["docid"]%500+1
                        ots_dict["status"] = 201
                        dict_1 = {}
                        dict_2 = {}
                        for k,v in ots_dict.items():
                            if k!="dochtmlcon":
                                dict_1[k] = v
                            if k in ('partitionkey',"docid","dochtmlcon"):
                                dict_2[k] = v
                        d_1 = Document(dict_1)
                        d_2 = Document(dict_2)
                        d_1.update_row(self.ots_client)
                        d_2.update_row(self.ots_capacity)
                        _count += 1
                except Exception as e:
                    traceback.print_exc()
                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
            fix_count_list.append(_count)
            pool_oracle.putConnector(conn_oracle)

        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
        mt.run()
        print(fix_count_list,sum(fix_count_list))

    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            task_queue = Queue()
            for _data in list_data:
                task_queue.put(_data)
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    task_queue.put(_data)
                if task_queue.qsize()>=1000:
                    break

            def _handle(_data,result_queue):
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                _document.fix_columns(self.ots_client,None,True)
                _data = _document.getProperties()
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,0,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")

            if task_queue.qsize()>0:
                mt = MultiThreadHandler(task_queue,_handle,None,15)
                mt.run()
        except Exception as e:
            traceback.print_exc()
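
    # Routing note (as implemented above and in InitListener.on_message): documents that
    # still carry page_attachments are re-queued to /queue/dataflow_attachment with a
    # status drawn from 1..10, everything else goes straight to /queue/dataflow_extract
    # with a status drawn from 11..50.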

    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()

    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)

        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))

    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        from BaseDataMaintenance.model.oracle.TouSuChuLiTemp import TouSuChuLiTemp
        from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp
        from BaseDataMaintenance.model.oracle.QiTaShiXinTemp import QiTaShiXin

        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TouSuChuLiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(WeiFaJiLuTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(QiTaShiXin({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
        schedule.start()
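
# Illustrative usage (mirrors the commented-out lines in __main__ at the bottom of this file):
#     di = Dataflow_init()
#     di.start_dataflow_init()   # blocks inside BlockingScheduler.start()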

def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue

    task_queue = Queue()

    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()

    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()

    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))

def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)

def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)

        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")

        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()

def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load

    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()

    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")

current_path = os.path.abspath(os.path.dirname(__file__))

def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
    from BaseDataMaintenance.model.oracle.TouSuTemp import dict_oracle2ots as dict_oracle2ots_tousu

    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    if "docchannel" in dict_oracle2ots:
        dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    list_tousu_keys = []
    for k,v in dict_oracle2ots_tousu.items():
        if str(k).isupper():
            list_tousu_keys.append(k)
    row_name_tousu = ",".join(list_tousu_keys)
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
        if _exists==0 and _toolong==0:
            _count += 1
            is_tousu = False
            if tablename in ('bxkc.t_wei_fa_ji_lu_temp','bxkc.t_tou_su_chu_li_temp','bxkc.t_qi_ta_shi_xin_temp'):
                is_tousu = True
            _source = str(tablename).replace("_TEMP","")
            if is_tousu:
                _source = str(tablename).replace("_temp","")
            _rowname = row_name_tousu if is_tousu else row_name
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,_rowname,_rowname,_source,uuid)
            log("%d:%s"%(_count,sql))
            cursor.execute(sql)
            conn.commit()
    conn.close()
    return _count
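
# Illustrative example of the statement built above (table name and uuid are made up):
#     insert into BXKC.T_ZHAO_BIAO_GONG_GAO_TEMP(<columns>) select <columns>
#         from BXKC.T_ZHAO_BIAO_GONG_GAO where id='<uuid>'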

if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
    current_date = getCurrent_date(format="%Y-%m-%d")
    last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
    sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
    print(sql)
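    # Note: the statement above is only printed; executing it would require a database
    # connection (e.g. one of the connectors used elsewhere in this module) and would
    # remove attachment rows whose crtime is more than 30 days old.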