@@ -350,6 +350,224 @@ class f_remege_limit_num_contain(BaseUDAF):
        log(str(final_group))
        return json.dumps(final_group)

+def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
+    _time = time.strftime(format,time.localtime())
+    return _time
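+# getCurrent_date above returns the current local time as a string such as
+# "2024-05-01 09:30:00"; it assumes `time` is imported at module level elsewhere
+# in this file (the import is not part of this hunk).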
+
+@annotate('bigint->string')
+class f_get_single_merged_bychannel(BaseUDTF):
+
+    def process(self,docid):
+        _d = {"data":{str(docid):[]},"process_time":getCurrent_date()}
+        self.forward(json.dumps(_d))
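+    # Illustrative output for docid=123 (timestamp made up):
+    #   {"data": {"123": []}, "process_time": "2024-05-01 09:30:00"}
+    # i.e. a single-document group whose channel map contains only the document itself.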
+
+
+
+
+@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,bigint->string')
+class f_remege_limit_num_contain_bychannel(BaseUDAF):
+    '''
+    project number, winning bidder, len(project number)>7, winning bidder <> "",
+    fewer than 2 distinct non-empty tendering units after merging, and identical
+    non-empty amounts for the same announcement type after merging
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,docid,docchannel,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence):
+        _dict = {"docid":docid,"docchannel":docchannel,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
+                 "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
+                 "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence}
+        _count = 0
+        for _,v in _dict.items():
+            if v is not None and str(v)!="":
+                _count += 1
+        _dict["extract_count"] = _count
+        buffer[0].append(_dict)
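+    # iterate() buffers each input row as a plain dict, e.g. (values illustrative):
+    #   {"docid": 101, "docchannel": 52, "page_time_stamp": 1714521600, ...,
+    #    "confidence": 1, "extract_count": 9}
+    # extract_count counts the non-empty input fields and is later used to rank
+    # duplicate documents within a channel.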
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def getNotLikeSet(self,_dict,column_name):
+        column_value = _dict.get(column_name,None)
+        _set = set()
+        if column_value is not None:
+            for _i in range(1,len(column_value)):
+                _set.add(column_value[_i-1:_i+1])
+        _dict["notLike_set"] = _set
+
+    def getSimilarity(self,_set1,_set2):
+        _sum = max([1,min([len(_set1),len(_set2)])])
+        return len(_set1&_set2)/_sum
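+    # Illustrative sketch of the character-bigram similarity used in terminate()
+    # below (not part of the UDAF interface): getNotLikeSet turns "ABCD" into the
+    # bigram set {"AB", "BC", "CD"}; getSimilarity({"AB","BC","CD"}, {"AB","BC","CE"})
+    # is 2 / min(3, 3) ≈ 0.67, which lies between SIM_PROB (0.6) and 1, so the two
+    # values count as "similar but not identical".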
+
+    def terminate(self, buffer):
+        list_group = []
+        the_group = buffer[0]
+
+        SIM_PROB = 0.6
+        for _d in the_group:
+            self.getNotLikeSet(_d,"notLike_column")
+
+        #check whether any limit column carries more than one distinct value
+        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
+        re_merge = False
+        for _key in keys:
+            if len(getSet(the_group,_key))>1:
+                re_merge = True
+                break
+        #check whether any pair of notLike values is similar but not identical
+        re_merge_sim = False
+        for _i1 in range(0,len(the_group)):
+            for _j1 in range(_i1+1,len(the_group)):
+                _set1 = the_group[_i1]["notLike_set"]
+                _set2 = the_group[_j1]["notLike_set"]
+                _sim = self.getSimilarity(_set1,_set2)
+                if _sim>SIM_PROB and _sim<1:
+                    re_merge_sim = True
+                    break
+        contain_keys = ["contain_column1","contain_column2"]
+
+        logging.info(the_group)
+        logging.info(str(re_merge)+str(re_merge_sim))
+        #regroup
+        dict_docid_doc = {}
+        for _doc in the_group:
+            dict_docid_doc[_doc["docid"]] = _doc
+        if re_merge or re_merge_sim:
+            the_group.sort(key=lambda x:x["confidence"],reverse=True)
+            the_group.sort(key=lambda x:x["page_time_stamp"])
+
+            for _doc in the_group:
+                merge_flag = False
+                for _index in range(len(list_group)):
+                    _g = list_group[_index]
+                    hit_count = 0
+                    dict_temp = dict()
+                    #the multiple-value anomaly: only merge when one contain value contains the other
+                    if re_merge:
+                        for _c_key in contain_keys:
+                            dict_temp[_c_key] = _g[_c_key]
+                            if _g[_c_key] is not None and _doc[_c_key] is not None:
+                                if len(_g[_c_key])>len(_doc[_c_key]):
+                                    if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
+                                        dict_temp[_c_key] = _g[_c_key]
+                                        hit_count += 1
+                                else:
+                                    if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
+                                        dict_temp[_c_key] = _doc[_c_key]
+                                        _g[_c_key] = _doc[_c_key]
+                                        hit_count += 1
+                    else:
+                        hit_count = 1
+                    # if hit_count==len(contain_keys):
+                    if hit_count>0:
+                        _flag_sim = False
+                        #the similar-but-not-identical anomaly
+                        if re_merge_sim:
+                            for _docid in _g["docid"]:
+                                tmp_d = dict_docid_doc[_docid]
+                                _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"])
+                                if _sim>SIM_PROB and _sim<1:
+                                    _flag_sim = True
+                        if not _flag_sim:
+                            for _c_key in dict_temp.keys():
+                                _g[_c_key] = dict_temp[_c_key]
+                            _g["docid"].append(_doc["docid"])
+                            merge_flag = True
+                            break
+                if not merge_flag:
+                    _dict = dict()
+                    _dict["docid"] = [_doc["docid"]]
+                    for _c_key in contain_keys:
+                        _dict[_c_key] = _doc[_c_key]
+                    list_group.append(_dict)
+
+            final_group = []
+            #check whether each group keeps a single value per limit column
+            for _group in list_group:
+                _split = []
+                for _docid in _group["docid"]:
+                    _split.append(dict_docid_doc[_docid])
+
+                #sort by confidence so that as large a prefix of the group as possible is kept
+                _split.sort(key=lambda x:x["confidence"],reverse=True)
+                #for each limit column, find the index where its values start to diverge
+                list_key_index = []
+                for _k in keys:
+                    list_key_index.append(getDiffIndex(_split,_k))
+
+                _index = min(list_key_index)
+
+
+                final_group.append([_c["docid"] for _c in _split[:_index]])
+                for _c in _split[_index:]:
+                    final_group.append([_c["docid"]])
+
+
+            #previous behaviour (kept commented out): if more than one distinct value is
+            #found, every document becomes its own group, otherwise they stay as one group
+            # _flag = True
+            # for _key in keys:
+            #     if len(getSet(_split,_key))>1:
+            #         _flag = False
+            #         break
+            # if not _flag:
+            #     for _docid in _group["docid"]:
+            #         final_group.append([_docid])
+            # else:
+            #     final_group.append(list(set(_group["docid"])))
+        else:
+            final_group = [list(set([item["docid"] for item in the_group]))]
+        log(str(final_group))
+
+
+        #pick one announcement per channel
+        final_group_channel = []
+        for _group in final_group:
+            dict_channel_id = {}
+            otherChannel = 10000
+            for _docid in _group:
+                _channel = dict_docid_doc[_docid].get("docchannel")
+                if _channel in [115,116,117]:
+                    #each document in channels 115/116/117 gets its own pseudo-channel,
+                    #so these channels are never collapsed
+                    otherChannel += 1
+                    _channel = otherChannel
+                if _channel not in dict_channel_id:
+                    dict_channel_id[_channel] = []
+                dict_channel_id[_channel].append([_docid,dict_docid_doc[_docid].get("page_time_stamp"),dict_docid_doc[_docid].get("extract_count")])
+            channel_dict = {}
+            for k,v in dict_channel_id.items():
+                v.sort(key=lambda x:x[1])
+                v.sort(key=lambda x:x[2],reverse=True)
+                channel_dict[v[0][0]] = []
+                for _docs in v[1:]:
+                    channel_dict[v[0][0]].append(_docs[0])
+            _d = {"data":channel_dict,"process_time":getCurrent_date()}
+            final_group_channel.append(_d)
+
+
+        return json.dumps(final_group_channel)
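+    # Illustrative shape of the JSON returned by terminate() above (docids and
+    # timestamp made up):
+    #   [{"data": {"101": [102, 103], "205": []}, "process_time": "2024-05-01 09:30:00"},
+    #    {"data": {"301": []}, "process_time": "2024-05-01 09:30:00"}]
+    # each key of "data" is the docid kept for one channel (highest extract_count,
+    # earliest page_time_stamp on ties); its list holds the same-channel docids it replaces.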
+
+@annotate('string -> string')
+class f_get_remerge_group_channel(BaseUDTF):
+    '''
+    split the merge result into one record per group
+    '''
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def process(self,json_remerge):
+        if json_remerge is not None:
+            list_group = json.loads(json_remerge)
+            for _group in list_group:
+                self.forward(json.dumps(_group))
+
@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
@@ -840,7 +1058,29 @@ class f_getMergeProb(BaseUDTF):



+@annotate('string -> bigint,bigint')
+class f_check_remerge_channel(BaseUDTF):
+    '''
+    split each group into (main_docid, docid) records
+    '''
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

+    def process(self,json_remerge):
+        if json_remerge is not None:
+            list_group = json.loads(json_remerge)
+            for _group in list_group:
+                _keys = _group.get("data").keys()
+                if len(_keys)>0:
+                    main_docid = int(list(_keys)[0])
+                for k,v in _group.get("data",{}).items():
+                    self.forward(main_docid,int(k))
+                    for _v in v:
+                        self.forward(main_docid,int(_v))
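+    # Illustrative sketch: for a group {"data": {"101": [102, 103], "205": []}} the
+    # first key becomes main_docid (101) and the forwarded rows are
+    # (101, 101), (101, 102), (101, 103), (101, 205).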

@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):