#coding:utf8
from dataSource.source import getConnection_testmysql,getConnection_postgres
from utils.Utils import save,getCurrent_date
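
# exportMergeTrainData: pages through the merge_probability_pairs_featurematrix_train
# table (rows with and without a prob value, 1,000,000 rows per page) and pickles each
# page under ../data/, presumably as training data for the document-merge model.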
def exportMergeTrainData():
    conn = getConnection_testmysql()
    cursor = conn.cursor()
    for _type in ["is null","is not null"]:
        for i in range(20):
            _limit = 1000000
            _offset = i*_limit
            sql = "select docid_less,docid_greater,json_matrix,prob from merge_probability_pairs_featurematrix_train where prob %s limit %d offset %d"%(_type,_limit,_offset)
            cursor.execute(sql)
            row_name = cursor.description
            print(row_name)
            rows = cursor.fetchall()
            list_data = []
            for row in rows:
                _dict = dict()
                for _n,_v in zip(row_name,row):
                    _dict[_n[0]] = _v
                list_data.append(_dict)
            save(list_data,"../data/%s-mergeTrain_%s_part%d.pk"%(getCurrent_date("%Y-%m-%d"),_type.replace(" ",""),i))
import pandas as pd
import re
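
# labelTime: reads the manually annotated spreadsheet ../data/延期数据.xlsx ("postponement
# data"). For rows without a manual new_label it adds a heuristic reg_count flag based on
# whether the left context ends with "截止(时间|日期):" ("deadline (time|date):"), then
# writes the result back to the same file. The flag appears to be a rule-based pre-label.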
def labelTime():
    filename = "../data/延期数据.xlsx"
    df = pd.read_excel(filename)
    columns = ["docid","entity_text","label","sentence_left","sentence_right","context_left","context_right","new_label"]
    df_data = {}
    for _c in columns:
        df_data[_c] = df[_c]
    append_columns = ["reg_count"]
    columns.extend(append_columns)
    for _c in append_columns:
        df_data[_c] = []
    for _left,_new_label,_text in zip(df["sentence_left"],df["new_label"],df["entity_text"]):
        # cast to str to guard against empty cells that pandas reads as NaN floats
        _line = str(_left)[-7:]
        if str(_new_label)!='nan' and _new_label is not None and _new_label!="":
            # a manual label already exists, mark the row as skipped
            df_data["reg_count"].append("-1")
        else:
            # if re.search(".*月.*日",_text) is not None or re.search("^\d+\-\d+\-",_text) is not None:
            #     df_data["reg_count"].append("1")
            # else:
            #     df_data["reg_count"].append("0")
            if re.search("截止(时间|日期):$",_line) is not None:
                df_data["reg_count"].append("1")
            else:
                df_data["reg_count"].append("0")
    new_df = pd.DataFrame(df_data)
    # overwrites the source spreadsheet in place
    new_df.to_excel(filename,columns=columns)
import time
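
# getAttachmentProcessTime: parses flow_attachment.log for the periodic "thread status"
# lines, accumulates rough idle vs. busy thread-seconds for the 40-worker pool, and dumps
# the parsed fields to attachmentProcessTime3.xlsx. The commented-out patterns below look
# like earlier per-file timing formats kept for reference.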
def getAttachmentProcessTime():
    list_line = []
    with open("flow_attachment.log","r",encoding="utf-8") as f:
        list_line = f.readlines()
    # _regex = "filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) with size:(?P<filesize>\d+(\.\d+)?)M download:(?P<dowload>\d+(\.\d+)?)s,recognize takes (?P<recognize>\d+(\.\d+)?)s,ret_size:(?P<ret_size>\d+(\.\d+)?)"
    # _regex = "process filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) with size:(?P<filesize>\d+(\.\d+)?)M download:(?P<dowload>\d+(\.\d+)?)s recognize takes (?P<recognize>\d+(\.\d+)?)s,ret_size:(?P<ret_size>\d+(\.\d+)?)"
    # _regex = "filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) download:(?P<dowload>\d+(\.\d+)?)s recognize:(?P<recognize>\d+(\.\d+)?)s result:True rec_size:(?P<ret_size>\d+(\.\d+)?)"
    # keys = ["filemd5","filetype","filesize","dowload","recognize","ret_size"]
    _regex = r"(?P<process_time>.+),\d+ - root - INFO - thread status alive:(?P<alive_count>\d+) restart:0 total:40"
    keys = ["process_time","alive_count"]
    df_data = {}
    for k in keys:
        df_data[k] = []
    for _line in list_line:
        _search = re.search(_regex,_line)
        if _search is not None:
            gd = _search.groupdict()
            for k in keys:
                df_data[k].append(gd.get(k,""))
    # _sum1: thread-seconds lost while some of the 40 workers were dead
    # _sum2: thread-seconds with the full pool alive
    _sum1 = 0
    _sum2 = 0
    last_time = None
    for _time,_count in zip(df_data["process_time"],df_data["alive_count"]):
        if last_time is None:
            last_time = time.mktime(time.strptime(_time,"%Y-%m-%d %H:%M:%S"))
        current_time = time.mktime(time.strptime(_time,"%Y-%m-%d %H:%M:%S"))
        if int(_count)!=40:
            _sum1 += (40-int(_count))*(current_time-last_time)
        else:
            _sum2 += 40*(current_time-last_time)
        last_time = current_time
    print("sum1",_sum1)
    print("sum2",_sum2)
    df = pd.DataFrame(df_data)
    df.to_excel("attachmentProcessTime3.xlsx",columns=keys)
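
# _exportID / getExportID: module-level counter that hands out unique numeric ids for the
# JSON export below; documents, entities and relations share the same sequence.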
_exportID = 0
def getExportID():
    global _exportID
    _exportID += 1
    return _exportID
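
# getTypeName: maps the brat annotation type codes used in the corpus to their Chinese
# display names (e.g. org_tenderee -> 招标人 "tenderee"); the table is kept as an inline
# string, presumably mirroring the labelling guideline.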
def getTypeName():
    _typestr = '''
    code | 项目编号
    name | 项目名称
    org | 组织
    company | 公司
    job | 职业
    person | 人名
    time | 时间
    location | 地址
    package | 包号
    phone | 电话
    money | 金额
    money_tendereeMoney | 招标金额
    money_tendererMoney | 中投标金额

    org_tenderee | 招标人
    org_agency | 代理人
    org_tenderer | 中标人
    org_secondTenderer | 第二候选人
    org_thirdTenderer | 第三候选人
    company_tenderee | 招标人
    company_agency | 代理人
    company_tenderer | 中标人
    company_secondTenderer | 第二候选人
    company_thirdTenderer | 第三候选人

    person_tendereePerson | 招标联系人
    person_agencyPerson | 代理联系人
    person_person | 联系人

    rel_tendererMoney | 中投标金额
    rel_tendereeMoney | 招标金额
    rel_person | 联系人
    rel_pack | 所属包
    rel_address | 地址
    rel_phone | 联系电话
    rel_pack_code | 包件编号
    rel_pack_name | 包件名称

    person_review | 评审专家
    time_release | 发布时间
    time_bidopen | 开标时间
    time_bidclose | 截标时间
    moneysource | 资金来源
    bidway | 招标方式
    serviceTime | 服务期限
    product | 产品
    abandon_reason | 失败原因
    '''
    dict_type_name = {}
    for _s_n in _typestr.split("\n"):
        _s_n = _s_n.strip()
        if _s_n=="":
            continue
        _s_t = _s_n.split("|")
        if len(_s_t)==2:
            _type = _s_t[0].strip()
            _name = _s_t[1].strip()
            dict_type_name[_type] = _name
    return dict_type_name
dict_type_name = getTypeName()
import json
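
# toJson: converts one document's brat standoff annotations (lines like
# "T1\ttype begin end\ttext" for entities and "R1\ttype Arg1:T1 Arg2:T2" for relations)
# into a doccano-style JSON record with "entities" and "relations"; type codes are
# translated to their Chinese names via dict_type_name.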
def toJson(list_anno,content):
    json_dict = {}
    json_dict["id"] = getExportID()
    json_dict["text"] = content
    dict_anno = {}
    for _anno in list_anno:
        value = _anno["value"]
        _split = value.split("\t")
        if _split[0][0]=="T":
            # entity line: "T<n>\t<type> <begin> <end>\t<text>"
            _type,_begin,_end = _split[1].split(" ")
            dict_anno[_split[0]] = {"id":_split[0],"type":_type,"text":_split[2],"begin":int(_begin),"end":int(_end)}
        elif _split[0][0]=="R":
            # relation line: "R<n>\t<type> Arg1:T<i> Arg2:T<j>"
            _type,arg1,arg2 = _split[1].split(" ")
            dict_anno[_split[0]] = {"id":_split[0],"type":_type,"arg1":arg1.split(":")[1],"arg2":arg2.split(":")[1]}
    # assign new ids to entities first so that relations can reference them
    for k,v in dict_anno.items():
        if v["id"][0]=="T":
            v["new_id"] = getExportID()
            v["label"] = dict_type_name[v["type"]]
            v["start_offset"] = v["begin"]
            v["end_offset"] = v["end"]
    for k,v in dict_anno.items():
        if v["id"][0]=="R":
            v["new_id"] = getExportID()
            v["type"] = dict_type_name[v["type"]]
            v["from_id"] = dict_anno[v["arg1"]]["new_id"]
            v["to_id"] = dict_anno[v["arg2"]]["new_id"]
    list_entitys = []
    list_relations = []
    for k,v in dict_anno.items():
        if v["id"][0]=="T":
            _dict = {"id":v["new_id"],
                     "label":v["label"],
                     "start_offset":v["start_offset"],
                     "end_offset":v["end_offset"]}
            list_entitys.append(_dict)
        if v["id"][0]=="R":
            _dict = {"id":v["new_id"],
                     "type":v["type"],
                     "from_id":v["from_id"],
                     "to_id":v["to_id"]}
            list_relations.append(_dict)
    json_dict["entities"] = list_entitys
    json_dict["relations"] = list_relations
    return json.dumps(json_dict,ensure_ascii=False)
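
# exportIepyLabel: pulls the 20 most recent payroll batches from the iepy postgres
# database, fetches each annotator's brat annotations in that window, converts every
# document with toJson, and writes up to 100 short records to iepy_export.json
# (one JSON object per line).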
def exportIepyLabel():
    conn = getConnection_postgres("iepy")
    cursor = conn.cursor()
    sql = ' select begin_time,end_time,"user",doc_count from corpus_payroll where end_time<=\'2021-07-25\' order by end_time desc limit 20'
    cursor.execute(sql)
    list_diff = []
    rows_payroll = cursor.fetchall()
    list_json = []
    for _payroll in rows_payroll:
        _begin_time = _payroll[0]
        _end_time = _payroll[1]
        _user = _payroll[2]
        doc_count = _payroll[3]
        print(_user,_begin_time,_end_time,doc_count)
        _sql = "select document_id,value from brat_bratannotation where document_id in (select human_identifier from corpus_iedocument where edituser='%s' and to_char(edittime,'yyyy-mm-dd')>='%s' and to_char(edittime,'yyyy-mm-dd')<='%s' limit 100) order by document_id"%(_user,_begin_time,_end_time)
        cursor.execute(_sql)
        rows = cursor.fetchall()
        if len(rows)>0:
            current_docid = rows[0][0]
            _index = -1
            list_values = []
            while _index<len(rows)-1:
                _index += 1
                row = rows[_index]
                document_id = row[0]
                value = row[1]
                if document_id!=current_docid:
                    print(current_docid)
                    sql = "select text from corpus_iedocument where human_identifier='%s'"%(str(current_docid))
                    cursor.execute(sql)
                    content = cursor.fetchall()[0][0]
                    _json = toJson(list_values,content)
                    list_json.append(_json)
                    # step back one row so the first annotation of the new document is not skipped
                    _index -= 1
                    current_docid = document_id
                    list_values = []
                else:
                    list_values.append({"document_id":document_id,"value":value})
            # also convert the trailing document, which the loop above leaves unflushed
            if len(list_values)>0:
                print(current_docid)
                sql = "select text from corpus_iedocument where human_identifier='%s'"%(str(current_docid))
                cursor.execute(sql)
                content = cursor.fetchall()[0][0]
                list_json.append(toJson(list_values,content))
    print("length:",len(list_json))
    _count = 0
    with open("iepy_export.json","w",encoding="utf8") as f:
        for _json in list_json:
            if len(_json)<600:
                f.write(_json)
                f.write("\n")
                _count += 1
                if _count>=100:
                    break
    print(_count)
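
# exportMergeWrongData: scans bxkc.bxkc_delete_document_log over a fixed id range with 30
# worker threads, collects the docids deleted by the "项目合并" (project merge) rule, and
# writes the de-duplicated docids to mergeWrong.xlsx, presumably to audit wrong merges.
# Each worker walks its id slice and stops once it hits a record dated before 2022-10-14.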
def exportMergeWrongData():
    from dataSource.source import getConnection_mysql
    from dataSource.pool import ConnectorPool
    from utils.multiThread import MultiThreadHandler
    from queue import Queue
    import datetime
    min_id = 1850000
    # max_id = 2000000
    max_id = 328426202+1
    thread_count = 30
    every_count = (max_id-min_id)//thread_count+1
    list_dis = []
    for _i in range(thread_count):
        dis = [min_id+_i*every_count,min_id+(_i+1)*every_count]
        list_dis.append(dis)
    task_queue = Queue()
    result_queue = Queue()
    for _dis in list_dis:
        task_queue.put(_dis)
    def _handle(item,result_queue):
        conn = getConnection_mysql()
        cursor = conn.cursor()
        start,end = item
        set_docid = set()
        for _id in range(start,end):
            sql = "select rule,docid,operate_time from bxkc.bxkc_delete_document_log where id=%d"%(_id)
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                rule,docid,operate_time = rows[0]
                if docid in set_docid:
                    continue
                if str(rule)[:4]=="项目合并":
                    set_docid.add(docid)
                    print(len(set_docid),_id,end,end-_id)
                if operate_time.strftime("%Y-%m-%d")<"2022-10-14":
                    break
        result_queue.put(list(set_docid))
    mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count)
    mt.run()
    list_data = []
    while 1:
        try:
            list_docid = result_queue.get(True,1)
            list_data.extend(list_docid)
        except Exception as e:
            break
    list_data = list(set(list_data))
    df = pd.DataFrame({"docid":list_data})
    df.to_excel("mergeWrong.xlsx")
if __name__=="__main__":
    # exportMergeTrainData()
    # labelTime()
    # getAttachmentProcessTime()
    # exportIepyLabel()
    exportMergeWrongData()