123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import base64
- import json
- import logging
- import os
- import sys
- import time
- import traceback
- from glob import glob
- import numpy as np
- import cv2
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- from flask import Flask, request
- from utils import request_post, bytes2np, np2bytes
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- bdr_url = "http://127.0.0.1:17055/bdr"
- chd_url = "http://127.0.0.1:17056/chd"
- chr_url = "http://127.0.0.1:17057/chr"
- cho_url = "http://127.0.0.1:17058/cho"
- pzd_url = "http://127.0.0.1:17059/pzd"
- captcha_url = "http://127.0.0.1:17054/captcha"
- # 接口配置
- app = Flask(__name__)
- @app.route('/captcha', methods=['POST'])
- def captcha():
- start_time = time.time()
- logging.info("into captcha_interface captcha")
- try:
- # 接收网络数据
- if not request.form:
- logging.info("captcha no data!")
- return json.dumps({"success": False, "cost": time.time()-start_time})
- base64_data = request.form.get("base64pic")
- base64_data2 = request.form.get("base64pic2")
- code = int(request.form.get("code"))
- logging.info("code " + str(code))
- logging.info("captcha_interface get data time" + str(time.time()-start_time))
- if base64_data2 is None:
- result = get_captcha_result([base64_data], code)
- else:
- result = get_captcha_result([base64_data, base64_data2], code)
- if result is None:
- return json.dumps({"success": False, "cost": time.time()-start_time})
- return json.dumps({"predict": result, "success": True, "cost": time.time()-start_time})
- except:
- traceback.print_exc()
- return json.dumps({"success": False, "cost": time.time()-start_time})
- finally:
- logging.info("captcha interface finish time " + str(time.time()-start_time))
- def get_captcha_result(base64_list, code):
- """
- 验证码类型:
- 1: 拖拽还原拼图
- 2: 拖拽还原图片
- 3: 点击中文(带提示)
- 4: 点击中文(按语序)
- 5:
- :param base64_list: base64格式图片列表
- :param code: 验证码类型
- :return:
- """
- if code == 1:
- result = json.loads(request_post(pzd_url, prepare_data(base64_list[0], "pzd")))
- if result.get("success"):
- predict = result.get("data")
- logging.info("code " + str(code) + " pzd " + "predict " + str(predict))
- return predict
- elif code == 2:
- result = json.loads(request_post(bdr_url, prepare_data(base64_list[0], "bdr")))
- if result.get("success"):
- predict = result.get("data")
- logging.info("code " + str(code) + " bdr " + "predict " + str(predict))
- return predict
- elif code == 3:
- # detect tips
- result = json.loads(request_post(chd_url, prepare_data(base64_list[1], "chd2")))
- if result.get("success"):
- box_list_tips = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chd2 " + "predict " + str(box_list_tips))
- # recognize tips
- result = json.loads(request_post(chr_url, prepare_data([base64_list[1], box_list_tips], "chr")))
- if result.get("success"):
- char_list_tips = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chr " + "predict " + str(char_list_tips))
- if len(char_list_tips) != len(box_list_tips):
- return None
- # detect
- result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
- if result.get("success"):
- box_list = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
- # recognize
- result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
- if result.get("success"):
- char_list = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
- if len(char_list) != len(box_list):
- return None
- for c in char_list_tips:
- if c not in char_list:
- return None
- _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
- predict = [_dict.get(x) for x in char_list_tips]
- return predict
- elif code == 4:
- # detect
- result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
- if result.get("success"):
- box_list = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
- # recognize
- result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
- if result.get("success"):
- char_list = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
- if len(char_list) != len(box_list):
- return None
- # order
- result = json.loads(request_post(cho_url, prepare_data(char_list, "cho")))
- if result.get("success"):
- ordered_char_list = result.get("data")
- else:
- return None
- logging.info("code " + str(code) + " cho " + "predict " + str(ordered_char_list))
- _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
- predict = [_dict.get(x) for x in ordered_char_list]
- return predict
- return None
- def prepare_data(data, _type):
- if _type in ['pzd', 'bdr', 'chd']:
- return {"data": data}
- elif _type in ['chd2']:
- return {"data": data, "tips": 1}
- elif _type in ['chr']:
- image_base64 = data[0]
- box_list = data[1]
- image_bytes = base64.b64decode(image_base64)
- image_np = bytes2np(image_bytes)
- str_list = []
- for box in box_list:
- file_bytes = np2bytes(image_np[box[1]:box[3], box[0]:box[2], :])
- file_base64 = base64.b64encode(file_bytes)
- file_str = file_base64.decode("utf-8")
- str_list.append(file_str)
- return {"data": json.dumps(str_list)}
- elif _type in ['cho']:
- return {"data": json.dumps(data)}
- else:
- raise
- def test_interface(from_remote=True):
- captcha_url = "http://192.168.2.103:17054/captcha"
- paths = glob("../dev/click_captcha/data/test/yolo_19.jpg")
- for file_path in paths:
- img_np = cv2.imread(file_path)
- file_bytes = np2bytes(img_np)
- file_base64 = base64.b64encode(file_bytes)
- code = 1
- if from_remote:
- if code in [1, 4]:
- file_json = {"base64pic": file_base64, "code": code}
- result = json.loads(request_post(captcha_url, file_json))
- print("result", result)
- if result.get("success"):
- out_boxes = result.get("predict")
- print("out_boxes", out_boxes)
- for box in out_boxes:
- cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
- cv2.imshow("img_np", img_np)
- cv2.waitKey(0)
- else:
- print("failed!")
- elif code in [2]:
- file_json = {"base64pic": file_base64, "code": code}
- result = json.loads(request_post(captcha_url, file_json))
- print("result", result)
- if result.get("success"):
- w = int(result.get("predict"))
- print("w", w)
- img_new = np.concatenate([img_np[:, w:, :], img_np[:, :w, :]], axis=1)
- cv2.imshow("img_np", img_np)
- cv2.imshow("img_new", img_new)
- cv2.waitKey(0)
- else:
- print("failed!")
- elif code in [3]:
- file_base64_2 = cv2.imread("../dev/click_captcha/data/test/yolo_3.jpg")
- cv2.imshow("file_base64_2", file_base64_2)
- cv2.waitKey(0)
- file_base64_2 = np2bytes(file_base64_2)
- file_base64_2 = base64.b64encode(file_base64_2)
- file_json = {"base64pic": file_base64, "base64pic2": file_base64_2, "code": code}
- result = json.loads(request_post(captcha_url, file_json))
- print("result", result)
- if result.get("success"):
- out_boxes = result.get("predict")
- print("out_boxes", out_boxes)
- for box in out_boxes:
- cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
- cv2.imshow("img_np", img_np)
- cv2.waitKey(0)
- else:
- print("failed!")
- if __name__ == "__main__":
- # app.run(host='127.0.0.1', port=17054, debug=False)
- test_interface()
|