evaluates.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #coding=utf-8
# `evaluate` is the entry-point function for this method; it must keep this name.
  3. from odps.udf import annotate
  4. from odps.distcache import get_cache_archive
  5. from odps.distcache import get_cache_file
  6. from odps.udf import BaseUDTF
  7. import threading
  8. import logging
  9. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  10. import time
  11. # 配置pandas依赖包
  12. def include_package_path(res_name):
  13. import os, sys
  14. archive_files = get_cache_archive(res_name)
  15. dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
  16. if '.dist_info' not in f.name], key=lambda v: len(v))
  17. sys.path.append(dir_names[0])
  18. return os.path.dirname(dir_names[0])
  19. # 可能出现类似RuntimeError: xxx has been blocked by sandbox
  20. # 这是因为包含C的库,会被沙盘block,可设置set odps.isolation.session.enable = true
  21. def include_file(file_name):
  22. import os, sys
  23. so_file = get_cache_file(file_name)
  24. sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))
  25. def include_so(file_name):
  26. import os, sys
  27. so_file = get_cache_file(file_name)
  28. with open(so_file.name, 'rb') as fp:
  29. content=fp.read()
  30. so = open(file_name, "wb")
  31. so.write(content)
  32. so.flush()
  33. so.close()
  34. #初始化业务数据包,由于上传限制,python版本以及archive解压包不统一等各种问题,需要手动导入
  35. def init_env(list_files,package_name):
  36. import os,sys
  37. if len(list_files)==1:
  38. so_file = get_cache_file(list_files[0])
  39. cmd_line = os.path.abspath(so_file.name)
  40. os.system("unzip -o %s -d %s"%(cmd_line,package_name))
  41. elif len(list_files)>1:
  42. cmd_line = "cat"
  43. for _file in list_files:
  44. so_file = get_cache_file(_file)
  45. cmd_line += " "+os.path.abspath(so_file.name)
  46. cmd_line += " > temp.zip"
  47. os.system(cmd_line)
  48. os.system("unzip -o temp.zip -d %s"%(package_name))
  49. # os.system("rm -rf %s/*.dist-info"%(package_name))
  50. # return os.listdir(os.path.abspath("local_package"))
  51. # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
  52. # os.system("source ~/.bashrc")
  53. sys.path.insert(0,os.path.abspath(package_name))
  54. # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
  55. def multiLoadEnv():
  56. def load_project():
  57. start_time = time.time()
  58. init_env(["BiddingKG.zip.env.baseline"],str(uuid.uuid4()))
  59. # init_env(["BiddingKG.zip.env.backup"],str(uuid.uuid4()))
  60. logging.info("init biddingkg.zip.env.line cost %d"%(time.time()-start_time))
  61. def load_vector():
  62. start_time = time.time()
  63. init_env(["wiki_128_word_embedding_new.vector.env"],".")
  64. logging.info("init wiki_128_word_embedding_new cost %d"%(time.time()-start_time))
  65. start_time = time.time()
  66. init_env(["enterprise.zip.env"],".")
  67. # init_env(["LEGAL_ENTERPRISE.zip.env"],".")
  68. logging.info("init legal_enterprise.zip.env cost %d"%(time.time()-start_time))
  69. start_time = time.time()
  70. init_env(["so.env"],".")
  71. logging.info("init so.env cost %d"%(time.time()-start_time))
  72. def load_py():
  73. start_time = time.time()
  74. # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
  75. include_package_path("envs_py37.env.zip")
  76. logging.info("init envs_py37 cost %d"%(time.time()-start_time))
  77. load_project()
  78. load_vector()
  79. load_py()
  80. @annotate("string,bigint,string,string->string,bigint,string")
  81. class Extract(BaseUDTF):
  82. def __init__(self):
  83. # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
  84. import uuid
  85. global uuid
  86. import logging
  87. import datetime
  88. import time
  89. global time
  90. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  91. multiLoadEnv()
  92. # logging.info("time5"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  93. # import BiddingKG.dl.interface.predictor as predictor
  94. # logging.info("time6"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  95. # import BiddingKG.dl.interface.Entitys as Entitys
  96. # logging.info("time6.1"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  97. # import BiddingKG.dl.interface.getAttributes as getAttributes
  98. # logging.info("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  99. # import BiddingKG.dl.entityLink.entityLink as entityLink
  100. # logging.info("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  101. # import BiddingKG.dl.interface.Preprocessing as Preprocessing
  102. # logging.info("time6.3"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
  103. from BiddingKG.dl.interface.extract import predict as predict
  104. import json
  105. from BiddingKG.dl.common.Utils import log
  106. import numpy as np
  107. global predictor,Entitys,getAttributes,entityLink,json,MyEncoder,Preprocessing,log,MyEncoder,np,predict
  108. class MyEncoder(json.JSONEncoder):
  109. def default(self, obj):
  110. if isinstance(obj, np.ndarray):
  111. return obj.tolist()
  112. elif isinstance(obj, bytes):
  113. return str(obj, encoding='utf-8')
  114. elif isinstance(obj, (np.float_, np.float16, np.float32,
  115. np.float64)):
  116. return float(obj)
  117. elif isinstance(obj,(np.int64)):
  118. return int(obj)
  119. return json.JSONEncoder.default(self, obj)
  120. def process(self,content,_doc_id,_title,page_time):
  121. if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
  122. result_json = predict(str(_doc_id),content,str(_title))
  123. self.forward(page_time,int(_doc_id),result_json)