# numpy / scipy / heapq are only used by the commented-out experiments in test()
import numpy as np
import pandas as pd
from py2neo import Graph
import time
import scipy.spatial
from fuzzywuzzy import fuzz
import json
import multiprocessing as mp
import heapq
import os

dir_project = os.getcwd() + "\\data1\\100000\\zh_en\\"
dir_align_project = os.getcwd() + "\\data1\\Align\\zh_en\\"
dir_Neo4j_File = "C:\\Users\\admin\\Desktop\\Neo4j.csv"
dir_Neo4j_Align_File = "C:\\Users\\admin\\Desktop\\Neo4j_aligned.csv"
dir_ent_ids_1_real = dir_project + "ent_ids_1_real"
dir_ent_ids_2_real = dir_project + "ent_ids_2_real"
dir_triples_1_real = dir_project + "triples_1_real"
dir_triples_2_real = dir_project + "triples_2_real"
dir_ref_ent_ids_real = dir_project + "ref_ent_ids_real"
dir_ref_ent_ids_real_neg = dir_project + "ref_ent_ids_real_neg"
dir_training_attrs_1 = dir_project + "training_attrs_1"
dir_training_attrs_2 = dir_project + "training_attrs_2"
dir_ModelId2RealId = dir_project + "ModelId2RealId"
dir_ent_ids_1 = dir_project + "ent_ids_1"
dir_triples_1 = dir_project + "triples_1"
dir_ref_ent_ids = dir_project + "ref_ent_ids"
dir_ref_ent_ids_neg = dir_project + "ref_ent_ids_neg"


def getDataFromNeo4j3(skip_num, limit_num):
    """
    Fetch the IDs of 200k companies in total (including the already-aligned
    company IDs), taking at most 10 relationships per company ID.
    skip_num / limit_num are the start offset and batch size each worker
    process needs.
    :return:
    """
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    id_list = []
    # lines = file2Data(dir_ref_ent_ids_real)
    # for index in range(len(lines)):
    #     ids = lines[index][0:-1].split("\t")
    #     id_list.append(int(ids[0]))
    #     id_list.append(int(ids[1]))
    # id_list = list(set(id_list))
    print("Match Neo4j, getting ID...")
    result = graph.run("match (o:Organization) "
                       "return id(o) order by id(o) "
                       " skip " + str(skip_num) + " limit " + str(limit_num)
                       ).to_ndarray()
    result = result.tolist()
    for index in range(len(result)):
        id_list.append(int(result[index][0]))
    id_list = list(set(id_list))
    id_list.sort()
    print(len(id_list))
    # print(id_list)
    print("Match Neo4j...")
    start_time = time.time()
    results = []
    for index in range(len(id_list)):
        result = graph.run("MATCH (o:Organization)-[r]->(p:Project) "
                           "where p.project_name is not null and id(o)=" + str(id_list[index]) +
                           " RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
                           "id(r), type(r), r.price, "
                           "id(p), p.project_name, p.area, p.city, p.district "
                           "LIMIT 10"
                           ).to_ndarray()
        results.append(result.tolist())
        # print(index, len(id_list))
        if index % 500 == 0:
            end_time = time.time()
            print(index, end_time - start_time)
    print("write into csv...")
    start_time = time.time()
    df_list = []
    for i in range(len(results)):
        for j in range(len(results[i])):
            df_list.append(results[i][j])
    df = pd.DataFrame(df_list)
    df.columns = ["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                  "id(r)", "type(r)", "r.price",
                  "id(p)", "p.project_name", "p.area", "p.city", "p.district"]
    df.to_csv("C:\\Users\\admin\\Desktop\\Neo4j" + "_" + str(skip_num) + "_" + str(limit_num) + ".csv")
    end_time = time.time()
    print(end_time - start_time)
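
# The loop above issues one Cypher query per organization ID, so the round trips
# dominate the runtime. A minimal sketch of a batched variant, assuming a
# parameterized `id(o) IN $ids` predicate; the function name and batch_size are
# hypothetical, not part of the original pipeline. Note the per-ID "LIMIT 10"
# cannot be expressed this way, so the batched query returns all matches.
def getDataFromNeo4j3_batched(skip_num, limit_num, batch_size=500):
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    ids = [int(r[0]) for r in graph.run(
        "MATCH (o:Organization) RETURN id(o) ORDER BY id(o) "
        "SKIP " + str(skip_num) + " LIMIT " + str(limit_num)).to_ndarray().tolist()]
    rows = []
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        # one round trip per batch instead of one per ID
        cursor = graph.run(
            "MATCH (o:Organization)-[r]->(p:Project) "
            "WHERE p.project_name IS NOT NULL AND id(o) IN $ids "
            "RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
            "id(r), type(r), r.price, "
            "id(p), p.project_name, p.area, p.city, p.district", ids=batch)
        rows.extend(cursor.to_ndarray().tolist())
    return rows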

def getDataFromNeo4j_align():
    """
    Fetch the relationships of the already-aligned company IDs.
    (Note: the query below still caps each organization at LIMIT 10.)
    :return:
    """
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    print("Match Neo4j, getting ID...")
    lines = file2Data(dir_align_project + "ref_ent_ids_real")
    id_list = []
    for index in range(len(lines)):
        ids = lines[index][0:-1].split("\t")
        id_list.append(ids[0])
        id_list.append(ids[1])
    id_list = list(set(id_list))
    id_list.sort()
    # print(len(id_list))
    print("Match Neo4j...")
    start_time = time.time()
    results = []
    for index in range(len(id_list)):
        result = graph.run("MATCH (o:Organization)-[r]->(p:Project) "
                           "where p.project_name is not null and id(o)=" + id_list[index] +
                           " RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
                           "id(r), type(r), r.price, "
                           "id(p), p.project_name, p.area, p.city, p.district "
                           "LIMIT 10"
                           ).to_ndarray()
        results.append(result.tolist())
        # print(index, len(id_list))
    end_time = time.time()
    print(end_time - start_time)
    print("write into csv...")
    start_time = time.time()
    df_list = []
    for i in range(len(results)):
        for j in range(len(results[i])):
            df_list.append(results[i][j])
    df = pd.DataFrame(df_list)
    df.columns = ["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                  "id(r)", "type(r)", "r.price",
                  "id(p)", "p.project_name", "p.area", "p.city", "p.district"]
    df.to_csv("C:\\Users\\admin\\Desktop\\Neo4j_aligned.csv")
    end_time = time.time()
    print(end_time - start_time)


def getDataFromNeo4j():
    """
    Fetch 2,000,000 rows (the query below skips the first 2,000,000).
    :return:
    """
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    print("Match Neo4j...")
    start_time = time.time()
    results = []
    result = graph.run("MATCH (o:Organization)-[r]->(p:Project) "
                       "where p.project_name is not null "
                       "RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
                       "id(r), type(r), r.price, "
                       "id(p), p.project_name, p.area, p.city, p.district "
                       "skip 2000000 LIMIT 2000000"
                       ).to_ndarray()
    results.append(result.tolist())
    end_time = time.time()
    print(end_time - start_time)
    print("write into csv...")
    start_time = time.time()
    df_list = []
    for i in range(len(results)):
        for j in range(len(results[i])):
            df_list.append(results[i][j])
    df = pd.DataFrame(df_list)
    df.columns = ["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                  "id(r)", "type(r)", "r.price",
                  "id(p)", "p.project_name", "p.area", "p.city", "p.district"]
    df.to_csv("C:\\Users\\admin\\Desktop\\Neo4j.csv")
    end_time = time.time()
    print(end_time - start_time)


def testNeo4j():
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    print("Match Neo4j...")
    start_time = time.time()
    a = graph.run("MATCH (o:Organization)-[r]->(p:Project) "
                  "where p.project_name is not null "
                  "RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
                  "id(r), type(r), r.price, "
                  "id(p), p.project_name, p.area, p.city, p.district "
                  "skip 2000000 LIMIT 2000000").to_ndarray()
    end_time = time.time()
    print(end_time - start_time)
    print("write into csv...")
    start_time = time.time()
    df = pd.DataFrame(a)
    df.columns = ["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                  "id(r)", "type(r)", "r.price",
                  "id(p)", "p.project_name", "p.area", "p.city", "p.district"]
    df.to_csv("C:\\Users\\admin\\Desktop\\Neo4j1.csv")
    end_time = time.time()
    print(end_time - start_time)
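
# py2neo can materialize a cursor straight into a DataFrame, which removes the
# manual to_ndarray()/tolist()/df.columns bookkeeping used above. A minimal
# sketch assuming the same connection details; the function name and out_path
# default are hypothetical:
def testNeo4j_df(out_path="C:\\Users\\admin\\Desktop\\Neo4j1.csv"):
    graph = Graph('http://118.31.10.60:7474', auth=('bxkc_web', 'bxkc_web'))
    df = graph.run("MATCH (o:Organization)-[r]->(p:Project) "
                   "WHERE p.project_name IS NOT NULL "
                   "RETURN id(o), o.name, o.nicknames, o.area, o.city, o.district, "
                   "id(r), type(r), r.price, "
                   "id(p), p.project_name, p.area, p.city, p.district "
                   "SKIP 2000000 LIMIT 2000000").to_data_frame()
    # column names come from the RETURN clause, so no manual assignment needed
    df.to_csv(out_path)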

def Neo4j2Data(filename):
    out_dir = dir_project  # renamed from `dir` to avoid shadowing the builtin
    df = pd.read_csv(filename)
    # output files
    triples = []
    ent_ids = []
    training_attrs = []
    # df = df[0:100000]
    for column in df.columns:
        # normalize the literal "未知" ("unknown") to an empty string
        df[column] = df[column].apply(lambda x: "" if x == "未知" else x)
    for index, row in df.iterrows():
        triples.append(str(row["id(o)"]) + "\t" + str(row["id(r)"]) + "\t" + str(row["id(p)"]) + "\n")
        ent_ids.append(str(row["id(o)"]) + "\t" + "O" + str(row["o.name"]) + "\n")
        ent_ids.append(str(row["id(p)"]) + "\t" + "P" + str(row["p.project_name"]) + "\n")
        training_attrs.append("O" + str(row["o.name"]) + "\t" + str(row["o.nicknames"]) + "\t"
                              + str(row["o.area"]) + "\t" + str(row["o.city"]) + "\t"
                              + str(row["o.district"]) + "\n")
        training_attrs.append("P" + str(row["p.project_name"]) + "\t" + str(row["p.area"]) + "\t"
                              + str(row["p.city"]) + "\t" + str(row["p.district"]) + "\n")
    triples1 = triples
    ent_ids1 = ent_ids
    training_attrs1 = training_attrs
    data2File(triples1, out_dir + "triples_1_real")
    # data2File(triples2, out_dir + "triples_2_real")
    data2File(ent_ids1, out_dir + "ent_ids_1_real")
    # data2File(ent_ids2, out_dir + "ent_ids_2_real")
    data2File(training_attrs1, out_dir + "training_attrs_1")
    # data2File(training_attrs1, out_dir + "training_attrs_2_a")


def data2File(_list, filename):
    with open(filename, 'w', encoding='UTF-8') as f:
        f.writelines(_list)


def file2Data(filename):
    with open(filename, 'r', encoding='UTF-8') as f:
        _list = f.readlines()
    return _list


def data2FileAppend(_list, filename):
    with open(filename, 'a+', encoding='UTF-8') as f:
        f.writelines(_list)


def similarity():
    df = pd.read_csv(dir_Neo4j_File)
    # df1 = df[10000:20000]
    # 59000:1000000  1037000:1500000
    df = df[:100000]
    df = df[["id(o)", "o.name", "o.area", "o.city"]]
    # df1 = df1[["id(o)", "o.name", "o.area", "o.city"]]
    org_list = []
    for index, row in df.iterrows():
        org_list.append(str(row["id(o)"]) + " " + str(row["o.name"]) + " " + str(row["o.area"])
                        + " " + str(row["o.city"]))
    # org_list1 = []
    # for index, row in df1.iterrows():
    #     org_list1.append(str(row["id(o)"]) + " " + str(row["o.name"]) + " " + str(row["o.area"])
    #                      + " " + str(row["o.city"]))
    # deduplicate
    org_list = list(set(org_list))
    print("org_list after dedup:", len(org_list))
    # org_list1 = list(set(org_list1))
    # print("org_list1 after dedup:", len(org_list1))
    # split the id off from the other fields
    id_list = []
    for i in range(len(org_list)):
        ss = org_list[i].split(" ")
        id_list.append(ss[0])
        org_list[i] = ss[1] + " " + ss[2] + " " + ss[3]
    # id_list1 = []
    # for i in range(len(org_list1)):
    #     ss = org_list1[i].split(" ")
    #     id_list1.append(ss[0])
    #     org_list1[i] = ss[1] + " " + ss[2] + " " + ss[3]
    # pairwise similarity loop; thresholds:
    alpha = 95
    beta = 98
    alpha_dict = {}
    beta_dict = {}
    alpha_list = []
    beta_list = []
    # load the aligned pairs once, outside the O(n^2) loop; the trailing newline
    # is stripped here, otherwise the equality test below could never match
    aligned_list = [line.strip() for line in file2Data(dir_ref_ent_ids_real)]
    for i in range(len(org_list)):
        if i % 1000 == 0:
            print("Loop:", i)
        i2 = i + 1
        for j in range(i2):
            org1 = org_list[i]
            org2 = org_list[j]
            # cheap character-overlap prefilter, so the full similarity score is
            # not computed for every pair (too slow otherwise)
            org_name1 = org1.split(" ")[0]
            org_name2 = org2.split(" ")[0]
            cnt = 0
            if len(org_name1) <= len(org_name2):
                for c in org_name1:
                    if c in org_name2:
                        cnt += 1
            else:
                for c in org_name2:
                    if c in org_name1:
                        cnt += 1
            if cnt / (len(org_name1) if len(org_name1) <= len(org_name2) else len(org_name2)) <= 0.7:
                continue
            if org1 == org2:
                continue
            if org1 in org2:
                org1 = org2
            if org2 in org1:
                org2 = org1
            sim = fuzz.ratio(org1, org2)
            negative_flag = 1
            if alpha <= sim:
                alpha_dict[(org_list[i], org_list[j])] = sim
                # check whether this ID pair is already in the aligned set
                for ids in aligned_list:
                    if str(id_list[i]) + "\t" + str(id_list[j]) == ids \
                            or str(id_list[j]) + "\t" + str(id_list[i]) == ids:
                        negative_flag = 0
                        break
                if negative_flag:
                    alpha_list.append(org_list[i] + "#" + org_list[j] + "#" + str(sim) + "\t"
                                      + str(id_list[i]) + "\t" + str(id_list[j]) + "\n")
                    print(org_list[i], "#", org_list[j], "#", sim, id_list[i], id_list[j])
            # if sim >= beta:
            #     beta_dict[(org_list[i], org_list[j])] = sim
            #     beta_list.append([org_list[i], id_list[i], org_list[j], id_list[j], sim])
            #     print(org_list[i], "#", org_list[j], "#", sim, id_list[i], id_list[j])
    # df = pd.DataFrame.from_dict(alpha_dict, orient='index')
    # df1 = pd.DataFrame.from_dict(beta_dict, orient='index')
    # df = pd.DataFrame(alpha_list)
    # df1 = pd.DataFrame(beta_list)
    data2File(alpha_list, "C:\\Users\\admin\\Desktop\\ref_ent_ids_real_neg")
    # data2File(alpha_list, "C:\\Users\\admin\\Desktop\\Similar_10w")
    # df.to_csv("C:\\Users\\admin\\Desktop\\Similar0.9.csv")
    # df1.to_csv("C:\\Users\\admin\\Desktop\\Similar0.95.csv")
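
# The inline prefilter above can be pulled out and sanity-checked on its own.
# A minimal sketch (the function name is hypothetical); fuzz.ratio returns an
# integer 0-100, so alpha = 95 keeps only near-identical strings:
def char_overlap(name1, name2):
    """Fraction of the shorter name's characters that also occur in the longer one."""
    short, long_ = (name1, name2) if len(name1) <= len(name2) else (name2, name1)
    if not short:
        return 0.0  # guard the division the inline version lacks
    return sum(1 for c in short if c in long_) / len(short)

# Example: 4 of the 6 characters overlap, 4/6 <= 0.7, so this pair would be
# filtered out before fuzz.ratio is ever computed.
# print(char_overlap("阿里巴巴网络", "阿里巴巴集团"))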

def resetID4KeyValue(filename, start_index):
    _list = file2Data(filename)
    map_dict = loadDict(dir_ModelId2RealId)
    for i in range(len(_list)):
        ss = _list[i].split("\t")
        real_id = ss[0]
        if len(ss) <= 1:
            # malformed line without a tab separator; skip it
            continue
        value = ss[1]
        if str(real_id) in map_dict.keys():
            model_id = map_dict[str(real_id)]
        else:
            map_dict[str(real_id)] = start_index
            model_id = start_index
            start_index += 1  # only advance when a new mapping is created
        _list[i] = str(model_id) + "\t" + value
        # print(_list[i])
    data2File(_list, dir_ent_ids_1)
    saveDict(dir_ModelId2RealId, map_dict)
    return


def resetID4Tuple(filename):
    _list = file2Data(filename)
    map_dict = loadDict(dir_ModelId2RealId)
    new_list = []
    for i in range(len(_list)):
        ss = _list[i][:-1].split("\t")
        real_id_1 = ss[0]
        real_id_2 = ss[1]
        # skip pairs that cannot be fully remapped (previously the stale
        # model_id from the last iteration was silently reused)
        if str(real_id_1) not in map_dict.keys():
            print("not exists ID1 mapping:", real_id_1)
            continue
        if str(real_id_2) not in map_dict.keys():
            print("not exists ID2 mapping:", real_id_2)
            continue
        model_id_1 = map_dict[str(real_id_1)]
        model_id_2 = map_dict[str(real_id_2)]
        new_list.append(str(model_id_1) + "\t" + str(model_id_2) + "\n")
    data2File(new_list, dir_ref_ent_ids_neg)


def resetID4Triple(filename):
    _list = file2Data(filename)
    map_dict = loadDict(dir_ModelId2RealId)
    new_list = []
    for i in range(len(_list)):
        ss = _list[i][:-1].split("\t")
        real_id_1 = ss[0]
        ref = ss[1]
        real_id_2 = ss[2]
        # skip triples that cannot be fully remapped (same stale-variable bug)
        if str(real_id_1) not in map_dict.keys():
            print("not exists ID1 mapping:", real_id_1)
            continue
        if str(real_id_2) not in map_dict.keys():
            print("not exists ID2 mapping:", real_id_2)
            continue
        model_id_1 = map_dict[str(real_id_1)]
        model_id_2 = map_dict[str(real_id_2)]
        new_list.append(str(model_id_1) + "\t" + str(ref) + "\t" + str(model_id_2) + "\n")
    data2File(new_list, dir_triples_1)
    return


def loadDict(filename):
    with open(filename, "r") as json_file:
        dic = json.load(json_file)
    return dic


def saveDict(filename, dic):
    with open(filename, 'w') as json_file:
        json.dump(dic, json_file)


def deleteDuplicateId(filename):
    # deduplicate lines in place
    _dir = filename
    lines = file2Data(_dir)
    print(len(lines))
    lines = list(set(lines))
    lines.sort(key=lambda x: x.split("\t")[0])
    print(len(lines))
    data2File(lines, _dir)


def deleteDuplicateId2(filename):
    # deduplicate ID pairs, treating (a, b) and (b, a) as the same pair
    _dir = filename
    lines = file2Data(_dir)
    print(len(lines))
    new_lines = []
    for i in range(len(lines)):
        ss = lines[i][:-1].split("\t")
        if int(ss[0]) > int(ss[1]):
            new_lines.append(ss[1] + "\t" + ss[0] + "\n")
        else:
            new_lines.append(ss[0] + "\t" + ss[1] + "\n")
    new_lines = list(set(new_lines))
    new_lines.sort(key=lambda x: x.split("\t")[0])
    print(len(new_lines))
    data2File(new_lines, _dir)


def initMapDict():
    d = {"-1": -1}
    saveDict(dir_ModelId2RealId, d)
    d = loadDict(dir_ModelId2RealId)
    # print(d, type(d))


def multiProcess():
    process_num = 15
    skip_num = 300000
    limit_num = 50000
    processes = []
    for i in range(process_num):
        print(skip_num, limit_num)
        p = mp.Process(target=getDataFromNeo4j3, args=(skip_num, limit_num))
        skip_num += limit_num
        p.start()
        processes.append(p)
    for p in processes:
        p.join()  # wait for all export workers to finish


def multiThread():
    return


def clean_ID_Entity(filename):
    lines = file2Data(filename)
    print(len(lines))
    new_lines = []
    for i in range(len(lines)):
        ss = lines[i].split("\t")
        if len(ss) == 2:
            new_lines.append(ss[0] + "\t" + ss[1])
    print(len(new_lines))
    data2File(new_lines, filename)


def mergeFile(file_name_prefix, target_file, file_num, limit_num):
    skip_num = 0
    df_all = pd.DataFrame()
    for i in range(file_num):
        df = pd.read_csv(file_name_prefix + "_" + str(skip_num) + "_" + str(limit_num) + ".csv")
        df = df[["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                 "id(r)", "type(r)", "r.price",
                 "id(p)", "p.project_name", "p.area", "p.city", "p.district"]]
        # DataFrame.append was removed in pandas 2.0; pd.concat is the equivalent
        df_all = pd.concat([df_all, df])
        # print(df_all)
        skip_num += limit_num
    df_all.columns = ["id(o)", "o.name", "o.nicknames", "o.area", "o.city", "o.district",
                      "id(r)", "type(r)", "r.price",
                      "id(p)", "p.project_name", "p.area", "p.city", "p.district"]
    df_all.to_csv(target_file)
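
# multiProcess above spawns all 15 workers at once, each holding its own Neo4j
# connection. A minimal sketch of a bounded alternative using a process pool;
# the function name, pool size, and task layout are assumptions, not part of
# the original setup:
def multiProcessPooled(process_num=4, skip_num=300000, limit_num=50000, batches=15):
    tasks = [(skip_num + i * limit_num, limit_num) for i in range(batches)]
    with mp.Pool(process_num) as pool:
        # at most process_num concurrent exports; same per-batch work as above
        pool.starmap(getDataFromNeo4j3, tasks)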
"o.nicknames", "o.area", "o.city", "o.district", "id(r)", "type(r)", "r.price", "id(p)", "p.project_name", "p.area", "p.city", "p.district"] df_all.to_csv(target_file) def alignAppend2Data(filename=dir_Neo4j_Align_File): dir = dir_align_project dir2dir = dir_project df = pd.read_csv(filename) # files triples = [] ent_ids = [] training_attrs = [] for column in df.columns: df[column] = df[column].apply(lambda x: "" if x == "未知" else x) for index, row in df.iterrows(): triples.append(str(row["id(o)"]) + "\t" + str(row["id(r)"]) + "\t" + str(row["id(p)"]) + "\n") ent_ids.append(str(row["id(o)"]) + "\t" + "O" + str(row["o.name"]) + "\n") ent_ids.append(str(row["id(p)"]) + "\t" + "P" + str(row["p.project_name"]) + "\n") training_attrs.append("O" + str(row["o.name"]) + "\t" + str(row["o.nicknames"]) + "\t" + str(row["o.area"]) + "\t" + str(row["o.city"]) + "\t" + str(row["o.district"]) + "\n") training_attrs.append("P" + str(row["p.project_name"]) + "\t" + str(row["p.area"]) + "\t" + str(row["p.city"]) + "\t" + str(row["p.district"]) + "\n") data2File(triples, dir+"triples_1_real") data2File(ent_ids, dir+"ent_ids_1_real") data2File(training_attrs, dir+"training_attrs_1") data2FileAppend(triples, dir2dir+"triples_1_real") data2FileAppend(ent_ids, dir2dir+"ent_ids_1_real") data2FileAppend(training_attrs, dir2dir+"training_attrs_1") return def getAlignIDOrg(): list1 = file2Data(dir_align_project+"ref_ent_ids_real") list2 = file2Data(dir_align_project+"ent_ids_1_real") id_org_dict = {} id_align = [] for i in range(len(list2)): ss = list2[i][:-1].split("\t") if ss[1][0] == "O": id_org_dict[str(ss[0])] = ss[1] for i in range(len(list1)): ss = list1[i][:-1].split("\t") if str(ss[0]) in id_org_dict.keys(): id_align.append(str(ss[0])+"\t"+id_org_dict[str(ss[0])]+"\n") if str(ss[1]) in id_org_dict.keys(): id_align.append(str(ss[1])+"\t"+id_org_dict[str(ss[1])]+"\n") id_align = list(set(id_align)) data2File(id_align, dir_align_project+"ent_ids_aligned") def test(): print(os.getcwd()) # id_vec = [[100, 1], [60, 2], [4, 200], [33, 300]] # id_ndarray = np.array(id_vec) # print(id_vec) # sim = scipy.spatial.distance.cdist(id_vec, id_vec[:2], metric='cityblock') # print(sim) # print(sim.shape) # print(heapq.nlargest(2, range(len(id_ndarray)), id_ndarray.take)) if __name__ == '__main__': # testNeo4j() # Neo4j2Data() # similarity() # initMapDict() # deleteDuplicateId() # resetID4KeyValue(dir_ent_ids_1_real, 0) # resetID4Tuple(dir_ref_ent_ids_real) # resetID4Triple(dir_triples_1_real) # getDataFromNeo4j2() ##################################### # Neo4j2Data(dir_Neo4j_File) # alignAppend2Data(dir_Neo4j_Align_File) # deleteDuplicateId(dir_ent_ids_1_real) # initMapDict() # clean_ID_Entity(dir_ent_ids_1_real) # resetID4KeyValue(dir_ent_ids_1_real, 0) # resetID4Tuple(dir_ref_ent_ids_real_neg) # resetID4Triple(dir_triples_1_real) # deleteDuplicateId2(dir_ref_ent_ids_real) ####################################### # deleteDuplicateId2() # getDataFromNeo4j3() # multiProcess() # dir = "C:\\Users\\admin\\Desktop\\Neo4j" # mergeFile(dir, dir+"4.csv", 8, 30000) # ll = [0, 1, 2, 3] # for i in range(0, len(ll), 2): # i2 = i+1 # for j in range(0, len(ll), 2): # print(ll[i], "-", ll[j]) test() # getAlignIDOrg() # resetID4KeyValue(dir_project+"ent_ids_aligned", 0) # for i in range(0, 22, 5): # print(i) # _dict = loadDict(dir_ModelId2RealId) # # for i in _dict.keys(): # if i == "": # print(i, _dict[i]) # j = i # _dict.pop(j) # saveDict(dir_ModelId2RealId, _dict) # getDataFromNeo4j_align() # alignAppend2Data()