schedule_interface.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import base64
  2. import json
  3. import logging
  4. import os
  5. import sys
  6. import time
  7. import traceback
  8. from multiprocessing import Process, RLock
  9. from flask import Flask, request
  10. from werkzeug.exceptions import NotFound
  11. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
  12. from format_convert.utils import get_platform, get_ip_port, request_post, get_intranet_ip
  13. # Interface configuration: the Flask application that exposes the scheduling endpoint
  14. app = Flask(__name__)
  15. @app.route('/schedule', methods=['POST'])
  16. def _schedule():
  17. logging.info("into _schedule")
  18. _lock = globals().get("lock")
  19. start_time = time.time()
  20. try:
  21. _lock.acquire()
  22. if not request.form:
  23. logging.info("_schedule no data!")
  24. return {"data": [-9]}
  25. interface_type = request.form.get("interface_type")
  26. _ip, _port = interface_pool(interface_type)
  27. logging.info("_schedule " + _ip + " " + _port)
  28. return {"data": [_ip, _port]}
  29. except NotFound:
  30. logging.info("_schedule cannot find " + interface_type + " 's interfaces! Please Checkout")
  31. return {"data": [-2]}
  32. except:
  33. traceback.print_exc()
  34. logging.info("_schedule failed!")
  35. return {"data": [-1]}
  36. finally:
  37. _lock.release()
  38. logging.info("_schedule cost " + str(time.time()-start_time))
  39. def interface_pool(interface_type):
  40. ip_port_flag_dict = globals().get("ip_port_flag")
  41. ip_port_dict = globals().get("ip_port")
  42. # print(ip_port_flag_dict)
  43. # print(ip_port_dict)
  44. # 负载均衡, 选取ip
  45. interface_load_list = []
  46. for _ip in ip_port_flag_dict.keys():
  47. if ip_port_dict.get(_ip).get(interface_type):
  48. load_scale = ip_port_flag_dict.get(_ip).get(interface_type) / len(ip_port_dict.get(_ip).get(interface_type))
  49. interface_load_list.append([_ip, load_scale])
  50. if not interface_load_list:
  51. raise NotFound
  52. interface_load_list.sort(key=lambda x: x[-1])
  53. _ip = interface_load_list[0][0]
  54. # 负载均衡, 选取port
  55. port_index = ip_port_flag_dict.get(_ip).get(interface_type) % len(ip_port_dict.get(_ip).get(interface_type))
  56. _port = ip_port_dict.get(_ip).get(interface_type)[port_index]
  57. # 更新flag
  58. current_flag = globals().get("ip_port_flag").get(_ip).get(interface_type)
  59. if current_flag >= 10000:
  60. globals()["ip_port_flag"][_ip][interface_type] = 0
  61. else:
  62. globals()["ip_port_flag"][_ip][interface_type] = current_flag + 1
  63. return _ip, _port
  64. def set_flask_global():
  65. # 接口轮询所需锁、参数
  66. globals().update({"lock": RLock()})
  67. ip_port_flag = {}
  68. ip_port_dict = get_ip_port()
  69. for _k in ip_port_dict.keys():
  70. ip_port_flag.update({_k: {"ocr": 0,
  71. "otr": 0,
  72. "convert": 0,
  73. "office": 0
  74. }})
  75. globals().update({"ip_port_flag": ip_port_flag})
  76. globals().update({"ip_port": ip_port_dict})
  77. # print(globals().get("ip_port"))
  78. def test_schedule(interface_type):
  79. _url = 'http://127.0.0.1:15011/schedule'
  80. # _url = 'http://192.168.2.102:15011/schedule'
  81. # _url = 'http://172.16.160.65:15011/schedule'
  82. data = {"interface_type": interface_type}
  83. result = json.loads(request_post(_url, data, time_out=10000)).get("data")
  84. print(result)
  85. if __name__ == "__main__":
  86. set_flask_global()
  87. if len(sys.argv) == 2:
  88. port = int(sys.argv[1])
  89. else:
  90. port = 15011
  91. ip = get_intranet_ip()
  92. logging.basicConfig(level=logging.INFO,
  93. format='%(asctime)s - %(name)s - %(levelname)s - '
  94. + ip + ' - ' + str(port) + ' - %(message)s')
  95. app.run(host='0.0.0.0', port=port, threaded=True, debug=False)
  96. logging.info("Schedule running "+str(port))
  97. # for i in range(10):
  98. # p = Process(target=test_schedule, args=("ocr", ))
  99. # p.start()
  100. # p = Process(target=test_schedule, args=("otr", ))
  101. # p.start()
  102. # p = Process(target=test_schedule, args=("office", ))
  103. # p.start()
  104. # p.join()