|
@@ -723,7 +723,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
def __del__(self):
|
|
|
self.conn.disconnect()
|
|
|
|
|
|
- def __init__(self):
|
|
|
+ def __init__(self,create_listener=True):
|
|
|
Dataflow_extract.__init__(self)
|
|
|
|
|
|
self.industy_url = "http://127.0.0.1:15000/industry_extract"
|
|
@@ -761,8 +761,10 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
# listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
|
|
|
# createComsumer(listener_extract,self.mq_extract)
|
|
|
# self.list_extract_comsumer.append(listener_extract)
|
|
|
- listener_p = Process(target=self.start_extract_listener)
|
|
|
- listener_p.start()
|
|
|
+
|
|
|
+ if create_listener:
|
|
|
+ listener_p = Process(target=self.start_extract_listener)
|
|
|
+ listener_p.start()
|
|
|
|
|
|
def start_extract_listener(self):
|
|
|
|
|
@@ -840,18 +842,21 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
|
|
|
|
|
|
from BaseDataMaintenance.java.MQInfo import getQueueSize
|
|
|
- extract_failed_size = getQueueSize("dataflow_extract_failed")
|
|
|
- extract_size = getQueueSize("dataflow_extract")
|
|
|
- log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
|
|
|
- if extract_failed_size>0 and extract_size<100:
|
|
|
- failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle)
|
|
|
- createComsumer(failed_listener,self.mq_extract_failed)
|
|
|
- while 1:
|
|
|
- extract_failed_size = getQueueSize("dataflow_extract_failed")
|
|
|
- if extract_failed_size==0:
|
|
|
- break
|
|
|
- time.sleep(10)
|
|
|
- failed_listener.conn.disconnect()
|
|
|
+ try:
|
|
|
+ extract_failed_size = getQueueSize("dataflow_extract_failed")
|
|
|
+ extract_size = getQueueSize("dataflow_extract")
|
|
|
+ log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
|
|
|
+ if extract_failed_size>0 and extract_size<100:
|
|
|
+ failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle,1)
|
|
|
+ createComsumer(failed_listener,self.mq_extract_failed)
|
|
|
+ while 1:
|
|
|
+ extract_failed_size = getQueueSize("dataflow_extract_failed")
|
|
|
+ if extract_failed_size==0:
|
|
|
+ break
|
|
|
+ time.sleep(10)
|
|
|
+ failed_listener.conn.disconnect()
|
|
|
+ except Exception as e:
|
|
|
+ traceback.print_exc()
|
|
|
|
|
|
def flow_extract(self,):
|
|
|
self.comsumer()
|