#coding:UTF8 from odps.udf import annotate from odps.udf import BaseUDTF from odps.udf import BaseUDAF @annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint') class f_decode_extract(BaseUDTF): def __init__(self): import logging import json import time,re global json,logging,time,re self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*" logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self, extractjson,otherjson): if extractjson is not None: _extract = json.loads(extractjson) else: _extract = {} if otherjson is not None: _other = json.loads(otherjson) else: _other = {} project_code = "" project_name = "" tenderee = "" agency = "" win_tenderer = "" bidding_budget = "" win_bid_price = "" fingerprint = "" page_time_stamp = 0 docchannel = 0 extract_count = 0 page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime())) doctitle = _other.get("doctitle","") doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle) area = _other.get("area","") province = _other.get("province","") city = _other.get("city","") district = _other.get("district","") web_source_no = _other.get("webSourceNo","") docchannel = _other.get("docchannel",0) if re.search(self.time_pattern,page_time) is not None: try: timeArray = time.strptime(page_time[:11], "%Y-%m-%d") page_time_stamp = int(time.mktime(timeArray)) except Exception as e: pass list_code = _extract.get("code",[]) if len(list_code)>0: project_code = list_code[0] project_name = _extract.get("name","") fingerprint = _extract.get("fingerprint","") dict_pack = _extract.get("prem",{}) logging.info(dict_pack) for _key in dict_pack.keys(): if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0: extract_count += 1 if bidding_budget=="": bidding_budget = str(float(dict_pack[_key]["tendereeMoney"])) for _role in dict_pack[_key]["roleList"]: extract_count += 1 if _role[2]!='' and float(_role[2])>0: extract_count += 1 if _role[0]=="tenderee": tenderee = _role[1] if _role[0]=="win_tenderer": if win_tenderer=="": win_tenderer = _role[1] if _role[2]!='' and float(_role[2])>0: extract_count += 1 if win_bid_price=="": win_bid_price = str(float(_role[2])) if _role[0]=="agency": agency = _role[1] if project_code!="": extract_count += 1 if project_name!="": extract_count += 1 logging.info(page_time+doctitle+doctitle_refine+area+province+city+ district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price) self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city, district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count) @annotate("string->bigint") class f_get_extractCount(object): def __init__(self): import time global time import logging import json import re global json,logging,re self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*" logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, extractjson): if extractjson is not None: _extract = json.loads(extractjson) else: _extract = {} dict_pack = _extract.get("prem",{}) extract_count = 0 list_code = _extract.get("code",[]) if len(list_code)>0: project_code = list_code[0] else: project_code = "" project_name = _extract.get("name","") bidding_budget = "" win_tenderer = "" win_bid_price = "" for _key in dict_pack.keys(): if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0: extract_count += 1 if bidding_budget=="": bidding_budget = str(float(dict_pack[_key]["tendereeMoney"])) for _role in dict_pack[_key]["roleList"]: extract_count += 1 if _role[2]!='' and float(_role[2])>0: extract_count += 1 if _role[0]=="tenderee": tenderee = _role[1] if _role[0]=="win_tenderer": if win_tenderer=="": win_tenderer = _role[1] if _role[2]!='' and float(_role[2])>0: extract_count += 1 if win_bid_price=="": win_bid_price = str(float(_role[2])) if _role[0]=="agency": agency = _role[1] if project_code!="": extract_count += 1 if project_name!="": extract_count += 1 return extract_count @annotate('string,string,string,string,string -> string,string,string,bigint') class f_decode_sub_docs_json(BaseUDTF): def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self, project_code,project_name,tenderee,agency,sub_docs_json): columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""} extract_count = 0 if project_code is not None and project_code!="": extract_count += 1 if project_name is not None and project_name!="": extract_count += 1 if tenderee is not None and tenderee!="": extract_count += 1 if agency is not None and agency!="": extract_count += 1 if sub_docs_json is not None: for sub_docs in json.loads(sub_docs_json): for _key_sub_docs in sub_docs.keys(): extract_count += 1 if _key_sub_docs in columns: if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]: if _key_sub_docs in ["bidding_budget","win_bid_price"]: if float(sub_docs[_key_sub_docs])>0: columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs])) else: columns[_key_sub_docs] = str(sub_docs[_key_sub_docs]) self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count) @annotate("string->bigint") class totimestamp(object): def __init__(self): import time global time import logging import json import re global json,logging,re self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*" logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, str_time): try: logging.info(str_time) if str_time is not None and re.search(self.time_pattern,str_time) is not None: timeArray = time.strptime(str_time[:10], "%Y-%m-%d") timeStamp = int(time.mktime(timeArray)) return timeStamp else: return 0 except Exception as e: return 0 @annotate("string->string") class refind_name(object): def __init__(self): import logging import re global logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, title): if title is not None: return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title) return "" @annotate('bigint,bigint,bigint,string,bigint,string->string') class f_set_docid(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count, "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_docs = buffer[0] list_docs.sort(key=lambda x:x["page_time_stamp"]) list_group = [] _begin = 0 defind_count = 0 if len(list_docs)>0: defind_count = list_docs[0]["defind_count"] for i in range(len(list_docs)-1): if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2: continue else: _group = [] _set_column = set() _set_tenderee = set() for j in range(_begin,i+1): if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="": _set_tenderee.add(list_docs[j]["tenderee"]) _set_column.add(list_docs[j]["defind_column"]) _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]}) if len(_group)>=3 and len(_set_tenderee)>1: pass else: if len(_group)>1: if defind_count==2: if len(_set_column)>=2: list_group.append(_group) elif defind_count==1: if len(_set_column)==1: list_group.append(_group) elif defind_count==0: list_group.append(_group) _begin = i+1 if len(list_docs)>1: _set_column = set() _set_tenderee = set() _group = [] for j in range(_begin,len(list_docs)): if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="": _set_tenderee.add(list_docs[j]["tenderee"]) _set_column.add(list_docs[j]["defind_column"]) _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]}) if len(_group)>=3 and len(_set_tenderee)>1: pass else: if len(_group)>1: if defind_count==2: if len(_set_column)>=2: list_group.append(_group) elif defind_count==1: if len(_set_column)==1: list_group.append(_group) elif defind_count==0: list_group.append(_group) return json.dumps(list_group) def isEmpty(_str): if _str is None or _str=="": return True return False @annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string') class f_set_docid_binaryChart(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count, "project_code":project_code,"project_name":project_name,"tenderee":tenderee, "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price, "agency":agency,"web_source_no":web_source_no}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_docs = buffer[0] list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2) list_group = [] empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"] for _timeGroups in list_timeGroups: list_empty = [] list_notEmpty = [] for _item in _timeGroups: empty_flag = True for _key in empty_key: if not isEmpty(_item[_key]): empty_flag = False break if empty_flag: list_empty.append(_item) else: list_notEmpty.append(_item) for _e in list_empty: _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}] _e_tenderee = _e["tenderee"] for _ne in list_notEmpty: if "set_webSource" not in _ne: _ne["set_webSource"] = set() _ne["set_webSource"].add(_ne["web_source_no"]) _suit = False if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]: _suit = True elif isEmpty(_e_tenderee): _suit = True if _suit: if _e["web_source_no"] not in _ne["set_webSource"]: _ne["set_webSource"].add(_e["web_source_no"]) _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]}) break if len(_group)>1: list_group.append(_group) return json.dumps(list_group) def split_with_time(list_dict,sort_key,timedelta=86400*2): if len(list_dict)>0: if sort_key in list_dict[0]: list_dict.sort(key=lambda x:x[sort_key]) list_group = [] _begin = 0 for i in range(len(list_dict)-1): if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])1: list_group.append(_group) _begin = i + 1 if len(list_dict)>1: _group = [] for j in range(_begin,len(list_dict)): _group.append(list_dict[j]) if len(_group)>1: list_group.append(_group) return list_group return [list_dict] @annotate('bigint,bigint,bigint,string,string,string,string,string->string') class f_set_docid_limitNum_contain(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1, "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4, "contain_column":contain_column}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_split = split_with_time(buffer[0],"page_time_stamp") list_group = [] for _split in list_split: flag = True keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] for _key in keys: logging.info(_key+str(getSet(_split,_key))) if len(getSet(_split,_key))>1: flag = False break MAX_CONTAIN_COLUMN = None #判断组内每条公告是否包含 if flag: for _d in _split: contain_column = _d["contain_column"] if contain_column is not None and contain_column !="": if MAX_CONTAIN_COLUMN is None: MAX_CONTAIN_COLUMN = contain_column else: if len(MAX_CONTAIN_COLUMN)1: _group = [] for _item in _split: _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]}) list_group.append(_group) return json.dumps(list_group) @annotate('bigint->string') class f_stamp_squence(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json import logging global logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [set()] def iterate(self, buffer,page_time_stamp): buffer[0].add(page_time_stamp) def merge(self, buffer, pbuffer): buffer[0] |= pbuffer[0] def terminate(self, buffer): if 0 in buffer[0]: buffer[0].remove(0) list_stamp = list(buffer[0]) list_stamp.sort(key=lambda x:x) list_stamp_final = [] _begin = 0 _time_decase = 86400*2 logging.info(str(list_stamp)) for _index in range(len(list_stamp)-1): if list_stamp[_index+1]-list_stamp[_index]<_time_decase: continue else: list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase]) _begin = _index+1 if len(list_stamp)>0: list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase]) return json.dumps(list_stamp_final) @annotate("bigint,string->bigint") class in_stamp(object): def __init__(self): import logging import re import json global logging,re,json logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, page_time_stamp,json_stamp): list_stamp = json.loads(json_stamp) int_flag = 0 for item in list_stamp: if page_time_stamp item[0] and page_time_stamp=1 and rule_id <30: return 20 else: return 10 @annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint') class f_split_group_single(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self, json_set_docid,rule_id): list_group = json.loads(json_set_docid) for item in list_group: if len(item)>100: item.sort(key=lambda x:x["docid"],reverse=True) index_i = 0 for index_j in range(1,len(item)): if item[index_i]["docid"]!=item[index_j]["docid"]: self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id)) else: for index_i in range(len(item)): for index_j in range(len(item)): if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]: self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id)) @annotate('bigint,string->string') class group_document(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,id,json_set_docid): buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): return json.dumps(buffer[0]) @annotate('bigint,string,bigint,string -> bigint,bigint,string') class decare_document(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2): #y=x,少掉近一半的数据 if group_id1>=group_id2: list_doc1 = json.loads(json_list_doc1) list_doc2 = json.loads(json_list_doc2) for _doc1 in list_doc1: for _doc2 in list_doc2: #同一个重复group不做判断 if _doc1["id"]!=_doc2["id"]: #判断两个group是否有重复 _set1 = set() for _item1 in _doc1["json_set_docid"]: _set1.add(_item1["docid"]) _set2 = set() for _item2 in _doc2["json_set_docid"]: _set2.add(_item2["docid"]) if len(_set1&_set2)>0: new_json_set_docid = _doc1["json_set_docid"] for _item2 in _doc2["json_set_docid"]: if _item2["docid"] not in _set1: new_json_set_docid.append(_item2) self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid)) def getBestDocid(list_pair): list_pair.sort(key=lambda x:x[3],reverse=True) _max_count = max(list_pair[0][3],list_pair[0][1]) set_candidate = set() if list_pair[0][1]==_max_count: set_candidate.add(list_pair[0][0]) for item in list_pair: if item[3]==_max_count: set_candidate.add(item[2]) else: break list_candidate = list(set_candidate) list_candidate.sort(key=lambda x:x) return list_candidate[0] @annotate('bigint,bigint,bigint,bigint->string') class choose_document(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2): buffer[0].append([docid1,extract_count1,docid2,extract_count2]) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_pair = buffer[0] _set = set() for item in buffer[0]: _set.add(str(item[2])) list_dumplicate = list(_set) best_docid = getBestDocid(list_pair) if best_docid==list_pair[0][0]: save_flag = 1 else: save_flag = 0 return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate}) @annotate('string -> bigint,string') class f_get_choose_document(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_choose): if json_choose is None: self.forward(1,None) else: _choose = json.loads(json_choose) self.forward(_choose["save_flag"],",".join(_choose["dumplicates"])) @annotate('bigint,bigint,bigint,bigint->string') class group_document_bestFirst(BaseUDAF): ''' 将组里面最优的放在前面 ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2): buffer[0].append([docid1,extract_count1,docid2,extract_count2]) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_pair = buffer[0] _set = set() for item in buffer[0]: _set.add(item[2]) _set.add(list_pair[0][0]) best_docid = getBestDocid(list_pair) _set.remove(best_docid) list_dumplicate = list(_set) list_dumplicate.sort(key=lambda x:x) list_dumplicate.insert(0,best_docid) list_dumplicate_str = [] for item in list_dumplicate: list_dumplicate_str.append(str(item)) return ",".join(list_dumplicate_str) @annotate('string -> bigint,string') class f_get_best_dumplicates(BaseUDTF): ''' 得到每个分组中最优的那一条及其重复记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_dumplicate_str): if list_dumplicate_str is None: pass else: list_dumplicate = list_dumplicate_str.split(",") if len(list_dumplicate)>0: self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:])) else: pass @annotate('bigint,bigint->string') class bridge2group(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [set()] def iterate(self, buffer,docid1,docid2): buffer[0].add(docid1) buffer[0].add(docid2) def merge(self, buffer, pbuffer): buffer[0] |= pbuffer[0] def terminate(self, buffer): list_pair = list(buffer[0]) list_pair.sort(key=lambda x:x,reverse=True) return json.dumps(list_pair) @annotate('string -> bigint,bigint') class group2bridge(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_list_docid): list_docid = json.loads(json_list_docid) for _docid in list_docid: self.forward(list_docid[-1],_docid) @annotate('bigint,bigint,string -> bigint') class f_get_dump_docid(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,docid,save_flag,dumplicates): if save_flag==0: self.forward(docid) if dumplicates is not None: list_docid = dumplicates.split(",") if len(list_docid)>0: for _docid in list_docid[1:]: self.forward(int(_docid)) else: if dumplicates is not None: list_docid = dumplicates.split(",") if len(list_docid)>0: for _docid in list_docid: self.forward(int(_docid)) @annotate('string -> bigint,bigint') class f_get_docid(BaseUDTF): ''' 将多个组拆解成多条记录 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,json_set_docid): team_id = 0 if json_set_docid is not None: list_docses = json.loads(json_set_docid) for list_docs in list_docses: team_id += 1 for item in list_docs: self.forward(team_id,item["docid"]) @annotate("string->bigint") class get_count_dump(object): def __init__(self): import logging import re global logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, title): _count = 0 if title is not None: _count = len(title.split(",")) return _count def getSet(list_dict,key): _set = set() for item in list_dict: if key in item: if item[key]!='' and item[key] is not None: if re.search("^\d[\d\.]*$",item[key]) is not None: _set.add(str(float(item[key]))) else: _set.add(str(item[key])) return _set def getDiffIndex(list_dict,key,confidence=100): _set = set() for _i in range(len(list_dict)): item = list_dict[_i] if item["confidence"]>=confidence: continue if key in item: if item[key]!='' and item[key] is not None: if re.search("^\d[\d\.]*$",item[key]) is not None: _set.add(str(float(item[key]))) else: _set.add(str(item[key])) if len(_set)>1: return _i return len(list_dict) @annotate('bigint,string -> bigint,bigint') class f_getGroup_dumpFinal(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,docid,dumplicates): self.forward(int(docid),int(docid)) if dumplicates is not None: list_docids = dumplicates.split(",") for _docid in list_docids: self.forward(int(docid),int(_docid)) @annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string') class f_redump_limit_num(BaseUDAF): ''' 去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值 组内个数小于等于5时,tenderee、win_tenderer、bidding_budget组内只能有一个取值 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence): buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2, "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1, "extract_count2":extract_count2,"confidence":confidence}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_group = [] the_group = buffer[0] the_group.sort(key=lambda x:x["confidence"],reverse=True) if len(the_group)>5: keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"] else: keys = ["set_limit_column2","set_limit_column3","set_limit_column4"] final_group = [] #置信度 list_key_index = [] for _k in keys: if _k=="doctitle": list_key_index.append(getDiffIndex(the_group,_k,confidence=30)) else: list_key_index.append(getDiffIndex(the_group,_k)) _index = min(list_key_index) if _index>1: main_docid = the_group[0]["main_docid"] for item in the_group[:_index]: if item["docid"]!=main_docid: final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]}) # stay = True # for _key in keys: # if len(getSet(the_group,_key))>1: # stay = False # break # # if stay: # main_docid = the_group[0]["main_docid"] # for item in the_group: # if item["docid"]!=main_docid: # final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]}) return json.dumps(final_group) @annotate('string -> bigint,bigint,bigint,bigint,bigint') class f_get_dumpFinal_checked(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_group): if list_group is not None: final_group = json.loads(list_group) for _group in final_group: self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"]) @annotate('string -> bigint') class f_getDumplicateDocids(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,dumplicates): list_docids = dumplicates.split(",") for _d in list_docids: self.forward(int(_d))