from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re
import time
import json
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Maximum number of values any aggregation buffer in this module keeps.
_MAX_BUFFER_ITEMS = 10000
# Number of seconds in one day.
_SECONDS_PER_DAY = 24 * 60 * 60
# A "YYYY-MM-DD" date anywhere in the string. Raw string so "\d" is a regex
# escape, not an (invalid) string escape.
_DATE_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}")


@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """Split a comma-separated string and emit one row per piece."""

    def process(self, product):
        if product is None:
            return
        for str_p in product.split(","):
            self.forward(str_p)


def getTimeStamp(str_time):
    """Convert the leading "YYYY-MM-DD" of str_time to a local-midnight Unix timestamp.

    Returns 0 when str_time is None, contains no YYYY-MM-DD date, or when its
    first 10 characters cannot be parsed/converted as a "%Y-%m-%d" date.
    """
    try:
        if str_time is None or _DATE_PATTERN.search(str_time) is None:
            return 0
        time_struct = time.strptime(str_time[:10], "%Y-%m-%d")
        return int(time.mktime(time_struct))
    except (TypeError, ValueError, OverflowError):
        # Best effort: a malformed or out-of-range date counts as "no timestamp".
        return 0


def _top_distinct(values, limit=_MAX_BUFFER_ITEMS):
    """Return the `limit` largest distinct items of `values`, in descending order."""
    return sorted(set(values), reverse=True)[:limit]


@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """Aggregate page_time strings into a JSON array of distinct timestamps.

    The buffer holds at most _MAX_BUFFER_ITEMS timestamps, largest (newest)
    first. Rows whose page_time yields no timestamp (0) are ignored.
    """

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp > 0:
            buffer[0] = _top_distinct(buffer[0] + [timestamp])

    def merge(self, buffer, pbuffer):
        buffer[0] = _top_distinct(buffer[0] + pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


@annotate('bigint->string')
class f_groupdocid(BaseUDAF):
    """Collect up to _MAX_BUFFER_ITEMS docids (first seen kept) into a JSON array."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        # Stop appending once full instead of appending and re-slicing every row.
        if len(buffer[0]) < _MAX_BUFFER_ITEMS:
            buffer[0].append(docid)

    def merge(self, buffer, pbuffer):
        room = _MAX_BUFFER_ITEMS - len(buffer[0])
        if room > 0:
            buffer[0].extend(pbuffer[0][:room])

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


def clusterTimestamp(aint_timestamp):
    """Greedily cluster timestamps (seconds) and return each cluster's center.

    Timestamps are visited largest first. Each one is compared against existing
    clusters whose center is within a 30-day distance; a new cluster is started
    when none is close enough. A cluster's center is the integer running mean of
    its members. Sorts aint_timestamp in place.
    """
    def updateCenter(_c, _t):
        # Running mean; _c["timestamp"] already contains _t at this point.
        n = len(_c["timestamp"])
        _c["center"] = (_c["center"] * (n - 1) + _t) // n

    aint_timestamp.sort(reverse=True)
    distance = 30 * _SECONDS_PER_DAY
    adict_cluster = []
    for _t in aint_timestamp:
        # NOTE(review): the original text from this comparison through the start
        # of getPeriod was lost in SOURCE (apparently a "<...>" span stripped as
        # markup). The join/new-cluster logic below is a reconstruction from the
        # visible fragments (_find flag, updateCenter's running mean) — confirm
        # against version control, including "<" vs "<=".
        for _c in adict_cluster:
            if abs(_t - _c["center"]) < distance:
                _c["timestamp"].append(_t)
                updateCenter(_c, _t)
                break
        else:
            adict_cluster.append({"timestamp": [_t], "center": _t})
    return [_c["center"] for _c in adict_cluster]


def getPeriod(aint_center):
    """Estimate a regular cycle from cluster centers (timestamps in seconds).

    Gaps between consecutive sorted centers are measured in whole days.
    Returns (avg, min, max) of those gaps when there are more than 2 centers,
    they span more than 265 days, and the gaps' variance is below 30;
    otherwise (None, None, None). aint_center itself is not modified.
    """
    # NOTE(review): the start of this function was lost in SOURCE and is
    # reconstructed; gap units (days) are inferred from the variance threshold
    # and the seconds-based span check — confirm.
    centers = sorted(aint_center)
    aint_dis = [(later - earlier) // _SECONDS_PER_DAY
                for earlier, later in zip(centers, centers[1:])]
    # NOTE(review): 265 days may have been intended as 365 — confirm.
    if len(centers) > 2 and (centers[-1] - centers[0]) > 265 * _SECONDS_PER_DAY:
        # Same division operator as the original "int_avgD /= len(aint_dis)".
        int_avgD = sum(aint_dis) / len(aint_dis)
        flt_powD = sum((int_d - int_avgD) ** 2 for int_d in aint_dis)
        if flt_powD / len(aint_dis) < 30:
            return int(int_avgD), int(min(aint_dis)), int(max(aint_dis))
    return None, None, None


@annotate('string->string,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):
    """Emit (date of latest cluster center, avg, min, max cycle) for a JSON timestamp array.

    Emits nothing when the input is NULL or getPeriod finds no regular cycle.
    """

    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        aint_center = clusterTimestamp(json.loads(json_timestamp))
        int_avgD, int_minD, int_maxD = getPeriod(aint_center)
        if int_avgD is not None:
            latest = time.strftime('%Y-%m-%d', time.localtime(max(aint_center)))
            self.forward(latest, int_avgD, int_minD, int_maxD)