#coding:utf8
from dataSource.source import getConnection_testmysql,getConnection_postgres
from utils.Utils import save,getCurrent_date


def exportMergeTrainData():
    # dump the merge-probability training pairs to pickle files, paged
    # 1,000,000 rows at a time, split by whether prob is already labeled
    conn = getConnection_testmysql()
    cursor = conn.cursor()
    for _type in ["is null","is not null"]:
        for i in range(20):
            _limit = 1000000
            _offset = i*_limit
            sql = "select docid_less,docid_greater,json_matrix,prob from merge_probability_pairs_featurematrix_train where prob %s limit %d offset %d"%(_type,_limit,_offset)
            cursor.execute(sql)
            row_name = cursor.description
            print(row_name)
            rows = cursor.fetchall()
            list_data = []
            for row in rows:
                _dict = dict()
                for _n,_v in zip(row_name,row):
                    _dict[_n[0]] = _v
                list_data.append(_dict)
            save(list_data,"../data/%s-mergeTrain_%s_part%d.pk"%(getCurrent_date("%Y-%m-%d"),_type.replace(" ",""),i))

import pandas as pd
import re

def labelTime():
    # flag entities in the deferral spreadsheet (延期数据 = "deferred data"):
    # reg_count is "1" when the 7 characters left of the entity end with
    # 截止时间:/截止日期: ("deadline time/date:"), "0" otherwise, and "-1"
    # when the row was already relabeled by hand
    filename = "../data/延期数据.xlsx"
    df = pd.read_excel(filename)
    columns = ["docid","entity_text","label","sentence_left","sentence_right","context_left","context_right","new_label"]
    df_data = {}
    for _c in columns:
        df_data[_c] = df[_c]
    append_columns = ["reg_count"]
    columns.extend(append_columns)
    for _c in append_columns:
        df_data[_c] = []
    for _left,_new_label,_text in zip(df["sentence_left"],df["new_label"],df["entity_text"]):
        _line = _left[-7:]
        if str(_new_label)!='nan' and _new_label is not None and _new_label!="":
            df_data["reg_count"].append("-1")
        else:
            # if re.search(".*月.*日",_text) is not None or re.search(r"^\d+\-\d+\-",_text) is not None:
            #     df_data["reg_count"].append("1")
            # else:
            #     df_data["reg_count"].append("0")
            if re.search("截止(时间|日期):$",_line) is not None:
                df_data["reg_count"].append("1")
            else:
                df_data["reg_count"].append("0")
    new_df = pd.DataFrame(df_data)
    # writes the reg_count column back into the same workbook
    new_df.to_excel(filename,columns=columns)

import time

def getAttachmentProcessTime():
    # parse the periodic "thread status" lines of flow_attachment.log and
    # accumulate, between consecutive status lines, the idle worker-seconds
    # (_sum1, threads below the pool total of 40) and busy worker-seconds (_sum2)
    list_line = []
    with open("flow_attachment.log","r",encoding="utf-8") as f:
        list_line = f.readlines()
    # earlier per-file patterns, kept for reference:
    # _regrex = r"filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) with size:(?P<filesize>\d+(\.\d+)?)M download:(?P<download>\d+(\.\d+)?)s,recognize takes (?P<recognize>\d+(\.\d+)?)s,ret_size:(?P<ret_size>\d+(\.\d+)?)"
    # _regrex = r"process filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) with size:(?P<filesize>\d+(\.\d+)?)M download:(?P<download>\d+(\.\d+)?)s recognize takes (?P<recognize>\d+(\.\d+)?)s,ret_size:(?P<ret_size>\d+(\.\d+)?)"
    # _regrex = r"filemd5:(?P<filemd5>[a-z0-9]+) of type:(?P<filetype>[a-z0-9]+) download:(?P<download>\d+(\.\d+)?)s recognize:(?P<recognize>\d+(\.\d+)?)s result:True rec_size:(?P<ret_size>\d+(\.\d+)?)"
    # keys = ["filemd5","filetype","filesize","download","recognize","ret_size"]
    _regrex = r"(?P<process_time>.+),\d+ - root - INFO - thread status alive:(?P<alive_count>\d+) restart:0 total:40"
    keys = ["process_time","alive_count"]
    df_data = {}
    for k in keys:
        df_data[k] = []
    for _line in list_line:
        _search = re.search(_regrex,_line)
        if _search is not None:
            gd = _search.groupdict()
            for k in keys:
                df_data[k].append(gd.get(k,""))
    _sum1 = 0
    _sum2 = 0
    last_time = None
    for _time,_count in zip(df_data["process_time"],df_data["alive_count"]):
        if last_time is None:
            last_time = time.mktime(time.strptime(_time,"%Y-%m-%d %H:%M:%S"))
        current_time = time.mktime(time.strptime(_time,"%Y-%m-%d %H:%M:%S"))
        if int(_count)!=40:
            _sum1 += (40-int(_count))*(current_time-last_time)
        else:
            _sum2 += 40*(current_time-last_time)
        last_time = current_time
    print("sum1",_sum1)
    print("sum2",_sum2)
    df = pd.DataFrame(df_data)
    df.to_excel("attachmentProcessTime3.xlsx",columns=keys)
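
# Quick self-check of the status-line pattern above. The log line below is
# synthetic, inferred from the regex itself rather than copied from a real
# flow_attachment.log, so treat the exact layout as an assumption.
def _demo_status_regex():
    _regrex = r"(?P<process_time>.+),\d+ - root - INFO - thread status alive:(?P<alive_count>\d+) restart:0 total:40"
    _line = "2022-10-14 08:00:00,123 - root - INFO - thread status alive:38 restart:0 total:40"
    _search = re.search(_regrex,_line)
    if _search is not None:
        # prints {'process_time': '2022-10-14 08:00:00', 'alive_count': '38'}
        print(_search.groupdict())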

_exportID = 0

def getExportID():
    # monotonically increasing id shared by entities and relations in the export
    global _exportID
    _exportID += 1
    return _exportID

def getTypeName():
    # annotation type -> Chinese display label written into the export,
    # one "type | label" pair per line
    _typestr = '''
    code | 项目编号
    name | 项目名称
    org | 组织
    company | 公司
    job | 职业
    person | 人名
    time | 时间
    location | 地址
    package | 包号
    phone | 电话
    money | 金额
    money_tendereeMoney | 招标金额
    money_tendererMoney | 中投标金额
    org_tenderee | 招标人
    org_agency | 代理人
    org_tenderer | 中标人
    org_secondTenderer | 第二候选人
    org_thirdTenderer | 第三候选人
    company_tenderee | 招标人
    company_agency | 代理人
    company_tenderer | 中标人
    company_secondTenderer | 第二候选人
    company_thirdTenderer | 第三候选人
    person_tendereePerson | 招标联系人
    person_agencyPerson | 代理联系人
    person_person | 联系人
    rel_tendererMoney | 中投标金额
    rel_tendereeMoney | 招标金额
    rel_person | 联系人
    rel_pack | 所属包
    rel_address | 地址
    rel_phone | 联系电话
    rel_pack_code | 包件编号
    rel_pack_name | 包件名称
    person_review | 评审专家
    time_release | 发布时间
    time_bidopen | 开标时间
    time_bidclose | 截标时间
    moneysource | 资金来源
    bidway | 招标方式
    serviceTime | 服务期限
    product | 产品
    abandon_reason | 失败原因
    '''
    dict_type_name = {}
    for _s_n in _typestr.split("\n"):
        _s_n = _s_n.strip()
        if _s_n=="":
            continue
        _s_t = _s_n.split("|")
        if len(_s_t)==2:
            _type = _s_t[0].strip()
            _name = _s_t[1].strip()
            dict_type_name[_type] = _name
    return dict_type_name

dict_type_name = getTypeName()

import json

def toJson(list_anno,content):
    # convert brat-style annotation rows ("T..." entity lines and "R..."
    # relation lines) for one document into a doccano-style json record
    json_dict = {}
    json_dict["id"] = getExportID()
    json_dict["text"] = content
    dict_anno = {}
    for _anno in list_anno:
        value = _anno["value"]
        _split = value.split("\t")
        if _split[0][0]=="T":
            # entity line: "T1\t<type> <begin> <end>\t<text>"
            _type,_begin,_end = _split[1].split(" ")
            dict_anno[_split[0]] = {"id":_split[0],"type":_type,"text":_split[2],"begin":int(_begin),"end":int(_end)}
        elif _split[0][0]=="R":
            # relation line: "R1\t<type> Arg1:T1 Arg2:T2"
            _type,arg1,arg2 = _split[1].split(" ")
            dict_anno[_split[0]] = {"id":_split[0],"type":_type,"arg1":arg1.split(":")[1],"arg2":arg2.split(":")[1]}
    for k,v in dict_anno.items():
        if v["id"][0]=="T":
            v["new_id"] = getExportID()
            v["label"] = dict_type_name[v["type"]]
            v["start_offset"] = v["begin"]
            v["end_offset"] = v["end"]
    # relations are renumbered after entities so from_id/to_id can point at
    # the entities' new ids
    for k,v in dict_anno.items():
        if v["id"][0]=="R":
            v["new_id"] = getExportID()
            v["type"] = dict_type_name[v["type"]]
            v["from_id"] = dict_anno[v["arg1"]]["new_id"]
            v["to_id"] = dict_anno[v["arg2"]]["new_id"]
    list_entitys = []
    list_relations = []
    for k,v in dict_anno.items():
        if v["id"][0]=="T":
            _dict = {"id":v["new_id"],
                     "label":v["label"],
                     "start_offset":v["start_offset"],
                     "end_offset":v["end_offset"]}
            list_entitys.append(_dict)
        if v["id"][0]=="R":
            _dict = {"id":v["new_id"],
                     "type":v["type"],
                     "from_id":v["from_id"],
                     "to_id":v["to_id"]}
            list_relations.append(_dict)
    json_dict["entities"] = list_entitys
    json_dict["relations"] = list_relations
    return json.dumps(json_dict,ensure_ascii=False)
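
# Minimal sketch of the input toJson expects: two synthetic entity rows and
# one relation row. The annotation values and text below are made up for
# illustration; real rows come from brat_bratannotation.value.
def _demo_toJson():
    _content = "招标人:测试公司 联系人:张三"
    _annos = [{"value":"T1\tcompany_tenderee 4 8\t测试公司"},
              {"value":"T2\tperson_person 13 15\t张三"},
              {"value":"R1\trel_person Arg1:T1 Arg2:T2"}]
    # prints a record with two entities (招标人, 联系人) and one 联系人 relation
    print(toJson(_annos,_content))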

def exportIepyLabel():
    conn = getConnection_postgres("iepy")
    cursor = conn.cursor()
    sql = ' select begin_time,end_time,"user",doc_count from corpus_payroll where end_time<=\'2021-07-25\' order by end_time desc limit 20'
    cursor.execute(sql)
    list_diff = []
    rows_payroll = cursor.fetchall()
    list_json = []
    for _payroll in rows_payroll:
        _begin_time = _payroll[0]
        _end_time = _payroll[1]
        _user = _payroll[2]
        doc_count = _payroll[3]
        print(_user,_begin_time,_end_time,doc_count)
        _sql = "select document_id,value from brat_bratannotation where document_id in (select human_identifier from corpus_iedocument where edituser='%s' and to_char(edittime,'yyyy-mm-dd')>='%s' and to_char(edittime,'yyyy-mm-dd')<='%s' limit 100) order by document_id"%(_user,_begin_time,_end_time)
        cursor.execute(_sql)
        rows = cursor.fetchall()
        if len(rows)>0:
            current_docid = rows[0][0]
            _index = -1
            _count = 0
            list_values = []
            # reconstruction: the body of this loop was garbled in the source.
            # The apparent intent is to group the annotation rows of each
            # document and convert them with toJson, stopping after 100
            # documents per user; the document text itself is not fetched in
            # the surviving code, so an empty string is passed.
            while _index<len(rows)-1:
                _index += 1
                document_id,value = rows[_index]
                if document_id==current_docid:
                    list_values.append({"value":value})
                else:
                    list_json.append(toJson(list_values,""))
                    list_values = [{"value":value}]
                    current_docid = document_id
                    _count += 1
                    if _count>=100:
                        break
            print(_count)

def exportMergeWrongData():
    from dataSource.source import getConnection_mysql
    from dataSource.pool import ConnectorPool
    from utils.multiThread import MultiThreadHandler
    from queue import Queue
    import datetime
    min_id = 1850000
    # max_id = 2000000
    max_id = 328426202+1
    thread_count = 30
    every_count = (max_id-min_id)//thread_count+1
    list_dis = []
    for _i in range(thread_count):
        dis = [min_id+_i*every_count,min_id+(_i+1)*every_count]
        list_dis.append(dis)
    task_queue = Queue()
    result_queue = Queue()
    for _dis in list_dis:
        task_queue.put(_dis)

    def _handle(item,result_queue):
        # scan one id range of the delete log for documents removed by the
        # merge rule (rule text starting with 项目合并, "project merge");
        # stop once an entry older than 2022-10-14 is reached
        conn = getConnection_mysql()
        cursor = conn.cursor()
        start,end = item
        set_docid = set()
        for _id in range(start,end):
            sql = "select rule,docid,operate_time from bxkc.bxkc_delete_document_log where id=%d"%(_id)
            cursor.execute(sql)
            rows = cursor.fetchall()
            if len(rows)>0:
                rule,docid,operate_time = rows[0]
                if docid in set_docid:
                    continue
                if str(rule)[:4]=="项目合并":
                    set_docid.add(docid)
                    print(len(set_docid),_id,end,end-_id)
                if operate_time.strftime("%Y-%m-%d")<"2022-10-14":
                    break
        result_queue.put(list(set_docid))

    mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count)
    mt.run()
    list_data = []
    while 1:
        try:
            list_docid = result_queue.get(True,1)
            list_data.extend(list_docid)
        except Exception as e:
            break
    list_data = list(set(list_data))
    df = pd.DataFrame({"docid":list_data})
    df.to_excel("mergeWrong.xlsx")

if __name__=="__main__":
    # exportMergeTrainData()
    # labelTime()
    # getAttachmentProcessTime()
    # exportIepyLabel()
    exportMergeWrongData()
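
# Worked example of the id-range partitioning used in exportMergeWrongData,
# with small made-up numbers (min_id=0, max_id=10, thread_count=3). The last
# range overshoots max_id, which is harmless: ids past the end simply return
# no rows.
def _demo_partition():
    min_id,max_id,thread_count = 0,10,3
    every_count = (max_id-min_id)//thread_count+1
    list_dis = [[min_id+_i*every_count,min_id+(_i+1)*every_count] for _i in range(thread_count)]
    print(list_dis)  # [[0, 4], [4, 8], [8, 12]]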