cycleRec.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. from odps.udf import annotate
  2. from odps.udf import BaseUDAF
  3. from odps.udf import BaseUDTF
  4. import re
  5. import time
  6. import json
  7. import logging
  8. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  9. @annotate('string->string')
  10. class f_splitProduct(BaseUDTF):
  11. def process(self,product):
  12. if product is None:
  13. return
  14. for str_p in product.split(","):
  15. self.forward(str_p)
  16. def getTimeStamp(str_time):
  17. try:
  18. if str_time is not None and re.search("\d{4}\-\d{2}\-\d{2}.*",str_time) is not None:
  19. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  20. timeStamp = int(time.mktime(timeArray))
  21. return timeStamp
  22. else:
  23. return 0
  24. except Exception as e:
  25. return 0
  26. @annotate('string->string')
  27. class f_groupproduct(BaseUDAF):
  28. def new_buffer(self):
  29. return [[]]
  30. def iterate(self,buffer, page_time):
  31. timestamp = getTimeStamp(page_time)
  32. if timestamp>0:
  33. _set = set(buffer[0])
  34. _set.add(timestamp)
  35. _list = list(_set)
  36. _list.sort(key=lambda x:x,reverse=True)
  37. buffer[0] = _list[:10000]
  38. def merge(self, buffer, pbuffer):
  39. buffer[0].extend(pbuffer[0])
  40. _set = set(buffer[0])
  41. _list = list(_set)
  42. _list.sort(key=lambda x:x,reverse=True)
  43. buffer[0] = _list[:10000]
  44. def terminate(self, buffer):
  45. return json.dumps(buffer[0],ensure_ascii=False)
  46. @annotate('bigint->string')
  47. class f_groupdocid(BaseUDAF):
  48. def new_buffer(self):
  49. return [[]]
  50. def iterate(self,buffer, docid):
  51. buffer[0].append(docid)
  52. buffer[0] = buffer[0][:10000]
  53. def merge(self, buffer, pbuffer):
  54. buffer[0].extend(pbuffer[0])
  55. buffer[0] = buffer[0][:10000]
  56. def terminate(self, buffer):
  57. return json.dumps(buffer[0],ensure_ascii=False)
  58. def clusterTimestamp(aint_timestamp):
  59. def updateCenter(_c,_t):
  60. _center = _c["center"]
  61. _c["center"] = (_center*(len(_c["timestamp"])-1)+_t)//len(_c["timestamp"])
  62. aint_timestamp.sort(key=lambda x:x,reverse=True)
  63. distance = 30*24*60*60
  64. adict_cluster = []
  65. for _t in aint_timestamp:
  66. _find = False
  67. for _c in adict_cluster:
  68. _center = _c["center"]
  69. if abs(_t-_center)<distance:
  70. _find = True
  71. _c["timestamp"].append(_t)
  72. updateCenter(_c,_t)
  73. break
  74. if not _find:
  75. _c = {"timestamp":[_t],"center":_t}
  76. adict_cluster.append(_c)
  77. aint_center = []
  78. for _c in adict_cluster:
  79. aint_center.append(_c["center"])
  80. return aint_center
  81. def getPeriod(aint_center):
  82. aint_center.sort(key=lambda x:x)
  83. flt_powD = 0
  84. aint_dis = []
  85. int_avgD = 0
  86. int_minD = 1000
  87. int_maxD = 0
  88. for int_c in range(1,len(aint_center)):
  89. int_after = aint_center[int_c]//(24*60*60)
  90. int_before = aint_center[int_c-1]//(24*60*60)
  91. _d = abs(int_after-int_before)
  92. aint_dis.append(_d)
  93. int_avgD += _d
  94. if _d<int_minD:
  95. int_minD = _d
  96. if _d>int_maxD:
  97. int_maxD = _d
  98. if len(aint_center)>2 and (max(aint_center)-min(aint_center))>265*24*60*60:
  99. int_avgD/= len(aint_dis)
  100. for int_d in aint_dis:
  101. flt_powD += (int_d-int_avgD)**2
  102. if flt_powD/len(aint_dis)<30:
  103. return int(int_avgD),int(int_minD),int(int_maxD)
  104. return None,None,None
  105. @annotate('string->string,bigint,bigint,bigint')
  106. class f_getProductCycle(BaseUDTF):
  107. def process(self,json_timestamp):
  108. if json_timestamp is None:
  109. return
  110. aint_timestamp = json.loads(json_timestamp)
  111. aint_center = clusterTimestamp(aint_timestamp)
  112. int_avgD,int_minD,int_maxD = getPeriod(aint_center)
  113. if int_avgD is not None:
  114. self.forward(time.strftime('%Y-%m-%d',time.localtime(max(aint_center))),int_avgD,int_minD,int_maxD)