123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- #coding:UTF8
- from odps.udf import annotate
- from odps.udf import BaseUDAF
- from odps.udf import BaseUDTF
- import re
- import time
- import json
- import logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- import math
@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """UDTF that fans a comma-separated string out into one row per item.

    A NULL ``product`` yields no rows. Items are emitted exactly as produced
    by ``str.split(",")``, so empty items (e.g. from ``"a,,b"``) are kept.
    """

    def process(self, product):
        # Guard clause inverted: only split when there is something to split.
        if product is not None:
            for item in product.split(","):
                self.forward(item)
# Leading calendar date in "YYYY-MM-DD" form; anything may follow it.
_DATE_PREFIX_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def getTimeStamp(str_time):
    """Convert a string that starts with ``YYYY-MM-DD`` to a Unix timestamp.

    Only the first 10 characters (the date part) are parsed; any time-of-day
    suffix is ignored. ``time.mktime`` interprets the parsed date as local
    time, so the result is local midnight of that day.

    :param str_time: text such as ``"2021-03-15"`` or
        ``"2021-03-15 08:00:00"``; ``None`` is accepted.
    :return: the timestamp as an ``int``. Returns ``0`` when ``str_time`` is
        ``None``, is not a text string, does not start with a date, or holds
        an impossible date (e.g. ``"2021-13-40"``).
    """
    if str_time is None:
        return 0
    try:
        # Anchor at the start: only str_time[:10] is parsed below, so a date
        # appearing later in the string could never be used anyway.
        if _DATE_PREFIX_RE.match(str_time) is None:
            return 0
        time_struct = time.strptime(str_time[:10], "%Y-%m-%d")
        return int(time.mktime(time_struct))
    except (TypeError, ValueError, OverflowError):
        # TypeError: non-text input (int, bytes, ...).
        # ValueError: impossible calendar date from strptime.
        # OverflowError / ValueError: date outside the platform's mktime range.
        return 0
@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """UDAF collecting the distinct day timestamps of ``page_time`` in a group.

    The aggregation buffer is a one-element list whose item is a list of int
    timestamps (as returned by ``getTimeStamp``). That list is kept free of
    duplicates, sorted newest-first, and truncated to the ``_LIMIT`` largest
    values. ``terminate`` returns it serialized as a JSON array string.
    """

    # Maximum number of distinct timestamps retained in the buffer.
    _LIMIT = 10000

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        stamp = getTimeStamp(page_time)
        # getTimeStamp yields 0 for unparseable input; non-positive values are dropped.
        if stamp <= 0:
            return
        distinct = set(buffer[0])
        distinct.add(stamp)
        buffer[0] = sorted(distinct, reverse=True)[:self._LIMIT]

    def merge(self, buffer, pbuffer):
        distinct = set(buffer[0]).union(pbuffer[0])
        buffer[0] = sorted(distinct, reverse=True)[:self._LIMIT]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
@annotate('string->bigint')
class f_isdistinct(BaseUDAF):
    """UDAF returning 1 when a single tenderee dominates the group, else 0.

    The buffer is a one-element list holding a dict of row counts keyed by
    tenderee. NULL / empty tenderees are counted under the key ``"None"``,
    and every counted row is also added to the key ``"whole"`` (the total).

    ``terminate`` takes the most frequent non-empty tenderee and returns 1
    when both conditions hold:

    - (its count + empty-tenderee count) / total > ``COVER_RATIO``
    - its count / total > ``TOP_RATIO``

    It returns 0 in these cases:

    - the dict has more than ``MAX_KEYS`` keys (too many distinct values);
    - the group is empty;
    - the group contains only empty tenderees.

    NOTE(review): a tenderee whose literal value is ``"whole"`` or ``"None"``
    shares a key with the bookkeeping counters above.
    """

    # Once the count dict has more keys than this, counting stops and the
    # group is reported as not distinct.
    MAX_KEYS = 20
    # Share of rows the top tenderee plus empty tenderees must exceed.
    COVER_RATIO = 0.9
    # Share of rows the top tenderee alone must exceed.
    TOP_RATIO = 0.4

    def new_buffer(self):
        return [{}]

    def iterate(self, buffer, tenderee):
        counts = buffer[0]
        # Too many distinct keys already: the group cannot qualify, so stop counting.
        if len(counts) > self.MAX_KEYS:
            return
        if tenderee is None or tenderee == "":
            key = "None"
        else:
            key = tenderee
        counts[key] = counts.get(key, 0) + 1
        counts["whole"] = counts.get("whole", 0) + 1

    def merge(self, buffer, pbuffer):
        counts = buffer[0]
        for key, value in pbuffer[0].items():
            counts[key] = counts.get(key, 0) + value

    def terminate(self, buffer):
        counts = buffer[0]
        if len(counts) > self.MAX_KEYS:
            return 0
        whole = counts.get("whole", 0)
        # Empty group: nothing can dominate (and this avoids dividing by zero).
        if whole <= 0:
            return 0
        # A missing "None" key means there were no empty tenderees.
        empty = counts.get("None", 0)
        # Exclude both bookkeeping keys from the candidates. "whole" is the
        # total and would otherwise always be the maximum.
        named = [v for k, v in counts.items() if k not in ("None", "whole")]
        if not named:
            return 0
        top = max(named)
        # float() keeps true division on Python 2 as well as Python 3.
        total = float(whole)
        if (top + empty) / total > self.COVER_RATIO and top / total > self.TOP_RATIO:
            return 1
        return 0
|