|
@@ -0,0 +1,169 @@
|
|
|
+from odps.udf import annotate
|
|
|
+from odps.udf import BaseUDAF
|
|
|
+from odps.udf import BaseUDTF
|
|
|
+
|
|
|
@annotate('string,string,string,string,bigint,datetime,string,string,string,string->string')
class dumplicate(BaseUDAF):
    """UDAF that reduces a group of contact rows to its most recent row.

    Each input row becomes the 10-element list
    ``[company_name, mobile_no, phone_no, contact_person, level, create_time,
    email, company_addr, province, city]`` where ``company_name`` is stripped
    of surrounding whitespace and ``create_time`` is stored as a POSIX
    timestamp.  The row with the greatest timestamp wins; on a tie the row
    seen first is kept.  ``terminate`` returns the winner as a JSON array.

    Only the current winner is kept in the aggregation buffer, so the buffer
    does not grow with the number of rows in the group.
    """

    # Position of the create_time timestamp inside a buffered row.
    _CREATE_TIME_INDEX = 5

    def __init__(self):
        """Import the required modules and publish them as module globals."""
        # ``global`` comes first so that the imports and the class statement
        # below bind module-level names instead of locals of ``__init__``.
        global datetime, json, logging, MyEncoder
        import datetime
        import json
        import logging

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )

        class MyEncoder(json.JSONEncoder):
            """JSON encoder that serialises ``bytes`` values as UTF-8 text."""

            def default(self, obj):
                if isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                return json.JSONEncoder.default(self, obj)

    def new_buffer(self):
        """Return an empty buffer: a list wrapping the list of kept rows."""
        return [[]]

    def _offer(self, buffer, row):
        """Store ``row`` in ``buffer`` if it is newer than the row kept so far.

        A row without a timestamp never replaces an existing row, and a strict
        ``>`` comparison keeps the first-seen row on ties.
        """
        kept = buffer[0]
        if not kept:
            kept.append(row)
            return
        candidate_ts = row[self._CREATE_TIME_INDEX]
        current_ts = kept[0][self._CREATE_TIME_INDEX]
        if candidate_ts is not None and (current_ts is None or candidate_ts > current_ts):
            kept[0] = row

    def iterate(self, buffer, company_name, mobile_no, phone_no, contact_person, level, create_time, email, company_addr, province, city):
        """Fold one input row into ``buffer``.

        NULL (``None``) values for ``company_name`` and ``create_time`` are
        passed through instead of raising.
        """
        if company_name is not None:
            company_name = company_name.strip()
        timestamp = None if create_time is None else create_time.timestamp()
        self._offer(
            buffer,
            [company_name, mobile_no, phone_no, contact_person, level,
             timestamp, email, company_addr, province, city],
        )

    def merge(self, buffer, pbuffer):
        """Fold the rows of the partial buffer ``pbuffer`` into ``buffer``."""
        for row in pbuffer[0]:
            self._offer(buffer, row)

    def terminate(self, buffer):
        """Return the winning row as a JSON array, or ``None`` for an empty group."""
        kept = buffer[0]
        if not kept:
            return None
        return json.dumps(kept[0], cls=MyEncoder, ensure_ascii=False)
|
|
|
+
|
|
|
+
|
|
|
@annotate("string->string,string,string,string,bigint,datetime,string,string,string,string")
class liberate(BaseUDTF):
    """UDTF that expands a 10-element JSON array into one output row.

    The input is expected to be a JSON array
    ``[company_name, mobile_no, phone_no, contact_person, level, create_time,
    email, company_addr, province, city]`` with ``create_time`` as a POSIX
    timestamp, i.e. the format returned by ``dumplicate.terminate``.  The
    timestamp is converted back to a ``datetime`` before the row is forwarded.
    Records that cannot be decoded are logged and skipped.
    """

    def __init__(self):
        """Import the required modules and publish them as module globals."""
        # ``global`` comes first so the imports bind module-level names.
        global json, logging, datetime
        import json
        import logging
        import datetime

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )

    def process(self, json_dumplicate):
        """Decode ``json_dumplicate`` and forward it as a single row.

        A NULL input produces no output.  Any failure while decoding or
        forwarding is logged with its traceback and the record is skipped, so
        one bad record does not abort the job.
        """
        if json_dumplicate is None:
            return
        try:
            # Drop escaped newlines, quotes and carriage returns from the raw
            # JSON text before parsing.
            # NOTE(review): this edits the encoded text, so an escaped
            # backslash followed by n, r or a quote can turn the record into
            # invalid JSON; confirm whether cleaning after parsing is wanted.
            cleaned = json_dumplicate.replace("\\n", "").replace('\\"', '').replace("\\r", "")
            (company_name, mobile_no, phone_no, contact_person, level,
             create_time, email, company_addr, province, city) = json.loads(cleaned)
            if create_time is not None:
                create_time = datetime.datetime.fromtimestamp(create_time)
            self.forward(company_name, mobile_no, phone_no, contact_person, level,
                         create_time, email, company_addr, province, city)
        except Exception:
            # Best-effort by design: report the record instead of hiding it.
            logging.exception("liberate: skipping record %r", json_dumplicate)
|
|
|
+
|
|
|
+import re
|
|
|
# An 11-digit number whose first digit is 1.
mobile_pattern = re.compile(r"^1\d{10}$")


def recog_likeType(phone):
    """Classify ``phone`` as ``"mobile"`` or ``"phone"``.

    Args:
        phone: The number to classify, as a string.

    Returns:
        ``"mobile"`` when the whole string is exactly 11 digits starting with
        ``1``; ``"phone"`` for everything else, including the empty string.
    """
    # fullmatch (rather than search with ``$``) also rejects a trailing newline.
    if mobile_pattern.fullmatch(phone) is not None:
        return "mobile"
    return "phone"
|
|
|
+
|
|
|
@annotate("string,string,string,string,string,string->string")
class f_tojson_docuentContact(object):
    """UDF that packs a document's tenderee and agency contacts into JSON.

    The result is a JSON list with at most two objects (tenderee first, then
    agency).  Each object has the keys ``company``, ``contact_person``,
    ``level`` (always 20) and either ``mobile_no`` or ``phone_no`` depending
    on how ``recog_likeType`` classifies the number.
    """

    def __init__(self):
        """Import ``json`` and publish it as a module global."""
        # ``global`` comes first so the import binds the module-level name.
        global json
        import json

    @staticmethod
    def _build_contact(company, contact_person, phone):
        """Return the contact dict for one party, or ``None`` if incomplete.

        A party is incomplete when any of its three fields is NULL or empty.
        """
        if not (company and contact_person and phone):
            return None
        contact = {"company": company, "contact_person": contact_person, "level": 20}
        if recog_likeType(phone) == "mobile":
            contact["mobile_no"] = phone
        else:
            contact["phone_no"] = phone
        return contact

    def evaluate(self, tenderee, tenderee_contact, tenderee_phone, agency, agency_contact, agency_phone):
        """Return the JSON list of complete contacts for the given parties."""
        parties = (
            (tenderee, tenderee_contact, tenderee_phone),
            (agency, agency_contact, agency_phone),
        )
        list_contact = []
        for company, contact_person, phone in parties:
            contact = self._build_contact(company, contact_person, phone)
            if contact is not None:
                list_contact.append(contact)
        return json.dumps(list_contact)
|
|
|
+
|
|
|
@annotate("string->string,string,string,string,bigint,string")
class f_liberate_contactJson(BaseUDTF):
    """UDTF that expands a JSON list of contact objects into rows.

    For every object in the list one row
    ``(company, contact_person, mobile_no, phone_no, level, mail)`` is
    forwarded.  ``mobile_no`` and ``phone_no`` are never NULL: a missing or
    null value becomes the empty string.  ``phone_no`` is additionally
    reduced to digits, ``-`` and ``转``, and blanked when fewer than six
    characters remain.
    """

    # Matches every character that is not kept in a landline number.
    _PHONE_JUNK = re.compile(r'[^0-9\-转]')

    def __init__(self):
        """Import the required modules and publish them as module globals."""
        # ``global`` comes first so the imports bind module-level names.
        global json, logging
        import json
        import logging

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )

    def _forward_entry(self, entry):
        """Normalise one contact object and forward it as a row."""
        company = entry.get("company")
        contact_person = entry.get("contact_person")

        mobile_no = entry.get("mobile_no", "")
        if mobile_no is None:
            mobile_no = ""

        phone_no = entry.get("phone_no", "")
        if phone_no is None:
            phone_no = ""
        else:
            phone_no = self._PHONE_JUNK.sub('', phone_no)
            # Too short to be a usable number once the noise is removed.
            if len(phone_no) < 6:
                phone_no = ""

        level = entry.get("level")
        mail = entry.get("mail", "")
        self.forward(company, contact_person, mobile_no, phone_no, level, mail)

    def process(self, json_contact):
        """Decode ``json_contact`` and forward one row per contact object.

        NULL or empty input produces no output.  Input that is not a JSON
        list is logged and skipped.  A failure on one entry is logged and the
        remaining entries are still processed.
        """
        if not json_contact:
            return
        try:
            list_dict = json.loads(json_contact)
        except (TypeError, ValueError):
            logging.exception("f_liberate_contactJson: cannot parse %r", json_contact)
            return
        if not isinstance(list_dict, list):
            logging.warning("f_liberate_contactJson: expected a JSON list, got %r", json_contact)
            return
        for entry in list_dict:
            try:
                self._forward_entry(entry)
            except Exception:
                # Best-effort by design: report the entry and carry on.
                logging.exception("f_liberate_contactJson: skipping entry %r", entry)
|
|
|
+
|
|
|
@annotate('string->bigint')
class f_count_company(BaseUDAF):
    """UDAF that counts the distinct non-NULL company names in a group.

    The buffer is a one-element list holding the set of names seen so far.
    """

    def __init__(self):
        """Import ``logging``, publish it as a module global and configure it."""
        # ``global`` comes first so the import binds the module-level name.
        global logging
        import logging

        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        )

    def new_buffer(self):
        """Return an empty buffer: a list wrapping an empty set."""
        return [set()]

    def iterate(self, buffer, company_name):
        """Record ``company_name`` in ``buffer``; NULL names are ignored."""
        if company_name is not None:
            buffer[0].add(company_name)

    def merge(self, buffer, pbuffer):
        """Add every name of the partial buffer ``pbuffer`` to ``buffer``."""
        buffer[0].update(pbuffer[0])

    def terminate(self, buffer):
        """Return the number of distinct names collected."""
        return len(buffer[0])
|