from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit
# ---------------------------------------------------------------------------
# Column names of the OTS "document" table.
# Each constant maps a Python identifier to the literal column name used in
# tablestore queries and row writes below; keep the string values in sync
# with the table schema.
# ---------------------------------------------------------------------------

# primary key columns
document_partitionkey = "partitionkey"
document_docid = "docid"

# content columns
document_dochtmlcon = "dochtmlcon"
document_doctextcon = "doctextcon"
document_doctitle = "doctitle"
document_attachmenttextcon = "attachmenttextcon"

# attachment metadata: page_attachments is a JSON list whose entries use
# the fileMd5/fileTitle/fileLink keys declared here
document_attachment_path = "page_attachments"
document_attachment_path_filemd5 = "fileMd5"
document_attachment_path_fileTitle = "fileTitle"
document_attachment_path_fileLink = "fileLink"

# processing/bookkeeping columns
document_crtime = "crtime"
document_status = "status"
document_page_time = "page_time"
document_attachment_extract_status = "attachment_extract_status"
document_web_source_no = "web_source_no"
document_web_source_name = "web_source_name"
document_fingerprint = "fingerprint"
document_opertime = "opertime"

# classification columns
document_docchannel = "docchannel"
document_original_docchannel = "original_docchannel"
document_life_docchannel = "life_docchannel"

# location columns
document_area = "area"
document_province = "province"
document_city = "city"
document_district = "district"

# extraction-result columns
document_extract_json = "extract_json"
document_bidway = "bidway"
document_industry = "industry"
document_info_type = "info_type"
document_qcodes = "qcodes"
document_project_name = "project_name"
document_project_code = "project_code"
document_project_codes = "project_codes"

# party/contact columns
document_tenderee = "tenderee"
document_tenderee_addr = "tenderee_addr"
document_tenderee_phone = "tenderee_phone"
document_tenderee_contact = "tenderee_contact"
document_agency = "agency"
document_agency_phone = "agency_phone"
document_agency_contact = "agency_contact"
document_product = "product"
document_moneysource = "moneysource"
document_service_time = "service_time"

# bid-timeline columns
document_time_bidclose = "time_bidclose"
document_time_bidopen = "time_bidopen"
document_time_bidstart = "time_bidstart"
document_time_commencement = "time_commencement"
document_time_completion = "time_completion"
document_time_earnest_money_start = "time_earnest_money_start"
document_time_earnest_money_end = "time_earnest_money_end"
document_time_get_file_end = "time_get_file_end"
document_time_get_file_start = "time_get_file_start"
document_time_publicity_end = "time_publicity_end"
document_time_publicity_start = "time_publicity_start"
document_time_registration_end = "time_registration_end"
document_time_registration_start = "time_registration_start"
document_time_release = "time_release"

# misc columns
document_info_source = "info_source"
document_nlp_enterprise = "nlp_enterprise"
document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
document_total_tenderee_money = "total_tenderee_money"
document_update_document = "update_document"
class Document(BaseModel):
    """ORM-style wrapper around one row of the OTS "document" table.

    The constructor copies every key/value of ``_dict`` into the model's
    property store (via ``setValue``); the helpers below read and write
    columns through that store.

    NOTE(review): several HTML literals in this class were stripped by a
    source sanitizer and have been reconstructed here (the richTextFetch div
    is grounded in the ``find("div", class="richTextFetch")`` lookup below;
    the swf <img> markup is a best-effort guess). Confirm the exact tags
    against version control.
    """

    def __init__(self,_dict):
        BaseModel.__init__(self)
        for k,v in _dict.items():
            self.setValue(k,v,True)
        self.table_name = "document"
        # URL prefixes treated as "our own" hosts by isLegalUrl
        self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]

    def getPrimary_keys(self):
        """Primary-key column names, in schema order."""
        return ["partitionkey","docid"]

    def getAttribute_turple(self):
        """Return (column, value) pairs for every non-empty attribute.

        Skips the bookkeeping key "all_columns". List values are serialized
        to JSON strings because OTS columns cannot store lists directly.
        (Name keeps the historical "turple" spelling — callers depend on it.)
        """
        _list = []
        for _key in self.getAttribute_keys():
            if _key=="all_columns":
                continue
            _v = self.getProperties().get(_key)
            if _v is not None and _v!="":
                if isinstance(_v,list):
                    _v = json.dumps(_v)
                _list.append((_key,_v))
        return _list

    def isLegalUrl(self,_url,_type):
        """Check ``_url`` against ``self.prefixs``.

        :param _url: url string to test.
        :param _type: 0 -> return True when the url contains one of the
            known prefixes; any other value -> return True when it
            contains none of them.
        """
        _hit = any(_url.find(_prefix)>=0 for _prefix in self.prefixs)
        return _hit if _type==0 else not _hit

    # The from* helpers stamp the row with a random status inside a band.
    # NOTE(review): band meanings below are inferred from the method names —
    # confirm against the pipeline's status table.

    def fromInitialed(self):
        # 1-50: freshly initialized
        self.setValue(document_status,random.randint(1,50),True)

    def fromEas2Maxcompute(self):
        # 151-170: handed from EAS to MaxCompute
        self.setValue(document_status,random.randint(151,170),True)

    def fromEasFailed(self):
        # 51-60: EAS processing failed
        self.setValue(document_status,random.randint(51,60),True)

    def fromEas2Extract(self):
        # 61-70: handed from EAS to the extract stage
        self.setValue(document_status,random.randint(61,70),True)

    def updateSWFImages(self,swf_urls):
        """Append <img> tags for converted swf page images to dochtmlcon.

        Idempotent: does nothing when an <img> for the first url is already
        present in the html.
        """
        if len(swf_urls)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon)
            _soup = BeautifulSoup(_dochtmlcon,"lxml")
            # only append the image block once
            if _soup.find("img",{"src":swf_urls[0]}) is None:
                # NOTE(review): reconstructed markup — original tags were
                # stripped from this source; verify against VCS.
                _div = '<div>'
                for _url in swf_urls:
                    _div += '<img src="%s"/>'%(_url)
                _div += "</div>"
                _dochtmlcon += _div
                self.setValue(document_dochtmlcon,_dochtmlcon,True)

    def getRichTextFetch(self,list_html):
        """Join attachment html fragments into one rich-text blob.

        String items become plain <div> blocks; dict items (keys "filemd5",
        "html") carry their source-file md5 on the wrapper div. Blobs longer
        than 50000 chars are truncated through ``article_limit``.
        """
        _text = ""
        for _ht in list_html:
            if isinstance(_ht,str):
                _text += "<div>%s</div>"%(_ht)
            elif isinstance(_ht,dict):
                _filemd5 = _ht.get("filemd5","")
                _html = _ht.get("html","")
                _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
        if len(_text)>50000:
            _soup = BeautifulSoup(_text,"lxml")
            _soup = article_limit(_soup,50000)
            # BeautifulSoup wraps fragments in html/body tags; strip them out
            _text = re.sub("<html>|</html>|<body>|</body>","",str(_soup))
        return _text

    def updateAttachment(self,list_html):
        """Replace the hidden "richTextFetch" div of dochtmlcon with freshly
        rendered attachment html built from ``list_html``.
        """
        if len(list_html)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon,"")
            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
            _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
            # room left before the OTS column size limit
            # NOTE(review): fix_len is computed but never used — confirm
            # whether truncation of the attachment text was intended here.
            fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
            _text = '<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
            if _dochtmlcon is not None:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                # drop any previous richTextFetch block before appending anew
                _node = _soup.find("div",attrs={"class":"richTextFetch"})
                if _node is not None:
                    _node.decompose()
                self.setValue(document_dochtmlcon,str(_soup)+_text,True)

    def getTitleFromHtml(self,filemd5,_html):
        """Return the link text of the <a data=filemd5> anchor in ``_html``,
        or "" when no such anchor exists."""
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title

    def getSourceLinkFromHtml(self,filemd5,_html):
        """Return the source url of the attachment tagged with ``filemd5``:
        href of a matching <a>, else src of a matching <img>, else ""."""
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"filelink":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"filelink":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        return filelink
import random
def turn_extract_status():
    """Maintenance job: deduplicate extract-stage documents by fingerprint.

    Pages every document with status in [61,70] out of the "document" index,
    buckets them by the ``fingerprint`` column, keeps the smallest docid of
    each bucket and pushes the rest through ``_handle`` with 30 workers.

    NOTE(review): the duplicates are stamped with status 401-450 before
    queueing, but ``_handle`` then overwrites status with randint(1,50)
    before writing the row — confirm which status band is intended.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        # all documents currently in the extract stage (status 61-70 incl.)
        bool_query = BoolQuery(must_queries=[
            # WildcardQuery(document_web_source_no,"00295*"),
            # RangeQuery(document_crtime,"2021-07-26 00:00:00"),
            RangeQuery(document_status,61,70,True,True),
            #TermQuery(document_docid,171146519),
        ]
        )
        # first page, sorted by docid descending; fetch only the fingerprint
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        # page through the rest of the result set
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

    def _handle(item,result_queue,ots_client):
        #change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        #
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)
        #change status
        item.setValue(document_status,random.randint(1,50),True)
        item.update_row(ots_client)

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    # mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    # mt.run()
    dict_fingerprint = {}
    # drain the queue, bucketing documents by fingerprint; a 2s-idle timeout
    # (queue.Empty) ends the loop
    while True:
        try:
            item = task_queue.get(timeout=2)
            fingerprint = item.getProperties().get(document_fingerprint)
            if fingerprint is not None:
                if fingerprint not in dict_fingerprint:
                    dict_fingerprint[fingerprint] = []
                dict_fingerprint[fingerprint].append(item)
        except Exception as e:
            print(e)
            break
    print(len(dict_fingerprint.keys()))
    status_queue = queue.Queue()
    for k,v in dict_fingerprint.items():
        print("key",k,len(v))
        v.sort(key=lambda x:x.docid)
        # keep the smallest docid of each fingerprint; flag the rest
        for _d in v[1:]:
            _d.setValue(document_status,random.randint(401,450),True)
            status_queue.put(_d)
    mt = MultiThreadHandler(status_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
def turn_document_status():
    """Maintenance job: enumerate document_tmp rows matching the producer's
    query and feed them to ``_handle`` with 30 worker threads.

    NOTE(review): the handler body is fully commented out, so as written the
    job only pages the rows and counts them — it mutates nothing. Confirm
    before relying on it for status changes.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
        # saved rows of web source 03716-* crawled since 2024-04-24
        bool_query = BoolQuery(
            must_queries=[
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                WildcardQuery("web_source_no","03716-*"),
                RangeQuery("page_time","2024-04-24"),
                TermQuery("save",1)
                # RangeQuery("status",0,1),
                # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                # TermQuery("docid",397656324)
                # BoolQuery(should_queries=[
                #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                #                           # MatchPhraseQuery("doctitle","中国电信"),
                #                           # MatchPhraseQuery("doctextcon","中国电信"),
                #                           # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                #                           # RangeQuery(document_status,88,120,True,True),
                #                           RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                #                           ExistsQuery
                #                          #,TermQuery(document_docid,171146519)
                #                          ]
                # )
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # bool_query = BoolQuery(
        #     # must_queries=[
        #     #     RangeQuery("crtime","2023-08-30 15:00:00","2023-08-30 23:59:59"),
        #     #     NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5"))
        #     # ],
        #     # must_not_queries=[WildcardQuery("attachmenttextcon","*")],
        #     should_queries=[
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","个体工商户")),
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","机械设备")),
        #     ]
        #
        # )
        # first page, sorted by docid descending; fetch only the docid
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document_tmp(_data)
            task_queue.put(_document)
        # page through the rest of the result set
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document_tmp(_data)
                task_queue.put(_document)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
        # for docid in df["docid1"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # for docid in df["docid2"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # handler intentionally disabled: every mutation below is commented
        # out, so this is currently a no-op per row.
        #change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        #
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)
        #change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)
        # item.setValue(document_area,"华南",True)
        # item.setValue(document_province,"广东",True)
        # item.setValue(document_city,"珠海",True)
        # item.setValue(document_district,"金湾区",True)
        # item.setValue(document_status,66,True)
        # print(item.getProperties())
        # item.update_row(ots_client)
        # log("update %d status done"%(item.getProperties().get(document_docid)))
        pass

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
def drop_extract2():
    """Maintenance job: replay rows out of document_extract2.

    Pages every document_extract2 row with status in [1,1000], writes each
    one back to the "document" table with status reset to 1, then deletes
    it from document_extract2. 30 worker threads.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()
    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(must_queries=[
            BoolQuery(should_queries=[
                # TermQuery("tenderee","山西利民工业有限责任公司"),
                # MatchPhraseQuery("doctitle","中国电信"),
                # MatchPhraseQuery("doctextcon","中国电信"),
                # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                RangeQuery("status",1,1000,True,True),
                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
                #,TermQuery(document_docid,171146519)
            ]
            ),
            # TermQuery("docid",228359000)
        ],
            # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
        )
        # first page, sorted by docid descending; fetch only the status
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(_data)
        # page through the rest of the result set; note the queue receives
        # plain row dicts here, not Document instances
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                task_queue.put(_data)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
        #         # print(docid)
        #         _dict = {document_docid:int(docid),
        #                  document_partitionkey:int(docid)%500+1,
        #                  }
        #         task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        #change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        #
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)
        #change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,170),True)
        # item.update_row(ots_client)
        # log("update %d status done"%(item.getProperties().get(document_docid)))
        # copy the row dict and force status back to 1 before the rewrite
        _dict = {}
        _dict.update(item)
        _dict.pop("status")
        _dict["status"] = 1
        print(_dict)
        # write the row into the "document" table ...
        _document = Document(_dict)
        _document.update_row(ots_client)
        # ... then remove it from document_extract2
        _d_extract = Document_extract2(_dict)
        _d_extract.delete_row(ots_client)
        pass

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
def fixDocumentHtml():
    """One-off data fix: strip an injected "城市之光"/Copyright text block.

    Finds documents whose doctextcon contains both "信友-城市之光" and
    "Copyright", removes the span matched by ``_pattern`` from doctextcon
    (main OTS instance) and the span matched by ``_pattern1`` from
    dochtmlcon (capacity OTS instance). Runs with 2 worker threads.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
    from queue import Queue
    ots_client = getConnect_ots()
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    from BaseDataMaintenance.model.ots.document_html import Document_html
    capacity_client = getConnect_ots_capacity()
    list_data = []
    bool_query = BoolQuery(must_queries=[
        MatchPhraseQuery("doctextcon","信友-城市之光"),
        MatchPhraseQuery("doctextcon","Copyright"),
        # TermQuery("docid",254249505)
    ])
    # first page of matches; fetch only doctextcon
    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
    print("total_count",total_count)
    list_data.extend(getRow_ots(rows))
    # page through the rest of the result set
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))
    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)
    # spans to cut from doctextcon / dochtmlcon respectively (greedy .* —
    # everything between the markers is removed)
    _pattern = "(?P<_find>城市之光.*Ltd.)"
    _pattern1 = "(?P<_find>Evaluation.*Ltd.)"

    def _handle(item,result_queue):
        _doctextcon = item.get("doctextcon")
        _search = re.search(_pattern,_doctextcon)
        # NOTE(review): raises AttributeError if the pattern no longer
        # matches the row — acceptable for a one-off, but confirm.
        print(_search.groupdict().get("_find"))
        item["doctextcon"] = re.sub(_pattern,"",_doctextcon)
        _d = Document(item)
        _d.update_row(ots_client)
        # dochtmlcon lives in the capacity instance; fetch it separately
        _d1 = {"partitionkey":item.get("partitionkey"),
               "docid":item.get("docid")}
        _dh = Document(_d1)
        _dh.fix_columns(capacity_client,["dochtmlcon"],True)
        _dochtmlcon = _dh.getProperties().get("dochtmlcon")
        # collapse newlines so the greedy pattern can span the whole block
        _dochtmlcon = re.sub("\n","",_dochtmlcon)
        _search = re.search(_pattern1,_dochtmlcon)
        _dochtmlcon = re.sub(_pattern1,"",_dochtmlcon)
        _d1["dochtmlcon"] = _dochtmlcon
        _dh = Document(_d1)
        _dh.update_row(capacity_client)
        # print(re.sub(_pattern,"",_dochtmlcon))

    mt = MultiThreadHandler(task_queue,_handle,None,2)
    mt.run()
def delete_documents():
    """Delete every docid listed in the export spreadsheet from both the
    main and the capacity OTS instances, printing each docid as it goes
    and a final count."""
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
    import pandas as pd
    main_client = getConnect_ots()
    capacity_client = getConnect_ots_capacity()
    frame = pd.read_excel("2022-10-14_190838_数据导出.xlsx")
    deleted = 0
    for raw_docid in frame["docid"]:
        docid = int(raw_docid)
        # partitionkey is derived from the docid (500-way sharding, 1-based)
        row = Document({document_partitionkey: docid % 500 + 1,
                        document_docid: docid})
        row.delete_row(main_client)
        row.delete_row(capacity_client)
        deleted += 1
        print(raw_docid)
    print("delete count:%d" % deleted)
def turn_document_docchannel():
    """Maintenance job: for documents of web source DX007520-7 whose
    detail_link contains "/012002002/", set both docchannel and
    original_docchannel to 101. 30 worker threads.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(
            must_queries=[
                TermQuery("web_source_no","DX007520-7"),
                # TermQuery("docid",363793104)
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                # BoolQuery(should_queries=[
                #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                #                           # MatchPhraseQuery("doctitle","中国电信"),
                #                           # MatchPhraseQuery("doctextcon","中国电信"),
                #                           # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                #                           # RangeQuery(document_status,88,120,True,True),
                #                           RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                #                           ExistsQuery
                #                          #,TermQuery(document_docid,171146519)
                #                          ]
                # )
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # bool_query = BoolQuery(
        #     # must_queries=[
        #     #     RangeQuery("crtime","2023-08-30 15:00:00","2023-08-30 23:59:59"),
        #     #     NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5"))
        #     # ],
        #     # must_not_queries=[WildcardQuery("attachmenttextcon","*")],
        #     should_queries=[
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","个体工商户")),
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","机械设备")),
        #     ]
        #
        # )
        # first page, sorted by docid descending; fetch only detail_link
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(["detail_link"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        # page through the rest of the result set
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                           columns_to_get=ColumnsToGet(["detail_link"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("G:\\20221212error.xlsx")
        # for docid in df["docid"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        #change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        #
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)
        #change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)
        # item.setValue(document_area,"华南",True)
        # item.setValue(document_province,"广东",True)
        # item.setValue(document_city,"珠海",True)
        # item.setValue(document_district,"金湾区",True)
        # item.setValue(document_status,1,True)
        # print(item.getProperties())
        # item.update_row(ots_client)
        detail_link = item.getProperties().get("detail_link","")
        # NOTE(review): the "/012002002/" path segment is taken to mark
        # channel-101 documents at this source — confirm the mapping.
        if "/012002002/" in detail_link:
            partitionkey = item.getProperties().get("partitionkey")
            docid = item.getProperties().get("docid")
            _dict = {document_partitionkey:partitionkey,
                     document_docid:docid,
                     document_docchannel:101,
                     document_original_docchannel:101}
            doc = Document(_dict)
            doc.update_row(ots_client)
            print(_dict)
        # log("update %d status done"%(item.getProperties().get(document_docid)))
        pass

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
if __name__=="__main__":
    # One-off maintenance entry point: enable exactly one job per run.
    # turn_extract_status()
    turn_document_status()
    # drop_extract2()
    # fixDocumentHtml()
    # turn_document_docchannel()