import base64
import json
import logging
import os
import sys
import time
import traceback
from glob import glob

import numpy as np
import cv2

# The parent directory is put on sys.path before the project-local import
# below (presumably that is where `utils` lives -- verify against the repo).
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from flask import Flask, request
from utils import request_post, bytes2np, np2bytes

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

bdr_url = "http://127.0.0.1:17055/bdr"
chd_url = "http://127.0.0.1:17056/chd"
chr_url = "http://127.0.0.1:17057/chr"
cho_url = "http://127.0.0.1:17058/cho"
pzd_url = "http://127.0.0.1:17059/pzd"
captcha_url = "http://127.0.0.1:17054/captcha"

# Interface configuration
app = Flask(__name__)


def _failure_response(start_time):
    """Return the JSON failure payload, including the elapsed time in seconds."""
    return json.dumps({"success": False, "cost": time.time() - start_time})


@app.route('/captcha', methods=['POST'])
def captcha():
    """Handle a POST to /captcha and dispatch it to the matching solver.

    Form fields read from the request:
        base64pic:  base64-encoded main image (required).
        base64pic2: base64-encoded second image (optional; used by code 3).
        code:       integer captcha type, see ``get_captcha_result``.

    :return: JSON string. ``{"predict": ..., "success": True, "cost": ...}``
        on success, ``{"success": False, "cost": ...}`` otherwise.
    """
    start_time = time.time()
    logging.info("into captcha_interface captcha")
    try:
        # Receive the request data.
        if not request.form:
            logging.info("captcha no data!")
            return _failure_response(start_time)

        base64_data = request.form.get("base64pic")
        base64_data2 = request.form.get("base64pic2")

        # Reject a missing / non-numeric code explicitly instead of relying
        # on the catch-all handler below.
        raw_code = request.form.get("code")
        try:
            code = int(raw_code)
        except (TypeError, ValueError):
            logging.info("captcha invalid code %r", raw_code)
            return _failure_response(start_time)
        logging.info("code %s", code)

        if base64_data is None:
            logging.info("captcha missing base64pic!")
            return _failure_response(start_time)

        logging.info("captcha_interface get data time%s", time.time() - start_time)

        if base64_data2 is None:
            images = [base64_data]
        else:
            images = [base64_data, base64_data2]
        result = get_captcha_result(images, code)

        if result is None:
            return _failure_response(start_time)
        return json.dumps({"predict": result,
                           "success": True,
                           "cost": time.time() - start_time})
    except Exception:
        # Top-level boundary: any solver failure becomes a failure response.
        # `Exception` (not a bare except) lets KeyboardInterrupt / SystemExit
        # propagate.
        traceback.print_exc()
        return _failure_response(start_time)
    finally:
        logging.info("captcha interface finish time %s", time.time() - start_time)


def _call_service(url, payload, code, tag):
    """POST ``payload`` to ``url`` and return the reply's ``data`` field.

    Returns None when the reply's ``success`` field is falsy. On success the
    prediction is logged under ``tag``.
    """
    reply = json.loads(request_post(url, payload))
    if not reply.get("success"):
        return None
    data = reply.get("data")
    logging.info("code %s %s predict %s", code, tag, data)
    return data


def _detect_and_recognize(image_base64, code, detect_type):
    """Run detection (chd) then recognition (chr) on one image.

    :param detect_type: ``"chd"`` or ``"chd2"``; selects the detection payload.
    :return: ``(box_list, char_list)`` with equal lengths, or None when either
        service fails or the two lengths differ.
    """
    box_list = _call_service(
        chd_url, prepare_data(image_base64, detect_type), code, detect_type)
    if box_list is None:
        return None

    char_list = _call_service(
        chr_url, prepare_data([image_base64, box_list], "chr"), code, "chr")
    if char_list is None or len(char_list) != len(box_list):
        return None
    return box_list, char_list


def get_captcha_result(base64_list, code):
    """Solve one captcha by calling the backend services.

    Captcha types:
        1: drag to restore the jigsaw puzzle
        2: drag to restore the image
        3: click Chinese characters (with tips)
        4: click Chinese characters (in word order)

    :param base64_list: list of base64-encoded images; type 3 needs two
        (index 0 is the main image, index 1 the tips image).
    :param code: captcha type
    :return: the prediction, or None when the type is unknown, an image is
        missing, or any backend call fails.
    """
    if not base64_list:
        return None

    if code == 1:
        return _call_service(
            pzd_url, prepare_data(base64_list[0], "pzd"), code, "pzd")

    if code == 2:
        return _call_service(
            bdr_url, prepare_data(base64_list[0], "bdr"), code, "bdr")

    if code == 3:
        if len(base64_list) < 2:
            # The tips image is mandatory here; fail cleanly rather than
            # raising IndexError.
            logging.info("code %s requires a second image (base64pic2)", code)
            return None

        # Detect + recognize the tips first, then the main image.
        tips = _detect_and_recognize(base64_list[1], code, "chd2")
        if tips is None:
            return None
        main = _detect_and_recognize(base64_list[0], code, "chd")
        if main is None:
            return None

        _, char_list_tips = tips
        box_list, char_list = main

        # When a character was recognized more than once, the last box wins.
        char_to_box = dict(zip(char_list, box_list))
        # Every tip character must have been found in the main image.
        if any(c not in char_to_box for c in char_list_tips):
            return None
        return [char_to_box[c] for c in char_list_tips]

    if code == 4:
        found = _detect_and_recognize(base64_list[0], code, "chd")
        if found is None:
            return None
        box_list, char_list = found

        # Ask the ordering service for the click order.
        ordered_char_list = _call_service(
            cho_url, prepare_data(char_list, "cho"), code, "cho")
        if ordered_char_list is None:
            return None

        char_to_box = dict(zip(char_list, box_list))
        # .get(): a character the ordering service returns that was not
        # recognized maps to None.
        return [char_to_box.get(c) for c in ordered_char_list]

    return None


def prepare_data(data, _type):
    """Build the POST payload for one backend service.

    :param data: depends on ``_type``:
        'pzd' / 'bdr' / 'chd' / 'chd2': the base64 image, passed through;
        'chr': ``[image_base64, box_list]``; each box is indexed as
               ``[x1, y1, x2, y2]`` to crop the image;
        'cho': the list of characters to order.
    :param _type: one of 'pzd', 'bdr', 'chd', 'chd2', 'chr', 'cho'.
    :return: dict payload for ``request_post``.
    :raises ValueError: if ``_type`` is not one of the known types.
    """
    if _type in ('pzd', 'bdr', 'chd'):
        return {"data": data}

    if _type == 'chd2':
        return {"data": data, "tips": 1}

    if _type == 'chr':
        image_base64, box_list = data[0], data[1]
        image_np = bytes2np(base64.b64decode(image_base64))
        # Crop every box from the image and re-encode each crop as base64 text.
        crops = [
            base64.b64encode(
                np2bytes(image_np[box[1]:box[3], box[0]:box[2], :])
            ).decode("utf-8")
            for box in box_list
        ]
        return {"data": json.dumps(crops)}

    if _type == 'cho':
        return {"data": json.dumps(data)}

    # A bare `raise` here had no active exception to re-raise and surfaced as
    # an uninformative RuntimeError; name the offending type instead.
    raise ValueError("unknown request type: {!r}".format(_type))


def _show_boxes(img_np, out_boxes):
    """Draw each predicted box on ``img_np`` in red and display the image."""
    for box in out_boxes:
        cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
    cv2.imshow("img_np", img_np)
    cv2.waitKey(0)


def test_interface(from_remote=True, code=3):
    """Manual check: post test images to the captcha service and show results.

    Opens OpenCV windows and blocks on a key press, so it is meant to be run
    by hand.

    :param from_remote: when False, images are only loaded and nothing is sent.
    :param code: captcha type to exercise (defaults to 3, the previously
        hard-coded value).
    """
    remote_url = "http://192.168.2.103:17054/captcha"
    paths = glob("../dev/click_captcha/data/test/yolo_1.jpg")

    for file_path in paths:
        img_np = cv2.imread(file_path)
        if img_np is None:
            # cv2.imread returns None rather than raising on an unreadable file.
            print("failed to read", file_path)
            continue
        file_base64 = base64.b64encode(np2bytes(img_np))

        if not from_remote:
            continue

        if code in [1, 4]:
            file_json = {"base64pic": file_base64, "code": code}
            result = json.loads(request_post(remote_url, file_json))
            print("result", result)
            if result.get("success"):
                out_boxes = result.get("predict")
                print("out_boxes", out_boxes)
                _show_boxes(img_np, out_boxes)
            else:
                print("failed!")

        elif code in [2]:
            file_json = {"base64pic": file_base64, "code": code}
            result = json.loads(request_post(remote_url, file_json))
            print("result", result)
            if result.get("success"):
                w = int(result.get("predict"))
                print("w", w)
                # Rotate the image horizontally: columns from w onward move
                # to the front.
                img_new = np.concatenate([img_np[:, w:, :], img_np[:, :w, :]], axis=1)
                cv2.imshow("img_np", img_np)
                cv2.imshow("img_new", img_new)
                cv2.waitKey(0)
            else:
                print("failed!")

        elif code in [3]:
            tips_path = "../dev/click_captcha/data/test/yolo_3.jpg"
            tips_np = cv2.imread(tips_path)
            if tips_np is None:
                print("failed to read", tips_path)
                continue
            cv2.imshow("file_base64_2", tips_np)
            cv2.waitKey(0)
            file_base64_2 = base64.b64encode(np2bytes(tips_np))

            file_json = {"base64pic": file_base64,
                         "base64pic2": file_base64_2,
                         "code": code}
            result = json.loads(request_post(remote_url, file_json))
            print("result", result)
            if result.get("success"):
                out_boxes = result.get("predict")
                print("out_boxes", out_boxes)
                _show_boxes(img_np, out_boxes)
            else:
                print("failed!")


if __name__ == "__main__":
    # app.run(host='127.0.0.1', port=17054, debug=False)
    test_interface()