filltenderee.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. #coding:UTF8
  2. from odps.udf import annotate
  3. from odps.udf import BaseUDAF
  4. from odps.udf import BaseUDTF
  5. import re
  6. import time
  7. import json
  8. import logging
  9. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  10. import math
  11. @annotate('string->string')
  12. class f_splitProduct(BaseUDTF):
  13. def process(self,product):
  14. if product is None:
  15. return
  16. for str_p in product.split(","):
  17. self.forward(str_p)
  18. def getTimeStamp(str_time):
  19. try:
  20. if str_time is not None and re.search("\d{4}\-\d{2}\-\d{2}.*",str_time) is not None:
  21. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  22. timeStamp = int(time.mktime(timeArray))
  23. return timeStamp
  24. else:
  25. return 0
  26. except Exception as e:
  27. return 0
  28. @annotate('string->string')
  29. class f_groupproduct(BaseUDAF):
  30. def new_buffer(self):
  31. return [[]]
  32. def iterate(self,buffer, page_time):
  33. timestamp = getTimeStamp(page_time)
  34. if timestamp>0:
  35. _set = set(buffer[0])
  36. _set.add(timestamp)
  37. _list = list(_set)
  38. _list.sort(key=lambda x:x,reverse=True)
  39. buffer[0] = _list[:10000]
  40. def merge(self, buffer, pbuffer):
  41. buffer[0].extend(pbuffer[0])
  42. _set = set(buffer[0])
  43. _list = list(_set)
  44. _list.sort(key=lambda x:x,reverse=True)
  45. buffer[0] = _list[:10000]
  46. def terminate(self, buffer):
  47. return json.dumps(buffer[0],ensure_ascii=False)
  48. @annotate('string->bigint')
  49. class f_isdistinct(BaseUDAF):
  50. def new_buffer(self):
  51. return [{}]
  52. def iterate(self,buffer, tenderee):
  53. if len(buffer[0].keys())>20:
  54. return
  55. _key = tenderee
  56. if tenderee is None or tenderee=="":
  57. _key = "None"
  58. if _key not in buffer[0]:
  59. buffer[0][_key] = 0
  60. buffer[0][_key] += 1
  61. _key = "whole"
  62. if _key not in buffer[0]:
  63. buffer[0][_key] = 0
  64. buffer[0][_key] += 1
  65. def merge(self, buffer, pbuffer):
  66. for k,v in pbuffer[0].items():
  67. if k in buffer[0]:
  68. buffer[0][k] += v
  69. else:
  70. buffer[0][k] = v
  71. def terminate(self, buffer):
  72. _dict = buffer[0]
  73. if len(_dict.keys())>20:
  74. return 0
  75. _whole = _dict.get("whole",-1)
  76. list_v = []
  77. _empty = _dict.get("None",-1)
  78. for k,v in _dict.items():
  79. if k=="None":
  80. continue
  81. list_v.append(v)
  82. _max = max(list_v)
  83. if (_max+_empty)/_whole>0.9 and _max/_whole>0.4:
  84. return 1
  85. return 0