123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import base64
- import json
- import logging
- import os
- import sys
- import time
- import traceback
- from multiprocessing import Process, RLock
- from flask import Flask, request
- from werkzeug.exceptions import NotFound
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- from format_convert.utils import get_platform, get_ip_port, request_post, get_intranet_ip
- # 接口配置
- app = Flask(__name__)
- @app.route('/schedule', methods=['POST'])
- def _schedule():
- logging.info("into _schedule")
- _lock = globals().get("lock")
- start_time = time.time()
- try:
- _lock.acquire()
- if not request.form:
- logging.info("_schedule no data!")
- return {"data": [-9]}
- interface_type = request.form.get("interface_type")
- _ip, _port = interface_pool(interface_type)
- logging.info("_schedule " + _ip + " " + _port)
- return {"data": [_ip, _port]}
- except NotFound:
- logging.info("_schedule cannot find " + interface_type + " 's interfaces! Please Checkout")
- return {"data": [-2]}
- except:
- traceback.print_exc()
- logging.info("_schedule failed!")
- return {"data": [-1]}
- finally:
- _lock.release()
- logging.info("_schedule cost " + str(time.time()-start_time))
- def interface_pool(interface_type):
- ip_port_flag_dict = globals().get("ip_port_flag")
- ip_port_dict = globals().get("ip_port")
- # print(ip_port_flag_dict)
- # print(ip_port_dict)
- # 负载均衡, 选取ip
- interface_load_list = []
- for _ip in ip_port_flag_dict.keys():
- if ip_port_dict.get(_ip).get(interface_type):
- load_scale = ip_port_flag_dict.get(_ip).get(interface_type) / len(ip_port_dict.get(_ip).get(interface_type))
- interface_load_list.append([_ip, load_scale])
- if not interface_load_list:
- raise NotFound
- interface_load_list.sort(key=lambda x: x[-1])
- _ip = interface_load_list[0][0]
- # 负载均衡, 选取port
- port_index = ip_port_flag_dict.get(_ip).get(interface_type) % len(ip_port_dict.get(_ip).get(interface_type))
- _port = ip_port_dict.get(_ip).get(interface_type)[port_index]
- # 更新flag
- current_flag = globals().get("ip_port_flag").get(_ip).get(interface_type)
- if current_flag >= 10000:
- globals()["ip_port_flag"][_ip][interface_type] = 0
- else:
- globals()["ip_port_flag"][_ip][interface_type] = current_flag + 1
- return _ip, _port
- def set_flask_global():
- # 接口轮询所需锁、参数
- globals().update({"lock": RLock()})
- ip_port_flag = {}
- ip_port_dict = get_ip_port()
- for _k in ip_port_dict.keys():
- ip_port_flag.update({_k: {"ocr": 0,
- "otr": 0,
- "convert": 0,
- "office": 0
- }})
- globals().update({"ip_port_flag": ip_port_flag})
- globals().update({"ip_port": ip_port_dict})
- # print(globals().get("ip_port"))
- def test_schedule(interface_type):
- _url = 'http://127.0.0.1:15011/schedule'
- # _url = 'http://192.168.2.102:15011/schedule'
- # _url = 'http://172.16.160.65:15011/schedule'
- data = {"interface_type": interface_type}
- result = json.loads(request_post(_url, data, time_out=10000)).get("data")
- print(result)
- if __name__ == "__main__":
- set_flask_global()
- if len(sys.argv) == 2:
- port = int(sys.argv[1])
- else:
- port = 15011
- ip = get_intranet_ip()
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - '
- + ip + ' - ' + str(port) + ' - %(message)s')
- app.run(host='0.0.0.0', port=port, threaded=True, debug=False)
- logging.info("Schedule running "+str(port))
- # for i in range(10):
- # p = Process(target=test_schedule, args=("ocr", ))
- # p.start()
- # p = Process(target=test_schedule, args=("otr", ))
- # p.start()
- # p = Process(target=test_schedule, args=("office", ))
- # p.start()
- # p.join()
|