123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- #coding:utf8
- from odps.udf import annotate,BaseUDAF,BaseUDTF
- import logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- import json
- import traceback
@annotate('string->string')
class getYearMonth(object):
    """UDF that keeps only the leading seven characters of a time string."""

    def evaluate(self, page_time):
        """Return the first seven characters of ``page_time``.

        For a value shaped like "2021-03-15" the result is "2021-03"; the
        input format itself is not validated here. A NULL input yields the
        empty string.
        """
        return "" if page_time is None else str(page_time[:7])
@annotate('double->string')
class getMoneyRange(object):
    """UDF that maps a monetary amount onto a labelled range bucket."""

    # (exclusive upper bound, label) pairs in ascending order. An amount is
    # given the label of the first bound it is strictly below.
    _BUCKETS = (
        (10 * 10000, '(0,10万)'),
        (100 * 10000, '[10万,100万)'),
        (500 * 10000, '[100万,500万)'),
        (1000 * 10000, '[500万,1000万)'),
        (10000 * 10000, '[1000万,1亿)'),
        (10 * 10000 * 10000, '[1亿,10亿)'),
        (100 * 10000 * 10000, '[10亿,100亿)'),
    )

    def evaluate(self, money):
        """Return the range label for ``money``.

        NULL and zero share a dedicated label. Only upper bounds are
        compared, so negative amounts land in the first bucket, and any
        amount at or above the largest bound gets the final label.
        """
        if money is None or money == 0:
            return '等于0或空'
        for upper_bound, label in self._BUCKETS:
            if money < upper_bound:
                return label
        return '[100亿,500亿]'
@annotate('string->bigint')
class getdocidFromDocids(BaseUDTF):
    """UDTF that explodes a comma-separated docid string into one row per id."""

    def process(self, docids):
        """Forward every docid found in ``docids`` as an integer row.

        A NULL input, an empty string, and blank items (for example from a
        trailing or doubled comma) emit nothing instead of raising. An item
        that is not a valid integer still raises ``ValueError`` so that bad
        data is not silently dropped.
        """
        if docids is None:
            return
        for item in docids.split(","):
            item = item.strip()
            if item:
                self.forward(int(item))
@annotate('string->string')
class fixEnterpriseName(object):
    """UDF that cleans an enterprise name and blanks out unusable ones."""

    # Punctuation, whitespace and asterisks stripped from the name.
    # NOTE(review): the doubled "!" and ";" look like full-width variants
    # that were lost in transit -- confirm against the upstream file.
    _NOISE_CHARS = "[#!!&@$'\\s\\*\"{};;]"
    # Letter sequences removed after the punctuation pass (they look like
    # leftovers of HTML entities/tags once "&", ";" etc. are stripped).
    _ENTITY_RESIDUE = "amp|lt|bramp|gt|nbsp|br"
    # Three or more consecutive asterisks anywhere in the raw name.
    _MASKED_RUN = "\\*{3,}"

    def __init__(self):
        # ``global`` has to come before the import: binding a name and then
        # declaring it global in the same function is a SyntaxError on
        # Python 3 (and a SyntaxWarning on Python 2).
        global re
        import re

    def evaluate(self, name):
        """Return the cleaned name, or "" when it is not usable.

        The result is "" when ``name`` is NULL, when the raw name contains a
        run of three or more asterisks, when the cleaned name has length
        below 4, or when it contains the "limited company" suffix literal
        but has length 7 or less.
        """
        if name is None:
            return ""
        # Look for a long asterisk run anywhere, not only in the first run.
        if re.search(self._MASKED_RUN, name) is not None:
            return ""
        new_name = re.sub(self._NOISE_CHARS, "", name)
        new_name = re.sub(self._ENTITY_RESIDUE, "", new_name)
        if len(new_name) < 4:
            return ""
        if "有限公司" in new_name and len(new_name) <= 7:
            return ""
        return new_name
@annotate('string->string')
class removeCommonWord(object):
    """UDF that removes area names and generic company words from a name."""

    def __init__(self):
        """Build the removal pattern from the area dictionary."""
        # ``global`` has to come before the import: binding a name and then
        # declaring it global in the same function is a SyntaxError on
        # Python 3 (and a SyntaxWarning on Python 2).
        global re
        import re
        from AreaGet import AreaGet

        self.dict_area = AreaGet().getDict_area()
        # Escape each area name so regex metacharacters in the data are
        # matched literally, and skip missing/empty names so the pattern
        # never contains an empty alternative (which would match first at
        # every position and prevent anything from being removed).
        alternatives = [
            re.escape(area.get("cname"))
            for area in self.dict_area.values()
            if area.get("cname")
        ]
        alternatives.append("[省市区县]|有限|公司|股份|分公司|责任")
        self.pattern = re.compile("|".join(alternatives))

    def evaluate(self, name):
        """Return ``name`` with every pattern match removed ("" for NULL)."""
        if name is None:
            return ""
        return self.pattern.sub("", name)
@annotate("string->string,string,string,string,bigint,bigint")
class dealEnterpriseCircle(BaseUDTF):
    """UDTF that analyses the parenthesised part of an enterprise name."""

    def __init__(self):
        """Load the set of known area names from the area dictionary."""
        # ``global`` has to come before the import: binding a name and then
        # declaring it global in the same function is a SyntaxError on
        # Python 3 (and a SyntaxWarning on Python 2).
        global re
        import re
        from AreaGet import AreaGet

        self.dict_area = AreaGet().getDict_area()
        self.set_area = set(
            area.get("cname") for area in self.dict_area.values()
        )

    def process(self, name):
        """Forward one row describing the parenthesised text in ``name``.

        The row is (name without whitespace, normalised name, text before
        the parenthesis, text inside the parenthesis, 1 if that inner text
        is a known area name else 0, 1 if the parenthesis closes the name
        else 0). When several parenthesised groups exist, the values of the
        last one are reported. A NULL name emits nothing.

        NOTE(review): the indentation of this file was lost; the single
        ``forward`` call is placed after the loop, so a name without
        parentheses still emits a row with empty fields -- confirm.
        """
        if name is None:
            return
        name = re.sub("\\s+", "", name)
        # Normalise full-width parentheses to ASCII. The replace calls in
        # the received source had identical search and replacement text and
        # therefore did nothing.
        new_name = name.replace("（", "(").replace("）", ")")
        # Collapse runs of repeated parentheses into a single one.
        new_name = re.sub("\\(+", "(", new_name)
        new_name = re.sub("\\)+", ")", new_name)

        bool_area = 0
        bool_end = 0
        circle = ""
        before = ""
        for match in re.finditer("\\(.+?\\)", new_name):
            start, end = match.span()
            circle = match.group()[1:-1]
            if end >= len(new_name):
                bool_end = 1
            before = new_name[:start]
            bool_area = 1 if circle in self.set_area else 0
        self.forward(name, new_name, before, circle, bool_area, bool_end)
@annotate('string->string')
class f_turn_circle(object):
    """UDF that converts full-width parentheses in a name to ASCII ones."""

    def __init__(self):
        # ``global`` has to come before the import: binding a name and then
        # declaring it global in the same function is a SyntaxError on
        # Python 3 (and a SyntaxWarning on Python 2).
        global re
        import re

    def evaluate(self, name):
        """Return ``name`` with full-width parentheses replaced ("" for NULL).

        The replace calls in the received source had identical search and
        replacement text and therefore did nothing; the full-width forms are
        restored here.
        """
        if name is None:
            return ""
        return name.replace("（", "(").replace("）", ")")
@annotate('string,string->string,bigint')
class f_dumplicate_contacts(BaseUDTF):
    """UDTF that removes duplicate entries from a JSON array of contacts."""

    def __init__(self):
        pass

    @staticmethod
    def _text(contact, key):
        """Return ``contact[key]``, mapping a missing key or JSON null to ""."""
        value = contact.get(key)
        return "" if value is None else value

    def process(self, name, contacts):
        """Forward the de-duplicated contacts JSON and a status flag.

        Emits ``(json_text, 1)`` on success, ``(None, 1)`` when ``contacts``
        is NULL, and ``(None, 0)`` when the input cannot be processed.

        A contact is dropped when its (person, mobile, phone) triple was
        already seen, or when it has no person name and one of its numbers
        already belongs to a kept contact. Contacts with longer person names
        are considered first, so named entries win over nameless ones. The
        survivors are ordered by ``level`` descending. ``name`` is only used
        for logging when something was removed.
        """
        if contacts is None:
            self.forward(contacts, 1)
            return
        try:
            list_contacts = json.loads(contacts)
            # JSON null is treated like a missing value so that one null
            # field neither fails the whole record nor enters the seen-phone
            # set and suppresses unrelated nameless contacts.
            list_contacts.sort(
                key=lambda c: len(self._text(c, "contact_person")),
                reverse=True,
            )
            seen_keys = set()
            seen_phones = set()
            new_list_contacts = []
            for contact in list_contacts:
                contact_person = self._text(contact, "contact_person")
                mobile_no = self._text(contact, "mobile_no")
                phone_no = self._text(contact, "phone_no")
                if contact_person == "" and (
                    mobile_no in seen_phones or phone_no in seen_phones
                ):
                    continue
                key = "%s-%s-%s" % (contact_person, mobile_no, phone_no)
                if key in seen_keys:
                    continue
                if mobile_no != "":
                    seen_phones.add(mobile_no)
                if phone_no != "":
                    seen_phones.add(phone_no)
                new_list_contacts.append(contact)
                seen_keys.add(key)
            if len(new_list_contacts) != len(list_contacts):
                logging.info(name)
            # A null level sorts as 0 so mixed null/number levels compare.
            new_list_contacts.sort(
                key=lambda c: c.get("level") or 0, reverse=True
            )
            self.forward(json.dumps(new_list_contacts, ensure_ascii=False), 1)
        except Exception:
            # Best effort: report the failure and emit a failed-status row
            # rather than aborting the job.
            traceback.print_exc()
            logging.info(contacts)
            self.forward(None, 0)
|