# coding: utf-8
import psycopg2
import numpy as np
from keras import models
from keras import layers
from keras import optimizers, losses, metrics
from keras.callbacks import ModelCheckpoint
import codecs
import copy
from BiddingKG.dl.common.Utils import *

#sourcetable = "label_guest_money"
sourcetable = "hand_label_money"
domain = sourcetable.split("_")[2]
model_file = "model_"+domain+".model"

entity_type = "money"
input_shape = (2,10,128)
input2_shape = [7]
output_shape = [3]

def getTokensLabels(t, isTrain=True, predict=False):
    '''
    @param: t: table that holds the labeled data
            isTrain: whether to return the training split
            predict: whether the data is fetched for prediction
    @return: the processed inputs and labels built from the labeled data
    '''
    conn = psycopg2.connect(dbname="BiddingKM_test_10000", user="postgres", password="postgres", host="192.168.2.101")
    cursor = conn.cursor()
    if predict:
        sql = '''
        select A.tokens,B.begin_index,B.end_index,0,B.entity_text,B.entity_id
        from sentences A,entity_mention B
        where B.entity_type in ('money') and A.doc_id=B.doc_id and A.sentence_index=B.sentence_index
        and B.doc_id='360e1a62-7f82-11e8-ae30-ecf4bbc56acd'
        order by B.doc_id
        '''
    else:
        select_sql = " select A.tokens,B.begin_index,B.end_index,C.label,B.entity_text,C.entity_id "
        '''
        if isTrain:
            train_sql = " and C.id not in(select variable_id from dd_graph_variables_holdout) "
        else:
            train_sql = " and C.id in(select variable_id from dd_graph_variables_holdout)"
        '''
        if isTrain:
            train_sql = " and A.doc_id not in(select id from articles_processed order by id limit 1000) "
        else:
            train_sql = " and A.doc_id in(select id from articles_processed order by id limit 1000)"
        sql = select_sql+" from sentences A,entity_mention_copy B,"+t+" C where B.entity_type='"+entity_type+"' and A.doc_id=B.doc_id and A.sentence_index=B.sentence_index and B.entity_id=C.entity_id "+train_sql
    cursor.execute(sql)
    print(sql)

    data_x = []
    data_x1 = []
    data_y = []
    data_context = []
    rows = cursor.fetchmany(1000)
    allLimit = 250000
    all = 0
    while rows:
        for row in rows:
            if all >= allLimit:
                break
            # left/right token windows around the entity, embedded to input_shape
            item_x = embedding(spanWindow(tokens=row[0], begin_index=row[1], end_index=row[2], size=input_shape[1]), shape=input_shape)
            # hand-crafted features of the money string itself
            item_x1 = partMoney(row[4])
            item_y = np.zeros(output_shape)
            item_y[row[3]] = 1
            all += 1
            if not isTrain:
                item_context = []
                item_context.append(row[5])
                data_context.append(item_context)
            data_x.append(item_x)
            data_x1.append(item_x1)
            data_y.append(item_y)
        rows = cursor.fetchmany(1000)
    conn.close()
    return np.transpose(np.array(data_x), (1,0,2,3)), np.array(data_x1), np.array(data_y), data_context

def getBiRNNModel():
    '''
    @summary: build the model
    '''
    L_input = layers.Input(shape=input_shape[1:], dtype="float32")
    C_input = layers.Input(shape=(input2_shape[0],), dtype="float32")
    R_input = layers.Input(shape=input_shape[1:], dtype="float32")
    #lstm_0 = layers.Bidirectional(layers.LSTM(16,return_sequences=True))(ThreeBilstm(0)(input))
    lstm_0 = layers.Bidirectional(layers.LSTM(16, return_sequences=True))(L_input)
    avg_0 = layers.GlobalAveragePooling1D()(lstm_0)
    C_embed = layers.Dense(4, activation="sigmoid")(C_input)
    #lstm_1 = layers.Bidirectional(layers.LSTM(16,return_sequences=True))(C_input)
    #avg_1 = layers.GlobalAveragePooling1D()(lstm_1)
    lstm_2 = layers.Bidirectional(layers.LSTM(16, return_sequences=True))(R_input)
    avg_2 = layers.GlobalAveragePooling1D()(lstm_2)
    #concat = layers.merge([avg_0,avg_1,avg_2],mode="concat")
    # layers.merge(..., mode="concat") is the Keras 1 API; concatenate is the Keras 2 equivalent
    concat = layers.concatenate([avg_0, C_embed, avg_2])
    output = layers.Dense(output_shape[0], activation="softmax")(concat)
    model = models.Model(inputs=[L_input, C_input, R_input], outputs=output)
    model.compile(optimizer=optimizers.RMSprop(lr=0.001), loss=losses.binary_crossentropy, metrics=[precision, recall, f1_score])
    return model
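# --- Illustrative sketch (not part of the original pipeline) ---
# A minimal shape check for getBiRNNModel(): feed random arrays shaped like the
# constants declared above (left/right token windows of (10,128), a 7-dim money
# feature vector) and confirm a (batch, 3) softmax comes out. The helper name
# _check_model_shapes is hypothetical and is never called by the training code.
def _check_model_shapes(batch_size=4):
    model = getBiRNNModel()
    left = np.random.random((batch_size,) + input_shape[1:])    # left context window
    center = np.random.random((batch_size, input2_shape[0]))    # partMoney features
    right = np.random.random((batch_size,) + input_shape[1:])   # right context window
    pred = model.predict([left, center, right])
    print(np.shape(pred))  # expected: (batch_size, 3)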
def training():
    '''
    @summary: train the model
    '''
    model = getBiRNNModel()
    model.summary()
    train_x, train_x1, train_y, _ = getTokensLabels(isTrain=True, t=sourcetable)
    print(np.shape(train_x))
    test_x, test_x1, test_y, test_context = getTokensLabels(isTrain=False, t=sourcetable)
    checkpoint = ModelCheckpoint(model_file+".hdf5", monitor="val_loss", verbose=1, save_best_only=True, mode='min')
    history_model = model.fit(x=[train_x[0], train_x1, train_x[1]], y=train_y,
                              validation_data=([test_x[0], test_x1, test_x[1]], test_y),
                              epochs=100, batch_size=300, shuffle=True, callbacks=[checkpoint])
    predict_y = model.predict([test_x[0], test_x1, test_x[1]])
    with codecs.open("predict.txt", "w", encoding="utf8") as f:
        for i in range(len(predict_y)):
            f.write(str(test_context[i][0]))
            f.write("\t")
            f.write(str(np.argmax(predict_y[i])))
            f.write("\n")
        f.flush()
    model.save(model_file)
    #print_metrics(history_model)

def trainingIteration_category(iterate=2, label_table="label_guest_money"):
    '''
    @summary: iteratively train the model and revise the labels;
              intended for cases where the label accuracy is low
    @param: iterate: number of iterations
            label_table: table that holds the label data
    '''
    def getDatasets():
        conn = psycopg2.connect(dbname="BiddingKM_test_10000", user="postgres", password="postgres", host="192.168.2.101")
        cursor = conn.cursor()
        select_sql = " select A.tokens,B.begin_index,B.end_index,C.label,B.entity_text,C.entity_id "
        sql = select_sql+" from sentences A,entity_mention B,"+label_table+" C where B.entity_type='"+entity_type+"' and A.doc_id=B.doc_id and A.sentence_index=B.sentence_index and B.entity_id=C.entity_id order by A.doc_id "
        cursor.execute(sql)
        print(sql)

        data_x = []
        data_x1 = []
        data_y = []
        id_set = []
        rows = cursor.fetchmany(1000)
        allLimit = 250000
        all = 0
        while rows:
            for row in rows:
                if all >= allLimit:
                    break
                item_x = embedding(spanWindow(tokens=row[0], begin_index=row[1], end_index=row[2]))
                item_x1 = partMoney(row[4])
                item_y = np.zeros(output_shape)
                item_y[row[3]] = 1
                all += 1
                data_x.append(item_x)
                data_x1.append(item_x1)
                data_y.append(item_y)
                id_set.append(row[5])
            rows = cursor.fetchmany(1000)
        conn.close()
        return np.transpose(np.array(data_x), (1,0,2,3)), np.array(data_x1), np.array(data_y), id_set

    train_x, train_x1, train_y, id_set = getDatasets()
    alllength = len(train_x[0])
    parts = 8
    num_parts = alllength//parts
    copy_y = copy.copy(train_y)
    for ite in range(iterate):
        # train on all but one fold, then relabel the held-out fold with confident predictions
        for j in range(parts-1):
            print("iterate:", str(ite)+"/"+str(iterate-1), str(j)+"/"+str(parts-1))
            model = getBiRNNModel()
            model.summary()
            test_begin = j*num_parts
            test_end = (j+1)*num_parts
            checkpoint = ModelCheckpoint(model_file+".hdf5", monitor="val_loss", verbose=1, save_best_only=True, mode='min')
            history_model = model.fit(x=[np.concatenate((train_x[0][0:test_begin], train_x[0][test_end:])),
                                         np.concatenate((train_x1[0:test_begin], train_x1[test_end:])),
                                         np.concatenate((train_x[1][0:test_begin], train_x[1][test_end:]))],
                                      y=np.concatenate((copy_y[0:test_begin], copy_y[test_end:])),
                                      validation_data=([train_x[0][test_begin:test_end], train_x1[test_begin:test_end], train_x[1][test_begin:test_end]], copy_y[test_begin:test_end]),
                                      epochs=30, batch_size=300, shuffle=True, callbacks=[checkpoint])
            model.load_weights(model_file+".hdf5")
            predict_y = model.predict([train_x[0][test_begin:test_end], train_x1[test_begin:test_end], train_x[1][test_begin:test_end]])
            for i in range(len(predict_y)):
                if np.max(predict_y[i]) >= 0.8:
                    max_index = np.argmax(predict_y[i])
                    for h in range(len(predict_y[i])):
                        if h == max_index:
                            copy_y[i+test_begin][h] = 1
                        else:
                            copy_y[i+test_begin][h] = 0
        # handle the final fold: train on everything before it, relabel the tail
        print("iterate:", str(ite)+"/"+str(iterate-1), str(parts-1)+"/"+str(parts-1))
        model = getBiRNNModel()
        model.summary()
        test_begin = (parts-1)*num_parts
        checkpoint = ModelCheckpoint(model_file+".hdf5", monitor="val_loss", verbose=1, save_best_only=True, mode='min')
        history_model = model.fit(x=[train_x[0][0:test_begin], train_x1[0:test_begin], train_x[1][0:test_begin]], y=copy_y[0:test_begin],
                                  validation_data=([train_x[0][test_begin:], train_x1[test_begin:], train_x[1][test_begin:]], copy_y[test_begin:]),
                                  epochs=30, batch_size=300, shuffle=True, callbacks=[checkpoint])
        model.load_weights(model_file+".hdf5")
        predict_y = model.predict([train_x[0][test_begin:], train_x1[test_begin:], train_x[1][test_begin:]])
        for i in range(len(predict_y)):
            if np.max(predict_y[i]) >= 0.9:
                max_index = np.argmax(predict_y[i])
                for h in range(len(predict_y[i])):
                    if h == max_index:
                        copy_y[i+test_begin][h] = 1
                    else:
                        copy_y[i+test_begin][h] = 0
    # write the results to a file
    with codecs.open("final_label_"+domain+".txt", "w", encoding="utf8") as f:
        for i in range(len(id_set)):
            f.write(id_set[i])
            f.write("\t")
            f.write(str(np.argmax(copy_y[i])))
            f.write("\n")
        f.flush()

def predict():
    '''
    @summary: predict on the test data
    '''
    test_x, test_x1, _, ids = getTokensLabels(sourcetable, isTrain=False, predict=True)
    model = getBiRNNModel()
    model.load_weights(model_file+".hdf5")
    predict_y = model.predict([test_x[0], test_x1, test_x[1]])
    with codecs.open("test_predict_"+domain+".txt", "w", encoding="utf8") as f:
        for i in range(len(predict_y)):
            f.write(ids[i][0])
            f.write("\t")
            f.write(str(np.argmax(predict_y[i])))
            f.write("\t")
            value = ""
            for item in predict_y[i]:
                value += str(item)+","
            f.write(value[:-1])
            f.write("\n")
        f.flush()

def importIterateLabel():
    '''
    @summary: import the labels produced by the iterative training
    '''
    file = "final_label_"+domain+".txt"
    conn = psycopg2.connect(dbname="BiddingKM_test_10000", user="postgres", password="postgres", host="192.168.2.101")
    cursor = conn.cursor()
    tablename = file.split(".")[0]
    # create the table if it does not exist yet
    cursor.execute(" SELECT to_regclass('"+tablename+"') is null ")
    flag = cursor.fetchall()[0][0]
    if flag:
        cursor.execute(" create table "+tablename+"(entity_id text,label int)")
    else:
        cursor.execute(" delete from "+tablename)
    with codecs.open(file, "r", encoding="utf8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            line_split = line.strip().split("\t")
            entity_id = line_split[0]
            label = line_split[1]
            sql = " insert into "+tablename+"(entity_id,label) values('"+str(entity_id)+"',"+str(label)+")"
            cursor.execute(sql)
    conn.commit()
    conn.close()

def importtestPredict():
    '''
    @summary: import the predictions made on the test data
    '''
    file = "test_predict_"+domain+".txt"
    conn = psycopg2.connect(dbname="BiddingKG", user="postgres", password="postgres", host="192.168.2.101")
    cursor = conn.cursor()
    tablename = file.split(".")[0]
    # create the table if it does not exist yet
    cursor.execute(" SELECT to_regclass('"+tablename+"') is null ")
    flag = cursor.fetchall()[0][0]
    if flag:
        cursor.execute(" create table "+tablename+"(entity_id text,label int,value text)")
    else:
        cursor.execute(" delete from "+tablename)
    with codecs.open(file, "r", encoding="utf8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            line_split = line.strip().split("\t")
            entity_id = line_split[0]
            predict = line_split[1]
            value = line_split[2]
            sql = " insert into "+tablename+"(entity_id,label,value) values('"+str(entity_id)+"',"+str(predict)+",'"+str(value)+"')"
            cursor.execute(sql)
    conn.commit()
    conn.close()

def autoIterate():
    #trainingIteration_binary()
    trainingIteration_category()
    importIterateLabel()
    training()
    predict()

if __name__ == "__main__":
    training()
    #trainingIteration_category()
    #importIterateLabel()
    #predict()
    #importtestPredict()
    #autoIterate()
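# --- Illustrative sketch (not part of the original pipeline) ---
# The hard-relabeling rule used inside trainingIteration_category, pulled out as a
# standalone helper: a stored one-hot label is overwritten only when the model's
# softmax prediction is confident enough (0.8 for the inner folds, 0.9 for the final
# fold). The name _relabel_if_confident is hypothetical and is never called above.
def _relabel_if_confident(pred_row, label_row, threshold=0.8):
    # pred_row: softmax output for one entity; label_row: its current one-hot label (modified in place)
    if np.max(pred_row) >= threshold:
        max_index = np.argmax(pred_row)
        for h in range(len(pred_row)):
            label_row[h] = 1 if h == max_index else 0
    return label_row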