|
@@ -10,6 +10,7 @@ import threading
|
|
|
import logging
|
|
|
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
import time
|
|
|
+from multiprocessing import Process,Queue
|
|
|
|
|
|
def log(msg):
    """Emit *msg* at INFO level through the module-wide logging config."""
    logging.info(msg)
|
|
@@ -106,6 +107,18 @@ def multiLoadEnv():
|
|
|
@annotate("string,bigint,string,string->string,bigint,string")
|
|
|
class Extract(BaseUDTF):
|
|
|
|
|
|
+ def f_queue_process(self,task_queue,result_queue):
|
|
|
+ log("start import predict function")
|
|
|
+ from BiddingKG.dl.interface.extract import predict as predict
|
|
|
+ log("import done")
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ item = task_queue.get(True)
|
|
|
+ result_json = predict(item.get("docid",""),item.get("content",""),item.get("title",""))
|
|
|
+ result_queue.put(result_json)
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
def __init__(self):
|
|
|
|
|
|
# self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
|
|
@@ -120,25 +133,27 @@ class Extract(BaseUDTF):
|
|
|
|
|
|
multiLoadEnv()
|
|
|
|
|
|
- import BiddingKG.dl.common.nerUtils
|
|
|
- log("time5"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
- import BiddingKG.dl.interface.predictor as predictor
|
|
|
- log("time6"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
- import BiddingKG.dl.interface.Entitys as Entitys
|
|
|
- log("time6.1"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
- import BiddingKG.dl.interface.getAttributes as getAttributes
|
|
|
- log("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
- import BiddingKG.dl.entityLink.entityLink as entityLink
|
|
|
- log("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
- import BiddingKG.dl.interface.Preprocessing as Preprocessing
|
|
|
- log("time6.3"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.common.nerUtils
|
|
|
+ # log("time5"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.interface.predictor as predictor
|
|
|
+ # log("time6"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.interface.Entitys as Entitys
|
|
|
+ # log("time6.1"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.interface.getAttributes as getAttributes
|
|
|
+ # log("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.entityLink.entityLink as entityLink
|
|
|
+ # log("time6.2"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
+ # import BiddingKG.dl.interface.Preprocessing as Preprocessing
|
|
|
+ # log("time6.3"+str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
|
|
|
|
|
|
- log("=======")
|
|
|
- time.sleep(5)
|
|
|
- from BiddingKG.dl.interface.extract import predict as predict
|
|
|
- log("=======import done")
|
|
|
+ # log("start import predict function")
|
|
|
+ # from BiddingKG.dl.interface.extract import predict as predict
|
|
|
+ # log("import done")
|
|
|
import json
|
|
|
-
|
|
|
+ self.task_queue = Queue()
|
|
|
+ self.result_queue = Queue()
|
|
|
+ self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
|
|
|
+ self.deal_process.start()
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
@@ -158,6 +173,25 @@ class Extract(BaseUDTF):
|
|
|
return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
def process(self,content,_doc_id,_title,page_time):
|
|
|
+ # #直接处理
|
|
|
+ # if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
|
|
|
+ # result_json = predict(str(_doc_id),content,str(_title))
|
|
|
+ # self.forward(page_time,int(_doc_id),result_json)
|
|
|
+
|
|
|
+
|
|
|
if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
|
|
|
- result_json = predict(str(_doc_id),content,str(_title))
|
|
|
- self.forward(page_time,int(_doc_id),result_json)
|
|
|
+ _item = {"docid":_doc_id,"content":content,"title":_title}
|
|
|
+ self.task_queue.put(_item)
|
|
|
+
|
|
|
+ try:
|
|
|
+ if not self.deal_process.is_alive():
|
|
|
+ log("deal process is down")
|
|
|
+ self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
|
|
|
+ self.deal_process.start()
|
|
|
+ result_json = self.result_queue.get(timeout=60*4)
|
|
|
+ self.forward(page_time,int(_doc_id),result_json)
|
|
|
+ except Exception as e:
|
|
|
+ log("dealing docid %s failed by timeout"%(str(_doc_id)))
|
|
|
+ self.deal_process.kill()
|
|
|
+ self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
|
|
|
+ self.deal_process.start()
|