#coding:UTF8
"""MaxCompute (ODPS) UDFs that group product page times and estimate how
regularly (periodically) a product is tendered.

NOTE(review): the source this module was reviewed from had been collapsed onto
three lines, and the text between ``abs(_t-_center)`` in ``clusterTimestamp``
and ``int_maxD`` in ``getDistanceOfCluster`` was lost. Those two bodies are
reconstructed from the surviving fragments and from how ``getPeriod`` uses
their results -- verify them against version control.
"""
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re
import time
import json
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Cap on the number of items any aggregation buffer keeps.
_MAX_ITEMS = 10000
_DAY_SECONDS = 24*60*60
# Raw string: "\d" / "\-" in a normal literal are invalid escape sequences.
_DATE_PATTERN = re.compile(r"\d{4}\-\d{2}\-\d{2}.*")


@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """Explode a comma-separated product string into one row per item."""

    def process(self, product):
        if product is None:
            return
        # Empty items (from ",," or a trailing comma) are forwarded as well.
        for str_p in product.split(","):
            self.forward(str_p)


def getTimeStamp(str_time):
    """Convert the first 10 characters (``YYYY-MM-DD``) of *str_time* to a
    Unix timestamp using the local time zone (``time.mktime``).

    Returns 0 when *str_time* is None, contains no ``dddd-dd-dd`` pattern, or
    its first 10 characters are not a valid date.
    """
    if str_time is None:
        return 0
    try:
        if _DATE_PATTERN.search(str_time) is None:
            return 0
        return int(time.mktime(time.strptime(str_time[:10], "%Y-%m-%d")))
    except (TypeError, ValueError, OverflowError):
        # Best effort: unparsable input is treated as "no timestamp".
        return 0


def _latest_distinct(values, limit=_MAX_ITEMS):
    """Return the distinct *values* sorted newest first, truncated to *limit*."""
    return sorted(set(values), reverse=True)[:limit]


@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """Aggregate page times into a JSON list of distinct day timestamps,
    newest first, capped at ``_MAX_ITEMS`` entries."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp > 0:
            buffer[0] = _latest_distinct(buffer[0] + [timestamp])

    def merge(self, buffer, pbuffer):
        buffer[0] = _latest_distinct(buffer[0] + pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


@annotate('bigint->string')
class f_groupdocid(BaseUDAF):
    """Collect the first ``_MAX_ITEMS`` docids seen into a JSON list."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        # Appending only while there is room gives the same result as
        # append-then-truncate without copying the list on every row.
        if len(buffer[0]) < _MAX_ITEMS:
            buffer[0].append(docid)

    def merge(self, buffer, pbuffer):
        room = _MAX_ITEMS - len(buffer[0])
        if room > 0:
            buffer[0].extend(pbuffer[0][:room])

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


def clusterTimestamp(aint_timestamp, distance=28*24*60*60):
    """Greedily cluster timestamps (seconds) and return the cluster centers.

    ``aint_timestamp`` is sorted in place, newest first. Each timestamp joins
    the first existing cluster whose center is closer than *distance* seconds,
    and that center becomes the integer running mean of its members. Otherwise
    the timestamp starts a new cluster.

    NOTE(review): only the code up to the ``abs(_t-_center)`` test survived in
    the reviewed source; the remainder of this body is a reconstruction.
    """
    def updateCenter(_c, _t):
        _center = _c["center"]
        # len() already counts the newly appended _t.
        _c["center"] = (_center*(len(_c["timestamp"])-1)+_t)//len(_c["timestamp"])

    aint_timestamp.sort(reverse=True)
    adict_cluster = []
    for _t in aint_timestamp:
        for _c in adict_cluster:
            if abs(_t - _c["center"]) < distance:
                _c["timestamp"].append(_t)
                updateCenter(_c, _t)
                break
        else:
            adict_cluster.append({"timestamp": [_t], "center": _t})
    return [_c["center"] for _c in adict_cluster]


def getDistanceOfCluster(aint_center):
    """Summarize the gaps, in whole days, between consecutive cluster centers.

    Returns ``(aint_dis, int_avgD, int_minD, int_maxD, cluster_d)``:
    the list of non-zero gaps, their truncated mean, minimum and maximum (all 0
    when there are no gaps), and ``cluster_d``. ``cluster_d`` is the last gap
    ``_d`` such that more than half of the gaps are >= ``_d`` and
    ``(int_maxD - _d) / int_avgD < 0.5``, or None if no gap qualifies.

    NOTE(review): the head of this function (up to the running ``int_maxD``
    update) was lost in the reviewed source and is reconstructed here.
    """
    aint_sorted = sorted(aint_center)
    aint_dis = []
    for int_before, int_after in zip(aint_sorted, aint_sorted[1:]):
        _d = (int_after - int_before)//_DAY_SECONDS
        # Zero-day gaps are skipped so the average gap can never be 0.
        if _d != 0:
            aint_dis.append(_d)

    int_avgD = 0
    int_minD = 0
    int_maxD = 0
    cluster_d = None
    if aint_dis:
        # float() keeps these ratios correct under Python 2 integer division.
        int_avgD = int(float(sum(aint_dis))/len(aint_dis))
        int_minD = min(aint_dis)
        int_maxD = max(aint_dis)
        int_count = len(aint_dis)
        for _d in aint_dis:
            flt_share = float(sum(1 for a in aint_dis if a >= _d))/int_count
            if flt_share > 0.5 and int_avgD > 0 and float(int_maxD - _d)/int_avgD < 0.5:
                cluster_d = _d
    return aint_dis, int_avgD, int_minD, int_maxD, cluster_d


def getPeriod(aint_timestamp):
    """Estimate how regularly a product recurs from its page timestamps.

    Returns ``(last_time, _prob, int_avgD, int_minD, int_maxD, periods)`` where
    ``last_time`` is the newest cluster center formatted as ``%Y-%m-%d`` in
    local time, and ``_prob`` is a score: a base of 0.8 (< 4 gaps), 0.9
    (< 6 gaps) or 0.99, minus the variance of the gaps divided by the squared
    mean gap. The remaining values are the gap statistics from
    ``getDistanceOfCluster`` and the number of gaps.

    The statistics are only returned when there are at least two gaps, the
    centers span more than 365 days and the score exceeds 0.3. Otherwise the
    result is ``(None, _prob, None, None, None, None)``.
    """
    if not aint_timestamp:
        # Nothing to cluster; max() below would raise on an empty list.
        return None, 0, None, None, None, None

    aint_center = clusterTimestamp(aint_timestamp, distance=29*_DAY_SECONDS)
    aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    if cluster_d is not None:
        # Re-cluster with a window just under the dominant gap, then re-measure.
        aint_center = clusterTimestamp(aint_center, distance=(cluster_d-1)*_DAY_SECONDS)
        aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)

    _prob = 0
    last_time = time.strftime('%Y-%m-%d', time.localtime(max(aint_center)))
    int_span = max(aint_center) - min(aint_center)
    if len(aint_dis) >= 2 and int_avgD > 0 and int_span > 365*_DAY_SECONDS:
        flt_powD = float(sum((int_d - int_avgD)**2 for int_d in aint_dis))
        if len(aint_dis) < 4:
            base_prob = 0.8
        elif len(aint_dis) < 6:
            base_prob = 0.9
        else:
            base_prob = 0.99
        _prob = round(base_prob - flt_powD/len(aint_dis)/int_avgD**2, 4)
        if _prob > 0.3:
            return last_time, _prob, int(int_avgD), int(int_minD), int(int_maxD), len(aint_dis)
    return None, _prob, None, None, None, None


@annotate('string->string,double,bigint,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):
    """Emit the estimated cycle for a JSON list of timestamps, when one exists."""

    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        aint_timestamp = json.loads(json_timestamp)
        last_time, _prob, int_avgD, int_minD, int_maxD, _periods = getPeriod(aint_timestamp)
        if int_avgD is not None:
            self.forward(last_time, _prob, int_avgD, int_minD, int_maxD, _periods)


@annotate('string->string')
class f_getTendererCompany(BaseUDTF):
    """Emit the winning, second and third tenderer of every doc in a JSON list."""

    def process(self, sub_docs_json):
        if sub_docs_json is None:
            return
        for _doc in json.loads(sub_docs_json):
            for _key in ("win_tenderer", "second_tenderer", "third_tenderer"):
                _company = _doc.get(_key)
                if _company is not None:
                    self.forward(_company)


@annotate('string,bigint->string')
class f_concatstr(BaseUDAF):
    """Concatenate strings into a comma-separated list of distinct non-empty
    tokens (inputs are themselves split on commas), sorted for a stable result.
    """

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str, signal):
        # ``signal`` is part of the declared signature but is not used.
        if _str is not None and _str != "" and len(buffer[0]) < _MAX_ITEMS:
            buffer[0].append(str(_str))

    def merge(self, buffer, pbuffer):
        room = _MAX_ITEMS - len(buffer[0])
        if room > 0:
            buffer[0].extend(pbuffer[0][:room])

    def terminate(self, buffer):
        _tokens = set(",".join(buffer[0]).split(","))
        _tokens.discard("")
        # Sorting makes the output independent of set/hash ordering.
        return ",".join(sorted(_tokens))