- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- from BaseDataMaintenance.dataSource.source import *
- from BaseDataMaintenance.common.Utils import *
- import queue
- from tablestore import *
- from threading import Thread,RLock
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
- from BaseDataMaintenance.model.ots.document import *
- import traceback
- from BaseDataMaintenance.dataSource.download import download
- import base64
- from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.common.ossUtils import *
- from uuid import uuid4
- from bs4 import BeautifulSoup
- from BaseDataMaintenance.model.ots.document_fix_page_attachments import *
- import threading
- import os
- import time
- import json
- import random
- class AttachmentFix():
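- '''Scheduled repair job: re-downloads page attachments recorded in document_fix_page_attachments, uploads them to the attachment-hub OSS bucket and writes the repaired attachment list back to the document table.'''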
- def __init__(self):
- self.ots_client = getConnect_ots()
- self.auth = getAuth()
- if is_internal:
- self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
- log("bucket_url:%s"%(self.bucket_url))
- self.attachment_bucket_name = "attachment-hub"
- self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
- self.task_queue = queue.Queue(3000)
- self.current_path = os.path.dirname(__file__)
- self.fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
- self.cul_size = 0
- self.cul_lock = RLock()
- self.download_path = os.path.join(self.current_path,"fixdownload")
- os.makedirs(self.download_path,exist_ok=True)
- # clear files left over from previous runs
- for _file in os.listdir(self.download_path):
- filepath = os.path.join(self.download_path,_file)
- if os.path.isfile(filepath):
- os.remove(filepath)
- self.dict_filelink = dict()
- self.set_failed = set()
- self.lock_download = RLock()
- self.lock_failed = RLock()
- def producer(self):
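- '''Page through the document_fix_page_attachments index for rows whose fix status is not in [1,3] and enqueue them for repair.'''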
- bool_query = BoolQuery(
- must_not_queries=[RangeQuery(document_fix_status,1,3,True,True)]
- # must_queries=[TermQuery(document_fix_docid,113881457)]
- )
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- _fix = document_fix_page_attachments(_dict)
- self.task_queue.put(_fix,True)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_fix_page_attachments","document_fix_page_attachments_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([document_fix_attachments,document_fix_new_attachment],ColumnReturnType.SPECIFIED))
- list_dict = getRow_ots(rows)
- for _dict in list_dict:
- _fix = document_fix_page_attachments(_dict)
- self.task_queue.put(_fix,True)
- def getDownloaded(self,_url):
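- '''Cross-thread download dedup: return "current" when the caller should download _url itself, the cached attachment dict when another thread already fetched it, or None when the file was marked too large; waits up to 60s for an in-flight download and clears the cache past 100000 entries.'''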
- for _i1 in range(600):
- with self.lock_download:
- if len(self.dict_filelink)>100000:
- log("clear filelink %d"%(len(self.dict_filelink)))
- self.dict_filelink.clear()
- if _url in self.dict_filelink:
- _result = self.dict_filelink[_url]
- if _result == "":
- t_id = threading.current_thread().ident
- # log("%s watting %s"%(str(t_id),_url))
- else:
- return _result
- else:
- self.dict_filelink[_url] = ""
- return "current"
- time.sleep(0.1)
- return "current"
- def getFailed(self,_url):
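- '''Return True when _url previously failed; the failure set is cleared once it grows past 100000 entries.'''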
- with self.lock_failed:
- _flag = False
- if _url in self.set_failed:
- _flag= True
- if len(self.set_failed)>100000:
- self.set_failed.clear()
- return _flag
- def consumer(self):
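- '''Drain the task queue with 90 worker threads: download each attachment (at most 10 per document), upload new files to OSS, register them in the attachment table, then update the fix record and the document row.'''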
- def _handle(_fix,result_queue):
- try:
- page_attachments = json.loads(_fix.getProperties().get(document_fix_attachments,"[]"))
- docid = _fix.getProperties().get(document_fix_docid)
- new_page_attachments = []
- _count = 0
- for _attach in page_attachments:
- try:
- _count += 1
- if _count>10:
- break
- _url = _attach.get("fileLink","")
- has_download = self.getDownloaded(_url)
- if has_download=="current":
- pass
- elif has_download is None:
- log("has download but to large %s"%(_url))
- continue
- else:
- log("has download %s"%(_url))
- new_page_attachments.append(has_download)
- continue
- _title = _attach.get("fileTitle","")
- filetype = ""
- for _suf in self.fileSuffix:
- if _url.find(_suf)>=0 or _title.find(_suf)>=0:
- filetype = _suf[1:]
- break
- if filetype!="":
- if _url.find("http")>=0:
- filename = uuid4().hex
- filepath = "%s/fixdownload/%s"%(self.current_path,filename)
- try:
- flag,content = download(_url,filepath,timeout=7)
- # with open(filepath,"wb") as f:
- # f.write(content)
- if not flag:
- log("download failed %s"%(_url))
- if flag:
- log("download %d succeeded %s"%(docid,_url))
- file_size = os.path.getsize(filepath)
- if file_size>ATTACHMENT_LARGESIZE*30:
- self.dict_filelink[_url] = None
- if os.path.exists(filepath):
- os.remove(filepath)
- continue
- while True:
- with self.cul_lock:
- if file_size<1024*1024:
- self.cul_size += file_size
- break
- if self.cul_size<300*1024*1024:
- self.cul_size += file_size
- break
- time.sleep(1)
- _md5,_size = getMDFFromFile(filepath)
- with self.cul_lock:
- self.cul_size -= file_size
- log("current cul_size:%6f"%(self.cul_size/(1024*1024*1024)))
- log(_md5)
- objectPath = "%s/fix/%s/%s.%s"%(_md5[:4],getCurrent_date("%Y-%m-%d"),filename,filetype)
- _status = random.randint(40,50)
- if _size>ATTACHMENT_LARGESIZE:
- _status = -1
- attach_dict = {attachment_filemd5:_md5,attachment_filetype:filetype,attachment_path:objectPath,
- attachment_size:_size,attachment_crtime:getCurrent_date(),attachment_status:_status,attachment_file_link:_url}
- attach = attachment(attach_dict)
- if not attach.exists_row(self.ots_client):
- attach.update_row(self.ots_client)
- uploadFileByPath(self.bucket,filepath,objectPath)
- log("docid:%d upload succeed %s"%(docid,_md5))
- else:
- log("docid:%d has upload %s"%(docid,_md5))
- new_page_attachments.append(_attach)
- self.dict_filelink[_url] = _attach
- _attach[document_attachment_path_filemd5] = _md5
- except Exception as e:
- pass
- finally:
- if os.path.exists(filepath):
- os.remove(filepath)
- except Exception as e:
- log(str(e))
- traceback.print_exc()
- _fix.setValue(document_fix_count,len(page_attachments),True)
- _fix.setValue(document_fix_succeed_count,len(new_page_attachments),True)
- _fix.setValue(document_fix_new_attachment,json.dumps(new_page_attachments,ensure_ascii=False),True)
- if len(new_page_attachments)>0:
- if len(new_page_attachments)==len(page_attachments):
- _fix.setValue(document_fix_status,1,True)
- else:
- _fix.setValue(document_fix_status,2,True)
- _dict = {document_partitionkey:_fix.getProperties().get(document_fix_partitionkey),
- document_docid:_fix.getProperties().get(document_fix_docid),
- document_attachment_path:_fix.getProperties().get(document_fix_new_attachment)}
- _document = Document(_dict)
- log("insert docid %s"%(str(_fix.getProperties().get(document_fix_docid))))
- _document.update_row(self.ots_client)
- else:
- _fix.setValue(document_fix_status,3,True)
- _fix.update_row(self.ots_client)
- log("handle docid:%d with status:%d"%(_fix.getProperties().get(document_fix_docid),_fix.getProperties().get(document_fix_status)))
- except Exception as e:
- traceback.print_exc()
- mt = MultiThreadHandler(self.task_queue,_handle,None,90)
- mt.run()
- def schedule(self):
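- '''Run producer and consumer once a minute on a blocking scheduler.'''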
- scheduler = BlockingScheduler()
- scheduler.add_job(self.producer,"cron",minute="*/1")
- scheduler.add_job(self.consumer,"cron",minute="*/1")
- scheduler.start()
- def start_attachFix():
- af = AttachmentFix()
- af.schedule()
- def getAttachFromPath(filepath,filetype):
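- '''Build an attachment record for a local file: compute its md5 and size, derive the OSS object path under .../supplement/..., and mark it for processing or as too large.'''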
- if os.path.exists(filepath):
- filemd5,size = getMDFFromFile(filepath)
- if filemd5 is not None:
- relapath = "%s/supplement/%s/%s.%s"%(filemd5[:4],getCurrent_date("%Y-%m-%d"),filemd5,filetype)
- _t = relapath.split(".")
- if len(_t)>1:
- filetype = _t[-1]
- filetype = filetype.lower()
- log("filemd5:%s"%(filemd5))
- _dict = {attachment_filemd5:filemd5,attachment_size:size,attachment_path:relapath,attachment_crtime:getCurrent_date("%Y-%m-%d %H:%M:%S"),attachment_filetype:filetype,attachment_status:ATTACHMENT_INIT}
- log(str(_dict))
- _attach = attachment(_dict)
- if _attach.getProperties().get(attachment_size)<ATTACHMENT_SMALL_SIZE:
- #write the file content to the attachment record as base64
- # _data_base64 = base64.b64encode(open(filepath,"rb").read())
- # _attach.setValue(attachment_base64,_data_base64,True)
- _attach.setStatusToMCDeal()
- else:
- _attach.setValue(attachment_status,ATTACHMENT_TOOLARGE,True)
- return _attach
- else:
- log("md5 is None")
- else:
- log("file not exists %s"%(filepath))
- return None
- from BaseDataMaintenance.maintenance.attachment.attachmentProcess import AttachProcess
- common_bucket = None
- common_ots_client = None
- def getOtsClient():
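- '''Lazily create and cache a module-level OTS client.'''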
- global common_ots_client
- if common_ots_client is None:
- common_ots_client = getConnect_ots()
- return common_ots_client
- def getBucket():
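- '''Lazily create and cache the attachment-hub OSS bucket, preferring the internal endpoint inside the VPC.'''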
- global common_bucket
- if common_bucket is None:
- auth = getAuth()
- if is_internal:
- bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
- else:
- bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
- log("bucket_url:%s"%(bucket_url))
- attachment_bucket_name = "attachment-hub"
- common_bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
- return common_bucket
- common_ap = AttachProcess()
- def makeFixing(_document):
- common_ap.comsumer_handle(_document,None)
- def fixAttachmentOfDoc(docid,new_files):
- #method for supplementing announcements whose attachments were never downloaded
- ots_client = getOtsClient()
- bucket = getBucket()
- list_attach = []
- new_attachments = []
- for _file in new_files:
- filepath = _file.get("filepath")
- filetype = _file.get("file_type")
- _attach = getAttachFromPath(filepath,filetype)
- if _attach is None:
- return
- current_filemd5 = _attach.getProperties().get(attachment_filemd5)
- log("supplement %s"%current_filemd5)
- _old_status = None
- if not _attach.exists_row(ots_client):
- log("not exists %s"%current_filemd5)
- _old_status = _attach.getProperties().get(attachment_status)
- _attach.setValue(attachment_status,0,True)
- _attach.setValue("old_status",_old_status,False)
- _attach.update_row(ots_client)
- uploadFileByPath(bucket,filepath,_attach.getProperties().get(attachment_path))
- else:
- _attach.getProperties().pop(attachment_path)
- _attach.fix_columns(ots_client,[attachment_status],True)
- _old_status = _attach.getProperties().get(attachment_status)
- _attach.setValue("old_status",_old_status,False)
- _attach.setValue(attachment_status,10,True)
- _attach.update_row(ots_client)
- list_attach.append(_attach)
- new_attachments.append({document_attachment_path_filemd5:current_filemd5})
- partitionkey = docid%500+1
- _document = Document({document_partitionkey:partitionkey,document_docid:docid})
- _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
- log(_document.getProperties().get(document_attachment_path,"[]"))
- _old_attachments = _document.getProperties().get(document_attachment_path,"[]")
- if _old_attachments=="":
- _old_attachments = "[]"
- _page_attachments = json.loads(_old_attachments)
- print("fixing",docid,_page_attachments)
- for new_item in new_attachments:
- _exists = False
- for item in _page_attachments:
- if item.get(document_attachment_path_filemd5)==new_item.get(document_attachment_path_filemd5):
- _exists = True
- if not _exists:
- _page_attachments.append(new_item)
- _document.setValue(document_attachment_path,json.dumps(_page_attachments,ensure_ascii=False),True)
- if len(_page_attachments)>0:
- _document.setValue(document_status,1,True)
- _document.update_row(ots_client)
- # makeFixing(_document)  # this step is no longer needed in the new pipeline
- for _attach in list_attach:
- if _attach.getProperties().get("old_status") is not None:
- _attach.setValue(attachment_status,_attach.getProperties().get("old_status"),True)
- _attach.update_row(ots_client)
- attach_status = _document.getProperties().get(document_attachment_extract_status)
- # _document.setValue(document_crtime,getCurrent_date(),True)
- if attach_status==1:
- _document.setValue(document_attachment_extract_status,0,True)
- _document.update_row(ots_client)
- def extract_pageAttachments(_html):
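- '''Collect candidate attachments from announcement HTML: <a> and <img> tags whose text or target carries a known file suffix, returned as {fileLink, fileTitle, file_type} dicts.'''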
- fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
- _soup = BeautifulSoup(_html,"lxml")
- list_a = _soup.find_all("a")
- list_img = _soup.find_all("img")
- page_attachments = []
- for _a in list_a:
- _text = _a.get_text()
- _url = _a.attrs.get("href","")
- if _url.find("http://www.bidizhaobiao.com")>=0:
- continue
- if _url.find("detail-area-list-icon.png")>=0:
- continue
- is_attach = False
- if _url.find("http://zbtb.gd.gov.cn/platform/attach")>=0:
- is_attach = True
- file_type = ".pdf"
- for suf in fileSuffix:
- if _text.find(suf)>=0 or _url.find(suf)>=0:
- is_attach = True
- file_type = suf.lower()
- if is_attach:
- page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type[1:]})
- for _img in list_img:
- _text = _img.get_text()
- _url = _img.attrs.get("src","")
- if _url.find("http://www.bidizhaobiao.com")>=0:
- continue
- is_attach = False
- for suf in fileSuffix:
- if _text.find(suf)>=0 or _url.find(suf)>=0:
- is_attach = True
- file_type = suf.lower()
- if is_attach:
- page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type[1:]})
- return page_attachments
- def fixDoc(docid,ots_client=None):
- '''
- Repair a document whose in-page attachments were never downloaded
- :param docid: docid of the document to repair
- :param ots_client: optional OTS client, created on demand when None
- :return:
- '''
- if ots_client is None:
- ots_client = getConnect_ots()
- capacity_client = getConnect_ots_capacity()
- partitionkey = docid%500+1
- _document = Document({document_partitionkey:partitionkey,document_docid:docid})
- _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
- _document.fix_columns(capacity_client,[document_dochtmlcon],True)
- log("=1")
- if _document.getProperties().get(document_attachment_extract_status,0)!=1 or 1:
- log("=2")
- if _document.getProperties().get(document_attachment_path,'[]')=="[]":
- log("=3")
- new_attachments = extract_pageAttachments(_document.getProperties().get(document_dochtmlcon))
- log(str(new_attachments))
- new_files = []
- download_dir = os.path.join(os.path.dirname(__file__),"fixdownload")
- os.makedirs(download_dir,exist_ok=True)
- for _atta in new_attachments:
- new_file_name = os.path.join(download_dir,uuid4().hex)
- _flag,_ = download(_atta.get("fileLink"),new_file_name)
- if _flag:
- new_files.append({"filepath":new_file_name,"file_type":_atta.get("file_type")})
- fixAttachmentOfDoc(docid,new_files)
- for _file in new_files:
- if os.path.exists(_file.get("filepath")):
- os.remove(_file.get("filepath"))
- class FixDocument():
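- '''Batch repair job: finds documents without page attachments and runs fixDoc over them.'''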
- def __init__(self):
- self.ots_client = getOtsClient()
- self.bucket = getBucket()
- self.task_queue = queue.Queue()
- def producer(self):
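- '''Page through the document index for announcements matching the repair conditions and lacking page_attachments.fileMd5, enqueueing each row.'''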
- should_docid = BoolQuery(should_queries=[
- RangeQuery("docid",173470530,174453795),
- RangeQuery("docid",180724983,200250181)
- ])
- bool_query = BoolQuery(must_queries=[
- # RangeQuery("crtime",'2021-08-10 00:00:00','2021-10-10 00:00:00'),
- RangeQuery("docid",86363824),
- WildcardQuery("web_source_no","00141-1*"),
- # should_docid
- ]
- ,must_not_queries=[NestedQuery("page_attachments",WildcardQuery("page_attachments.fileMd5","*"))])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
- ColumnsToGet([],ColumnReturnType.NONE))
- log("total_count:%d"%total_count)
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.task_queue.put(_row)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- ColumnsToGet([],ColumnReturnType.NONE))
- dict_rows = getRow_ots(rows)
- for _row in dict_rows:
- self.task_queue.put(_row)
- # import pandas as pd
- # df = pd.read_excel("2022-01-18_183521_export11.xlsx")
- # for docid in df["docid"]:
- # _dict = {document_docid:int(docid),
- # document_partitionkey:int(docid)%500+1,
- # }
- # self.task_queue.put(_dict)
- def consumer(self):
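- '''Repair each queued docid through fixDoc, using 30 handler threads and one shared OTS client.'''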
- def _handle(item,result_queue,ots_client):
- docid = item.get(document_docid)
- fixDoc(docid,ots_client)
- ots_client = getConnect_ots()
- mt = MultiThreadHandler(self.task_queue,_handle,None,30,ots_client=ots_client)
- mt.run()
- def start(self):
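- '''Fill the queue once with producer, then run consumer every minute on a blocking scheduler.'''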
- scheduler = BlockingScheduler()
- t = Thread(target=self.producer)
- t.start()
- t.join()
- # scheduler.add_job(self.producer,"cron",minute="*/1")
- scheduler.add_job(self.consumer,"cron",minute="*/1")
- scheduler.start()
- def start_docFix():
- fd = FixDocument()
- fd.start()
- if __name__=="__main__":
- # docs = [156668514]
- # for _doc in docs:
- # fixDoc(_doc)
- start_docFix()
- # af = AttachmentFix()
- # af.schedule()
- # _flag,_ = download("http://59.55.120.164:8088/upload/images_file/shop/image/productType/standard/101.jpg")
- # print(_flag)