# dataflow_mq.py

from BaseDataMaintenance.maintenance.dataflow import *
from BaseDataMaintenance.common.activateMQUtils import *
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
from BaseDataMaintenance.dataSource.setttings import *
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
import os
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.model.ots.document import Document
from BaseDataMaintenance.common.Utils import article_limit
from BaseDataMaintenance.common.documentFingerprint import getFingerprint
from BaseDataMaintenance.model.postgres.document_extract import *
from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
import sys

sys.setrecursionlimit(1000000)

from multiprocessing import Process
class ActiveMQListener():

    def __init__(self,conn,_queue,*args,**kwargs):
        self.conn = conn
        self._queue = _queue

    def on_error(self, headers):
        log("===============")
        log('received an error %s' % str(headers.body))

    def on_message(self, headers):
        log("====message====")
        message_id = headers.headers["message-id"]
        body = headers.body
        self._queue.put({"frame":headers,"conn":self.conn},True)

    def __del__(self):
        self.conn.disconnect()
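
# Attachment dataflow: consumes documents from the dataflow_attachment queue,
# recognizes their attachments and forwards the enriched documents to the extract queue.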
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):

    class AttachmentMQListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_error(self, headers):
            log("===============")
            log('received an error %s' % str(headers.body))

        def on_message(self, headers):
            try:
                log("get message of idx:%s"%(str(self._idx)))
                message_id = headers.headers["message-id"]
                body = headers.body
                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                self._func(_dict=_dict)
            except Exception as e:
                traceback.print_exc()
                pass

        def __del__(self):
            self.conn.disconnect()

    def __init__(self):
        Dataflow_attachment.__init__(self)

        self.mq_attachment = "/queue/dataflow_attachment"
        self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
        self.mq_extract = "/queue/dataflow_extract"
        self.queue_attachment_ocr = Queue()
        self.queue_attachment_not_ocr = Queue()
        self.comsumer_count = 90
        self.comsumer_process_count = 5
        self.retry_comsumer_count = 10
        self.retry_times = 5
        self.list_attachment_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.queue_attachment)
        #     createComsumer(listener_attachment,self.mq_attachment)
        #     self.list_attachment_comsumer.append(listener_attachment)
        self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
        self.session = None

        # for _ in range(self.comsumer_process_count):
        #     listener_p = Process(target=self.start_attachment_listener)
        #     listener_p.start()
        listener_p = Process(target=self.start_attachment_listener)
        listener_p.start()

    def start_attachment_listener(self):
        for _i in range(self.comsumer_count):
            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
            createComsumer(listener_attachment,self.mq_attachment)
            self.list_attachment_comsumer.append(listener_attachment)
        while 1:
            for i in range(len(self.list_attachment_comsumer)):
                if self.list_attachment_comsumer[i].conn.is_connected():
                    continue
                else:
                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                    createComsumer(listener,self.mq_attachment)
                    self.list_attachment_comsumer[i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_attachment_comsumer)):
            if self.list_attachment_comsumer[i].conn.is_connected():
                continue
            else:
                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
                createComsumer(listener,self.mq_attachment)
                self.list_attachment_comsumer[i] = listener

    def process_failed_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getQueueSize
        attachment_size = getQueueSize("dataflow_attachment")
        failed_attachment_size = getQueueSize("dataflow_attachment_failed")
        if attachment_size<100 and failed_attachment_size>0:
            list_comsumer = []
            for _i in range(self.retry_comsumer_count):
                listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
                list_comsumer.append(listener_attachment)
                createComsumer(listener_attachment,self.mq_attachment_failed)
            while 1:
                failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                if failed_attachment_size==0:
                    break
                time.sleep(10)
            for _c in list_comsumer:
                _c.conn.disconnect()
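
    # Handler invoked by each AttachmentMQListener: parses the MQ frame, resolves the
    # attachments referenced by the document and runs attachment_recognize on them.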
    def attachment_listener_handler(self,_dict):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            _idx = _dict.get("idx",1)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")

            if random.random()<0.2:
                log("jump by random")
                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
                    ackMsg(conn,message_id)
                    return

            if len(page_attachments)==0:
                newitem = {"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                newitem = {"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn}

            log("attachment get doc:%s"%(str(newitem.get("item",{}).get("docid"))))
            self.attachment_recognize(newitem,None)
            log("attachment get doc:%s succeed"%(str(newitem.get("item",{}).get("docid"))))
        except Exception as e:
            traceback.print_exc()
    def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
        try:
            list_html = []
            swf_urls = []
            _not_failed = True
            for _attach in list_attach:
                # for testing: process everything
                _filemd5 = _attach.getProperties().get(attachment_filemd5)

                if _attach.getProperties().get(attachment_status) in (ATTACHMENT_PROCESSED,ATTACHMENT_TOOLARGE):
                    log("%s has processed or toolarge"%(_filemd5))
                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                    if _html is None:
                        _html = ""
                    list_html.append({attachment_filemd5:_filemd5,
                                      "html":_html})
                else:
                    # has process_time then jump
                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10 and _attach.getProperties().get(attachment_status)!=ATTACHMENT_INIT and not (_attach.getProperties().get(attachment_status)>=ATTACHMENT_MC_FAILED_FROM and _attach.getProperties().get(attachment_status)<=ATTACHMENT_MC_FAILED_TO):
                        log("%s has process_time jump"%(_filemd5))
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})
                    else:
                        log("%s requesting interface"%(_filemd5))
                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
                        if not _succeed:
                            _not_failed = False
                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
                        if _html is None:
                            _html = ""
                        list_html.append({attachment_filemd5:_filemd5,
                                          "html":_html})

                if _attach.getProperties().get(attachment_filetype)=="swf":
                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
                    if _swf_urls:
                        _swf_urls = _swf_urls.replace('\\', '')
                    else:
                        _swf_urls = '[]'
                    _swf_urls = json.loads(_swf_urls)
                    swf_urls.extend(_swf_urls)

            if not _not_failed:
                return False,list_html,swf_urls
            return True,list_html,swf_urls
        except requests.ConnectionError as e1:
            raise e1
        except Exception as e:
            return False,list_html,swf_urls
    def attachment_recognize(self,_dict,result_queue):
        '''
        Recognize the attachment content.
        :param _dict: the attachment payload
        :param result_queue:
        :return:
        '''
        try:
            item = _dict.get("item")
            list_attach = _dict.get("list_attach")
            conn = _dict["conn"]
            message_id = _dict.get("message_id")
            _retry_times = item.get("retry_times",0)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
            dhtml.delete_bidi_a()

            dtmp = Document_tmp(item)

            start_time = time.time()
            # call the recognition interface
            _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)

            _to_ack = False
            if not _succeed and _retry_times<self.retry_times:
                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
                item["retry_times"] = _retry_times+1
                # after more than 5 failures the item goes to the failed queue,
                # whose messages are reprocessed once during idle time
                if item["retry_times"]>=self.retry_times:
                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)

                # save the failure
                if _retry_times==0:
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,0,True)
                    if not dtmp.exists_row(self.ots_client):
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                if send_succeed:
                    _to_ack = True
            else:
                try:
                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
                    dhtml.updateSWFImages(swf_urls)
                    dhtml.updateAttachment(list_html)
                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _to_ack = True
                except Exception as e:
                    traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id)
            log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
        except Exception as e:
            traceback.print_exc()
            if time.time()-start_time<10:
                item["retry_times"] -= 1
                if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
                    ackMsg(conn,message_id)
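
    # Downloads a single attachment (local cache, OSS bucket, or ots fallback), sends it
    # to the recognition interface and writes the recognized html/text back to ots and redis.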
    def request_attachment_interface(self,attach,_dochtmlcon):
        filemd5 = attach.getProperties().get(attachment_filemd5)
        _status = attach.getProperties().get(attachment_status)
        _filetype = attach.getProperties().get(attachment_filetype)
        _path = attach.getProperties().get(attachment_path)
        _uuid = uuid4()
        objectPath = attach.getProperties().get(attachment_path)
        docids = attach.getProperties().get(attachment_docids)
        if objectPath is None:
            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
        else:
            relative_path = objectPath[5:].replace("//","/")
        localpath = "/FileInfo/%s"%(relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
            local_exists = False
        else:
            local_exists = True
            _size = os.path.getsize(localpath)
        not_failed_flag = True
        try:
            d_start_time = time.time()
            if not local_exists:
                log("md5:%s path:%s not exists,start downloading"%(filemd5,objectPath))
                try:
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                except Exception as e:
                    download_succeed = False
            else:
                log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
            if not (local_exists or download_succeed):
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_classification,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath))
                if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="" and str(_ots_attach.getProperties().get(attachment_status))!=str(ATTACHMENT_INIT):
                    attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                    attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                    attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                    attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    try:
                        if os.path.exists(localpath):
                            os.remove(localpath)
                    except Exception as e:
                        pass
                    return True
                if _ots_exists:
                    objectPath = attach.getProperties().get(attachment_path)
                    download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    if download_succeed:
                        log("md5:%s path:%s download file from oss succeed"%(filemd5,objectPath))
                    else:
                        log("md5:%s path:%s download file from ots failed=="%(filemd5,objectPath))
                else:
                    log("md5:%s path:%s not found in ots"%(filemd5,objectPath))

            if local_exists or download_succeed:
                _size = os.path.getsize(localpath)
                attach.setValue(attachment_size,_size,True)

                if _size>ATTACHMENT_LARGESIZE:
                    attach.setValue(attachment_status, ATTACHMENT_TOOLARGE,True)
                    log("attachment :%s of path:%s to large"%(filemd5,_path))
                    _ots_attach = attachment(attach.getProperties_ots())
                    _ots_attach.update_row(self.ots_client)
                    # # update postgres
                    # if attach.exists(self.attach_pool):
                    #     attach.update_row(self.attach_pool)
                    # else:
                    #     attach.insert_row(self.attach_pool)
                    self.putAttach_json_toRedis(filemd5,attach.getProperties())
                    if local_exists:
                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                    os.remove(localpath)
                    return True

                time_download = time.time()-d_start_time

                # call the recognition interface and handle its result
                start_time = time.time()
                _filetype = attach.getProperties().get(attachment_filetype)
                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                if _success:
                    if len(_html)<5:
                        _html = ""
                else:
                    if len(_html)>1:
                        _html = "interface return error"
                    else:
                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                        _html = ""
                        return False

                # when re-running swf, strip the "\" from the original swf_urls
                if attach.getProperties().get(attachment_filetype) == "swf":
                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
                    swf_urls = json.loads(swf_urls)
                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)

                swf_images = eval(swf_images)
                if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
                    # swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
                    if len(swf_urls)==0:
                        objectPath = attach.getProperties().get(attachment_path,"")
                        swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
                        if not os.path.exists(swf_dir):
                            os.mkdir(swf_dir)
                        for _i in range(len(swf_images)):
                            _base = swf_images[_i]
                            _base = base64.b64decode(_base)
                            filename = "swf_page_%d.png"%(_i)
                            filepath = os.path.join(swf_dir,filename)
                            with open(filepath,"wb") as f:
                                f.write(_base)
                        swf_urls = transformSWF(self.bucket,self.attachment_hub_url,objectPath,None,swf_dir)
                        if os.path.exists(swf_dir):
                            os.rmdir(swf_dir)
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
                    else:
                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)

                if re.search("<td",_html) is not None:
                    attach.setValue(attachment_has_table,1,True)

                _file_title = self.getTitleFromHtml(filemd5,_dochtmlcon)
                filelink = self.getSourceLinkFromHtml(filemd5,_dochtmlcon)
                if _file_title!="":
                    attach.setValue(attachment_file_title,_file_title,True)
                if filelink!="":
                    attach.setValue(attachment_file_link,filelink,True)

                attach.setValue(attachment_attachmenthtml,_html,True)
                attach.setValue(attachment_attachmentcon,BeautifulSoup(_html,"lxml").get_text(),True)
                attach.setValue(attachment_status,ATTACHMENT_PROCESSED,True)
                attach.setValue(attachment_recsize,len(_html),True)
                attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                attach.setValue(attachment_classification,classification,True)

                # update ots
                _ots_attach = attachment(attach.getProperties_ots())
                _ots_attach.update_row(self.ots_client)  # enable updating again in production
                # # update postgres
                # if attach.exists(self.attach_pool):
                #     attach.update_row(self.attach_pool)
                # else:
                #     attach.insert_row(self.attach_pool)
                self.putAttach_json_toRedis(filemd5,attach.getProperties())

                if local_exists:
                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                try:
                    if upload_status and os.path.exists(localpath):
                        os.remove(localpath)
                except Exception as e:
                    pass
                return True
            else:
                return True
        except requests.ConnectionError as e1:
            raise e1
        except oss2.exceptions.NotFound as e:
            return True
        except Exception as e:
            traceback.print_exc()
    # def flow_attachment(self):
    #     self.flow_attachment_producer()
    #     self.flow_attachment_producer_comsumer()

    def getAttachPath(self,filemd5,_dochtmlcon):
        _soup = BeautifulSoup(_dochtmlcon,"lxml")
        list_mark = ["data","filelink"]
        for _mark in list_mark:
            _find = _soup.find("a",attrs={_mark:filemd5})
            filelink = ""
            if _find is None:
                _find = _soup.find("img",attrs={_mark:filemd5})
                if _find is not None:
                    filelink = _find.attrs.get("src","")
            else:
                filelink = _find.attrs.get("href","")
            if filelink.find("bidizhaobiao")>=0:
                _path = filelink.split("/file")
                if len(_path)>1:
                    return _path[1]

    def getAttach_json_fromRedis(self,filemd5):
        db = self.redis_pool.getConnector()
        try:
            _key = "attach-%s"%(filemd5)
            _attach_json = db.get(_key)
            return _attach_json
        except Exception as e:
            log("getAttach_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
        return None

    def putAttach_json_toRedis(self,filemd5,extract_dict):
        db = self.redis_pool.getConnector()
        try:
            new_dict = {}
            for k,v in extract_dict.items():
                if not isinstance(v,set):
                    new_dict[k] = v
            _key = "attach-%s"%(filemd5)
            _extract_json = db.set(str(_key),json.dumps(new_dict))
            db.expire(_key,3600*3)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error%s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.redis_pool.putConnector(db)
            except Exception as e:
                pass
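
    # Resolves attachment metadata for a list of filemd5 values: first from redis,
    # then from ots, and finally by parsing the file link out of the document html.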
    def getAttachments(self,list_filemd5,_dochtmlcon):
        conn = self.attach_pool.getConnector()
        # search postgres
        try:
            to_find_md5 = []
            for _filemd5 in list_filemd5[:50]:
                if _filemd5 is not None:
                    to_find_md5.append(_filemd5)
            conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
            list_attachment = []
            set_md5 = set()
            # list_attachment = Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
            # for _attach in list_attachment:
            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
            for _filemd5 in to_find_md5:
                _json = self.getAttach_json_fromRedis(_filemd5)
                if _json is not None:
                    set_md5.add(_filemd5)
                    list_attachment.append(Attachment_postgres(json.loads(_json)))
            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))
            for _filemd5 in to_find_md5:
                if _filemd5 not in set_md5:
                    _path = self.getAttachPath(_filemd5,_dochtmlcon)
                    log("getAttachments search in ots:%s"%(_filemd5))
                    _attach = {attachment_filemd5:_filemd5}
                    _attach_ots = attachment(_attach)
                    if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                        if _attach_ots.getProperties().get(attachment_status) is not None:
                            log("getAttachments find in ots:%s"%(_filemd5))
                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
                    else:
                        log("getAttachments search in path:%s"%(_filemd5))
                        if _path:
                            log("getAttachments find in path:%s"%(_filemd5))
                            if _path[0]=="/":
                                _path = _path[1:]
                            _filetype = _path.split(".")[-1]
                            _attach = {attachment_filemd5:_filemd5,
                                       attachment_filetype:_filetype,
                                       attachment_status:20,
                                       attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                       attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                            list_attachment.append(Attachment_postgres(_attach))
            return list_attachment
        except Exception as e:
            log("attachProcess comsumer error %s"%str(e))
            log(str(to_find_md5))
            traceback.print_exc()
            return []
        finally:
            self.attach_pool.putConnector(conn)
    def flow_attachment_producer(self,columns=[document_tmp_attachment_path,document_tmp_crtime]):
        q_size = self.queue_attachment.qsize()
        qsize_ocr = self.queue_attachment_ocr.qsize()
        qsize_not_ocr = self.queue_attachment_not_ocr.qsize()
        log("queue_attachment:%d,queue_attachment_ocr:%d,queue_attachment_not_ocr:%d"%(q_size,qsize_ocr,qsize_not_ocr))

    def flow_attachment_producer_comsumer(self):
        log("start flow_attachment comsumer")
        mt = MultiThreadHandler(self.queue_attachment,self.comsumer_handle,None,10,1,need_stop=False,restart=True)
        mt.run()

    def flow_attachment_process(self):
        self.process_comsumer()
        # p = Process(target = self.process_comsumer)
        # p.start()
        # p.join()

    def set_queue(self,_dict):
        list_attach = _dict.get("list_attach")
        to_ocr = False
        for attach in list_attach:
            if attach.getProperties().get(attachment_filetype) in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
                to_ocr = True
                break
        if to_ocr:
            self.queue_attachment_ocr.put(_dict,True)
        else:
            self.queue_attachment_not_ocr.put(_dict,True)

    def comsumer_handle(self,_dict,result_queue):
        try:
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            item = json.loads(frame.body)
            page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            if len(page_attachments)==0:
                self.set_queue({"item":item,"list_attach":[],"message_id":message_id,"conn":conn})
            else:
                list_fileMd5 = []
                for _atta in page_attachments:
                    list_fileMd5.append(_atta.get(document_tmp_attachment_path_filemd5))
                list_attach = self.getAttachments(list_fileMd5,_dochtmlcon)
                self.set_queue({"item":item,"list_attach":list_attach,"message_id":message_id,"conn":conn})
        except Exception as e:
            traceback.print_exc()

    def remove_attachment_postgres(self):
        current_date = getCurrent_date(format="%Y-%m-%d")
        last_date = timeAdd(current_date,-2,format="%Y-%m-%d")
        sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
        conn = self.attach_pool.getConnector()
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            self.attach_pool.putConnector(conn)
        except Exception as e:
            conn.close()

    def start_flow_attachment(self):
        schedule = BlockingScheduler()
        # schedule.add_job(self.flow_attachment_process,"cron",second="*/20")
        # schedule.add_job(self.flow_attachment,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer,"cron",second="*/10")
        # schedule.add_job(self.flow_attachment_producer_comsumer,"cron",second="*/10")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        # schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
        # schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
        schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
        schedule.start()
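
# Extract dataflow: consumes documents from the dataflow_extract queue, calls the
# content_extract interface (reusing redis-cached results keyed by content fingerprint)
# and persists the extraction result into ots.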
class Dataflow_ActivteMQ_extract(Dataflow_extract):

    class ExtractListener():

        def __init__(self,conn,_func,_idx,*args,**kwargs):
            self.conn = conn
            self._func = _func
            self._idx = _idx

        def on_message(self, headers):
            try:
                log("get message of idx:%d"%(self._idx))
                message_id = headers.headers["message-id"]
                body = headers.body
                log("get message %s crtime:%s"%(message_id,json.loads(body).get("crtime","")))
                self._func(_dict={"frame":headers,"conn":self.conn},result_queue=None)
            except Exception as e:
                traceback.print_exc()
                pass

        def on_error(self, headers):
            log('received an error %s' % str(headers.body))

        def __del__(self):
            self.conn.disconnect()

    def __init__(self,create_listener=True):
        Dataflow_extract.__init__(self)

        self.industy_url = "http://127.0.0.1:15000/industry_extract"

        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",20],
                                   ["http://192.168.0.115:15030/content_extract",10]
                                   ]

        self.mq_extract = "/queue/dataflow_extract"
        self.mq_extract_failed = "/queue/dataflow_extract_failed"

        self.whole_weight = 0
        for _url,weight in self.extract_interfaces:
            self.whole_weight += weight
        current_weight = 0
        for _i in range(len(self.extract_interfaces)):
            current_weight += self.extract_interfaces[_i][1]
            self.extract_interfaces[_i][1] = current_weight/self.whole_weight

        self.comsumer_count = 5
        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
        self.conn_mq = getConnect_activateMQ()
        self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ)

        self.block_url = RLock()
        self.url_count = 0

        self.session = None

        self.list_extract_comsumer = []
        # for _i in range(self.comsumer_count):
        #     listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
        #     createComsumer(listener_extract,self.mq_extract)
        #     self.list_extract_comsumer.append(listener_extract)
        if create_listener:
            for ii in range(10):
                listener_p = Process(target=self.start_extract_listener)
                listener_p.start()

    def start_extract_listener(self):
        self.list_extract_comsumer = []
        for _i in range(self.comsumer_count):
            listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
            createComsumer(listener_extract,self.mq_extract)
            self.list_extract_comsumer.append(listener_extract)
        while 1:
            for _i in range(len(self.list_extract_comsumer)):
                if self.list_extract_comsumer[_i].conn.is_connected():
                    continue
                else:
                    listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
                    createComsumer(listener,self.mq_extract)
                    self.list_extract_comsumer[_i] = listener
            time.sleep(5)

    def monitor_listener(self):
        for i in range(len(self.list_extract_comsumer)):
            if self.list_extract_comsumer[i].conn.is_connected():
                continue
            else:
                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
                createComsumer(listener,self.mq_extract)
                self.list_extract_comsumer[i] = listener

    def getExtract_url(self):
        # _url_num = 0
        # with self.block_url:
        #     self.url_count += 1
        #     self.url_count %= self.whole_weight
        #     _url_num = self.url_count
        _r = random.random()
        # _r = _url_num/self.whole_weight
        for _i in range(len(self.extract_interfaces)):
            if _r<=self.extract_interfaces[_i][1]:
                return self.extract_interfaces[_i][0]

    def request_extract_interface(self,json,headers):
        # _i = random.randint(0,len(self.extract_interfaces)-1)
        # _i = 0
        # _url = self.extract_interfaces[_i]
        _url = self.getExtract_url()
        log("extract_url:%s"%(str(_url)))
        with requests.Session() as session:
            resp = session.post(_url,json=json,headers=headers,timeout=10*60)
        return resp

    def request_industry_interface(self,json,headers):
        resp = requests.post(self.industy_url,json=json,headers=headers)
        return resp

    def flow_extract_producer(self,columns=[document_tmp_page_time,document_tmp_doctitle,document_tmp_docchannel,document_tmp_status,document_tmp_original_docchannel,document_tmp_web_source_no]):
        q_size = self.queue_extract.qsize()
        log("queue extract size:%d"%(q_size))

    def process_extract_failed(self):
        def _handle(_dict,result_queue):
            frame = _dict.get("frame")
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            conn = _dict.get("conn")
            body = frame.body
            if body is not None:
                item = json.loads(body)
                item["extract_times"] = 10
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)

        from BaseDataMaintenance.java.MQInfo import getQueueSize
        try:
            extract_failed_size = getQueueSize("dataflow_extract_failed")
            extract_size = getQueueSize("dataflow_extract")
            log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
            if extract_failed_size>0 and extract_size<100:
                failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
                createComsumer(failed_listener,self.mq_extract_failed)
                while 1:
                    extract_failed_size = getQueueSize("dataflow_extract_failed")
                    if extract_failed_size==0:
                        break
                    time.sleep(10)
                failed_listener.conn.disconnect()
        except Exception as e:
            traceback.print_exc()

    def flow_extract(self,):
        self.comsumer()

    def comsumer(self):
        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,20,1,True)
        mt.run()

    def getExtract_json_fromDB(self,_fingerprint):
        conn = self.pool_postgres.getConnector()
        try:
            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
            if len(list_extract)>0:
                _extract = list_extract[0]
                return _extract.getProperties().get(document_extract_extract_json)
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)
        return None

    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
                                         document_extract_docid:docid,
                                         document_extract_extract_json:extract_json})
        _de.insert_row(self.pool_postgres,1)

    def getExtract_json_fromRedis(self,_fingerprint):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.get(_fingerprint)
            return _extract_json
        except Exception as e:
            log("getExtract_json_fromRedis error %s"%(str(e)))
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
        return None

    def putExtract_json_toRedis(self,fingerprint,extract_json):
        db = self.pool_redis_doc.getConnector()
        try:
            _extract_json = db.set(str(fingerprint),extract_json)
            db.expire(fingerprint,3600*2)
            return _extract_json
        except Exception as e:
            log("putExtract_json_toRedis error%s"%(str(e)))
            traceback.print_exc()
        finally:
            try:
                if db.connection.check_health():
                    self.pool_redis_doc.putConnector(db)
            except Exception as e:
                pass
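
    # Core extract handler: trims oversized html, requests the extract interface (or reuses
    # a cached result), then acks/requeues the message depending on extract_times and success.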
    def comsumer_handle(self,_dict,result_queue):
        try:
            log("start handle")
            frame = _dict["frame"]
            conn = _dict["conn"]
            message_id = frame.headers["message-id"]
            subscription = frame.headers.setdefault('subscription', None)
            item = json.loads(frame.body)
            dtmp = Document_tmp(item)
            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                   "docid":item.get("docid")})
            extract_times = item.get("extract_times",0)+1
            item["extract_times"] = extract_times

            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
            html_len = len(_dochtmlcon)
            if html_len>50000:
                log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                try:
                    _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
                    _soup = BeautifulSoup(_dochtmlcon,"html5lib")
                    if len(_dochtmlcon)>200000:
                        _find = _soup.find("div",attrs={"class":"richTextFetch"})
                        if _find is not None:
                            _find.decompose()
                    else:
                        _soup = article_limit(_soup,50000)
                    _dochtmlcon = str(_soup)
                except Exception as e:
                    traceback.print_exc()
                    ackMsg(conn,message_id,subscription)
                    return
                log("docid %s len %d limit to %d"%(str(item.get("docid")),html_len,len(_dochtmlcon)))
            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)

            _extract = Document_extract({})
            _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
            _extract.setValue(document_extract2_docid,item.get(document_docid))
            all_done = 1

            data = {}
            for k,v in item.items():
                data[k] = v
            data["timeout"] = 440
            data["doc_id"] = data.get(document_tmp_docid,0)
            # if data["docid"]<298986054 and data["docid"]>0:
            #     log("jump docid %s"%(str(data["docid"])))
            #     ackMsg(conn,message_id,subscription)
            #     return
            data["content"] = data.get(document_tmp_dochtmlcon,"")
            if document_tmp_dochtmlcon in data:
                data.pop(document_tmp_dochtmlcon)
            data["title"] = data.get(document_tmp_doctitle,"")
            data["web_source_no"] = item.get(document_tmp_web_source_no,"")
            data["web_source_name"] = item.get(document_tmp_web_source_name,"")
            data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")

            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))

            if all_done>0:
                _time = time.time()
                # extract_json = self.getExtract_json_fromDB(_fingerprint)
                extract_json = self.getExtract_json_fromRedis(_fingerprint)
                log("get json from db takes %.4f"%(time.time()-_time))
                # extract_json = None
                _docid = int(data["doc_id"])
                if extract_json is not None:
                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                else:
                    resp = self.request_extract_interface(json=data,headers=self.header)
                    if (resp.status_code >=200 and resp.status_code<=213):
                        extract_json = resp.content.decode("utf8")
                        _extract.setValue(document_extract2_extract_json,extract_json,True)
                        _time = time.time()
                        # self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
                        self.putExtract_json_toRedis(_fingerprint,extract_json)
                        log("get json to db takes %.4f"%(time.time()-_time))
                    else:
                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                        all_done = -2

            # if all_done>0:
            #     resp = self.request_industry_interface(json=data,headers=self.header)
            #     if (resp.status_code >=200 and resp.status_code<=213):
            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
            #     else:
            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
            #         all_done = -3

            # _to_ack = False
            # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
            #     all_done = -4

            _extract.setValue(document_extract2_industry_json,"{}",True)
            try:
                if all_done!=1:
                    sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
                    if extract_times>=10:
                        # process as succeed
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                        dtmp.update_row(self.ots_client)
                        dhtml.update_row(self.ots_client)
                        # replace as {}
                        _extract.setValue(document_extract2_extract_json,"{}",True)
                        _extract.setValue(document_extract2_industry_json,"{}",True)
                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
                        _extract.update_row(self.ots_client)
                        _to_ack = True
                    elif extract_times>5:
                        # transform to the extract_failed queue
                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                            # process as succeed
                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                            # replace as {}
                            _extract.setValue(document_extract2_extract_json,"{}",True)
                            _extract.setValue(document_extract2_industry_json,"{}",True)
                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
                            _extract.update_row(self.ots_client)
                            _to_ack = True
                    else:
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
                        # save the failure
                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
                        dtmp.setValue(document_tmp_status,60,True)
                        if not dtmp.exists_row(self.ots_client):
                            dtmp.update_row(self.ots_client)
                            dhtml.update_row(self.ots_client)
                        if send_succeed:
                            _to_ack = True
                else:
                    # process succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    if _docid==290816703:
                        dtmp.setValue("test_json",extract_json,True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)

                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    _to_ack = True
            except Exception:
                traceback.print_exc()
            if _to_ack:
                ackMsg(conn,message_id,subscription)
            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
        except requests.ConnectionError as e1:
            item["extract_times"] -= 1
            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                ackMsg(conn,message_id,subscription)
        except Exception as e:
            traceback.print_exc()
            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
            if extract_times>=10:
                # process as succeed
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                dtmp.update_row(self.ots_client)
                dhtml.update_row(self.ots_client)
                # replace as {}
                _extract.setValue(document_extract2_extract_json,"{}",True)
                _extract.setValue(document_extract2_industry_json,"{}",True)
                _extract.setValue(document_extract2_status,random.randint(1,50),True)
                _extract.update_row(self.ots_client)
                ackMsg(conn,message_id,subscription)
            elif extract_times>5:
                # transform to the extract_failed queue
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                    # process as succeed
                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                    # replace as {}
                    _extract.setValue(document_extract2_extract_json,"{}",True)
                    _extract.setValue(document_extract2_industry_json,"{}",True)
                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
                    _extract.update_row(self.ots_client)
                    ackMsg(conn,message_id,subscription)
            else:
                # transform to the extract queue
                # save the failure
                dtmp.setValue(document_tmp_dochtmlcon,"",False)
                dtmp.setValue(document_tmp_status,60,True)
                if not dtmp.exists_row(self.ots_client):
                    dtmp.update_row(self.ots_client)
                    dhtml.update_row(self.ots_client)
                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
                    ackMsg(conn,message_id,subscription)
    def delete_document_extract(self,save_count=70*10000):
        conn = self.pool_postgres.getConnector()
        try:
            cursor = conn.cursor()
            sql = " select max(docid),min(docid) from document_extract "
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                maxdocid,mindocid = rows[0]
                d_mindocid = int(maxdocid)-save_count
                if mindocid<d_mindocid:
                    sql = " delete from document_extract where docid<%d"%d_mindocid
                    cursor.execute(sql)
                    conn.commit()
        except Exception as e:
            traceback.print_exc()
        finally:
            self.pool_postgres.putConnector(conn)

    def start_flow_extract(self):
        schedule = BlockingScheduler()
        schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
        # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
        schedule.start()
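
# Module-level docid allocator: reserves a block of `nums` docids by advancing the
# DocumentIdSerial counter in mysql, guarded by docid_lock.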
from multiprocessing import RLock

docid_lock = RLock()
conn_mysql = None

def generateRangeDocid(nums):
    global conn_mysql
    while 1:
        try:
            with docid_lock:
                if conn_mysql is None:
                    conn_mysql = getConnection_mysql()
                cursor = conn_mysql.cursor()
                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
                cursor.execute(sql)
                rows = cursor.fetchall()
                current_docid = rows[0][0]
                next_docid = current_docid+1
                update_docid = current_docid+nums
                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
                cursor.execute(sql)
                conn_mysql.commit()
                return next_docid
        except Exception as e:
            conn_mysql = getConnection_mysql()
# custom JSON encoder
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64,Decimal)):
            return float(obj)
        elif isinstance(obj,str):
            return obj
        return json.JSONEncoder.default(self, obj)
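
# Init dataflow: assigns fresh docids to messages from the dataflow_init queue and routes
# them to the attachment or extract queue; also pulls new rows from oracle and ots into mq.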
class Dataflow_init(Dataflow):

    class InitListener():

        def __init__(self,conn,*args,**kwargs):
            self.conn = conn
            self.get_count = 1000
            self.count = self.get_count
            self.begin_docid = None
            self.mq_init = "/queue/dataflow_init"
            self.mq_attachment = "/queue/dataflow_attachment"
            self.mq_extract = "/queue/dataflow_extract"
            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)

        def on_error(self, headers):
            log('received an error %s' % headers.body)

        def getRangeDocid(self):
            begin_docid = generateRangeDocid(self.get_count)
            self.begin_docid = begin_docid
            self.count = 0

        def getNextDocid(self):
            if self.count>=self.get_count:
                self.getRangeDocid()
            next_docid = self.begin_docid+self.count
            self.count += 1
            return next_docid

        def on_message(self, headers):
            try:
                next_docid = int(self.getNextDocid())
                partitionkey = int(next_docid%500+1)
                message_id = headers.headers["message-id"]
                body = json.loads(headers.body)
                body[document_tmp_partitionkey] = partitionkey
                body[document_tmp_docid] = next_docid
                if body.get(document_original_docchannel) is None:
                    body[document_original_docchannel] = body.get(document_docchannel)
                page_attachments = body.get(document_tmp_attachment_path,"[]")
                _uuid = body.get(document_tmp_uuid,"")
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
                else:
                    status = random.randint(11,50)
                    body[document_tmp_status] = status
                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
                        ackMsg(self.conn,message_id)
                    else:
                        log("send_msg_error on init listener")
            except Exception as e:
                traceback.print_exc()
                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
                    log("init error")
                    ackMsg(self.conn,message_id)

        def __del__(self):
            self.conn.disconnect()
            del self.pool_mq1
  1074. def __init__(self):
  1075. Dataflow.__init__(self)
  1076. self.max_shenpi_id = None
  1077. self.base_shenpi_id = 400000000000
  1078. self.mq_init = "/queue/dataflow_init"
  1079. self.mq_attachment = "/queue/dataflow_attachment"
  1080. self.mq_extract = "/queue/dataflow_extract"
  1081. self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
  1082. self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
  1083. self.ots_capacity = getConnect_ots_capacity()
  1084. self.init_comsumer_counts = 2
  1085. self.list_init_comsumer = []
  1086. for i in range(self.init_comsumer_counts):
  1087. listener = self.InitListener(getConnect_activateMQ())
  1088. createComsumer(listener,self.mq_init)
  1089. self.list_init_comsumer.append(listener)
  1090. def monitor_listener(self):
  1091. for i in range(len(self.list_init_comsumer)):
  1092. if self.list_init_comsumer[i].conn.is_connected():
  1093. continue
  1094. else:
  1095. listener = self.InitListener(getConnect_activateMQ())
  1096. createComsumer(listener,self.mq_init)
  1097. self.list_init_comsumer[i] = listener
    def temp2mq(self,object):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
            for _obj in list_obj:
                ots_dict = _obj.getProperties_ots()
                if len(ots_dict.get("dochtmlcon",""))>500000:
                    _obj.delete_row(conn_oracle)
                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
                    continue
                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
                    # delete the source row; enable this when going live
                    _obj.delete_row(conn_oracle)
                else:
                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
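    # Incrementally forward shenpi (approval) project rows: bootstrap max_shenpi_id from the OTS
    # index, shift docids above base_shenpi_id, and advance the cursor only after each message is
    # accepted by the attachment or extract queue.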
    def shengpi2mq(self):
        conn_oracle = self.pool_oracle.getConnector()
        try:
            if self.max_shenpi_id is None:
                # get the max_shenpi_id
                _query = BoolQuery(must_queries=[ExistsQuery("id")])
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
                list_data = getRow_ots(rows)
                if len(list_data)>0:
                    max_shenpi_id = list_data[0].get("id")
                    if max_shenpi_id>self.base_shenpi_id:
                        max_shenpi_id -= self.base_shenpi_id
                    self.max_shenpi_id = max_shenpi_id
            if self.max_shenpi_id is not None:
                # select data in order
                list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,self.max_shenpi_id,)
                # send data to mq one by one with max_shenpi_id updated
                for _data in list_data:
                    _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
                    ots_dict = _data.getProperties_ots()
                    if ots_dict["docid"]<self.base_shenpi_id:
                        ots_dict["docid"] += self.base_shenpi_id
                    if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
                            self.max_shenpi_id = _id
                        else:
                            log("sent shenpi message to mq failed %s"%(_id))
                            break
                    else:
                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
                            self.max_shenpi_id = _id
                        else:
                            log("sent shenpi message to mq failed %s"%(_id))
                            break
            # return the Oracle connector to the pool
            self.pool_oracle.putConnector(conn_oracle)
        except Exception as e:
            traceback.print_exc()
            self.pool_oracle.decrease()
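    # Re-queue documents whose status sits in the 1-51 window: page through the document index,
    # pull the full HTML via the capacity instance, and reset the row's status to 0 once the
    # message has been sent.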
    def ots2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document(_data)
                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document(_data)
                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
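    # Same pass over document_tmp rows with status 0: after a successful send the temp row's
    # status is bumped to 1 so it is not picked up again.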
    def otstmp2mq(self):
        try:
            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                      document_tmp_docid:_data.get(document_tmp_docid),
                      document_tmp_status:0}
                _document = Document_tmp(_d)
                page_attachments = _data.get(document_tmp_attachment_path,"[]")
                _document_html = Document_html(_data)
                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                if page_attachments!="[]":
                    status = random.randint(1,10)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                else:
                    status = random.randint(11,50)
                    _data[document_tmp_status] = status
                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                if send_succeed:
                    _document.setValue(document_tmp_status,1,True)
                    _document.update_row(self.ots_client)
                else:
                    log("send_msg_error2222")
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
                list_data = getRow_ots(rows)
                for _data in list_data:
                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                          document_tmp_docid:_data.get(document_tmp_docid),
                          document_tmp_status:0}
                    _document = Document_tmp(_d)
                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
                    _document_html = Document_html(_data)
                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
                    if page_attachments!="[]":
                        status = random.randint(1,10)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
                    else:
                        status = random.randint(11,50)
                        _data[document_tmp_status] = status
                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                    if send_succeed:
                        _document.setValue(document_tmp_status,1,True)
                        _document.update_row(self.ots_client)
                    else:
                        log("send_msg_error2222")
        except Exception as e:
            traceback.print_exc()
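    # Ad-hoc check: consume both downstream queues for a while and verify that the docids handed
    # out by the init listeners are unique (list length vs set length).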
    def test_dump_docid(self):
        class TestDumpListener(ActiveMQListener):
            def on_message(self, headers):
                message_id = headers.headers["message-id"]
                body = headers.body
                self._queue.put(headers,True)
                ackMsg(self.conn,message_id)
        _queue = Queue()
        listener1 = TestDumpListener(getConnect_activateMQ(),_queue)
        listener2 = TestDumpListener(getConnect_activateMQ(),_queue)
        createComsumer(listener1,"/queue/dataflow_attachment")
        createComsumer(listener2,"/queue/dataflow_extract")
        time.sleep(10)
        list_item = []
        list_docid = []
        while 1:
            try:
                _item = _queue.get(timeout=2)
                list_item.append(_item)
            except Exception as e:
                break
        for item in list_item:
            _item = json.loads(item.body)
            list_docid.append(_item.get("docid"))
        log(list_docid[:10])
        log("len docid:%d set len:%d"%(len(list_docid),len(set(list_docid))))
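    # Scheduler entry point: every 10 seconds push each Oracle temp table plus the OTS
    # document/document_tmp tables into the queues, and check the listeners once a minute.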
    def start_dataflow_init(self):
        # self.test_dump_docid()
        from BaseDataMaintenance.model.oracle.CaiGouYiXiangTemp import CaiGouYiXiangTemp
        from BaseDataMaintenance.model.oracle.PaiMaiChuRangTemp import PaiMaiChuRangTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoGongGaoTemp import ZhaoBiaoGongGaoTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoYuGaoTemp import ZhaoBiaoYuGaoTemp
        from BaseDataMaintenance.model.oracle.ZhongBiaoXinXiTemp import ZhongBiaoXinXiTemp
        from BaseDataMaintenance.model.oracle.ZiShenJieGuoTemp import ZiShenJieGuoTemp
        from BaseDataMaintenance.model.oracle.ChanQuanJiaoYiTemp import ChanQuanJiaoYiTemp
        from BaseDataMaintenance.model.oracle.GongGaoBianGengTemp import GongGaoBianGeng
        from BaseDataMaintenance.model.oracle.KongZhiJiaTemp import KongZhiJiaTemp
        from BaseDataMaintenance.model.oracle.TuDiKuangChanTemp import TuDiKuangChanTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
        from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
        schedule = BlockingScheduler()
        schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoYuGaoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhongBiaoXinXiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZiShenJieGuoTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ChanQuanJiaoYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(GongGaoBianGeng({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(KongZhiJiaTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
        schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
        schedule.add_job(self.ots2mq,"cron",second="*/10")
        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
        schedule.start()
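# Page the OTS attachment table (status 10 or ATTACHMENT_TOOLARGE, created after 2022-05-06)
# and mirror any rows that are missing from Postgres, using 30 consumer threads.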
def transform_attachment():
    from BaseDataMaintenance.model.ots.attachment import attachment
    from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
    from threading import Thread
    from queue import Queue
    task_queue = Queue()
    def comsumer(task_queue,pool_postgres):
        while 1:
            _dict = task_queue.get(True)
            try:
                attach = Attachment_postgres(_dict)
                if not attach.exists(pool_postgres):
                    attach.insert_row(pool_postgres)
            except Exception as e:
                traceback.print_exc()
    def start_comsumer(task_queue):
        pool_postgres = ConnectorPool(10,30,getConnection_postgres)
        comsumer_count = 30
        list_thread = []
        for i in range(comsumer_count):
            _t = Thread(target=comsumer,args=(task_queue,pool_postgres))
            list_thread.append(_t)
        for _t in list_thread:
            _t.start()
    ots_client = getConnect_ots()
    _thread = Thread(target=start_comsumer,args=(task_queue,))
    _thread.start()
    bool_query = BoolQuery(must_queries=[
        RangeQuery(attachment_crtime,"2022-05-06"),
        BoolQuery(should_queries=[TermQuery(attachment_status,10),
                                  TermQuery(attachment_status,ATTACHMENT_TOOLARGE)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(attachment_crtime,sort_order=SortOrder.DESC)]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        task_queue.put(_dict,True)
    _count = len(list_dict)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            task_queue.put(_dict,True)
        _count += len(list_dict)
        print("%d/%d,queue:%d"%(_count,total_count,task_queue.qsize()))
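# Delete test documents (docid range up to 0) from document_tmp, together with their
# document_html rows where present.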
def del_test_doc():
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[RangeQuery("docid",range_to=0)])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    print(total_count)
    list_row = getRow_ots(rows)
    list_data.extend(list_row)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
    for _row in list_data:
        _doc = Document_tmp(_row)
        _doc.delete_row(ots_client)
        _html = Document_html(_row)
        if _html.exists_row(ots_client):
            _html.delete_row(ots_client)
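# Repair helper: re-send documents of docchannel 114 created after 2022-05-31 to the extract
# queue, restoring dochtmlcon from the capacity instance before each send.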
def fixDoc_to_queue_extract():
    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
    try:
        ots_client = getConnect_ots()
        ots_capacity = getConnect_ots_capacity()
        bool_query = BoolQuery(must_queries=[
            RangeQuery("crtime","2022-05-31"),
            TermQuery("docchannel",114)
        ])
        list_data = []
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        print(total_count)
        list_row = getRow_ots(rows)
        list_data.extend(list_row)
        _count = len(list_row)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_row = getRow_ots(rows)
            list_data.extend(list_row)
            _count = len(list_data)
            print("%d/%d"%(_count,total_count))
        task_queue = Queue()
        for _row in list_data:
            if "all_columns" in _row:
                _row.pop("all_columns")
            _html = Document(_row)
            task_queue.put(_html)
        def _handle(item,result_queue):
            _html = item
            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
            print(_html.getProperties().get(document_tmp_docid))
            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
        mt = MultiThreadHandler(task_queue,_handle,None,30)
        mt.run()
    except Exception as e:
        traceback.print_exc()
    finally:
        pool_mq.destory()
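# Cross-check a dumped uuid/tablename list against the OTS document index and write the
# entries that are missing (exists == 0) to check1.xlsx.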
def check_data_synchronization():
    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
    # list_uuid = []
    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    # with open(filepath,"r",encoding="utf8") as f:
    #     while 1:
    #         _line = f.readline()
    #         if not _line:
    #             break
    #         _match = re.search(_regrex,_line)
    #         if _match is not None:
    #             _uuid = _match.groupdict().get("uuid")
    #             tablename = _match.groupdict().get("tablename")
    #             if _uuid is not None:
    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
    # print("total_count:",len(list_uuid))
    import pandas as pd
    from BaseDataMaintenance.common.Utils import load
    task_queue = Queue()
    list_data = []
    df_data = load("uuid.pk")
    # df_data = pd.read_excel("check.xlsx")
    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
        _dict = {"uuid":uuid,
                 "tablename":tablename}
        list_data.append(_dict)
        task_queue.put(_dict)
    print("qsize:",task_queue.qsize())
    ots_client = getConnect_ots()
    def _handle(_item,result_queue):
        bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        _item["exists"] = total_count
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    df_data = {"uuid":[],
               "tablename":[],
               "exists":[]}
    for _data in list_data:
        if _data["exists"]==0:
            for k,v in df_data.items():
                v.append(_data.get(k))
    df2 = pd.DataFrame(df_data)
    df2.to_excel("check1.xlsx")
current_path = os.path.abspath(os.path.dirname(__file__))
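# For every uuid marked exists==0 and tolong==0 in the Excel report, copy the record from its
# source table back into the matching _TEMP table, presumably so the init flow picks it up again.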
def fixDoc_to_queue_init(filename=""):
    import pandas as pd
    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
    if filename=="":
        filename = os.path.join(current_path,"check.xlsx")
    df = pd.read_excel(filename)
    if "docchannel" in dict_oracle2ots:
        dict_oracle2ots.pop("docchannel")
    row_name = ",".join(list(dict_oracle2ots.keys()))
    conn = getConnection_oracle()
    cursor = conn.cursor()
    _count = 0
    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
        if _exists==0 and _toolong==0:
            _count += 1
            _source = str(tablename).replace("_TEMP","")
            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
            cursor.execute(sql)
            log("%d:%s"%(_count,sql))
    conn.commit()
    conn.close()
if __name__ == '__main__':
    # di = Dataflow_init()
    # di.start_dataflow_init()
    # transform_attachment()
    # del_test_doc()
    # de = Dataflow_ActivteMQ_extract()
    # de.start_flow_extract()
    # fixDoc_to_queue_extract()
    # check_data_synchronization()
    # fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-07.xlsx")
    current_date = getCurrent_date(format="%Y-%m-%d")
    last_date = timeAdd(current_date,-30,format="%Y-%m-%d")
    sql = " delete from attachment where crtime<='%s 00:00:00' "%(last_date)
    print(sql)