# cycleRec.py
#coding:UTF8
import datetime
import json
import logging
import math
import re
import time

from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  11. @annotate('string->string')
  12. class f_splitProduct(BaseUDTF):
  13. def process(self,product):
  14. if product is None:
  15. return
  16. for str_p in product.split(","):
  17. self.forward(str_p)
  18. def getTimeStamp(str_time):
  19. try:
  20. if str_time is not None and re.search("\d{4}\-\d{2}\-\d{2}.*",str_time) is not None:
  21. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  22. timeStamp = int(time.mktime(timeArray))
  23. return timeStamp
  24. else:
  25. return 0
  26. except Exception as e:
  27. return 0
  28. @annotate('string->string')
  29. class f_groupproduct(BaseUDAF):
  30. def new_buffer(self):
  31. return [[]]
  32. def iterate(self,buffer, page_time):
  33. timestamp = getTimeStamp(page_time)
  34. if timestamp>0:
  35. _set = set(buffer[0])
  36. _set.add(timestamp)
  37. _list = list(_set)
  38. _list.sort(key=lambda x:x,reverse=True)
  39. buffer[0] = _list[:10000]
  40. def merge(self, buffer, pbuffer):
  41. buffer[0].extend(pbuffer[0])
  42. _set = set(buffer[0])
  43. _list = list(_set)
  44. _list.sort(key=lambda x:x,reverse=True)
  45. buffer[0] = _list[:10000]
  46. def terminate(self, buffer):
  47. return json.dumps(buffer[0],ensure_ascii=False)
  48. @annotate('bigint->string')
  49. class f_groupdocid(BaseUDAF):
  50. def new_buffer(self):
  51. return [[]]
  52. def iterate(self,buffer, docid):
  53. buffer[0].append(docid)
  54. buffer[0] = buffer[0][:10000]
  55. def merge(self, buffer, pbuffer):
  56. buffer[0].extend(pbuffer[0])
  57. buffer[0] = buffer[0][:10000]
  58. def terminate(self, buffer):
  59. return json.dumps(buffer[0],ensure_ascii=False)
  60. def clusterTimestamp(aint_timestamp,distance = 28*24*60*60):
  61. def updateCenter(_c,_t):
  62. _center = _c["center"]
  63. _c["center"] = (_center*(len(_c["timestamp"])-1)+_t)//len(_c["timestamp"])
  64. aint_timestamp.sort(key=lambda x:x,reverse=True)
  65. adict_cluster = []
  66. for _t in aint_timestamp:
  67. _find = False
  68. for _c in adict_cluster:
  69. _center = _c["center"]
  70. if abs(_t-_center)<distance:
  71. _find = True
  72. _c["timestamp"].append(_t)
  73. updateCenter(_c,_t)
  74. break
  75. if not _find:
  76. _c = {"timestamp":[_t],"center":_t}
  77. adict_cluster.append(_c)
  78. aint_center = []
  79. for _c in adict_cluster:
  80. aint_center.append(_c["center"])
  81. return aint_center
  82. def getAvgD(aint_dis):
  83. if len(aint_dis)==0:
  84. return 0
  85. avg_dis = 1
  86. int_avgD = int(sum(aint_dis)/len(aint_dis))
  87. new_aint_dis = [a for a in aint_dis]
  88. print(sum(aint_dis)/len(aint_dis))
  89. min_pow = 10000000
  90. min_dis = min(aint_dis)
  91. for _dis in range(min(aint_dis),max(aint_dis)+1):
  92. pow_x = 0
  93. for _d in new_aint_dis:
  94. pow_x += math.sqrt(abs((_d-_dis)))
  95. print(_dis,pow_x)
  96. if pow_x<min_pow:
  97. min_pow = pow_x
  98. min_dis = _dis
  99. return min_dis
def getDistanceOfCluster(aint_center):
    # Given cluster-center timestamps, compute the gaps (in whole days)
    # between consecutive centers and summary stats over those gaps.
    # Returns (gaps, avg_days, min_days, max_days, cluster_d); cluster_d is
    # currently always None — the code that would set it is commented out.
    aint_center.sort(key=lambda x:x)
    aint_dis = []
    int_avgD = 0
    int_minD = 1000
    int_maxD = 0
    cluster_d = None
    # compute average / max / min gap (in days) between consecutive centers
    for int_c in range(1,len(aint_center)):
        int_after = aint_center[int_c]//(24*60*60)
        int_before = aint_center[int_c-1]//(24*60*60)
        _d = abs(int_after-int_before)
        if _d==0:
            # same-day centers contribute no gap
            continue
        aint_dis.append(_d)
        if _d<int_minD:
            int_minD = _d
        if _d>int_maxD:
            int_maxD = _d
    if len(aint_dis)>0:
        # int_avgD = int(sum(aint_dis)/len(aint_dis))
        int_avgD = getAvgD(aint_dis)
        int_minD = min(aint_dis)
        int_maxD = max(aint_dis)
        # Clamp whichever extreme lies farther from the average toward it.
        # NOTE(review): in the first branch the max() below reads the
        # just-reassigned int_minD, so int_maxD collapses to int_avgD —
        # confirm this asymmetry with the second branch is intended.
        if abs(int_maxD-int_avgD)>abs(int_minD-int_avgD):
            int_minD = min([int_avgD,int_minD])
            int_maxD = max([int_avgD,int_minD])
        else:
            int_minD = min([int_avgD,int_maxD])
            int_maxD = max([int_avgD,int_maxD])
        # recenter the average on the clamped bounds
        int_avgD = (int_minD+int_maxD)//2
    # for _d in aint_dis:
    #     aint_gre = [int(a>=_d) for a in aint_dis]
    #     if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
    #         cluster_d = _d
    return aint_dis,int_avgD,int_minD,int_maxD,cluster_d
def getPeriod(aint_timestamp):
    # Estimate a recurring purchase cycle from a list of page timestamps.
    # Returns (last_time, prob, avg_days, min_days, max_days, n_gaps) when a
    # confident cycle is found, else (None, prob, None, None, None, None).
    aint_center = clusterTimestamp(aint_timestamp,distance=29*24*60*60)# cluster the raw timestamps
    aint_dis,int_avgD,int_minD,int_maxD,cluster_d = getDistanceOfCluster(aint_center)
    # NOTE(review): getDistanceOfCluster currently always returns
    # cluster_d=None, so this re-clustering branch looks dead — confirm.
    if cluster_d is not None:
        aint_center = clusterTimestamp(aint_center,distance=(cluster_d-1)*24*60*60)
        aint_dis,int_avgD,int_minD,int_maxD,cluster_d = getDistanceOfCluster(aint_center)
    _prob = 0
    last_time = time.strftime('%Y-%m-%d',time.localtime(max(aint_center)))
    # require at least 2 gaps and an overall span longer than one year
    if len(aint_dis)>=2 and (max(aint_center)-min(aint_center))>365*24*60*60:
        flt_powD = 0
        for int_d in aint_dis:
            flt_powD += (int_d-int_avgD)**2
        # base confidence grows with the number of observed gaps
        base_prob = 0.99
        if len(aint_dis)<4:
            base_prob = 0.8
        elif len(aint_dis)<6:
            base_prob = 0.9
        # penalize by the variance of the gaps normalized by avg^2
        _prob = round(base_prob-(flt_powD/len(aint_dis)/int_avgD**2),4)
        # if flt_powD/len(aint_dis)<30:
        if _prob>0.5 and int_maxD-int_minD<=70:
            return last_time,_prob,int(int_avgD),int(int_minD),int(int_maxD),len(aint_dis)
    return None,_prob,None,None,None,None
  158. def timeAdd(_time,days):
  159. a = time.mktime(time.strptime(_time,'%Y-%m-%d'))+86400*days
  160. _time1 = time.strftime("%Y-%m-%d",time.localtime(a))
  161. return _time1
  162. @annotate('string->string,string,string,double,bigint,bigint,bigint,bigint')
  163. class f_getProductCycle(BaseUDTF):
  164. def process(self,json_timestamp):
  165. if json_timestamp is None:
  166. return
  167. aint_timestamp = json.loads(json_timestamp)
  168. # aint_timestamp.sort(key=lambda x:x,reverse=True)
  169. # aint_center = aint_timestamp
  170. last_time,_prob,int_avgD,int_minD,int_maxD,_periods = getPeriod(aint_timestamp)
  171. if int_avgD is not None:
  172. may_begin = timeAdd(last_time,int_minD)
  173. may_end = timeAdd(last_time,int_maxD)
  174. self.forward(may_begin,may_end,last_time,_prob,int_avgD,int_minD,int_maxD,_periods)
  175. @annotate('string->string')
  176. class f_getTendererCompany(BaseUDTF):
  177. def process(self,sub_docs_json):
  178. if sub_docs_json is None:
  179. return
  180. sub_docs = json.loads(sub_docs_json)
  181. for _doc in sub_docs:
  182. _win = _doc.get("win_tenderer")
  183. if _win is not None:
  184. self.forward(_win)
  185. _second = _doc.get("second_tenderer")
  186. if _second is not None:
  187. self.forward(_second)
  188. _third = _doc.get("third_tenderer")
  189. if _third is not None:
  190. self.forward(_third)
  191. @annotate('string,bigint->string')
  192. class f_concatstr(BaseUDAF):
  193. def new_buffer(self):
  194. return [[]]
  195. def iterate(self,buffer, _str,signal):
  196. self.signal = signal
  197. if _str is not None and _str!="":
  198. buffer[0].append(str(_str))
  199. buffer[0] = buffer[0][:10000]
  200. def merge(self, buffer, pbuffer):
  201. buffer[0].extend(pbuffer[0])
  202. buffer[0] = buffer[0][:10000]
  203. def terminate(self, buffer):
  204. _s = ",".join(buffer[0])
  205. _s1 = set(_s.split(","))
  206. if "" in _s1:
  207. _s1.remove("")
  208. return ",".join(list(_s1))
  209. @annotate('string,bigint->string')
  210. class f_getLastProjectUuid(BaseUDAF):
  211. def new_buffer(self):
  212. return [[]]
  213. def iterate(self,buffer, _uuid,page_timestamp):
  214. buffer[0].append({"uuid":_uuid,"timestamp":page_timestamp})
  215. buffer[0] = buffer[0][:10000]
  216. def merge(self, buffer, pbuffer):
  217. buffer[0].extend(pbuffer[0])
  218. buffer[0] = buffer[0][:10000]
  219. def terminate(self, buffer):
  220. if len(buffer[0])>0:
  221. buffer[0].sort(key=lambda x:x["timestamp"],reverse=True)
  222. return buffer[0][0]["uuid"]
  223. return None
  224. @annotate('string->string')
  225. class f_groupJsonStr(BaseUDAF):
  226. def new_buffer(self):
  227. return [[]]
  228. def iterate(self,buffer, _str):
  229. buffer[0].append(_str)
  230. buffer[0] = buffer[0][:10000]
  231. def merge(self, buffer, pbuffer):
  232. buffer[0].extend(pbuffer[0])
  233. buffer[0] = buffer[0][:10000]
  234. def terminate(self, buffer):
  235. return json.dumps(list(set(buffer[0])),ensure_ascii=False)
  236. @annotate('bigint,string,string->string,string,string,string,string,double,string')
  237. class f_extractDemand(BaseUDTF):
  238. def getProduct(self,tenderee,_product,project_name):
  239. if len(_product)>0:
  240. _product.sort(key=lambda x:len(x),reverse=True)
  241. return _product[0]
  242. else:
  243. product = str(project_name).replace(tenderee,"")
  244. product = re.sub(".*公司|项目|采购","",product)
  245. return product
  246. def formatTime(self,_date):
  247. if _date is not None:
  248. _d = _date.split("-")
  249. if len(_d)==3:
  250. return "%s-%s-%s"%(_d[0].rjust(4,"2"),_d[1].rjust(2,"0"),_d[2].rjust(2,"0"))
  251. def process(self,docid,tenderee,json_demand_info):
  252. if json_demand_info is None:
  253. return
  254. demand_info = json.loads(json_demand_info)
  255. for _line in demand_info["data"]:
  256. try:
  257. _product = _line.get("product",[])
  258. order_end = _line.get("order_end")
  259. order_end = self.formatTime(order_end)
  260. project_name = _line.get("project_name")
  261. demand = _line.get("demand")
  262. budget = _line.get("budget")
  263. if budget is not None and len(budget)>0:
  264. budget = float(budget)
  265. order_begin = _line.get("order_begin")
  266. order_begin = self.formatTime(order_begin)
  267. if order_begin is None or order_end is None:
  268. continue
  269. product = self.getProduct(tenderee,_product,project_name)
  270. json_docids = json.dumps([str(docid)])
  271. self.forward(product,order_begin,order_end,demand,project_name,budget,json_docids)
  272. except Exception as e:
  273. logging.info("============error:%s"%(str(e)))