123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- from odps.udf import annotate
- from odps.udf import BaseUDAF
- from odps.udf import BaseUDTF
@annotate('string,string,string,string,bigint,datetime,string,string,string,string->string')
class dumplicate(BaseUDAF):
    """Aggregate that keeps only the most recently created contact row.

    Each input row is buffered as a ten-element list. ``terminate`` picks
    the row with the greatest ``create_time`` and returns it as a JSON
    array, with ``create_time`` encoded as a POSIX timestamp.
    """

    def __init__(self):
        # Declare the names global *before* binding them, so the imports and
        # the class definition below land in module scope, which is where
        # the other methods look them up by bare name.
        global datetime, json, logging, MyEncoder
        import datetime
        import json
        import logging

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        class MyEncoder(json.JSONEncoder):
            """JSON encoder that serialises ``bytes`` values as UTF-8 text."""

            def default(self, obj):
                if isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                return json.JSONEncoder.default(self, obj)

    def new_buffer(self):
        """Return a fresh buffer: a one-element list holding the row list."""
        return [[]]

    def iterate(self, buffer, company_name, mobile_no, phone_no, contact_person, level, create_time, email, company_addr, province, city):
        """Append one input row to ``buffer``.

        ``company_name`` is stripped of surrounding whitespace (a ``None``
        name is stored as ``None``). ``create_time`` is converted with
        ``.timestamp()`` and therefore must not be ``None``.
        """
        logging.info(company_name)
        if company_name is not None:
            company_name = company_name.strip()
        buffer[0].append([
            company_name, mobile_no, phone_no, contact_person, level,
            create_time.timestamp(), email, company_addr, province, city,
        ])
        logging.info(company_name)

    def merge(self, buffer, pbuffer):
        """Append every row held by ``pbuffer`` to ``buffer``."""
        logging.info('-3=')
        buffer[0].extend(pbuffer[0])
        logging.info('-4=')

    def terminate(self, buffer):
        """Return the newest buffered row as a JSON array string.

        Returns ``None`` when no row was buffered. When several rows share
        the greatest timestamp, the one buffered first wins.
        """
        logging.info('-1=')
        rows = buffer[0]
        if not rows:
            # Nothing was aggregated; there is no row to serialise.
            return None
        # Index 5 holds the creation timestamp; one pass is enough to find
        # the newest row, so no full sort is needed.
        newest = max(rows, key=lambda row: row[5])
        logging.info("-2=")
        return json.dumps(list(newest), cls=MyEncoder, ensure_ascii=False)
@annotate("string->string,string,string,string,bigint,datetime,string,string,string,string")
class liberate(BaseUDTF):
    """Table function that expands a ten-element JSON array into one row.

    The sixth element is read as a POSIX timestamp and forwarded as a
    ``datetime.datetime``.
    """

    def __init__(self):
        # Declare the names global *before* binding them, so the imports and
        # the class definition below land in module scope, which is where
        # ``process`` looks them up by bare name.
        global json, MyEncoder, logging, time, datetime
        import json
        import time
        import logging
        import datetime

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        class MyEncoder(json.JSONEncoder):
            """JSON encoder that serialises ``bytes`` values as UTF-8 text."""

            def default(self, obj):
                if isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                return json.JSONEncoder.default(self, obj)

    def process(self, json_dumplicate):
        """Parse ``json_dumplicate`` and forward its ten fields as one row.

        Before parsing, the two-character sequences backslash-n,
        backslash-double-quote and backslash-r are removed from the raw
        text. Input that cannot be processed is logged and skipped, so a
        single bad record never aborts the job.
        """
        try:
            logging.info(json_dumplicate)
            cleaned = json_dumplicate.replace("\\n", "").replace('\\"', '').replace("\\r", "")
            (company_name, mobile_no, phone_no, contact_person, level,
             create_time, email, company_addr, province, city) = json.loads(cleaned)
            create_time = datetime.datetime.fromtimestamp(create_time)
            self.forward(company_name, mobile_no, phone_no, contact_person, level,
                         create_time, email, company_addr, province, city)
        except Exception:
            # Still best-effort, but leave a trace instead of dropping the
            # record silently.
            logging.exception("liberate: skipped unparsable record: %s", json_dumplicate)
import re

# Exactly eleven digits, the first of which is 1. A raw string keeps ``\d``
# from being read as an (invalid) string escape.
mobile_pattern = re.compile(r"^1\d{10}$")


def recog_likeType(phone):
    """Classify ``phone`` as ``"mobile"`` or ``"phone"``.

    Returns ``"mobile"`` when ``phone`` matches ``mobile_pattern`` and
    ``"phone"`` for anything else. ``phone`` must be a string.
    """
    if mobile_pattern.search(phone) is not None:
        return "mobile"
    return "phone"
@annotate("string,string,string,string,string,string->string")
class f_tojson_docuentContact(object):
    """Scalar function that packs tenderee and agency contacts into JSON."""

    def __init__(self):
        # Declare the name global before binding it so the import lands in
        # module scope, where ``evaluate`` looks it up.
        global json
        import json

    def evaluate(self, tenderee, tenderee_contact, tenderee_phone, agency, agency_contact, agency_phone):
        """Return a JSON array with one object per complete contact.

        A contact is emitted only when its company, contact person and
        phone are all non-empty; ``None`` counts as empty for every field.
        Each object carries ``company``, ``contact_person``, ``level``
        (always 20) and either ``mobile_no`` or ``phone_no``, chosen by
        ``recog_likeType``. The tenderee, when present, precedes the agency.
        """
        parties = (
            (tenderee, tenderee_contact, tenderee_phone),
            (agency, agency_contact, agency_phone),
        )
        list_contact = []
        for company, contact_person, phone in parties:
            if not (company and contact_person and phone):
                continue
            _dict = {"company": company, "contact_person": contact_person, "level": 20}
            phone_key = "mobile_no" if recog_likeType(phone) == "mobile" else "phone_no"
            _dict[phone_key] = phone
            list_contact.append(_dict)
        return json.dumps(list_contact)
@annotate("string->string,string,string,string,bigint,string")
class f_liberate_contactJson(BaseUDTF):
    """Table function that expands a JSON array of contacts into rows."""

    def __init__(self):
        # Declare the names global *before* binding them so the imports land
        # in module scope, where ``process`` looks them up. ``re`` is
        # imported here as well so the class does not rely on an import
        # placed elsewhere in the file.
        global json, logging, time, datetime, re
        import json
        import time
        import logging
        import datetime
        import re

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_contact):
        """Forward one row per object in the JSON array ``json_contact``.

        Forwarded columns: company, contact_person, mobile_no, phone_no,
        level, mail. A missing or null ``mobile_no`` / ``phone_no`` becomes
        ``""``. ``phone_no`` is reduced to digits, ``-`` and ``转``, and
        blanked when fewer than six characters remain. Any error is logged
        together with the input and ends processing of that input.
        """
        try:
            list_dict = json.loads(json_contact)
            for _dict in list_dict:
                company = _dict.get("company")
                contact_person = _dict.get("contact_person")

                mobile_no = _dict.get("mobile_no", "")
                if mobile_no is None:
                    mobile_no = ""

                phone_no = _dict.get("phone_no", "")
                if phone_no is None:
                    phone_no = ""
                else:
                    # Raw string: ``\-`` is a regex escape, not a string escape.
                    phone_no = re.sub(r'[^0-9\-转]', '', phone_no)
                    if len(phone_no) < 6:
                        phone_no = ""

                level = _dict.get("level")
                mail = _dict.get("mail", "")
                self.forward(company, contact_person, mobile_no, phone_no, level, mail)
        except Exception as e:
            logging.info(str(e))
            logging.info(json_contact)
@annotate('string->bigint')
class f_count_company(BaseUDAF):
    """Aggregate that returns the number of distinct values it was given."""

    def __init__(self):
        # Module-scope bindings for the names imported below.
        global datetime, json, logging, MyEncoder
        import datetime
        import json
        import logging

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        """Return a fresh buffer: a one-element list holding an empty set."""
        seen = set()
        return [seen]

    def iterate(self, buffer, company_name):
        """Record ``company_name`` in the buffer's set."""
        seen = buffer[0]
        seen.add(company_name)

    def merge(self, buffer, pbuffer):
        """Fold the values held by ``pbuffer`` into ``buffer``."""
        combined = buffer[0]
        combined |= pbuffer[0]
        buffer[0] = combined

    def terminate(self, buffer):
        """Return how many distinct values the buffer holds."""
        distinct_values = buffer[0]
        return len(distinct_values)
|