# encoding:utf-8
"""HTTP front end that routes captcha-solving requests to backend model services.

The ``/captcha`` endpoint receives one or two base64 images (or an image plus
hint characters) together with a numeric captcha type, calls the matching
detection / recognition services over HTTP and returns the prediction as JSON.

Captcha types:
    1: drag to restore a puzzle piece (with several decoy pieces)
    2: drag to restore an image
    3: click Chinese characters (hint given as an image of text)
    4: click Chinese characters (in word order)
    5: click Chinese characters (hint given as text)
    6: compute an equation (containing Chinese)
    7: captcha type classification
    8: recognize the Chinese characters in the image
"""
import base64
import json
import logging
import os
import random
import sys
import time
import traceback
from glob import glob
import numpy as np
import cv2

# Hide all GPUs; must be set before the project modules below are imported.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Make the parent directory importable so `utils` and `puzzle_detect` resolve.
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from flask import Flask, request
from utils import request_post, bytes2np, np2bytes, base64_decode, image_to_str, str_to_image
from puzzle_detect.pzd_interface import get_puzzle_tip_location

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Backend service endpoints, one per model.
bdr_url = "http://127.0.0.1:17055/bdr"
chd_url = "http://127.0.0.1:17056/chd"
chr_url = "http://127.0.0.1:17057/chr"
cho_url = "http://127.0.0.1:17058/cho"
pzd_url = "http://127.0.0.1:17059/pzd"
ced_url = "http://127.0.0.1:17060/ced"
cer_url = "http://127.0.0.1:17061/cer"
cac_url = "http://127.0.0.1:17062/cac"
captcha_url = "http://127.0.0.1:17054/captcha"

# Interface configuration
app = Flask(__name__)


def _failure_response(start_time):
    """Return the JSON body sent when a request could not be solved."""
    return json.dumps({"success": False, "cost": time.time() - start_time})


@app.route('/captcha', methods=['POST'])
def captcha():
    """Handle a POST to ``/captcha``.

    Form fields read:
        base64pic:  base64 of the main captcha image (required).
        base64pic2: base64 of a second image (optional).
        char:       hint characters as text (optional).
        code:       integer captcha type (required).

    :return: JSON string ``{"predict": ..., "success": True, "cost": ...}`` on
        success, ``{"success": False, "cost": ...}`` otherwise.
    """
    start_time = time.time()
    logging.info("into captcha_interface captcha")
    try:
        # Receive the request data
        if not request.form:
            logging.info("captcha no data!")
            return _failure_response(start_time)

        base64_data = request.form.get("base64pic")
        base64_data2 = request.form.get("base64pic2")
        tip_char = request.form.get("char")

        # Reject malformed requests explicitly instead of crashing further down.
        try:
            code = int(request.form.get("code"))
        except (TypeError, ValueError):
            logging.info("captcha missing or invalid code!")
            return _failure_response(start_time)
        if base64_data is None:
            logging.info("captcha missing base64pic!")
            return _failure_response(start_time)

        if tip_char:
            logging.info("tip_char " + str(tip_char))
        logging.info("code " + str(code))
        logging.info("captcha_interface get data time" + str(time.time()-start_time))

        # The second list element is either a second image or the hint text.
        if base64_data2 is not None:
            result = get_captcha_result([base64_data, base64_data2], code)
        elif tip_char is not None:
            result = get_captcha_result([base64_data, tip_char], code)
        else:
            result = get_captcha_result([base64_data], code)

        if result is None:
            logging.info("success False")
            return _failure_response(start_time)

        logging.info("success True")
        return json.dumps({"predict": result, "success": True, "cost": time.time()-start_time})
    except Exception:
        # Top-level boundary: log the traceback and answer with a failure body.
        # `Exception` (not a bare except) lets SystemExit/KeyboardInterrupt through.
        logging.exception("captcha request failed")
        logging.info("success False")
        return _failure_response(start_time)
    finally:
        logging.info("captcha interface finish time " + str(time.time()-start_time))


def _call_service(url, payload, code, tag):
    """POST ``payload`` to a backend service and return its ``data`` field.

    :param url: service endpoint
    :param payload: form data built by ``prepare_data``
    :param code: captcha type, used only for logging
    :param tag: short service name, used only for logging
    :return: the ``data`` value of the response, or None when the service did
        not report success
    """
    result = json.loads(request_post(url, payload))
    if not result.get("success"):
        return None
    data = result.get("data")
    logging.info("code %s %s predict %s", code, tag, data)
    return data


def _detect_and_recognize(image_base64, code, detect_type="chd"):
    """Detect character boxes in an image, then recognize each box.

    :param image_base64: base64 image to analyse
    :param code: captcha type, used only for logging
    :param detect_type: ``"chd"`` for the main image, ``"chd2"`` for a hint image
    :return: ``(box_list, char_list)`` of equal length, or None when either
        service fails, nothing is detected, or the two lengths differ
    """
    box_list = _call_service(chd_url, prepare_data(image_base64, detect_type), code, detect_type)
    if not box_list:
        return None
    char_list = _call_service(chr_url, prepare_data([image_base64, box_list], "chr"), code, "chr")
    if char_list is None or len(char_list) != len(box_list):
        return None
    return box_list, char_list


def get_captcha_result(base64_list, code):
    """Solve one captcha by calling the backend services for its type.

    Captcha types:
        1: drag to restore a puzzle piece (with several decoy pieces)
        2: drag to restore an image
        3: click Chinese characters (hint given as an image of text)
        4: click Chinese characters (in word order)
        5: click Chinese characters (hint given as text)
        6: compute an equation (containing Chinese)
        7: captcha type classification
        8: recognize the Chinese characters in the image

    :param base64_list: list whose first item is the base64 captcha image and
        whose optional second item is a second base64 image (types 1 and 3)
        or the hint characters (type 5)
    :param code: captcha type
    :return: the prediction for that type, or None when it could not be solved
        or the type is unknown
    """
    image_base64 = base64_list[0]
    extra = base64_list[1] if len(base64_list) > 1 else None

    if code == 1:
        predict = _call_service(pzd_url, prepare_data(image_base64, "pzd"), code, "pzd")
        if predict is None or len(predict) <= 1:
            return predict
        if extra is None:
            # Several candidates but no tip image to disambiguate them.
            return None
        h_list = get_puzzle_tip_location(extra)

        # NOTE(review): box looks like [x1, y1, x2, y2] and h_list like the
        # tip's (top, bottom) — confirm against get_puzzle_tip_location.
        def _vertical_offset(box):
            return abs(box[1] - h_list[0]) + abs(box[3] - h_list[1])

        # Keep the candidate vertically closest to the tip; candidates whose
        # offset is 10000 or more are never accepted.
        best = min(predict, key=_vertical_offset)
        new_predict = [best] if _vertical_offset(best) < 10000 else None
        logging.info("code " + str(code) + " pzd " + "match predict " + str(new_predict))
        return new_predict

    if code == 2:
        return _call_service(bdr_url, prepare_data(image_base64, "bdr"), code, "bdr")

    if code == 3:
        if extra is None:
            return None
        # Detect and recognize the hint characters first, then the main image.
        tips = _detect_and_recognize(extra, code, detect_type="chd2")
        if tips is None:
            return None
        char_list_tips = tips[1]
        found = _detect_and_recognize(image_base64, code)
        if found is None:
            return None
        box_list, char_list = found
        char_to_box = dict(zip(char_list, box_list))
        # Every hint character must have been found in the main image.
        if any(c not in char_to_box for c in char_list_tips):
            return None
        return [char_to_box[c] for c in char_list_tips]

    if code == 4:
        found = _detect_and_recognize(image_base64, code)
        if found is None:
            return None
        box_list, char_list = found
        # Ask the ordering service for the characters in click order.
        ordered_char_list = _call_service(cho_url, prepare_data(char_list, "cho"), code, "cho")
        if ordered_char_list is None:
            return None
        char_to_box = dict(zip(char_list, box_list))
        return [char_to_box.get(c) for c in ordered_char_list]

    if code == 5:
        if extra is None:
            return None
        found = _detect_and_recognize(image_base64, code)
        if found is None:
            return None
        box_list, char_list = found
        tip_char = extra
        char_to_box = dict(zip(char_list, box_list))
        predict = [char_to_box.get(c) for c in tip_char]

        # Exactly one hint character unmatched: guess among the boxes whose
        # recognized character is not one of the hint characters.
        if predict.count(None) == 1:
            guess_list = [char_to_box.get(c) for c in char_list if c not in tip_char]
            if not guess_list:
                # Nothing left to guess from; random.choice would raise.
                return None
            predict[predict.index(None)] = random.choice(guess_list)
            return predict

        return None if None in predict else predict

    if code == 6:
        return _call_service(cer_url, prepare_data(image_base64, "cer"), code, "cer")

    if code == 7:
        return _call_service(cac_url, prepare_data(image_base64, "cac"), code, "cac")

    if code == 8:
        found = _detect_and_recognize(image_base64, code)
        if found is None:
            return None
        return found[1]

    return None


def prepare_data(data, _type):
    """Build the form payload for one backend service.

    :param data: for ``chr`` a ``[image_base64, box_list]`` pair; for ``cho``
        a list of characters; otherwise the base64 image itself
    :param _type: service tag: pzd, bdr, chd, chd2, ced, cac, cer, chr or cho
    :return: dict to send as the request body
    :raises RuntimeError: when ``_type`` is not one of the known tags
    """
    if _type in ['pzd', 'bdr', 'chd', 'ced', 'cac', 'cer']:
        return {"data": data}
    elif _type in ['chd2']:
        # "tips": 1 marks the image as a hint image for the detector.
        return {"data": data, "tips": 1}
    elif _type in ['chr']:
        image_base64, box_list = data[0], data[1]
        image_np = bytes2np(base64_decode(image_base64))
        # Crop every detected box out of the image and encode each crop.
        str_list = [
            image_to_str(image_np[box[1]:box[3], box[0]:box[2], :])
            for box in box_list
        ]
        return {"data": json.dumps(str_list)}
    elif _type in ['cho']:
        return {"data": json.dumps(data)}
    else:
        # The original bare `raise` surfaced as RuntimeError("No active
        # exception to reraise"); keep the type but say what went wrong.
        raise RuntimeError("prepare_data: unknown type %r" % (_type,))


def _show_boxes(img_np, boxes):
    """Draw each box on the image in red and display it until a key is pressed."""
    for box in boxes:
        cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
    cv2.imshow("img_np", img_np)
    cv2.waitKey(0)


def test_interface(from_remote=True):
    """Manual smoke test: post local sample images to a running captcha service.

    The image path, the target URL and the captcha type are hard-coded below.

    :param from_remote: when False no request is sent
    """
    target_url = "http://192.168.2.103:17054/captcha"
    paths = glob(r'C:\Users\Administrator\Downloads\1.jpg')
    code = 6

    for file_path in paths:
        img_np = cv2.imread(file_path)
        if img_np is None:
            # cv2.imread returns None for unreadable files.
            print("cannot read image", file_path)
            continue
        file_base64 = base64.b64encode(np2bytes(img_np))

        if not from_remote or code not in (1, 2, 3, 4, 5, 6, 7, 8):
            continue

        file_json = {"base64pic": file_base64, "code": code}
        if code == 1:
            tip_np = cv2.imread(r"C:\Users\Administrator\Desktop\test_capture\puzzle7.png")
            file_json['base64pic2'] = base64.b64encode(np2bytes(tip_np))
        elif code == 3:
            tip_np = cv2.imread("../dev/click_captcha/data/test/yolo_3.jpg")
            cv2.imshow("file_base64_2", tip_np)
            cv2.waitKey(0)
            file_json["base64pic2"] = base64.b64encode(np2bytes(tip_np))
        elif code == 5:
            file_json["char"] = "蠢贱榨"

        result = json.loads(request_post(target_url, file_json))
        print("result", result)
        if not result.get("success"):
            print("failed!")
            continue

        predict = result.get("predict")
        if code in (1, 3, 4, 5):
            print("out_boxes", predict)
            _show_boxes(img_np, predict)
        elif code == 2:
            # The prediction is the column at which the two halves are swapped.
            w = int(predict)
            print("w", w)
            img_new = np.concatenate([img_np[:, w:, :], img_np[:, :w, :]], axis=1)
            cv2.imshow("img_np", img_np)
            cv2.imshow("img_new", img_new)
            cv2.waitKey(0)
        elif code == 8:
            print("chars", predict)
        else:
            # Types 6 and 7 return a plain value.
            print(predict)


if __name__ == "__main__":
    # To serve the API instead of running the smoke test, use:
    # app.run(host='127.0.0.1', port=17054, debug=False)
    test_interface()