# document_extract.py
#coding:utf8
from BaseDataMaintenance.model.postgres.BaseModel import BaseModel
import random

# Column names of the Postgres `document_extract` table.
document_extract_docid = "docid"                  # primary key
document_extract_fingerprint = "fingerprint"      # content md5 fingerprint
document_extract_extract_json = "extract_json"    # serialized extraction result

import re
import pymysql
import psycopg2
from BaseDataMaintenance.common.Utils import *
  11. class Document_extract_postgres(BaseModel):
  12. #filemd5,path,crtime,attachmenthtml,attachmentcon,process_time,page_time,docids,filetype,status,sourcelink,filetitle
  13. def __init__(self,_dict):
  14. self.columns = set([document_extract_docid,document_extract_fingerprint,document_extract_extract_json])
  15. for k,v in _dict.items():
  16. if k in self.columns:
  17. # if k in ("attachmenthtml","attachmentcon") and v is not None:
  18. # v = v.replace("%", "\\\\%").replace("_", "\\\\_").replace("'","\\\\'").replace("(","\\\\(").replace(")","\\\\)")
  19. if isinstance(v,str):
  20. # self.setValue(k,getLegal_str(pymysql.escape_string(v.replace('\x00',''))),True)
  21. self.setValue(k,v,True)
  22. else:
  23. self.setValue(k,v,True)
  24. self.table_name = "document_extract"
  25. def getPrimary_keys(self):
  26. return [document_extract_docid]
  27. # def delete_row(self,ots_client):
  28. # raise NotImplementedError()
  29. def test():
  30. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  31. from BaseDataMaintenance.dataSource.source import getConnection_postgres
  32. pool = ConnectorPool(1,2,getConnection_postgres)
  33. # a = '''
  34. # { "attachmentTypes": "", "bidway": "", "code": [ "HJHGC2022A051" ], "cost_time": { "attrs": 0.04, "codename": 0.04, "deposit": 0.0, "nerToken": 0.1, "person": 0.01, "prem": 0.03, "preprocess": 0.12, "product": 0.04, "product_attrs": 0.0, "punish": 0.08, "roleRuleFinal": 0.0, "rule": 0.0, "rule_channel": 0.0, "tableToText": 9.298324584960938e-06, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 }, "demand_info": { "data": [], "header": [], "header_col": [] }, "deposit_patment_way": "", "docchannel": { "docchannel": "中标信息", "doctype": "采招数据", "life_docchannel": "中标信息" }, "docid": "", "doctitle_refine": "2022年全域智慧旅游交通基础设施提升-三口镇风景道节点提升之老街古民居修缮和展陈项目", "exist_table": 0, "extract_count": 5, "fail_reason": "", "fingerprint": "md5=657127efcc977d6aa86eed56fed44d49", "industry": { "class": "文化、体育和娱乐业", "class_name": "文物及非物质文化遗产保护", "subclass": "文化艺术业" }, "match_enterprise": [], "match_enterprise_type": 0, "moneysource": "", "name": "2022年全域智慧旅游交通基础设施提升工程-三口镇风景道节点提升工程之老街古民居修缮和展陈项目项目", "nlp_enterprise": [ "黄山市大明古建工程有限公司", "黄山市黄山区耿城镇人民政府", "安徽振兴工程咨询有限公司" ], "nlp_enterprise_attachment": [], "person_review": [ "汪宏福", "吕维柱", "王威" ], "prem": { "Project": { "code": "", "roleList": [ { "address": "", "linklist": [ [ "徐虎剑", "" ] ], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": "1309502.00", "money_unit": "元" }, "role_name": "win_tenderer", "role_text": "黄山市大明古建工程有限公司", "serviceTime": "70日历天" } ], "tendereeMoney": 0, "tendereeMoneyUnit": "" } }, "process_time": "2022-08-23 09:36:40", "product": [ "全域智慧旅游交通基础设施提升工程", "风景道节点提升工程之老街古民居修缮和展陈" ], "product_attrs": { "data": [], "header": [], "header_col": [] }, "serviceTime": "70日历天", "success": true, "time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "", "time_getFileStart": "", "time_publicityEnd": "", "time_publicityStart": 
"2022-08-23", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": ""}
  35. # '''
  36. # b = Document_extract_postgres({document_extract_fingerprint:'11111',
  37. # document_extract_docid:213,
  38. # document_extract_extract_json:a})
  39. # b.insert_row(pool)
  40. # print(getLegal_str(pymysql.escape_string(a.replace('\x00',''))))
  41. conn = pool.getConnector()
  42. list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%"md5=354bbc7cdbab7f63f53fb31331a78f25"])
  43. print("=",list_extract[0].getProperties().get(document_extract_extract_json),"=")
  44. from tablestore import *
  45. def fix_document_extract():
  46. def _handle(item,result_queue):
  47. de = Document_extract_postgres(item)
  48. de.insert_row(pool_postgres)
  49. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  50. from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
  51. from queue import Queue
  52. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  53. pool_postgres = ConnectorPool(10,20,getConnection_postgres)
  54. task_queue = Queue()
  55. ots_client = getConnect_ots()
  56. bool_query = BoolQuery(must_queries=[RangeQuery("crtime","2022-08-22"),
  57. ])
  58. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
  59. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),limit=100,get_total_count=True),
  60. ColumnsToGet(column_names=["fingerprint","extract_json"],return_type=ColumnReturnType.SPECIFIED))
  61. print(total_count)
  62. list_data = getRow_ots(rows)
  63. print(list_data[0])
  64. for _data in list_data:
  65. task_queue.put(_data)
  66. while next_token:
  67. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
  68. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  69. ColumnsToGet(column_names=["fingerprint","extract_json"],return_type=ColumnReturnType.SPECIFIED))
  70. list_data = getRow_ots(rows)
  71. print("%d/%d"%(task_queue.qsize(),total_count))
  72. for _data in list_data:
  73. task_queue.put(_data)
  74. mt = MultiThreadHandler(task_queue,_handle,None,20)
  75. mt.run()
  76. if __name__=="__main__":
  77. pass
  78. fix_document_extract()