# enterpriseFix.py
#coding:utf8
import json
import logging
import re
import traceback

from odps.udf import annotate,BaseUDAF,BaseUDTF

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  7. @annotate('string->string')
  8. class getYearMonth(object):
  9. def evaluate(self,page_time):
  10. if page_time is None:
  11. return ""
  12. return str(page_time[:7])
  13. @annotate('double->string')
  14. class getMoneyRange(object):
  15. def evaluate(self,money):
  16. if money is None or money==0:
  17. return '等于0或空'
  18. elif money<10*10000:
  19. return '(0,10万)'
  20. elif money<100*10000:
  21. return '[10万,100万)'
  22. elif money<500*10000:
  23. return '[100万,500万)'
  24. elif money<1000*10000:
  25. return '[500万,1000万)'
  26. elif money<10000*10000:
  27. return '[1000万,1亿)'
  28. elif money<10*10000*10000:
  29. return '[1亿,10亿)'
  30. elif money<100*10000*10000:
  31. return '[10亿,100亿)'
  32. else:
  33. return '[100亿,500亿]'
  34. @annotate('string->bigint')
  35. class getdocidFromDocids(BaseUDTF):
  36. def process(self,docids):
  37. for docid in docids.split(","):
  38. self.forward(int(docid))
  39. @annotate('string->string')
  40. class fixEnterpriseName(object):
  41. def __init__(self):
  42. import re
  43. global re
  44. def evaluate(self,name):
  45. new_name = re.sub("[#!!&@$'\s\*\"{};;]","",name)
  46. new_name = re.sub("amp|lt|bramp|gt|nbsp|br","",new_name)
  47. _s = re.search("\*+",name)
  48. if _s is not None:
  49. if _s.span()[1]-_s.span()[0]>=3:
  50. new_name = ""
  51. if len(new_name)<4:
  52. new_name = ""
  53. if new_name.find("有限公司")>=0 and len(new_name)<=7:
  54. new_name = ""
  55. return new_name
  56. @annotate('string->string')
  57. class removeCommonWord(object):
  58. def __init__(self):
  59. from AreaGet import AreaGet
  60. import re
  61. global re
  62. self.dict_area = AreaGet().getDict_area()
  63. _pattern = ""
  64. list_name = []
  65. for k,v in self.dict_area.items():
  66. _name = v.get("cname","")
  67. if _name!="":
  68. list_name.append(_name)
  69. _pattern = "|".join(list_name)+"|[省市区县]|有限|公司|股份|分公司|责任"
  70. self.pattern = re.compile(_pattern)
  71. def evaluate(self,name):
  72. return re.sub(self.pattern,"",name)
  73. @annotate("string->string,string,string,string,bigint,bigint")
  74. class dealEnterpriseCircle(BaseUDTF):
  75. def __init__(self):
  76. from AreaGet import AreaGet
  77. import re
  78. global re
  79. self.dict_area = AreaGet().getDict_area()
  80. set_area = set()
  81. for k,v in self.dict_area.items():
  82. set_area.add(v.get("cname"))
  83. self.set_area = set_area
  84. def process(self,name):
  85. name = re.sub("\s+","",name)
  86. new_name = name.replace("(","(").replace(")",")")
  87. new_name = re.sub("\(+",'(',new_name)
  88. new_name = re.sub("\)+",')',new_name)
  89. bool_area = 0
  90. bool_end = 0
  91. circle = ""
  92. before = ""
  93. for _s in re.finditer("\(.+?\)",new_name):
  94. circle = new_name[_s.span()[0]:_s.span()[1]][1:-1]
  95. if _s.span()[1]>=len(new_name):
  96. bool_end = 1
  97. before = new_name[:_s.span()[0]]
  98. if circle in self.set_area:
  99. bool_area = 1
  100. else:
  101. bool_area = 0
  102. self.forward(name,new_name,before,circle,bool_area,bool_end)
  103. @annotate('string->string')
  104. class f_turn_circle(object):
  105. def __init__(self):
  106. import re
  107. global re
  108. def evaluate(self,name):
  109. if name is not None:
  110. return name.replace("(","(").replace(")",")")
  111. else:
  112. return ""
  113. @annotate('string,string->string,bigint')
  114. class f_dumplicate_contacts(BaseUDTF):
  115. def __init__(self):
  116. pass
  117. def process(self,name,contacts):
  118. if contacts is None:
  119. self.forward(contacts,1)
  120. return
  121. try:
  122. list_contacts = json.loads(contacts)
  123. _set = set()
  124. _phone_set = set()
  125. new_list_contacts = []
  126. list_contacts.sort(key=lambda x:len(x.get("contact_person","")),reverse=True)
  127. for _conta in list_contacts:
  128. contact_person = _conta.get("contact_person","")
  129. mobile_no = _conta.get("mobile_no","")
  130. phone_no = _conta.get("phone_no","")
  131. if contact_person=="" and (mobile_no in _phone_set or phone_no in _phone_set):
  132. continue
  133. _key = "%s-%s-%s"%(contact_person,mobile_no,phone_no)
  134. if _key in _set:
  135. continue
  136. if mobile_no!="":
  137. _phone_set.add(mobile_no)
  138. if phone_no!="":
  139. _phone_set.add(phone_no)
  140. new_list_contacts.append(_conta)
  141. _set.add(_key)
  142. if len(new_list_contacts)!=len(list_contacts):
  143. logging.info(name)
  144. new_list_contacts.sort(key=lambda x:x.get("level",0),reverse=True)
  145. self.forward(json.dumps(new_list_contacts,ensure_ascii=False),1)
  146. except Exception as e:
  147. traceback.print_exc()
  148. logging.info(contacts)
  149. self.forward(None,0)