# document_tmp.py
import re

# NOTE: the relative order of the two star imports is kept as-is, because a
# later star import may shadow names exported by an earlier one.
from BaseDataMaintenance.model.ots.BaseModel import BaseModel
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from bs4 import BeautifulSoup
  5. document_tmp_partitionkey = "partitionkey"
  6. document_tmp_docid = "docid"
  7. document_tmp_dochtmlcon = "dochtmlcon"
  8. document_tmp_doctextcon = "doctextcon"
  9. document_tmp_doctitle = "doctitle"
  10. document_tmp_attachmenttextcon = "attachmenttextcon"
  11. document_tmp_attachment_path = "page_attachments"
  12. document_tmp_attachment_path_filemd5 = "fileMd5"
  13. document_tmp_attachment_path_fileTitle = "fileTitle"
  14. document_tmp_attachment_path_fileLink = "fileLink"
  15. document_tmp_uuid = "uuid"
  16. document_tmp_crtime = "crtime"
  17. document_tmp_status = "status"
  18. document_tmp_tenderee = "tenderee"
  19. document_tmp_agency = "agency"
  20. document_tmp_project_code = "project_code"
  21. document_tmp_product = "product"
  22. document_tmp_project_name = "project_name"
  23. document_tmp_doctitle_refine = "doctitle_refine"
  24. document_tmp_extract_count = "extract_count"
  25. document_tmp_sub_docs_json = "sub_docs_json"
  26. document_tmp_save = "save"
  27. document_tmp_dup_docid = "dup_docid"
  28. document_tmp_best_docid = "best_docid"
  29. document_tmp_merge_uuid = "merge_uuid"
  30. document_tmp_projects = "projects"
  31. document_tmp_page_time = "page_time"
  32. document_tmp_attachment_extract_status = "attachment_extract_status"
  33. document_tmp_web_source_no = "web_source_no"
  34. document_tmp_fingerprint = "fingerprint"
  35. document_tmp_opertime = "opertime"
  36. document_tmp_docchannel = "docchannel"
  37. document_tmp_original_docchannel = "original_docchannel"
  38. document_tmp_extract_json = "extract_json"
  39. document_tmp_industry_json = "industry_json"
  40. document_tmp_other_json = "other_json"
  41. document_tmp_time_bidclose = "time_bidclose"
  42. document_tmp_time_bidopen = "time_bidopen"
  43. document_tmp_time_completion = "time_completion"
  44. document_tmp_time_earnest_money_end = "time_earnest_money_end"
  45. document_tmp_time_earnest_money_start = "time_earnest_money_start"
  46. document_tmp_time_get_file_end = "time_get_file_end"
  47. document_tmp_time_get_file_start = "time_get_file_start"
  48. document_tmp_time_publicity_end = "time_publicity_end"
  49. document_tmp_time_publicity_start = "time_publicity_start"
  50. document_tmp_time_registration_end = "time_registration_end"
  51. document_tmp_time_registration_start = "time_registration_start"
  52. document_tmp_time_release = "time_release"
  53. class Document_tmp(BaseModel):
  54. def __init__(self,_dict):
  55. BaseModel.__init__(self)
  56. for k,v in _dict.items():
  57. self.setValue(k,v,True)
  58. self.table_name = "document_tmp"
  59. self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]
  60. def getPrimary_keys(self):
  61. return [document_tmp_partitionkey,document_tmp_docid]
  62. def isLegalUrl(self,_url,_type):
  63. _flag = False
  64. for _prefix in self.prefixs:
  65. if _url.find(_prefix)>=0:
  66. _flag = True
  67. if _type==0:
  68. if _flag:
  69. return True
  70. else:
  71. return False
  72. else:
  73. if _flag:
  74. return False
  75. else:
  76. return True
  77. def updateSWFImages(self,swf_urls):
  78. if len(swf_urls)>0:
  79. _dochtmlcon = self.getProperties().get(document_tmp_dochtmlcon)
  80. _soup = BeautifulSoup(_dochtmlcon,"lxml")
  81. if _soup.find("img",{"src":swf_urls[0]}) is None:
  82. _div = "<div>"
  83. for _url in swf_urls:
  84. _div += '<p><img src="%s"/></p>'%(_url)
  85. _div += "</div>"
  86. _dochtmlcon += _div
  87. self.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
  88. def getRichTextFetch(self,list_html):
  89. _text = ""
  90. for _ht in list_html:
  91. if isinstance(_ht,str):
  92. _text += "<div>%s</div>"%(_ht)
  93. elif isinstance(_ht,dict):
  94. _filemd5 = _ht.get("filemd5","")
  95. _html = _ht.get("html","")
  96. _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
  97. return _text
  98. def updateAttachment(self,list_html):
  99. if len(list_html)>0:
  100. _dochtmlcon = self.getProperties().get(document_tmp_dochtmlcon,"")
  101. _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
  102. _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
  103. fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
  104. # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
  105. _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
  106. if _dochtmlcon is not None:
  107. _soup = BeautifulSoup(_dochtmlcon,"lxml")
  108. _node = _soup.find("div",attrs={"class":"richTextFetch"})
  109. if _node is not None:
  110. _node.decompose()
  111. self.setValue(document_tmp_dochtmlcon,str(_soup)+_text,True)
  112. def getTitleFromHtml(self,filemd5,_html):
  113. _soup = BeautifulSoup(_html,"lxml")
  114. _find = _soup.find("a",attrs={"data":filemd5})
  115. _title = ""
  116. if _find is not None:
  117. _title = _find.get_text()
  118. return _title
  119. def getSourceLinkFromHtml(self,filemd5,_html):
  120. _soup = BeautifulSoup(_html,"lxml")
  121. _find = _soup.find("a",attrs={"filelink":filemd5})
  122. filelink = ""
  123. if _find is None:
  124. _find = _soup.find("img",attrs={"filelink":filemd5})
  125. if _find is not None:
  126. filelink = _find.attrs.get("src","")
  127. else:
  128. filelink = _find.attrs.get("href","")
  129. return filelink
  130. import random
  131. def turn_extract_status():
  132. from BaseDataMaintenance.dataSource.source import getConnect_ots
  133. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  134. import queue
  135. from threading import Thread
  136. import json
  137. task_queue = queue.Queue()
  138. from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
  139. ots_client = getConnect_ots()
  140. def producer(task_queue,ots_client):
  141. bool_query = BoolQuery(must_queries=[
  142. # WildcardQuery(document_tmp_web_source_no,"00295*"),
  143. # RangeQuery(document_tmp_crtime,"2021-07-26 00:00:00"),
  144. RangeQuery(document_tmp_status,61,70,True,True),
  145. #TermQuery(document_tmp_docid,171146519),
  146. ]
  147. )
  148. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_tmp_index",
  149. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
  150. columns_to_get=ColumnsToGet([document_tmp_fingerprint],return_type=ColumnReturnType.SPECIFIED))
  151. list_data = getRow_ots(rows)
  152. print(total_count)
  153. _count = len(list_data)
  154. for _data in list_data:
  155. _document = Document_tmp(_data)
  156. task_queue.put(_document)
  157. while next_token:
  158. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_tmp_index",
  159. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  160. columns_to_get=ColumnsToGet([document_tmp_fingerprint],return_type=ColumnReturnType.SPECIFIED))
  161. list_data = getRow_ots(rows)
  162. _count += len(list_data)
  163. print("%d/%d"%(_count,total_count))
  164. for _data in list_data:
  165. _document = Document_tmp(_data)
  166. task_queue.put(_document)
  167. def _handle(item,result_queue,ots_client):
  168. #change attach value
  169. # list_attachment = json.loads(item.getProperties().get(document_tmp_attachment_path))
  170. # print("docid",item.getProperties().get(document_tmp_docid))
  171. # for attach in list_attachment:
  172. #
  173. # filemd5 = attach.get(document_tmp_attachment_path_filemd5,"")
  174. # _document_tmp_html = item.getProperties().get(document_tmp_dochtmlcon,"")
  175. #
  176. # _file_title = item.getTitleFromHtml(filemd5,_document_tmp_html)
  177. # filelink = item.getSourceLinkFromHtml(filemd5,_document_tmp_html)
  178. # attach[document_tmp_attachment_path_fileTitle] = _file_title
  179. # attach[document_tmp_attachment_path_fileLink] = filelink
  180. #
  181. # item.setValue(document_tmp_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
  182. # item.all_columns.remove(document_tmp_dochtmlcon)
  183. #change status
  184. item.setValue(document_tmp_status,random.randint(1,50),True)
  185. item.update_row(ots_client)
  186. t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
  187. t_producer.start()
  188. t_producer.join()
  189. # mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
  190. # mt.run()
  191. dict_fingerprint = {}
  192. while True:
  193. try:
  194. item = task_queue.get(timeout=2)
  195. fingerprint = item.getProperties().get(document_tmp_fingerprint)
  196. if fingerprint is not None:
  197. if fingerprint not in dict_fingerprint:
  198. dict_fingerprint[fingerprint] = []
  199. dict_fingerprint[fingerprint].append(item)
  200. except Exception as e:
  201. print(e)
  202. break
  203. print(len(dict_fingerprint.keys()))
  204. status_queue = queue.Queue()
  205. for k,v in dict_fingerprint.items():
  206. print("key",k,len(v))
  207. v.sort(key=lambda x:x.docid)
  208. for _d in v[1:]:
  209. _d.setValue(document_tmp_status,random.randint(401,450),True)
  210. status_queue.put(_d)
  211. mt = MultiThreadHandler(status_queue,_handle,None,30,ots_client=ots_client)
  212. mt.run()
  213. def turn_document_tmp_status():
  214. from BaseDataMaintenance.dataSource.source import getConnect_ots
  215. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  216. import queue
  217. from threading import Thread
  218. import json
  219. task_queue = queue.Queue()
  220. from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
  221. ots_client = getConnect_ots()
  222. def producer1(task_queue,ots_client):
  223. a = ''
  224. for l_a in a.split("\n"):
  225. l_a = l_a.strip()
  226. if l_a !="":
  227. task_queue.put(Document_tmp({document_tmp_partitionkey:int(l_a)%500+1,
  228. document_tmp_docid:int(l_a),
  229. document_tmp_status:66}))
  230. def producer(task_queue,ots_client):
  231. bool_query = BoolQuery(
  232. must_queries=[
  233. # TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
  234. # TermQuery("save",66),
  235. RangeQuery("status",1,51),
  236. # BoolQuery(should_queries=[
  237. # # TermQuery("tenderee","山西利民工业有限责任公司"),
  238. # # MatchPhraseQuery("doctitle","中国电信"),
  239. # # MatchPhraseQuery("doctextcon","中国电信"),
  240. # # MatchPhraseQuery("attachmenttextcon","中国电信")]),
  241. # # RangeQuery(document_tmp_status,88,120,True,True),
  242. # RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
  243. # ExistsQuery
  244. # #,TermQuery(document_tmp_docid,171146519)
  245. # ]
  246. # )
  247. ],
  248. must_not_queries=[
  249. # TermQuery("docid",288599518)
  250. # ExistsQuery("doctitle"),
  251. # ExistsQuery("page_time"),
  252. ]
  253. )
  254. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  255. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
  256. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  257. list_data = getRow_ots(rows)
  258. print(total_count)
  259. # print(list_data)
  260. _count = len(list_data)
  261. for _data in list_data:
  262. _document = Document_tmp(_data)
  263. task_queue.put(_document)
  264. print(list_data)
  265. while next_token:
  266. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  267. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  268. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  269. list_data = getRow_ots(rows)
  270. _count += len(list_data)
  271. print("%d/%d"%(_count,total_count))
  272. for _data in list_data:
  273. _document = Document_tmp(_data)
  274. task_queue.put(_document)
  275. # docids = [223820830,224445409]
  276. # for docid in docids:
  277. # _dict = {document_tmp_docid:int(docid),
  278. # document_tmp_partitionkey:int(docid)%500+1,
  279. # }
  280. # task_queue.put(Document(_dict))
  281. # import pandas as pd
  282. # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
  283. # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
  284. # if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
  285. # # print(docid)
  286. # _dict = {document_tmp_docid:int(docid),
  287. # document_tmp_partitionkey:int(docid)%500+1,
  288. # }
  289. # task_queue.put(Document(_dict))
  290. log("task_queue size:%d"%(task_queue.qsize()))
  291. def _handle(item,result_queue,ots_client):
  292. #change attach value
  293. # list_attachment = json.loads(item.getProperties().get(document_tmp_attachment_path))
  294. # print("docid",item.getProperties().get(document_tmp_docid))
  295. # for attach in list_attachment:
  296. #
  297. # filemd5 = attach.get(document_tmp_attachment_path_filemd5,"")
  298. # _document_tmp_html = item.getProperties().get(document_tmp_dochtmlcon,"")
  299. #
  300. # _file_title = item.getTitleFromHtml(filemd5,_document_tmp_html)
  301. # filelink = item.getSourceLinkFromHtml(filemd5,_document_tmp_html)
  302. # attach[document_tmp_attachment_path_fileTitle] = _file_title
  303. # attach[document_tmp_attachment_path_fileLink] = filelink
  304. #
  305. # item.setValue(document_tmp_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
  306. # item.all_columns.remove(document_tmp_dochtmlcon)
  307. #change status
  308. # item.setValue(document_tmp_docchannel,item.getProperties().get(document_tmp_original_docchannel),True)
  309. # _extract_json = item.getProperties().get(document_tmp_extract_json,"")
  310. # _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
  311. # item.setValue(document_tmp_extract_json,_extract_json,True)
  312. # json.loads(_extract_json)
  313. item.setValue(document_tmp_status,0,True)
  314. # item.setValue(document_tmp_save,1,True)
  315. # if item.exists_row(ots_client):
  316. # item.update_row(ots_client)
  317. # print(item.getProperties())
  318. item.update_row(ots_client)
  319. # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
  320. # item.delete_row(ots_client)
  321. # from BaseDataMaintenance.model.ots.document import Document
  322. #
  323. # Doc = Document(item.getProperties())
  324. # if Doc.fix_columns(ots_client,["status"],True):
  325. # if Doc.getProperties().get("status",0)>=401:
  326. # print(Doc.getProperties().get("docid"),"redo")
  327. # item.setValue("status",66,True)
  328. # item.update_row(ots_client)
  329. # pass
  330. t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
  331. t_producer.start()
  332. t_producer.join()
  333. mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
  334. mt.run()
  335. if __name__=="__main__":
  336. # turn_extract_status()
  337. turn_document_tmp_status()