# document.py

from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from bs4 import BeautifulSoup
import random

# column names of the document table
document_partitionkey = "partitionkey"
document_docid = "docid"
document_dochtmlcon = "dochtmlcon"
document_doctextcon = "doctextcon"
document_doctitle = "doctitle"
document_attachmenttextcon = "attachmenttextcon"
document_attachment_path = "page_attachments"
document_attachment_path_filemd5 = "fileMd5"
document_attachment_path_fileTitle = "fileTitle"
document_attachment_path_fileLink = "fileLink"
document_crtime = "crtime"
document_status = "status"
document_page_time = "page_time"
document_attachment_extract_status = "attachment_extract_status"
document_web_source_no = "web_source_no"
document_fingerprint = "fingerprint"
document_opertime = "opertime"
document_docchannel = "docchannel"
document_original_docchannel = "original_docchannel"
document_area = "area"
document_province = "province"
document_city = "city"
document_district = "district"
class Document(BaseModel):

    def __init__(self,_dict):
        BaseModel.__init__(self)
        for k,v in _dict.items():
            self.setValue(k,v,True)
        self.table_name = "document"
        # url prefixes that mark an attachment link as coming from a known source
        self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]

    def getPrimary_keys(self):
        return ["partitionkey","docid"]

    def delete_row(self,ots_client):
        raise NotImplementedError()

    def isLegalUrl(self,_url,_type):
        # _type==0: legal means the url matches one of the known prefixes;
        # any other _type: legal means the url matches none of them
        _flag = False
        for _prefix in self.prefixs:
            if _url.find(_prefix)>=0:
                _flag = True
        if _type==0:
            return _flag
        else:
            return not _flag

    # status transitions used by the maintenance pipelines
    def fromInitialed(self):
        self.setValue(document_status,random.randint(1,50),True)

    def fromEas2Maxcompute(self):
        self.setValue(document_status,random.randint(151,170),True)

    def fromEasFailed(self):
        self.setValue(document_status,random.randint(51,60),True)

    def fromEas2Extract(self):
        self.setValue(document_status,random.randint(61,70),True)

    def updateSWFImages(self,swf_urls):
        # append the swf-derived image urls to dochtmlcon if they are not present yet
        if len(swf_urls)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon)
            _soup = BeautifulSoup(_dochtmlcon,"lxml")
            if _soup.find("img",{"src":swf_urls[0]}) is None:
                _div = '<div class="swf_urls">'
                for _url in swf_urls:
                    _div += '<p><img src="%s"/></p>'%(_url)
                _div += "</div>"
                _dochtmlcon += _div
                self.setValue(document_dochtmlcon,_dochtmlcon,True)

    def updateAttachment(self,list_html):
        # append the extracted attachment html to dochtmlcon inside a hidden
        # richTextFetch div, truncated so the column stays below COLUMN_MAX_SIZE
        if len(list_html)>0:
            _dochtmlcon = self.getProperties().get(document_dochtmlcon,"")
            _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
            fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
            if len(bytes(_text,encoding="utf8"))>fix_len:
                list_t = []
                for _html in list_html:
                    list_t.append(BeautifulSoup(_html,"lxml").get_text())
                _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(cut_str(list_html,list_t,fix_len)))
            if _dochtmlcon is not None:
                _soup = BeautifulSoup(_dochtmlcon,"lxml")
                # drop a previously appended richTextFetch div before re-appending
                _node = _soup.find("div",attrs={"class":"richTextFetch"})
                if _node is not None:
                    _node.decompose()
                self.setValue(document_dochtmlcon,str(_soup)+_text,True)

    def getTitleFromHtml(self,filemd5,_html):
        # the attachment title is the text of the <a> tag whose data attribute equals the file md5
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"data":filemd5})
        _title = ""
        if _find is not None:
            _title = _find.get_text()
        return _title

    def getSourceLinkFromHtml(self,filemd5,_html):
        # the source link is the href of the <a> tag (or the src of the <img> tag)
        # whose filelink attribute equals the file md5
        _soup = BeautifulSoup(_html,"lxml")
        _find = _soup.find("a",attrs={"filelink":filemd5})
        filelink = ""
        if _find is None:
            _find = _soup.find("img",attrs={"filelink":filemd5})
            if _find is not None:
                filelink = _find.attrs.get("src","")
        else:
            filelink = _find.attrs.get("href","")
        return filelink
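
# Illustrative usage sketch (not called anywhere in this module): how the maintenance
# functions below construct a Document and push a status change back to OTS.  The docid
# value and the partitionkey formula are taken from examples elsewhere in this file;
# the helper name _example_update_status itself is hypothetical.
def _example_update_status():
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    ots_client = getConnect_ots()
    docid = 171146519
    _doc = Document({document_docid:docid,
                     document_partitionkey:docid%500+1})
    # fromInitialed() picks a random status in 1-50, same as the handlers below
    _doc.fromInitialed()
    _doc.update_row(ots_client)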
def turn_extract_status():
    """Deduplicate documents by fingerprint.

    Documents in status 61-70 are pulled from the search index, grouped by
    fingerprint, and every document except the smallest docid of each group
    is queued for a status update through _handle.
    """
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(must_queries=[
            # WildcardQuery(document_web_source_no,"00295*"),
            # RangeQuery(document_crtime,"2021-07-26 00:00:00"),
            RangeQuery(document_status,61,70,True,True),
            # TermQuery(document_docid,171146519),
        ])
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet([document_fingerprint],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

    def _handle(item,result_queue,ots_client):
        # change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)

        # change status
        item.setValue(document_status,random.randint(1,50),True)
        item.update_row(ots_client)

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()

    # mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    # mt.run()

    # group the fetched documents by fingerprint
    dict_fingerprint = {}
    while True:
        try:
            item = task_queue.get(timeout=2)
            fingerprint = item.getProperties().get(document_fingerprint)
            if fingerprint is not None:
                if fingerprint not in dict_fingerprint:
                    dict_fingerprint[fingerprint] = []
                dict_fingerprint[fingerprint].append(item)
        except Exception as e:
            print(e)
            break
    print(len(dict_fingerprint.keys()))

    # for each fingerprint keep the smallest docid and queue the rest for a status update
    status_queue = queue.Queue()
    for k,v in dict_fingerprint.items():
        print("key",k,len(v))
        v.sort(key=lambda x:x.docid)
        for _d in v[1:]:
            _d.setValue(document_status,random.randint(401,450),True)
            status_queue.put(_d)
    mt = MultiThreadHandler(status_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
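
# Illustrative sketch (not wired into the functions in this module): the producer loops
# above and below all repeat the same next_token pagination pattern against an OTS search
# index.  A shared helper could look like this; table, index, columns and the per-row
# callback are supplied by the caller, and the name _scroll_search is hypothetical.
def _scroll_search(ots_client,table,index,bool_query,columns,on_row,limit=100):
    # first page, sorted by docid descending as in the producers above
    rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=limit,get_total_count=True),
        columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_data = getRow_ots(rows)
    _count = len(list_data)
    for _data in list_data:
        on_row(_data)
    # follow next_token until the index has been fully scanned
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
            SearchQuery(bool_query,next_token=next_token,limit=limit,get_total_count=True),
            columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        _count += len(list_data)
        print("%d/%d"%(_count,total_count))
        for _data in list_data:
            on_row(_data)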
def turn_document_status():
    """One-off maintenance job: fetch the documents matched by the producer query
    and rewrite selected columns in _handle (currently the region fields)."""
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(
            must_queries=[
                MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                # BoolQuery(should_queries=[
                #     # TermQuery("tenderee","山西利民工业有限责任公司"),
                #     # MatchPhraseQuery("doctitle","中国电信"),
                #     # MatchPhraseQuery("doctextcon","中国电信"),
                #     # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                #     # RangeQuery(document_status,88,120,True,True),
                #     RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
                #     # ExistsQuery
                #     # ,TermQuery(document_docid,171146519)
                #     ]
                # )
            ],
            # must_not_queries=[WildcardQuery("DX004354*")]
        )
        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            _document = Document(_data)
            task_queue.put(_document)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                _document = Document(_data)
                task_queue.put(_document)

        # alternative producers kept for reference:
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))

        # import pandas as pd
        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
        #         # print(docid)
        #         _dict = {document_docid:int(docid),
        #                  document_partitionkey:int(docid)%500+1,
        #                  }
        #         task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)

        # change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,171),True)

        # set the region columns and write the row back
        item.setValue(document_area,"华南",True)
        item.setValue(document_province,"广东",True)
        item.setValue(document_city,"珠海",True)
        item.setValue(document_district,"金湾区",True)
        item.update_row(ots_client)
        log("update %d status done"%(item.getProperties().get(document_docid)))

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()

    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
def drop_extract2():
    """Move rows out of document_extract2: write them back to the document table
    with status reset to 1, then delete them from document_extract2."""
    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
    import queue
    from threading import Thread
    import json
    task_queue = queue.Queue()
    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
    ots_client = getConnect_ots()
    from BaseDataMaintenance.model.ots.document_extract2 import Document_extract2

    def producer(task_queue,ots_client):
        bool_query = BoolQuery(must_queries=[
            BoolQuery(should_queries=[
                # TermQuery("tenderee","山西利民工业有限责任公司"),
                # MatchPhraseQuery("doctitle","中国电信"),
                # MatchPhraseQuery("doctextcon","中国电信"),
                # MatchPhraseQuery("attachmenttextcon","中国电信")]),
                RangeQuery("status",1,1000,True,True),
                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
                # ,TermQuery(document_docid,171146519)
            ]),
            # TermQuery("docid",228359000)
        ],
            # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
        )
        rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print(total_count)
        _count = len(list_data)
        for _data in list_data:
            task_queue.put(_data)
        while next_token:
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count += len(list_data)
            print("%d/%d"%(_count,total_count))
            for _data in list_data:
                task_queue.put(_data)

        # alternative producers kept for reference:
        # docids = [223820830,224445409]
        # for docid in docids:
        #     _dict = {document_docid:int(docid),
        #              document_partitionkey:int(docid)%500+1,
        #              }
        #     task_queue.put(Document(_dict))

        # import pandas as pd
        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
        #         # print(docid)
        #         _dict = {document_docid:int(docid),
        #                  document_partitionkey:int(docid)%500+1,
        #                  }
        #         task_queue.put(Document(_dict))
        log("task_queue size:%d"%(task_queue.qsize()))

    def _handle(item,result_queue,ots_client):
        # change attach value
        # list_attachment = json.loads(item.getProperties().get(document_attachment_path))
        # print("docid",item.getProperties().get(document_docid))
        # for attach in list_attachment:
        #     filemd5 = attach.get(document_attachment_path_filemd5,"")
        #     _document_html = item.getProperties().get(document_dochtmlcon,"")
        #     _file_title = item.getTitleFromHtml(filemd5,_document_html)
        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_html)
        #     attach[document_attachment_path_fileTitle] = _file_title
        #     attach[document_attachment_path_fileLink] = filelink
        # item.setValue(document_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
        # item.all_columns.remove(document_dochtmlcon)

        # change status
        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
        # item.setValue(document_status,random.randint(151,170),True)
        # item.update_row(ots_client)
        # log("update %d status done"%(item.getProperties().get(document_docid)))

        # item is a plain row dict produced by getRow_ots
        _dict = {}
        _dict.update(item)
        _dict.pop("status")
        _dict["status"] = 1
        print(_dict)
        # write the row back to the document table with status reset to 1 ...
        _document = Document(_dict)
        _document.update_row(ots_client)
        # ... and remove it from document_extract2
        _d_extract = Document_extract2(_dict)
        _d_extract.delete_row(ots_client)

    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
    t_producer.start()
    t_producer.join()

    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
if __name__=="__main__":
    # turn_extract_status()
    turn_document_status()
    # drop_extract2()