# coding:UTF8
"""ODPS (MaxCompute) UDF/UDAF/UDTF collection for product purchase-cycle analysis.

NOTE(review): the original file arrived with all newlines collapsed to spaces,
and one region was corrupted (everything between a '<' and the next '>' was
stripped by whatever extracted the text).  The corrupted spots are marked with
NOTE(review)/TODO comments below — confirm them against the original file.
"""
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF

import re
import time
import json
import logging
import math

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


@annotate('string->string')
class f_splitProduct(BaseUDTF):
    """UDTF: split a comma-separated product string into one row per product."""

    def process(self, product):
        if product is None:
            return
        for str_p in product.split(","):
            self.forward(str_p)


def getTimeStamp(str_time):
    """Parse a 'YYYY-MM-DD...' prefixed string into a unix timestamp.

    Returns 0 when the input is None, does not look like a date, or fails
    to parse (best-effort — callers treat 0 as "no timestamp").
    """
    try:
        # raw string fixes the invalid '\-' escape of the original pattern
        if str_time is not None and re.search(r"\d{4}-\d{2}-\d{2}.*", str_time) is not None:
            timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
            return int(time.mktime(timeArray))
        return 0
    except Exception:
        return 0


@annotate('string->string')
class f_groupproduct(BaseUDAF):
    """UDAF: collect page times as distinct unix timestamps, newest first.

    The buffer is capped at 10000 entries; terminate() emits a JSON list.
    """

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp > 0:
            _set = set(buffer[0])
            _set.add(timestamp)
            _list = sorted(_set, reverse=True)
            buffer[0] = _list[:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        _list = sorted(set(buffer[0]), reverse=True)
        buffer[0] = _list[:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


@annotate('bigint->string')
class f_groupdocid(BaseUDAF):
    """UDAF: collect up to 10000 docids into a JSON list (duplicates kept)."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        buffer[0].append(docid)
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)


def clusterTimestamp(aint_timestamp, distance=28 * 24 * 60 * 60):
    """Greedy 1-D clustering of unix timestamps; returns the cluster centers.

    Timestamps within `distance` seconds of an existing cluster center join
    that cluster (whose center is the running integer mean); otherwise they
    start a new cluster.

    NOTE(review): the middle of this function was lost to source corruption;
    the assignment/fallback logic below is a reconstruction — confirm against
    the original file.
    """

    def updateCenter(_c, _t):
        # running integer mean over the cluster's timestamps
        _center = _c["center"]
        _c["center"] = (_center * (len(_c["timestamp"]) - 1) + _t) // len(_c["timestamp"])

    aint_timestamp.sort(reverse=True)
    adict_cluster = []
    for _t in aint_timestamp:
        _find = False
        for _c in adict_cluster:
            _center = _c["center"]
            if abs(_t - _center) < distance:
                # TODO(review): reconstructed — original text lost
                _c["timestamp"].append(_t)
                updateCenter(_c, _t)
                _find = True
                break
        if not _find:
            adict_cluster.append({"center": _t, "timestamp": [_t]})
    return [_c["center"] for _c in adict_cluster]


def getAvgD(aint_dis):
    """Average day-gap of a distance list.

    TODO(review): the original definition was lost to source corruption; the
    commented-out call site in getDistanceOfCluster suggests a plain integer
    mean, which is what is implemented here.
    """
    if not aint_dis:
        return 0
    return int(sum(aint_dis) / len(aint_dis))


def getDistanceOfCluster(aint_center):
    """Day-gaps between consecutive cluster centers plus avg/min/max stats.

    Returns (aint_dis, int_avgD, int_minD, int_maxD, cluster_d) where
    aint_dis are the gaps in days between consecutive centers (newest first)
    and cluster_d is currently always None (its computation is commented out).

    NOTE(review): the head of this function was lost to source corruption;
    the sort/gap loop is a reconstruction consistent with the surviving tail.
    """
    aint_center.sort(reverse=True)
    aint_dis = []
    int_avgD = 0
    int_minD = 1000
    int_maxD = 0
    cluster_d = None
    for i in range(1, len(aint_center)):
        _d = int((aint_center[i - 1] - aint_center[i]) // (24 * 60 * 60))
        aint_dis.append(_d)
        if _d < int_minD:
            int_minD = _d
        if _d > int_maxD:
            int_maxD = _d
    if len(aint_dis) > 0:
        # int_avgD = int(sum(aint_dis)/len(aint_dis))
        int_avgD = getAvgD(aint_dis)
        int_minD = min(aint_dis)
        int_maxD = max(aint_dis)
        # pull the more extreme of min/max back toward the average
        if abs(int_maxD - int_avgD) > abs(int_minD - int_avgD):
            int_minD = min([int_avgD, int_minD])
            # NOTE(review): uses int_minD (just updated), not int_maxD, so the
            # max collapses to the average here — looks intentional (outlier
            # replacement) but verify against the original file
            int_maxD = max([int_avgD, int_minD])
        else:
            int_minD = min([int_avgD, int_maxD])
            int_maxD = max([int_avgD, int_maxD])
        int_avgD = (int_minD + int_maxD) // 2
    # for _d in aint_dis:
    #     aint_gre = [int(a>=_d) for a in aint_dis]
    #     if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
    #         cluster_d = _d
    return aint_dis, int_avgD, int_minD, int_maxD, cluster_d


def getPeriod(aint_timestamp):
    """Estimate a purchase period from a list of unix timestamps.

    Returns (last_time, prob, avg_days, min_days, max_days, n_gaps) when a
    confident period is found, otherwise (None, prob, None, None, None, None).
    """
    aint_center = clusterTimestamp(aint_timestamp, distance=29 * 24 * 60 * 60)  # cluster
    aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    if cluster_d is not None:
        # re-cluster with a tighter window derived from the dominant gap
        aint_center = clusterTimestamp(aint_center, distance=(cluster_d - 1) * 24 * 60 * 60)
        aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    _prob = 0
    last_time = time.strftime('%Y-%m-%d', time.localtime(max(aint_center)))
    # need at least 2 gaps spanning more than a year of history
    if len(aint_dis) >= 2 and (max(aint_center) - min(aint_center)) > 365 * 24 * 60 * 60:
        flt_powD = 0
        for int_d in aint_dis:
            flt_powD += (int_d - int_avgD) ** 2
        # fewer observations -> lower base confidence
        base_prob = 0.99
        if len(aint_dis) < 4:
            base_prob = 0.8
        elif len(aint_dis) < 6:
            base_prob = 0.9
        _prob = round(base_prob - (flt_powD / len(aint_dis) / int_avgD ** 2), 4)
        # if flt_powD/len(aint_dis)<30:
        if _prob > 0.5 and int_maxD - int_minD <= 70:
            return last_time, _prob, int(int_avgD), int(int_minD), int(int_maxD), len(aint_dis)
    return None, _prob, None, None, None, None


def timeAdd(_time, days):
    """Add `days` days to a 'YYYY-MM-DD' string and return the new date string."""
    a = time.mktime(time.strptime(_time, '%Y-%m-%d')) + 86400 * days
    return time.strftime("%Y-%m-%d", time.localtime(a))


@annotate('string->string,string,string,double,bigint,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):
    """UDTF: from a JSON list of timestamps, forward the predicted next-purchase
    window (may_begin, may_end, last_time, prob, avg, min, max, periods)."""

    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        aint_timestamp = json.loads(json_timestamp)
        last_time, _prob, int_avgD, int_minD, int_maxD, _periods = getPeriod(aint_timestamp)
        if int_avgD is not None:
            may_begin = timeAdd(last_time, int_minD)
            may_end = timeAdd(last_time, int_maxD)
            self.forward(may_begin, may_end, last_time, _prob, int_avgD, int_minD, int_maxD, _periods)


@annotate('string->string')
class f_getTendererCompany(BaseUDTF):
    """UDTF: forward each win/second/third tenderer found in sub_docs_json."""

    def process(self, sub_docs_json):
        if sub_docs_json is None:
            return
        sub_docs = json.loads(sub_docs_json)
        for _doc in sub_docs:
            for _key in ("win_tenderer", "second_tenderer", "third_tenderer"):
                _name = _doc.get(_key)
                if _name is not None:
                    self.forward(_name)


@annotate('string,bigint->string')
class f_concatstr(BaseUDAF):
    """UDAF: concatenate non-empty strings, deduplicate on ',' and re-join."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str, signal):
        # NOTE(review): storing signal on self has no effect on the result;
        # kept for compatibility with the original
        self.signal = signal
        if _str is not None and _str != "":
            buffer[0].append(str(_str))
            buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        _s = ",".join(buffer[0])
        # split again so values that themselves contain ',' are deduped too
        _s1 = set(_s.split(","))
        _s1.discard("")
        return ",".join(list(_s1))


@annotate('string,bigint->string')
class f_getLastProjectUuid(BaseUDAF):
    """UDAF: return the uuid with the largest page_timestamp."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _uuid, page_timestamp):
        buffer[0].append({"uuid": _uuid, "timestamp": page_timestamp})
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        if len(buffer[0]) > 0:
            buffer[0].sort(key=lambda x: x["timestamp"], reverse=True)
            return buffer[0][0]["uuid"]
        return None


@annotate('string->string')
class f_groupJsonStr(BaseUDAF):
    """UDAF: collect distinct strings into a JSON list (capped at 10000)."""

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str):
        buffer[0].append(_str)
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return json.dumps(list(set(buffer[0])), ensure_ascii=False)


@annotate('bigint,string,string->string,string,string,string,string,double,string')
class f_extractDemand(BaseUDTF):
    """UDTF: expand demand_info JSON into one row per demand line."""

    def getProduct(self, tenderee, _product, project_name):
        # prefer the longest extracted product name; otherwise derive one
        # from the project name with the tenderee and boilerplate removed
        if len(_product) > 0:
            _product.sort(key=lambda x: len(x), reverse=True)
            return _product[0]
        product = str(project_name).replace(tenderee, "")
        product = re.sub(".*公司|项目|采购", "", product)
        return product

    def formatTime(self, _date):
        """Normalize 'Y-M-D' to zero-padded 'YYYY-MM-DD'; None if malformed."""
        if _date is not None:
            _d = _date.split("-")
            if len(_d) == 3:
                # NOTE(review): rjust(4, "2") pads a short year with leading
                # '2's (e.g. "021" -> "2021") — odd but kept as in the original
                return "%s-%s-%s" % (_d[0].rjust(4, "2"), _d[1].rjust(2, "0"), _d[2].rjust(2, "0"))

    def process(self, docid, tenderee, json_demand_info):
        if json_demand_info is None:
            return
        demand_info = json.loads(json_demand_info)
        for _line in demand_info["data"]:
            try:
                _product = _line.get("product", [])
                order_end = self.formatTime(_line.get("order_end"))
                project_name = _line.get("project_name")
                demand = _line.get("demand")
                budget = _line.get("budget")
                if budget is not None and len(budget) > 0:
                    budget = float(budget)
                order_begin = self.formatTime(_line.get("order_begin"))
                # rows without a complete order window are skipped
                if order_begin is None or order_end is None:
                    continue
                product = self.getProduct(tenderee, _product, project_name)
                json_docids = json.dumps([str(docid)])
                self.forward(product, order_begin, order_end, demand, project_name, budget, json_docids)
            except Exception as e:
                logging.info("============error:%s" % (str(e)))