import json
import random
import re

from bs4 import BeautifulSoup
from tablestore import *

from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.Utils import article_limit

document_partitionkey = "partitionkey"
document_docid = "docid"
document_dochtmlcon = "dochtmlcon"
document_doctextcon = "doctextcon"
document_doctitle = "doctitle"
document_attachmenttextcon = "attachmenttextcon"
document_attachment_path = "page_attachments"
document_attachment_path_filemd5 = "fileMd5"
document_attachment_path_fileTitle = "fileTitle"
document_attachment_path_fileLink = "fileLink"
document_crtime = "crtime"
document_status = "status"
document_page_time = "page_time"
document_attachment_extract_status = "attachment_extract_status"
document_web_source_no = "web_source_no"
document_web_source_name = "web_source_name"
document_fingerprint = "fingerprint"
document_opertime = "opertime"
document_docchannel = "docchannel"
document_original_docchannel = "original_docchannel"
document_life_docchannel = "life_docchannel"
document_area = "area"
document_province = "province"
document_city = "city"
document_district = "district"
document_extract_json = "extract_json"
document_bidway = "bidway"
document_industry = "industry"
document_info_type = "info_type"
document_qcodes = "qcodes"
document_project_name = "project_name"
document_project_code = "project_code"
document_project_codes = "project_codes"
document_tenderee = "tenderee"
document_tenderee_addr = "tenderee_addr"
document_tenderee_phone = "tenderee_phone"
document_tenderee_contact = "tenderee_contact"
document_agency = "agency"
document_agency_phone = "agency_phone"
document_agency_contact = "agency_contact"
document_product = "product"
document_moneysource = "moneysource"
document_service_time = "service_time"
document_time_bidclose = "time_bidclose"
document_time_bidopen = "time_bidopen"
document_time_bidstart = "time_bidstart"
document_time_commencement = "time_commencement"
document_time_completion = "time_completion"
document_time_earnest_money_start = "time_earnest_money_start"
document_time_earnest_money_end = "time_earnest_money_end"
document_time_get_file_end = "time_get_file_end"
document_time_get_file_start = "time_get_file_start"
document_time_publicity_end = "time_publicity_end"
document_time_publicity_start = "time_publicity_start"
document_time_registration_end = "time_registration_end"
document_time_registration_start = "time_registration_start"
document_time_release = "time_release"
document_info_source = "info_source"
document_nlp_enterprise = "nlp_enterprise"
document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
document_total_tenderee_money = "total_tenderee_money"
document_update_document = "update_document"

class Document(BaseModel):

    def __init__(self,_dict):
        BaseModel.__init__(self)
        for k,v in _dict.items():
            self.setValue(k,v,True)
        self.table_name = "document"
        self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]

    def getPrimary_keys(self):
        return ["partitionkey","docid"]

    def getAttribute_turple(self):
        _list = []
        for _key in self.getAttribute_keys():
            if _key=="all_columns":
                continue
            _v = self.getProperties().get(_key)
            if _v is not None and _v!="":
                if isinstance(_v,list):
                    _v = json.dumps(_v)
                _list.append((_key,_v))
        return _list

    # def delete_row(self,ots_client):
    #     raise NotImplementedError()

    def isLegalUrl(self,_url,_type):
        # _type==0: the url must contain one of the known prefixes;
        # any other _type: the url must NOT contain any of them
        _flag = False
        for _prefix in self.prefixs:
            if _url.find(_prefix)>=0:
                _flag = True
        if _type==0:
            return _flag
        return not _flag

    def fromInitialed(self):
        self.setValue(document_status,random.randint(1,50),True)

    def fromEas2Maxcompute(self):
        self.setValue(document_status,random.randint(151,170),True)

    def fromEasFailed(self):
        self.setValue(document_status,random.randint(51,60),True)

    def fromEas2Extract(self):
        self.setValue(document_status,random.randint(61,70),True)
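    # Status ranges used by the transition helpers above (the values are
    # taken from this file; their pipeline meaning is inferred):
    #   1-50    (re)initialized, to be picked up again by the pipeline
    #   51-60   extraction failed
    #   61-70   queued for extraction
    #   151-170 forwarded to MaxCompute
    # A minimal usage sketch, assuming an OTS client obtained via
    # BaseDataMaintenance.dataSource.source.getConnect_ots():
    #
    #   doc = Document({document_partitionkey:1,document_docid:123})
    #   doc.fromEas2Extract()       # status <- random value in [61,70]
    #   doc.update_row(ots_client)  # persist the transition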
    def updateSWFImages(self,swf_urls):
        if len(swf_urls)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon)
            _soup = BeautifulSoup(_dochtmlcon,"lxml")
            # append the swf screenshots only once: skip if the first url is
            # already embedded as an <img> in the document html
            if _soup.find("img",{"src":swf_urls[0]}) is None:
                # the exact wrapper markup was lost in the source; a plain
                # <div> of <img> tags is assumed here
                _div = '<div>'
                for _url in swf_urls:
                    _div += '<img src="%s"/>'%(_url)
                _div += '</div>'
                _dochtmlcon += _div
                self.setValue(document_dochtmlcon,_dochtmlcon,True)
    def getRichTextFetch(self,list_html):
        # concatenate plain html fragments and attachment fragments (dicts
        # carrying filemd5/html) into one block; the exact wrapper markup was
        # lost in the source, plain <div> wrappers are assumed here
        _text = ""
        for _ht in list_html:
            if isinstance(_ht,str):
                _text += '<div>%s</div>'%(_ht)
            elif isinstance(_ht,dict):
                _filemd5 = _ht.get("filemd5","")
                _html = _ht.get("html","")
                _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
        if len(_text)>50000:
            # cap the rich text at 50000 characters, then strip the
            # html/body wrappers added by lxml (tag list assumed)
            _soup = BeautifulSoup(_text,"lxml")
            _soup = article_limit(_soup,50000)
            _text = re.sub("<html>|</html>|<body>|</body>","",str(_soup))
        return _text

    def updateAttachment(self,list_html):
        if len(list_html)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon,"")
            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
            _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
            # headroom under the column size cap (currently unused below)
            fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
            # the class="richTextFetch" attribute is what the lookup below
            # searches for; the hidden-div wrapper style is assumed
            # _text = '<div style="display:none;" class="richTextFetch">\n%s\n</div>'%("\n".join(list_html))
            _text = '<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
            if _dochtmlcon is not None:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                # drop any previously appended richTextFetch block before
                # appending the fresh one
                _node = _soup.find("div",attrs={"class":"richTextFetch"})
                if _node is not None:
                    _node.decompose()
                self.setValue(document_dochtmlcon,str(_soup)+_text,True)

    def getTitleFromHtml(self,filemd5,_html):
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title

    def getSourceLinkFromHtml(self,filemd5,_html):
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"filelink":filemd5})
        filelink = ""
        if _find is None:
            # fall back to an <img> node, whose link lives in "src"
            _find = _soup.find("img",attrs={"filelink":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        return filelink
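# A minimal sketch of how Document.updateAttachment is fed; the entry shapes
# are inferred from getRichTextFetch above (plain html strings, or dicts with
# "filemd5"/"html"), and the ids below are illustrative only:
#
#   doc = Document({document_partitionkey:1,document_docid:123,
#                   document_dochtmlcon:"<div>正文</div>"})
#   doc.updateAttachment(["<p>附件目录</p>",
#                         {"filemd5":"abc123","html":"<p>附件正文</p>"}])
#   # doc's dochtmlcon now ends with a hidden div.richTextFetch block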
def turn_extract_status():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json

    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(must_queries=[
            # WildcardQuery(document_web_source_no,"00295*"),
            # RangeQuery(document_crtime,"2021-07-26 00:00:00"),
            RangeQuery(document_status,61,70,True,True),
            # TermQuery(document_docid,171146519),
        ])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

    def _handle(item,result_queue,ots_client):
        # change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)

        # make sure the heavy dochtmlcon column is never written back
        # (guarded: it is not among the fetched columns in this run)
        if document_dochtmlcon in item.all_columns:
            item.all_columns.remove(document_dochtmlcon)
        # change status
        item.setValue(document_status,random.randint(1,50),True)
        item.update_row(ots_client)

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()

    # mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    # mt.run()

    # group the fetched documents by fingerprint
    dict_fingerprint = {}
    while True:
        try:
            item = task_queue.get(timeout=2)
            fingerprint = item.getProperties().get(document_fingerprint)
            if fingerprint is not None:
                if fingerprint not in dict_fingerprint:
                    dict_fingerprint[fingerprint] = []
                dict_fingerprint[fingerprint].append(item)
        except Exception as e:
            print(e)
            break

    print(len(dict_fingerprint.keys()))
    status_queue = queue.Queue()
    for k,v in dict_fingerprint.items():
        print("key",k,len(v))
        # keep the smallest docid per fingerprint, mark the rest
        v.sort(key=lambda x:x.docid)
        for _d in v[1:]:
            # NOTE: _handle resets status to [1,50] on update, which
            # overwrites the [401,450] value set here
            _d.setValue(document_status,random.randint(401,450),True)
            status_queue.put(_d)
    mt = MultiThreadHandler(status_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
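# The dedup pass above groups documents by fingerprint, keeps the smallest
# docid per group and pushes the rest to the [401,450] range (assumed here to
# be the "duplicate" band; it is not defined in this file). The grouping step
# on plain dicts looks like:
#
#   docs = [{"docid":3,"fingerprint":"a"},{"docid":1,"fingerprint":"a"}]
#   groups = {}
#   for d in docs:
#       groups.setdefault(d["fingerprint"],[]).append(d)
#   dups = [d for v in groups.values()
#             for d in sorted(v,key=lambda x:x["docid"])[1:]]
#   # dups == [{"docid":3,"fingerprint":"a"}]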
def turn_document_status():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json

    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
        bool_query = BoolQuery(
            must_queries=[
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                WildcardQuery("web_source_no","03716-*"),
                RangeQuery("page_time","2024-04-24"),
                TermQuery("save",1)
                # RangeQuery("status",0,1),
                # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                # TermQuery("docid",397656324)
                # BoolQuery(should_queries=[
                #     TermQuery("tenderee","山西利民工业有限责任公司"),
                #     MatchPhraseQuery("doctitle","中国电信"),
                #     MatchPhraseQuery("doctextcon","中国电信"),
                #     MatchPhraseQuery("attachmenttextcon","中国电信")]),
                # RangeQuery(document_status,88,120,True,True),
                # RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                # TermQuery(document_docid,171146519)
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # bool_query = BoolQuery(
        #     must_queries=[
        #         RangeQuery("crtime","2023-08-30 15:00:00","2023-08-30 23:59:59"),
        #         NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5"))
        #     ],
        #     must_not_queries=[WildcardQuery("attachmenttextcon","*")],
        #     should_queries=[
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","个体工商户")),
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","机械设备")),
        #     ]
        # )

        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document_tmp(_data)
            task_queue.put(_document)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document_tmp(_data)
                task_queue.put(_document)

        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
        # for docid in df["docid1"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # for docid in df["docid2"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)
        # item.setValue(document_area,"华南",True)
        # item.setValue(document_province,"广东",True)
        # item.setValue(document_city,"珠海",True)
        # item.setValue(document_district,"金湾区",True)
        # item.setValue(document_status,66,True)
        # print(item.getProperties())
        # item.update_row(ots_client)
        # log("update %d status done"%(item.getProperties().get(document_docid)))
        pass

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
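# As committed above, turn_document_status is effectively a dry run: its
# _handle body is all comments plus `pass`, so a run only pages through the
# matched rows and prints progress counts. Re-enable the setValue/update_row
# lines inside _handle to write changes back.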
def drop_extract2():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json

    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()
    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(must_queries=[
            BoolQuery(should_queries=[
                # TermQuery("tenderee","山西利民工业有限责任公司"),
                # MatchPhraseQuery("doctitle","中国电信"),
                # MatchPhraseQuery("doctextcon","中国电信"),
                # MatchPhraseQuery("attachmenttextcon","中国电信"),
                RangeQuery("status",1,1000,True,True),
                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
                # TermQuery(document_docid,171146519)
            ]),
            # TermQuery("docid",228359000)
        ],
            # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
        )
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(_data)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                task_queue.put(_data)

        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
        #         _dict = {document_docid:int(docid),
        #                  document_partitionkey:int(docid)%500+1,
        #                  }
        #         task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # move the row back to the document table with status=1, then delete
        # it from document_extract2
        _dict = {}
        _dict.update(item)
        _dict.pop("status")
        _dict["status"] = 1
        print(_dict)
        _document = Document(_dict)
        _document.update_row(ots_client)
        _d_extract = Document_extract2(_dict)
        _d_extract.delete_row(ots_client)

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
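# drop_extract2 in one sentence: every matched row is copied back to the
# "document" table with status forced to 1 and then deleted from
# "document_extract2". Note that _handle receives plain dict rows here: the
# producer enqueues getRow_ots output directly rather than Document objects.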
def fixDocumentHtml():
    from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
    from queue import Queue
    ots_client = getConnect_ots()
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    from BaseDataMaintenance.model.ots.document_html import Document_html
    capacity_client = getConnect_ots_capacity()

    list_data = []
    bool_query = BoolQuery(must_queries=[
        MatchPhraseQuery("doctextcon","信友-城市之光"),
        MatchPhraseQuery("doctextcon","Copyright"),
        # TermQuery("docid",254249505)
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
    print("total_count",total_count)
    list_data.extend(getRow_ots(rows))
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))

    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)

    # boilerplate spans to cut out of the text and the html
    _pattern = "(?P<_find>城市之光.*Ltd.)"
    _pattern1 = "(?P<_find>Evaluation.*Ltd.)"

    def _handle(item,result_queue):
        # strip the boilerplate from doctextcon and write it back
        _doctextcon = item.get("doctextcon")
        _search = re.search(_pattern,_doctextcon)
        print(_search.groupdict().get("_find"))
        item["doctextcon"] = re.sub(_pattern,"",_doctextcon)
        _d = Document(item)
        _d.update_row(ots_client)

        # fetch the full html from the capacity store and strip it there too
        _d1 = {"partitionkey":item.get("partitionkey"),
               "docid":item.get("docid")}
        _dh = Document(_d1)
        _dh.fix_columns(capacity_client,["dochtmlcon"],True)
        _dochtmlcon = _dh.getProperties().get("dochtmlcon")
        _dochtmlcon = re.sub("\n","",_dochtmlcon)
        _search = re.search(_pattern1,_dochtmlcon)
        _dochtmlcon = re.sub(_pattern1,"",_dochtmlcon)
        _d1["dochtmlcon"] = _dochtmlcon
        _dh = Document(_d1)
        _dh.update_row(capacity_client)
        # print(re.sub(_pattern,"",_dochtmlcon))

    mt = MultiThreadHandler(task_queue,_handle,None,2)
    mt.run()

def delete_documents():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
    ots_client = getConnect_ots()
    ots_capacity = getConnect_ots_capacity()

    import pandas as pd
    df = pd.read_excel("2022-10-14_190838_数据导出.xlsx")
    _count = 0
    for _docid in df["docid"]:
        partitionkey = int(_docid)%500+1
        _d = {document_partitionkey:partitionkey,
              document_docid:int(_docid)}
        _doc = Document(_d)
        _doc.delete_row(ots_client)    # delete from the index store
        _doc.delete_row(ots_capacity)  # and from the capacity store
        _count += 1
        print(_docid)
    print("delete count:%d"%_count)
def turn_document_docchannel():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json

    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(
            must_queries=[
                TermQuery("web_source_no","DX007520-7"),
                # TermQuery("docid",363793104)
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                # BoolQuery(should_queries=[
                #     TermQuery("tenderee","山西利民工业有限责任公司"),
                #     MatchPhraseQuery("doctitle","中国电信"),
                #     MatchPhraseQuery("doctextcon","中国电信"),
                #     MatchPhraseQuery("attachmenttextcon","中国电信")]),
                # RangeQuery(document_status,88,120,True,True),
                # RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                # TermQuery(document_docid,171146519)
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # bool_query = BoolQuery(
        #     must_queries=[
        #         RangeQuery("crtime","2023-08-30 15:00:00","2023-08-30 23:59:59"),
        #         NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5"))
        #     ],
        #     must_not_queries=[WildcardQuery("attachmenttextcon","*")],
        #     should_queries=[
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","个体工商户")),
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","机械设备")),
        #     ]
        # )

        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["detail_link"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["detail_link"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("G:\\20221212error.xlsx")
        # for docid in df["docid"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)
        # item.setValue(document_area,"华南",True)
        # item.setValue(document_province,"广东",True)
        # item.setValue(document_city,"珠海",True)
        # item.setValue(document_district,"金湾区",True)
        # item.setValue(document_status,1,True)
        # print(item.getProperties())
        # item.update_row(ots_client)

        # rows whose source link sits under /012002002/ are re-channelled
        # to docchannel 101
        detail_link = item.getProperties().get("detail_link","")
        if "/012002002/" in detail_link:
            partitionkey = item.getProperties().get("partitionkey")
            docid = item.getProperties().get("docid")
            _dict = {document_partitionkey:partitionkey,
                     document_docid:docid,
                     document_docchannel:101,
                     document_original_docchannel:101}
            doc = Document(_dict)
            doc.update_row(ots_client)
            print(_dict)
            # log("update %d status done"%(item.getProperties().get(document_docid)))

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()

if __name__=="__main__":
    # turn_extract_status()
    turn_document_status()
    # drop_extract2()
    # fixDocumentHtml()
    # turn_document_docchannel()
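# Running this module directly executes exactly one maintenance routine; the
# others stay commented in the __main__ block above. Swap the active call to
# run a different fix, e.g.:
#
#   python -m BaseDataMaintenance.model.ots.document
#
# (module path assumed from the package imports at the top of this file)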