123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- from odps.udf import annotate
- from odps.distcache import get_cache_archive
- from odps.distcache import get_cache_file
- from odps.udf import BaseUDTF
- from odps.udf import BaseUDAF
- import threading
- import logging
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- import time
- import uuid
- import re
- import traceback
- from multiprocessing import Process,Queue
- def log(msg):
- logging.info(msg)
- # 配置pandas依赖包
- def include_package_path(res_name):
- import os, sys
- archive_files = get_cache_archive(res_name)
- dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
- if '.dist_info' not in f.name], key=lambda v: len(v))
- _path = dir_names[0].split(".zip/files")[0]+".zip/files"
- log("add path:%s"%(_path))
- sys.path.append(_path)
- return _path
- # 可能出现类似RuntimeError: xxx has been blocked by sandbox
- # 这是因为包含C的库,会被沙盘block,可设置set odps.isolation.session.enable = true
- def include_file(file_name):
- import os, sys
- so_file = get_cache_file(file_name)
- sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))
- def include_so(file_name):
- import os, sys
- so_file = get_cache_file(file_name)
- with open(so_file.name, 'rb') as fp:
- content=fp.read()
- so = open(file_name, "wb")
- so.write(content)
- so.flush()
- so.close()
- #初始化业务数据包,由于上传限制,python版本以及archive解压包不统一等各种问题,需要手动导入
- def init_env(list_files,package_name):
- import os,sys
- if len(list_files)==1:
- so_file = get_cache_file(list_files[0])
- cmd_line = os.path.abspath(so_file.name)
- os.system("unzip -o %s -d %s"%(cmd_line,package_name))
- elif len(list_files)>1:
- cmd_line = "cat"
- for _file in list_files:
- so_file = get_cache_file(_file)
- cmd_line += " "+os.path.abspath(so_file.name)
- cmd_line += " > temp.zip"
- os.system(cmd_line)
- os.system("unzip -o temp.zip -d %s"%(package_name))
- # os.system("rm -rf %s/*.dist-info"%(package_name))
- # return os.listdir(os.path.abspath("local_package"))
- # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
- # os.system("source ~/.bashrc")
- sys.path.insert(0,os.path.abspath(package_name))
- # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
- def multiLoadEnv():
- def load_project():
- start_time = time.time()
- include_package_path("BiddingKG.backup.zip")
- logging.info("init biddingkg.zip.env.line cost %d"%(time.time()-start_time))
- def load_vector():
- start_time = time.time()
- init_env(["wiki_128_word_embedding_new.vector.env"],".")
- logging.info("init wiki_128_word_embedding_new cost %d"%(time.time()-start_time))
- start_time = time.time()
- init_env(["enterprise.zip.env"],".")
- # init_env(["LEGAL_ENTERPRISE.zip.env"],".")
- logging.info("init legal_enterprise.zip.env cost %d"%(time.time()-start_time))
- start_time = time.time()
- init_env(["so.env"],".")
- logging.info("init so.env cost %d"%(time.time()-start_time))
- def load_py():
- start_time = time.time()
- # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
- include_package_path("envs_py37.env.zip")
- logging.info("init envs_py37 cost %d"%(time.time()-start_time))
- load_project()
- load_vector()
- load_py()
- def getPattern():
- filename = "proposedBuildingKeyword.zip.env"
- init_env([filename],".")
- df = pd.read_excel("proposedBuildingKeyword.xlsx")
- dict_industry_keywords = {}
- for _industry,_keyword in zip(df["类别"],df["关键词"]):
- if _industry not in dict_industry_keywords:
- dict_industry_keywords[_industry] = set()
- dict_industry_keywords[_industry].add(_keyword)
- list_industry_p = []
- for k,v in dict_industry_keywords.items():
- if len(v)>0:
- list_industry_p.append("(?P<%s>%s)"%(k,"|".join(list(v))))
- _pattern = re.compile("|".join(list_industry_p))
- return _pattern
- dict_stage = {"设计阶段":"设计",
- "环评阶段":"环评",
- "施工准备":"监理",
- "施工在建":"施工"}
- list_stage_v = []
- for k,v in dict_stage.items():
- list_stage_v.append("(?P<%s>%s)"%(k,v))
- stage_pattern = "|".join(list_stage_v)
- def extract_industry(content,_pattern):
- list_stage = []
- for stage_search in re.finditer(_pattern,content):
- for k,v in stage_search.groupdict().items():
- if v is not None:
- list_stage.append(k)
- if len(list_stage)>0:
- return list_stage[0]
- return None
- def extract_legal_stage(content):
- if re.search("拍卖|转让|产权|出让|租赁|招租|采购",content) is not None:
- return None
- list_stage = []
- for stage_search in re.finditer(stage_pattern,content):
- for k,v in stage_search.groupdict().items():
- if v is not None:
- list_stage.append(k)
- if len(list_stage)>0:
- return list_stage[-1]
- return None
- def extract_proportion(content):
- _pattern = "(?P<proportion>((建筑|建设)面积|全长)[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
- _pattern_search = re.search(_pattern,content)
- _proportion = ""
- if _pattern_search is not None:
- _proportion = _pattern_search.groupdict().get("proportion","")
- if _proportion=="":
- _pattern = "(?P<proportion>((建筑|建设|区域)?面积|全长|项目规模)[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
- _pattern_search = re.search(_pattern,content)
- if _pattern_search is not None:
- _proportion = _pattern_search.groupdict().get("proportion","")
- return _proportion
- def extract_projectDigest(content):
- _pattern = "(?P<projectDigest>(项目|工程|标的|需求|建设|招标|采购|内容)(概况|规模|简介|信息|范围|内容|说明|摘要).{10,300})"
- _pattern_search = re.search(_pattern,content)
- _projectDigest = ""
- _find = ""
- if _pattern_search is not None:
- _find = _pattern_search.groupdict().get("projectDigest","")
- if len(_find)>0:
- _projectDigest = "。".join(_find.split("。")[0:3])
- return _projectDigest
- def extract_projectAddress(list_sentence,list_entity):
- for p_entity in list_entity:
- if len(p_entity.entity_text)>10 and p_entity.entity_type=="location":
- for _sentence in list_sentence:
- if _sentence.sentence_index==p_entity.sentence_index:
- _span = spanWindow(tokens=_sentence.tokens,begin_index=p_entity.begin_index,end_index=p_entity.end_index,size=20,center_include=True,word_flag=True,text=p_entity.entity_text)
- if re.search("(项目|建设)(地址|地点)",_span[0]) is not None:
- return p_entity.entity_text
- return None
- def extract_begin_end_time(list_sentence,list_entity):
- _begin_time = None
- _end_time = None
- for p_entity in list_entity:
- if p_entity.entity_type=="time":
- for _sentence in list_sentence:
- if _sentence.sentence_index==p_entity.sentence_index:
- _span = spanWindow(tokens=_sentence.tokens,begin_index=p_entity.begin_index,end_index=p_entity.end_index,size=20,center_include=True,word_flag=True,text=p_entity.entity_text)
- if re.search("开工(时间|日期)",_span[0]) is not None:
- _time_temp = timeFormat(p_entity.entity_text)
- if len(_time_temp)>0:
- _begin_time = _time_temp
- if re.search("(竣工|完工)(时间|日期)",_span[0]) is not None:
- _time_temp = timeFormat(p_entity.entity_text)
- if len(_time_temp)>0:
- _end_time = _time_temp
- return _begin_time,_end_time
- @annotate('bigint,string,string,string -> string,string,string,string,string,string,string,string')
- class extract_proposedBuilding(BaseUDTF):
- def __init__(self):
- multiLoadEnv()
- import pandas as pd
- global pd
- self._pattern = getPattern()
- self.task_queue = Queue()
- self.result_queue = Queue()
- self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
- self.deal_process.start()
- import numpy as np
- self.last_timeout = False
- def f_queue_process(self,task_queue,result_queue):
- log("start import predict function")
- import BiddingKG.dl.interface.Preprocessing as Preprocessing
- from BiddingKG.dl.common.Utils import spanWindow,timeFormat
- global spanWindow,timeFormat
- log("import done")
- while True:
- try:
- item = task_queue.get(True,timeout=10)
- doc_id = item.get("docid","")
- dochtmlcon = item.get("dochtmlcon","")
- doctitle = item.get("doctitle","")
- project_name = item.get("project_name","")
- log("start process docid:%s"%(str(doc_id)))
- _stage = extract_legal_stage(doctitle)
- result_json = None
- if _stage is not None:
- list_articles,list_sentences,list_entitys,list_outlines,_cost_time = Preprocessing.get_preprocessed([[doc_id,dochtmlcon,"","",doctitle,"",""]],useselffool=True)
- for list_article,list_sentence,list_entity in zip(list_articles,list_sentences,list_entitys):
- content = list_article.content
- _stage = extract_legal_stage(doctitle)
- if _stage is None:
- continue
- _industry = extract_industry(content,self._pattern)
- if _industry is None:
- continue
- _proportion = extract_proportion(content)
- _projectDigest = extract_projectDigest(content)
- _projectAddress = extract_projectAddress(list_sentence,list_entity)
- _begin_time,_end_time = extract_begin_end_time(list_sentence,list_entity)
- project_name_refind = ""
- if project_name is not None and len(project_name)>0:
- project_name_refind = re.sub("设计|环评|监理|施工","",project_name)
- if _stage is not None:
- result_json = {"_stage":_stage,
- "_proportion":_proportion,
- "_projectAddress":_projectAddress,
- "_projectDigest":_projectDigest,
- "_begin_time":_begin_time,
- "_end_time":_end_time,
- "project_name_refind":project_name_refind,
- "_industry":_industry}
- result_queue.put(result_json,True)
- log("end process docid:%s"%(str(doc_id)))
- except Exception as e:
- traceback.print_exc()
- log("get data time out")
- pass
- def process(self,doc_id,dochtmlcon,doctitle,project_name):
- # #直接处理
- # if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
- # result_json = predict(str(_doc_id),content,str(_title))
- # self.forward(page_time,int(_doc_id),result_json)
- if dochtmlcon is not None and doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
- #清除队列中的数据
- try:
- while(self.task_queue.qsize()>0):
- self.task_queue.get(timeout=5)
- except Exception as e:
- pass
- try:
- while(self.result_queue.qsize()>0):
- self.result_queue.get(timeout=5)
- except Exception as e:
- pass
- _item = {"docid":doc_id,"dochtmlcon":dochtmlcon,"doctitle":doctitle,"project_name":project_name}
- try:
- _timeout = 60*4
- if self.last_timeout:
- _timeout += 60*5
- self.last_timeout = False
- if not self.deal_process.is_alive():
- log("deal process is down")
- self.task_queue = Queue()
- self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
- self.deal_process.start()
- _timeout += 60*5
- log("putting item to task_queue with docid:%s"%(str(doc_id)))
- self.task_queue.put(_item)
- result_json = self.result_queue.get(timeout=_timeout)
- if result_json is not None:
- self.forward(result_json.get("_stage"),result_json.get("_proportion"),result_json.get("_projectDigest"),result_json.get("_projectAddress"),result_json.get("_begin_time"),result_json.get("_end_time"),result_json.get("project_name_refind"),result_json.get("_industry"))
- except Exception as e:
- log("dealing docid %s failed by timeout"%(str(doc_id)))
- self.last_timeout = True
- self.deal_process.kill()
- time.sleep(5)
- self.task_queue = Queue()
- self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
- self.deal_process.start()
- @annotate('bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string')
- class f_remege_proposedBuildingProject(BaseUDAF):
- '''
- 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同
- '''
- def __init__(self):
- import logging
- import json,re
- global json,logging,re
- logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- def new_buffer(self):
- return [list()]
- def iterate(self, buffer,docid,page_time,province,city,district,tenderee,tenderee_contact,tenderee_phone,agency,
- project_code,project_name,stage,proportion,projectDigest,projectAddress,begin_time,end_time,
- project_name_refind,industry):
- buffer[0].append({"docid":docid,"page_time":page_time,"province":province,"city":city,"district":district,
- "tenderee":tenderee,"tenderee_contact":tenderee_contact,"tenderee_phone":tenderee_phone,
- "agency":agency,"project_code":project_code,"project_name":project_name,"stage":stage,"proportion":proportion,
- "projectDigest":projectDigest,"projectAddress":projectAddress,"begin_time":begin_time,"end_time":end_time,
- "project_name_refind":project_name_refind,"industry":industry})
- def merge(self, buffer, pbuffer):
- buffer[0].extend(pbuffer[0])
- def terminate(self, buffer):
- list_group = buffer[0]
- return json.dumps(list_group,ensure_ascii=False)
|