activateMQUtils.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import time
  2. dest="/queue/dataflow_attachment"
  3. messages = 10
  4. data = "test from BeijingJiaotong"
  5. import uuid
  6. import traceback
  7. from BaseDataMaintenance.common.Utils import log
  8. def send_msg_toacmq(pool_conn,msg,dest,retry_times=5):
  9. for _ in range(retry_times):
  10. conn = pool_conn.getConnector()
  11. try:
  12. conn.send(body=str(msg), destination=dest, persistent='false')
  13. return True
  14. except Exception as e:
  15. traceback.print_exc()
  16. conn.disconnect()
  17. finally:
  18. if conn.is_connected():
  19. pool_conn.putConnector(conn)
  20. else:
  21. del conn
  22. return False
  23. class MyListener(object):
  24. def __init__(self, conn,subscription):
  25. self.conn = conn
  26. self.count = 0
  27. self.subscription = subscription
  28. self.start = time.time()
  29. def on_error(self, headers):
  30. print('received an error %s' % headers.body)
  31. def on_message(self, headers):
  32. message_id = headers.headers["message-id"]
  33. body = headers.body
  34. print(self.subscription,body)
  35. self.conn.disconnect()
  36. # self.conn.nack(message_id,"")
  37. def ackMsg(conn,message_id,subscription=""):
  38. log("ack message:%s"%(message_id))
  39. try:
  40. conn.ack(message_id,subscription)
  41. except Exception as e:
  42. traceback.print_exc()
  43. def createComsumer(listener,dest,ack="client-individual",*args,**kwargs):
  44. conn = listener.conn
  45. substription = uuid.uuid4().hex
  46. conn.set_listener(substription,listener)
  47. # conn.subscribe(destination=dest, ack=ack, id="")
  48. conn.subscribe(destination=dest,ack=ack,id="", headers={'activemq.prefetchSize': 1})
  49. def test():
  50. from BaseDataMaintenance.dataSource.source import getConnect_activateMQ
  51. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  52. activateMQ_conn_pool = ConnectorPool(10,30,getConnect_activateMQ)
  53. import uuid
  54. # for i in range(10):
  55. # conn = activateMQ_conn_pool.getConnector()
  56. # send_msg_toacmq(conn,"test msg",dest)
  57. # activateMQ_conn_pool.putConnector(conn)
  58. conn = getConnect_activateMQ()
  59. #
  60. substription = uuid.uuid4().hex
  61. conn.set_listener(substription,MyListener(conn,substription))
  62. conn.subscribe(destination=dest, ack='client-individual', id="")
  63. # conn1 = getConnect_activateMQ()
  64. # substription = uuid.uuid4().hex
  65. # conn1.set_listener(substription,MyListener(conn1,substription))
  66. # conn1.subscribe(destination=dest, ack='client-individual', id="")
  67. # for i in range(100):
  68. # send_msg_toacmq(conn,"test msg%d"%i,dest)
  69. # time.sleep(1)
  70. while 1:
  71. print(conn.is_connected())
  72. time.sleep(1)
  73. if __name__ == '__main__':
  74. test()