# document.py
from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit

# json, re and random are used throughout this module; import them explicitly
# rather than relying on the wildcard import from Utils.
import json
import random
import re
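
# Column names of the "document" table, kept as module-level constants so
# queries and setValue() calls refer to fields by name rather than repeating
# string literals.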
document_partitionkey = "partitionkey"
document_docid = "docid"
document_dochtmlcon = "dochtmlcon"
document_doctextcon = "doctextcon"
document_doctitle = "doctitle"
document_attachmenttextcon = "attachmenttextcon"
document_attachment_path = "page_attachments"
document_attachment_path_filemd5 = "fileMd5"
document_attachment_path_fileTitle = "fileTitle"
document_attachment_path_fileLink = "fileLink"
document_crtime = "crtime"
document_status = "status"
document_page_time = "page_time"
document_attachment_extract_status = "attachment_extract_status"
document_web_source_no = "web_source_no"
document_web_source_name = "web_source_name"
document_fingerprint = "fingerprint"
document_opertime = "opertime"
document_docchannel = "docchannel"
document_original_docchannel = "original_docchannel"
document_life_docchannel = "life_docchannel"
document_area = "area"
document_province = "province"
document_city = "city"
document_district = "district"
document_extract_json = "extract_json"
document_bidway = "bidway"
document_industry = "industry"
document_info_type = "info_type"
document_qcodes = "qcodes"
document_project_name = "project_name"
document_project_code = "project_code"
document_project_codes = "project_codes"
document_tenderee = "tenderee"
document_tenderee_addr = "tenderee_addr"
document_tenderee_phone = "tenderee_phone"
document_tenderee_contact = "tenderee_contact"
document_agency = "agency"
document_agency_phone = "agency_phone"
document_agency_contact = "agency_contact"
document_product = "product"
document_moneysource = "moneysource"
document_service_time = "service_time"
document_time_bidclose = "time_bidclose"
document_time_bidopen = "time_bidopen"
document_time_bidstart = "time_bidstart"
document_time_commencement = "time_commencement"
document_time_completion = "time_completion"
document_time_earnest_money_start = "time_earnest_money_start"
document_time_earnest_money_end = "time_earnest_money_end"
document_time_get_file_end = "time_get_file_end"
document_time_get_file_start = "time_get_file_start"
document_time_publicity_end = "time_publicity_end"
document_time_publicity_start = "time_publicity_start"
document_time_registration_end = "time_registration_end"
document_time_registration_start = "time_registration_start"
document_time_release = "time_release"
document_info_source = "info_source"
document_nlp_enterprise = "nlp_enterprise"
document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
document_total_tenderee_money = "total_tenderee_money"
document_update_document = "update_document"

class Document(BaseModel):

    def __init__(self, _dict):
        BaseModel.__init__(self)
        for k, v in _dict.items():
            self.setValue(k, v, True)
        self.table_name = "document"
        self.prefixs = ["www.bidizhaobiao.com", "bxkc.oss-cn-shanghai.aliyuncs.com"]

    def getPrimary_keys(self):
        return ["partitionkey", "docid"]

    def getAttribute_turple(self):
        _list = []
        for _key in self.getAttribute_keys():
            if _key == "all_columns":
                continue
            _v = self.getProperties().get(_key)
            if _v is not None and _v != "":
                if isinstance(_v, list):
                    _v = json.dumps(_v)
                _list.append((_key, _v))
        return _list

    # def delete_row(self,ots_client):
    #     raise NotImplementedError()

    def isLegalUrl(self, _url, _type):
        # _type==0: the url is legal only if it matches one of the known
        # prefixes; any other _type inverts the test.
        _flag = False
        for _prefix in self.prefixs:
            if _url.find(_prefix) >= 0:
                _flag = True
        if _type == 0:
            return _flag
        return not _flag
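
    # Status transitions: judging from the method names, the numeric ranges
    # encode pipeline stages (1-50 initialized, 51-60 EAS failed, 61-70
    # queued for extract, 151-170 handed to MaxCompute). A random value
    # inside each range is written, presumably to spread rows evenly.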
    def fromInitialed(self):
        self.setValue(document_status, random.randint(1, 50), True)

    def fromEas2Maxcompute(self):
        self.setValue(document_status, random.randint(151, 170), True)

    def fromEasFailed(self):
        self.setValue(document_status, random.randint(51, 60), True)

    def fromEas2Extract(self):
        self.setValue(document_status, random.randint(61, 70), True)

    def updateSWFImages(self, swf_urls):
        # append the converted swf images to the html once, keyed on the
        # presence of the first url
        if len(swf_urls) > 0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon)
            _soup = BeautifulSoup(_dochtmlcon, "lxml")
            if _soup.find("img", {"src": swf_urls[0]}) is None:
                _div = '<div class="swf_urls">'
                for _url in swf_urls:
                    _div += '<p><img src="%s"/></p>' % (_url)
                _div += "</div>"
                _dochtmlcon += _div
                self.setValue(document_dochtmlcon, _dochtmlcon, True)

    def getRichTextFetch(self, list_html):
        # stitch the attachment htmls into one block, truncated to 50000 chars
        _text = ""
        for _ht in list_html:
            if isinstance(_ht, str):
                _text += "<div>%s</div>" % (_ht)
            elif isinstance(_ht, dict):
                _filemd5 = _ht.get("filemd5", "")
                _html = _ht.get("html", "")
                _text += '<div filemd5="%s">%s</div>' % (_filemd5, _html)
        if len(_text) > 50000:
            _soup = BeautifulSoup(_text, "lxml")
            _soup = article_limit(_soup, 50000)
            _text = re.sub("<html>|</html>|<body>|</body>", "", str(_soup))
        return _text

    def updateAttachment(self, list_html):
        if len(list_html) > 0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon, "")
            _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
            _dochtmlcon_len = len(bytes(_dochtmlcon, encoding="utf8"))
            # COLUMN_MAX_SIZE presumably comes from BaseModel; fix_len is
            # computed but not used below
            fix_len = self.COLUMN_MAX_SIZE - _dochtmlcon_len - 100
            # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
            _text = '<div style="display:none;" class="richTextFetch">%s</div>' % (self.getRichTextFetch(list_html))
            if _dochtmlcon is not None:
                _soup = BeautifulSoup(_dochtmlcon, "lxml")
                # drop any previously injected richTextFetch block before appending
                _node = _soup.find("div", attrs={"class": "richTextFetch"})
                if _node is not None:
                    _node.decompose()
                self.setValue(document_dochtmlcon, str(_soup) + _text, True)

    def getTitleFromHtml(self, filemd5, _html):
        # the attachment title is the text of the <a> tag whose data
        # attribute is the file's md5
        _soup = BeautifulSoup(_html, "lxml")
        _find = _soup.find("a", attrs={"data": filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title

    def getSourceLinkFromHtml(self, filemd5, _html):
        # the source link is the href of a matching <a> tag, falling back to
        # the src of a matching <img> tag
        _soup = BeautifulSoup(_html, "lxml")
        _find = _soup.find("a", attrs={"filelink": filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img", attrs={"filelink": filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src", "")
        else:
            filelink = _find.attrs.get("href", "")
        return filelink
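
# Minimal usage sketch (assumes an OTS client obtained via
# BaseDataMaintenance.dataSource.source.getConnect_ots, as the jobs below do):
#
#     doc = Document({"partitionkey": 1, "docid": 123})
#     doc.fromEas2Extract()         # status -> random value in 61..70
#     doc.update_row(ots_client)    # persist the changed columns
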
def turn_extract_status():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        bool_query = BoolQuery(must_queries=[
            # WildcardQuery(document_web_source_no,"00295*"),
            # RangeQuery(document_crtime,"2021-07-26 00:00:00"),
            RangeQuery(document_status, 61, 70, True, True),
            # TermQuery(document_docid,171146519),
        ])
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet([document_fingerprint], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        # page through the remaining hits with next_token
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document", "document_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet([document_fingerprint], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

    def _handle(item, result_queue, ots_client):
        # change attach value (disabled):
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)
        # change status
        # note: this writes a fresh status in 1..50, overwriting any status
        # set on the item before it was queued (e.g. the 401..450 range below)
        item.setValue(document_status, random.randint(1, 50), True)
        item.update_row(ots_client)

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    # mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    # mt.run()

    # group the fetched documents by fingerprint
    dict_fingerprint = {}
    while True:
        try:
            item = task_queue.get(timeout=2)
            fingerprint = item.getProperties().get(document_fingerprint)
            if fingerprint is not None:
                if fingerprint not in dict_fingerprint:
                    dict_fingerprint[fingerprint] = []
                dict_fingerprint[fingerprint].append(item)
        except Exception as e:
            # queue.Empty after the timeout means the queue is drained
            print(e)
            break
    print(len(dict_fingerprint.keys()))

    # keep the smallest docid of every fingerprint group and move the
    # duplicates into status 401..450
    status_queue = queue.Queue()
    for k, v in dict_fingerprint.items():
        print("key", k, len(v))
        v.sort(key=lambda x: x.docid)
        for _d in v[1:]:
            _d.setValue(document_status, random.randint(401, 450), True)
            status_queue.put(_d)
    mt = MultiThreadHandler(status_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
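
# The maintenance jobs below share one pattern: a producer thread pages
# through an OTS search index and fills a queue, then MultiThreadHandler fans
# the queued rows out to worker threads. As used in this module the signature
# is MultiThreadHandler(queue, handler, result_queue, thread_count,
# **handler_kwargs), with result_queue passed as None.
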
def turn_document_status():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
        bool_query = BoolQuery(
            must_queries=[
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                WildcardQuery("web_source_no", "03716-*"),
                RangeQuery("page_time", "2024-04-24"),
                TermQuery("save", 1)
                # RangeQuery("status",0,1),
                # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                # TermQuery("docid",397656324)
                # BoolQuery(should_queries=[
                #     # TermQuery("tenderee","山西利民工业有限责任公司"),
                #     # MatchPhraseQuery("doctitle","中国电信"),
                #     # MatchPhraseQuery("doctextcon","中国电信"),
                #     # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                #     # RangeQuery(document_status,88,120,True,True),
                #     RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                #     ExistsQuery
                #     #,TermQuery(document_docid,171146519)
                # ]
                # )
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # bool_query = BoolQuery(
        #     # must_queries=[
        #     #     RangeQuery("crtime","2023-08-30 15:00:00","2023-08-30 23:59:59"),
        #     #     NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5"))
        #     # ],
        #     # must_not_queries=[WildcardQuery("attachmenttextcon","*")],
        #     should_queries=[
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","个体工商户")),
        #         NestedQuery("sub_docs_json",TermQuery("sub_docs_json.win_tenderer","机械设备")),
        #     ]
        # )
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document_tmp", "document_tmp_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet(["docid"], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document_tmp(_data)
            task_queue.put(_document)
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document_tmp", "document_tmp_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet(["docid"], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                _document = Document_tmp(_data)
                task_queue.put(_document)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
        # for docid in df["docid1"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # for docid in df["docid2"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item, result_queue, ots_client):
        # (same commented-out attachment rewrite as in turn_extract_status._handle)
        # all mutations are currently disabled; uncomment the setValue and
        # update_row lines to apply them
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)
        # item.setValue(document_area,"华南",True)
        # item.setValue(document_province,"广东",True)
        # item.setValue(document_city,"珠海",True)
        # item.setValue(document_district,"金湾区",True)
        # item.setValue(document_status,66,True)
        # print(item.getProperties())
        # item.update_row(ots_client)
        # log("update %d status done"%(item.getProperties().get(document_docid)))
        pass

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
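
# drop_extract2: for every row matched in document_extract2, reset its status
# to 1, write it back into the document table, and delete it from
# document_extract2.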
def drop_extract2():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        bool_query = BoolQuery(must_queries=[
            BoolQuery(should_queries=[
                # TermQuery("tenderee","山西利民工业有限责任公司"),
                # MatchPhraseQuery("doctitle","中国电信"),
                # MatchPhraseQuery("doctextcon","中国电信"),
                # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                RangeQuery("status", 1, 1000, True, True),
                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
                #,TermQuery(document_docid,171146519)
            ]),
            # TermQuery("docid",228359000)
        ],
        # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
        )
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document_extract2", "document_extract2_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet(["status"], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(_data)
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document_extract2", "document_extract2_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet(["status"], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                task_queue.put(_data)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
        #         # print(docid)
        #         _dict = {document_docid:int(docid),
        #                  document_partitionkey:int(docid)%500+1,
        #                  }
        #         task_queue.put(Document(_dict))
        log("task_queue size:%d" % (task_queue.qsize()))

    def _handle(item, result_queue, ots_client):
        # (same commented-out attachment/status rewrite as in
        # turn_extract_status._handle)
        # write the row back to the document table with status reset to 1,
        # then remove it from document_extract2
        _dict = {}
        _dict.update(item)
        _dict["status"] = 1
        print(_dict)
        _document = Document(_dict)
        _document.update_row(ots_client)
        _d_extract = Document_extract2(_dict)
        _d_extract.delete_row(ots_client)

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
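
# fixDocumentHtml: removes an injected boilerplate block ("城市之光 ... Ltd.")
# from doctextcon in the main table and from dochtmlcon in the capacity table.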
def fixDocumentHtml():
    from BaseDataMaintenance.dataSource.source import getConnect_ots, getConnect_ots_capacity
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    from queue import Queue

    ots_client = getConnect_ots()
    capacity_client = getConnect_ots_capacity()
    list_data = []
    bool_query = BoolQuery(must_queries=[
        MatchPhraseQuery("doctextcon", "信友-城市之光"),
        MatchPhraseQuery("doctextcon", "Copyright"),
        # TermQuery("docid",254249505)
    ])
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "document", "document_index",
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid")]), get_total_count=True, limit=100),
        columns_to_get=ColumnsToGet(["doctextcon"], return_type=ColumnReturnType.SPECIFIED))
    print("total_count", total_count)
    list_data.extend(getRow_ots(rows))
    while next_token:
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, next_token=next_token, get_total_count=True, limit=100),
            columns_to_get=ColumnsToGet(["doctextcon"], return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))
    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)

    _pattern = "(?P<_find>城市之光.*Ltd.)"
    _pattern1 = "(?P<_find>Evaluation.*Ltd.)"

    def _handle(item, result_queue):
        # strip the matched block from doctextcon in the main table
        _doctextcon = item.get("doctextcon")
        _search = re.search(_pattern, _doctextcon)
        if _search is not None:
            print(_search.groupdict().get("_find"))
        item["doctextcon"] = re.sub(_pattern, "", _doctextcon)
        _d = Document(item)
        _d.update_row(ots_client)
        # strip the same block from dochtmlcon, which lives in the capacity table
        _d1 = {"partitionkey": item.get("partitionkey"),
               "docid": item.get("docid")}
        _dh = Document(_d1)
        _dh.fix_columns(capacity_client, ["dochtmlcon"], True)
        _dochtmlcon = _dh.getProperties().get("dochtmlcon")
        _dochtmlcon = re.sub("\n", "", _dochtmlcon)
        _dochtmlcon = re.sub(_pattern1, "", _dochtmlcon)
        _d1["dochtmlcon"] = _dochtmlcon
        _dh = Document(_d1)
        _dh.update_row(capacity_client)
        # print(re.sub(_pattern,"</div><p><span>",_dochtmlcon))

    mt = MultiThreadHandler(task_queue, _handle, None, 2)
    mt.run()
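
# delete_documents: deletes the docids listed in an exported spreadsheet from
# both the main document table and the capacity table.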
def delete_documents():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
    import pandas as pd

    ots_client = getConnect_ots()
    ots_capacity = getConnect_ots_capacity()
    df = pd.read_excel("2022-10-14_190838_数据导出.xlsx")
    _count = 0
    for _docid in df["docid"]:
        # the partition key is derived from the docid
        partitionkey = int(_docid) % 500 + 1
        _d = {document_partitionkey: partitionkey,
              document_docid: int(_docid)}
        _doc = Document(_d)
        _doc.delete_row(ots_client)
        _doc.delete_row(ots_capacity)
        _count += 1
        print(_docid)
    print("delete count:%d" % _count)
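
# turn_document_docchannel: for one web source, rows whose detail_link
# contains "/012002002/" are rewritten to docchannel 101.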
def turn_document_docchannel():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread

    task_queue = queue.Queue()
    ots_client = getConnect_ots()

    def producer(task_queue, ots_client):
        bool_query = BoolQuery(
            must_queries=[
                TermQuery("web_source_no", "DX007520-7"),
                # TermQuery("docid",363793104)
                # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                # BoolQuery(should_queries=[
                #     # TermQuery("tenderee","山西利民工业有限责任公司"),
                #     # MatchPhraseQuery("doctitle","中国电信"),
                #     # MatchPhraseQuery("doctextcon","中国电信"),
                #     # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                #     # RangeQuery(document_status,88,120,True,True),
                #     RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                #     ExistsQuery
                #     #,TermQuery(document_docid,171146519)
                # ]
                # )
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        # (an alternative sub_docs_json query, identical to the one commented
        # out in turn_document_status.producer, used to live here)
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]), limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet(["detail_link"], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                "document", "document_index",
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=ColumnsToGet(["detail_link"], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d" % (_count, total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        # import pandas as pd
        # df = pd.read_excel("G:\\20221212error.xlsx")
        # for docid in df["docid"]:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))
        log("task_queue size:%d" % (task_queue.qsize()))

    def _handle(item, result_queue, ots_client):
        # (same commented-out attachment/status rewrite as in
        # turn_extract_status._handle)
        detail_link = item.getProperties().get("detail_link", "")
        if "/012002002/" in detail_link:
            partitionkey = item.getProperties().get("partitionkey")
            docid = item.getProperties().get("docid")
            _dict = {document_partitionkey: partitionkey,
                     document_docid: docid,
                     document_docchannel: 101,
                     document_original_docchannel: 101}
            doc = Document(_dict)
            doc.update_row(ots_client)
            print(_dict)
        # log("update %d status done"%(item.getProperties().get(document_docid)))

    t_producer = Thread(target=producer, kwargs={"task_queue": task_queue, "ots_client": ots_client})
    t_producer.start()
    t_producer.join()
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()
if __name__ == "__main__":
    # one-off maintenance entry points; only one is enabled at a time
    # turn_extract_status()
    turn_document_status()
    # drop_extract2()
    # fixDocumentHtml()
    # turn_document_docchannel()