#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re
import time
import json
import math
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

@annotate('string->string')
class f_splitProduct(BaseUDTF):
    # Split a comma-separated product string and emit one row per product.
    def process(self, product):
        if product is None:
            return
        for str_p in product.split(","):
            self.forward(str_p)
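
# Example (hypothetical values): a row whose product column holds
# "cement,steel,glass" is expanded into three rows: "cement", "steel", "glass".
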
def getTimeStamp(str_time):
    # Parse a string starting with "YYYY-MM-DD" into a unix timestamp;
    # return 0 for malformed or missing input.
    try:
        if str_time is not None and re.search(r"\d{4}-\d{2}-\d{2}", str_time) is not None:
            timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
            timeStamp = int(time.mktime(timeArray))
            return timeStamp
        else:
            return 0
    except Exception:
        return 0
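
# Example: getTimeStamp("2023-05-01 12:00:00") parses only the first ten
# characters, so it returns the timestamp of 2023-05-01 00:00:00 local time;
# getTimeStamp("not a date") returns 0.
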
@annotate('string->string')
class f_groupproduct(BaseUDAF):
    # Aggregate page times into a JSON array of distinct unix timestamps,
    # sorted newest-first and capped at 10000 entries.
    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, page_time):
        timestamp = getTimeStamp(page_time)
        if timestamp > 0:
            _set = set(buffer[0])
            _set.add(timestamp)
            _list = list(_set)
            _list.sort(reverse=True)
            buffer[0] = _list[:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        _list = list(set(buffer[0]))
        _list.sort(reverse=True)
        buffer[0] = _list[:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)
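
# Example: page_time values "2023-01-05", "2023-01-05" and "2023-02-10"
# aggregate to a JSON array of the two distinct timestamps, newest first.
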
@annotate('bigint->string')
class f_groupdocid(BaseUDAF):
    # Collect up to 10000 docids into a JSON array (order is not guaranteed
    # across partial-buffer merges).
    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid):
        buffer[0].append(docid)
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return json.dumps(buffer[0], ensure_ascii=False)

def clusterTimestamp(aint_timestamp, distance=28*24*60*60):
    # Greedy single-pass clustering: assign each timestamp (newest first) to the
    # first cluster whose running-average center is within `distance` seconds,
    # otherwise start a new cluster; return the list of cluster centers.
    def updateCenter(_c, _t):
        _center = _c["center"]
        _c["center"] = (_center*(len(_c["timestamp"])-1)+_t)//len(_c["timestamp"])

    aint_timestamp.sort(reverse=True)
    adict_cluster = []
    for _t in aint_timestamp:
        _find = False
        for _c in adict_cluster:
            _center = _c["center"]
            if abs(_t-_center) < distance:
                _find = True
                _c["timestamp"].append(_t)
                updateCenter(_c, _t)
                break
        if not _find:
            adict_cluster.append({"timestamp": [_t], "center": _t})
    aint_center = [_c["center"] for _c in adict_cluster]
    return aint_center
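
# Example: with the default 28-day distance, timestamps for 2023-03-01,
# 2023-01-03 and 2023-01-01 yield two clusters: one containing only the March
# timestamp, and one whose center is the average of the two January timestamps.
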
def getAvgD(aint_dis):
    # Robust "average" gap: scan every candidate value between min and max and
    # return the one minimizing the sum of sqrt absolute deviations, which is
    # less sensitive to outliers than the arithmetic mean.
    if len(aint_dis) == 0:
        return 0
    min_pow = float("inf")
    min_dis = min(aint_dis)
    for _dis in range(min(aint_dis), max(aint_dis)+1):
        pow_x = 0
        for _d in aint_dis:
            pow_x += math.sqrt(abs(_d-_dis))
        if pow_x < min_pow:
            min_pow = pow_x
            min_dis = _dis
    return min_dis
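
# Example: getAvgD([30, 30, 30, 90]) returns 30 rather than the mean of 45,
# because the sqrt penalty lets the single 90-day outlier pull the estimate
# far less than a squared (least-squares) penalty would.
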
def getDistanceOfCluster(aint_center):
    aint_center.sort()
    aint_dis = []
    int_avgD = 0
    int_minD = 1000
    int_maxD = 0
    cluster_d = None
    # compute the gaps between consecutive centers in days, plus their
    # average, minimum and maximum
    for int_c in range(1, len(aint_center)):
        int_after = aint_center[int_c]//(24*60*60)
        int_before = aint_center[int_c-1]//(24*60*60)
        _d = abs(int_after-int_before)
        if _d == 0:
            continue
        aint_dis.append(_d)
        if _d < int_minD:
            int_minD = _d
        if _d > int_maxD:
            int_maxD = _d
    if len(aint_dis) > 0:
        int_avgD = getAvgD(aint_dis)
        int_minD = min(aint_dis)
        int_maxD = max(aint_dis)
        # pull whichever bound lies farther from the robust average back
        # towards it, then re-center the average on the tightened range
        if abs(int_maxD-int_avgD) > abs(int_minD-int_avgD):
            int_minD = min([int_avgD, int_minD])
            int_maxD = max([int_avgD, int_minD])
        else:
            int_minD = min([int_avgD, int_maxD])
            int_maxD = max([int_avgD, int_maxD])
        int_avgD = (int_minD+int_maxD)//2
    # cluster_d is currently always None: the heuristic below that set it has
    # been commented out, so the refinement pass in getPeriod() never fires.
    # for _d in aint_dis:
    #     aint_gre = [int(a>=_d) for a in aint_dis]
    #     if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
    #         cluster_d = _d
    return aint_dis, int_avgD, int_minD, int_maxD, cluster_d
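
# Example: day gaps of [30, 31, 91] give a robust average of 31 via getAvgD;
# the 91-day gap lies farther from it than the 30-day gap does, so the range
# tightens to int_minD=30, int_maxD=31 and the average re-centers to 30.
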
def getPeriod(aint_timestamp):
    aint_center = clusterTimestamp(aint_timestamp, distance=29*24*60*60)  # cluster the raw timestamps
    aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    if cluster_d is not None:
        aint_center = clusterTimestamp(aint_center, distance=(cluster_d-1)*24*60*60)
        aint_dis, int_avgD, int_minD, int_maxD, cluster_d = getDistanceOfCluster(aint_center)
    _prob = 0
    last_time = time.strftime('%Y-%m-%d', time.localtime(max(aint_center)))
    if len(aint_dis) >= 2 and (max(aint_center)-min(aint_center)) > 365*24*60*60:
        flt_powD = 0
        for int_d in aint_dis:
            flt_powD += (int_d-int_avgD)**2
        # fewer observed gaps -> lower confidence ceiling
        base_prob = 0.99
        if len(aint_dis) < 4:
            base_prob = 0.8
        elif len(aint_dis) < 6:
            base_prob = 0.9
        _prob = round(base_prob-(flt_powD/len(aint_dis)/int_avgD**2), 4)
        if _prob > 0.5 and int_maxD-int_minD <= 70:
            return last_time, _prob, int(int_avgD), int(int_minD), int(int_maxD), len(aint_dis)
    return None, _prob, None, None, None, None
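
# Example: fourteen events roughly 30 days apart spanning more than a year
# return something like ("2021-01-25", 0.99, 30, 30, 30, 13); histories that
# are too short or too irregular return (None, _prob, None, None, None, None).
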
def timeAdd(_time, days):
    # Add `days` days to a "YYYY-MM-DD" string and return the new date string.
    a = time.mktime(time.strptime(_time, '%Y-%m-%d'))+86400*days
    _time1 = time.strftime("%Y-%m-%d", time.localtime(a))
    return _time1
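
# Example: timeAdd("2023-01-30", 5) returns "2023-02-04". The arithmetic is
# done in raw seconds, so a DST change in the local timezone can shift the
# result by one day in rare cases.
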
@annotate('string->string,string,string,double,bigint,bigint,bigint,bigint')
class f_getProductCycle(BaseUDTF):
    def process(self, json_timestamp):
        if json_timestamp is None:
            return
        aint_timestamp = json.loads(json_timestamp)
        last_time, _prob, int_avgD, int_minD, int_maxD, _periods = getPeriod(aint_timestamp)
        if int_avgD is not None:
            may_begin = timeAdd(last_time, int_minD)
            may_end = timeAdd(last_time, int_maxD)
            self.forward(may_begin, may_end, last_time, _prob, int_avgD, int_minD, int_maxD, _periods)
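
# Example: if getPeriod detects a ~30-day cycle ending on "2023-06-01" with
# int_minD=28 and int_maxD=32, the forwarded window is may_begin="2023-06-29"
# to may_end="2023-07-03": the span in which the next purchase is expected.
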
@annotate('string->string')
class f_getTendererCompany(BaseUDTF):
    # Emit one row per tenderer (winner, runner-up, third place) found in the
    # sub-documents JSON.
    def process(self, sub_docs_json):
        if sub_docs_json is None:
            return
        sub_docs = json.loads(sub_docs_json)
        for _doc in sub_docs:
            for _key in ("win_tenderer", "second_tenderer", "third_tenderer"):
                _company = _doc.get(_key)
                if _company is not None:
                    self.forward(_company)

@annotate('string,bigint->string')
class f_concatstr(BaseUDAF):
    # Concatenate distinct non-empty strings with commas; since inputs may
    # themselves be comma-separated lists, terminate() re-splits before deduping.
    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str, signal):
        # `signal` is stored but never read; kept for signature compatibility
        self.signal = signal
        if _str is not None and _str != "":
            buffer[0].append(str(_str))
            buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        _s = ",".join(buffer[0])
        _s1 = set(_s.split(","))
        if "" in _s1:
            _s1.remove("")
        return ",".join(list(_s1))
@annotate('string,bigint->string')
class f_getLastProjectUuid(BaseUDAF):
    # Return the uuid carrying the most recent page_timestamp.
    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _uuid, page_timestamp):
        buffer[0].append({"uuid": _uuid, "timestamp": page_timestamp})
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        if len(buffer[0]) > 0:
            buffer[0].sort(key=lambda x: x["timestamp"], reverse=True)
            return buffer[0][0]["uuid"]
        return None

@annotate('string->string')
class f_groupJsonStr(BaseUDAF):
    # Collect distinct strings into a JSON array (capped at 10000 before dedup).
    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, _str):
        buffer[0].append(_str)
        buffer[0] = buffer[0][:10000]

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
        buffer[0] = buffer[0][:10000]

    def terminate(self, buffer):
        return json.dumps(list(set(buffer[0])), ensure_ascii=False)

@annotate('bigint,string,string->string,string,string,string,string,double,string')
class f_extractDemand(BaseUDTF):
    def getProduct(self, tenderee, _product, project_name):
        # Prefer the longest extracted product name; otherwise derive one from
        # the project name by stripping the tenderee and filler words.
        if len(_product) > 0:
            _product.sort(key=lambda x: len(x), reverse=True)
            return _product[0]
        else:
            product = str(project_name).replace(tenderee, "")
            product = re.sub(".*公司|项目|采购", "", product)
            return product

    def formatTime(self, _date):
        # Normalize to "YYYY-MM-DD"; implicitly returns None if the input does
        # not split into three parts. Note: the original padded the year with
        # rjust(4, "2"), which turns a two-digit year like "21" into "2221";
        # prefixing "20" is assumed to be the intent here.
        if _date is not None:
            _d = _date.split("-")
            if len(_d) == 3:
                _year = _d[0] if len(_d[0]) == 4 else "20" + _d[0].rjust(2, "0")
                return "%s-%s-%s" % (_year, _d[1].rjust(2, "0"), _d[2].rjust(2, "0"))

    def process(self, docid, tenderee, json_demand_info):
        if json_demand_info is None:
            return
        demand_info = json.loads(json_demand_info)
        for _line in demand_info["data"]:
            try:
                _product = _line.get("product", [])
                order_end = self.formatTime(_line.get("order_end"))
                project_name = _line.get("project_name")
                demand = _line.get("demand")
                budget = _line.get("budget")
                if budget is not None and len(budget) > 0:
                    budget = float(budget)
                else:
                    budget = None  # an empty string is not a valid double
                order_begin = self.formatTime(_line.get("order_begin"))
                if order_begin is None or order_end is None:
                    continue
                product = self.getProduct(tenderee, _product, project_name)
                json_docids = json.dumps([str(docid)])
                self.forward(product, order_begin, order_end, demand, project_name, budget, json_docids)
            except Exception as e:
                logging.error("extractDemand error: %s" % str(e))
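
# A minimal local sanity check for the pure helpers above, assuming the pyodps
# package is installed so the odps.udf imports at the top of this file succeed.
# The timestamps are hypothetical: fourteen events exactly 30 days apart,
# spanning more than a year, which is the shape of history getPeriod() detects.
if __name__ == "__main__":
    base = getTimeStamp("2020-01-01")
    samples = [base + i * 30 * 24 * 60 * 60 for i in range(14)]
    # Expect roughly ("2021-01-25", 0.99, 30, 30, 30, 13)
    print(getPeriod(samples))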