# attachmentFix.py
  1. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  2. from BaseDataMaintenance.dataSource.source import *
  3. from BaseDataMaintenance.common.Utils import *
  4. import queue
  5. from tablestore import *
  6. from threading import Thread,RLock
  7. from apscheduler.schedulers.blocking import BlockingScheduler
  8. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  9. from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
  10. from BaseDataMaintenance.model.ots.document import *
  11. import traceback
  12. from BaseDataMaintenance.dataSource.download import download
  13. import base64
  14. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
  15. from BaseDataMaintenance.model.ots.attachment import *
  16. from BaseDataMaintenance.common.ossUtils import *
  17. from uuid import uuid4
  18. from bs4 import BeautifulSoup
  19. from BaseDataMaintenance.model.ots.document_fix_page_attachments import *
  20. import threading
  21. class AttachmentFix():
  22. def __init__(self):
  23. self.ots_client = getConnect_ots()
  24. self.auth = getAuth()
  25. check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
  26. if is_internal:
  27. self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  28. else:
  29. self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  30. self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  31. log("bucket_url:%s"%(self.bucket_url))
  32. self.attachment_bucket_name = "attachment-hub"
  33. self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
  34. self.task_queue = queue.Queue(3000)
  35. self.current_path = os.path.dirname(__file__)
  36. self.fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
  37. self.cul_size = 0
  38. self.cul_lock = RLock()
  39. self.download_path = os.path.join(self.current_path,"fixdownload")
  40. for file in os.listdir(self.download_path):
  41. filepath = os.path.join(self.download_path,file)
  42. if os.path.isfile(filepath):
  43. os.remove(filepath)
  44. self.dict_filelink = dict()
  45. self.set_failed = set()
  46. self.lock_download = RLock()
  47. self.lock_failed = RLock()
  48. def producer(self):
  49. bool_query = BoolQuery(
  50. must_not_queries=[RangeQuery(document_fix_status,1,3,True,True)]
  51. # must_queries=[TermQuery(document_fix_docid,113881457)]
  52. )
  53. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
  54. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),limit=100,get_total_count=True),
  55. columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
  56. list_dict = getRow_ots(rows)
  57. for _dict in list_dict:
  58. _fix = document_fix_page_attachments(_dict)
  59. self.task_queue.put(_fix,True)
  60. while next_token:
  61. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
  62. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  63. columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
  64. list_dict = getRow_ots(rows)
  65. for _dict in list_dict:
  66. _fix = document_fix_page_attachments(_dict)
  67. self.task_queue.put(_fix,True)
  68. def getDownloaded(self,_url):
  69. for _i1 in range(600):
  70. with self.lock_download:
  71. if len(self.dict_filelink)>100000:
  72. log("clear filelink %d"%(len(self.dict_filelink)))
  73. self.dict_filelink.clear()
  74. if _url in self.dict_filelink:
  75. _result = self.dict_filelink[_url]
  76. if _result == "":
  77. t_id = threading.current_thread().ident
  78. # log("%s watting %s"%(str(t_id),_url))
  79. else:
  80. return _result
  81. else:
  82. self.dict_filelink[_url] = ""
  83. return "current"
  84. time.sleep(0.1)
  85. return "current"
  86. def getFailed(self,_url):
  87. with self.lock_failed:
  88. _flag = False
  89. if _url in self.set_failed:
  90. _flag= True
  91. if len(self.set_failed)>100000:
  92. self.set_failed.clear()
  93. return _flag
  94. def comsumer(self):
  95. def _handle(_fix,result_queue):
  96. try:
  97. page_attachments = json.loads(_fix.getProperties().get(document_fix_attachments,"[]"))
  98. docid = _fix.getProperties().get(document_fix_docid)
  99. new_page_attachments = []
  100. _count = 0
  101. for _attach in page_attachments:
  102. try:
  103. _count += 1
  104. if _count>10:
  105. break
  106. _url = _attach.get("fileLink","")
  107. has_download = self.getDownloaded(_url)
  108. if has_download=="current":
  109. pass
  110. elif has_download is None:
  111. log("has download but to large %s"%(_url))
  112. continue
  113. else:
  114. log("has download %s"%(_url))
  115. new_page_attachments.append(has_download)
  116. continue
  117. _title = _attach.get("fileTitle","")
  118. filetype = ""
  119. _split = _url.split(".")
  120. for _suf in self.fileSuffix:
  121. if _url.find(_suf)>=0 or _title.find(_suf)>=0:
  122. filetype = _suf[1:]
  123. break
  124. if filetype!="":
  125. if _url.find("http")>=0:
  126. filename = uuid4().hex
  127. filepath = "%s/fixdownload/%s"%(self.current_path,filename)
  128. try:
  129. flag,content = download(_url,filepath,timeout=7)
  130. # with open(filepath,"wb") as f:
  131. # f.write(content)
  132. if not flag:
  133. log("download failed %s"%(_url))
  134. if flag:
  135. log("download %d succeed %s"%(docid,_url))
  136. file_size = os.path.getsize(filepath)
  137. if file_size>ATTACHMENT_LARGESIZE*30:
  138. self.dict_filelink[_url] = None
  139. if os.path.exists(filepath):
  140. os.remove(filepath)
  141. continue
  142. while True:
  143. with self.cul_lock:
  144. if file_size<1024*1024:
  145. self.cul_size += file_size
  146. break
  147. if self.cul_size<300*1024*1024:
  148. self.cul_size += file_size
  149. break
  150. time.sleep(1)
  151. _md5,_size = getMDFFromFile(filepath)
  152. with self.cul_lock:
  153. self.cul_size -= file_size
  154. log("current cul_size:%6f"%(self.cul_size/(1024*1024*1024)))
  155. log(_md5)
  156. objectPath = "%s/fix/%s/%s.%s"%(_md5[:4],getCurrent_date("%Y-%m-%d"),filename,filetype)
  157. _status = random.randint(40,50)
  158. if _size>ATTACHMENT_LARGESIZE:
  159. _status = -1
  160. attach_dict = {attachment_filemd5:_md5,attachment_filetype:filetype,attachment_path:objectPath,
  161. attachment_size:_size,attachment_crtime:getCurrent_date(),attachment_status:_status,attachment_file_link:_url}
  162. attach = attachment(attach_dict)
  163. if not attach.exists_row(self.ots_client):
  164. attach.update_row(self.ots_client)
  165. uploadFileByPath(self.bucket,filepath,objectPath)
  166. log("docid:%d upload succeed %s"%(docid,_md5))
  167. else:
  168. log("docid:%d has upload %s"%(docid,_md5))
  169. new_page_attachments.append(_attach)
  170. self.dict_filelink[_url] = _attach
  171. _attach[document_attachment_path_filemd5] = _md5
  172. except Exception as e:
  173. pass
  174. finally:
  175. if os.path.exists(filepath):
  176. os.remove(filepath)
  177. except Exception as e:
  178. log(str(e))
  179. traceback.print_exc()
  180. _fix.setValue(document_fix_count,len(page_attachments),True)
  181. _fix.setValue(document_fix_succeed_count,len(new_page_attachments),True)
  182. _fix.setValue(document_fix_new_attachment,json.dumps(new_page_attachments,ensure_ascii=False),True)
  183. if len(new_page_attachments)>0:
  184. if len(new_page_attachments)==len(page_attachments):
  185. _fix.setValue(document_fix_status,1,True)
  186. else:
  187. _fix.setValue(document_fix_status,2,True)
  188. _dict = {document_partitionkey:_fix.getProperties().get(document_fix_partitionkey),
  189. document_docid:_fix.getProperties().get(document_fix_docid),
  190. document_attachment_path:_fix.getProperties().get(document_fix_new_attachment)}
  191. _document = Document(_dict)
  192. log("insert docid %s"%(str(_fix.getProperties().get(document_fix_docid))))
  193. _document.update_row(self.ots_client)
  194. else:
  195. _fix.setValue(document_fix_status,3,True)
  196. _fix.update_row(self.ots_client)
  197. log("handle docid:%d with status:%d"%(_fix.getProperties().get(document_fix_docid),_fix.getProperties().get(document_fix_status)))
  198. except Exception as e:
  199. traceback.print_exc()
  200. mt = MultiThreadHandler(self.task_queue,_handle,None,90)
  201. mt.run()
  202. def schedule(self):
  203. scheduler = BlockingScheduler()
  204. scheduler.add_job(self.producer,"cron",minute="*/1")
  205. scheduler.add_job(self.comsumer,"cron",minute="*/1")
  206. scheduler.start()
  207. def start_attachFix():
  208. af = AttachmentFix()
  209. af.schedule()
  210. def getAttachFromPath(filepath,filetype):
  211. if os.path.exists(filepath):
  212. filemd5,size = getMDFFromFile(filepath)
  213. if filemd5 is not None:
  214. relapath = "%s/supplement/%s/%s.%s"%(filemd5[:4],getCurrent_date("%Y-%m-%d"),filemd5,filetype)
  215. _t = relapath.split(".")
  216. if len(_t)>0:
  217. filetype = _t[-1]
  218. filetype = filetype.lower()
  219. print("filemd5",filemd5)
  220. _dict = {attachment_filemd5:filemd5,attachment_size:size,attachment_path:relapath,attachment_crtime:getCurrent_date("%Y-%m-%d %H:%M:%S"),attachment_filetype:filetype,attachment_status:ATTACHMENT_INIT}
  221. print(_dict)
  222. _attach = attachment(_dict)
  223. if _attach.getProperties().get(attachment_size)<ATTACHMENT_SMALL_SIZE:
  224. #写入base64到attachment
  225. # _data_base64 = base64.b64encode(open(filepath,"rb").read())
  226. # _attach.setValue(attachment_base64,_data_base64,True)
  227. _attach.setStatusToMCDeal()
  228. else:
  229. _attach.setValue(attachment_status,ATTACHMENT_TOOLARGE,True)
  230. return _attach
  231. else:
  232. log("md5 is None")
  233. else:
  234. log("file not exists %s"%(filepath))
  235. return None
  236. from BaseDataMaintenance.maintenance.attachment.attachmentProcess import AttachProcess
  237. common_bucket = None
  238. common_ots_client = None
  239. def getOtsClient():
  240. global common_ots_client
  241. if common_ots_client is None:
  242. common_ots_client = getConnect_ots()
  243. return common_ots_client
  244. def getBucket():
  245. global common_bucket
  246. if common_bucket is None:
  247. auth = getAuth()
  248. check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
  249. if is_internal:
  250. bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  251. else:
  252. bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  253. attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  254. log("bucket_url:%s"%(bucket_url))
  255. attachment_bucket_name = "attachment-hub"
  256. common_bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
  257. return common_bucket
  258. common_ap = AttachProcess()
  259. def makeFixing(_document):
  260. common_ap.comsumer_handle(_document,None)
  261. def fixAttachmentOfDoc(docid,new_files):
  262. #补充未下载的公告方法
  263. ots_client = getOtsClient()
  264. bucket = getBucket()
  265. list_attach = []
  266. new_attachments = []
  267. for _file in new_files:
  268. filepath = _file.get("filepath")
  269. filetype = _file.get("file_type")
  270. _attach = getAttachFromPath(filepath,filetype)
  271. if _attach is None:
  272. return
  273. current_filemd5 = _attach.getProperties().get(attachment_filemd5)
  274. log("supplement %s"%current_filemd5)
  275. _old_status = None
  276. if not _attach.exists_row(ots_client):
  277. log("not exists %s"%current_filemd5)
  278. _old_status = _attach.getProperties().get(attachment_status)
  279. _attach.setValue(attachment_status,0,True)
  280. _attach.setValue("old_status",_old_status,False)
  281. _attach.update_row(ots_client)
  282. uploadFileByPath(bucket,filepath,_attach.getProperties().get(attachment_path))
  283. else:
  284. _attach.getProperties().pop(attachment_path)
  285. _attach.fix_columns(ots_client,[attachment_status],True)
  286. _old_status = _attach.getProperties().get(attachment_status)
  287. _attach.setValue("old_status",_old_status,False)
  288. _attach.setValue(attachment_status,10,True)
  289. _attach.update_row(ots_client)
  290. list_attach.append(_attach)
  291. new_attachments.append({document_attachment_path_filemd5:current_filemd5})
  292. partitionkey = docid%500+1
  293. _document = Document({document_partitionkey:partitionkey,document_docid:docid})
  294. _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
  295. log(_document.getProperties().get(document_attachment_path,"[]"))
  296. _old_attachments = _document.getProperties().get(document_attachment_path,"[]")
  297. if _old_attachments=="":
  298. _old_attachments = "[]"
  299. _page_attachments = json.loads(_old_attachments)
  300. print("fixing",docid,_page_attachments)
  301. for new_item in new_attachments:
  302. _exists = False
  303. for item in _page_attachments:
  304. if item.get(document_attachment_path_filemd5)==new_item.get(document_attachment_path_filemd5):
  305. _exists = True
  306. if not _exists:
  307. _page_attachments.append(new_item)
  308. _document.setValue(document_attachment_path,json.dumps(_page_attachments,ensure_ascii=False),True)
  309. if len(_page_attachments)>0:
  310. _document.setValue(document_status,1,True)
  311. _document.update_row(ots_client)
  312. # makeFixing(_document) #新流程不再需要这一步
  313. for _attach in list_attach:
  314. if _attach.getProperties().get("old_status") is not None:
  315. _attach.setValue(attachment_status,_attach.getProperties().get("old_status"),True)
  316. _attach.update_row(ots_client)
  317. attach_status = _document.getProperties().get(document_attachment_extract_status)
  318. # _document.setValue(document_crtime,getCurrent_date(),True)
  319. if attach_status==1:
  320. _document.setValue(document_attachment_extract_status,0,True)
  321. _document.update_row(ots_client)
  322. def extract_pageAttachments(_html):
  323. fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
  324. _soup = BeautifulSoup(_html,"lxml")
  325. list_a = _soup.find_all("a")
  326. list_img = _soup.find_all("img")
  327. page_attachments = []
  328. for _a in list_a:
  329. _text =_a.get_text()
  330. _url = _a.attrs.get("href","")
  331. if _url.find("http://www.bidizhaobiao.com")>=0:
  332. continue
  333. if _url.find("detail-area-list-icon.png")>=0:
  334. continue
  335. is_attach = False
  336. if _url.find("http://zbtb.gd.gov.cn/platform/attach")>=0:
  337. is_attach = True
  338. file_type = ".pdf"
  339. for suf in fileSuffix:
  340. if _text.find(suf)>=0 or _url.find(suf)>=0:
  341. is_attach = True
  342. file_type = suf.lower()
  343. if is_attach:
  344. page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type[1:]})
  345. for _a in list_img:
  346. _text =_a.get_text()
  347. _url = _a.attrs.get("src","")
  348. if _url.find("http://www.bidizhaobiao.com")>=0:
  349. continue
  350. is_attach = False
  351. for suf in fileSuffix:
  352. if _text.find(suf)>=0 or _url.find(suf)>=0:
  353. is_attach = True
  354. file_type = suf.lower()
  355. if is_attach:
  356. page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type})
  357. return page_attachments
  358. def fixDoc(docid,ots_client=None):
  359. '''
  360. 修复公告内的附件未下载的数据
  361. :param docid:
  362. :param ots_client:
  363. :return:
  364. '''
  365. if ots_client is None:
  366. ots_client = getConnect_ots()
  367. capacity_client = getConnect_ots_capacity()
  368. partitionkey = docid%500+1
  369. _document = Document({document_partitionkey:partitionkey,document_docid:docid})
  370. _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
  371. _document.fix_columns(capacity_client,[document_dochtmlcon],True)
  372. log("=1")
  373. if _document.getProperties().get(document_attachment_extract_status,0)!=1 or 1:
  374. log("=2")
  375. if _document.getProperties().get(document_attachment_path,'[]')=="[]":
  376. log("=3")
  377. new_attachments = extract_pageAttachments(_document.getProperties().get(document_dochtmlcon))
  378. log(str(new_attachments))
  379. new_files = []
  380. for _atta in new_attachments:
  381. new_file_name = "%s/fixdownload/%s"%(os.path.dirname(__file__),uuid4().hex)
  382. _flag,_ = download(_atta.get("fileLink"),new_file_name)
  383. if _flag:
  384. new_files.append({"filepath":new_file_name,"file_type":_atta.get("file_type")})
  385. fixAttachmentOfDoc(docid,new_files)
  386. for _file in new_files:
  387. if os.path.exists(_file.get("filepath")):
  388. os.remove(_file.get("filepath"))
  389. class FixDocument():
  390. def __init__(self):
  391. self.ots_client = getOtsClient()
  392. self.bucket = getBucket()
  393. self.task_queue = queue.Queue()
  394. def producer(self):
  395. should_docid = BoolQuery(should_queries=[
  396. RangeQuery("docid",173470530,174453795),
  397. RangeQuery("docid",180724983,200250181)
  398. ])
  399. bool_query = BoolQuery(must_queries=[
  400. # RangeQuery("crtime",'2021-08-10 00:00:00','2021-10-10 00:00:00'),
  401. RangeQuery("docid",86363824),
  402. WildcardQuery("web_source_no","00141-1*"),
  403. # should_docid
  404. ]
  405. ,must_not_queries=[NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*"))])
  406. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  407. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
  408. ColumnsToGet([],ColumnReturnType.NONE))
  409. log("total_count:%d"%total_count)
  410. dict_rows = getRow_ots(rows)
  411. for _row in dict_rows:
  412. self.task_queue.put(_row)
  413. while next_token:
  414. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
  415. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  416. ColumnsToGet([],ColumnReturnType.NONE))
  417. dict_rows = getRow_ots(rows)
  418. for _row in dict_rows:
  419. self.task_queue.put(_row)
  420. # import pandas as pd
  421. # df = pd.read_excel("2022-01-18_183521_export11.xlsx")
  422. # for docid in df["docid"]:
  423. # _dict = {document_docid:int(docid),
  424. # document_partitionkey:int(docid)%500+1,
  425. # }
  426. # self.task_queue.put(_dict)
  427. def comsumer(self):
  428. def _handle(item,result_queue,ots_client):
  429. docid = item.get(document_docid)
  430. fixDoc(docid,ots_client)
  431. ots_client = getConnect_ots()
  432. mt = MultiThreadHandler(self.task_queue,_handle,None,30,ots_client=ots_client)
  433. mt.run()
  434. def start(self):
  435. schedule = BlockingScheduler()
  436. t = Thread(target=self.producer)
  437. t.start()
  438. t.join()
  439. # schedule.add_job(self.producer,"cron",minute="*/1")
  440. schedule.add_job(self.comsumer,"cron",minute="*/1")
  441. schedule.start()
  442. t.join()
  443. def start_docFix():
  444. fd = FixDocument()
  445. fd.start()
  446. if __name__=="__main__":
  447. # docs = [156668514]
  448. # for _doc in docs:
  449. # fixDoc(_doc)
  450. start_docFix()
  451. # af = AttachmentFix()
  452. # af.schedule()
  453. # _flag,_ = download("http://59.55.120.164:8088/upload/images_file/shop/image/productType/standard/101.jpg")
  454. # print(_flag)