- # -*- coding: utf-8 -*-
- from odps.udf import annotate
- from odps.udf import BaseUDTF
- from odps.udf import BaseUDAF
- @annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
- class f_decode_extract(BaseUDTF):
- def __init__(self):
- import logging
- import json
- import time,re
- global json,logging,time,re
- self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- self.dict_channel = {"公告变更":51,
- "招标公告":52,
- "中标信息":101,
- "招标预告":102,
- "招标答疑":103,
- "资审结果":105,
- "法律法规":106,
- "新闻资讯":107,
- "采购意向":114,
- "拍卖出让":115,
- "土地矿产":116,
- "产权交易":117,
- "废标公告":118,
- "候选人公示":119,
- "合同公告":120}
- def process(self, extractjson,otherjson):
- if extractjson is not None:
- _extract = json.loads(extractjson)
- else:
- _extract = {}
- if otherjson is not None:
- _other = json.loads(otherjson)
- else:
- _other = {}
- project_code = ""
- project_name = ""
- tenderee = ""
- agency = ""
- win_tenderer = ""
- bidding_budget = ""
- win_bid_price = ""
- fingerprint = ""
- page_time_stamp = 0
- docchannel = 0
- extract_count = 0
- page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
- doctitle = _other.get("doctitle","")
- doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|竞价|合同', '', doctitle)
- area = _other.get("area","")
- province = _other.get("province","")
- city = _other.get("city","")
- district = _other.get("district","")
- web_source_no = _other.get("webSourceNo","")
- # docchannel = _other.get("docchannel",0)
- docchannel = self.dict_channel.get(_extract.get("docchannel",""),0)
- if re.search(self.time_pattern,page_time) is not None:
- try:
- timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
- page_time_stamp = int(time.mktime(timeArray))
- except Exception as e:
- pass
- list_code = _extract.get("code",[])
- if len(list_code)>0:
- project_code = list_code[0]
- project_name = _extract.get("name","")
- fingerprint = _extract.get("fingerprint","")
- dict_pack = _extract.get("prem",{})
- logging.info(dict_pack)
- for _key in dict_pack.keys():
- tenderee_money = dict_pack[_key].get("tendereeMoney",'')
- if tenderee_money!='' and float(tenderee_money)>0:
- extract_count += 1
- if bidding_budget=="":
- bidding_budget = str(float(tenderee_money))
- for _role in dict_pack[_key].get("roleList",[]):
- extract_count += 1
- if _role[2]!='' and float(_role[2])>0:
- extract_count += 1
- if _role[0]=="tenderee":
- tenderee = _role[1]
- if _role[0]=="win_tenderer":
- if win_tenderer=="":
- win_tenderer = _role[1]
- if _role[2]!='' and float(_role[2])>0:
- extract_count += 1
- if win_bid_price=="":
- win_bid_price = str(float(_role[2]))
- if _role[0]=="agency":
- agency = _role[1]
- if project_code!="":
- extract_count += 1
- if project_name!="":
- extract_count += 1
- logging.info(page_time+doctitle+doctitle_refine+area+province+city+
- district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
- self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
- district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count)
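- # Hedged walkthrough (assumed minimal inputs, not part of the pipeline): with
- # extractjson = '{"code":["XM-001"],"name":"Road project","fingerprint":"fp","prem":{}}'
- # and otherjson = '{"pageTime":"2021-01-02","doctitle":"Road tender notice"}',
- # process forwards one row with page_time "2021-01-02", project_code "XM-001",
- # project_name "Road project" and extract_count 2 (one for the code, one for the name).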
- @annotate("string->bigint")
- class f_get_extractCount(object):
- def __init__(self):
- import time
- global time
- import logging
- import json
- import re
- global json,logging,re
- self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def evaluate(self, extractjson):
- if extractjson is not None:
- _extract = json.loads(extractjson)
- else:
- _extract = {}
- dict_pack = _extract.get("prem",{})
- extract_count = 0
- list_code = _extract.get("code",[])
- if len(list_code)>0:
- project_code = list_code[0]
- else:
- project_code = ""
- project_name = _extract.get("name","")
- bidding_budget = ""
- win_tenderer = ""
- win_bid_price = ""
- for _key in dict_pack.keys():
- tenderee_money = dict_pack[_key].get("tendereeMoney",'')
- if tenderee_money!='' and float(tenderee_money)>0:
- extract_count += 1
- if bidding_budget=="":
- bidding_budget = str(float(tenderee_money))
- for _role in dict_pack[_key].get("roleList",[]):
- extract_count += 1
- if _role[2]!='' and float(_role[2])>0:
- extract_count += 1
- if _role[0]=="tenderee":
- tenderee = _role[1]
- if _role[0]=="win_tenderer":
- if win_tenderer=="":
- win_tenderer = _role[1]
- if _role[2]!='' and float(_role[2])>0:
- extract_count += 1
- if win_bid_price=="":
- win_bid_price = str(float(_role[2]))
- if _role[0]=="agency":
- agency = _role[1]
- if project_code!="":
- extract_count += 1
- if project_name!="":
- extract_count += 1
- return extract_count
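- # Hedged example (assumed input): for extractjson =
- # '{"code":["XM-001"],"name":"N","prem":{"Package1":{"tendereeMoney":"100",
- # "roleList":[["win_tenderer","A Co.","50"]]}}}'
- # evaluate returns 6: budget>0 (+1), one role (+1), its amount>0 (+1),
- # a first win_bid_price (+1), plus non-empty code (+1) and name (+1).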
- @annotate('string,string,string,string,string -> string,string,string,bigint')
- class f_decode_sub_docs_json(BaseUDTF):
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
- columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
- extract_count = 0
- if project_code is not None and project_code!="":
- extract_count += 1
- if project_name is not None and project_name!="":
- extract_count += 1
- if tenderee is not None and tenderee!="":
- extract_count += 1
- if agency is not None and agency!="":
- extract_count += 1
- if sub_docs_json is not None:
- for sub_docs in json.loads(sub_docs_json):
- for _key_sub_docs in sub_docs.keys():
- extract_count += 1
- if _key_sub_docs in columns:
- if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
- if _key_sub_docs in ["bidding_budget","win_bid_price"]:
- if float(sub_docs[_key_sub_docs])>0:
- columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
- else:
- columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
- self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
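- # Hedged example (assumed inputs):
- # process("C1","N1","T1","A1",
- # '[{"win_tenderer":"A Co.","win_bid_price":"100"},{"bidding_budget":"200"}]')
- # forwards ("A Co.","200.0","100.0",7): one count per non-empty scalar argument
- # (4) plus one per key inside sub_docs_json (3); amounts are normalised via float().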
- @annotate("string->bigint")
- class totimestamp(object):
- def __init__(self):
- import time
- global time
- import logging
- import json
- import re
- global json,logging,re
- self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def evaluate(self, str_time):
- try:
- logging.info(str_time)
- if str_time is not None and re.search(self.time_pattern,str_time) is not None:
- timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
- timeStamp = int(time.mktime(timeArray))
- return timeStamp
- else:
- return 0
- except Exception as e:
- return 0
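- # Hedged example (assuming the odps.udf stubs are importable locally, e.g. via
- # pyodps): totimestamp().evaluate("2021-01-02 tail") parses the leading
- # "2021-01-02" and returns its local-timezone Unix timestamp; None or a
- # non-date string returns 0.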
- @annotate("string->string")
- class refind_name(object):
- def __init__(self):
- import logging
- import re
- global logging,re
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def evaluate(self, title):
- if title is not None:
- return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|竞价|\[|\]|【|】', '', title)
- return ""
- @annotate('bigint,bigint,bigint,string,bigint,string->string')
- class f_set_docid(BaseUDAF):
- '''
- Group docids whose page_time_stamps fall within a 2-day window, subject to the defind_column/defind_count consistency rules.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [[]]
- def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee):
- buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
- "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_docs = buffer[0]
- list_docs.sort(key=lambda x:x["page_time_stamp"])
- list_group = []
- _begin = 0
- defind_count = 0
- if len(list_docs)>0:
- defind_count = list_docs[0]["defind_count"]
- for i in range(len(list_docs)-1):
- if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
- continue
- else:
- _group = []
- _set_column = set()
- _set_tenderee = set()
- for j in range(_begin,i+1):
- if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
- _set_tenderee.add(list_docs[j]["tenderee"])
- _set_column.add(list_docs[j]["defind_column"])
- _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
- if len(_group)>=3 and len(_set_tenderee)>1:
- pass
- else:
- if len(_group)>1:
- if defind_count==2:
- if len(_set_column)>=2:
- list_group.append(_group)
- elif defind_count==1:
- if len(_set_column)==1:
- list_group.append(_group)
- elif defind_count==0:
- list_group.append(_group)
- _begin = i+1
- if len(list_docs)>1:
- _set_column = set()
- _set_tenderee = set()
- _group = []
- for j in range(_begin,len(list_docs)):
- if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
- _set_tenderee.add(list_docs[j]["tenderee"])
- _set_column.add(list_docs[j]["defind_column"])
- _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
- if len(_group)>=3 and len(_set_tenderee)>1:
- pass
- else:
- if len(_group)>1:
- if defind_count==2:
- if len(_set_column)>=2:
- list_group.append(_group)
- elif defind_count==1:
- if len(_set_column)==1:
- list_group.append(_group)
- elif defind_count==0:
- list_group.append(_group)
- return json.dumps(list_group)
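- # Hedged reading of the rule above: docs are first bucketed so neighbouring
- # page_time_stamps differ by at most 2 days; a bucket is emitted as a group
- # only if it has more than one doc, is not a 3+ doc bucket spanning more than
- # one distinct tenderee, and its defind_column values satisfy the defind_count
- # constraint (2 -> at least 2 distinct values, 1 -> exactly one, 0 -> none).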
- def isEmpty(_str):
- if _str is None or _str=="":
- return True
- return False
- @annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
- class f_set_docid_binaryChart(BaseUDAF):
- '''
- Pair announcements that carry no code/budget/winner/price/agency with one matching non-empty announcement from a different web source in the same time window.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [[]]
- def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no):
- buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
- "project_code":project_code,"project_name":project_name,"tenderee":tenderee,
- "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price,
- "agency":agency,"web_source_no":web_source_no})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_docs = buffer[0]
- list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
- list_group = []
- empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"]
- for _timeGroups in list_timeGroups:
- list_empty = []
- list_notEmpty = []
- for _item in _timeGroups:
- empty_flag = True
- for _key in empty_key:
- if not isEmpty(_item[_key]):
- empty_flag = False
- break
- if empty_flag:
- list_empty.append(_item)
- else:
- list_notEmpty.append(_item)
- for _e in list_empty:
- _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}]
- _e_tenderee = _e["tenderee"]
- for _ne in list_notEmpty:
- if "set_webSource" not in _ne:
- _ne["set_webSource"] = set()
- _ne["set_webSource"].add(_ne["web_source_no"])
- _suit = False
- if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]:
- _suit = True
- elif isEmpty(_e_tenderee):
- _suit = True
- if _suit:
- if _e["web_source_no"] not in _ne["set_webSource"]:
- _ne["set_webSource"].add(_e["web_source_no"])
- _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]})
- break
- if len(_group)>1:
- list_group.append(_group)
- return json.dumps(list_group)
- def split_with_time(list_dict,sort_key,timedelta=86400*2):
- if len(list_dict)>0:
- if sort_key in list_dict[0]:
- list_dict.sort(key=lambda x:x[sort_key])
- list_group = []
- _begin = 0
- for i in range(len(list_dict)-1):
- if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
- continue
- else:
- _group = []
- for j in range(_begin,i+1):
- _group.append(list_dict[j])
- if len(_group)>1:
- list_group.append(_group)
- _begin = i + 1
- if len(list_dict)>1:
- _group = []
- for j in range(_begin,len(list_dict)):
- _group.append(list_dict[j])
- if len(_group)>1:
- list_group.append(_group)
- return list_group
- return [list_dict]
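- # Local sanity sketch (hypothetical data; never called by the pipeline):
- def _demo_split_with_time():
- docs = [{"docid":1,"page_time_stamp":0},
- {"docid":2,"page_time_stamp":86400},
- {"docid":3,"page_time_stamp":86400*10}]
- # docids 1 and 2 sit within the 2-day window; docid 3 forms a singleton and is dropped
- return split_with_time(docs,"page_time_stamp") # -> [[{...docid 1...},{...docid 2...}]]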
- @annotate('bigint,bigint,bigint,string,string,string,string,string->string')
- class f_set_docid_limitNum_contain(BaseUDAF):
- '''
- Project code and winning bidder must agree (len(project code)>7, winning bidder <> ""); after merging there must be fewer than 2 distinct non-empty tenderees, and non-empty amounts of the same announcement type must be identical.
- '''
- def __init__(self):
- import logging
- import json,re
- global json,logging,re
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def new_buffer(self):
- return [list()]
- def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
- buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
- "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
- "contain_column":contain_column})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_split = split_with_time(buffer[0],"page_time_stamp")
- list_group = []
- for _split in list_split:
- flag = True
- keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
- for _key in keys:
- logging.info(_key+str(getSet(_split,_key)))
- if len(getSet(_split,_key))>1:
- flag = False
- break
- MAX_CONTAIN_COLUMN = None
- # check that the contain_column texts within the group contain one another
- if flag:
- for _d in _split:
- contain_column = _d["contain_column"]
- if contain_column is not None and contain_column !="":
- if MAX_CONTAIN_COLUMN is None:
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if len(MAX_CONTAIN_COLUMN)<len(contain_column):
- if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
- flag = False
- break
- MAX_CONTAIN_COLUMN = contain_column
- else:
- if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
- flag = False
- break
- if flag:
- if len(_split)>1:
- _group = []
- for _item in _split:
- _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
- list_group.append(_group)
- return json.dumps(list_group)
- @annotate('bigint->string')
- class f_stamp_squence(BaseUDAF):
- '''
- Merge the collected page_time_stamps into sorted intervals, widened by 2 days on both ends.
- '''
- def __init__(self):
- import json
- global json
- import logging
- global logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def new_buffer(self):
- return [set()]
- def iterate(self, buffer,page_time_stamp):
- buffer[0].add(page_time_stamp)
- def merge(self, buffer, pbuffer):
- buffer[0] |= pbuffer[0]
- def terminate(self, buffer):
- if 0 in buffer[0]:
- buffer[0].remove(0)
- list_stamp = list(buffer[0])
- list_stamp.sort(key=lambda x:x)
- list_stamp_final = []
- _begin = 0
- _time_decase = 86400*2
- logging.info(str(list_stamp))
- for _index in range(len(list_stamp)-1):
- if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
- continue
- else:
- list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
- _begin = _index+1
- if len(list_stamp)>0:
- list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
- return json.dumps(list_stamp_final)
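- # Hedged example: stamps {0, 86400, 864000} merge into two widened windows,
- # json.dumps([[-172800, 259200], [691200, 1036800]]), with _time_decase=172800.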
- @annotate("bigint,string->bigint")
- class in_stamp(object):
- def __init__(self):
- import logging
- import re
- import json
- global logging,re,json
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def evaluate(self, page_time_stamp,json_stamp):
- list_stamp = json.loads(json_stamp)
- int_flag = 0
- for item in list_stamp:
- if page_time_stamp <item[0]:
- break
- if page_time_stamp>item[0] and page_time_stamp<item[1]:
- int_flag = 1
- break
- return int_flag
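- # Hedged example: in_stamp().evaluate(100000, "[[-172800,259200],[691200,1036800]]")
- # returns 1 because 100000 lies strictly inside the first window; boundary
- # values and anything outside return 0.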
- def getConfidence(rule_id):
- if rule_id ==0:
- return 30
- elif rule_id >=1 and rule_id <30:
- return 20
- else:
- return 10
- @annotate('string,string -> string')
- class f_splitStr(BaseUDTF):
- '''
- Split a delimited string and emit one record per part.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self, str_split,_split):
- try:
- for _s in str_split.split(_split):
- self.forward(_s)
- except Exception as e:
- pass
- @annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
- class f_split_group_single(BaseUDTF):
- '''
- Unpack each group into pairwise (docid1, docid2) records.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self, json_set_docid,rule_id):
- list_group = json.loads(json_set_docid)
- for item in list_group:
- if len(item)>100:
- item.sort(key=lambda x:x["docid"],reverse=True)
- index_i = 0
- for index_j in range(1,len(item)):
- if item[index_i]["docid"]!=item[index_j]["docid"]:
- self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
- else:
- for index_i in range(len(item)):
- for index_j in range(len(item)):
- if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
- self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
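- # Hedged example: for rule_id=1 and json_set_docid
- # '[[{"docid":1,"extract_count":2},{"docid":2,"extract_count":5}]]'
- # the small-group branch emits both ordered pairs (1,2,...) and (2,1,...),
- # each with confidence getConfidence(1)=20.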
- @annotate('bigint,string->string')
- class group_document(BaseUDAF):
- '''
- Aggregate (id, json_set_docid) rows into a single JSON list.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [[]]
- def iterate(self, buffer,id,json_set_docid):
- buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- return json.dumps(buffer[0])
- @annotate('bigint,string,bigint,string -> bigint,bigint,string')
- class decare_document(BaseUDTF):
- '''
- Compare two lists of groups pairwise and merge groups that share a docid.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
- # only compare pairs with group_id1>=group_id2, cutting nearly half the comparisons
- if group_id1>=group_id2:
- list_doc1 = json.loads(json_list_doc1)
- list_doc2 = json.loads(json_list_doc2)
- for _doc1 in list_doc1:
- for _doc2 in list_doc2:
- # do not compare a duplicate group with itself
- if _doc1["id"]!=_doc2["id"]:
- # check whether the two groups share any docid
- _set1 = set()
- for _item1 in _doc1["json_set_docid"]:
- _set1.add(_item1["docid"])
- _set2 = set()
- for _item2 in _doc2["json_set_docid"]:
- _set2.add(_item2["docid"])
- if len(_set1&_set2)>0:
- new_json_set_docid = _doc1["json_set_docid"]
- for _item2 in _doc2["json_set_docid"]:
- if _item2["docid"] not in _set1:
- new_json_set_docid.append(_item2)
- self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))
- def getBestDocid(list_pair):
- # [docid1,extract_count1,docid2,extract_count2]
- # list_pair.sort(key=lambda x:x[3],reverse=True)
- # _max_count = max(list_pair[0][3],list_pair[0][1])
- # set_candidate = set()
- # if list_pair[0][1]==_max_count:
- # set_candidate.add(list_pair[0][0])
- # for item in list_pair:
- # if item[3]==_max_count:
- # set_candidate.add(item[2])
- # else:
- # break
- # list_candidate = list(set_candidate)
- # list_candidate.sort(key=lambda x:x)
- new_pair = []
- new_pair.append([list_pair[0][0],list_pair[0][0],list_pair[0][1]])
- for item in list_pair:
- new_pair.append([item[0],item[2],item[3]])
- new_pair.sort(key=lambda x:x[1])
- new_pair.sort(key=lambda x:x[2],reverse=True)
- return new_pair[0][1]
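- # Local sketch (hypothetical pairs): the best docid is the one with the highest
- # extract_count; ties go to the smaller docid.
- def _demo_getBestDocid():
- # docid 1 (count 5) versus docid 2 (count 3) and docid 4 (count 9) -> 4
- return getBestDocid([[1,5,2,3],[1,5,4,9]])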
- @annotate('bigint,bigint,bigint,bigint->string')
- class choose_document(BaseUDAF):
- '''
- Decide whether the first docid of the duplicate pairs should be saved (save_flag) and collect its duplicates.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [[]]
- def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
- buffer[0].append([docid1,extract_count1,docid2,extract_count2])
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_pair = buffer[0]
- _set = set()
- for item in buffer[0]:
- _set.add(str(item[2]))
- list_dumplicate = list(_set)
- best_docid = getBestDocid(list_pair)
- if best_docid==list_pair[0][0]:
- save_flag = 1
- else:
- save_flag = 0
- return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})
- @annotate('string -> bigint,string')
- class f_get_choose_document(BaseUDTF):
- '''
- Unpack the choose_document JSON into (save_flag, dumplicates) records.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,json_choose):
- if json_choose is None:
- self.forward(1,None)
- else:
- _choose = json.loads(json_choose)
- self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))
- @annotate('bigint,bigint,bigint,bigint->string')
- class group_document_bestFirst(BaseUDAF):
- '''
- Put the best docid of each group first.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [[]]
- def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
- buffer[0].append([docid1,extract_count1,docid2,extract_count2])
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_pair = buffer[0]
- _set = set()
- for item in buffer[0]:
- _set.add(item[2])
- _set.add(list_pair[0][0])
- best_docid = getBestDocid(list_pair)
- _set.remove(best_docid)
- list_dumplicate = list(_set)
- list_dumplicate.sort(key=lambda x:x)
- list_dumplicate.insert(0,best_docid)
- list_dumplicate_str = []
- for item in list_dumplicate:
- list_dumplicate_str.append(str(item))
- return ",".join(list_dumplicate_str)
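- # Hedged example: pairs [[7,2,9,4],[7,2,8,4]] collect docids {7,8,9};
- # getBestDocid picks 8 (count 4, smaller docid than 9), so terminate returns "8,7,9".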
- @annotate('string -> bigint,string')
- class f_get_best_dumplicates(BaseUDTF):
- '''
- Get the best record of each group together with its duplicate records.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,list_dumplicate_str):
- if list_dumplicate_str is None:
- pass
- else:
- list_dumplicate = list_dumplicate_str.split(",")
- if len(list_dumplicate)>0:
- self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))
- else:
- pass
- @annotate('bigint,bigint->string')
- class bridge2group(BaseUDAF):
- '''
- Collect the docids of all duplicate pairs into one descending-sorted list.
- '''
- def __init__(self):
- import json
- global json
- def new_buffer(self):
- return [set()]
- def iterate(self, buffer,docid1,docid2):
- buffer[0].add(docid1)
- buffer[0].add(docid2)
- def merge(self, buffer, pbuffer):
- buffer[0] |= pbuffer[0]
- def terminate(self, buffer):
- list_pair = list(buffer[0])
- list_pair.sort(key=lambda x:x,reverse=True)
- return json.dumps(list_pair)
- @annotate('string -> bigint,bigint')
- class group2bridge(BaseUDTF):
- '''
- Emit (bridge_docid, docid) pairs, using the smallest docid in the list as the bridge.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,json_list_docid):
- list_docid = json.loads(json_list_docid)
- for _docid in list_docid:
- self.forward(list_docid[-1],_docid)
- @annotate('bigint,bigint,string -> bigint')
- class f_get_dump_docid(BaseUDTF):
- '''
- Emit the docids to be dropped as duplicates, honouring save_flag.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,docid,save_flag,dumplicates):
- if save_flag==0:
- self.forward(docid)
- if dumplicates is not None:
- list_docid = dumplicates.split(",")
- if len(list_docid)>0:
- for _docid in list_docid[1:]:
- self.forward(int(_docid))
- else:
- if dumplicates is not None:
- list_docid = dumplicates.split(",")
- if len(list_docid)>0:
- for _docid in list_docid:
- self.forward(int(_docid))
- @annotate('string -> bigint,bigint')
- class f_get_docid(BaseUDTF):
- '''
- Unpack grouped docids into (team_id, docid) records.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,json_set_docid):
- team_id = 0
- if json_set_docid is not None:
- list_docses = json.loads(json_set_docid)
- for list_docs in list_docses:
- team_id += 1
- for item in list_docs:
- self.forward(team_id,item["docid"])
- @annotate("string->bigint")
- class get_count_dump(object):
- def __init__(self):
- import logging
- import re
- global logging,re
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def evaluate(self, title):
- _count = 0
- if title is not None:
- _count = len(title.split(","))
- return _count
- def getSet(list_dict,key):
- _set = set()
- for item in list_dict:
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search(r"^\d[\d\.]*$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- return _set
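- # Local sketch (hypothetical data): numeric-looking strings are normalised
- # through float(), so "100" and "100.0" collapse to a single member.
- def _demo_getSet():
- import re as _re
- globals().setdefault("re",_re) # the UDF classes normally inject re via global
- return getSet([{"m":"100"},{"m":"100.0"},{"m":"A Co."}],"m") # {"100.0","A Co."}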
- def getDiffIndex(list_dict,key,confidence=100):
- _set = set()
- for _i in range(len(list_dict)):
- item = list_dict[_i]
- if item["confidence"]>=confidence:
- continue
- if key in item:
- if item[key]!='' and item[key] is not None:
- if re.search(r"^\d+(\.\d+)?$",item[key]) is not None:
- _set.add(str(float(item[key])))
- else:
- _set.add(str(item[key]))
- if len(_set)>1:
- return _i
- return len(list_dict)
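- # Hedged example: getDiffIndex scans the items in order and returns the first
- # index at which the key takes a second distinct value, e.g.
- # getDiffIndex([{"k":"a","confidence":0},{"k":"a","confidence":0},
- # {"k":"b","confidence":0}],"k") == 2; rows with confidence >= the threshold
- # are exempt from the check.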
- @annotate('bigint,string -> bigint,bigint')
- class f_getGroup_dumpFinal(BaseUDTF):
- '''
- Recover the duplicate groups from the final dedup result.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,docid,dumplicates):
- self.forward(int(docid),int(docid))
- if dumplicates is not None:
- list_docids = dumplicates.split(",")
- for _docid in list_docids:
- self.forward(int(docid),int(_docid))
- @annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
- class f_redump_limit_num(BaseUDAF):
- '''
- Re-check merged duplicate groups: when a group has more than 5 members, doctitle, tenderee, win_tenderer and bidding_budget must each take a single value within the group;
- with 5 or fewer members, only tenderee, win_tenderer and bidding_budget are constrained.
- '''
- def __init__(self):
- import logging
- import json,re
- global json,logging,re
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def new_buffer(self):
- return [list()]
- def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence):
- buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2,
- "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,
- "extract_count2":extract_count2,"confidence":confidence})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_group = []
- the_group = buffer[0]
- the_group.sort(key=lambda x:x["confidence"],reverse=True)
- if len(the_group)>5:
- keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"]
- else:
- keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
- final_group = []
- # per-key index where values start to diverge, weighted by confidence
- list_key_index = []
- for _k in keys:
- if _k=="doctitle":
- list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
- else:
- list_key_index.append(getDiffIndex(the_group,_k))
- _index = min(list_key_index)
- if _index>1:
- main_docid = the_group[0]["main_docid"]
- for item in the_group[:_index]:
- if item["docid"]!=main_docid:
- final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]})
- # stay = True
- # for _key in keys:
- # if len(getSet(the_group,_key))>1:
- # stay = False
- # break
- #
- # if stay:
- # main_docid = the_group[0]["main_docid"]
- # for item in the_group:
- # if item["docid"]!=main_docid:
- # final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
- return json.dumps(final_group)
- @annotate('string -> bigint,bigint,bigint,bigint,bigint')
- class f_get_dumpFinal_checked(BaseUDTF):
- '''
- Recover the checked duplicate pairs from the final result.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,list_group):
- if list_group is not None:
- final_group = json.loads(list_group)
- for _group in final_group:
- self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"])
- @annotate('string -> bigint')
- class f_getDumplicateDocids(BaseUDTF):
- '''
- Emit each docid from a comma-separated dumplicates string.
- '''
- def __init__(self):
- import logging
- import json
- global json,logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def process(self,dumplicates):
- if dumplicates is None:
- return
- list_docids = dumplicates.split(",")
- for _d in list_docids:
- self.forward(int(_d))