# attachmentFix.py
from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.dataSource.source import *
from BaseDataMaintenance.common.Utils import *
import queue
from tablestore import *
from threading import Thread,RLock
from apscheduler.schedulers.blocking import BlockingScheduler
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
from BaseDataMaintenance.model.ots.document import *
import traceback
from BaseDataMaintenance.dataSource.download import download
import base64
from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
from BaseDataMaintenance.model.ots.attachment import *
from BaseDataMaintenance.common.ossUtils import *
from uuid import uuid4
from bs4 import BeautifulSoup
from BaseDataMaintenance.model.ots.document_fix_page_attachments import *
import threading
class AttachmentFix():
    """Scheduled worker that repairs documents whose page attachments were
    never downloaded: reads pending rows from the
    ``document_fix_page_attachments`` OTS table, downloads each file link,
    uploads it to the attachment-hub OSS bucket, and writes the new
    attachment list back onto the document row.
    """

    def __init__(self):
        # OTS client and OSS auth come from project-level connection helpers.
        self.ots_client = getConnect_ots()
        self.auth = getAuth()
        check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
        # Prefer the internal endpoint when running inside the Aliyun VPC.
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        # Bounded queue: producer blocks once 3000 tasks are pending.
        self.task_queue = queue.Queue(3000)
        self.current_path = os.path.dirname(__file__)
        # Recognized attachment extensions (lowercase, leading dot).
        self.fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
        # Total bytes of in-flight downloads, guarded by cul_lock.
        self.cul_size = 0
        self.cul_lock = RLock()
        self.download_path = os.path.join(self.current_path,"fixdownload")
        # Remove leftover files from a previous (possibly crashed) run.
        for file in os.listdir(self.download_path):
            filepath = os.path.join(self.download_path,file)
            if os.path.isfile(filepath):
                os.remove(filepath)
        # url -> "" (download in progress) / attachment dict (done) / None (too large).
        self.dict_filelink = dict()
        self.set_failed = set()
        self.lock_download = RLock()
        self.lock_failed = RLock()

    def producer(self):
        """Queue every fix row whose status is not yet in 1..3 (unprocessed)."""
        bool_query = BoolQuery(
            must_not_queries=[RangeQuery(document_fix_status,1,3,True,True)]
            # must_queries=[TermQuery(document_fix_docid,113881457)]
        )
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _dict in list_dict:
            _fix = document_fix_page_attachments(_dict)
            self.task_queue.put(_fix,True)
        # Page through the remaining results until the index is exhausted.
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _dict in list_dict:
                _fix = document_fix_page_attachments(_dict)
                self.task_queue.put(_fix,True)

    def getDownloaded(self,_url):
        """Claim *_url* for download or wait for another worker's result.

        Returns "current" when this thread should download the file itself,
        the cached attachment dict when another thread already finished, or
        None when the file was previously found too large. Polls for up to
        ~60s (600 * 0.1s) before giving up and returning "current" anyway.
        """
        for _i1 in range(600):
            with self.lock_download:
                # Crude memory bound: drop the whole cache once it grows too big.
                if len(self.dict_filelink)>100000:
                    log("clear filelink %d"%(len(self.dict_filelink)))
                    self.dict_filelink.clear()
                if _url in self.dict_filelink:
                    _result = self.dict_filelink[_url]
                    if _result == "":
                        # Another thread is still downloading this url; keep waiting.
                        t_id = threading.current_thread().ident
                        # log("%s watting %s"%(str(t_id),_url))
                    else:
                        return _result
                else:
                    # Mark the url as "in progress"; the caller downloads it.
                    self.dict_filelink[_url] = ""
                    return "current"
            time.sleep(0.1)
        return "current"

    def getFailed(self,_url):
        """Return True when *_url* is in the failed set (clears the set when full)."""
        with self.lock_failed:
            _flag = False
            if _url in self.set_failed:
                _flag= True
            if len(self.set_failed)>100000:
                self.set_failed.clear()
            return _flag

    def comsumer(self):
        """Drain the task queue with 90 worker threads, fixing each document."""
        def _handle(_fix,result_queue):
            try:
                page_attachments = json.loads(_fix.getProperties().get(document_fix_attachments,"[]"))
                docid = _fix.getProperties().get(document_fix_docid)
                new_page_attachments = []
                _count = 0
                for _attach in page_attachments:
                    try:
                        _count += 1
                        # Safety cap: handle at most 10 attachments per document.
                        if _count>10:
                            break
                        _url = _attach.get("fileLink","")
                        has_download = self.getDownloaded(_url)
                        if has_download=="current":
                            pass
                        elif has_download is None:
                            log("has download but to large %s"%(_url))
                            continue
                        else:
                            # Reuse the attachment produced by another worker.
                            log("has download %s"%(_url))
                            new_page_attachments.append(has_download)
                            continue
                        _title = _attach.get("fileTitle","")
                        filetype = ""
                        _split = _url.split(".")
                        # Infer the file type from the url or the title.
                        for _suf in self.fileSuffix:
                            if _url.find(_suf)>=0 or _title.find(_suf)>=0:
                                filetype = _suf[1:]
                                break
                        if filetype!="":
                            if _url.find("http")>=0:
                                filename = uuid4().hex
                                filepath = "%s/fixdownload/%s"%(self.current_path,filename)
                                try:
                                    flag,content = download(_url,filepath,timeout=7)
                                    # with open(filepath,"wb") as f:
                                    #     f.write(content)
                                    if not flag:
                                        log("download failed %s"%(_url))
                                    if flag:
                                        log("download %d succeed %s"%(docid,_url))
                                        file_size = os.path.getsize(filepath)
                                        # Remember oversized urls so no worker retries them.
                                        if file_size>ATTACHMENT_LARGESIZE*30:
                                            self.dict_filelink[_url] = None
                                            if os.path.exists(filepath):
                                                os.remove(filepath)
                                            continue
                                        # Throttle: small files always proceed; larger files
                                        # wait until in-flight bytes drop below 300MB.
                                        while True:
                                            with self.cul_lock:
                                                if file_size<1024*1024:
                                                    self.cul_size += file_size
                                                    break
                                                if self.cul_size<300*1024*1024:
                                                    self.cul_size += file_size
                                                    break
                                            time.sleep(1)
                                        _md5,_size = getMDFFromFile(filepath)
                                        with self.cul_lock:
                                            self.cul_size -= file_size
                                            log("current cul_size:%6f"%(self.cul_size/(1024*1024*1024)))
                                        log(_md5)
                                        objectPath = "%s/fix/%s/%s.%s"%(_md5[:4],getCurrent_date("%Y-%m-%d"),filename,filetype)
                                        # Random 40..50 marks "fixed" rows; -1 = too large.
                                        _status = random.randint(40,50)
                                        if _size>ATTACHMENT_LARGESIZE:
                                            _status = -1
                                        attach_dict = {attachment_filemd5:_md5,attachment_filetype:filetype,attachment_path:objectPath,
                                                       attachment_size:_size,attachment_crtime:getCurrent_date(),attachment_status:_status,attachment_file_link:_url}
                                        attach = attachment(attach_dict)
                                        if not attach.exists_row(self.ots_client):
                                            attach.update_row(self.ots_client)
                                            uploadFileByPath(self.bucket,filepath,objectPath)
                                            log("docid:%d upload succeed %s"%(docid,_md5))
                                        else:
                                            log("docid:%d has upload %s"%(docid,_md5))
                                        new_page_attachments.append(_attach)
                                        # Cache the finished attachment for concurrent workers.
                                        self.dict_filelink[_url] = _attach
                                        _attach[document_attachment_path_filemd5] = _md5
                                except Exception as e:
                                    pass
                                finally:
                                    # Always drop the temp file, success or not.
                                    if os.path.exists(filepath):
                                        os.remove(filepath)
                    except Exception as e:
                        log(str(e))
                        traceback.print_exc()
                _fix.setValue(document_fix_count,len(page_attachments),True)
                _fix.setValue(document_fix_succeed_count,len(new_page_attachments),True)
                _fix.setValue(document_fix_new_attachment,json.dumps(new_page_attachments,ensure_ascii=False),True)
                if len(new_page_attachments)>0:
                    # Status 1 = fully fixed, 2 = partially fixed, 3 = nothing fixed.
                    if len(new_page_attachments)==len(page_attachments):
                        _fix.setValue(document_fix_status,1,True)
                    else:
                        _fix.setValue(document_fix_status,2,True)
                    _dict = {document_partitionkey:_fix.getProperties().get(document_fix_partitionkey),
                             document_docid:_fix.getProperties().get(document_fix_docid),
                             document_attachment_path:_fix.getProperties().get(document_fix_new_attachment)}
                    _document = Document(_dict)
                    log("insert docid %s"%(str(_fix.getProperties().get(document_fix_docid))))
                    _document.update_row(self.ots_client)
                else:
                    _fix.setValue(document_fix_status,3,True)
                _fix.update_row(self.ots_client)
                log("handle docid:%d with status:%d"%(_fix.getProperties().get(document_fix_docid),_fix.getProperties().get(document_fix_status)))
            except Exception as e:
                traceback.print_exc()
        mt = MultiThreadHandler(self.task_queue,_handle,None,90)
        mt.run()

    def schedule(self):
        """Run producer and comsumer every minute, blocking forever."""
        scheduler = BlockingScheduler()
        scheduler.add_job(self.producer,"cron",minute="*/1")
        scheduler.add_job(self.comsumer,"cron",minute="*/1")
        scheduler.start()
  207. def start_attachFix():
  208. af = AttachmentFix()
  209. af.schedule()
def getAttachFromPath(filepath,filetype):
    """Build an ``attachment`` record for the local file at *filepath*.

    Computes the file's md5 and size, derives an OSS object path under
    ``<md5[:4]>/supplement/<date>/<md5>.<filetype>`` and returns an
    ``attachment`` whose status is set to "MC deal" for small files or
    ``ATTACHMENT_TOOLARGE`` otherwise. Returns None when the file is
    missing or the md5 cannot be computed.
    """
    if os.path.exists(filepath):
        filemd5,size = getMDFFromFile(filepath)
        if filemd5 is not None:
            relapath = "%s/supplement/%s/%s.%s"%(filemd5[:4],getCurrent_date("%Y-%m-%d"),filemd5,filetype)
            # Re-derive the extension from the path and normalize to lowercase.
            # NOTE: relapath itself keeps the caller's original casing.
            _t = relapath.split(".")
            if len(_t)>0:
                filetype = _t[-1]
                filetype = filetype.lower()
            print("filemd5",filemd5)
            _dict = {attachment_filemd5:filemd5,attachment_size:size,attachment_path:relapath,attachment_crtime:getCurrent_date("%Y-%m-%d %H:%M:%S"),attachment_filetype:filetype,attachment_status:ATTACHMENT_INIT}
            print(_dict)
            _attach = attachment(_dict)
            if _attach.getProperties().get(attachment_size)<ATTACHMENT_SMALL_SIZE:
                # write the file's base64 into the attachment row
                # _data_base64 = base64.b64encode(open(filepath,"rb").read())
                # _attach.setValue(attachment_base64,_data_base64,True)
                _attach.setStatusToMCDeal()
            else:
                _attach.setValue(attachment_status,ATTACHMENT_TOOLARGE,True)
            return _attach
        else:
            log("md5 is None")
    else:
        log("file not exists %s"%(filepath))
    return None
# Imported here rather than at the top of the file — presumably to avoid a
# circular import; confirm before moving it.
from BaseDataMaintenance.maintenance.attachment.attachmentProcess import AttachProcess
# Lazily-initialized module-level singletons, populated on first use.
common_bucket = None
common_ots_client = None
  239. def getOtsClient():
  240. global common_ots_client
  241. if common_ots_client is None:
  242. common_ots_client = getConnect_ots()
  243. return common_ots_client
  244. def getBucket():
  245. global common_bucket
  246. if common_bucket is None:
  247. auth = getAuth()
  248. check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
  249. if is_internal:
  250. bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  251. else:
  252. bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  253. attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  254. log("bucket_url:%s"%(bucket_url))
  255. attachment_bucket_name = "attachment-hub"
  256. common_bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
  257. return common_bucket
# Shared attachment processor instance used by makeFixing.
common_ap = AttachProcess()

def makeFixing(_document):
    # Delegate the document to the shared AttachProcess consumer handler.
    common_ap.comsumer_handle(_document,None)
def fixAttachmentOfDoc(docid,new_files):
    # Supplement attachments that were never downloaded for an announcement:
    # upload each new local file, merge the resulting md5s into the document's
    # page_attachments list, then restore each attachment row's prior status.
    ots_client = getOtsClient()
    bucket = getBucket()
    list_attach = []
    new_attachments = []
    # Partition key is derived from the docid (500-way partitioning).
    partitionkey = docid%500+1
    _document = Document({document_partitionkey:partitionkey,document_docid:docid})
    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
    log(_document.getProperties().get(document_attachment_path,"[]"))
    _old_attachments = _document.getProperties().get(document_attachment_path,"[]")
    if _old_attachments=="":
        _old_attachments = "[]"
    _page_attachments = json.loads(_old_attachments)
    for _file in new_files:
        filepath = _file.get("filepath")
        filetype = _file.get("file_type")
        _attach = getAttachFromPath(filepath,filetype)
        # Abort the whole fix if any file cannot be turned into an attachment.
        if _attach is None:
            return
        current_filemd5 = _attach.getProperties().get(attachment_filemd5)
        log("supplement %s"%current_filemd5)
        _old_status = None
        if not _attach.exists_row(ots_client):
            # New attachment: create the row (status 0) and upload the file.
            log("not exists %s"%current_filemd5)
            _old_status = _attach.getProperties().get(attachment_status)
            _attach.setValue(attachment_status,0,True)
            _attach.setValue("old_status",_old_status,False)
            _attach.update_row(ots_client)
            uploadFileByPath(bucket,filepath,_attach.getProperties().get(attachment_path))
        else:
            # Existing attachment: keep its stored path, remember its current
            # status and temporarily force status 10 for reprocessing.
            _attach.getProperties().pop(attachment_path)
            _attach.fix_columns(ots_client,[attachment_status],True)
            _old_status = _attach.getProperties().get(attachment_status)
            _attach.setValue("old_status",_old_status,False)
            _attach.setValue(attachment_status,10,True)
            _attach.update_row(ots_client)
        list_attach.append(_attach)
        new_attachments.append({document_attachment_path_filemd5:current_filemd5})
    print("fixing",docid,_page_attachments)
    # Merge by filemd5, avoiding duplicates with the existing list.
    for new_item in new_attachments:
        _exists = False
        for item in _page_attachments:
            if item.get(document_attachment_path_filemd5)==new_item.get(document_attachment_path_filemd5):
                _exists = True
        if not _exists:
            _page_attachments.append(new_item)
    _document.setValue(document_attachment_path,json.dumps(_page_attachments,ensure_ascii=False),True)
    if len(_page_attachments)>0:
        _document.setValue(document_status,1,True)
    _document.update_row(ots_client)
    # makeFixing(_document) # the new pipeline no longer needs this step
    # Restore each attachment row's pre-fix status.
    for _attach in list_attach:
        if _attach.getProperties().get("old_status") is not None:
            _attach.setValue(attachment_status,_attach.getProperties().get("old_status"),True)
            _attach.update_row(ots_client)
    attach_status = _document.getProperties().get(document_attachment_extract_status)
    # _document.setValue(document_crtime,getCurrent_date(),True)
    # Reset the extract status so the extractor picks the document up again.
    if attach_status==1:
        _document.setValue(document_attachment_extract_status,0,True)
        _document.update_row(ots_client)
  322. def extract_pageAttachments(_html):
  323. fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
  324. _soup = BeautifulSoup(_html,"lxml")
  325. list_a = _soup.find_all("a")
  326. list_img = _soup.find_all("img")
  327. page_attachments = []
  328. for _a in list_a:
  329. _text =_a.get_text()
  330. _url = _a.attrs.get("href","")
  331. if _url.find("http://www.bidizhaobiao.com")>=0:
  332. continue
  333. if _url.find("detail-area-list-icon.png")>=0:
  334. continue
  335. is_attach = False
  336. if _url.find("http://zbtb.gd.gov.cn/platform/attach")>=0:
  337. is_attach = True
  338. file_type = ".pdf"
  339. for suf in fileSuffix:
  340. if _text.find(suf)>=0 or _url.find(suf)>=0:
  341. is_attach = True
  342. file_type = suf.lower()
  343. if not is_attach:
  344. for suf in fileSuffix:
  345. if _text.find(suf[1:])>=0 or _url.find(suf[1:])>=0:
  346. is_attach = True
  347. file_type = suf.lower()
  348. if is_attach:
  349. page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type[1:]})
  350. for _a in list_img:
  351. _text =_a.get_text()
  352. _url = _a.attrs.get("src","")
  353. if _url.find("http://www.bidizhaobiao.com")>=0:
  354. continue
  355. is_attach = False
  356. for suf in fileSuffix:
  357. if _text.find(suf)>=0 or _url.find(suf)>=0:
  358. is_attach = True
  359. file_type = suf.lower()
  360. if is_attach:
  361. page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type})
  362. return page_attachments
def fixDoc(docid,ots_client=None):
    '''
    Repair a document whose in-page attachments were never downloaded:
    re-extract attachment links from the stored HTML, download each one,
    and merge the results into the document row via fixAttachmentOfDoc.
    :param docid: id of the document to fix
    :param ots_client: optional OTS client; a new connection is made when None
    :return: None
    '''
    if ots_client is None:
        ots_client = getConnect_ots()
    capacity_client = getConnect_ots_capacity()
    partitionkey = docid%500+1
    _document = Document({document_partitionkey:partitionkey,document_docid:docid})
    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
    # The full HTML body lives in the capacity cluster.
    _document.fix_columns(capacity_client,[document_dochtmlcon],True)
    log("=1")
    # NOTE(review): the "or 1" short-circuits both guards below, so the fix
    # always runs regardless of extract status / existing attachments — this
    # looks like a deliberate debug override; confirm before removing.
    if _document.getProperties().get(document_attachment_extract_status,0)!=1 or 1:
        log("=2")
        if _document.getProperties().get(document_attachment_path,'[]')=="[]" or 1:
            log("=3")
            new_attachments = extract_pageAttachments(_document.getProperties().get(document_dochtmlcon))
            log(str(new_attachments))
            new_files = []
            for _atta in new_attachments:
                new_file_name = "%s/fixdownload/%s"%(os.path.dirname(__file__),uuid4().hex)
                _flag,_ = download(_atta.get("fileLink"),new_file_name)
                if _flag:
                    new_files.append({"filepath":new_file_name,"file_type":_atta.get("file_type")})
            fixAttachmentOfDoc(docid,new_files)
            # Clean up the temporary downloads.
            for _file in new_files:
                if os.path.exists(_file.get("filepath")):
                    os.remove(_file.get("filepath"))
  394. def has_attachments(docid):
  395. capacity_client = getConnect_ots_capacity()
  396. partitionkey = docid%500+1
  397. _document = Document({document_partitionkey:partitionkey,document_docid:docid})
  398. _document.fix_columns(capacity_client,[document_dochtmlcon],True)
  399. _html = _document.getProperties().get(document_dochtmlcon,"")
  400. _soup = BeautifulSoup(_html,"lxml")
  401. list_a = _soup.find_all("a")
  402. return True if len(list_a)>0 else False
class FixDocument():
    """One-shot batch job: scan the document index for rows matching a
    hard-coded backfill query and run fixDoc over each hit with a thread pool."""

    def __init__(self):
        self.ots_client = getOtsClient()
        self.bucket = getBucket()
        # Unbounded queue filled once by producer().
        self.task_queue = queue.Queue()

    def producer(self):
        """Queue docids that have no page_attachments fileMd5 yet."""
        # Kept for reference: docid windows used by an earlier run.
        should_docid = BoolQuery(should_queries=[
            RangeQuery("docid",173470530,174453795),
            RangeQuery("docid",180724983,200250181)
        ])
        # NOTE(review): the docid lower bound and web_source_no wildcard are
        # hard-coded for a specific backfill; adjust before reusing this job.
        bool_query = BoolQuery(must_queries=[
            # RangeQuery("crtime",'2021-08-10 00:00:00','2021-10-10 00:00:00'),
            RangeQuery("docid",86363824),
            WildcardQuery("web_source_no","00141-1*"),
            # should_docid
        ]
        ,must_not_queries=[NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*"))])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
                                                                            ColumnsToGet([],ColumnReturnType.NONE))
        log("total_count:%d"%total_count)
        dict_rows = getRow_ots(rows)
        for _row in dict_rows:
            self.task_queue.put(_row)
        # Page through the remaining results until the index is exhausted.
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                                ColumnsToGet([],ColumnReturnType.NONE))
            dict_rows = getRow_ots(rows)
            for _row in dict_rows:
                self.task_queue.put(_row)
        # Alternative input path: read docids from a spreadsheet instead.
        # import pandas as pd
        # df = pd.read_excel("2022-01-18_183521_export11.xlsx")
        # for docid in df["docid"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     self.task_queue.put(_dict)

    def comsumer(self):
        """Process queued rows with 30 threads, calling fixDoc per docid."""
        def _handle(item,result_queue,ots_client):
            docid = item.get(document_docid)
            fixDoc(docid,ots_client)
        ots_client = getConnect_ots()
        mt = MultiThreadHandler(self.task_queue,_handle,None,30,ots_client=ots_client)
        mt.run()

    def start(self):
        """Run the producer to completion, then schedule comsumer every minute."""
        schedule = BlockingScheduler()
        t = Thread(target=self.producer)
        t.start()
        t.join()
        # schedule.add_job(self.producer,"cron",minute="*/1")
        schedule.add_job(self.comsumer,"cron",minute="*/1")
        schedule.start()
        # NOTE(review): unreachable — schedule.start() blocks forever.
        t.join()
  457. def start_docFix():
  458. fd = FixDocument()
  459. fd.start()
  460. def export_doc_has_attachment(docs):
  461. for _doc in docs:
  462. if has_attachments(_doc):
  463. df_data["docid"].append(_doc)
  464. import pandas as pd
  465. df = pd.DataFrame(df_data)
  466. df.to_excel("2022-01-18_183521_export11.xlsx")
  467. if __name__=="__main__":
  468. docs = []
  469. a = '''
  470. 188178166
  471. '''
  472. for a in a.split("\n"):
  473. b = a.strip()
  474. if len(b)>0:
  475. docs.append(int(b))
  476. df_data = {"docid":[]}
  477. for _doc in docs:
  478. fixDoc(_doc)
  479. # start_docFix()
  480. # af = AttachmentFix()
  481. # af.schedule()
  482. # _flag,_ = download("http://59.55.120.164:8088/upload/images_file/shop/image/productType/standard/101.jpg")
  483. # print(_flag)