12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- import time
- dest="/queue/dataflow_attachment"
- messages = 10
- data = "test from BeijingJiaotong"
- import uuid
- import traceback
- from BaseDataMaintenance.common.Utils import log
- def send_msg_toacmq(pool_conn,msg,dest,retry_times=5):
- for _ in range(retry_times):
- conn = pool_conn.getConnector()
- try:
- conn.send(body=str(msg), destination=dest, persistent='false')
- return True
- except Exception as e:
- traceback.print_exc()
- conn.disconnect()
- finally:
- if conn.is_connected():
- pool_conn.putConnector(conn)
- else:
- del conn
- return False
- class MyListener(object):
- def __init__(self, conn,subscription):
- self.conn = conn
- self.count = 0
- self.subscription = subscription
- self.start = time.time()
- def on_error(self, headers):
- print('received an error %s' % headers.body)
- def on_message(self, headers):
- message_id = headers.headers["message-id"]
- body = headers.body
- print(self.subscription,body)
- self.conn.disconnect()
- # self.conn.nack(message_id,"")
- def ackMsg(conn,message_id,subscription=""):
- log("ack message:%s"%(message_id))
- try:
- conn.ack(message_id,subscription)
- except Exception as e:
- traceback.print_exc()
- def createComsumer(listener,dest,ack="client-individual",*args,**kwargs):
- conn = listener.conn
- substription = uuid.uuid4().hex
- conn.set_listener(substription,listener)
- # conn.subscribe(destination=dest, ack=ack, id="")
- conn.subscribe(destination=dest,ack=ack,id="", headers={'activemq.prefetchSize': 1})
- def test():
- from BaseDataMaintenance.dataSource.source import getConnect_activateMQ
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- activateMQ_conn_pool = ConnectorPool(10,30,getConnect_activateMQ)
- import uuid
- # for i in range(10):
- # conn = activateMQ_conn_pool.getConnector()
- # send_msg_toacmq(conn,"test msg",dest)
- # activateMQ_conn_pool.putConnector(conn)
- conn = getConnect_activateMQ()
- #
- substription = uuid.uuid4().hex
- conn.set_listener(substription,MyListener(conn,substription))
- conn.subscribe(destination=dest, ack='client-individual', id="")
- # conn1 = getConnect_activateMQ()
- # substription = uuid.uuid4().hex
- # conn1.set_listener(substription,MyListener(conn1,substription))
- # conn1.subscribe(destination=dest, ack='client-individual', id="")
- # for i in range(100):
- # send_msg_toacmq(conn,"test msg%d"%i,dest)
- # time.sleep(1)
- while 1:
- print(conn.is_connected())
- time.sleep(1)
- if __name__ == '__main__':
- test()
|