"""Flask service that hands out (ip, port) pairs for backend interfaces.

Clients POST an ``interface_type`` to ``/schedule`` and receive the address of
one interface instance, chosen by the per-host counters kept in this module's
globals (see ``interface_pool``).
"""
import base64
import json
import logging
import os
import sys
import time
import traceback
from multiprocessing import Process, RLock

from flask import Flask, request
from werkzeug.exceptions import NotFound

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from format_convert.utils import get_platform, get_ip_port, request_post, get_intranet_ip


# Interface configuration
app = Flask(__name__)


@app.route('/schedule', methods=['POST'])
def _schedule():
    """Handle ``POST /schedule`` and pick an interface instance.

    Reads ``interface_type`` from the posted form and delegates the choice to
    ``interface_pool`` while holding the module-level lock.

    Returns:
        dict: ``{"data": [ip, port]}`` on success, otherwise a one-element
        status list:

        * ``[-9]`` - the request carried no form data,
        * ``[-2]`` - ``interface_pool`` raised ``NotFound``,
        * ``[-1]`` - any other failure.
    """
    logging.info("into _schedule")
    _lock = globals().get("lock")
    start_time = time.time()
    # Bound before the try so the NotFound handler can always reference it.
    interface_type = None
    try:
        # `with` releases the lock only if it was actually acquired. If
        # set_flask_global() has not run, `_lock` is None and entering the
        # `with` raises; the generic handler below turns that into [-1]
        # instead of a second failure inside a `finally: release()`.
        with _lock:
            if not request.form:
                logging.info("_schedule no data!")
                return {"data": [-9]}
            interface_type = request.form.get("interface_type")
            _ip, _port = interface_pool(interface_type)
        # %s arguments: safe whatever type the configured port has.
        logging.info("_schedule %s %s", _ip, _port)
        return {"data": [_ip, _port]}
    except NotFound:
        # interface_type may be None here (field missing from the form), so
        # it must not be concatenated to a str.
        logging.info("_schedule cannot find %s 's interfaces! Please Checkout",
                     interface_type)
        return {"data": [-2]}
    except Exception:
        traceback.print_exc()
        logging.info("_schedule failed!")
        return {"data": [-1]}
    finally:
        logging.info("_schedule cost %s", time.time() - start_time)


def interface_pool(interface_type):
    """Choose the host and port that should serve ``interface_type``.

    Uses two module globals installed by ``set_flask_global``:
    ``ip_port`` (``{ip: {interface_type: [port, ...]}}``, as implied by how it
    is indexed here) and ``ip_port_flag`` (``{ip: {interface_type: counter}}``).

    The host with the smallest ``counter / number_of_ports`` ratio is chosen
    (first one wins on ties); within that host the port at index
    ``counter % number_of_ports`` is used. The host's counter is then
    incremented, wrapping back to 0 once it has reached 10000.

    Args:
        interface_type: Key identifying the kind of interface requested.

    Returns:
        tuple: ``(ip, port)`` of the selected instance.

    Raises:
        NotFound: No host has a non-empty port list for ``interface_type``.
    """
    ip_port_flag_dict = globals().get("ip_port_flag")
    ip_port_dict = globals().get("ip_port")

    # Load balancing: select the ip
    interface_load_list = []
    for _ip, flags in ip_port_flag_dict.items():
        ports = ip_port_dict.get(_ip, {}).get(interface_type)
        if ports:
            # A type without a pre-seeded counter starts from 0.
            load_scale = flags.get(interface_type, 0) / len(ports)
            interface_load_list.append((_ip, load_scale))
    if not interface_load_list:
        raise NotFound
    # min() returns the first minimal entry, same as a stable sort + [0].
    _ip = min(interface_load_list, key=lambda item: item[1])[0]

    # Load balancing: select the port
    ports = ip_port_dict[_ip][interface_type]
    flags = ip_port_flag_dict[_ip]
    current_flag = flags.get(interface_type, 0)
    _port = ports[current_flag % len(ports)]

    # Update the flag
    flags[interface_type] = 0 if current_flag >= 10000 else current_flag + 1
    return _ip, _port


def set_flask_global():
    """Install the lock and scheduling tables into this module's globals.

    Sets ``lock`` (a ``multiprocessing.RLock``), ``ip_port`` (the result of
    ``get_ip_port()``) and ``ip_port_flag`` (one zeroed counter per host for
    each of ``ocr``, ``otr``, ``convert`` and ``office``).
    """
    # Lock and parameters required for interface polling
    globals().update({"lock": RLock()})
    ip_port_dict = get_ip_port()
    ip_port_flag = {
        _k: {"ocr": 0, "otr": 0, "convert": 0, "office": 0}
        for _k in ip_port_dict
    }
    globals().update({"ip_port_flag": ip_port_flag})
    globals().update({"ip_port": ip_port_dict})


def test_schedule(interface_type):
    """POST ``interface_type`` to a locally running scheduler and print the reply.

    Args:
        interface_type: Value sent as the ``interface_type`` form field.
    """
    _url = 'http://127.0.0.1:15011/schedule'
    # _url = 'http://192.168.2.102:15011/schedule'
    # _url = 'http://172.16.160.65:15011/schedule'
    data = {"interface_type": interface_type}
    result = json.loads(request_post(_url, data, time_out=10000)).get("data")
    print(result)


if __name__ == "__main__":
    set_flask_global()

    if len(sys.argv) == 2:
        port = int(sys.argv[1])
    else:
        port = 15011

    ip = get_intranet_ip()
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - '
                               + ip + ' - ' + str(port) + ' - %(message)s')
    # app.run() blocks until the server stops, so announce start-up first.
    logging.info("Schedule running "+str(port))
    app.run(host='0.0.0.0', port=port, threaded=True, debug=False)

    # Manual smoke test against a running instance:
    # for i in range(10):
    #     p = Process(target=test_schedule, args=("ocr", ))
    #     p.start()
    #     p = Process(target=test_schedule, args=("otr", ))
    #     p.start()
    #     p = Process(target=test_schedule, args=("office", ))
    #     p.start()
    # p.join()