#coding:UTF8
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF,BaseUDAF
import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import json
import re

def log(msg):
    logging.info(msg)

# Configure the pandas dependency package
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0]+".zip/files"
    log("add path:%s"%(_path))
    sys.path.append(_path)
    return os.path.dirname(dir_names[0])

# A "RuntimeError: xxx has been blocked by sandbox" may occur here: libraries containing C extensions
# are blocked by the sandbox; set odps.isolation.session.enable = true to allow them.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
        so = open(file_name, "wb")
        so.write(content)
        so.flush()
        so.close()

# Initialize the business data package. Because of upload limits and mismatches between the Python
# version and the archive layout, the package has to be unpacked and imported manually.
def init_env(list_files,package_name):
    import os,sys
    if len(list_files)==1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s"%(cmd_line,package_name))
    elif len(list_files)>1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " "+os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s"%(package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0,os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))

import platform

def getSet(list_dict,key):
    # collect the distinct non-empty values of `key`, normalizing numeric strings
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search("^[\d\.]+$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def split_with_time(list_dict,sort_key,timedelta=86400*120,more_than_one=True):
    # split records into groups whose consecutive gaps on `sort_key` stay below timedelta
    group_num = 1
    if more_than_one:
        group_num = 2
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                _group = []
                for j in range(_begin,i+1):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
                _begin = i + 1
            if len(list_dict)>=group_num:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>0:
                    list_group.append(_group)
            return list_group
    return [list_dict]
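# A minimal local sketch of how split_with_time behaves (illustration only, not part of the UDFs;
# the docids and timestamps below are made up). Records sorted by page_time_stamp are grouped so
# that consecutive gaps stay below `timedelta`.
def _demo_split_with_time():
    _docs = [{"docid": 1, "page_time_stamp": 0},
             {"docid": 2, "page_time_stamp": 86400},
             {"docid": 3, "page_time_stamp": 86400 * 200}]
    # with the default timedelta of 86400*120, docids 1 and 2 form one group and docid 3 ends up alone
    print(split_with_time(_docs, "page_time_stamp"))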
buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1, "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4, "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): MAX_NUM = 5 if len(buffer[0])>0: MAX_NUM = buffer[0][0]["MAX_NUM"] list_split = split_with_time(buffer[0],"page_time_stamp") list_group = [] for _split in list_split: flag = True keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] dict_set = {} for _key in keys: dict_set[_key] = set() if len(_split)>MAX_NUM: flag = False else: for _key in keys: logging.info(_key+str(getSet(_split,_key))) if len(getSet(_split,_key))>1: flag = False break MAX_CONTAIN_COLUMN = None #判断组内每条公告是否包含 if flag: for _d in _split: contain_column = _d["contain_column"] if contain_column is not None and contain_column !="": if MAX_CONTAIN_COLUMN is None: MAX_CONTAIN_COLUMN = contain_column else: if len(MAX_CONTAIN_COLUMN)1: list_group.append(list(_set_docid)) return json.dumps(list_group) def getDiffIndex(list_dict,key): _set = set() for _i in range(len(list_dict)): item = list_dict[_i] if key in item: if item[key]!='' and item[key] is not None: if re.search("^\d[\d\.]*$",item[key]) is not None: _set.add(str(float(item[key]))) else: _set.add(str(item[key])) if len(_set)>1: return _i return len(list_dict) @annotate('bigint,bigint,string,string,string,string,string,string,string,bigint->string') class f_remege_limit_num_contain(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1, "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4, "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def getNotLikeSet(self,_dict,column_name): column_value = _dict.get(column_name,None) _set = set() if column_value is not None: for _i in range(1,len(column_value)): _set.add(column_value[_i-1:_i+1]) _dict["notLike_set"] = _set def getSimilarity(self,_set1,_set2): _sum = max([1,min([len(_set1),len(_set2)])]) return len(_set1&_set2)/_sum def terminate(self, buffer): list_group = [] the_group = buffer[0] SIM_PROB = 0.6 for _d in the_group: self.getNotLikeSet(_d,"notLike_column") #判断多个值与否 keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] re_merge = False for _key in keys: if len(getSet(the_group,_key))>1: re_merge = True break #判断是否相似而不相同 re_merge_sim = False for _i1 in range(0,len(the_group)): for _j1 in range(_i1+1,len(the_group)): _set1 = the_group[_i1]["notLike_set"] _set2 = the_group[_j1]["notLike_set"] _sim = self.getSimilarity(_set1,_set2) if _sim>SIM_PROB and _sim<1: re_merge_sim = True break contain_keys = ["contain_column1","contain_column2"] 
@annotate('bigint,bigint,string,string,string,string,string,string,string,bigint->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Project code, winning bidder, len(project code)>7, winning bidder <> "", fewer than 2 distinct
    non-empty tenderees after merging, identical non-empty amounts for the same announcement type.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self,_dict,column_name):
        column_value = _dict.get(column_name,None)
        _set = set()
        if column_value is not None:
            for _i in range(1,len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self,_set1,_set2):
        _sum = max([1,min([len(_set1),len(_set2)])])
        return len(_set1&_set2)/_sum

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d,"notLike_column")
        # check whether any key column has more than one distinct value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key))>1:
                re_merge = True
                break
        # check whether any pair is similar but not identical
        re_merge_sim = False
        for _i1 in range(0,len(the_group)):
            for _j1 in range(_i1+1,len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1,_set2)
                if _sim>SIM_PROB and _sim<1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        logging.info(str(re_merge)+str(re_merge_sim))
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x:x["confidence"],reverse=True)
            the_group.sort(key=lambda x:x["page_time_stamp"])
            # regroup
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # anomaly: multiple distinct values
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key])>len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count>0:
                        _flag_sim = False
                        # anomaly: similar but not identical
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"])
                                if _sim>SIM_PROB and _sim<1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether each key column converges to a single value
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence so that as much of the group as possible is kept
                _split.sort(key=lambda x:x["confidence"],reverse=True)
                # confidence
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split,_k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than one distinct value is found, split all into singleton groups; otherwise keep one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log(str(final_group))
        return json.dumps(final_group)

def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
    _time = time.strftime(format,time.localtime())
    return _time

@annotate('bigint->string')
class f_get_single_merged_bychannel(BaseUDTF):
    def process(self,docid):
        _d = {"data":{str(docid):[]},"process_time":getCurrent_date()}
        self.forward(json.dumps(_d))

@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,bigint,bigint,string->string')
class f_remege_limit_num_contain_bychannel(BaseUDAF):
    '''f_remege_limit_num_contain_bychannel
    Project code, winning bidder, len(project code)>7, winning bidder <> "", fewer than 2 distinct
    non-empty tenderees after merging, identical non-empty amounts for the same announcement type.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,docchannel,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence,extract_count,json_dicttime):
        _dict = {"docid":docid,"docchannel":docchannel,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                 "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
"contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence, "extract_count":extract_count,"json_dicttime":json_dicttime} buffer[0].append(_dict) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def getNotLikeSet(self,_dict,column_name): column_value = _dict.get(column_name,None) _set = set() if column_value is not None: for _i in range(1,len(column_value)): _set.add(column_value[_i-1:_i+1]) _dict["notLike_set"] = _set def getSimilarity(self,_set1,_set2): _sum = max([1,min([len(_set1),len(_set2)])]) return len(_set1&_set2)/_sum def difftimecount(self,_dict1,_dict2): _count = 0 for k,v in _dict1.items(): if v is not None and v!="": v1 = _dict2.get(k) if v1 is not None and v1!="": if v!=v1: _count += 1 return _count def splitByTimezone(self,list_dict,_key): cluster_docid = [] dict_docid_key = {} dict_docid = {} for _dict in list_dict: if _dict.get(_key,"") is None or _dict.get(_key,"")=="": dict_docid_key[_dict.get("docid")] = {} else: dict_docid_key[_dict.get("docid")] = json.loads(_dict.get(_key)) dict_docid[_dict.get("docid")] = _dict for _dict in list_dict: _find = False for _cl in cluster_docid: _legal = True for _c in _cl: if self.difftimecount(dict_docid_key.get(_c),dict_docid_key.get(_dict.get("docid")))>0: _legal = False break if _legal: _cl.append(_dict.get("docid")) _find = True if not _find: cluster_docid.append([_dict.get("docid")]) _result = [] for _cl in cluster_docid: _r = [] for _c in _cl: _r.append(dict_docid.get(_c)) _result.append(_r) return _result def terminate(self, buffer): list_group = [] the_group = buffer[0] SIM_PROB = 0.6 for _d in the_group: self.getNotLikeSet(_d,"notLike_column") #判断多个值与否 keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] re_merge = False for _key in keys: if len(getSet(the_group,_key))>1: log("has_more_than_one:%s"%str(getSet(the_group,_key))) re_merge = True break #判断是否相似而不相同 re_merge_sim = False for _i1 in range(0,len(the_group)): for _j1 in range(_i1+1,len(the_group)): _set1 = the_group[_i1]["notLike_set"] _set2 = the_group[_j1]["notLike_set"] _sim = self.getSimilarity(_set1,_set2) if _sim>SIM_PROB and _sim<1: re_merge_sim = True break contain_keys = ["contain_column1","contain_column2"] logging.info(the_group) logging.info(str(re_merge)+str(re_merge_sim)) #重新成组 dict_docid_doc = {} for _doc in the_group: dict_docid_doc[_doc["docid"]] = _doc if re_merge or re_merge_sim: the_group.sort(key=lambda x:x["confidence"],reverse=True) the_group.sort(key=lambda x:x["page_time_stamp"]) for _doc in the_group: merge_flag = False for _index in range(len(list_group)): _g = list_group[_index] hit_count = 0 dict_temp = dict() #多个值的异常 if re_merge: for _c_key in contain_keys: dict_temp[_c_key] = _g[_c_key] if _g[_c_key] is not None and _doc[_c_key] is not None: if len(_g[_c_key])>len(_doc[_c_key]): if str(_g[_c_key]).find(str(_doc[_c_key]))>=0: dict_temp[_c_key] = _g[_c_key] hit_count += 1 else: if str(_doc[_c_key]).find(str(_g[_c_key]))>=0: dict_temp[_c_key] = _doc[_c_key] _g[_c_key] = _doc[_c_key] hit_count += 1 else: hit_count = 1 # if hit_count==len(contain_keys): if hit_count>0: _flag_sim = False #相似而不相同的异常 if re_merge_sim: for _docid in _g["docid"]: tmp_d = dict_docid_doc[_docid] _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"]) if _sim>SIM_PROB and _sim<1: _flag_sim = True if not _flag_sim: for _c_key in dict_temp.keys(): _g[_c_key] = dict_temp[_c_key] _g["docid"].append(_doc["docid"]) merge_flag = True break if not 
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether each key column converges to a single value
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence so that as much of the group as possible is kept
                _split.sort(key=lambda x:x["confidence"],reverse=True)
                # confidence
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split,_k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than one distinct value is found, split all into singleton groups; otherwise keep one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log("%s--%s"%("final_group",str(final_group)))
        # pick one announcement per channel
        final_group_channel = []
        for _group in final_group:
            dict_channel_id = {}
            otherChannel = 10000
            for _docid in _group:
                _channel = dict_docid_doc[_docid].get("docchannel")
                if _channel in [114,115,116,117]:
                    otherChannel += 1
                    _channel = otherChannel
                if _channel not in dict_channel_id:
                    dict_channel_id[_channel] = []
                dict_channel_id[_channel].append({"docid":_docid,"page_time_stamp":dict_docid_doc[_docid].get("page_time_stamp"),
                                                  "extract_count":dict_docid_doc[_docid].get("extract_count"),
                                                  "json_dicttime":dict_docid_doc[_docid].get("json_dicttime")})
            # split by date
            new_dict_channel_id = {}
            log("%s:%s"%("dict_channel_id",str(dict_channel_id)))
            for k,v in dict_channel_id.items():
                list_time_docids = split_with_time(v,"page_time_stamp",86400*6,more_than_one=False)
                log(list_time_docids)
                for _l in list_time_docids:
                    list_t = self.splitByTimezone(_l,"json_dicttime")
                    for _t in list_t:
                        otherChannel += 1
                        new_dict_channel_id[otherChannel] = _t
            log("%s:%s"%("new_dict_channel_id",str(new_dict_channel_id)))
            channel_dict = {}
            for k,v in new_dict_channel_id.items():
                v.sort(key=lambda x:x["docid"])
                v.sort(key=lambda x:x["extract_count"],reverse=True)
                channel_dict[v[0]["docid"]] = []
                for _docs in v[1:]:
                    channel_dict[v[0]["docid"]].append(_docs["docid"])
            _d = {"data":channel_dict,"process_time":getCurrent_date()}
            final_group_channel.append(_d)
        return json.dumps(final_group_channel)

@annotate('string -> string')
class f_get_remerge_group_channel(BaseUDTF):
    '''
    Expand multiple groups into individual records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                self.forward(json.dumps(_group))

@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    Expand multiple groups into individual records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                l_g.sort(key=lambda x:x)
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))

@annotate('bigint,bigint,string->string')
class f_merge_probability(BaseUDAF):
    '''
    Merge a group into a single record.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid,page_time_stamp,_type):
buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"type":_type}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_dict = buffer[0] list_dict = list_dict[:10000] list_group = split_with_time(list_dict,sort_key="page_time_stamp",timedelta=86400*120) return json.dumps(list_group) @annotate('string -> bigint,bigint,bigint,bigint,string') class f_split_merge_probability(BaseUDTF): def __init__(self): import logging import json global logging,json logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_group_str): logging.info("0") logging.info(list_group_str) if list_group_str is not None: logging.info("1") try: list_group = json.loads(list_group_str) logging.info("2") for _group in list_group: if len(_group)>0: _type = _group[0].get("type","") logging.info("3%d"%len(list_group)) # _group.sort(key=lambda x:x["page_time_stamp"]) _len = min(100,len(_group)) for _index_i in range(_len): _count = 0 for _index_j in range(_index_i+1,_len): if abs(_group[_index_j]["page_time_stamp"]-_group[_index_i]["page_time_stamp"])>86400*120: break _count += 1 _docid1 = _group[_index_i]["docid"] _docid2 = _group[_index_j]["docid"] if _docid1<_docid2: self.forward(_docid1,_docid2,1,_len,_type) else: self.forward(_docid2,_docid1,1,_len,_type) except Exception as e: logging(str(e)) @annotate('bigint,bigint,string->string') class f_merge_groupPairs(BaseUDAF): ''' 合并组为一条记录 ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,is_exists,counts,_type): buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_dict = buffer[0] list_dict = list_dict[:10000] return json.dumps(list_dict) @annotate("string -> bigint,bigint,bigint") class f_merge_getLabel(BaseUDTF): def __init__(self): import logging import json global logging,json def process(self,str_docids): if str_docids is not None: list_docids = [int(i) for i in str_docids.split(",")] list_docids.sort(key=lambda x:x) _len = min(100,len(list_docids)) for index_i in range(_len): docid_less = list_docids[index_i] for index_j in range(index_i+1,_len): docid_greater = list_docids[index_j] self.forward(docid_less,docid_greater,1) def getSimilarityOfString(str1,str2): _set1 = set() _set2 = set() if str1 is not None: for i in range(1,len(str1)): _set1.add(str1[i-1:i+1]) if str2 is not None: for i in range(1,len(str2)): _set2.add(str2[i-1:i+1]) _len = max(1,min(len(_set1),len(_set2))) return len(_set1&_set2)/_len def check_columns(tenderee_less,tenderee_greater, agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater, win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater, bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater): flag = True _set_tenderee = set() if tenderee_less is not None and tenderee_less!="": _set_tenderee.add(tenderee_less) if tenderee_greater is not None and tenderee_greater!="": _set_tenderee.add(tenderee_greater) if len(_set_tenderee)>1: return False code_sim = getSimilarityOfString(project_code_less,project_code_greater) if code_sim>0.6 and code_sim<1: return False #同批次不同编号 if getLength(project_code_less)>0 and getLength(project_code_greater)>0: _split_code_less = project_code_less.split("-") _split_code_greater = project_code_greater.split("-") if 
        if len(_split_code_less)>1 and len(_split_code_greater)>1:
            if _split_code_less[0]==_split_code_greater[0] and project_code_less!=project_code_greater:
                return False
    _set_win_tenderer = set()
    if win_tenderer_less is not None and win_tenderer_less!="":
        _set_win_tenderer.add(win_tenderer_less)
    if win_tenderer_greater is not None and win_tenderer_greater!="":
        _set_win_tenderer.add(win_tenderer_greater)
    if len(_set_win_tenderer)>1:
        return False
    _set_win_bid_price = set()
    if win_bid_price_less is not None and win_bid_price_less!="":
        _set_win_bid_price.add(float(win_bid_price_less))
    if win_bid_price_greater is not None and win_bid_price_greater!="":
        _set_win_bid_price.add(float(win_bid_price_greater))
    if len(_set_win_bid_price)>1:
        return False
    _set_bidding_budget = set()
    if bidding_budget_less is not None and bidding_budget_less!="":
        _set_bidding_budget.add(float(bidding_budget_less))
    if bidding_budget_greater is not None and bidding_budget_greater!="":
        _set_bidding_budget.add(float(bidding_budget_greater))
    if len(_set_bidding_budget)>1:
        return False
    return True

def getSimLevel(str1,str2):
    # map the empty/non-empty/equal relation of two fields to a discrete level
    str1_null = False
    str2_null = False
    _v = 0
    if str1 is None or str1=="":
        str1_null = True
    if str2 is None or str2=="":
        str2_null = True
    if str1_null and str2_null:
        _v = 2
    elif str1_null and not str2_null:
        _v = 4
    elif not str1_null and str2_null:
        _v = 6
    elif not str1_null and not str2_null:
        if str1==str2:
            _v = 10
        else:
            _v = 0
    return _v

import math

def featurnCount(_count,max_count=100):
    return max(0,min(1,_count))*(1/math.sqrt(max(1,_count-1)))

def getLength(_str):
    return len(_str if _str is not None else "")

@annotate("string->bigint")
class f_get_min_counts(object):
    def evaluate(self,json_context):
        _context = json.loads(json_context)
        min_counts = 100
        for item in _context:
            if item["counts"]<min_counts:
                min_counts = item["counts"]
        return min_counts
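# A quick local look at the two scalar helpers above (illustration only; the strings are made up).
# getSimLevel maps the empty/non-empty/equal relation of two fields to a discrete level, and
# featurnCount squashes an occurrence count into a small positive scalar.
def _demo_feature_helpers():
    print(getSimLevel("", ""))                  # both empty -> 2
    print(getSimLevel("company A", "company A"))  # both non-empty and equal -> 10
    print(getSimLevel("company A", "company B"))  # both non-empty but different -> 0
    print(featurnCount(5))                      # 1/sqrt(4) = 0.5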
@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
class f_merge_featureMatrix(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,json_context,tenderee_less,tenderee_greater,
                agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
        if not check_columns(tenderee_less,tenderee_greater,
                             agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                             win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                             bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
            return
        _context = json.loads(json_context)
        min_counts = 100
        dict_context = {}
        for item in _context:
            if item["counts"]<min_counts:
                min_counts = item["counts"]
        # NOTE: the original code that fills dict_context and builds the feature vector json_matrix
        # passed to MergePredictor is missing from this copy of the file.
        same_project_code = False
        if project_code_less==project_code_greater and getLength(project_code_less)>0:
            same_project_code = True
        same_project_name = False
        if project_name_less==project_name_greater and getLength(project_name_less)>0:
            same_project_name = True
        same_doctitle_refine = False
        if doctitle_refine_less==doctitle_refine_greater and getLength(doctitle_refine_less)>0:
            same_doctitle_refine = True
        same_tenderee = False
        if tenderee_less==tenderee_greater and getLength(tenderee_less)>0:
            same_tenderee = True
        same_agency = False
        if agency_less==agency_greater and getLength(agency_less)>0:
            same_agency = True
        same_bidding_budget = False
        if bidding_budget_less==bidding_budget_greater and getLength(bidding_budget_less)>0:
            same_bidding_budget = True
        same_win_tenderer = False
        if win_tenderer_less==win_tenderer_greater and getLength(win_tenderer_less)>0:
            same_win_tenderer = True
        same_win_bid_price = False
        if win_bid_price_less==win_bid_price_greater and getLength(win_bid_price_less)>0:
            same_win_bid_price = True
        contain_doctitle = False
        if getLength(doctitle_refine_less)>0 and getLength(doctitle_refine_greater)>0 and (doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
            contain_doctitle = True
        contain_project_name = False
        if getLength(project_name_less)>0 and getLength(project_name_greater)>0 and (project_name_less in project_name_greater or project_name_greater in project_name_less):
            contain_project_name = True
        total_money_less = (0 if getLength(bidding_budget_less)==0 else float(bidding_budget_less))+(0 if getLength(win_bid_price_less)==0 else float(win_bid_price_less))
        total_money_greater = (0 if getLength(bidding_budget_greater)==0 else float(bidding_budget_greater))+(0 if getLength(win_bid_price_greater)==0 else float(win_bid_price_greater))
        if min_counts<10:
            _prob = 0.9
            if same_project_code and same_win_tenderer and same_tenderee:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_name and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_name and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_bidding_budget and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_code and same_project_name:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_code and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_project_code:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_project_name:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_project_code and same_project_name:
                self.forward(json_matrix,_prob)
                return
        if min_counts<=5:
            _prob = 0.8
            if same_project_code and same_tenderee:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_name and same_project_code:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if total_money_less==total_money_greater and total_money_less>100000:
                if same_win_tenderer and (same_win_bid_price or same_bidding_budget):
                    self.forward(json_matrix,_prob)
                    return
            if same_project_code and same_bidding_budget:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_bid_price:
                self.forward(json_matrix,_prob)
                return
            if same_bidding_budget and same_win_bid_price and (contain_project_name or contain_doctitle):
                self.forward(json_matrix,_prob)
                return
        if min_counts<=3:
            _prob = 0.7
            if same_project_name or same_project_code or same_doctitle_refine or contain_doctitle or contain_project_name:
                self.forward(json_matrix,_prob)
                return
        self.forward(json_matrix,0)
class MergePredictor():
    def __init__(self):
        self.input_size = 46
        self.output_size = 2
        self.matrix = np.array([[-5.817399024963379, 3.367797374725342], [-18.3098201751709, 17.649206161499023],
                                [-7.115952014923096, 9.236002922058105], [-5.054129123687744, 1.8316771984100342],
                                [6.391637325286865, -7.57396125793457], [-2.8721542358398438, 6.826520919799805],
                                [-5.426159858703613, 10.235260009765625], [-4.240962982177734, -0.32092899084091187],
                                [-0.6378090381622314, 0.4834124445915222], [-1.7574478387832642, -0.17846578359603882],
                                [4.325063228607178, -2.345501661300659], [0.6086963415145874, 0.8325914740562439],
                                [2.5674285888671875, 1.8432368040084839], [-11.195490837097168, 17.4630184173584],
                                [-11.334247589111328, 10.294097900390625], [2.639320135116577, -8.072785377502441],
                                [-2.2689898014068604, -3.6194612979888916], [-11.129570960998535, 18.907018661499023],
                                [4.526485919952393, 4.57423210144043], [-3.170452356338501, -1.3847776651382446],
                                [-0.03280467540025711, -3.0471489429473877], [-6.601675510406494, -10.05613899230957],
                                [-2.9116673469543457, 4.819308280944824], [1.4398306608200073, -0.6549674272537231],
                                [7.091512203216553, -0.142232745885849], [-0.14478975534439087, 0.06628061085939407],
                                [-6.775437831878662, 9.279582023620605], [-0.006781991105526686, 1.6472798585891724],
                                [3.83730149269104, 1.4072834253311157], [1.2229349613189697, -2.1653425693511963],
                                [1.445560336112976, -0.8397432565689087], [-11.325132369995117, 11.231744766235352],
                                [2.3229124546051025, -4.623719215393066], [0.38562265038490295, -1.2645516395568848],
                                [-1.3670002222061157, 2.4323790073394775], [-3.6994268894195557, 0.7515658736228943],
                                [-0.11617227643728256, -0.820703387260437], [4.089913368225098, -4.693605422973633],
                                [-0.4959050714969635, 1.5272167921066284], [-2.7135870456695557, -0.5120691657066345],
                                [0.573157548904419, -1.9375460147857666], [-4.262857437133789, 0.6375582814216614],
                                [-1.8825865983963013, 2.427532911300659], [-4.565115451812744, 4.0269083976745605],
                                [-4.339804649353027, 6.754288196563721], [-4.31907320022583, 0.28193211555480957]])
        self.bias = np.array([16.79706382751465, -13.713337898254395])
        # self.model = load_model("model/merge.h5",custom_objects={"precision":precision,"recall":recall,"f1_score":f1_score})

    def activation(self,vec,_type):
        if _type=="relu":
            _vec = np.array(vec)
            return _vec*(_vec>0)
        if _type=="tanh":
            return np.tanh(vec)
        if _type=="softmax":
            _vec = np.array(vec)
            _exp = np.exp(_vec)
            return _exp/np.sum(_exp)

    def predict(self,input):
        _out = self.activation(self.activation(np.matmul(np.array(input).reshape(-1,self.input_size),self.matrix)+self.bias,"tanh"),"softmax")
        # print(self.model.predict(np.array(input).reshape(-1,46)))
        return _out
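# Local usage sketch for MergePredictor (illustration only). In production, numpy is made available
# by f_getMergeProb.__init__ via include_package_path; for local debugging the sketch below assumes
# numpy is installed and injects it as the module-level `np` before instantiating the predictor.
def _demo_merge_predictor():
    import numpy
    global np
    np = numpy
    mp = MergePredictor()
    # input is a 46-dimensional feature vector; output has shape (1,2), and [0][1] is used as the merge probability
    print(mp.predict([0.0] * 46))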
@annotate('string,double -> double')
class f_getMergeProb(BaseUDTF):
    def __init__(self):
        import json
        include_package_path("numpy-1.18.zip")
        import numpy as np
        global json,np
        self.mp = MergePredictor()

    def process(self,json_matrix,pre_prob):
        if not pre_prob>0.5:
            _matrix = json.loads(json_matrix)
            _prob = self.mp.predict(_matrix)[0][1]
        else:
            _prob = pre_prob
        if _prob>0.5:
            self.forward(float(_prob))

@annotate('string -> bigint,bigint')
class f_check_remerge_channel(BaseUDTF):
    '''
    Expand multiple groups into individual records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                _keys = _group.get("data").keys()
                if len(_keys)>0:
                    main_docid = int(list(_keys)[0])
                    for k,v in _group.get("data",{}).items():
                        self.forward(main_docid,int(k))
                        for _v in v:
                            self.forward(main_docid,int(_v))

@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    Expand multiple groups into individual records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1],_docid)

def getConfidence(rule_id):
    if rule_id >=1 and rule_id <=20:
        return 30
    elif rule_id>=31 and rule_id<=50:
        return 20
    else:
        return 10

@annotate('string,bigint -> bigint,bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    Expand multiple groups into individual records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid,rule_id):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i!=index_j:
                            self.forward(_group[index_i],_group[index_j],getConfidence(rule_id))

@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    Merge a group into a single record.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x:x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)

@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
class f_encode_time(object):
    def evaluate(self,time_bidclose,time_bidopen,time_bidstart,time_commencement,time_completion,time_earnest_money_end,time_earnest_money_start,time_get_file_end,time_get_file_start,time_publicity_end,time_publicity_start,time_registration_end,time_registration_start,time_release):
        _dict = {"time_bidclose":time_bidclose,"time_bidopen":time_bidopen,"time_bidstart":time_bidstart,
                 "time_commencement":time_commencement,"time_completion":time_completion,"time_earnest_money_end":time_earnest_money_end,
                 "time_earnest_money_start":time_earnest_money_start,"time_get_file_end":time_get_file_end,"time_get_file_start":time_get_file_start,
                 "time_publicity_end":time_publicity_end,"time_publicity_start":time_publicity_start,"time_registration_end":time_registration_end,
                 "time_registration_start":time_registration_start,"time_release":time_release}
        _encode = json.dumps(_dict)
        return _encode

@annotate('string,string -> string,string')
class f_decode_ruwei(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, page_time,sub_docs_json):
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                if sub_docs.get("win_tenderer","")!="":
                    self.forward(page_time,sub_docs.get("win_tenderer",""))
                if sub_docs.get("second_tenderer","")!="":
                    self.forward(page_time,sub_docs.get("second_tenderer",""))
                if sub_docs.get("third_tenderer","")!="":
self.forward(page_time,sub_docs.get("third_tenderer","")) if __name__ == '__main__': a = f_remege_limit_num_contain_bychannel() buffer = a.new_buffer() tmp_s = ''' 234858920 229011768 2022-03-25 1648137600 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工招标文件.pdf 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工文件.pdf 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 103 0 7 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "2022-04-29", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 232745950 2022-04-12 1649692800 E4404000001002779001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工招标答疑 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工答疑 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 103 0 8 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 234858920 2022-04-21 1650470400 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 101 1 2 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 234595980 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 105 0 10 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-04-22", "time_publicity_start": "2022-04-21", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 228908786 2022-03-25 1648137600 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 1795743.68 52 0 8 "{"time_bidclose": "2022-04-20", "time_bidopen": "2022-04-20", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "2022-04-20", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "2022-03-26", "time_publicity_end": "2022-04-26", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 234523333 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 101 0 2 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 
234858920 234787082 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工开标记录表 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工开标记录表 1795743.68 101 0 6 "{"time_bidclose": "", "time_bidopen": "2022-04-20", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" 234858920 235240618 2022-04-22 1650556800 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 广东博思信息技术股份有限公司 1775136.23 101 0 12 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-04-26", "time_publicity_start": "2022-04-24", "time_registration_end": "", "time_registration_start": "", "time_release": ""}" ''' for _s in tmp_s.split("\n"): ls = _s.split("\t") if len(ls)!=17: continue _confid = 1 if ls[14] =="" else ls[14] a.iterate(buffer,ls[1],ls[13],int(ls[3]),ls[8],ls[10],ls[11],ls[12],ls[7],ls[5],ls[4],_confid,ls[15],ls[16][1:-1]) # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-21", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}') # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-21", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}') # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-22", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}') print(a.terminate(buffer)) print(1)