#coding:UTF8

from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF

# module-level imports so the helper functions below can run before any UDF __init__ is called
import json
import logging
import re


def getSet(list_dict,key):
    '''collect the distinct non-empty values of key, normalizing numeric strings via float()'''
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^[\d\.]+$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def split_with_time(list_dict,sort_key,timedelta=86400*120):
    '''sort by sort_key and split into groups whose neighbouring records differ by less than timedelta seconds'''
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
            return list_group
    return [list_dict]

@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Merge rule: project number and winning bidder match, len(project number)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tendering units after merging, and non-empty amounts of the same
    announcement type are identical after merging
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,greater_column,MAX_NUM):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0])>0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split)>MAX_NUM:
                flag = False
            else:
                for _key in keys:
                    logging.info(_key+str(getSet(_split,_key)))
                    if len(getSet(_split,_key))>1:
                        flag = False
                        break
                MAX_CONTAIN_COLUMN = None
                # check that every announcement in the group is contained in the longest contain_column value
                if flag:
                    for _d in _split:
                        contain_column = _d["contain_column"]
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
            if flag:
                _set_docid = set()
                for _d in _split:
                    _set_docid.add(_d["docid"])
                if len(_set_docid)>1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)

@annotate('bigint,string,string,string,string,string,string->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Merge rule: project number and winning bidder match, len(project number)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tendering units after merging, and non-empty amounts of the same
    announcement type are identical after merging
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2):
        buffer[0].append({"docid":docid,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] re_merge = False for _key in keys: if len(getSet(the_group,_key))>1: re_merge = True break contain_keys = ["contain_column1","contain_column2"] logging.info(the_group) if re_merge: dict_docid_doc = {} for _doc in the_group: dict_docid_doc[_doc["docid"]] = _doc for _doc in the_group: merge_flag = False for _index in range(len(list_group)): _g = list_group[_index] hit_count = 0 dict_temp = dict() for _c_key in contain_keys: if _g[_c_key] is not None and _doc[_c_key] is not None: if len(_g[_c_key])>len(_doc[_c_key]): if str(_g[_c_key]).find(str(_doc[_c_key]))>=0: dict_temp[_c_key] = _g[_c_key] hit_count += 1 else: if str(_doc[_c_key]).find(str(_g[_c_key]))>=0: dict_temp[_c_key] = _doc[_c_key] _g[_c_key] = _doc[_c_key] hit_count += 1 logging.info(_doc["docid"]) logging.info(_g) logging.info(str(hit_count)) if hit_count==len(contain_keys): for _c_key in contain_keys: _g[_c_key] = dict_temp[_c_key] _g["docid"].append(_doc["docid"]) merge_flag = True break if not merge_flag: _dict = dict() _dict["docid"] = [_doc["docid"]] for _c_key in contain_keys: _dict[_c_key] = _doc[_c_key] list_group.append(_dict) final_group = [] #判断是否符合一个值 for _group in list_group: _split = [] for _docid in _group["docid"]: _split.append(dict_docid_doc[_docid]) _flag = True for _key in keys: if len(getSet(_split,_key))>1: _flag = False break if not _flag: for _docid in _group["docid"]: final_group.append([_docid]) else: final_group.append(list(set(_group["docid"]))) else: final_group = [list(set([item["docid"] for item in the_group]))] return json.dumps(final_group) @annotate('string -> string') class f_get_remerge_group(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_remerge): if json_remerge is not None: list_group = json.loads(json_remerge) for _group in list_group: l_g = list(set(_group)) list_docid = [str(_docid) for _docid in l_g] self.forward(",".join(list_docid)) @annotate('string -> bigint,bigint') class f_check_remerge(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_remerge): if json_remerge is not None: list_group = json.loads(json_remerge) for _group in list_group: for _docid in _group: self.forward(_group[-1],_docid) @annotate('string -> bigint,bigint') class f_arrange_group_single(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_set_docid): if json_set_docid is not None: list_group = json.loads(json_set_docid) for _group in list_group: for index_i in range(len(_group)): for index_j in range(len(_group)): # if index_i!=index_j and _group[index_i]!=_group[index_j]: if index_i!=index_j: self.forward(_group[index_i],_group[index_j]) @annotate('bigint,bigint->string') class f_get_merge_docids(BaseUDAF): ''' 合并组为一条记录 ''' def __init__(self): import json global json def new_buffer(self): return [set()] def iterate(self, buffer,docid1,docid2): buffer[0].add(docid1) buffer[0].add(docid2) def merge(self, buffer, pbuffer): buffer[0] |= pbuffer[0] def terminate(self, buffer): set_docid = buffer[0] list_docid = 
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x:x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)
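
# ---------------------------------------------------------------------------
# Local smoke test: a minimal sketch of the UDAF lifecycle
# (new_buffer -> iterate -> merge -> terminate) run outside MaxCompute.
# It assumes pyodps is installed so the odps.udf imports above resolve;
# the docids and timestamps below are made-up illustration data.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # f_get_merge_docids: two partial buffers merged the way the framework would
    udaf = f_get_merge_docids()
    buffer_a = udaf.new_buffer()
    buffer_b = udaf.new_buffer()
    udaf.iterate(buffer_a, 101, 102)
    udaf.iterate(buffer_b, 102, 103)
    udaf.merge(buffer_a, buffer_b)
    print(udaf.terminate(buffer_a))   # expected: "101,102,103"

    # split_with_time: docids 1 and 2 fall within the default 120-day window,
    # docid 3 is far away and single-record groups are dropped
    docs = [{"docid": 1, "page_time_stamp": 0},
            {"docid": 2, "page_time_stamp": 86400},
            {"docid": 3, "page_time_stamp": 86400 * 200}]
    print(split_with_time(docs, "page_time_stamp"))   # -> one group holding docids 1 and 2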