#coding:UTF8

import re    # used by getSet below; json and logging are bound as globals inside each UDF's __init__

from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF

def getSet(list_dict, key):
    # collect the distinct non-empty values of `key`; numeric strings are
    # normalised through float() so that e.g. "1" and "1.0" compare equal
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^[\d\.]+$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def split_with_time(list_dict, sort_key, timedelta=86400*120):
    # sort by `sort_key`, then cut the list wherever two neighbouring records are
    # at least `timedelta` seconds apart (default: 120 days); groups with fewer
    # than two records are dropped
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i+1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) > 1:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
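
# --- Illustrative sketch (not part of the original module) -------------------
# A small hedged demo of the two helpers above; the records and field names are
# invented purely for illustration.  With the default 120-day window the first
# two records fall into one group and the lone third record is dropped, because
# single-record groups are discarded.
def _demo_split_with_time():
    docs = [{"docid": 1, "page_time_stamp": 0},
            {"docid": 2, "page_time_stamp": 3600},
            {"docid": 3, "page_time_stamp": 86400 * 200}]
    # expected: [[{'docid': 1, ...}, {'docid': 2, ...}]]
    print(split_with_time(docs, "page_time_stamp"))
    # getSet normalises numeric strings, so "100" and "100.0" collapse into one
    # distinct value; expected: {'100.0'}
    print(getSet([{"price": "100"}, {"price": "100.0"}, {"price": ""}], "price"))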

@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Project number, winning bidder, len(project number) > 7, winning bidder <> "",
    fewer than 2 non-empty tendering agencies after merging, identical non-empty
    amounts for the same announcement type after merging
    '''
    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column, greater_column, MAX_NUM):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "set_limit_column1": set_limit_column1,
                          "set_limit_column2": set_limit_column2, "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4,
                          "contain_column": contain_column, "greater_column": greater_column, "MAX_NUM": MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0]) > 0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split) > MAX_NUM:
                flag = False
            else:
                for _key in keys:
                    logging.info(_key + str(getSet(_split, _key)))
                    if len(getSet(_split, _key)) > 1:
                        flag = False
                        break
                MAX_CONTAIN_COLUMN = None
                # check whether each notice's contain_column is contained by the longest one in the group
                if flag:
                    for _d in _split:
                        contain_column = _d["contain_column"]
                        if contain_column is not None and contain_column != "":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find("project_name") == -1:
                                        flag = False
                                        break
                    if len(getSet(_split, "greater_column")) == 1:
                        flag = False
                        break
            if flag:
                _set_docid = set()
                for item in _split:
                    _set_docid.add(item["docid"])
                if len(_set_docid) > 1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)
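
# --- Illustrative sketch (not part of the original module) -------------------
# A hedged example of the UDAF lifecycle (new_buffer -> iterate -> merge ->
# terminate) as the MaxCompute runtime would drive it.  Running it outside
# MaxCompute assumes an environment where odps.udf can be imported; every
# column value below is invented purely for demonstration.
def _demo_merge_rule_limit_num_contain_greater():
    agg = f_merge_rule_limit_num_contain_greater()
    buf = agg.new_buffer()
    # two notices close in time that agree on every set_limit column
    agg.iterate(buf, 1, 0,    "P001", "companyA", "", "", "projectX", "100", 5)
    agg.iterate(buf, 2, 3600, "P001", "companyA", "", "", "projectX phase 1", "200", 5)
    # expected output: one group holding both docids, e.g. [[1, 2]]
    print(agg.terminate(buf))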

@annotate('bigint,string,string,string,string,string,string->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Project number, winning bidder, len(project number) > 7, winning bidder <> "",
    fewer than 2 non-empty tendering agencies after merging, identical non-empty
    amounts for the same announcement type after merging
    '''
    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column1, contain_column2):
        buffer[0].append({"docid": docid, "set_limit_column1": set_limit_column1,
                          "set_limit_column2": set_limit_column2, "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4,
                          "contain_column1": contain_column1, "contain_column2": contain_column2})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group, _key)) > 1:
                re_merge = True
                break
        contain_keys = ["contain_column1", "contain_column2"]
        logging.info(the_group)
        if re_merge:
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            # greedily re-assign each document to the first existing sub-group whose
            # contain columns are in a containment relation with the document's own
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    for _c_key in contain_keys:
                        if _g[_c_key] is not None and _doc[_c_key] is not None:
                            if len(_g[_c_key]) > len(_doc[_c_key]):
                                if str(_g[_c_key]).find(str(_doc[_c_key])) >= 0:
                                    dict_temp[_c_key] = _g[_c_key]
                                    hit_count += 1
                            else:
                                if str(_doc[_c_key]).find(str(_g[_c_key])) >= 0:
                                    dict_temp[_c_key] = _doc[_c_key]
                                    _g[_c_key] = _doc[_c_key]
                                    hit_count += 1
                    logging.info(_doc["docid"])
                    logging.info(_g)
                    logging.info(str(hit_count))
                    if hit_count == len(contain_keys):
                        for _c_key in contain_keys:
                            _g[_c_key] = dict_temp[_c_key]
                        _g["docid"].append(_doc["docid"])
                        merge_flag = True
                        break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether every limit column still has a single value inside each sub-group
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                _flag = True
                for _key in keys:
                    if len(getSet(_split, _key)) > 1:
                        _flag = False
                        break
                if not _flag:
                    for _docid in _group["docid"]:
                        final_group.append([_docid])
                else:
                    final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        return json.dumps(final_group)
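
# --- Illustrative sketch (not part of the original module) -------------------
# A hedged local run of the re-merge UDAF above; it assumes odps.udf can be
# imported outside MaxCompute and uses invented values.  Docids 1 and 2 share a
# containment relation on both contain columns, while docid 3 does not and also
# disagrees on the winning bidder, so the over-merged group should be split.
def _demo_remege_limit_num_contain():
    agg = f_remege_limit_num_contain()
    buf = agg.new_buffer()
    agg.iterate(buf, 1, "P001", "companyA", "", "", "projectX", "P001")
    agg.iterate(buf, 2, "P001", "companyA", "", "", "projectX phase 1", "P001-1")
    agg.iterate(buf, 3, "P001", "companyB", "", "", "other project", "Q999")
    # expected output: docids 1 and 2 stay together, docid 3 ends up alone,
    # e.g. [[1, 2], [3]]
    print(agg.terminate(buf))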

@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    Unpack the groups into individual records (one comma-joined docid string per group)
    '''
    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))

@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    Unpack the groups into individual records (one (group key, docid) row per member,
    keyed by the group's last docid)
    '''
    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1], _docid)

@annotate('string -> bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    Unpack the groups into individual records (one row per ordered pair of distinct
    positions within a group)
    '''
    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i != index_j:
                            self.forward(_group[index_i], _group[index_j])
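
# --- Illustrative sketch (not part of the original module) -------------------
# A hedged local check of f_arrange_group_single.  Instantiating the UDTF and
# overriding forward() with a plain collector is an assumption made only for
# this demonstration; inside MaxCompute the runtime provides forward().
def _demo_arrange_group_single():
    udtf = f_arrange_group_single()
    rows = []
    udtf.forward = lambda a, b: rows.append((a, b))  # hypothetical capture hook
    udtf.process(json.dumps([[1, 2, 3]]))
    # expected ordered pairs of distinct positions:
    # [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]
    print(rows)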

@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x: x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)