|
@@ -47,6 +47,107 @@ class ActiveMQListener():
|
|
|
    def __del__(self):
        # Best-effort cleanup: drop the MQ connection when the listener is
        # garbage-collected. NOTE(review): disconnect() may raise if the
        # connection is already closed — confirm against the stomp client docs.
        self.conn.disconnect()
|
|
|
|
|
|
+
|
|
|
class DynamicProcess(Process):
    """Worker process that keeps `comsumer_count` MQ consumers alive on one queue.

    run() creates the consumers inside the child process, then loops forever,
    re-creating any consumer whose connection has dropped (checked every 5s).
    """

    def __init__(self, listener_cls, comsumer_handler, comsumer_count, queue_name):
        # listener_cls: callable taking (connection, handler, index) -> listener
        # comsumer_handler: callback forwarded to every listener
        # comsumer_count: number of consumers to keep attached to the queue
        # queue_name: destination queue each consumer subscribes to
        self.listener_cls = listener_cls
        self.comsumer_handler = comsumer_handler
        self.comsumer_count = comsumer_count
        self.queue_name = queue_name
        super().__init__()

    def run(self):
        # Build the consumers in the child process (connections are not
        # picklable, so they cannot be created in __init__).
        self.list_comsumer = []
        for _i in range(self.comsumer_count):
            listener_attachment = self.listener_cls(getConnect_activateMQ(), self.comsumer_handler, _i)
            createComsumer(listener_attachment, self.queue_name)
            self.list_comsumer.append(listener_attachment)

        # Supervise: replace any consumer whose connection has dropped.
        while 1:
            for i in range(len(self.list_comsumer)):
                if self.list_comsumer[i].conn.is_connected():
                    continue
                listener = self.listener_cls(getConnect_activateMQ(), self.comsumer_handler, i)
                createComsumer(listener, self.queue_name)
                self.list_comsumer[i] = listener
            time.sleep(5)

    def _disconnect_all(self):
        # BUGFIX: list_comsumer only exists after run() started (and only in
        # the child process); calling terminate()/__del__ from the parent used
        # to raise AttributeError. Guard with getattr and make each
        # disconnect best-effort so one dead connection doesn't stop cleanup.
        for _c in getattr(self, "list_comsumer", []):
            try:
                _c.conn.disconnect()
            except Exception:
                traceback.print_exc()

    def terminate(self) -> None:
        # Close connections before killing the process.
        self._disconnect_all()
        super().terminate()

    def __del__(self):
        self._disconnect_all()
|
|
|
def dynamic_listener(sorted_names, process_count, listener_cls, comsumer_handler, comsumer_count, min_count=5000):
    """Attach `comsumer_count` consumers to the currently busiest queue.

    Every minute the queues in `sorted_names` are re-ranked by size: queues
    holding more than `min_count` messages rank first, ties broken by their
    position in `sorted_names`. Consumers are created for the top queue on the
    first pass; when a later pass picks a different queue, the existing
    consumers are detached and the function returns so the scheduler can start
    a fresh round.

    process_count is currently unused (kept for a process-based variant).
    """
    from BaseDataMaintenance.java.MQInfo import getQueueSize

    list_queue_dict = []
    for _i in range(len(sorted_names)):
        list_queue_dict.append({"name": sorted_names[_i],
                                "level": _i})
    if len(list_queue_dict) == 0:
        return
    last_name = ""
    list_comsumer = []
    try:
        while 1:
            # Refresh sizes and bucket each queue: 1 = above threshold, 2 = below.
            for _d in list_queue_dict:
                _size = getQueueSize(_d.get("name"))
                _d["queue_size"] = _size
                if _size > min_count:
                    _d["size_level"] = 1
                else:
                    _d["size_level"] = 2

            # Rank: big queues first, then by configured priority order.
            list_queue_dict.sort(key=lambda x: (x.get("size_level"), x.get("level")))
            queue_name = list_queue_dict[0].get("name")
            queue_size = list_queue_dict[0].get("queue_size")
            log("dynamic_listener last_name:%s queue_name:%s %s"%(last_name,queue_name,str(list_queue_dict)))

            # Run the consumers inside this process.
            if last_name == "":
                for i in range(comsumer_count):
                    # BUGFIX: was `_i` — a stale index left over from the setup
                    # loop above, so every consumer got the same wrong index.
                    listener_attachment = listener_cls(getConnect_activateMQ(), comsumer_handler, i)
                    listener_name = createComsumer(listener_attachment, queue_name)
                    list_comsumer.append((listener_attachment, listener_name))
                last_name = queue_name
            elif last_name != queue_name:
                # The busiest queue changed: detach everything and return so
                # the caller's scheduler can start a new round on the new queue.
                log("dynamic_listener terminate")
                for listener_attachment, listener_name in list_comsumer:
                    listener_attachment.conn.remove_listener(listener_name)
                    listener_attachment.conn.disconnect()
                list_comsumer.clear()
                break

            time.sleep(60)
    except Exception:
        traceback.print_exc()
|
|
|
class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
|
|
|
class AttachmentMQListener():
|
|
@@ -81,13 +182,18 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
|
|
|
|
|
|
self.mq_attachment = "/queue/dataflow_attachment"
|
|
|
+ self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
|
|
|
+ self.mq_attachment_his = "/queue/dataflow_attachment_his"
|
|
|
self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
|
|
|
self.mq_extract = "/queue/dataflow_extract"
|
|
|
+ self.mq_extract_his = "/queue/dataflow_extract_his"
|
|
|
|
|
|
self.queue_attachment_ocr = Queue()
|
|
|
self.queue_attachment_not_ocr = Queue()
|
|
|
self.comsumer_count = 20
|
|
|
self.comsumer_process_count = 5
|
|
|
+ self.comsumer_process_count_fix = 1
|
|
|
+ self.comsumer_process_count_his = 1
|
|
|
self.retry_comsumer_count = 10
|
|
|
self.retry_times = 5
|
|
|
self.list_attachment_comsumer = []
|
|
@@ -105,28 +211,39 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
self.session = None
|
|
|
|
|
|
for _ in range(self.comsumer_process_count):
|
|
|
- listener_p = Process(target=self.start_attachment_listener)
|
|
|
+ listener_p = Process(target=self.start_attachment_listener,args=(10,self.mq_attachment))
|
|
|
+ listener_p.start()
|
|
|
+ for _ in range(self.comsumer_process_count_fix):
|
|
|
+ listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_fix))
|
|
|
listener_p.start()
|
|
|
+ for _ in range(self.comsumer_process_count_his):
|
|
|
+ listener_p = Process(target=self.start_attachment_listener,args=(5,self.mq_attachment_his))
|
|
|
+ listener_p.start()
|
|
|
+
|
|
|
+ # listener_p = Process(target=dynamic_listener,args=(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],1,self.AttachmentMQListener,lambda _dict: self.attachment_listener_handler(_dict),2))
|
|
|
+ # listener_p.start()
|
|
|
|
|
|
# listener_p = Process(target=self.start_attachment_listener)
|
|
|
# listener_p.start()
|
|
|
|
|
|
|
|
|
|
|
|
- def start_attachment_listener(self):
|
|
|
- for _i in range(self.comsumer_count):
|
|
|
+
|
|
|
+ def start_attachment_listener(self,comsumer_count,queue_name):
|
|
|
+ list_attachment_comsumer = []
|
|
|
+ for _i in range(comsumer_count):
|
|
|
listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler ,_i)
|
|
|
- createComsumer(listener_attachment,self.mq_attachment)
|
|
|
- self.list_attachment_comsumer.append(listener_attachment)
|
|
|
+ createComsumer(listener_attachment,queue_name)
|
|
|
+ list_attachment_comsumer.append(listener_attachment)
|
|
|
|
|
|
while 1:
|
|
|
- for i in range(len(self.list_attachment_comsumer)):
|
|
|
- if self.list_attachment_comsumer[i].conn.is_connected():
|
|
|
+ for i in range(len(list_attachment_comsumer)):
|
|
|
+ if list_attachment_comsumer[i].conn.is_connected():
|
|
|
continue
|
|
|
else:
|
|
|
listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
|
|
|
- createComsumer(listener,self.mq_attachment)
|
|
|
- self.list_attachment_comsumer[i] = listener
|
|
|
+ createComsumer(listener,queue_name)
|
|
|
+ list_attachment_comsumer[i] = listener
|
|
|
time.sleep(5)
|
|
|
|
|
|
def monitor_listener(self):
|
|
@@ -138,6 +255,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
createComsumer(listener,self.mq_attachment)
|
|
|
self.list_attachment_comsumer[i] = listener
|
|
|
|
|
|
+ def dynamic_listener_attachment(self):
|
|
|
+ dynamic_listener(["dataflow_attachment","dataflow_attachment_fix","dataflow_attachment_his"],1,self.AttachmentMQListener,lambda _dict: self.attachment_listener_handler(_dict),5)
|
|
|
+
|
|
|
|
|
|
def process_failed_attachment(self):
|
|
|
|
|
@@ -165,7 +285,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
message_id = frame.headers["message-id"]
|
|
|
item = json.loads(frame.body)
|
|
|
_idx = _dict.get("idx",1)
|
|
|
- page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
|
|
|
+ try:
|
|
|
+ page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
|
|
|
+ except Exception as e:
|
|
|
+ page_attachments = []
|
|
|
_dochtmlcon = item.get(document_tmp_dochtmlcon,"")
|
|
|
|
|
|
if random.random()<0.2:
|
|
@@ -194,6 +317,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
except Exception as e:
|
|
|
traceback.print_exc()
|
|
|
|
|
|
+
|
|
|
def rec_attachments_by_interface(self,list_attach,_dochtmlcon,save=True):
|
|
|
try:
|
|
|
list_html = []
|
|
@@ -328,7 +452,20 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
|
|
|
dtmp.setValue(document_tmp_attachment_extract_status,1,True)
|
|
|
dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
|
|
|
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
|
|
|
+
|
|
|
+ page_time = item.get(document_page_time,"")
|
|
|
+ current_date = getCurrent_date(format='%Y-%m-%d')
|
|
|
+ last_7_date = timeAdd(current_date,-7,format='%Y-%m-%d')
|
|
|
+ if page_time<last_7_date:
|
|
|
+ is_his = True
|
|
|
+ else:
|
|
|
+ is_his = False
|
|
|
+ if not is_his:
|
|
|
+ queue_name = self.mq_extract
|
|
|
+ else:
|
|
|
+ queue_name = self.mq_extract_his
|
|
|
+
|
|
|
+ send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),queue_name)
|
|
|
if send_succeed:
|
|
|
_to_ack = True
|
|
|
except Exception as e:
|
|
@@ -651,10 +788,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
|
|
|
if _filemd5 not in set_md5:
|
|
|
|
|
|
+ log("getAttachments search in ots:%s"%(_filemd5))
|
|
|
_path = self.getAttachPath(_filemd5,_dochtmlcon)
|
|
|
|
|
|
|
|
|
- log("getAttachments search in ots:%s"%(_filemd5))
|
|
|
_attach = {attachment_filemd5:_filemd5}
|
|
|
_attach_ots = attachment(_attach)
|
|
|
if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time,attachment_classification],True):
|
|
@@ -777,11 +914,14 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
|
|
|
|
# schedule.add_job(self.monitor_attachment_process,"cron",second="*/10")
|
|
|
# schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
|
|
|
+ schedule.add_job(self.dynamic_listener_attachment,"cron",second="*/10")
|
|
|
schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
|
|
|
schedule.start()
|
|
|
|
|
|
class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
|
|
|
+
|
|
|
+
|
|
|
class ExtractListener():
|
|
|
|
|
|
def __init__(self,conn,_func,_idx,*args,**kwargs):
|
|
@@ -806,6 +946,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
def __del__(self):
|
|
|
self.conn.disconnect()
|
|
|
|
|
|
+
|
|
|
+
|
|
|
def __init__(self,create_listener=True):
|
|
|
Dataflow_extract.__init__(self)
|
|
|
|
|
@@ -818,6 +960,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
|
|
|
|
|
|
self.mq_extract = "/queue/dataflow_extract"
|
|
|
+ self.mq_extract_fix = "/queue/dataflow_extract_fix"
|
|
|
+ self.mq_extract_his = "/queue/dataflow_extract_his"
|
|
|
self.mq_extract_ai = "/queue/dataflow_extract_AI"
|
|
|
self.mq_extract_failed = "/queue/dataflow_extract_failed"
|
|
|
|
|
@@ -855,11 +999,23 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
|
|
|
# 提取listener
|
|
|
if create_listener:
|
|
|
- for ii in range(10):
|
|
|
- listener_p = Process(target=self.start_extract_listener)
|
|
|
+ for ii in range(8):
|
|
|
+ listener_p = Process(target=self.start_extract_listener,args=(5,self.mq_extract))
|
|
|
+ listener_p.start()
|
|
|
+ for ii in range(1):
|
|
|
+ listener_p = Process(target=self.start_extract_listener,args=(2,self.mq_extract_fix))
|
|
|
listener_p.start()
|
|
|
+ for ii in range(2):
|
|
|
+ listener_p = Process(target=self.start_extract_listener,args=(4,self.mq_extract_his))
|
|
|
+ listener_p.start()
|
|
|
+ # listener_p = Process(target=dynamic_listener,args=(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],8,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),2))
|
|
|
+ # listener_p.start()
|
|
|
listener_p_ai = Thread(target=self.start_extract_AI_listener)
|
|
|
listener_p_ai.start()
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
@@ -886,24 +1042,24 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
traceback.print_exc()
|
|
|
|
|
|
|
|
|
- def start_extract_listener(self):
|
|
|
+ def start_extract_listener(self,comsumer_count,queue_name):
|
|
|
|
|
|
- self.list_extract_comsumer = []
|
|
|
+ list_extract_comsumer = []
|
|
|
|
|
|
- for _i in range(self.comsumer_count):
|
|
|
+ for _i in range(comsumer_count):
|
|
|
listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
|
|
|
- createComsumer(listener_extract,self.mq_extract)
|
|
|
- self.list_extract_comsumer.append(listener_extract)
|
|
|
+ createComsumer(listener_extract,queue_name)
|
|
|
+ list_extract_comsumer.append(listener_extract)
|
|
|
|
|
|
while 1:
|
|
|
try:
|
|
|
- for _i in range(len(self.list_extract_comsumer)):
|
|
|
- if self.list_extract_comsumer[_i].conn.is_connected():
|
|
|
+ for _i in range(len(list_extract_comsumer)):
|
|
|
+ if list_extract_comsumer[_i].conn.is_connected():
|
|
|
continue
|
|
|
else:
|
|
|
listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
|
|
|
- createComsumer(listener,self.mq_extract)
|
|
|
- self.list_extract_comsumer[_i] = listener
|
|
|
+ createComsumer(listener,queue_name)
|
|
|
+ list_extract_comsumer[_i] = listener
|
|
|
time.sleep(5)
|
|
|
except Exception as e:
|
|
|
traceback.print_exc()
|
|
@@ -951,6 +1107,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
q_size = self.queue_extract.qsize()
|
|
|
log("queue extract size:%d"%(q_size))
|
|
|
|
|
|
+ def dynamic_listener_extract(self):
|
|
|
+ dynamic_listener(["dataflow_extract","dataflow_extract_fix","dataflow_extract_his"],1,self.ExtractListener,lambda _dict,result_queue:self.comsumer_handle(_dict,result_queue),5)
|
|
|
+
|
|
|
def process_extract_failed(self):
|
|
|
def _handle(_dict,result_queue):
|
|
|
frame = _dict.get("frame")
|
|
@@ -1375,12 +1534,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
subscription = frame.headers.setdefault('subscription', None)
|
|
|
item = json.loads(frame.body)
|
|
|
|
|
|
- _extract_json = None
|
|
|
- if document_extract2_extract_json in item:
|
|
|
- _extract_json = item.get(document_extract2_extract_json)
|
|
|
- item.pop(document_extract2_extract_json)
|
|
|
-
|
|
|
try:
|
|
|
+ _extract_json = None
|
|
|
+ if document_extract2_extract_json in item:
|
|
|
+ _extract_json = item.get(document_extract2_extract_json)
|
|
|
+ item.pop(document_extract2_extract_json)
|
|
|
+
|
|
|
+
|
|
|
message_acknowledged = False
|
|
|
dtmp = Document_tmp(item)
|
|
|
|
|
@@ -1494,7 +1654,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
traceback.print_exc()
|
|
|
if not message_acknowledged:
|
|
|
ackMsg(conn,message_id,subscription)
|
|
|
-
|
|
|
+ else:
|
|
|
+ send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
|
|
|
|
def merge_json(self,extract_json,extract_ai_json):
|
|
|
|
|
@@ -1868,6 +2030,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
def start_flow_extract(self):
|
|
|
schedule = BlockingScheduler()
|
|
|
schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
|
|
|
+ schedule.add_job(self.dynamic_listener_extract,"cron",second="*/10")
|
|
|
schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
|
|
|
# schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
|
|
|
# schedule.add_job(self.monitor_listener,"cron",minute="*/5")
|
|
@@ -1927,7 +2090,11 @@ class Dataflow_init(Dataflow):
|
|
|
self.begin_docid = None
|
|
|
self.mq_init = "/queue/dataflow_init"
|
|
|
self.mq_attachment = "/queue/dataflow_attachment"
|
|
|
+ self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
|
|
|
+ self.mq_attachment_his = "/queue/dataflow_attachment_his"
|
|
|
self.mq_extract = "/queue/dataflow_extract"
|
|
|
+ self.mq_extract_fix = "/queue/dataflow_extract_fix"
|
|
|
+ self.mq_extract_his = "/queue/dataflow_extract_his"
|
|
|
self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
|
|
|
|
|
|
def on_error(self, headers):
|
|
@@ -1957,10 +2124,23 @@ class Dataflow_init(Dataflow):
|
|
|
body[document_original_docchannel] = body.get(document_docchannel)
|
|
|
page_attachments = body.get(document_tmp_attachment_path,"[]")
|
|
|
_uuid = body.get(document_tmp_uuid,"")
|
|
|
+ current_date = getCurrent_date(format='%Y-%m-%d')
|
|
|
+ last_7_date = timeAdd(current_date,-7,format='%Y-%m-%d')
|
|
|
+ page_time = body.get(document_page_time,"")
|
|
|
+ if page_time<last_7_date:
|
|
|
+ is_his = True
|
|
|
+ else:
|
|
|
+ is_his = False
|
|
|
+
|
|
|
+
|
|
|
if page_attachments!="[]":
|
|
|
status = random.randint(1,10)
|
|
|
body[document_tmp_status] = status
|
|
|
- if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
|
|
|
+ if not is_his:
|
|
|
+ queue_name = self.mq_attachment
|
|
|
+ else:
|
|
|
+ queue_name = self.mq_attachment_his
|
|
|
+ if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
|
|
|
log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
|
|
|
ackMsg(self.conn,message_id)
|
|
|
else:
|
|
@@ -1968,7 +2148,11 @@ class Dataflow_init(Dataflow):
|
|
|
else:
|
|
|
status = random.randint(11,50)
|
|
|
body[document_tmp_status] = status
|
|
|
- if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
|
|
|
+ if not is_his:
|
|
|
+ queue_name = self.mq_extract
|
|
|
+ else:
|
|
|
+ queue_name = self.mq_extract_his
|
|
|
+ if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),queue_name):
|
|
|
log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
|
|
|
ackMsg(self.conn,message_id)
|
|
|
else:
|
|
@@ -1990,7 +2174,11 @@ class Dataflow_init(Dataflow):
|
|
|
self.mq_init = "/queue/dataflow_init"
|
|
|
|
|
|
self.mq_attachment = "/queue/dataflow_attachment"
|
|
|
+ self.mq_attachment_fix = "/queue/dataflow_attachment_fix"
|
|
|
+ self.mq_attachment_his = "/queue/dataflow_attachment_his"
|
|
|
self.mq_extract = "/queue/dataflow_extract"
|
|
|
+ self.mq_extract_fix = "/queue/dataflow_extract_fix"
|
|
|
+ self.mq_extract_his = "/queue/dataflow_extract_his"
|
|
|
self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
|
|
|
self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
|
|
|
|
|
@@ -2079,14 +2267,31 @@ class Dataflow_init(Dataflow):
|
|
|
ots_dict["docid"] += self.base_shenpi_id
|
|
|
ots_dict["partitionkey"] = ots_dict["docid"]%500+1
|
|
|
|
|
|
+ current_date = getCurrent_date(format='%Y-%m-%d')
|
|
|
+ last_7_date = timeAdd(current_date,-7,format='%Y-%m-%d')
|
|
|
+ page_time = ots_dict.get(document_page_time,"")
|
|
|
+ if page_time<last_7_date:
|
|
|
+ is_his = True
|
|
|
+ else:
|
|
|
+ is_his = False
|
|
|
+
|
|
|
+
|
|
|
if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
|
|
|
- if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
|
|
|
+ if not is_his:
|
|
|
+ queue_name = self.mq_attachment
|
|
|
+ else:
|
|
|
+ queue_name = self.mq_attachment_his
|
|
|
+ if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
|
|
|
self.max_shenpi_id = _id
|
|
|
else:
|
|
|
log("sent shenpi message to mq failed %s"%(_id))
|
|
|
break
|
|
|
else:
|
|
|
- if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
|
|
|
+ if not is_his:
|
|
|
+ queue_name = self.mq_extract
|
|
|
+ else:
|
|
|
+ queue_name = self.mq_extract_his
|
|
|
+ if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),queue_name):
|
|
|
self.max_shenpi_id = _id
|
|
|
else:
|
|
|
log("sent shenpi message to mq failed %s"%(_id))
|
|
@@ -2232,11 +2437,11 @@ class Dataflow_init(Dataflow):
|
|
|
if page_attachments!="[]":
|
|
|
status = random.randint(1,10)
|
|
|
_data[document_tmp_status] = status
|
|
|
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
|
|
|
+ send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
|
|
|
else:
|
|
|
status = random.randint(11,50)
|
|
|
_data[document_tmp_status] = status
|
|
|
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
|
|
|
+ send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
|
|
|
if send_succeed:
|
|
|
_document.setValue(document_tmp_status,0,True)
|
|
|
_document.update_row(self.ots_client)
|
|
@@ -2282,11 +2487,11 @@ class Dataflow_init(Dataflow):
|
|
|
if page_attachments!="[]":
|
|
|
status = random.randint(1,10)
|
|
|
_data[document_tmp_status] = status
|
|
|
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
|
|
|
+ send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment_fix)
|
|
|
else:
|
|
|
status = random.randint(11,50)
|
|
|
_data[document_tmp_status] = status
|
|
|
- send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
|
|
|
+ send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract_fix)
|
|
|
if send_succeed:
|
|
|
_document.setValue(document_tmp_status,1,True)
|
|
|
_document.update_row(self.ots_client)
|