contactDumplicate.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from odps.udf import annotate
  2. from odps.udf import BaseUDAF
  3. from odps.udf import BaseUDTF
  4. @annotate('string,string,string,string,bigint,datetime,string,string,string,string->string')
  5. class dumplicate(BaseUDAF):
  6. def __init__(self):
  7. import datetime
  8. import json
  9. import logging
  10. global datetime,json,logging,MyEncoder
  11. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  12. class MyEncoder(json.JSONEncoder):
  13. def default(self, obj):
  14. if isinstance(obj, bytes):
  15. return str(obj, encoding='utf-8')
  16. return json.JSONEncoder.default(self, obj)
  17. def new_buffer(self):
  18. return [[]]
  19. def iterate(self, buffer, company_name,mobile_no,phone_no,contact_person,level,create_time,email,company_addr,province,city):
  20. logging.info(company_name)
  21. buffer[0].append([company_name.strip(),mobile_no,phone_no,contact_person,level,create_time.timestamp(),email,company_addr,province,city])
  22. logging.info(company_name)
  23. def merge(self, buffer, pbuffer):
  24. logging.info('-3=')
  25. buffer[0].extend(pbuffer[0])
  26. logging.info('-4=')
  27. def terminate(self, buffer):
  28. logging.info('-1=')
  29. buffer[0].sort(key=lambda x:x[5],reverse=True)
  30. company_name,mobile_no,phone_no,contact_person,level,create_time,email,company_addr,province,city = buffer[0][0]
  31. logging.info("-2=")
  32. return json.dumps([company_name,mobile_no,phone_no,contact_person,level,create_time,email,company_addr,province,city],cls=MyEncoder,ensure_ascii=False)
  33. @annotate("string->string,string,string,string,bigint,datetime,string,string,string,string")
  34. class liberate(BaseUDTF):
  35. def __init__(self):
  36. import json
  37. import time
  38. import logging
  39. import datetime
  40. # import sys
  41. # reload(sys)
  42. # sys.setdefaultencoding('utf8')
  43. global json,MyEncoder,logging,time,datetime
  44. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  45. class MyEncoder(json.JSONEncoder):
  46. def default(self, obj):
  47. if isinstance(obj, bytes):
  48. return str(obj, encoding='utf-8')
  49. return json.JSONEncoder.default(self, obj)
  50. def process(self, json_dumplicate):
  51. try:
  52. logging.info(json_dumplicate)
  53. json_dumplicate = json_dumplicate.replace("\\n","").replace('\\"','').replace("\\r","")
  54. company_name,mobile_no,phone_no,contact_person,level,create_time,email,company_addr,province,city = json.loads(json_dumplicate)
  55. create_time = datetime.datetime.fromtimestamp(create_time)
  56. self.forward(company_name,mobile_no,phone_no,contact_person,level,create_time,email,company_addr,province,city)
  57. except Exception as e:
  58. pass
  59. import re
  60. mobile_pattern = re.compile("^1\d{10}$")
  61. def recog_likeType(phone):
  62. if re.search(mobile_pattern,phone) is not None:
  63. return "mobile"
  64. else:
  65. return "phone"
  66. @annotate("string,string,string,string,string,string->string")
  67. class f_tojson_docuentContact(object):
  68. def __init__(self):
  69. import json
  70. global json
  71. def evaluate(self, tenderee,tenderee_contact,tenderee_phone,agency,agency_contact,agency_phone):
  72. list_contact = []
  73. if tenderee!="" and tenderee_contact!="" and tenderee_phone!='' and tenderee_phone is not None:
  74. _dict = {"company":tenderee,"contact_person":tenderee_contact,"level":20}
  75. if recog_likeType(tenderee_phone)=="mobile":
  76. _dict["mobile_no"] = tenderee_phone
  77. else:
  78. _dict["phone_no"] = tenderee_phone
  79. list_contact.append(_dict)
  80. if agency!="" and agency_contact!="" and agency_phone!='' and agency_phone is not None:
  81. _dict = {"company":agency,"contact_person":agency_contact,"level":20}
  82. if recog_likeType(agency_phone)=="mobile":
  83. _dict["mobile_no"] = agency_phone
  84. else:
  85. _dict["phone_no"] = agency_phone
  86. list_contact.append(_dict)
  87. return json.dumps(list_contact)
  88. @annotate("string->string,string,string,string,bigint,string")
  89. class f_liberate_contactJson(BaseUDTF):
  90. def __init__(self):
  91. import json
  92. import time
  93. import logging
  94. import datetime
  95. # import sys
  96. # reload(sys)
  97. # sys.setdefaultencoding('utf8')
  98. global json,MyEncoder,logging,time,datetime
  99. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  100. def process(self, json_contact):
  101. try:
  102. list_dict = json.loads(json_contact)
  103. for _dict in list_dict:
  104. company = _dict.get("company")
  105. contact_person = _dict.get("contact_person")
  106. mobile_no = _dict.get("mobile_no","")
  107. if mobile_no is None:
  108. mobile_no = ""
  109. phone_no = _dict.get("phone_no","")
  110. if phone_no is None:
  111. phone_no = ""
  112. else:
  113. phone_no = re.sub('[^0-9\-转]','',phone_no)
  114. if len(phone_no)<6:
  115. phone_no = ""
  116. level = _dict.get("level")
  117. mail = _dict.get("mail","")
  118. self.forward(company,contact_person,mobile_no,phone_no,level,mail)
  119. except Exception as e:
  120. logging.info(str(e))
  121. logging.info(json_contact)
  122. @annotate('string->bigint')
  123. class f_count_company(BaseUDAF):
  124. def __init__(self):
  125. import datetime
  126. import json
  127. import logging
  128. global datetime,json,logging,MyEncoder
  129. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  130. def new_buffer(self):
  131. return [set()]
  132. def iterate(self, buffer, company_name):
  133. buffer[0].add(company_name)
  134. def merge(self, buffer, pbuffer):
  135. buffer[0] |= pbuffer[0]
  136. def terminate(self, buffer):
  137. return len(buffer[0])