#coding:utf8
"""ODPS (MaxCompute) Python UDF / UDTF helpers: date truncation, money-range
bucketing, doc-id explosion, enterprise-name cleaning and contact
de-duplication."""
import json
import logging
import re
import traceback

from odps.udf import annotate, BaseUDAF, BaseUDTF

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')


@annotate('string->string')
class getYearMonth(object):
    """Return the first 7 characters of page_time, or "" for NULL.

    Presumably page_time is a 'YYYY-MM-DD...' string, giving 'YYYY-MM' --
    confirm against the calling SQL.
    """

    def evaluate(self, page_time):
        if page_time is None:
            return ""
        return str(page_time[:7])


@annotate('double->string')
class getMoneyRange(object):
    """Map an amount to a range label (万 = 10^4, 亿 = 10^8)."""

    # (exclusive upper bound, label), checked in ascending order.
    _BUCKETS = (
        (10 * 10000, '(0,10万)'),
        (100 * 10000, '[10万,100万)'),
        (500 * 10000, '[100万,500万)'),
        (1000 * 10000, '[500万,1000万)'),
        (10000 * 10000, '[1000万,1亿)'),
        (10 * 10000 * 10000, '[1亿,10亿)'),
        (100 * 10000 * 10000, '[10亿,100亿)'),
    )

    def evaluate(self, money):
        if money is None or money == 0:
            return '等于0或空'
        # NOTE(review): negative amounts fall into the first bucket '(0,10万)'.
        for upper, label in self._BUCKETS:
            if money < upper:
                return label
        # NOTE(review): amounts >= 500亿 (and NaN) also land here even though
        # the label is capped at 500亿.
        return '[100亿,500亿]'


@annotate('string->bigint')
class getdocidFromDocids(BaseUDTF):
    """Explode a comma-separated id string into one bigint row per id.

    NULL input and empty tokens (e.g. "1,,2" or a trailing comma) produce no
    rows instead of failing the job.
    """

    def process(self, docids):
        if docids is None:
            return
        for docid in docids.split(","):
            docid = docid.strip()
            if docid == "":
                continue
            self.forward(int(docid))


@annotate('string->string')
class fixEnterpriseName(object):
    """Clean an enterprise name, returning "" when it looks unusable.

    Noise characters and HTML-entity remnants are removed. The result is
    blanked when the raw name contains a run of 3+ '*' (masked), when the
    cleaned name is shorter than 4 characters, or when it contains
    "有限公司" and is at most 7 characters long. NULL input yields "".
    """

    _NOISE_CHARS = re.compile(r"[#!!&@$'\s\*\"{};;]")
    _ENTITY_WORDS = re.compile(r"amp|lt|bramp|gt|nbsp|br")
    # Any run of 3+ asterisks anywhere, not just the first run in the name.
    _MASK_RUN = re.compile(r"\*{3,}")

    def evaluate(self, name):
        if name is None:
            return ""
        new_name = self._NOISE_CHARS.sub("", name)
        new_name = self._ENTITY_WORDS.sub("", new_name)
        if self._MASK_RUN.search(name) is not None:
            return ""
        if len(new_name) < 4:
            return ""
        if new_name.find("有限公司") >= 0 and len(new_name) <= 7:
            return ""
        return new_name


@annotate('string->string')
class removeCommonWord(object):
    """Remove area names and common company-form words from a name."""

    def __init__(self):
        # Project-local helper, imported at instantiation time (presumably
        # because it is shipped as a UDF resource -- confirm).
        from AreaGet import AreaGet
        self.dict_area = AreaGet().getDict_area()
        alternatives = []
        for v in self.dict_area.values():
            _name = v.get("cname", "")
            if _name:
                # Escape so area names are matched literally.
                alternatives.append(re.escape(_name))
        # Appended after the area names, matching the original alternation order.
        # Joining a list avoids a leading empty alternative when there are no
        # area names, which would otherwise match everywhere and remove nothing.
        alternatives.extend(["[省市区县]", "有限", "公司", "股份", "分公司", "责任"])
        self.pattern = re.compile("|".join(alternatives))

    def evaluate(self, name):
        if name is None:
            return ""
        return self.pattern.sub("", name)


@annotate("string->string,string,string,string,bigint,bigint")
class dealEnterpriseCircle(BaseUDTF):
    """Split a name around its last parenthesised group.

    Emits one row (name, new_name, before, circle, bool_area, bool_end):
      name      -- the input with all whitespace removed
      new_name  -- name with parenthesis normalisation and runs of '(' / ')'
                   collapsed to a single character
      circle    -- text inside the last "(...)" group of new_name ("" if none)
      before    -- new_name up to that group ("" if none)
      bool_area -- 1 if circle is a known area cname, else 0
      bool_end  -- 1 if that group ends at the end of new_name, else 0
    NULL input emits no row.
    """

    def __init__(self):
        from AreaGet import AreaGet
        self.dict_area = AreaGet().getDict_area()
        self.set_area = set(v.get("cname") for v in self.dict_area.values())

    def process(self, name):
        if name is None:
            return
        name = re.sub(r"\s+", "", name)
        # NOTE(review): in this copy both replace() arguments render as the
        # same character; if full-width parentheses are meant to be normalised,
        # confirm the first argument is the full-width form.
        new_name = name.replace("(","(").replace(")",")")
        new_name = re.sub(r"\(+", '(', new_name)
        new_name = re.sub(r"\)+", ')', new_name)
        bool_area = 0
        bool_end = 0
        circle = ""
        before = ""
        matches = list(re.finditer(r"\(.+?\)", new_name))
        if matches:
            # Only the last (non-overlapping) match can reach the end of the
            # string, so it alone determines every output field.
            last = matches[-1]
            circle = last.group(0)[1:-1]
            before = new_name[:last.start()]
            bool_area = 1 if circle in self.set_area else 0
            bool_end = 1 if last.end() >= len(new_name) else 0
        self.forward(name, new_name, before, circle, bool_area, bool_end)


@annotate('string->string')
class f_turn_circle(object):
    """Normalise parentheses in name; NULL yields ""."""

    def evaluate(self, name):
        if name is None:
            return ""
        # NOTE(review): as rendered here, each replace() maps a character to
        # itself; confirm the intended full-width source characters.
        return name.replace("(","(").replace(")",")")


@annotate('string,string->string,bigint')
class f_dumplicate_contacts(BaseUDTF):
    """De-duplicate a JSON array of contact objects.

    Emits (deduplicated_json, 1) on success, (None, 1) for NULL contacts and
    (None, 0) when the input cannot be parsed or processed. `name` is used
    only for logging.
    """

    def __init__(self):
        pass

    @staticmethod
    def _field(contact, key):
        """Return contact[key], treating a missing key or JSON null as ""."""
        value = contact.get(key)
        return "" if value is None else value

    @staticmethod
    def _level(contact):
        """Return contact["level"], treating a missing key or JSON null as 0."""
        level = contact.get("level")
        return 0 if level is None else level

    def process(self, name, contacts):
        if contacts is None:
            self.forward(contacts, 1)
            return
        try:
            list_contacts = json.loads(contacts)
            # Contacts with longer person names go first, so a named contact
            # is kept over a name-less one that shares its phone number.
            list_contacts.sort(key=lambda x: len(self._field(x, "contact_person")), reverse=True)
            seen_keys = set()
            seen_phones = set()
            new_list_contacts = []
            for _conta in list_contacts:
                contact_person = self._field(_conta, "contact_person")
                mobile_no = self._field(_conta, "mobile_no")
                phone_no = self._field(_conta, "phone_no")
                # A name-less contact whose number was already seen is redundant.
                if contact_person == "" and (mobile_no in seen_phones or phone_no in seen_phones):
                    continue
                _key = "%s-%s-%s" % (contact_person, mobile_no, phone_no)
                if _key in seen_keys:
                    continue
                if mobile_no != "":
                    seen_phones.add(mobile_no)
                if phone_no != "":
                    seen_phones.add(phone_no)
                new_list_contacts.append(_conta)
                seen_keys.add(_key)
            if len(new_list_contacts) != len(list_contacts):
                logging.info(name)
            new_list_contacts.sort(key=self._level, reverse=True)
            self.forward(json.dumps(new_list_contacts, ensure_ascii=False), 1)
        except Exception:
            # UDTF boundary: log and emit a failure row rather than kill the job.
            logging.exception("f_dumplicate_contacts failed for contacts: %s", contacts)
            self.forward(None, 0)