|
@@ -0,0 +1,102 @@
|
|
|
+#coding:UTF8
|
|
|
+
|
|
|
+from odps.udf import annotate
|
|
|
+from odps.udf import BaseUDAF
|
|
|
+from odps.udf import BaseUDTF
|
|
|
+import re
|
|
|
+import time
|
|
|
+import json
|
|
|
+import logging
|
|
|
+logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
+import math
|
|
|
+
|
|
|
+@annotate('string->string')
|
|
|
+class f_splitProduct(BaseUDTF):
|
|
|
+
|
|
|
+ def process(self,product):
|
|
|
+ if product is None:
|
|
|
+ return
|
|
|
+ for str_p in product.split(","):
|
|
|
+ self.forward(str_p)
|
|
|
+
|
|
|
+def getTimeStamp(str_time):
|
|
|
+ try:
|
|
|
+ if str_time is not None and re.search("\d{4}\-\d{2}\-\d{2}.*",str_time) is not None:
|
|
|
+ timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
|
|
|
+ timeStamp = int(time.mktime(timeArray))
|
|
|
+ return timeStamp
|
|
|
+ else:
|
|
|
+ return 0
|
|
|
+ except Exception as e:
|
|
|
+ return 0
|
|
|
+
|
|
|
+@annotate('string->string')
|
|
|
+class f_groupproduct(BaseUDAF):
|
|
|
+
|
|
|
+ def new_buffer(self):
|
|
|
+ return [[]]
|
|
|
+
|
|
|
+ def iterate(self,buffer, page_time):
|
|
|
+ timestamp = getTimeStamp(page_time)
|
|
|
+ if timestamp>0:
|
|
|
+ _set = set(buffer[0])
|
|
|
+ _set.add(timestamp)
|
|
|
+ _list = list(_set)
|
|
|
+ _list.sort(key=lambda x:x,reverse=True)
|
|
|
+ buffer[0] = _list[:10000]
|
|
|
+
|
|
|
+
|
|
|
+ def merge(self, buffer, pbuffer):
|
|
|
+ buffer[0].extend(pbuffer[0])
|
|
|
+ _set = set(buffer[0])
|
|
|
+ _list = list(_set)
|
|
|
+ _list.sort(key=lambda x:x,reverse=True)
|
|
|
+ buffer[0] = _list[:10000]
|
|
|
+
|
|
|
+ def terminate(self, buffer):
|
|
|
+ return json.dumps(buffer[0],ensure_ascii=False)
|
|
|
+
|
|
|
+@annotate('string->bigint')
|
|
|
+class f_isdistinct(BaseUDAF):
|
|
|
+
|
|
|
+ def new_buffer(self):
|
|
|
+ return [{}]
|
|
|
+
|
|
|
+ def iterate(self,buffer, tenderee):
|
|
|
+ if len(buffer[0].keys())>20:
|
|
|
+ return
|
|
|
+ _key = tenderee
|
|
|
+ if tenderee is None or tenderee=="":
|
|
|
+ _key = "None"
|
|
|
+ if _key not in buffer[0]:
|
|
|
+ buffer[0][_key] = 0
|
|
|
+ buffer[0][_key] += 1
|
|
|
+ _key = "whole"
|
|
|
+ if _key not in buffer[0]:
|
|
|
+ buffer[0][_key] = 0
|
|
|
+ buffer[0][_key] += 1
|
|
|
+
|
|
|
+
|
|
|
+ def merge(self, buffer, pbuffer):
|
|
|
+ for k,v in pbuffer[0].items():
|
|
|
+ if k in buffer[0]:
|
|
|
+ buffer[0][k] += v
|
|
|
+ else:
|
|
|
+ buffer[0][k] = v
|
|
|
+
|
|
|
+ def terminate(self, buffer):
|
|
|
+ _dict = buffer[0]
|
|
|
+ if len(_dict.keys())>20:
|
|
|
+ return 0
|
|
|
+ _whole = _dict.get("whole",-1)
|
|
|
+ list_v = []
|
|
|
+ _empty = _dict.get("None",-1)
|
|
|
+ for k,v in _dict.items():
|
|
|
+ if k=="None":
|
|
|
+ continue
|
|
|
+ list_v.append(v)
|
|
|
+ _max = max(list_v)
|
|
|
+ if (_max+_empty)/_whole>0.9 and _max/_whole>0.4:
|
|
|
+ return 1
|
|
|
+
|
|
|
+ return 0
|