#coding:utf8
from BaseDataMaintenance.model.postgres.BaseModel import BaseModel
from BaseDataMaintenance.common.Utils import *

import random
import re

import psycopg2
import pymysql

# Column names of the ``document_extract`` Postgres table.
document_extract_docid = "docid"
document_extract_fingerprint = "fingerprint"
document_extract_extract_json = "extract_json"
class Document_extract_postgres(BaseModel):
    """Row wrapper for the ``document_extract`` Postgres table.

    Known columns: ``docid`` (primary key), ``fingerprint``,
    ``extract_json``.  Values are stored via ``BaseModel.setValue``.
    """

    def __init__(self, _dict):
        """Populate the row from ``_dict``, keeping only known columns.

        :param _dict: mapping of column name -> value; keys that are not
            known columns are silently ignored.
        """
        # Whitelist of column names accepted from the input dict.
        self.columns = {document_extract_docid,
                        document_extract_fingerprint,
                        document_extract_extract_json}
        for k, v in _dict.items():
            if k in self.columns:
                # The original isinstance(v, str) branch and its else arm
                # made the identical call (the str-specific escaping had
                # been commented out), so the dead branch is collapsed.
                self.setValue(k, v, True)
        self.table_name = "document_extract"

    def getPrimary_keys(self):
        """Return the primary-key column list for this table."""
        return [document_extract_docid]
def test():
    """Manual smoke test: fetch one document_extract row by fingerprint.

    Opens a small Postgres connector pool, selects the row whose
    fingerprint matches a hard-coded md5, and prints its extract_json.
    """
    from BaseDataMaintenance.dataSource.pool import ConnectorPool
    from BaseDataMaintenance.dataSource.source import getConnection_postgres

    pool = ConnectorPool(1, 2, getConnection_postgres)
    conn = pool.getConnector()
    # NOTE(review): the condition string is interpolated directly into SQL
    # by select_rows.  The fingerprint here is a trusted literal, but this
    # pattern is injection-prone for untrusted input — prefer parameterized
    # queries if this is ever fed external data.
    fingerprint = "md5=354bbc7cdbab7f63f53fb31331a78f25"
    list_extract = Document_extract_postgres.select_rows(
        conn,
        Document_extract_postgres,
        "document_extract",
        [" fingerprint='%s'" % fingerprint])
    # Guard against an empty result set instead of raising IndexError.
    if list_extract:
        print("=", list_extract[0].getProperties().get(document_extract_extract_json), "=")
    else:
        print("no row found for fingerprint %s" % fingerprint)
from tablestore import *


def fix_document_extract():
    """Backfill ``document_extract`` rows in Postgres from OTS documents.

    Scans the OTS ``document``/``document_index`` table for rows with
    ``crtime`` >= 2022-08-22, pages through every match via next_token,
    queues each row, and inserts them into Postgres with 20 worker threads.
    """
    def _handle(item, result_queue):
        # Worker: wrap one OTS row dict and insert it into Postgres.
        de = Document_extract_postgres(item)
        de.insert_row(pool_postgres)

    from BaseDataMaintenance.dataSource.pool import ConnectorPool
    from BaseDataMaintenance.dataSource.source import getConnection_postgres, getConnect_ots
    from queue import Queue
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler

    pool_postgres = ConnectorPool(10, 20, getConnection_postgres)
    task_queue = Queue()
    ots_client = getConnect_ots()

    bool_query = BoolQuery(must_queries=[RangeQuery("crtime", "2022-08-22")])

    # First page: sorted by docid so paging through next_token is stable.
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "document", "document_index",
        SearchQuery(bool_query,
                    sort=Sort(sorters=[FieldSort("docid", SortOrder.ASC)]),
                    limit=100, get_total_count=True),
        ColumnsToGet(column_names=["fingerprint", "extract_json"],
                     return_type=ColumnReturnType.SPECIFIED))
    print(total_count)
    list_data = getRow_ots(rows)
    if list_data:
        # Show one sample row; guarded so an empty first page cannot
        # raise IndexError.
        print(list_data[0])
    for _data in list_data:
        task_queue.put(_data)

    # Follow-up pages: next_token already encodes the sort order, so no
    # sorter is passed here.
    while next_token:
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(bool_query, next_token=next_token, limit=100,
                        get_total_count=True),
            ColumnsToGet(column_names=["fingerprint", "extract_json"],
                         return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        print("%d/%d" % (task_queue.qsize(), total_count))
        for _data in list_data:
            task_queue.put(_data)

    mt = MultiThreadHandler(task_queue, _handle, None, 20)
    mt.run()
if __name__ == "__main__":
    # Entry point: run the one-off OTS -> Postgres backfill.
    # (Removed a redundant ``pass`` that preceded this call.)
    fix_document_extract()
|