#coding=utf-8 # evaluate为该方法的入口函数,必须用这个名字 from odps.udf import annotate from odps.distcache import get_cache_archive from odps.distcache import get_cache_file from odps.udf import BaseUDTF from odps.udf import BaseUDAF from odps.distcache import get_cache_archive from odps.distcache import get_cache_file # 配置pandas依赖包 def include_package_path(res_name): import os, sys archive_files = get_cache_archive(res_name) dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files if '.dist_info' not in f.name], key=lambda v: len(v)) sys.path.append(dir_names[0]) return os.path.dirname(dir_names[0]) # 可能出现类似RuntimeError: xxx has been blocked by sandbox # 这是因为包含C的库,会被沙盘block,可设置set odps.isolation.session.enable = true def include_file(file_name): import os, sys so_file = get_cache_file(file_name) sys.path.append(os.path.dirname(os.path.abspath(so_file.name))) def include_so(file_name): import os, sys so_file = get_cache_file(file_name) with open(so_file.name, 'rb') as fp: content=fp.read() so = open(file_name, "wb") so.write(content) so.flush() so.close() #初始化业务数据包,由于上传限制,python版本以及archive解压包不统一等各种问题,需要手动导入 def init_env(list_files,package_name): import os,sys if len(list_files)==1: so_file = get_cache_file(list_files[0]) cmd_line = os.path.abspath(so_file.name) os.system("unzip -o %s -d %s"%(cmd_line,package_name)) elif len(list_files)>1: cmd_line = "cat" for _file in list_files: so_file = get_cache_file(_file) cmd_line += " "+os.path.abspath(so_file.name) cmd_line += " > temp.zip" os.system(cmd_line) os.system("unzip -o temp.zip -d %s"%(package_name)) # os.system("rm -rf %s/*.dist-info"%(package_name)) # return os.listdir(os.path.abspath("local_package")) # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package"))) # os.system("source ~/.bashrc") sys.path.insert(0,os.path.abspath(package_name)) # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real")) def load_project(): start_time = time.time() init_env(["BiddingKG.zip.env.baseline"],str(uuid.uuid4())) # init_env(["BiddingKG.zip.env.backup"],str(uuid.uuid4())) logging.info("init biddingkg.zip.env.line cost %d"%(time.time()-start_time)) def load_vector(): start_time = time.time() init_env(["wiki_128_word_embedding_new.vector.env"],".") logging.info("init wiki_128_word_embedding_new cost %d"%(time.time()-start_time)) start_time = time.time() init_env(["enterprise.zip.env"],".") # init_env(["LEGAL_ENTERPRISE.zip.env"],".") logging.info("init legal_enterprise.zip.env cost %d"%(time.time()-start_time)) start_time = time.time() init_env(["so.env"],".") logging.info("init so.env cost %d"%(time.time()-start_time)) def load_py(): start_time = time.time() # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4())) include_package_path("envs_py37.env.zip") logging.info("init envs_py37 cost %d"%(time.time()-start_time)) def multiLoadEnv(): load_project() load_vector() load_py() import json class MyEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, bytes): return str(obj, encoding='utf-8') elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)): return float(obj) elif isinstance(obj,(np.int64)): return int(obj) return json.JSONEncoder.default(self, obj) @annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string") class f_json_extract_online(BaseUDTF): def __init__(self): import uuid global uuid import logging import datetime import numpy as np global json,MyEncoder,time,log,MyEncoder,np def process(self,page_time,doctitle, tenderee,tenderee_contact,tenderee_phone,agency, agency_contact,agency_phone,sub_docs_json,project_code, project_name,product,time_bidclose,time_bidopen,time_release, moneysource,person_review,bidway,punish,serviceTime): _dict = {} _dict["code"] = project_code if project_code is not None else "" _dict["name"] = project_name if project_name is not None else "" if product is not None and product!="": _dict["product"] = product.split(",") else: _dict["product"] = [] _dict["time_bidclose"] = time_bidclose if time_bidclose is not None else "" _dict["time_bidopen"] = time_bidopen if time_bidopen is not None else "" _dict["time_release"] = time_release if time_release is not None else "" _dict["moneysource"] = moneysource if moneysource is not None else "" if person_review not in (None,''): _dict["person_review"] = person_review.split(",") else: _dict["person_review"] = [] _dict["bidway"] = bidway if bidway is not None else "" _dict["serviceTime"] = serviceTime if serviceTime is not None else "" if punish not in (None,''): _punish = json.loads(punish) else: _punish = {} for k,v in _punish.items(): _dict[k] = v if sub_docs_json not in (None,''): _docs = json.loads(sub_docs_json) else: _docs = [{}] set_comp_contact = set() if tenderee not in (None,"") and tenderee_contact not in (None,""): set_comp_contact.add("%s-%s-%s-%s"%("tenderee",tenderee,tenderee_contact,tenderee_phone)) if agency not in (None,"") and agency_contact not in (None,""): set_comp_contact.add("%s-%s-%s-%s"%("agency",agency,agency_contact,agency_phone)) set_pack_comp = set() if tenderee not in (None,""): set_pack_comp.add("%s-%s-%s"%("Project","tenderee",tenderee)) if agency not in (None,""): set_pack_comp.add("%s-%s-%s"%("Project","agency",agency)) set_pack_money = set() for _d in _docs: if len(_d.keys())>0: sub_project_name = _d.get("sub_project_name","Project") bidding_budget = float(_d.get("bidding_budget",0)) win_tenderer = _d.get("win_tenderer","") win_bid_price = float(_d.get("win_bid_price",0)) win_tenderer_manager = _d.get("win_tenderer_manager","") win_tenderer_phone = _d.get("win_tenderer_phone","") second_tenderer = _d.get("second_tenderer","") second_bid_price = float(_d.get("second_bid_price",0)) second_tenderer_manager = _d.get("second_tenderer_manager","") second_tenderer_phone = _d.get("second_tenderer_phone","") third_tenderer = _d.get("third_tenderer","") third_bid_price = float(_d.get("third_bid_price",0)) third_tenderer_manager = _d.get("third_tenderer_manager","") third_tenderer_phone = _d.get("third_tenderer_phone","") if win_tenderer not in (None,"") and win_tenderer_manager not in (None,""): set_comp_contact.add("%s-%s-%s-%s"%("win_tenderee",win_tenderer,win_tenderer_manager,win_tenderer_phone)) if second_tenderer not in (None,"") and second_tenderer_manager not in (None,""): set_comp_contact.add("%s-%s-%s-%s"%("second_tenderer",second_tenderer,second_tenderer_manager,second_tenderer_phone)) if third_tenderer not in (None,"") and third_tenderer_manager not in (None,""): set_comp_contact.add("%s-%s-%s-%s"%("third_tenderer",third_tenderer,third_tenderer_manager,third_tenderer_phone)) if win_tenderer not in (None,""): set_pack_comp.add("%s-%s-%s"%(sub_project_name,"win_tenderer",win_tenderer)) if second_tenderer not in (None,""): set_pack_comp.add("%s-%s-%s"%(sub_project_name,"second_tenderer",second_tenderer)) if third_tenderer not in (None,""): set_pack_comp.add("%s-%s-%s"%(sub_project_name,"third_tenderer",third_tenderer)) if bidding_budget>0: set_pack_money.add("%s-%s-%2f"%(sub_project_name,"bidding_budget",bidding_budget)) if win_bid_price>0: set_pack_money.add("%s-%s-%2f"%(sub_project_name,"win_tenderer",win_bid_price)) if second_bid_price>0: set_pack_money.add("%s-%s-%2f"%(sub_project_name,"second_tenderer",second_bid_price)) if third_bid_price>0: set_pack_money.add("%s-%s-%2f"%(sub_project_name,"third_tenderer",third_bid_price)) _dict["set_comp_contact"] = list(set_comp_contact) _dict["set_pack_comp"] = list(set_pack_comp) _dict["set_pack_money"] = list(set_pack_money) self.forward(json.dumps(_dict,cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)) @annotate("string,string->string") class f_compair_extract(object): def __init__(self): import logging import re import json global logging,re,json logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, json_online,json_result): dict_online = json.loads(json_online) dict_result = json.loads(json_result) logging.info(json_online) dict_test = {} set_comp_contact = set() set_pack_comp = set() set_pack_money = set() logging.info("1") for k,v in dict_result.items(): if k in ["bidway","moneysource","time_bidclose","serviceTime","time_bidopen","time_release","name"]: dict_test[k] = v elif k in ["code"]: if len(v)>0: dict_test["code"] = v[0] else: dict_test["code"] = "" elif k in ["person_review","product"]: list_temp = v list_temp.sort(key=lambda x:x) dict_test[k] = list_temp elif k in ["punish"]: for k1,v1 in v.items(): dict_test[k1] = v1 elif k in ["prem"]: for _pack,_prem in v.items(): bidding_budget = float(_prem.get("tendereeMoney",0)) role_lists = _prem.get("roleList",[]) if bidding_budget>0: set_pack_money.add("%s-%s-%2f"%(_pack,"bidding_budget",bidding_budget)) for _role in role_lists: role_type = _role[0] role_name = _role[1] role_money = 0 if _role[2]=="" else float(_role[2]) contact_list = _role[3] for _person,_phone in contact_list: set_comp_contact.add("%s-%s-%s-%s"%(role_type,role_name,_person,_phone)) set_pack_comp.add("%s-%s-%s"%(_pack,role_type,role_name)) if role_money >0: set_pack_money.add("%s-%s-%2f"%(_pack,role_type,role_money)) dict_test["set_comp_contact"] = list(set_comp_contact) dict_test["set_pack_comp"] = list(set_pack_comp) dict_test["set_pack_money"] = list(set_pack_money) logging.info(dict_test) logging.info("2") dict_compair = {} set_keys_online = set(dict_online.keys()) set_keys_test = set(dict_test.keys()) union_keys = list(set_keys_online|set_keys_test) logging.info(str(union_keys)) for _key in union_keys: logging.info(_key) v_online = dict_online.get(_key,"") v_test = dict_test.get(_key,"") logging.info(v_online) logging.info(v_test) if isinstance(v_online,list) or isinstance(v_test,list): logging.info("3") if v_online=="": v_online = [] if v_test=="": v_test = [] v_online.sort(key=lambda x:x) v_test.sort(key=lambda x:x) s_online = set(v_online) s_test = set(v_test) diff_count = len(s_online-s_test)+len(s_test-s_online) dict_compair[_key+"_diff"] = diff_count dict_compair[_key+"_online"] = v_online dict_compair[_key+"_test"] = v_test elif isinstance(v_online,str): logging.info("4") if v_online==v_test: diff_count = 0 else: diff_count = 1 dict_compair[_key+"_diff"] = diff_count dict_compair[_key+"_online"] = v_online dict_compair[_key+"_test"] = v_test return json.dumps(dict_compair,sort_keys=True,indent=4,ensure_ascii=False) import hashlib def getMD5(sourceHtml): if sourceHtml is not None and len(sourceHtml)>0: if isinstance(sourceHtml,str): bs = sourceHtml.encode() elif isinstance(sourceHtml,bytes): bs = sourceHtml else: return "" md5 = hashlib.md5() md5.update(bs) return md5.hexdigest() return "" def getFingerprint(sourceHtml): md5 = getMD5(sourceHtml) if md5!="": _fingerprint = "md5=%s"%(md5) else: _fingerprint = "" return _fingerprint @annotate("string,string->string") class f_getFingerprint(object): def __init__(self): import logging import re import json global logging,re,json logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def evaluate(self, doctitle,dochtmlcon): fingerprint = getFingerprint(doctitle+dochtmlcon) return fingerprint @annotate('bigint,string,string,string,string,string,string,string,string->string') class f_check_dumplicate(BaseUDAF): ''' 去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值 组内个数小于等于5时,tenderee、win_tenderer、bidding_budget组内只能有一个取值 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,docid,doctitle,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price): buffer[0].append({"docid":docid,"doctitle":doctitle,"project_code":project_code,"project_name":project_name, "tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_group = [] list_group.append(buffer[0]) return json.dumps(list_group,ensure_ascii=False) @annotate('string -> bigint,bigint,string,string,string,string,string,string,string,string') class f_check_dumplicate_group(BaseUDTF): ''' 从最后的结果中获取组 ''' def __init__(self): import logging import json global json,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def process(self,list_group): if list_group is not None: final_group = json.loads(list_group) logging.info(list_group) for _groups in final_group: for _group in _groups: self.forward(_groups[0]["docid"],_group["docid"],_group["doctitle"],_group["project_code"],_group["project_name"],_group["tenderee"],_group["agency"],_group["win_tenderer"],_group["bidding_budget"],_group["win_bid_price"]) @annotate('string->bigint') class f_is_contain(BaseUDAF): ''' 去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值 组内个数小于等于5时,tenderee、win_tenderer、bidding_budget组内只能有一个取值 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,doctitle): buffer[0].append(doctitle) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): is_contain = 1 list_doctitle = buffer[0] main_doctitle = "" for _doctitle in list_doctitle: if _doctitle in main_doctitle or main_doctitle in _doctitle: if len(_doctitle)>len(main_doctitle): main_doctitle = _doctitle else: is_contain = 0 break return is_contain def getSet(list_dict,key): _set = set() for item in list_dict: if key in item: if item[key]!='' and item[key] is not None: if re.search("^[\d\.]+$",item[key]) is not None: _set.add(str(float(item[key]))) else: _set.add(str(item[key])) return _set def split_with_time(list_dict,sort_key,timedelta=86400*2): if len(list_dict)>0: if sort_key in list_dict[0]: list_dict.sort(key=lambda x:x[sort_key]) list_group = [] _begin = 0 for i in range(len(list_dict)-1): if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])1: list_group.append(_group) _begin = i + 1 if len(list_dict)>1: _group = [] for j in range(_begin,len(list_dict)): _group.append(list_dict[j]) if len(_group)>1: list_group.append(_group) return list_group return [list_dict] @annotate('bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string->string') class f_check_dumplicate_1(BaseUDAF): ''' 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同 ''' def __init__(self): import logging import json,re global json,logging,re logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') def new_buffer(self): return [list()] def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,doctitle,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price): buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1, "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4, "contain_column":contain_column,"doctitle":doctitle,"project_code":project_code,"project_name":project_name, "tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price}) def merge(self, buffer, pbuffer): buffer[0].extend(pbuffer[0]) def terminate(self, buffer): list_split = split_with_time(buffer[0],"page_time_stamp") list_group = [] for _split in list_split: flag = True keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"] for _key in keys: logging.info(_key+str(getSet(_split,_key))) if len(getSet(_split,_key))>1: flag = False break MAX_CONTAIN_COLUMN = None #判断组内每条公告是否包含 if flag: for _d in _split: contain_column = _d["contain_column"] if contain_column is not None and contain_column !="": if MAX_CONTAIN_COLUMN is None: MAX_CONTAIN_COLUMN = contain_column else: if len(MAX_CONTAIN_COLUMN)1: list_group.append(_split) return json.dumps(list_group) @annotate('string->string,string') class f_splitAttach(BaseUDTF): def __init__(self): import logging import time global time,logging logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.info("start init env") load_py() logging.info("init env done") from bs4 import BeautifulSoup global BeautifulSoup def process(self,dochtmlcon): doctextcon = "" attachmenttextcon = "" if dochtmlcon is not None: _soup = BeautifulSoup(dochtmlcon,"lxml") _find = _soup.find("div",attrs={"class":"richTextFetch"}) if _find is not None: attachmenttextcon = _find.get_text() _find.decompose() doctextcon = _soup.get_text() self.forward(doctextcon,attachmenttextcon) def getTitleFromHtml(filemd5,_html): _soup = BeautifulSoup(_html,"lxml") _find = _soup.find("a",attrs={"data":filemd5}) _title = "" if _find is not None: _title = _find.get_text() return _title def getSourceLinkFromHtml(filemd5,_html): _soup = BeautifulSoup(_html,"lxml") _find = _soup.find("a",attrs={"filelink":filemd5}) filelink = "" if _find is None: _find = _soup.find("img",attrs={"filelink":filemd5}) if _find is not None: filelink = _find.attrs.get("src","") else: filelink = _find.attrs.get("href","") return filelink def turnAttachmentsFromHtml(dochtmlcon,page_attachments): new_attachments = json.loads(page_attachments) for _atta in new_attachments: fileMd5 = _atta.get("fileMd5") if fileMd5 is not None: fileTitle = getTitleFromHtml(fileMd5,dochtmlcon) fileLink = getSourceLinkFromHtml(fileMd5,dochtmlcon) _atta["fileTitle"] = fileTitle _atta["fileLink"] = fileLink print(new_attachments) return json.dumps(new_attachments,ensure_ascii=False) @annotate('string,string->string') class f_turnPageattachments(object): def evaluate(self,dochtmlcon,page_attachments): new_page_attachments = None if page_attachments is not None: if "fileMd5" in page_attachments: new_page_attachments = turnAttachmentsFromHtml(dochtmlcon,page_attachments) return new_page_attachments @annotate("string->string") class f_getRoles(BaseUDTF): def __init__(self): self.columns = ["win_tenderer","second_tenderer","third_tenderer"] pass # bidway名称统一规范 def bidway_integrate(self,sub_docs_json): if sub_docs_json is not None: _docs = json.loads(sub_docs_json) for _doc in _docs: for _c in self.columns: if _doc.get(_c) is not None: self.forward(_doc.get(_c)) def process(self,sub_docs_json): self.bidway_integrate(sub_docs_json) @annotate("string->string") class turn_bidway(BaseUDTF): def __init__(self): self.bidway_dict = {'询价': '询价', '竞争性谈判': '竞争性谈判', '公开比选': '其他', '国内竞争性磋商': '竞争性磋商', '招标方式:t公开': '公开招标', '竞价': '竞价', '竞标': '竞价', '电子竞价': '竞价', '电子书面竞投': '竞价', '单一来源': '单一来源', '网上竞价': '竞价', '公开招标': '公开招标', '询比': '询价', '定点采购': '其他', '招标方式:■公开': '公开招标', '交易其他,付款其他': '其他', '竞争性评审': '竞争性磋商', '公开招租': '其他', '\\N': '', '比选': '其他', '比质比价': '其他', '分散采购': '其他', '内部邀标': '邀请招标', '邀请招标': '邀请招标', '网上招标': '公开招标', '非定向询价': '询价', '网络竞价': '竞价', '公开询价': '询价', '定点采购议价': '其他', '询单': '询价', '网上挂牌': '其他', '网上直购': '其他', '定向询价': '询价', '采购方式:公开': '公开招标', '磋商': '竞争性磋商', '公开招投标': '公开招标', '招标方式:√公开': '公开招标', '公开选取': '公开招标', '网上电子投标': '公开招标', '公开竞谈': '竞争性谈判', '竞争性磋商': '竞争性磋商', '采购方式:邀请': '邀请招标', '公开竞价': '竞价', '其他': '其他', '公开招募': '其他', '网上询价': '询价'} # bidway名称统一规范 def bidway_integrate(self,bidway): integrate_name = self.bidway_dict.get(bidway,"其他") return integrate_name def process(self,bidway): new_bidway =self.bidway_integrate(bidway) self.forward(new_bidway)