123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587 |
import json
import random
import re

from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit
# Column-name constants for the "document" OTS table.  Using these
# module-level names instead of raw string literals keeps call sites
# typo-safe and greppable.

# --- primary key ---
document_partitionkey = "partitionkey"
document_docid = "docid"
# --- content columns ---
document_dochtmlcon = "dochtmlcon"
document_doctextcon = "doctextcon"
document_doctitle = "doctitle"
document_attachmenttextcon = "attachmenttextcon"
# --- attachment list column (JSON) and the keys of its items ---
document_attachment_path = "page_attachments"
document_attachment_path_filemd5 = "fileMd5"
document_attachment_path_fileTitle = "fileTitle"
document_attachment_path_fileLink = "fileLink"
# --- lifecycle / bookkeeping ---
document_crtime = "crtime"
document_status = "status"
document_page_time = "page_time"
document_attachment_extract_status = "attachment_extract_status"
document_web_source_no = "web_source_no"
document_fingerprint = "fingerprint"
document_opertime = "opertime"
# --- channel / region classification ---
document_docchannel = "docchannel"
document_original_docchannel = "original_docchannel"
document_life_docchannel = "life_docchannel"
document_area = "area"
document_province = "province"
document_city = "city"
document_district = "district"
# --- extraction result columns ---
document_extract_json = "extract_json"
document_bidway = "bidway"
document_industry = "industry"
document_info_type = "info_type"
document_qcodes = "qcodes"
document_project_name = "project_name"
document_project_code = "project_code"
document_project_codes = "project_codes"
document_tenderee = "tenderee"
document_tenderee_addr = "tenderee_addr"
document_tenderee_phone = "tenderee_phone"
document_tenderee_contact = "tenderee_contact"
document_agency = "agency"
document_agency_phone = "agency_phone"
document_agency_contact = "agency_contact"
document_product = "product"
document_moneysource = "moneysource"
document_service_time = "service_time"
# --- key dates/times of the bidding process ---
document_time_bidclose = "time_bidclose"
document_time_bidopen = "time_bidopen"
document_time_bidstart = "time_bidstart"
document_time_commencement = "time_commencement"
document_time_completion = "time_completion"
document_time_earnest_money_start = "time_earnest_money_start"
document_time_earnest_money_end = "time_earnest_money_end"
document_time_get_file_end = "time_get_file_end"
document_time_get_file_start = "time_get_file_start"
document_time_publicity_end = "time_publicity_end"
document_time_publicity_start = "time_publicity_start"
document_time_registration_end = "time_registration_end"
document_time_registration_start = "time_registration_start"
document_time_release = "time_release"
# --- misc ---
document_info_source = "info_source"
document_nlp_enterprise = "nlp_enterprise"
document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
document_total_tenderee_money = "total_tenderee_money"
document_update_document = "update_document"
class Document(BaseModel):
    """OTS row model for the "document" table.

    Wraps a raw property dict and adds helpers for attachment/HTML
    maintenance and for moving a row through its status workflow
    (the status bands used below: 1-50 initialed, 51-60 EAS failed,
    61-70 queued for extract, 151-170 handed to MaxCompute).
    """

    def __init__(self, _dict):
        BaseModel.__init__(self)
        for k, v in _dict.items():
            self.setValue(k, v, True)
        self.table_name = "document"
        # URL prefixes that identify attachments hosted by us.
        self.prefixs = ["www.bidizhaobiao.com", "bxkc.oss-cn-shanghai.aliyuncs.com"]

    def getPrimary_keys(self):
        """Primary-key column names of the document table."""
        return ["partitionkey", "docid"]

    def getAttribute_turple(self):
        """Return (column, value) pairs for all non-empty attribute columns.

        List values are JSON-encoded; the bookkeeping key "all_columns" is
        skipped.  (The historical "turple" spelling is kept so existing
        callers keep working.)
        """
        _list = []
        for _key in self.getAttribute_keys():
            if _key == "all_columns":
                continue
            _v = self.getProperties().get(_key)
            if _v is not None and _v != "":
                if isinstance(_v, list):
                    _v = json.dumps(_v)
                _list.append((_key, _v))
        return _list

    def isLegalUrl(self, _url, _type):
        """Check *_url* against self.prefixs.

        _type==0: legal only when the url contains one of our prefixes;
        any other _type: legal only when it contains none of them.
        """
        _flag = False
        for _prefix in self.prefixs:
            if _url.find(_prefix) >= 0:
                _flag = True
                break  # one hit is enough; no need to scan the rest
        if _type == 0:
            return _flag
        return not _flag

    def fromInitialed(self):
        # Move the row (back) into the "initialed" band (1-50).
        self.setValue(document_status, random.randint(1, 50), True)

    def fromEas2Maxcompute(self):
        # Band 151-170: handed from EAS to MaxCompute.
        self.setValue(document_status, random.randint(151, 170), True)

    def fromEasFailed(self):
        # Band 51-60: EAS processing failed.
        self.setValue(document_status, random.randint(51, 60), True)

    def fromEas2Extract(self):
        # Band 61-70: queued for extraction.
        self.setValue(document_status, random.randint(61, 70), True)

    def updateSWFImages(self, swf_urls):
        """Append swf-converted images to dochtmlcon, once.

        The first url acts as a sentinel: if it is already embedded in the
        html we assume the whole group was appended by an earlier run.
        """
        if len(swf_urls) > 0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon)
            _soup = BeautifulSoup(_dochtmlcon, "lxml")
            if _soup.find("img", {"src": swf_urls[0]}) is None:
                _div = '<div class="swf_urls">'
                for _url in swf_urls:
                    _div += '<p><img src="%s"/></p>' % (_url)
                _div += "</div>"
                _dochtmlcon += _div
                self.setValue(document_dochtmlcon, _dochtmlcon, True)

    def getRichTextFetch(self, list_html):
        """Join extracted attachment html fragments into one capped blob.

        *list_html* items are either raw html strings or dicts carrying
        "filemd5"/"html".  Output longer than 50000 chars is truncated via
        article_limit, and the html/body wrapper tags that BeautifulSoup
        adds are stripped again.
        """
        _text = ""
        for _ht in list_html:
            if isinstance(_ht, str):
                _text += "<div>%s</div>" % (_ht)
            elif isinstance(_ht, dict):
                _filemd5 = _ht.get("filemd5", "")
                _html = _ht.get("html", "")
                _text += '<div filemd5="%s">%s</div>' % (_filemd5, _html)
        if len(_text) > 50000:
            _soup = BeautifulSoup(_text, "lxml")
            _soup = article_limit(_soup, 50000)
            _text = re.sub("<html>|</html>|<body>|</body>", "", str(_soup))
        return _text

    def updateAttachment(self, list_html):
        """Replace the hidden richTextFetch div of dochtmlcon with new content."""
        if len(list_html) > 0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon, "")
            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
            _text = '<div style="display:none;" class="richTextFetch">%s</div>' % (self.getRichTextFetch(list_html))
            if _dochtmlcon is not None:
                _soup = BeautifulSoup(_dochtmlcon, "lxml")
                # Drop any richTextFetch block appended by a previous run
                # before attaching the fresh one.
                _node = _soup.find("div", attrs={"class": "richTextFetch"})
                if _node is not None:
                    _node.decompose()
                self.setValue(document_dochtmlcon, str(_soup) + _text, True)

    def getTitleFromHtml(self, filemd5, _html):
        """Return the anchor text of the attachment link tagged data=filemd5."""
        _soup = BeautifulSoup(_html, "lxml")
        _find = _soup.find("a", attrs={"data": filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title

    def getSourceLinkFromHtml(self, filemd5, _html):
        """Return the source link of the attachment tagged filelink=filemd5.

        Prefers the href of an <a> tag; falls back to the src of an <img>.
        """
        _soup = BeautifulSoup(_html, "lxml")
        _find = _soup.find("a", attrs={"filelink": filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img", attrs={"filelink": filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src", "")
        else:
            filelink = _find.attrs.get("href", "")
        return filelink
- import random
def turn_extract_status():
    """One-off maintenance job: deduplicate documents stuck in the
    extract band (status 61-70).

    All matching documents are fetched with their fingerprint, grouped by
    fingerprint, and within each group every document except the one with
    the smallest docid is written back with a duplicate status (401-450).
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        # Every document currently queued for extraction.
        bool_query = BoolQuery(must_queries=[
            RangeQuery(document_status, 61, 70, True, True),
        ])
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet([document_fingerprint], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(Document(_data))
        # Page through the remaining results with the continuation token.
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document", "document_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet([document_fingerprint], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                task_queue.put(Document(_data))

    def _handle(item, result_queue, ots_client):
        # Persist whatever status was assigned to the item before it was
        # enqueued.  NOTE: the previous version re-set the status to
        # random.randint(1,50) here, which clobbered the duplicate band
        # (401-450) assigned in the grouping loop below.
        item.update_row(ots_client)

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()

    # Drain the queue and group the documents by fingerprint.
    dict_fingerprint = {}
    while True:
        try:
            item = task_queue.get(timeout=2)
        except queue.Empty:
            # Producer has finished and the queue is exhausted.
            break
        fingerprint = item.getProperties().get(document_fingerprint)
        if fingerprint is not None:
            dict_fingerprint.setdefault(fingerprint, []).append(item)

    print(len(dict_fingerprint.keys()))
    status_queue = queue.Queue()
    for k, v in dict_fingerprint.items():
        print("key", k, len(v))
        v.sort(key=lambda x: x.docid)
        # Keep the earliest docid untouched; mark the rest as duplicates.
        for _d in v[1:]:
            _d.setValue(document_status, random.randint(401, 450), True)
            status_queue.put(_d)
    mt = MultiThreadHandler(status_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
def turn_document_status(excel_path="G:\\20221212error.xlsx"):
    """One-off maintenance job: reset status to 1 (re-process from the
    start of the pipeline) for every docid listed in *excel_path*.

    The excel file is expected to contain a "docid" column.  The path is
    a parameter with the historical default so existing callers keep
    working.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        import pandas as pd
        df = pd.read_excel(excel_path)
        for docid in df["docid"]:
            # partitionkey is derived from the docid (500 partitions).
            _dict = {document_docid: int(docid),
                     document_partitionkey: int(docid) % 500 + 1,
                     }
            task_queue.put(Document(_dict))
        log("task_queue size:%d" % (task_queue.qsize()))

    def _handle(item, result_queue, ots_client):
        # Push the row back to the very start of the processing pipeline.
        item.setValue(document_status, 1, True)
        item.update_row(ots_client)

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
def drop_extract2():
    """One-off maintenance job: flush rows out of document_extract2.

    For every row found in document_extract2 (any status 1-1000), the
    matching document row is reset to status 1 (re-process) and the
    document_extract2 row itself is deleted.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        bool_query = BoolQuery(must_queries=[
            BoolQuery(should_queries=[
                RangeQuery("status", 1, 1000, True, True),
            ]),
        ])
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document_extract2", "document_extract2_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet(["status"], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(_data)
        # Page through the remaining results with the continuation token.
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document_extract2", "document_extract2_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet(["status"], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                task_queue.put(_data)
        log("task_queue size:%d" % (task_queue.qsize()))

    def _handle(item, result_queue, ots_client):
        # item is a plain row dict (partitionkey/docid/status).
        _dict = {}
        _dict.update(item)
        # Reset the document's status to 1 (re-process).
        _dict.pop("status")
        _dict["status"] = 1
        print(_dict)
        _document = Document(_dict)
        _document.update_row(ots_client)
        # Then drop the row from document_extract2.
        _d_extract = Document_extract2(_dict)
        _d_extract.delete_row(ots_client)

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
def fixDocumentHtml():
    """One-off data fix: strip a leaked watermark fragment from documents.

    Finds documents whose doctextcon contains both "信友-城市之光" and
    "Copyright", removes the "城市之光...Ltd." span from doctextcon (main
    instance) and the "Evaluation...Ltd." span from dochtmlcon (capacity
    instance).
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots, getConnect_ots_capacity
    from queue import Queue
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    from BaseDataMaintenance.model.ots.document_html import Document_html

    ots_client = getConnect_ots()
    capacity_client = getConnect_ots_capacity()

    # Documents containing both markers of the leaked fragment.
    bool_query = BoolQuery(must_queries=[
        MatchPhraseQuery("doctextcon", "信友-城市之光"),
        MatchPhraseQuery("doctextcon", "Copyright"),
    ])
    list_data = []
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "document", "document_index",
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid")]), get_total_count=True, limit=100),
        columns_to_get=ColumnsToGet(["doctextcon"], return_type=ColumnReturnType.SPECIFIED))
    print("total_count", total_count)
    list_data.extend(getRow_ots(rows))
    while next_token:
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, next_token=next_token, get_total_count=True, limit=100),
            columns_to_get=ColumnsToGet(["doctextcon"], return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))

    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)

    _pattern = "(?P<_find>城市之光.*Ltd.)"
    _pattern1 = "(?P<_find>Evaluation.*Ltd.)"

    def _handle(item, result_queue):
        # Clean the plain-text column on the main instance.
        _doctextcon = item.get("doctextcon")
        _search = re.search(_pattern, _doctextcon)
        if _search is not None:
            # Guard: the previous version called .groupdict() on a possible
            # None and crashed when the fragment was absent.
            print(_search.groupdict().get("_find"))
        item["doctextcon"] = re.sub(_pattern, "", _doctextcon)
        _d = Document(item)
        _d.update_row(ots_client)
        # Clean the full-html column stored on the capacity instance.
        _d1 = {"partitionkey": item.get("partitionkey"),
               "docid": item.get("docid")}
        _dh = Document(_d1)
        _dh.fix_columns(capacity_client, ["dochtmlcon"], True)
        _dochtmlcon = _dh.getProperties().get("dochtmlcon")
        _dochtmlcon = re.sub("\n", "", _dochtmlcon)
        _dochtmlcon = re.sub(_pattern1, "", _dochtmlcon)
        _d1["dochtmlcon"] = _dochtmlcon
        _dh = Document(_d1)
        _dh.update_row(capacity_client)

    mt = MultiThreadHandler(task_queue, _handle, None, 2)
    mt.run()
def delete_documents(excel_path="2022-10-14_190838_数据导出.xlsx"):
    """Delete every docid listed in the excel export from both the main
    and the capacity document tables.

    The excel file is expected to contain a "docid" column.  The path is
    a parameter with the historical default so existing callers keep
    working.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
    import pandas as pd

    ots_client = getConnect_ots()
    ots_capacity = getConnect_ots_capacity()
    df = pd.read_excel(excel_path)
    _count = 0
    for _docid in df["docid"]:
        # partitionkey is derived from the docid (500 partitions).
        partitionkey = int(_docid) % 500 + 1
        _d = {document_partitionkey: partitionkey,
              document_docid: int(_docid)}
        _doc = Document(_d)
        _doc.delete_row(ots_client)
        _doc.delete_row(ots_capacity)
        _count += 1
        print(_docid)
    print("delete count:%d" % _count)
if __name__=="__main__":
    # Manual entry point: uncomment exactly one maintenance job per run.
    # turn_extract_status()
    turn_document_status()
    # drop_extract2()
    # fixDocumentHtml()
|