#coding:UTF8
"""MaxCompute (ODPS) Python UDFs for splitting product lists, collecting page
timestamps and detecting a dominant tenderee within a group."""
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re
import time
import json
import logging
import math

logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# A "YYYY-MM-DD" date anywhere in the string; compiled once (raw string so the
# backslashes reach the regex engine instead of being invalid string escapes).
_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """UDTF: emit one output row per comma-separated item of ``product``."""

    def process(self, product):
        # A NULL input produces no rows; an empty string still yields one "" row.
        if product is None:
            return
        for str_p in product.split(","):
            self.forward(str_p)


def getTimeStamp(str_time):
    """Convert the leading ``YYYY-MM-DD`` of ``str_time`` to a Unix timestamp.

    The date part (first 10 characters) is interpreted in the process's
    local timezone via ``time.mktime``.

    :param str_time: string such as ``"2021-03-04 12:00:00"``, or None.
    :return: integer seconds since the epoch, or 0 when the input is None,
        contains no ``YYYY-MM-DD`` pattern, or its first 10 characters are
        not a valid date.
    """
    if str_time is None or _DATE_RE.search(str_time) is None:
        return 0
    try:
        timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
        return int(time.mktime(timeArray))
    except (TypeError, ValueError, OverflowError):
        # Best-effort parsing: a malformed prefix or an out-of-range date is
        # reported as 0 rather than failing the whole job.
        return 0


@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """UDAF: collect the distinct page-time timestamps of a group.

    The buffer holds a single list kept distinct and sorted in descending
    order, capped at ``_MAX_TIMESTAMPS`` (the most recent ones are kept).
    ``terminate`` returns that list JSON-encoded.
    """

    _MAX_TIMESTAMPS = 10000

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp <= 0:
            return
        kept = buffer[0]
        # The list is always distinct and sorted descending, so a duplicate or
        # a value that would fall off the capped tail leaves it unchanged.
        if timestamp in kept:
            return
        if len(kept) >= self._MAX_TIMESTAMPS and timestamp < kept[-1]:
            return
        updated = list(kept)
        updated.append(timestamp)
        updated.sort(reverse=True)
        buffer[0] = updated[:self._MAX_TIMESTAMPS]

    def merge(self, buffer, pbuffer):
        merged = sorted(set(buffer[0]).union(pbuffer[0]), reverse=True)
        buffer[0] = merged[:self._MAX_TIMESTAMPS]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


@annotate('string->bigint')
class f_isdistinct(BaseUDAF):
    """UDAF: return 1 when a single tenderee dominates the group, else 0.

    The buffer is ``[counts]`` where ``counts`` maps each tenderee to its row
    count. NULL/empty tenderees are counted under ``"None"`` and the total row
    count under ``"whole"``. Once more than ``_MAX_KEYS`` keys (sentinels
    included) have been seen, the group is treated as not distinct.

    NOTE(review): a real tenderee literally equal to "None" or "whole" collides
    with the sentinel keys.
    """

    _MAX_KEYS = 20
    _EMPTY_KEY = "None"
    _TOTAL_KEY = "whole"
    # Share of rows that are (top tenderee + empty) must exceed this ...
    _TOP_WITH_EMPTY_RATIO = 0.9
    # ... and the top tenderee alone must exceed this share.
    _TOP_RATIO = 0.4

    def new_buffer(self):
        return [{}]

    def iterate(self, buffer, tenderee):
        counts = buffer[0]
        # Too many distinct values already: stop counting, terminate returns 0.
        if len(counts) > self._MAX_KEYS:
            return
        key = tenderee
        if tenderee is None or tenderee == "":
            key = self._EMPTY_KEY
        counts[key] = counts.get(key, 0) + 1
        counts[self._TOTAL_KEY] = counts.get(self._TOTAL_KEY, 0) + 1

    def merge(self, buffer, pbuffer):
        counts = buffer[0]
        for key, value in pbuffer[0].items():
            counts[key] = counts.get(key, 0) + value

    def terminate(self, buffer):
        counts = buffer[0]
        if len(counts) > self._MAX_KEYS:
            return 0
        total = counts.get(self._TOTAL_KEY, 0)
        if total <= 0:
            return 0
        empty = counts.get(self._EMPTY_KEY, 0)
        # Only real tenderees compete for "top"; the total and the empty
        # bucket must not, otherwise the top count always equals the total.
        named = [value for key, value in counts.items()
                 if key != self._EMPTY_KEY and key != self._TOTAL_KEY]
        top = max(named) if named else 0
        # float() keeps the ratios fractional under Python 2 integer division.
        if (float(top + empty) / total > self._TOP_WITH_EMPTY_RATIO
                and float(top) / total > self._TOP_RATIO):
            return 1
        return 0