# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re
import time
import json
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
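
# UDTF: split a comma-separated product string into one row per product name.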
@annotate('string->string')
class f_splitProduct(BaseUDTF):

    def process(self, product):
        if product is None:
            return
        for str_p in product.split(","):
            self.forward(str_p)
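
# Parse a "YYYY-MM-DD..." prefixed string into a unix timestamp (seconds);
# returns 0 for missing, malformed or unparseable values.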
def getTimeStamp(str_time):
    try:
        if str_time is not None and re.search(r"\d{4}-\d{2}-\d{2}", str_time) is not None:
            timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
            return int(time.mktime(timeArray))
        else:
            return 0
    except Exception:
        return 0
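
# UDAF: collect the distinct page-time timestamps of a group as a JSON array,
# newest first, capped at 10000 entries to bound the aggregation buffer.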
@annotate('string->string')
class f_groupproduct(BaseUDAF):

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp > 0:
            _set = set(buffer[0])
            _set.add(timestamp)
            _list = sorted(_set, reverse=True)
            buffer[0] = _list[:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        _list = sorted(set(buffer[0]), reverse=True)
        buffer[0] = _list[:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
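
# UDAF: collect up to 10000 document ids of a group into a JSON array.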
@annotate('bigint->string')
class f_groupdocid(BaseUDAF):

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        buffer[0].append(docid)
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
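
# Greedy single-pass clustering of timestamps: each value joins the first
# cluster whose running-mean center lies within `distance` seconds (default
# 28 days), otherwise it opens a new cluster. Returns the cluster centers.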
def clusterTimestamp(aint_timestamp, distance=28 * 24 * 60 * 60):

    def updateCenter(_c, _t):
        # incremental mean; _t has already been appended to _c["timestamp"]
        _center = _c["center"]
        _c["center"] = (_center * (len(_c["timestamp"]) - 1) + _t) // len(_c["timestamp"])

    aint_timestamp.sort(reverse=True)
    adict_cluster = []
    for _t in aint_timestamp:
        _find = False
        for _c in adict_cluster:
            if abs(_t - _c["center"]) < distance:
                _find = True
                _c["timestamp"].append(_t)
                updateCenter(_c, _t)
                break
        if not _find:
            adict_cluster.append({"timestamp": [_t], "center": _t})
    return [_c["center"] for _c in adict_cluster]
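
# Gaps (in days) between consecutive cluster centers plus their average,
# minimum and maximum. `cluster_d` is a candidate base period: a gap that at
# least half of the gaps reach and that stays close to the maximum gap.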
def getDistanceOfCluster(aint_center):
    aint_center.sort()
    aint_dis = []
    int_avgD = 0
    int_minD = 1000
    int_maxD = 0
    cluster_d = None
    # compute the average, maximum and minimum gap (in days) between centers
    for int_c in range(1, len(aint_center)):
        int_after = aint_center[int_c] // (24 * 60 * 60)
        int_before = aint_center[int_c - 1] // (24 * 60 * 60)
        _d = abs(int_after - int_before)
        if _d == 0:
            continue
        aint_dis.append(_d)
    if len(aint_dis) > 0:
        int_avgD = int(sum(aint_dis) / len(aint_dis))
        int_minD = min(aint_dis)
        int_maxD = max(aint_dis)
        # candidate period: reached by more than half of the gaps and close
        # to the maximum gap relative to the average
        for _d in aint_dis:
            aint_gre = [int(a >= _d) for a in aint_dis]
            if sum(aint_gre) / len(aint_gre) > 0.5 and (int_maxD - _d) / int_avgD < 0.5:
                cluster_d = _d
    return aint_dis, int_avgD, int_minD, int_maxD, cluster_d
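
# Estimate a publication period from raw timestamps: cluster once with the
# 28-day default, optionally re-cluster with the detected base gap, then score
# the regularity of the gaps. Returns (last_time, prob, avg, min, max, n_gaps),
# or (None, prob, None, None, None, None) when no confident period exists.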
def getPeriod(aint_timestamp):
    aint_center = clusterTimestamp(aint_timestamp)  # first pass: 28-day clusters
    if len(aint_center) == 0:
        return None, 0, None, None, None, None
    aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    if cluster_d is not None:
        # re-cluster with the detected base gap so near-duplicate clusters merge
        aint_center = clusterTimestamp(aint_center, distance=(cluster_d - 1) * 24 * 60 * 60)
        aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    _prob = 0
    last_time = time.strftime('%Y-%m-%d', time.localtime(max(aint_center)))
    # require at least two gaps and more than a year between first and last cluster
    if len(aint_dis) >= 2 and (max(aint_center) - min(aint_center)) > 365 * 24 * 60 * 60:
        flt_powD = 0
        for int_d in aint_dis:
            flt_powD += (int_d - int_avgD) ** 2
        # fewer observed gaps -> lower base confidence
        base_prob = 0.99
        if len(aint_dis) < 4:
            base_prob = 0.8
        elif len(aint_dis) < 6:
            base_prob = 0.9
        # penalize by the normalized variance of the gaps
        _prob = round(base_prob - (flt_powD / len(aint_dis) / int_avgD ** 2), 4)
        if _prob > 0.5:
            return last_time, _prob, int(int_avgD), int(int_minD), int(int_maxD), len(aint_dis)
    return None, _prob, None, None, None, None
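
# UDTF: feed the JSON timestamp array produced by f_groupproduct through
# getPeriod and forward one row when a period is detected.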
@annotate('string->string,double,bigint,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):

    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        aint_timestamp = json.loads(json_timestamp)
        last_time, _prob, int_avgD, int_minD, int_maxD, _periods = getPeriod(aint_timestamp)
        if int_avgD is not None:
            self.forward(last_time, _prob, int_avgD, int_minD, int_maxD, _periods)
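
# UDTF: emit the winning, second and third tenderer names found in the
# sub_docs_json array, one row per non-null name.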
@annotate('string->string')
class f_getTendererCompany(BaseUDTF):

    def process(self, sub_docs_json):
        if sub_docs_json is None:
            return
        sub_docs = json.loads(sub_docs_json)
        for _doc in sub_docs:
            for _key in ("win_tenderer", "second_tenderer", "third_tenderer"):
                _name = _doc.get(_key)
                if _name is not None:
                    self.forward(_name)
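
# UDAF: comma-join up to 10000 non-empty strings of a group.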
@annotate('string->string')
class f_concatstr(BaseUDAF):

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str):
        if _str is not None and _str != "":
            buffer[0].append(str(_str))
            buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return ",".join(buffer[0])