from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF
import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import uuid
import re
import traceback
from multiprocessing import Process, Queue

def log(msg):
    logging.info(msg)

# Load a dependency archive (e.g. the pandas package) and put it on sys.path
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0] + ".zip/files"
    log("add path:%s" % (_path))
    sys.path.append(_path)
    return _path

# A "RuntimeError: xxx has been blocked by sandbox" may occur here: libraries that
# contain C extensions are blocked by the sandbox; set
# odps.isolation.session.enable = true to allow them.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    so = open(file_name, "wb")
    so.write(content)
    so.flush()
    so.close()

# Initialize business data packages. Because of upload size limits, mismatched
# Python versions, inconsistent archive extraction and similar issues, the
# packages have to be unpacked and imported manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        # multi-part archive: concatenate the parts, then unzip
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))

def multiLoadEnv():
    def load_project():
        start_time = time.time()
        include_package_path("BiddingKG.backup.zip")
        logging.info("init biddingkg.zip.env.line cost %d" % (time.time() - start_time))

    def load_vector():
        start_time = time.time()
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        logging.info("init wiki_128_word_embedding_new cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["enterprise.zip.env"], ".")
        # init_env(["LEGAL_ENTERPRISE.zip.env"],".")
        logging.info("init legal_enterprise.zip.env cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["so.env"], ".")
        logging.info("init so.env cost %d" % (time.time() - start_time))

    def load_py():
        start_time = time.time()
        # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
        include_package_path("envs_py37.env.zip")
        logging.info("init envs_py37 cost %d" % (time.time() - start_time))

    load_project()
    load_vector()
    load_py()

def getPattern():
    # Build one alternation of named groups, one group per industry, from the
    # keyword sheet; the name of the matching group identifies the industry.
    filename = "proposedBuildingKeyword.zip.env"
    init_env([filename], ".")
    df = pd.read_excel("proposedBuildingKeyword.xlsx")
    dict_industry_keywords = {}
    for _industry, _keyword in zip(df["类别"], df["关键词"]):
        if _industry not in dict_industry_keywords:
            dict_industry_keywords[_industry] = set()
        dict_industry_keywords[_industry].add(_keyword)
    list_industry_p = []
    for k, v in dict_industry_keywords.items():
        if len(v) > 0:
            list_industry_p.append("(?P<%s>%s)" % (k, "|".join(list(v))))
    _pattern = re.compile("|".join(list_industry_p))
    return _pattern
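
# A minimal sketch of the technique getPattern() relies on, with hypothetical toy
# keywords (the production keywords come from proposedBuildingKeyword.xlsx): every
# industry becomes one "(?P<industry>kw1|kw2|...)" named group, so a single
# re.finditer pass can report the matching industry through groupdict().
def _demo_industry_pattern(content="新建学校及附属幼儿园"):
    toy_pattern = re.compile("(?P<医疗卫生>医院|卫生院)|(?P<教育>学校|幼儿园)")
    # collect the names of the groups that matched; extract_industry() keeps the first
    return [k for m in re.finditer(toy_pattern, content)
            for k, v in m.groupdict().items() if v is not None]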
dict_stage = {"设计阶段": "设计",
              "环评阶段": "环评",
              "施工准备": "监理",
              "施工在建": "施工"}
list_stage_v = []
for k, v in dict_stage.items():
    list_stage_v.append("(?P<%s>%s)" % (k, v))
stage_pattern = "|".join(list_stage_v)

def extract_industry(content, _pattern):
    # return the first industry whose keyword group matches, or None
    list_stage = []
    for stage_search in re.finditer(_pattern, content):
        for k, v in stage_search.groupdict().items():
            if v is not None:
                list_stage.append(k)
    if len(list_stage) > 0:
        return list_stage[0]
    return None

def extract_legal_stage(content):
    # auction/transfer/lease/procurement notices are not construction projects
    if re.search("拍卖|转让|产权|出让|租赁|招租|采购", content) is not None:
        return None
    list_stage = []
    for stage_search in re.finditer(stage_pattern, content):
        for k, v in stage_search.groupdict().items():
            if v is not None:
                list_stage.append(k)
    if len(list_stage) > 0:
        return list_stage[-1]
    return None

def extract_proportion(content):
    _pattern = "(?P<proportion>((建筑|建设)面积|全长)[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
    _pattern_search = re.search(_pattern, content)
    _proportion = ""
    if _pattern_search is not None:
        _proportion = _pattern_search.groupdict().get("proportion", "")
    if _proportion == "":
        # fall back to a looser pattern
        _pattern = "(?P<proportion>((建筑|建设|区域)?面积|全长|项目规模)[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
        _pattern_search = re.search(_pattern, content)
        if _pattern_search is not None:
            _proportion = _pattern_search.groupdict().get("proportion", "")
    return _proportion

def extract_projectDigest(content):
    _pattern = "(?P<projectDigest>(项目|工程|标的|需求|建设|招标|采购|内容)(概况|规模|简介|信息|范围|内容|说明|摘要).{10,300})"
    _pattern_search = re.search(_pattern, content)
    _projectDigest = ""
    _find = ""
    if _pattern_search is not None:
        _find = _pattern_search.groupdict().get("projectDigest", "")
    if len(_find) > 0:
        # keep at most the first three sentences
        _projectDigest = "。".join(_find.split("。")[0:3])
    return _projectDigest

def extract_projectAddress(list_sentence, list_entity):
    for p_entity in list_entity:
        if len(p_entity.entity_text) > 10 and p_entity.entity_type == "location":
            for _sentence in list_sentence:
                if _sentence.sentence_index == p_entity.sentence_index:
                    _span = spanWindow(tokens=_sentence.tokens, begin_index=p_entity.begin_index,
                                       end_index=p_entity.end_index, size=20, center_include=True,
                                       word_flag=True, text=p_entity.entity_text)
                    if re.search("(项目|建设)(地址|地点)", _span[0]) is not None:
                        return p_entity.entity_text
    return None

def extract_begin_end_time(list_sentence, list_entity):
    _begin_time = None
    _end_time = None
    for p_entity in list_entity:
        if p_entity.entity_type == "time":
            for _sentence in list_sentence:
                if _sentence.sentence_index == p_entity.sentence_index:
                    _span = spanWindow(tokens=_sentence.tokens, begin_index=p_entity.begin_index,
                                       end_index=p_entity.end_index, size=20, center_include=True,
                                       word_flag=True, text=p_entity.entity_text)
                    if re.search("开工(时间|日期)", _span[0]) is not None:
                        _time_temp = timeFormat(p_entity.entity_text)
                        if len(_time_temp) > 0:
                            _begin_time = _time_temp
                    if re.search("(竣工|完工)(时间|日期)", _span[0]) is not None:
                        _time_temp = timeFormat(p_entity.entity_text)
                        if len(_time_temp) > 0:
                            _end_time = _time_temp
    return _begin_time, _end_time
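
# Usage sketch for the pure-regex extractors above (illustrative inputs only, not
# production data; expected values checked against the patterns by hand):
#
#   extract_legal_stage("xx大桥施工总承包中标公告")    # -> "施工在建"
#   extract_legal_stage("xx地块国有产权出让公告")      # -> None (filtered out)
#   extract_proportion("本项目建筑面积约12000平方米")  # -> "建筑面积约12000平方米"
#
# extract_projectAddress/extract_begin_end_time additionally need the sentence and
# entity lists produced by BiddingKG preprocessing, so they can only run in the
# child process where spanWindow/timeFormat have been imported.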
@annotate('bigint,string,string,string -> string,string,string,string,string,string,string,string')
class extract_proposedBuilding(BaseUDTF):

    def __init__(self):
        multiLoadEnv()
        # publish pandas as a module-level global so getPattern() can use it
        global pd
        import pandas as pd
        self._pattern = getPattern()
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
        self.deal_process.start()
        import numpy as np
        self.last_timeout = False

    def f_queue_process(self, task_queue, result_queue):
        log("start import predict function")
        # publish the helpers as globals so the module-level extractors can call them
        global spanWindow, timeFormat
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        from BiddingKG.dl.common.Utils import spanWindow, timeFormat
        log("import done")
        while True:
            try:
                item = task_queue.get(True, timeout=10)
                doc_id = item.get("docid", "")
                dochtmlcon = item.get("dochtmlcon", "")
                doctitle = item.get("doctitle", "")
                project_name = item.get("project_name", "")
                log("start process docid:%s" % (str(doc_id)))
                _stage = extract_legal_stage(doctitle)
                result_json = None
                if _stage is not None:
                    list_articles, list_sentences, list_entitys, list_outlines, _cost_time = Preprocessing.get_preprocessed(
                        [[doc_id, dochtmlcon, "", "", doctitle, "", ""]], useselffool=True)
                    for list_article, list_sentence, list_entity in zip(list_articles, list_sentences, list_entitys):
                        content = list_article.content
                        _stage = extract_legal_stage(doctitle)
                        if _stage is None:
                            continue
                        _industry = extract_industry(content, self._pattern)
                        if _industry is None:
                            continue
                        _proportion = extract_proportion(content)
                        _projectDigest = extract_projectDigest(content)
                        _projectAddress = extract_projectAddress(list_sentence, list_entity)
                        _begin_time, _end_time = extract_begin_end_time(list_sentence, list_entity)
                        project_name_refind = ""
                        if project_name is not None and len(project_name) > 0:
                            project_name_refind = re.sub("设计|环评|监理|施工", "", project_name)
                        if _stage is not None:
                            result_json = {"_stage": _stage,
                                           "_proportion": _proportion,
                                           "_projectAddress": _projectAddress,
                                           "_projectDigest": _projectDigest,
                                           "_begin_time": _begin_time,
                                           "_end_time": _end_time,
                                           "project_name_refind": project_name_refind,
                                           "_industry": _industry}
                result_queue.put(result_json, True)
                log("end process docid:%s" % (str(doc_id)))
            except Exception as e:
                traceback.print_exc()
                log("get data time out")

    def process(self, doc_id, dochtmlcon, doctitle, project_name):
        # # process directly
        # if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
        #     result_json = predict(str(_doc_id),content,str(_title))
        #     self.forward(page_time,int(_doc_id),result_json)
        if dochtmlcon is not None and doc_id not in [105677700, 126694044, 126795572, 126951461, 71708072, 137850637]:
            # drain any stale items left in the queues
            try:
                while self.task_queue.qsize() > 0:
                    self.task_queue.get(timeout=5)
            except Exception as e:
                pass
            try:
                while self.result_queue.qsize() > 0:
                    self.result_queue.get(timeout=5)
            except Exception as e:
                pass
            _item = {"docid": doc_id, "dochtmlcon": dochtmlcon, "doctitle": doctitle, "project_name": project_name}
            try:
                _timeout = 60 * 4
                if self.last_timeout:
                    _timeout += 60 * 5
                    self.last_timeout = False
                if not self.deal_process.is_alive():
                    log("deal process is down")
                    self.task_queue = Queue()
                    self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
                    self.deal_process.start()
                    _timeout += 60 * 5
                log("putting item to task_queue with docid:%s" % (str(doc_id)))
                self.task_queue.put(_item)
                result_json = self.result_queue.get(timeout=_timeout)
                if result_json is not None:
                    self.forward(result_json.get("_stage"), result_json.get("_proportion"),
                                 result_json.get("_projectDigest"), result_json.get("_projectAddress"),
                                 result_json.get("_begin_time"), result_json.get("_end_time"),
                                 result_json.get("project_name_refind"), result_json.get("_industry"))
            except Exception as e:
                log("dealing docid %s failed by timeout" % (str(doc_id)))
                self.last_timeout = True
                self.deal_process.kill()
                time.sleep(5)
                self.task_queue = Queue()
                self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
                self.deal_process.start()
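
# Design note on extract_proposedBuilding: all BiddingKG work happens in a child
# process, and process() waits on result_queue with a deadline, so a hang inside
# the model code cannot stall the whole ODPS worker; on timeout the child is
# killed and respawned, and last_timeout stretches the next deadline to absorb
# the cold start. The same watchdog pattern in miniature (hypothetical echo
# worker, assuming the default fork start method on Linux; not the model call):
def _watchdog_sketch(task="task", timeout=60 * 4):
    def _worker(tq, rq):
        # stand-in for f_queue_process: echo one task back
        rq.put(tq.get(True, timeout=10))
    tq, rq = Queue(), Queue()
    p = Process(target=_worker, args=(tq, rq))
    p.start()
    tq.put(task)
    try:
        return rq.get(timeout=timeout)
    except Exception:
        p.kill()  # give up on the hung child; the caller respawns it, as process() does
        raise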
@annotate('bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string')
class f_remege_proposedBuildingProject(BaseUDAF):
    '''
    Merge rules: project code and winning bidder match, len(project code) > 7,
    winning bidder <> "", fewer than 2 distinct non-empty tenderees after merging,
    and identical non-empty amounts for the same announcement type after merging.
    '''

    def __init__(self):
        global json, logging, re
        import logging
        import json, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time, province, city, district, tenderee, tenderee_contact,
                tenderee_phone, agency, project_code, project_name, stage, proportion, projectDigest,
                projectAddress, begin_time, end_time, project_name_refind, industry):
        buffer[0].append({"docid": docid, "page_time": page_time, "province": province, "city": city,
                          "district": district, "tenderee": tenderee, "tenderee_contact": tenderee_contact,
                          "tenderee_phone": tenderee_phone, "agency": agency, "project_code": project_code,
                          "project_name": project_name, "stage": stage, "proportion": proportion,
                          "projectDigest": projectDigest, "projectAddress": projectAddress,
                          "begin_time": begin_time, "end_time": end_time,
                          "project_name_refind": project_name_refind, "industry": industry})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = buffer[0]
        return json.dumps(list_group, ensure_ascii=False)
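
# Hedged local smoke test for the regex helpers (illustrative inputs; because of
# the module-level odps imports this only runs inside the ODPS runtime or with
# stub odps modules on the path):
if __name__ == "__main__":
    assert extract_legal_stage("xx大桥施工招标公告") == "施工在建"
    assert extract_legal_stage("xx资产拍卖公告") is None
    assert extract_proportion("总建筑面积约5.6万平方米") == "建筑面积约5.6万平方米"
    log("regex extractor smoke test passed")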