@@ -374,6 +374,22 @@ class decare_document(BaseUDTF):
 
                         new_json_set_docid.append(_item2)
                 self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))
 
+def getBestDocid(list_pair):
+    # pairs look like (docid1, extract_count1, docid2, extract_count2)
+    list_pair.sort(key=lambda x:x[3],reverse=True)
+    _max_count = max(list_pair[0][3],list_pair[0][1])
+    set_candidate = set()
+    if list_pair[0][1]==_max_count:
+        set_candidate.add(list_pair[0][0])
+    for item in list_pair:
+        if item[3]==_max_count:
+            set_candidate.add(item[2])
+        else:
+            break
+    # among the docids tied at the highest extract_count, the smallest id wins
+    list_candidate = list(set_candidate)
+    list_candidate.sort(key=lambda x:x)
+    return list_candidate[0]
+
+
 @annotate('bigint,bigint,bigint,bigint->string')
 class choose_document(BaseUDAF):
     '''
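
For reference, a minimal sketch of the tie-breaking rule in getBestDocid; the pair layout
(docid1, extract_count1, docid2, extract_count2) is inferred from the callers, and the ids
below are made up:

    # docid1 (105) has extract_count 3; partners 207 and 102 are tied at the
    # max count 5, so both become candidates and the smallest docid wins
    list_pair = [(105, 3, 207, 5), (105, 3, 102, 5), (105, 3, 301, 2)]
    assert getBestDocid(list_pair) == 102
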
@@ -395,32 +411,15 @@ class choose_document(BaseUDAF):
 
     def terminate(self, buffer):
         list_pair = buffer[0]
-        list_pair.sort(key=lambda x:x[3],reverse=True)
-        _max_count = list_pair[0][3]
-        save_flag = 0
-        list_dumplicate = []
         _set = set()
         for item in buffer[0]:
             _set.add(str(item[2]))
-        # do not include this announcement itself
-        # _set.add(list_pair[0][0])
-        if list_pair[0][1]>_max_count:
+        list_dumplicate = list(_set)
+        best_docid = getBestDocid(list_pair)
+        if best_docid==list_pair[0][0]:
             save_flag = 1
-            # _set.remove(list_pair[0][0])
-            list_dumplicate = list(_set)
         else:
-            if list_pair[0][1]<_max_count:
-                save_flag = 0
-            else:
-                less_docid = list_pair[0][0]
-                for item in list_pair:
-                    if item[3]>=_max_count and item[2]<less_docid:
-                        less_docid = item[2]
-                if less_docid==list_pair[0][0]:
-                    save_flag = 1
-                else:
-                    save_flag = 0
-            list_dumplicate = list(_set)
+            save_flag = 0
         return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})
@@ -464,22 +463,11 @@ class group_document_bestFirst(BaseUDAF):
 
     def terminate(self, buffer):
         list_pair = buffer[0]
-        list_pair.sort(key=lambda x:x[3],reverse=True)
-        _max_count = list_pair[0][3]
-        save_flag = 0
-        list_dumplicate = []
         _set = set()
         for item in buffer[0]:
             _set.add(item[2])
         _set.add(list_pair[0][0])
-        best_docid = None
-        if list_pair[0][1]>_max_count:
-            best_docid = list_pair[0][0]
-        else:
-            best_docid = list_pair[0][2]
-            for item in list_pair:
-                if item[3]>=_max_count and item[2]<best_docid:
-                    best_docid = item[2]
+        best_docid = getBestDocid(list_pair)
         _set.remove(best_docid)
         list_dumplicate = list(_set)
         list_dumplicate.sort(key=lambda x:x)
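
Unlike choose_document, group_document_bestFirst keeps the group head in the candidate
set and then drops whichever docid getBestDocid picks; with hypothetical ids:

    # docids in the group: {102, 207, 301}; getBestDocid picks 102,
    # so terminate reports the sorted remainder [207, 301] as duplicates
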
@@ -616,3 +604,93 @@ class get_count_dump(object):
 
         _count = len(title.split(","))
         return _count
 
+def getSet(list_dict,key):
+    # distinct values of `key` across a list of dicts; numeric strings are
+    # normalized through float() so "100" and "100.0" count as one value
+    _set = set()
+    for item in list_dict:
+        if key in item:
+            if item[key]!='' and item[key] is not None:
+                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
+                    _set.add(str(float(item[key])))
+                else:
+                    _set.add(str(item[key]))
+    return _set
+
+@annotate('bigint,string -> bigint,bigint')
+class f_getGroup_dumpFinal(BaseUDTF):
+    '''
+    get the groups from the final result
+    '''
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def process(self,docid,dumplicates):
+        # a document always belongs to its own group
+        self.forward(int(docid),int(docid))
+        if dumplicates is not None:
+            list_docids = dumplicates.split(",")
+            for _docid in list_docids:
+                self.forward(int(docid),int(_docid))
+
+@annotate('bigint,bigint,string,string,string,string,bigint,bigint->string')
+class f_redump_limit_num(BaseUDAF):
+    '''
+    re-check groups after dedup merging: when a group has more than 5 members,
+    doctitle, tenderee, win_tenderer and bidding_budget may each take only one
+    value inside the group; with 5 or fewer members, only tenderee, win_tenderer
+    and bidding_budget are restricted to a single value
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,main_docid,docid,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2):
+        buffer[0].append({"main_docid":main_docid,"docid":docid,"set_limit_column1":set_limit_column1,"set_limit_column2":set_limit_column2,
+                          "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,"extract_count2":extract_count2})
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def terminate(self, buffer):
+        the_group = buffer[0]
+        # larger groups must additionally agree on the title column
+        if len(the_group)>5:
+            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
+        else:
+            keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
+        stay = True
+        for _key in keys:
+            if len(getSet(the_group,_key))>1:
+                stay = False
+                break
+        final_group = []
+        if stay:
+            main_docid = the_group[0]["main_docid"]
+            for item in the_group:
+                if item["docid"]!=main_docid:
+                    final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
+        return json.dumps(final_group)
+
+@annotate('string -> bigint,bigint,bigint,bigint')
+class f_get_dumpFinal_checked(BaseUDTF):
+    '''
+    get the groups from the final result
+    '''
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def process(self,list_group):
+        if list_group is not None:
+            final_group = json.loads(list_group)
+            for _group in final_group:
+                self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"])
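
A short sketch of how getSet feeds the one-value-per-column rule in f_redump_limit_num
(the dicts and values below are made up for illustration):

    group = [{"set_limit_column3": "100"}, {"set_limit_column3": "100.0"}]
    # both entries normalize to "100.0", so the column still counts as one value
    assert getSet(group, "set_limit_column3") == {"100.0"}

Thanks to that normalization, "100" and "100.0" in a budget column do not split a group,
while any genuinely different value makes terminate return an empty list, dropping the group.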
|