"""ODPS UDF/UDAF/UDTF definitions for company-contact rows.

Contents:
    * ``dumplicate``              - UDAF keeping the newest row of a group as JSON.
    * ``liberate``                - UDTF expanding that JSON back into columns.
    * ``f_tojson_docuentContact`` - UDF building a JSON list of contacts.
    * ``f_liberate_contactJson``  - UDTF expanding that JSON list into rows.
    * ``f_count_company``         - UDAF counting distinct company names.
"""
import datetime
import json
import logging
import re
import time  # unused here; kept because it was previously exposed as a module global

from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF

_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Position of the create_time timestamp inside a buffered ``dumplicate`` row.
_CREATE_TIME_INDEX = 5

# Exactly 11 digits starting with "1".
mobile_pattern = re.compile(r"^1\d{10}$")

# Everything that is not a digit, a dash, or the character "转".
_PHONE_NOISE_PATTERN = re.compile(r'[^0-9\-转]')


def _configure_logging():
    """Set up root logging at INFO level with the shared format.

    ``logging.basicConfig`` does nothing once the root logger has handlers,
    so calling this from every UDF constructor is safe.
    """
    logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)


class MyEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``bytes`` values as UTF-8 text."""

    def default(self, obj):
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        return json.JSONEncoder.default(self, obj)


def _keep_newest(rows, candidate):
    """Keep in ``rows`` (a list of at most one row) the row with the larger timestamp.

    ``candidate`` replaces the stored row only when its timestamp is strictly
    greater, so on ties the row seen first wins.  This matches the result of
    the stable ``sort(reverse=True)`` over all rows that it replaces.
    """
    if not rows:
        rows.append(candidate)
    elif candidate[_CREATE_TIME_INDEX] > rows[0][_CREATE_TIME_INDEX]:
        rows[0] = candidate


@annotate('string,string,string,string,bigint,datetime,string,string,string,string->string')
class dumplicate(BaseUDAF):
    """Aggregate a group of contact rows down to the one with the newest create_time.

    The result is a JSON array of the ten input columns, with ``create_time``
    replaced by its ``datetime.timestamp()`` value.  ``liberate`` reverses
    this encoding.

    Only the newest row seen so far is buffered, so the buffer stays
    constant-size instead of growing with the group.
    NOTE(review): MaxCompute documents a size limit on marshalled UDAF
    buffers - confirm the exact limit against the platform docs.
    """

    def __init__(self):
        _configure_logging()

    def new_buffer(self):
        """Return an empty buffer: a one-element list wrapping the row list."""
        return [[]]

    def iterate(self, buffer, company_name, mobile_no, phone_no, contact_person,
                level, create_time, email, company_addr, province, city):
        """Fold one input row into ``buffer``.

        ``company_name.strip()`` and ``create_time.timestamp()`` are called
        unconditionally, so a NULL in either column raises.
        """
        logging.info(company_name)
        row = [
            company_name.strip(),
            mobile_no,
            phone_no,
            contact_person,
            level,
            create_time.timestamp(),
            email,
            company_addr,
            province,
            city,
        ]
        _keep_newest(buffer[0], row)
        logging.info(company_name)

    def merge(self, buffer, pbuffer):
        """Fold the partial buffer ``pbuffer`` into ``buffer``."""
        logging.info('-3=')
        for row in pbuffer[0]:
            _keep_newest(buffer[0], row)
        logging.info('-4=')

    def terminate(self, buffer):
        """Return the newest row as a JSON array, or ``None`` for an empty buffer."""
        logging.info('-1=')
        if not buffer[0]:
            # Nothing was aggregated; previously this raised IndexError.
            return None
        newest = buffer[0][0]
        logging.info("-2=")
        return json.dumps(newest, cls=MyEncoder, ensure_ascii=False)


@annotate("string->string,string,string,string,bigint,datetime,string,string,string,string")
class liberate(BaseUDTF):
    """Expand the JSON array produced by ``dumplicate`` back into ten columns."""

    def __init__(self):
        _configure_logging()

    def process(self, json_dumplicate):
        """Decode one JSON array and forward it as a row.

        Rows that fail to decode are skipped (best effort), but the error and
        the offending input are logged instead of being silently discarded.
        """
        try:
            logging.info(json_dumplicate)
            # Drop escaped newlines, escaped quotes and escaped carriage
            # returns from the raw JSON text before parsing.
            json_dumplicate = (
                json_dumplicate
                .replace("\\n", "")
                .replace('\\"', '')
                .replace("\\r", "")
            )
            (company_name, mobile_no, phone_no, contact_person, level,
             create_time, email, company_addr, province, city) = json.loads(json_dumplicate)
            # ``dumplicate`` stored create_time as a timestamp; turn it back
            # into a datetime.
            create_time = datetime.datetime.fromtimestamp(create_time)
            self.forward(company_name, mobile_no, phone_no, contact_person, level,
                         create_time, email, company_addr, province, city)
        except Exception as e:
            # Per-row boundary: keep the job running, but leave a trace.
            logging.info(str(e))
            logging.info(json_dumplicate)


def recog_likeType(phone):
    """Return ``"mobile"`` if ``phone`` matches ``mobile_pattern``, else ``"phone"``."""
    if mobile_pattern.search(phone) is not None:
        return "mobile"
    return "phone"


def _build_contact(company, contact_person, phone):
    """Return a contact dict for one party, or ``None`` if any field is missing.

    A field counts as missing when it is ``None`` or the empty string.
    The phone is stored under ``"mobile_no"`` or ``"phone_no"`` depending on
    ``recog_likeType``.
    """
    if not (company and contact_person and phone):
        return None
    contact = {"company": company, "contact_person": contact_person, "level": 20}
    number_key = "mobile_no" if recog_likeType(phone) == "mobile" else "phone_no"
    contact[number_key] = phone
    return contact


@annotate("string,string,string,string,string,string->string")
class f_tojson_docuentContact(object):
    """Build a JSON list of contacts from a document's tenderee and agency columns."""

    def evaluate(self, tenderee, tenderee_contact, tenderee_phone,
                 agency, agency_contact, agency_phone):
        """Return a JSON array with up to two contact objects (tenderee first).

        A party is included only when its company, contact person and phone
        are all present.  Earlier versions checked only the phone for NULL,
        so a NULL company or contact person produced a contact with null
        fields; such parties are now skipped.
        """
        parties = (
            (tenderee, tenderee_contact, tenderee_phone),
            (agency, agency_contact, agency_phone),
        )
        list_contact = []
        for company, contact_person, phone in parties:
            contact = _build_contact(company, contact_person, phone)
            if contact is not None:
                list_contact.append(contact)
        return json.dumps(list_contact)


@annotate("string->string,string,string,string,bigint,string")
class f_liberate_contactJson(BaseUDTF):
    """Expand a JSON list of contact objects into one output row per contact."""

    def __init__(self):
        _configure_logging()

    def process(self, json_contact):
        """Forward ``(company, contact_person, mobile_no, phone_no, level, mail)`` per contact.

        ``phone_no`` is reduced to digits, dashes and "转"; results shorter
        than six characters are replaced by the empty string.  On any error
        the error and the input are logged and the remaining contacts of this
        input are skipped.
        """
        try:
            for contact in json.loads(json_contact):
                company = contact.get("company")
                contact_person = contact.get("contact_person")

                mobile_no = contact.get("mobile_no", "")
                if mobile_no is None:
                    mobile_no = ""

                phone_no = contact.get("phone_no", "")
                if phone_no is None:
                    phone_no = ""
                else:
                    phone_no = _PHONE_NOISE_PATTERN.sub('', phone_no)
                    if len(phone_no) < 6:
                        phone_no = ""

                level = contact.get("level")
                mail = contact.get("mail", "")
                self.forward(company, contact_person, mobile_no, phone_no, level, mail)
        except Exception as e:
            logging.info(str(e))
            logging.info(json_contact)


@annotate('string->bigint')
class f_count_company(BaseUDAF):
    """Count the distinct ``company_name`` values in a group.

    Every value is added to a set as-is, so a NULL name counts as one
    distinct value.
    """

    def __init__(self):
        _configure_logging()

    def new_buffer(self):
        """Return an empty buffer: a one-element list wrapping a set."""
        return [set()]

    def iterate(self, buffer, company_name):
        """Record one company name."""
        buffer[0].add(company_name)

    def merge(self, buffer, pbuffer):
        """Union the names from the partial buffer ``pbuffer`` into ``buffer``."""
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        """Return the number of distinct names collected."""
        return len(buffer[0])