cycleRec.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #coding:UTF8
  2. from odps.udf import annotate
  3. from odps.udf import BaseUDAF
  4. from odps.udf import BaseUDTF
  5. import re
  6. import time
  7. import json
  8. import logging
  9. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  10. @annotate('string->string')
  11. class f_splitProduct(BaseUDTF):
  12. def process(self,product):
  13. if product is None:
  14. return
  15. for str_p in product.split(","):
  16. self.forward(str_p)
  17. def getTimeStamp(str_time):
  18. try:
  19. if str_time is not None and re.search("\d{4}\-\d{2}\-\d{2}.*",str_time) is not None:
  20. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  21. timeStamp = int(time.mktime(timeArray))
  22. return timeStamp
  23. else:
  24. return 0
  25. except Exception as e:
  26. return 0
  27. @annotate('string->string')
  28. class f_groupproduct(BaseUDAF):
  29. def new_buffer(self):
  30. return [[]]
  31. def iterate(self,buffer, page_time):
  32. timestamp = getTimeStamp(page_time)
  33. if timestamp>0:
  34. _set = set(buffer[0])
  35. _set.add(timestamp)
  36. _list = list(_set)
  37. _list.sort(key=lambda x:x,reverse=True)
  38. buffer[0] = _list[:10000]
  39. def merge(self, buffer, pbuffer):
  40. buffer[0].extend(pbuffer[0])
  41. _set = set(buffer[0])
  42. _list = list(_set)
  43. _list.sort(key=lambda x:x,reverse=True)
  44. buffer[0] = _list[:10000]
  45. def terminate(self, buffer):
  46. return json.dumps(buffer[0],ensure_ascii=False)
  47. @annotate('bigint->string')
  48. class f_groupdocid(BaseUDAF):
  49. def new_buffer(self):
  50. return [[]]
  51. def iterate(self,buffer, docid):
  52. buffer[0].append(docid)
  53. buffer[0] = buffer[0][:10000]
  54. def merge(self, buffer, pbuffer):
  55. buffer[0].extend(pbuffer[0])
  56. buffer[0] = buffer[0][:10000]
  57. def terminate(self, buffer):
  58. return json.dumps(buffer[0],ensure_ascii=False)
  59. def clusterTimestamp(aint_timestamp,distance = 28*24*60*60):
  60. def updateCenter(_c,_t):
  61. _center = _c["center"]
  62. _c["center"] = (_center*(len(_c["timestamp"])-1)+_t)//len(_c["timestamp"])
  63. aint_timestamp.sort(key=lambda x:x,reverse=True)
  64. adict_cluster = []
  65. for _t in aint_timestamp:
  66. _find = False
  67. for _c in adict_cluster:
  68. _center = _c["center"]
  69. if abs(_t-_center)<distance:
  70. _find = True
  71. _c["timestamp"].append(_t)
  72. updateCenter(_c,_t)
  73. break
  74. if not _find:
  75. _c = {"timestamp":[_t],"center":_t}
  76. adict_cluster.append(_c)
  77. aint_center = []
  78. for _c in adict_cluster:
  79. aint_center.append(_c["center"])
  80. return aint_center
  81. def getDistanceOfCluster(aint_center):
  82. aint_center.sort(key=lambda x:x)
  83. aint_dis = []
  84. int_avgD = 0
  85. int_minD = 1000
  86. int_maxD = 0
  87. cluster_d = None
  88. #计算平均间隔,最大间隔,最小间隔
  89. for int_c in range(1,len(aint_center)):
  90. int_after = aint_center[int_c]//(24*60*60)
  91. int_before = aint_center[int_c-1]//(24*60*60)
  92. _d = abs(int_after-int_before)
  93. if _d==0:
  94. continue
  95. aint_dis.append(_d)
  96. if _d<int_minD:
  97. int_minD = _d
  98. if _d>int_maxD:
  99. int_maxD = _d
  100. if len(aint_dis)>0:
  101. int_avgD = int(sum(aint_dis)/len(aint_dis))
  102. int_minD = min(aint_dis)
  103. int_maxD = max(aint_dis)
  104. for _d in aint_dis:
  105. aint_gre = [int(a>=_d) for a in aint_dis]
  106. if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
  107. cluster_d = _d
  108. return aint_dis,int_avgD,int_minD,int_maxD,cluster_d
  109. def getPeriod(aint_timestamp):
  110. aint_center = clusterTimestamp(aint_timestamp)#聚类
  111. aint_dis,int_avgD,int_minD,int_maxD,cluster_d = getDistanceOfCluster(aint_center)
  112. if cluster_d is not None:
  113. aint_center = clusterTimestamp(aint_center,distance=(cluster_d-1)*24*60*60)
  114. aint_dis,int_avgD,int_minD,int_maxD,cluster_d = getDistanceOfCluster(aint_center)
  115. _prob = 0
  116. last_time = time.strftime('%Y-%m-%d',time.localtime(max(aint_center)))
  117. if len(aint_dis)>=2 and (max(aint_center)-min(aint_center))>365*24*60*60:
  118. flt_powD = 0
  119. for int_d in aint_dis:
  120. flt_powD += (int_d-int_avgD)**2
  121. base_prob = 0.99
  122. if len(aint_dis)<4:
  123. base_prob = 0.8
  124. elif len(aint_dis)<6:
  125. base_prob = 0.9
  126. _prob = round(base_prob-(flt_powD/len(aint_dis)/int_avgD**2),4)
  127. # if flt_powD/len(aint_dis)<30:
  128. if _prob>0.5:
  129. return last_time,_prob,int(int_avgD),int(int_minD),int(int_maxD),len(aint_dis)
  130. return None,_prob,None,None,None,None
  131. @annotate('string->string,double,bigint,bigint,bigint,bigint')
  132. class f_getProductCycle(BaseUDTF):
  133. def process(self,json_timestamp):
  134. if json_timestamp is None:
  135. return
  136. aint_timestamp = json.loads(json_timestamp)
  137. # aint_timestamp.sort(key=lambda x:x,reverse=True)
  138. # aint_center = aint_timestamp
  139. last_time,_prob,int_avgD,int_minD,int_maxD,_periods = getPeriod(aint_timestamp)
  140. if int_avgD is not None:
  141. self.forward(last_time,_prob,int_avgD,int_minD,int_maxD,_periods)
  142. @annotate('string->string')
  143. class f_getTendererCompany(BaseUDTF):
  144. def process(self,sub_docs_json):
  145. if sub_docs_json is None:
  146. return
  147. sub_docs = json.loads(sub_docs_json)
  148. for _doc in sub_docs:
  149. _win = _doc.get("win_tenderer")
  150. if _win is not None:
  151. self.forward(_win)
  152. _second = _doc.get("second_tenderer")
  153. if _second is not None:
  154. self.forward(_second)
  155. _third = _doc.get("third_tenderer")
  156. if _third is not None:
  157. self.forward(_third)
  158. @annotate('string->string')
  159. class f_concatstr(BaseUDAF):
  160. def new_buffer(self):
  161. return [[]]
  162. def iterate(self,buffer, _str):
  163. if _str is not None and _str!="":
  164. buffer[0].append(str(_str))
  165. buffer[0] = buffer[0][:10000]
  166. def merge(self, buffer, pbuffer):
  167. buffer[0].extend(pbuffer[0])
  168. buffer[0] = buffer[0][:10000]
  169. def terminate(self, buffer):
  170. return ",".join(buffer[0])