123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- from odps.udf import annotate
- from odps.udf import BaseUDAF
- from odps.udf import BaseUDTF
- import re
- import time
- import json
- import logging
# Root-logger setup for UDF debugging output (format string unchanged).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """UDTF: split a comma-separated product string into one output row per item."""

    def process(self, product):
        # A NULL input produces no rows at all.
        if product is None:
            return
        for item in product.split(","):
            self.forward(item)
def getTimeStamp(str_time):
    """Convert a 'YYYY-MM-DD...' string to a Unix timestamp (local midnight).

    Only the first 10 characters (the date part) are parsed with
    time.strptime/mktime.  Returns 0 for None, for strings that do not
    start with a YYYY-MM-DD date, and for matching-but-invalid or
    out-of-range dates (e.g. month 13).
    """
    # Raw string avoids Python's invalid-escape-sequence warning for "\d";
    # the needless "\-" escapes are dropped ('-' is literal outside a class).
    if str_time is None or re.search(r"\d{4}-\d{2}-\d{2}.*", str_time) is None:
        return 0
    try:
        timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
        return int(time.mktime(timeArray))
    except (ValueError, OverflowError):
        # strptime rejects impossible dates; mktime can overflow on
        # out-of-range years.  Both map to the original's 0 fallback.
        return 0
@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """UDAF: aggregate page times into a JSON array of distinct timestamps.

    The buffer is a one-element list whose slot holds the running list of
    Unix timestamps, kept de-duplicated, sorted newest-first and capped at
    10000 entries at every step.
    """

    def new_buffer(self):
        # buffer[0] is the running timestamp list.
        return [[]]

    def iterate(self, buffer, page_time):
        stamp = getTimeStamp(page_time)
        # Unparseable inputs come back as 0 and are ignored.
        if stamp > 0:
            merged = set(buffer[0])
            merged.add(stamp)
            buffer[0] = sorted(merged, reverse=True)[:10000]

    def merge(self, buffer, pbuffer):
        combined = set(buffer[0]) | set(pbuffer[0])
        buffer[0] = sorted(combined, reverse=True)[:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
@annotate('bigint->string')
class f_groupdocid(BaseUDAF):
    """UDAF: collect up to 10000 doc ids (duplicates kept) as a JSON array."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        ids = buffer[0]
        ids.append(docid)
        # Truncate in place so the buffer never grows past 10000 entries.
        del ids[10000:]

    def merge(self, buffer, pbuffer):
        buffer[0] = (buffer[0] + pbuffer[0])[:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
def clusterTimestamp(aint_timestamp):
    """Greedily cluster Unix timestamps whose centers lie within 30 days.

    The input list is sorted in place (newest first).  Each timestamp joins
    the first existing cluster whose running-mean center is less than 30
    days away, updating that center incrementally; otherwise it seeds a new
    cluster.  Returns the cluster centers in discovery order.
    """
    THIRTY_DAYS = 30 * 24 * 60 * 60
    aint_timestamp.sort(reverse=True)
    clusters = []  # each entry: {"timestamp": [members], "center": int mean}
    for ts in aint_timestamp:
        for cluster in clusters:
            if abs(ts - cluster["center"]) < THIRTY_DAYS:
                cluster["timestamp"].append(ts)
                # Incremental integer mean over the cluster's members.
                n = len(cluster["timestamp"])
                cluster["center"] = (cluster["center"] * (n - 1) + ts) // n
                break
        else:
            clusters.append({"timestamp": [ts], "center": ts})
    return [cluster["center"] for cluster in clusters]
def getPeriod(aint_center):
    """Derive a purchase cycle, in days, from cluster-center timestamps.

    Sorts the input in place (ascending) and measures the day gaps between
    consecutive centers.  When there are more than two centers spanning
    over 265 days (NOTE(review): possibly intended as 365 — confirm) and
    the variance of the gaps is below 30, returns (avg, min, max) gap in
    whole days; otherwise returns (None, None, None).
    """
    aint_center.sort()
    gaps = []
    total = 0
    shortest = 1000  # sentinel above any plausible gap in days
    longest = 0
    for prev, cur in zip(aint_center, aint_center[1:]):
        gap = abs(cur // (24 * 60 * 60) - prev // (24 * 60 * 60))
        gaps.append(gap)
        total += gap
        shortest = min(shortest, gap)
        longest = max(longest, gap)
    span = 265 * 24 * 60 * 60
    if len(aint_center) > 2 and (max(aint_center) - min(aint_center)) > span:
        mean = total / len(gaps)
        squared_dev = sum((g - mean) ** 2 for g in gaps)
        if squared_dev / len(gaps) < 30:
            return int(mean), int(shortest), int(longest)
    return None, None, None
@annotate('string->string,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):
    """UDTF: detect a purchase cycle from a JSON array of Unix timestamps.

    Emits one row — (date of the latest cluster center as 'YYYY-MM-DD',
    avg gap, min gap, max gap in days) — only when getPeriod() finds a
    regular cycle; otherwise emits nothing.
    """

    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        stamps = json.loads(json_timestamp)
        centers = clusterTimestamp(stamps)
        avg_d, min_d, max_d = getPeriod(centers)
        if avg_d is None:
            return
        latest = time.strftime('%Y-%m-%d', time.localtime(max(centers)))
        self.forward(latest, avg_d, min_d, max_d)
|