#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF
import re
# json/math are needed by module-level helper functions (e.g. f_get_codes_count,
# check_time, featurnCount) that have no __init__ in which to import them lazily.
import json
import math

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.dict_channel = {"公告变更":51,
                             "招标公告":52,
                             "中标信息":101,
                             "招标预告":102,
                             "招标答疑":103,
                             "资审结果":105,
                             "法律法规":106,
                             "新闻资讯":107,
                             "采购意向":114,
                             "拍卖出让":115,
                             "土地矿产":116,
                             "产权交易":117,
                             "废标公告":118,
                             "候选人公示":119,
                             "合同公告":120}

    def process(self, extractjson,otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
        doctitle = _other.get("doctitle","")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area","")
        province = _other.get("province","")
        city = _other.get("city","")
        district = _other.get("district","")
        web_source_no = _other.get("webSourceNo","")
        time_bidclose = _extract.get("time_bidclose")
        time_bidopen = _extract.get("time_bidopen")
        time_bidstart = _extract.get("time_bidstart")
        time_commencement = _extract.get("time_commencement")
        time_completion = _extract.get("time_completion")
        time_earnest_money_end = _extract.get("time_earnestMoneyEnd")
        time_earnest_money_start = _extract.get("time_earnestMoneyStart")
        time_get_file_end = _extract.get("time_getFileEnd")
        time_get_file_start = _extract.get("time_getFileStart")
        time_publicity_end = _extract.get("time_publicityEnd")
        time_publicity_start = _extract.get("time_publicityStart")
        time_registration_end = _extract.get("time_registrationEnd")
        time_registration_start = _extract.get("time_registrationStart")
        time_release = _extract.get("time_release")

        # docchannel = _other.get("docchannel",0)
        docchannel_name = _extract.get("docchannel",{}).get("docchannel")
        doctype_name = _extract.get("docchannel",{}).get("doctype")
        if doctype_name in ["法律法规","新闻资讯","拍卖出让","土地矿产"]:
            docchannel_name = doctype_name
        docchannel = self.dict_channel.get(docchannel_name,0)

        if re.search(self.time_pattern,page_time) is not None:
            try:
                timeArray = time.strptime(page_time[:11], "%Y-%m-%d")
                page_time_stamp = int(time.mktime(timeArray))
            except Exception as e:
                pass
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        project_name = _extract.get("name","")
        fingerprint = _extract.get("fingerprint","")
        dict_pack = _extract.get("prem",{})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                if isinstance(_role,list):
                    extract_count += 1
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                    if _role[0]=="tenderee":
                        tenderee = _role[1]
                    if _role[0]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role[1]
                        if _role[2]!='' and float(_role[2])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role[2]))
                    if _role[0]=="agency":
                        agency = _role[1]
                if isinstance(_role,dict):
                    extract_count += 1
                    if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                        extract_count += 1
                    if _role["role_name"]=="tenderee":
                        tenderee = _role["role_text"]
                    if _role["role_name"]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role["role_text"]
                        if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role["role_money"]["money"]))
                    if _role["role_name"]=="agency":
                        agency = _role["role_text"]

        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        logging.info(page_time+doctitle+doctitle_refine+area+province+city+
                     district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
        self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
                     district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,
                     time_bidclose,time_bidopen,time_bidstart,time_commencement,time_completion,time_earnest_money_end,time_earnest_money_start,
                     time_get_file_end,time_get_file_start,time_publicity_end,time_publicity_start,time_registration_end,time_registration_start,time_release)

@annotate("string->string")
class f_get_product(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is None or extractjson=="":
            extractjson = "{}"
        _extract = json.loads(extractjson)
        return ",".join(_extract.get("product",[]))

@annotate("string->string")
class f_get_package(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is None or extractjson=="":
            extractjson = "{}"
        _extract = json.loads(extractjson)
        prem = _extract.get("prem",{})
        list_pack = []
        for k,v in prem.items():
            if k!="Project":
                list_pack.append(k)
        return ",".join(list_pack)
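# Illustrative sketch (not part of the production UDF set): how f_get_product and
# f_get_package behave on a hand-written extract JSON. The payload below is
# hypothetical sample data; "Project" is the default bucket in "prem", so only
# named sub-packages are reported.
def _demo_get_product_and_package():
    sample = '{"product":["复印纸","胶水"],"prem":{"Project":{},"一":{},"二":{}}}'
    products = f_get_product().evaluate(sample)   # -> "复印纸,胶水"
    packages = f_get_package().evaluate(sample)   # -> "一,二"
    return products,packages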
@annotate("string,string->string")
class f_get_nlp_enterprise(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, doctextcon,extractjson):
        if extractjson is None or extractjson=="":
            extractjson = "{}"
        _extract = json.loads(extractjson)
        nlp_enterprise = _extract.get("nlp_enterprise",[])
        nlp_enterprise_attachment = _extract.get("nlp_enterprise_attachment",[])
        if len(nlp_enterprise)==0 and len(nlp_enterprise_attachment)==0:
            dict_pack = _extract.get("prem",{})
            for _key in dict_pack.keys():
                for _role in dict_pack[_key]["roleList"]:
                    if isinstance(_role,list):
                        _entity = _role[1]
                        nlp_enterprise.append(_entity)
                    if isinstance(_role,dict):
                        _entity = _role["role_text"]
                        nlp_enterprise.append(_entity)
            nlp_enterprise = list(set(nlp_enterprise))
        dict_entity = {"indoctextcon":nlp_enterprise,
                       "notindoctextcon":nlp_enterprise_attachment}
        return json.dumps(dict_entity,ensure_ascii=False)

@annotate("string->bigint")
class f_get_extractCount(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem",{})
        extract_count = 0
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name","")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if "tendereeMoney" in dict_pack[_key] and dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                if isinstance(_role,list):
                    extract_count += 1
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                    if _role[0]=="tenderee":
                        tenderee = _role[1]
                    if _role[0]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role[1]
                        if _role[2]!='' and float(_role[2])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role[2]))
                    if _role[0]=="agency":
                        agency = _role[1]
                if isinstance(_role,dict):
                    extract_count += 1
                    if "role_money" in _role:
                        if str(_role["role_money"].get("money",""))!='' and float(_role["role_money"].get("money",""))>0:
                            extract_count += 1
                    if _role.get("role_name")=="tenderee":
                        tenderee = _role["role_text"]
                    if _role.get("role_name")=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role["role_text"]
                        if "role_money" in _role:
                            if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                                extract_count += 1
                                if win_bid_price=="":
                                    win_bid_price = str(float(_role["role_money"]["money"]))
                    if _role["role_name"]=="agency":
                        agency = _role["role_text"]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        return extract_count

@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
        columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code!="":
            extract_count += 1
        if project_name is not None and project_name!="":
            extract_count += 1
        if tenderee is not None and tenderee!="":
            extract_count += 1
        if agency is not None and agency!="":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
                            if _key_sub_docs in ["bidding_budget","win_bid_price"]:
                                if float(sub_docs[_key_sub_docs])>0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
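# Illustrative sketch (hypothetical payload): f_get_extractCount scores how many
# key fields were extracted -- one point per code/name, plus points for each role
# and each positive money field -- which later drives "best document" selection.
def _demo_get_extract_count():
    sample = json.dumps({"code":["LCQTCG-2022-313"],
                         "name":"膨胀剂采购项目",
                         "prem":{"Project":{"tendereeMoney":"3267000.0",
                                            "roleList":[["tenderee","某采购人","",""]]}}})
    return f_get_extractCount().evaluate(sample)   # -> 4 (code, name, budget, one role)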
@annotate("string->bigint") class totimestamp(object): def __init__(self): import time global time import logging import json import re global json,logging,re self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*" logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, str_time): try: logging.info(str_time) if str_time is not None and re.search(self.time_pattern,str_time) is not None: timeArray = time.strptime(str_time[:10], "%Y-%m-%d") timeStamp = int(time.mktime(timeArray)) return timeStamp else: return 0 except Exception as e: return 0 @annotate("string->string") class refind_name(object): def __init__(self): import logging import re global logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, title): if title is not None: return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title) return "" @annotate('bigint,bigint,bigint,string,bigint,string->string') class f_set_docid(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count, "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_docs = buffer[0] list_docs.sort(key=lambda x:x["page_time_stamp"]) list_group = [] _begin = 0 defind_count = 0 if len(list_docs)>0: defind_count = list_docs[0]["defind_count"] print(defind_count) for i in range(len(list_docs)-1): if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2: continue else: _group = [] _set_column = set() _set_tenderee = set() for j in range(_begin,i+1): if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="": _set_tenderee.add(list_docs[j]["tenderee"]) _set_column.add(list_docs[j]["defind_column"]) _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]}) if len(_group)>=3 and len(_set_tenderee)>1: pass else: print(defind_count,len(_set_column)) if len(_group)>1: if defind_count==2: if len(_set_column)>=2: list_group.append(_group) elif defind_count==1: if len(_set_column)==1: list_group.append(_group) elif defind_count==0: list_group.append(_group) _begin = i+1 if len(list_docs)>1: _set_column = set() _set_tenderee = set() _group = [] for j in range(_begin,len(list_docs)): if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="": _set_tenderee.add(list_docs[j]["tenderee"]) _set_column.add(list_docs[j]["defind_column"]) _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]}) if len(_group)>=3 and len(_set_tenderee)>1: pass else: if len(_group)>1: if defind_count==2: if len(_set_column)>=2: list_group.append(_group) elif defind_count==1: if len(_set_column)==1: list_group.append(_group) elif defind_count==0: list_group.append(_group) return json.dumps(list_group) # def terminate(self, buffer): # # # list_docs = buffer[0] # if len(list_docs)>0: # defind_count = list_docs[0]["defind_count"] # # list_time_group = split_with_time(list_docs,"page_time_stamp",86400*2) # # list_group = [] # for time_group in list_time_group: # _group = [] # _set_column = set() # 
base_tenderee = "" # _set_tenderee = set() # for j in range(len(time_group)): # if time_group[j]["tenderee"] is not None and time_group[j]["tenderee"]!="": # # if base_tenderee =="": # # base_tenderee = time_group[j]["tenderee"] # # _set_tenderee.add(time_group[j]["tenderee"]) # # simi = getSimilarityOfString(base_tenderee,time_group[j]["tenderee"]) # # if simi<0.8: # # _set_tenderee.add(time_group[j]["tenderee"]) # # _set_tenderee.add(time_group[j]["tenderee"]) # _set_column.add(time_group[j]["defind_column"]) # _group.append({"docid":time_group[j]["docid"],"extract_count":time_group[j]["extract_count"]}) # # if len(_group)>=3 and len(_set_tenderee)>1: # pass # else: # if len(_group)>1: # if defind_count==2: # if len(_set_column)>=2: # list_group.append(_group) # elif defind_count==1: # if len(_set_column)==1: # list_group.append(_group) # elif defind_count==0: # list_group.append(_group) # # return json.dumps(list_group) def isEmpty(_str): if _str is None or _str=="": return True return False @annotate('bigint->string') class f_group_fingerprint(BaseUDAF): def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid): buffer[0].append(docid) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_docid = buffer[0] list_docid.sort(key=lambda x:x) return ",".join([str(a) for a in list_docid]) @annotate('string->bigint,string') class f_ungroup_fingerprint(BaseUDTF): def process(self,dumplicates): list_docid = dumplicates.split(",") self.forward(int(list_docid[0]),",".join(list_docid[1:])) @annotate('bigint,bigint,string->string') class f_dump_probability(BaseUDAF): ''' 合并组为一条记录 ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid,page_time_stamp,_type): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"type":_type}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_dict = buffer[0] list_dict = list_dict[:10000] list_group = split_with_time(list_dict,sort_key="page_time_stamp",timedelta=86400*2) return json.dumps(list_group) @annotate('string -> bigint,bigint,bigint,bigint,string') class f_split_dumplicate_probability(BaseUDTF): def __init__(self): import logging import json global logging,json logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_group_str): logging.info("0") logging.info(list_group_str) if list_group_str is not None: logging.info("1") try: list_group = json.loads(list_group_str) logging.info("2") for _group in list_group: if len(_group)>0: _type = _group[0].get("type","") logging.info("3%d"%len(list_group)) # _group.sort(key=lambda x:x["page_time_stamp"]) _len = min(100,len(_group)) for _index_i in range(_len): _count = 0 for _index_j in range(_index_i+1,_len): if abs(_group[_index_j]["page_time_stamp"]-_group[_index_i]["page_time_stamp"])>86400*120: break _count += 1 _docid1 = _group[_index_i]["docid"] _docid2 = _group[_index_j]["docid"] if _docid1<_docid2: self.forward(_docid1,_docid2,1,_len,_type) else: self.forward(_docid2,_docid1,1,_len,_type) except Exception as e: logging(str(e)) @annotate('bigint,bigint,string->string') class f_dumplicate_groupPairs(BaseUDAF): ''' 合并组为一条记录 ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,is_exists,counts,_type): buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type}) def 
@annotate('bigint,bigint,string->string')
class f_dumplicate_groupPairs(BaseUDAF):
    '''
    merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,is_exists,counts,_type):
        buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        return json.dumps(list_dict)

def check_columns(tenderee_less,tenderee_greater,
                  agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                  win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                  bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
    flag = True
    _set_tenderee = set()
    if tenderee_less is not None and tenderee_less!="":
        _set_tenderee.add(tenderee_less)
    if tenderee_greater is not None and tenderee_greater!="":
        _set_tenderee.add(tenderee_greater)
    if len(_set_tenderee)>1:
        return False
    code_sim = getSimilarityOfString(project_code_less,project_code_greater)
    if code_sim>0.6 and code_sim<1:
        return False
    # same batch but different item codes
    if getLength(project_code_less)>0 and getLength(project_code_greater)>0:
        _split_code_less = project_code_less.split("-")
        _split_code_greater = project_code_greater.split("-")
        if len(_split_code_less)>1 and len(_split_code_greater)>1:
            if _split_code_less[0]==_split_code_greater[0] and project_code_less!=project_code_greater:
                return False
    _set_win_tenderer = set()
    if win_tenderer_less is not None and win_tenderer_less!="":
        _set_win_tenderer.add(win_tenderer_less)
    if win_tenderer_greater is not None and win_tenderer_greater!="":
        _set_win_tenderer.add(win_tenderer_greater)
    if len(_set_win_tenderer)>1:
        return False
    _set_win_bid_price = set()
    if win_bid_price_less is not None and win_bid_price_less!="":
        _set_win_bid_price.add(float(win_bid_price_less))
    if win_bid_price_greater is not None and win_bid_price_greater!="":
        _set_win_bid_price.add(float(win_bid_price_greater))
    if len(_set_win_bid_price)>1:
        return False
    _set_bidding_budget = set()
    if bidding_budget_less is not None and bidding_budget_less!="":
        _set_bidding_budget.add(float(bidding_budget_less))
    if bidding_budget_greater is not None and bidding_budget_greater!="":
        _set_bidding_budget.add(float(bidding_budget_greater))
    if len(_set_bidding_budget)>1:
        return False
    return True

def featurnCount(_count,max_count=100):
    return max(0,min(1,_count))*(1/math.sqrt(max(1,_count-1)))

def getSimLevel(str1,str2):
    str1_null = False
    str2_null = False
    _v = 0
    if str1 is None or str1=="":
        str1_null = True
    if str2 is None or str2=="":
        str2_null = True
    if str1_null and str2_null:
        _v = 2
    elif str1_null and not str2_null:
        _v = 4
    elif not str1_null and str2_null:
        _v = 6
    elif not str1_null and not str2_null:
        if str1==str2:
            _v = 10
        else:
            _v = 0
    return _v

def getLength(_str):
    return len(_str if _str is not None else "")
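# Illustrative sketch: featurnCount(n) damps repeated evidence -- one occurrence
# scores 1.0 and n>=2 occurrences score 1/sqrt(n-1) -- while getSimLevel encodes
# the null/equal relationship of two strings as a small ordinal feature.
def _demo_feature_helpers():
    assert featurnCount(1)==1.0
    assert featurnCount(5)==0.5           # 1/sqrt(4)
    assert getSimLevel("a","a")==10       # both present and equal
    assert getSimLevel("a","b")==0        # both present, different
    assert getSimLevel(None,"b")==4       # only the left side missing
    return True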
def check_money(bidding_budget_less,bidding_budget_greater,
                win_bid_price_less,win_bid_price_greater):
    # check whether budgets/prices can be treated as the same amount
    budget_is_same = ""
    price_is_same = ""
    if getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
        budget_less = float(bidding_budget_less)
        budget_greater = float(bidding_budget_greater)
        if budget_less!=budget_greater:
            if max(budget_less,budget_greater)/min(budget_less,budget_greater)==10000:
                budget_is_same = True
            if budget_less>10000 and budget_greater>10000 and round(budget_less/10000,2)==round(budget_greater/10000,2):
                budget_is_same = True
            if budget_is_same=="":
                return False
    if getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
        price_less = float(win_bid_price_less)
        price_greater = float(win_bid_price_greater)
        if price_less!=price_greater:
            if max(price_less,price_greater)/min(price_less,price_greater)==10000:
                price_is_same = True
            if price_less>10000 and price_greater>10000 and round(price_less/10000,2)==round(price_greater/10000,2):
                price_is_same = True
            if price_is_same=="":
                return False
    return True

def check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                 tenderee_less,tenderee_greater,
                 agency_less,agency_greater,
                 win_tenderer_less,win_tenderer_greater,
                 similarity=0.85):

    def get_same_of_entity(nlp_enterprise_less,nlp_enterprise_greater,entity_less,entity_greater,similarity):
        if getLength(entity_less)>0 and getLength(entity_greater)>0:
            if entity_less!=entity_greater:
                is_same = ''
                _sim = jaccard_score(entity_less,entity_greater)
                if _sim>similarity:
                    is_same = True
                if is_same=='':
                    if str(nlp_enterprise_less).find(entity_greater)>0 or str(nlp_enterprise_greater).find(entity_less)>0:
                        is_same = True
                if is_same=='':
                    return False
        return True

    if not get_same_of_entity(nlp_enterprise_less,nlp_enterprise_greater,tenderee_less,tenderee_greater,similarity):
        return False
    if not get_same_of_entity(nlp_enterprise_less,nlp_enterprise_greater,agency_less,agency_greater,similarity):
        return False
    if not get_same_of_entity(nlp_enterprise_less,nlp_enterprise_greater,win_tenderer_less,win_tenderer_greater,similarity):
        return False
    return True

def check_codes(project_codes_less,project_codes_greater):
    # check the similarity
    is_same = False
    is_sim = False
    for project_code_less in project_codes_less:
        for project_code_greater in project_codes_greater:
            code_sim = getSimilarityOfString(project_code_less,project_code_greater)
            if code_sim>0.6 and code_sim<1:
                is_sim = True
            if code_sim==1:
                is_same = True
    if is_same:
        return True
    if is_sim:
        return False
    return True

def check_demand():
    return True

# "第?": the trailing question mark was removed so that titles like 纯木浆8包/箱复印
# are no longer misread as a package number.
package_number_pattern = re.compile("(?P<name>(((([^承]|^)包|标[段号的包]|分?包|包组|包件)编?号?|子项目|项目类型|项目)[::]?[0-9A-Za-z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦ]{1,4}[^\.]?)[^至]?|((?![\.])第?[ⅠⅡⅢⅣⅤⅥⅦ0-9A-Za-z一二三四五六七八九十]{1,4}(包号|标[段号的包]|分?包)))")
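# Illustrative sketch: check_money tolerates the two classic amount mismatches --
# a 万元/元 unit slip (ratio exactly 10000) and rounding noise above 10000 --
# but rejects genuinely different budgets.
def _demo_check_money():
    assert check_money("5500000","550","","") is True    # 550万元 recorded as 5500000元
    assert check_money("550.0","440.0","","") is False   # genuinely different budgets
    assert check_money("","","12345.67","12345.67") is True
    return True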
code_pattern = re.compile("[A-Za-z0-9\-\(\)()【】\.-]+")
num_pattern = re.compile("^\d+(?:\.\d+)?$")
num1_pattern = re.compile("[一二三四五六七八九]+")
location_pattern = re.compile(".{1,2}[市区镇县村路]")
building_pattern = "工程招标代理|工程设计|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")

def check_doctitle(doctitle_refind_less,doctitle_refind_greater,codes_less=[],code_greater=[]):
    doctitle_refind_less = str(doctitle_refind_less).replace("(","(").replace(")",")")
    doctitle_refind_greater = str(doctitle_refind_greater).replace("(","(").replace(")",")")
    for _c in codes_less:
        doctitle_refind_less = str(doctitle_refind_less).replace(_c,"")
    for _c in code_greater:
        doctitle_refind_greater = str(doctitle_refind_greater).replace(_c,"")
    doctitle_refind_less = re.sub(date_pattern,"",doctitle_refind_less)
    doctitle_refind_greater = re.sub(date_pattern,"",doctitle_refind_greater)
    #check the package
    if doctitle_refind_less is None:
        doctitle_refind_less = ""
    if doctitle_refind_greater is None:
        doctitle_refind_greater = ""
    _pack1 = None
    _pack2 = None
    #if one title contains the other then pass
    if doctitle_refind_less.find(doctitle_refind_greater)>=0 or doctitle_refind_greater.find(doctitle_refind_less)>=0:
        return True
    #check the package number in the title
    _match = re.search(package_number_pattern,doctitle_refind_less)
    if _match is not None:
        _pack1 = _match.groupdict()["name"]
    _match = re.search(package_number_pattern,doctitle_refind_greater)
    if _match is not None:
        _pack2 = _match.groupdict()["name"]
    if _pack1 is not None and _pack2 is not None:
        if _pack1!=_pack2:
            return False
    #drop the package numbers before comparing the numbers in the title
    doctitle_refind_less = re.sub(package_number_pattern,"",doctitle_refind_less)
    doctitle_refind_greater = re.sub(package_number_pattern,"",doctitle_refind_greater)
    #check the nums,location,building in title
    for _p in [code_pattern]:
        num_all_l = re.findall(_p,doctitle_refind_less)
        num_all_g = re.findall(_p,doctitle_refind_greater)
        set_num_l = set()
        set_num_g = set()
        for _l in num_all_l:
            if re.search(num_pattern,_l) is not None:
                if _l.find(".")>0:
                    set_num_l.add(_l)
                elif len(_l)<4:
                    set_num_l.add(_l)
        for _g in num_all_g:
            if re.search(num_pattern,_g) is not None:
                if _g.find(".")>0:
                    set_num_g.add(_g)
                elif len(_g)<4:
                    set_num_g.add(_g)
        if len(set_num_l)>0 and len(set_num_g)>0:
            if len(set_num_l&set_num_g)!=len(set_num_l):
                return False
    #check location and keywords
    for _p in [num1_pattern,location_pattern,building_pattern]:
        num_all_l = re.findall(_p,doctitle_refind_less)
        num_all_g = re.findall(_p,doctitle_refind_greater)
        set_num_l = set(num_all_l)
        set_num_g = set(num_all_g)
        if len(set_num_l)==len(set_num_g):
            if len(set_num_l&set_num_g)!=len(set_num_l):
                return False
    return True

def check_product(product_less,product_greater,split_char=","):
    if getLength(product_less)>0 and getLength(product_greater)>0:
        _product_l = product_less.split(split_char)
        _product_g = product_greater.split(split_char)
        for _l in _product_l:
            for _g in _product_g:
                if getSimilarityOfString(_l,_g)>=0.8:
                    return True
        return False
    return True

def check_package(package_less,package_greater,split_char=","):
    if getLength(package_less)>0 and getLength(package_greater)>0:
        _product_l = package_less.split(split_char)
        _product_g = package_greater.split(split_char)
        for _l in _product_l:
            for _g in _product_g:
                if _l==_g:
                    return True
        return False
    return True
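# Illustrative sketch: check_product treats two announcements as compatible when
# any product on one side is >=0.8-similar to a product on the other side, or
# when either side extracted no products at all.
def _demo_check_product():
    assert check_product("复印纸,胶水","复印纸") is True    # exact overlap on one item
    assert check_product("复印纸","打印机") is False        # disjoint product lists
    assert check_product("","打印机") is True               # a missing side never blocks
    return True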
def check_time(json_time_less,json_time_greater):
    if getLength(json_time_less)>0 and getLength(json_time_greater)>0:
        if isinstance(json_time_less,dict):
            time_less = json_time_less
        else:
            time_less = json.loads(json_time_less)
        if isinstance(json_time_greater,dict):
            time_greater = json_time_greater
        else:
            time_greater = json.loads(json_time_greater)
        for k,v in time_less.items():
            if getLength(v)>0:
                v1 = time_greater.get(k,"")
                if getLength(v1)>0:
                    if v!=v1:
                        return False
    return True

@annotate("string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
class f_dumplicate_featureMatrix(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,json_context,docchannel_less,docchannel_greater,page_time_less,page_time_greater,nlp_enterprise_less,nlp_enterprise_greater,tenderee_less,tenderee_greater,
                agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater,product_less,product_greater):
        #check the page_time by special docchannel
        if docchannel_less in (51,102,103,104,115,116,117):
            if doctitle_refine_less!=doctitle_refine_greater:
                if page_time_less!=page_time_greater:
                    self.forward("[1-%s]"%(str(docchannel_less)),0)
                    return
        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,[str(project_code_less)],[str(project_code_greater)]):
            self.forward("[2-%s]"%(str(doctitle_refine_less)+"=="+str(doctitle_refine_greater)),0)
            return
        # if not check_codes([project_code_less],[project_code_greater]):
        #     self.forward("[3-%s]"%(str(project_code_less)+"=="+str(project_code_greater)),0)
        #     return
        if not check_demand():
            self.forward("[4-]",0)
            return
        if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
                            tenderee_less,tenderee_greater,
                            agency_less,agency_greater,
                            win_tenderer_less,win_tenderer_greater):
            _error = ""
            for a in [nlp_enterprise_less,nlp_enterprise_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater]:
                _error += str(a)
            self.forward("[5-%s]"%_error,0)
            return
        if not check_money(bidding_budget_less,bidding_budget_greater,
                           win_bid_price_less,win_bid_price_greater):
            _error = ""
            for a in [bidding_budget_less,bidding_budget_greater,
                      win_bid_price_less,win_bid_price_greater]:
                _error += str(a)
            self.forward("[6-%s]"%_error,0)
            return
        if not check_product(product_less,product_greater):
            _error = "%s=%s"%(str(product_less),str(product_greater))
            self.forward("7-%s"%_error,0)
            return
        _context = json.loads(json_context)
        min_counts = 100
        dict_context = {}
        # NOTE: the tail of this method was corrupted in the source. The lines
        # below are a conservative reconstruction: track the smallest pair count,
        # index the context items by type, and emit the passing result.
        for item in _context:
            if item["counts"]<min_counts:
                min_counts = item["counts"]
            dict_context[item["_type"]] = item["counts"]
        self.forward(json.dumps(dict_context,ensure_ascii=False),1)
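# Illustrative sketch: check_time only vetoes a merge when the SAME key is
# non-empty on both sides with different values; missing keys never conflict.
def _demo_check_time():
    assert check_time('{"time_bidopen":"2022-06-16"}','{"time_bidopen":"2022-06-16"}') is True
    assert check_time('{"time_bidopen":"2022-06-16"}','{"time_bidopen":"2022-06-17"}') is False
    assert check_time('{"time_bidopen":"2022-06-16"}','{"time_release":"2022-05-25"}') is True
    return True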
buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"nlp_enterprise":nlp_enterprise,"product":product,"package":package,"json_dicttime":json_dicttime,"page_time":page_time, "project_code":project_code,"doctitle_refine":doctitle_refine,"tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget, "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_group = [] the_group = buffer[0] the_group.sort(key=lambda x:x["confidence"],reverse=True) _index = 0 if len(the_group)>0: _index = 1 while _index0 and getLength(project_code_greater)>0 and project_code_less==project_code_greater: check_result["code"] = 2 else: check_result["code"] = 1 if not check_product(product_less,product_greater): check_result["product"] = 0 check_result["pass"] = 0 logging.info("check_product_failed:%s==%s"%(str(product_less),str(product_greater))) else: if getLength(product_less)>0 and getLength(product_greater)>0: check_result["product"] = 2 else: check_result["product"] = 1 if not check_demand(): check_result["pass"] = 0 if not check_entity(nlp_enterprise_less,nlp_enterprise_greater, tenderee_less,tenderee_greater, agency_less,agency_greater, win_tenderer_less,win_tenderer_greater): check_result["entity"] = 0 check_result["pass"] = 0 logging.info("check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater))) else: if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0: check_result["entity"] = 2 elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0: check_result["entity"] = 2 else: check_result["entity"] = 1 if not check_money(bidding_budget_less,bidding_budget_greater, win_bid_price_less,win_bid_price_greater): logging.info("check_money_failed:%s==%s==%s==%s"%(str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater))) check_result["money"] = 0 check_result["pass"] = 0 else: if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0: check_result["money"] = 2 elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0: check_result["money"] = 2 else: check_result["money"] = 1 #added check if not check_package(package_less,package_greater): logging.info("check_package_failed:%s==%s"%(str(package_less),str(package_greater))) check_result["package"] = 0 check_result["pass"] = 0 else: if getLength(package_less)>0 and getLength(package_greater)>0: check_result["package"] = 2 else: check_result["package"] = 1 #added check if not check_time(json_time_less,json_time_greater): logging.info("check_time_failed:%s==%s"%(str(json_time_less),str(json_time_greater))) check_result["time"] = 0 check_result["pass"] = 0 else: if getLength(json_time_less)>10 and getLength(json_time_greater)>10: check_result["time"] = 2 else: check_result["time"] = 1 if check_result.get("pass",0)==0: logging.info(str(check_result)) if check_result.get("time",1)==0: break if check_result.get("money",1)==0: break if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and 
check_result.get("product",2)==2: pass else: break _less_index += 1 if _less_index!=_index: break _index += 1 dumplicates = "" if _index>1: logging.info("index/whole:%d/%d"%(_index,len(the_group))) final_group = the_group[:_index] final_group.sort(key=lambda x:x["docid"]) final_group.sort(key=lambda x:x["extract_count"],reverse=True) _set = set() for _d in final_group: _docid = _d["docid"] if _docid in _set: continue dumplicates += "%d,"%_docid _set.add(_docid) dumplicates = dumplicates[:-1] return dumplicates @annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string') class f_set_docid_binaryChart(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> "" ''' def __init__(self): import json global json def new_buffer(self): return [[]] def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count, "project_code":project_code,"project_name":project_name,"tenderee":tenderee, "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price, "agency":agency,"web_source_no":web_source_no}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_docs = buffer[0] list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2) list_group = [] empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"] for _timeGroups in list_timeGroups: list_empty = [] list_notEmpty = [] for _item in _timeGroups: empty_flag = True for _key in empty_key: if not isEmpty(_item[_key]): empty_flag = False break if empty_flag: list_empty.append(_item) else: list_notEmpty.append(_item) for _e in list_empty: _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}] _e_tenderee = _e["tenderee"] for _ne in list_notEmpty: if "set_webSource" not in _ne: _ne["set_webSource"] = set() _ne["set_webSource"].add(_ne["web_source_no"]) _suit = False if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]: _suit = True elif isEmpty(_e_tenderee): _suit = True if _suit: if _e["web_source_no"] not in _ne["set_webSource"]: _ne["set_webSource"].add(_e["web_source_no"]) _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]}) break if len(_group)>1: list_group.append(_group) return json.dumps(list_group) def split_with_time(list_dict,sort_key,timedelta=86400*2): if len(list_dict)>0: if sort_key in list_dict[0]: list_dict.sort(key=lambda x:x[sort_key]) list_group = [] _begin = 0 for i in range(len(list_dict)-1): if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<=timedelta: continue else: _group = [] for j in range(_begin,i+1): _group.append(list_dict[j]) if len(_group)>1: list_group.append(_group) _begin = i + 1 if len(list_dict)>1: _group = [] for j in range(_begin,len(list_dict)): _group.append(list_dict[j]) if len(_group)>1: list_group.append(_group) return list_group return [list_dict] @annotate('bigint,bigint,bigint,string,string,string,string,string->string') class f_set_docid_limitNum_contain(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, 
@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    group by project code / winning bidder; requires len(project code)>7, winning
    bidder <> "", fewer than two distinct non-empty tenderees after merging, and
    equal non-empty amounts for the same announcement type after merging
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            for _key in keys:
                logging.info(_key+str(getSet(_split,_key)))
                if len(getSet(_split,_key))>1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check whether every announcement in the group is contained by the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column !="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            # NOTE: this comparison was corrupted in the source; the
                            # reconstruction keeps the longest contain_column and
                            # requires the shorter one to be a substring of it.
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN)<0:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column)<0:
                                    flag = False
                                    break
            if flag:
                if len(_split)>1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)

@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    collect page_time stamps and merge them into padded time windows
    '''
    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x:x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400*2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp)-1):
            if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
                _begin = _index+1
        if len(list_stamp)>0:
            list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
        return json.dumps(list_stamp_final)

@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp,json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            # NOTE: reconstructed from corrupted source: flag timestamps that fall
            # inside any [begin,end] window
            if page_time_stamp>item[0] and page_time_stamp<item[1]:
                int_flag = 1
                break
        return int_flag

def getConfidence(rule_id):
    # NOTE: reconstructed from corrupted source: low rule ids are the stricter rules
    if rule_id>=1 and rule_id<30:
        return 20
    else:
        return 10
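# Illustrative sketch: in_stamp tests whether a timestamp falls inside any of the
# padded [begin,end] windows produced by f_stamp_squence, and getConfidence maps
# strict rules (ids 1..29) to confidence 20 and everything else to 10.
def _demo_stamp_and_confidence():
    windows = json.dumps([[100,200],[1000,2000]])
    assert in_stamp().evaluate(150,windows)==1
    assert in_stamp().evaluate(500,windows)==0
    assert getConfidence(5)==20 and getConfidence(40)==10
    return True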
@annotate('string,string -> string')
class f_splitStr(BaseUDTF):
    '''
    split a delimited string into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, str_split,_split):
        try:
            for _s in str_split.split(_split):
                self.forward(_s)
        except Exception as e:
            pass

@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid,rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            if len(item)>100:
                item.sort(key=lambda x:x["docid"],reverse=True)
                index_i = 0
                for index_j in range(1,len(item)):
                    if item[index_i]["docid"]!=item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
            else:
                for index_i in range(len(item)):
                    for index_j in range(len(item)):
                        if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
                            self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))

@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    group documents by id together with their candidate docid sets
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,id,json_set_docid):
        buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])

@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
        # only compare pairs with group_id1>=group_id2, cutting nearly half the data
        if group_id1>=group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # skip identical dedup groups
                    if _doc1["id"]!=_doc2["id"]:
                        # check whether the two groups overlap
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1&_set2)>0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))

def getBestDocid(list_pair):
    # list_pair rows: [docid1,extract_count1,docid2,extract_count2]
    # list_pair.sort(key=lambda x:x[3],reverse=True)
    # _max_count = max(list_pair[0][3],list_pair[0][1])
    # set_candidate = set()
    # if list_pair[0][1]==_max_count:
    #     set_candidate.add(list_pair[0][0])
    # for item in list_pair:
    #     if item[3]==_max_count:
    #         set_candidate.add(item[2])
    #     else:
    #         break
    # list_candidate = list(set_candidate)
    # list_candidate.sort(key=lambda x:x)
    new_pair = []
    new_pair.append([list_pair[0][0],list_pair[0][0],list_pair[0][1]])
    for item in list_pair:
        new_pair.append([item[0],item[2],item[3]])
    new_pair.sort(key=lambda x:x[1])
    new_pair.sort(key=lambda x:x[2],reverse=True)
    return new_pair[0][1]
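# Illustrative sketch: getBestDocid receives [docid1,extract_count1,docid2,extract_count2]
# pairs that share docid1 and returns the docid with the highest extract_count,
# breaking ties by the smaller docid (two stable sorts).
def _demo_get_best_docid():
    pairs = [[1,3,2,5],[1,3,4,5]]
    return getBestDocid(pairs)   # -> 2 : docids 2 and 4 tie on count 5, lower id wins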
@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    choose the best document of a dedup group and collect its duplicates
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid==list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})

@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_choose):
        if json_choose is None:
            self.forward(1,None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))

@annotate('string->bigint')
class f_get_codes_count(object):

    def evaluate(self,extract_json):
        if extract_json is None or extract_json=="":
            extract_json = "{}"
        _extract = json.loads(extract_json)
        _codes = _extract.get("code",[])
        return len(_codes)

@annotate('string->string')
class f_get_codes(object):

    def evaluate(self,extract_json):
        if extract_json is None or extract_json=="":
            extract_json = "{}"
        _extract = json.loads(extract_json)
        _codes = _extract.get("code",[])
        return ",".join(_codes)

@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    put the best document of the group first
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x:x)
        list_dumplicate.insert(0,best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)

@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    get the best record of each group together with its duplicate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_dumplicate_str):
        if list_dumplicate_str is None or list_dumplicate_str=='':
            pass
        else:
            list_dumplicate = list_dumplicate_str.split(",")
            if len(list_dumplicate)>0:
                self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))
            else:
                pass

@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    collect the docids of bridged pairs into one group
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x:x,reverse=True)
        return json.dumps(list_pair)

@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            self.forward(list_docid[-1],_docid)

@annotate('string->string')
class to_url(object):

    def evaluate(self,_s):
        if _s is None or _s=="":
            return
        else:
            list_l = []
            for l in _s.split(","):
                list_l.append("http://www.bidizhaobiao.com/info-%s.html"%l)
            return ",".join(list_l)
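# Illustrative sketch: to_url expands a comma-separated docid list into the
# corresponding bidizhaobiao detail-page URLs.
def _demo_to_url():
    return to_url().evaluate("154064190,154064188")
    # -> "http://www.bidizhaobiao.com/info-154064190.html,http://www.bidizhaobiao.com/info-154064188.html"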
@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,docid,save_flag,dumplicates):
        if save_flag==0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid:
                        self.forward(int(_docid))

@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id,item["docid"])

@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count

def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search("^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def getDiffIndex(list_dict,key,confidence=100):
    '''
    optimized to use a similarity comparison
    :param list_dict:
    :param key:
    :param confidence:
    :return:
    '''
    # _set = set()
    # for _i in range(len(list_dict)):
    #     item = list_dict[_i]
    #     if item["confidence"]>=confidence:
    #         continue
    #     if key in item:
    #         if item[key]!='' and item[key] is not None:
    #             if re.search("^\d+(\.\d+)?$",item[key]) is not None:
    #                 _set.add(str(float(item[key])))
    #             else:
    #                 _set.add(str(item[key]))
    #     if len(_set)>1:
    #         return _i
    # ==============================
    _set = set()
    _set_m = set()
    base_s = ""
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"]>=confidence:
            continue
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search("^\d+(\.\d+)?$",item[key]) is not None:
                    _m = float(item[key])
                    if _m>100000:
                        _m = _m//10000*10000
                    _set_m.add(str(_m))
                else:
                    _s = str(item[key])
                    if base_s=="":
                        base_s = _s
                    else:
                        simi = getSimilarityOfString(base_s,_s)
                        if simi<0.8:
                            return _i
        if len(_set_m)>1:
            return _i
    return len(list_dict)
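# Illustrative sketch: getDiffIndex walks a confidence-sorted group and returns the
# index of the first row whose value for `key` no longer agrees with the rows
# before it; amounts above 100000 are bucketed to the nearest 10000 and plain
# strings are compared with getSimilarityOfString (threshold 0.8).
def _demo_get_diff_index():
    rows = [{"budget":"1230000","confidence":20},
            {"budget":"1234999","confidence":20},   # same 10000-bucket as above
            {"budget":"1250000","confidence":20}]   # different bucket -> cut here
    return getDiffIndex(rows,"budget")   # -> 2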
buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2, "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1, "extract_count2":extract_count2,"confidence":confidence}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_group = [] the_group = buffer[0] the_group.sort(key=lambda x:x["confidence"],reverse=True) if len(the_group)>5: keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"] else: keys = ["set_limit_column2","set_limit_column3","set_limit_column4"] final_group = [] #置信度 list_key_index = [] for _k in keys: if _k=="doctitle": list_key_index.append(getDiffIndex(the_group,_k,confidence=30)) else: list_key_index.append(getDiffIndex(the_group,_k)) _index = min(list_key_index) if _index>1: main_docid = the_group[0]["main_docid"] for item in the_group[:_index]: if item["docid"]!=main_docid: final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]}) # stay = True # for _key in keys: # if len(getSet(the_group,_key))>1: # stay = False # break # # if stay: # main_docid = the_group[0]["main_docid"] # for item in the_group: # if item["docid"]!=main_docid: # final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]}) return json.dumps(final_group) @annotate('string -> bigint,bigint,bigint,bigint,bigint') class f_get_dumpFinal_checked(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_group): if list_group is not None: final_group = json.loads(list_group) for _group in final_group: self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"]) @annotate('string -> bigint') class f_getDumplicateDocids(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,dumplicates): list_docids = dumplicates.split(",") for _d in list_docids: self.forward(int(_d)) def jaccard_score(source,target): source_set = set([s for s in source]) target_set = set([s for s in target]) if len(source_set)==0 or len(target_set)==0: return 0 return max(len(source_set&target_set)/len(source_set),len(source_set&target_set)/len(target_set)) def getSimilarityOfString(str1,str2): _set1 = set() _set2 = set() if str1 is not None: for i in range(1,len(str1)): _set1.add(str1[i-1:i+1]) for i in range(2,len(str1)): _set1.add(str1[i-2:i+1]) if str2 is not None: for i in range(1,len(str2)): _set2.add(str2[i-1:i+1]) for i in range(2,len(str2)): _set2.add(str2[i-2:i+1]) _len = max(1,min(len(_set1),len(_set2))) return len(_set1&_set2)/_len @annotate("string,string,string,string,string,string,string,string,string,string->bigint") class f_is_legal(object): def __init__(self): import logging import re global logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, tenderee1,tenderee2,bidding_budget1,budding_budget2,win_tenderee1,win_tenderee2,win_bid_price1,win_bid_price2,project_code1,project_code2): 
@annotate("string,string,string,string,string,string,string,string,string,string->bigint")
class f_is_legal(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, tenderee1,tenderee2,bidding_budget1,bidding_budget2,win_tenderee1,win_tenderee2,win_bid_price1,win_bid_price2,project_code1,project_code2):
        if tenderee1 is not None and tenderee1!="" and tenderee2 is not None and tenderee2!="" and tenderee1!=tenderee2:
            return 0
        if bidding_budget1 is not None and bidding_budget1!="" and bidding_budget2 is not None and bidding_budget2!="" and bidding_budget1!=bidding_budget2:
            return 0
        if win_tenderee1 is not None and win_tenderee1!="" and win_tenderee2 is not None and win_tenderee2!="" and win_tenderee1!=win_tenderee2:
            return 0
        if win_bid_price1 is not None and win_bid_price1!="" and win_bid_price2 is not None and win_bid_price2!="" and win_bid_price1!=win_bid_price2:
            return 0
        _sim = getSimilarityOfString(project_code1,project_code2)
        if _sim>0.7 and _sim<1:
            return 0
        return 1

@annotate('bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint->string')
class f_autorule_group(BaseUDAF):
    '''
    Re-check after dedup merge: when the group has more than 5 members,
    doctitle/tenderee/win_tenderer/bidding_budget must each take a single value
    within the group; with 5 or fewer members, tenderee/win_tenderer/bidding_budget
    must each take a single value.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,main_docid,docid,docchannel,doctitle,doctitle_refine,area,province,city,district,web_source_no,fingerprint,
                project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count1,extract_count2,confidence):
        buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"doctitle":doctitle,
                          "doctitle_refine":doctitle_refine,"area":area,"province":province,
                          "city":city,"district":district,"web_source_no":web_source_no,"fingerprint":fingerprint,
                          "project_code":project_code,"project_name":project_name,"tenderee":tenderee,"agency":agency,
                          "win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price,
                          "extract_count1":extract_count1,"extract_count2":extract_count2,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0][:100])
        buffer[0] = buffer[0][:100]

    def getSameKeys(self,_dict1,_dict2):
        list_keys = []
        for k,v in _dict1.items():
            if k in ["area","city","confidence","district","extract_count1","extract_count2","main_docid","province"]:
                continue
            v2 = _dict2.get(k,"")
            if v is not None and v!="" and v2 is not None and v2!="" and v==v2:
                list_keys.append(k)
        list_keys.sort(key=lambda x:x)
        return "=".join(list_keys)

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        if len(the_group)>5:
            keys = ["doctitle","tenderee","win_tenderer","bidding_budget","win_bid_price"]
        else:
            keys = ["tenderee","win_tenderer","bidding_budget","win_bid_price"]
        # confidence
        list_key_index = []
        for _k in keys:
            if _k=="doctitle":
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        final_group = []
        _index = min(list_key_index)
        if _index>1:
            for item in the_group[:_index]:
                final_group.append(item)
        list_rules = []
        for i in range(len(final_group)):
            for j in range(i+1,len(final_group)):
                _dict1 = final_group[i]
                _dict2 = final_group[j]
                _rule = self.getSameKeys(_dict1,_dict2)
                list_rules.append([_rule,_dict1.get("docid"),_dict2.get("docid")])
        return json.dumps(list_rules)
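# Illustrative sketch: f_is_legal vetoes a merge (returns 0) when any of the four
# hard fields conflicts, or when two project codes are similar but not identical
# (0.7 < similarity < 1); otherwise it returns 1.
def _demo_is_legal():
    legal = f_is_legal()
    assert legal.evaluate("采购人A","采购人A","","","","","","","","")==1
    assert legal.evaluate("采购人A","采购人B","","","","","","","","")==0
    return True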
@annotate('string -> string,bigint,bigint')
class f_autorule_group_extract(BaseUDTF):
    '''
    extract groups from the final result
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,rules_json):
        list_rules = json.loads(rules_json)
        for _rule in list_rules:
            self.forward(_rule[0],_rule[1],_rule[2])

if __name__ == '__main__':
    # _str1 = "SXXY-ZBP-GG-2020002"
    # _str2 = "SXXY-ZBP-GG-2020002"
    # print(getSimilarityOfString(_str1,_str2))
    print(check_doctitle("强化桂城街道工地扬尘防控监管巡查第三方(二次)","广东省强化桂城街道工地扬尘防控监管巡查第三方(二次)"))
    # print(check_codes(["F-2022-027(MASCG-2-F-F-2022-0462)"],["F-2022-027(MASCG-2-F-F-2022-0462)"]))
    # print(check_product(None,None))
    # print(check_code("4451020073383382206021325","4451020073383382206021322"))
    # print(check_money("550.0","440.0","",""))
    # for i in range(0,2):
    #     print(i)
    # location_pattern = re.compile(".{1,2}市|.{1,2}区|.{1,2}镇|.{1,2}县|.{1,2}村")
    # print(re.findall(location_pattern,"宁古线乡村振兴高优农业融合发展建设项目(洋中镇前路富代都村示范点农用塑料薄膜棚)"))
    # print(re.findall(location_pattern,"宁古线乡村振兴高优农业融合发展建设项目(洋中镇天湖村粮蔬基地农用塑料薄膜棚)"))
    # "第?": question mark removed to stop titles like 纯木浆8包/箱复印 matching as a package number
    # package_number_pattern = re.compile("(?P<name>(((([^承]|^)包|标[段号的包]|分?包|包组|包件)编?号?|子项目|项目类型)[::]?[0-9A-Za-z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦ]{1,4}[^\.]?)[^至]?|((?![\.])第?[ⅠⅡⅢⅣⅤⅥⅦ0-9A-Za-z一二三四五六七八九十]{1,4}(包号|标[段号的包]|分?包)))")
    # _match = re.search(package_number_pattern,"2021年盘山县高标准农田建设项目三标段(高升街道)开标记录")
    # if _match is not None:
    #     print(_match.groupdict()["name"])
    # print(re.findall("((标[段号的包])[::]?[0-9A-Za-z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦ]{1,4})","[南宁市]桂林银行南宁办公大楼装修工程标段Ⅲ"))
    # print(check_doctitle("[南宁市]桂林银行南宁办公大楼装修工程标段Ⅲ","桂林银行南宁办公大楼装修工程标段ⅡGXYLG20182005-N中标公告"))
    # c = f_get_extractCount()
    # _json = '''
    # { "attachmentTypes": "", "bidway": "", "code": [ "LCQTCG-2022-313" ],
    #   "cost_time": { "attrs": 0.02, "codename": 0.16, "deposit": 0.0, "nerToken": 0.8400000000000001, "person": 0.01, "prem": 0.02, "preprocess": 0.96, "product": 0.12, "product_attrs": 0.01, "punish": 0.11, "roleRuleFinal": 0.0, "rule": 0.0, "rule_channel": 0.0, "tableToText": 0.09000381469726562, "tendereeRuleRecall": 0.0, "time": 0.01, "total_unit_money": 0.0 },
    #   "demand_info": { "data": [], "header": [], "header_col": [] },
    #   "deposit_patment_way": "",
    #   "docchannel": { "docchannel": "招标公告", "doctype": "采招数据" },
    #   "docid": "",
    #   "doctitle_refine": "郑济高铁聊城西站配套基础设施建设项目一期枢纽功能区建设(一标段)膨胀剂(暂估价)项目",
    #   "exist_table": 1, "extract_count": 5, "fail_reason": "",
    #   "fingerprint": "md5=b1ab0ee9cf9e1c5acc17477b9c0433cc",
    #   "match_enterprise": [], "match_enterprise_type": 0, "moneysource": "",
    #   "name": "郑济高铁聊城西站配套基础设施建设项目一期枢纽功能区建设工程(一标段)膨胀剂(暂估价)采购项目",
    #   "nlp_enterprise": [ "中建八局第一建设有限公司", "山东东岳项目管理有限公司", "聊城市公共资源交易中心", "江苏国泰新点软件有限公司" ],
    #   "person_review": [],
    #   "prem": { "Project": { "code": "", "roleList": [ { "linklist": [ [ "", "15540110649" ] ], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "tenderee", "role_text": "中建八局第一建设有限公司", "serviceTime": "" }, { "linklist": [ [ "武工", "0635-2992305" ] ], "role_money": { "discount_ratio": "", "downward_floating_ratio": "", "floating_ratio": "", "money": 0, "money_unit": "" }, "role_name": "agency", "role_text": "山东东岳项目管理有限公司", "serviceTime": "" } ], "tendereeMoney": 0, "tendereeMoneyUnit": "" },
    #             "一": { "code": "", "roleList": [], "tendereeMoney": 3267000.0, "tendereeMoneyUnit": "万元" } },
    #   "process_time": "2022-05-30 14:31:13",
    #   "product": [ "枢纽功能区建设工程", "膨胀剂", "配套基础设施建设" ],
    #   "product_attrs": { "data": [], "header": [], "header_col": [] },
    #   "serviceTime": "", "success": true,
    #   "time_bidclose": "2022-06-16", "time_bidopen": "2022-06-16", "time_bidstart": "",
    #   "time_commencement": "", "time_completion": "",
"time_earnestMoneyEnd": "", "time_earnestMoneyStart": "", "time_getFileEnd": "2022-06-01", "time_getFileStart": "2022-05-26", "time_publicityEnd": "", "time_publicityStart": "", "time_registrationEnd": "", "time_registrationStart": "", "time_release": "2022-05-25", "total_tendereeMoney": 0, "total_tendereeMoneyUnit": "" } # ''' # c = f_get_nlp_enterprise() # print(c.evaluate("山东东岳项目管理有限公司",_json)) # print(c.evaluate(_json)) # c = f_set_docid()f_get_single_merged_bychannel # _s = ''' # 154064190 1512489600 4 03689-11 1 大连市妇女儿童医疗中心 # 154064188 1512489600 4 03689-11 1 大连市妇女儿童医疗中心 # 154064175 1512489600 4 03689-11 1 大连市妇女儿童医疗中心 # 30201228 1512489600 4 04111-1 1 大连市妇女儿童医疗中心 # 154064160 1512489600 4 03689-11 1 大连市妇女儿童医疗中心 # 154064168 1512489600 4 03689-11 1 大连市妇女儿童医疗中心 # ''' # buffer = c.new_buffer() # for _line in _s.split("\n"): # _line = _line.strip() # if _line=="": # continue # l_column = _line.split("\t") # print(l_column) # docid,page_time_stamp,extract_count,web_source_no,num,tenderee = l_column # page_time_stamp = int(page_time_stamp) # extract_count = int(extract_count) # num = 1 # c.iterate(buffer,docid,page_time_stamp,extract_count,web_source_no,num,tenderee) # print(c.terminate(buffer))