|
@@ -8,6 +8,7 @@ import time
|
|
|
import json
|
|
|
import logging
|
|
|
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
+import math
|
|
|
|
|
|
@annotate('string->string')
|
|
|
class f_splitProduct(BaseUDTF):
|
|
@@ -102,6 +103,29 @@ def clusterTimestamp(aint_timestamp,distance = 28*24*60*60):
|
|
|
|
|
|
return aint_center
|
|
|
|
|
|
+def getAvgD(aint_dis):
|
|
|
+ if len(aint_dis)==0:
|
|
|
+ return 0
|
|
|
+ avg_dis = 1
|
|
|
+ int_avgD = int(sum(aint_dis)/len(aint_dis))
|
|
|
+ new_aint_dis = [a for a in aint_dis]
|
|
|
+ print(sum(aint_dis)/len(aint_dis))
|
|
|
+ min_pow = 10000000
|
|
|
+ min_dis = min(aint_dis)
|
|
|
+
|
|
|
+ for _dis in range(min(aint_dis),max(aint_dis)+1):
|
|
|
+
|
|
|
+ pow_x = 0
|
|
|
+ for _d in new_aint_dis:
|
|
|
+ pow_x += math.sqrt(abs((_d-_dis)))
|
|
|
+ print(_dis,pow_x)
|
|
|
+ if pow_x<min_pow:
|
|
|
+ min_pow = pow_x
|
|
|
+ min_dis = _dis
|
|
|
+
|
|
|
+ return min_dis
|
|
|
+
|
|
|
+
|
|
|
def getDistanceOfCluster(aint_center):
|
|
|
aint_center.sort(key=lambda x:x)
|
|
|
aint_dis = []
|
|
@@ -122,13 +146,21 @@ def getDistanceOfCluster(aint_center):
|
|
|
if _d>int_maxD:
|
|
|
int_maxD = _d
|
|
|
if len(aint_dis)>0:
|
|
|
- int_avgD = int(sum(aint_dis)/len(aint_dis))
|
|
|
+ # int_avgD = int(sum(aint_dis)/len(aint_dis))
|
|
|
+ int_avgD = getAvgD(aint_dis)
|
|
|
int_minD = min(aint_dis)
|
|
|
int_maxD = max(aint_dis)
|
|
|
- for _d in aint_dis:
|
|
|
- aint_gre = [int(a>=_d) for a in aint_dis]
|
|
|
- if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
|
|
|
- cluster_d = _d
|
|
|
+ if abs(int_maxD-int_avgD)>abs(int_minD-int_avgD):
|
|
|
+ int_minD = min([int_avgD,int_minD])
|
|
|
+ int_maxD = max([int_avgD,int_minD])
|
|
|
+ else:
|
|
|
+ int_minD = min([int_avgD,int_maxD])
|
|
|
+ int_maxD = max([int_avgD,int_maxD])
|
|
|
+ int_avgD = (int_minD+int_maxD)//2
|
|
|
+ # for _d in aint_dis:
|
|
|
+ # aint_gre = [int(a>=_d) for a in aint_dis]
|
|
|
+ # if sum(aint_gre)/len(aint_gre)>0.5 and (int_maxD-_d)/int_avgD<0.5:
|
|
|
+ # cluster_d = _d
|
|
|
|
|
|
return aint_dis,int_avgD,int_minD,int_maxD,cluster_d
|
|
|
|
|
@@ -153,12 +185,17 @@ def getPeriod(aint_timestamp):
|
|
|
base_prob = 0.9
|
|
|
_prob = round(base_prob-(flt_powD/len(aint_dis)/int_avgD**2),4)
|
|
|
# if flt_powD/len(aint_dis)<30:
|
|
|
- if _prob>0.3:
|
|
|
+ if _prob>0.5 and int_maxD-int_minD<=70:
|
|
|
return last_time,_prob,int(int_avgD),int(int_minD),int(int_maxD),len(aint_dis)
|
|
|
return None,_prob,None,None,None,None
|
|
|
|
|
|
+def timeAdd(_time,days):
|
|
|
+ a = time.mktime(time.strptime(_time,'%Y-%m-%d'))+86400*days
|
|
|
+
|
|
|
+ _time1 = time.strftime("%Y-%m-%d",time.localtime(a))
|
|
|
+ return _time1
|
|
|
|
|
|
-@annotate('string->string,double,bigint,bigint,bigint,bigint')
|
|
|
+@annotate('string->string,string,string,double,bigint,bigint,bigint,bigint')
|
|
|
class f_getProductCycle(BaseUDTF):
|
|
|
|
|
|
def process(self,json_timestamp):
|
|
@@ -170,7 +207,9 @@ class f_getProductCycle(BaseUDTF):
|
|
|
# aint_center = aint_timestamp
|
|
|
last_time,_prob,int_avgD,int_minD,int_maxD,_periods = getPeriod(aint_timestamp)
|
|
|
if int_avgD is not None:
|
|
|
- self.forward(last_time,_prob,int_avgD,int_minD,int_maxD,_periods)
|
|
|
+ may_begin = timeAdd(last_time,int_minD)
|
|
|
+ may_end = timeAdd(last_time,int_maxD)
|
|
|
+ self.forward(may_begin,may_end,last_time,_prob,int_avgD,int_minD,int_maxD,_periods)
|
|
|
|
|
|
@annotate('string->string')
|
|
|
class f_getTendererCompany(BaseUDTF):
|
|
@@ -215,3 +254,86 @@ class f_concatstr(BaseUDAF):
|
|
|
if "" in _s1:
|
|
|
_s1.remove("")
|
|
|
return ",".join(list(_s1))
|
|
|
+
|
|
|
+@annotate('string,bigint->string')
|
|
|
+class f_getLastProjectUuid(BaseUDAF):
|
|
|
+
|
|
|
+ def new_buffer(self):
|
|
|
+ return [[]]
|
|
|
+
|
|
|
+ def iterate(self,buffer, _uuid,page_timestamp):
|
|
|
+ buffer[0].append({"uuid":_uuid,"timestamp":page_timestamp})
|
|
|
+ buffer[0] = buffer[0][:10000]
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def merge(self, buffer, pbuffer):
|
|
|
+ buffer[0].extend(pbuffer[0])
|
|
|
+ buffer[0] = buffer[0][:10000]
|
|
|
+
|
|
|
+ def terminate(self, buffer):
|
|
|
+ if len(buffer[0])>0:
|
|
|
+ buffer[0].sort(key=lambda x:x["timestamp"],reverse=True)
|
|
|
+ return buffer[0][0]["uuid"]
|
|
|
+ return None
|
|
|
+
|
|
|
+@annotate('string->string')
|
|
|
+class f_groupJsonStr(BaseUDAF):
|
|
|
+
|
|
|
+ def new_buffer(self):
|
|
|
+ return [[]]
|
|
|
+
|
|
|
+ def iterate(self,buffer, _str):
|
|
|
+ buffer[0].append(_str)
|
|
|
+ buffer[0] = buffer[0][:10000]
|
|
|
+
|
|
|
+
|
|
|
+ def merge(self, buffer, pbuffer):
|
|
|
+ buffer[0].extend(pbuffer[0])
|
|
|
+ buffer[0] = buffer[0][:10000]
|
|
|
+
|
|
|
+ def terminate(self, buffer):
|
|
|
+ return json.dumps(list(set(buffer[0])),ensure_ascii=False)
|
|
|
+
|
|
|
+@annotate('bigint,string,string->string,string,string,string,string,double,string')
|
|
|
+class f_extractDemand(BaseUDTF):
|
|
|
+
|
|
|
+ def getProduct(self,tenderee,_product,project_name):
|
|
|
+ if len(_product)>0:
|
|
|
+ _product.sort(key=lambda x:len(x),reverse=True)
|
|
|
+ return _product[0]
|
|
|
+ else:
|
|
|
+ product = str(project_name).replace(tenderee,"")
|
|
|
+ product = re.sub(".*公司|项目|采购","",product)
|
|
|
+ return product
|
|
|
+
|
|
|
+ def formatTime(self,_date):
|
|
|
+ if _date is not None:
|
|
|
+ _d = _date.split("-")
|
|
|
+ if len(_d)==3:
|
|
|
+ return "%s-%s-%s"%(_d[0].rjust(4,"2"),_d[1].rjust(2,"0"),_d[2].rjust(2,"0"))
|
|
|
+
|
|
|
+
|
|
|
+ def process(self,docid,tenderee,json_demand_info):
|
|
|
+ if json_demand_info is None:
|
|
|
+ return
|
|
|
+ demand_info = json.loads(json_demand_info)
|
|
|
+ for _line in demand_info["data"]:
|
|
|
+ try:
|
|
|
+ _product = _line.get("product",[])
|
|
|
+ order_end = _line.get("order_end")
|
|
|
+ order_end = self.formatTime(order_end)
|
|
|
+ project_name = _line.get("project_name")
|
|
|
+ demand = _line.get("demand")
|
|
|
+ budget = _line.get("budget")
|
|
|
+ if budget is not None and len(budget)>0:
|
|
|
+ budget = float(budget)
|
|
|
+ order_begin = _line.get("order_begin")
|
|
|
+ order_begin = self.formatTime(order_begin)
|
|
|
+ if order_begin is None or order_end is None:
|
|
|
+ continue
|
|
|
+ product = self.getProduct(tenderee,_product,project_name)
|
|
|
+ json_docids = json.dumps([str(docid)])
|
|
|
+ self.forward(product,order_begin,order_end,demand,project_name,budget,json_docids)
|
|
|
+ except Exception as e:
|
|
|
+ logging.info("============error:%s"%(str(e)))
|