captcha_interface.py 17 KB


  1. # encoding:utf-8
  2. import base64
  3. import json
  4. import logging
  5. import os
  6. import random
  7. import sys
  8. import time
  9. import traceback
  10. from glob import glob
  11. import numpy as np
  12. import cv2
  13. os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  14. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
  15. from flask import Flask, request
  16. from utils import request_post, bytes2np, np2bytes, base64_decode, image_to_str, str_to_image
  17. from puzzle_detect.pzd_interface import get_puzzle_tip_location
  18. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  19. bdr_url = "http://127.0.0.1:17055/bdr"
  20. chd_url = "http://127.0.0.1:17056/chd"
  21. chr_url = "http://127.0.0.1:17057/chr"
  22. cho_url = "http://127.0.0.1:17058/cho"
  23. pzd_url = "http://127.0.0.1:17059/pzd"
  24. ced_url = "http://127.0.0.1:17060/ced"
  25. cer_url = "http://127.0.0.1:17061/cer"
  26. cac_url = "http://127.0.0.1:17062/cac"
  27. captcha_url = "http://127.0.0.1:17054/captcha"
  28. # 验证码类型:
  29. # 1: 拖拽还原拼图(多个拼图干扰)
  30. # 2: 拖拽还原图片
  31. # 3: 点击中文(带提示文字图片)
  32. # 4: 点击中文(按语序)
  33. # 5: 点击中文(带提示文字)
  34. # 6: 计算算式(带中文)
  35. # 7: 验证码类型分类
  36. # 接口配置
  37. app = Flask(__name__)
  38. @app.route('/captcha', methods=['POST'])
  39. def captcha():
  40. start_time = time.time()
  41. logging.info("into captcha_interface captcha")
  42. try:
  43. # 接收网络数据
  44. if not request.form:
  45. logging.info("captcha no data!")
  46. return json.dumps({"success": False, "cost": time.time()-start_time})
  47. base64_data = request.form.get("base64pic")
  48. base64_data2 = request.form.get("base64pic2")
  49. tip_char = request.form.get("char")
  50. code = int(request.form.get("code"))
  51. if tip_char:
  52. logging.info("tip_char " + str(tip_char))
  53. logging.info("code " + str(code))
  54. logging.info("captcha_interface get data time" + str(time.time()-start_time))
  55. if base64_data2 is not None:
  56. result = get_captcha_result([base64_data, base64_data2], code)
  57. elif tip_char is not None:
  58. result = get_captcha_result([base64_data, tip_char], code)
  59. else:
  60. result = get_captcha_result([base64_data], code)
  61. if result is None:
  62. logging.info("success False")
  63. return json.dumps({"success": False, "cost": time.time()-start_time})
  64. logging.info("success True")
  65. return json.dumps({"predict": result, "success": True, "cost": time.time()-start_time})
  66. except:
  67. traceback.print_exc()
  68. logging.info("success False")
  69. return json.dumps({"success": False, "cost": time.time()-start_time})
  70. finally:
  71. logging.info("captcha interface finish time " + str(time.time()-start_time))
  72. def get_captcha_result(base64_list, code):
  73. """
  74. 验证码类型:
  75. 1: 拖拽还原拼图(多个拼图干扰)
  76. 2: 拖拽还原图片
  77. 3: 点击中文(带提示文字图片)
  78. 4: 点击中文(按语序)
  79. 5: 点击中文(带提示文字)
  80. 6: 计算算式(带中文)
  81. 7: 验证码类型分类
  82. 8: 识别图中中文
  83. :param base64_list: base64格式图片列表
  84. :param code: 验证码类型
  85. :return:
  86. """
  87. if code == 1:
  88. result = json.loads(request_post(pzd_url, prepare_data(base64_list[0], "pzd")))
  89. if result.get("success"):
  90. predict = result.get("data")
  91. logging.info("code " + str(code) + " pzd " + "predict " + str(predict))
  92. if len(predict) <= 1:
  93. return predict
  94. else:
  95. h_list = get_puzzle_tip_location(base64_list[1])
  96. min_len = 10000
  97. new_predict = None
  98. for box in predict:
  99. if abs(box[1]-h_list[0]) + abs(box[3]-h_list[1]) < min_len:
  100. new_predict = [box]
  101. min_len = abs(box[1]-h_list[0]) + abs(box[3]-h_list[1])
  102. logging.info("code " + str(code) + " pzd " + "match predict " + str(new_predict))
  103. return new_predict
  104. elif code == 2:
  105. result = json.loads(request_post(bdr_url, prepare_data(base64_list[0], "bdr")))
  106. if result.get("success"):
  107. predict = result.get("data")
  108. logging.info("code " + str(code) + " bdr " + "predict " + str(predict))
  109. return predict
  110. elif code == 3:
  111. # detect tips
  112. result = json.loads(request_post(chd_url, prepare_data(base64_list[1], "chd2")))
  113. if result.get("success"):
  114. box_list_tips = result.get("data")
  115. else:
  116. return None
  117. logging.info("code " + str(code) + " chd2 " + "predict " + str(box_list_tips))
  118. if len(box_list_tips) == 0:
  119. return None
  120. # recognize tips
  121. result = json.loads(request_post(chr_url, prepare_data([base64_list[1], box_list_tips], "chr")))
  122. if result.get("success"):
  123. char_list_tips = result.get("data")
  124. else:
  125. return None
  126. logging.info("code " + str(code) + " chr " + "predict " + str(char_list_tips))
  127. if len(char_list_tips) != len(box_list_tips):
  128. return None
  129. # detect
  130. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  131. if result.get("success"):
  132. box_list = result.get("data")
  133. else:
  134. return None
  135. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  136. if len(box_list) == 0:
  137. return None
  138. # recognize
  139. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  140. if result.get("success"):
  141. char_list = result.get("data")
  142. else:
  143. return None
  144. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  145. if len(char_list) != len(box_list):
  146. return None
  147. for c in char_list_tips:
  148. if c not in char_list:
  149. return None
  150. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  151. predict = [_dict.get(x) for x in char_list_tips]
  152. return predict
  153. elif code == 4:
  154. # detect
  155. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  156. if result.get("success"):
  157. box_list = result.get("data")
  158. else:
  159. return None
  160. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  161. if len(box_list) == 0:
  162. return None
  163. # recognize
  164. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  165. if result.get("success"):
  166. char_list = result.get("data")
  167. else:
  168. return None
  169. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  170. if len(char_list) != len(box_list):
  171. return None
  172. # order
  173. result = json.loads(request_post(cho_url, prepare_data(char_list, "cho")))
  174. if result.get("success"):
  175. ordered_char_list = result.get("data")
  176. else:
  177. return None
  178. logging.info("code " + str(code) + " cho " + "predict " + str(ordered_char_list))
  179. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  180. predict = [_dict.get(x) for x in ordered_char_list]
  181. return predict
  182. elif code == 5:
  183. # detect
  184. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  185. if result.get("success"):
  186. box_list = result.get("data")
  187. else:
  188. return None
  189. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  190. if len(box_list) == 0:
  191. return None
  192. # recognize
  193. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  194. if result.get("success"):
  195. char_list = result.get("data")
  196. else:
  197. return None
  198. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  199. if len(char_list) != len(box_list):
  200. return None
  201. tip_char = base64_list[1]
  202. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  203. predict = [_dict.get(x) for x in tip_char]
  204. # guess
  205. if predict.count(None) == 1:
  206. guess_list = []
  207. for c in char_list:
  208. if c not in tip_char:
  209. guess_list.append(_dict.get(c))
  210. # print("guess_list", guess_list)
  211. predict[predict.index(None)] = random.choice(guess_list)
  212. return predict
  213. if None in predict:
  214. return None
  215. else:
  216. return predict
  217. elif code == 6:
  218. # denoise
  219. # result = json.loads(request_post(ced_url, prepare_data(base64_list[0], "ced")))
  220. # if result.get("success"):
  221. # image_np = str_to_image(result.get("data"))
  222. # else:
  223. # return None
  224. # logging.info("code " + str(code) + " ced " + "predict success")
  225. # recognize
  226. result = json.loads(request_post(cer_url, prepare_data(base64_list[0], "cer")))
  227. if result.get("success"):
  228. equation_result = result.get("data")
  229. else:
  230. return None
  231. logging.info("code " + str(code) + " cer " + "predict " + str(equation_result))
  232. return equation_result
  233. elif code == 7:
  234. result = json.loads(request_post(cac_url, prepare_data(base64_list[0], "cac")))
  235. if result.get("success"):
  236. predict = result.get("data")
  237. logging.info("code " + str(code) + " cac " + "predict " + str(predict))
  238. return predict
  239. elif code == 8:
  240. # detect
  241. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  242. if result.get("success"):
  243. box_list = result.get("data")
  244. else:
  245. return None
  246. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  247. if len(box_list) == 0:
  248. return None
  249. # recognize
  250. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  251. if result.get("success"):
  252. char_list = result.get("data")
  253. else:
  254. return None
  255. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  256. if len(char_list) != len(box_list):
  257. return None
  258. return char_list
  259. return None
  260. def prepare_data(data, _type):
  261. if _type in ['pzd', 'bdr', 'chd', 'ced', 'cac', 'cer']:
  262. return {"data": data}
  263. elif _type in ['chd2']:
  264. return {"data": data, "tips": 1}
  265. elif _type in ['chr']:
  266. image_base64 = data[0]
  267. box_list = data[1]
  268. image_bytes = base64_decode(image_base64)
  269. image_np = bytes2np(image_bytes)
  270. str_list = []
  271. for box in box_list:
  272. file_str = image_to_str(image_np[box[1]:box[3], box[0]:box[2], :])
  273. # file_bytes = np2bytes(image_np[box[1]:box[3], box[0]:box[2], :])
  274. # file_base64 = base64.b64encode(file_bytes)
  275. # file_str = file_base64.decode("utf-8")
  276. str_list.append(file_str)
  277. return {"data": json.dumps(str_list)}
  278. elif _type in ['cho']:
  279. return {"data": json.dumps(data)}
  280. # elif _type in ['cer']:
  281. # data = image_to_str(data)
  282. # return {"data": json.dumps(data)}
  283. else:
  284. raise
  285. def test_interface(from_remote=True):
  286. captcha_url = "http://192.168.2.103:17054/captcha"
  287. # captcha_url = "http://127.0.0.1:17054/captcha"
  288. # paths = glob("D:/Project/captcha/data/test/yolo_15.jpg")
  289. # paths = glob("D:/Project/captcha/data/test/FileInfo1021/19584571-511d-11ed-93ac-b4b5b67760ae_3.jpg")
  290. # paths = glob("yolo_15.jpg")
  291. # paths = glob(r'C:\Users\Administrator\Desktop\test_capture\puzzle8.png')
  292. # paths = glob("D:/Project/captcha/data/equation1/227_1.jpg")
  293. paths = glob(r'C:\Users\Administrator\Downloads\1.jpg')
  294. for file_path in paths:
  295. img_np = cv2.imread(file_path)
  296. file_bytes = np2bytes(img_np)
  297. file_base64 = base64.b64encode(file_bytes)
  298. # img_np = None
  299. # with open(file_path, 'r') as f:
  300. # file_base64 = f.read()
  301. code = 6
  302. if from_remote:
  303. if code in [1]:
  304. file_base64_2 = cv2.imread(r"C:\Users\Administrator\Desktop\test_capture\puzzle7.png")
  305. file_base64_2 = np2bytes(file_base64_2)
  306. file_base64_2 = base64.b64encode(file_base64_2)
  307. file_json = {"base64pic": file_base64, 'base64pic2': file_base64_2, "code": code}
  308. result = json.loads(request_post(captcha_url, file_json))
  309. print("result", result)
  310. if result.get("success"):
  311. out_boxes = result.get("predict")
  312. print("out_boxes", out_boxes)
  313. for box in out_boxes:
  314. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  315. cv2.imshow("img_np", img_np)
  316. cv2.waitKey(0)
  317. else:
  318. print("failed!")
  319. elif code in [4]:
  320. file_json = {"base64pic": file_base64, "code": code}
  321. result = json.loads(request_post(captcha_url, file_json))
  322. print("result", result)
  323. if result.get("success"):
  324. out_boxes = result.get("predict")
  325. print("out_boxes", out_boxes)
  326. for box in out_boxes:
  327. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  328. cv2.imshow("img_np", img_np)
  329. cv2.waitKey(0)
  330. else:
  331. print("failed!")
  332. elif code in [2]:
  333. file_json = {"base64pic": file_base64, "code": code}
  334. result = json.loads(request_post(captcha_url, file_json))
  335. print("result", result)
  336. if result.get("success"):
  337. w = int(result.get("predict"))
  338. print("w", w)
  339. img_new = np.concatenate([img_np[:, w:, :], img_np[:, :w, :]], axis=1)
  340. cv2.imshow("img_np", img_np)
  341. cv2.imshow("img_new", img_new)
  342. cv2.waitKey(0)
  343. else:
  344. print("failed!")
  345. elif code in [3]:
  346. file_base64_2 = cv2.imread("../dev/click_captcha/data/test/yolo_3.jpg")
  347. cv2.imshow("file_base64_2", file_base64_2)
  348. cv2.waitKey(0)
  349. file_base64_2 = np2bytes(file_base64_2)
  350. file_base64_2 = base64.b64encode(file_base64_2)
  351. file_json = {"base64pic": file_base64, "base64pic2": file_base64_2, "code": code}
  352. result = json.loads(request_post(captcha_url, file_json))
  353. print("result", result)
  354. if result.get("success"):
  355. out_boxes = result.get("predict")
  356. print("out_boxes", out_boxes)
  357. for box in out_boxes:
  358. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  359. cv2.imshow("img_np", img_np)
  360. cv2.waitKey(0)
  361. else:
  362. print("failed!")
  363. elif code in [5]:
  364. tip_char = "蠢贱榨"
  365. file_json = {"base64pic": file_base64, "char": tip_char, "code": code}
  366. result = json.loads(request_post(captcha_url, file_json))
  367. print("result", result)
  368. if result.get("success"):
  369. out_boxes = result.get("predict")
  370. print("out_boxes", out_boxes)
  371. for box in out_boxes:
  372. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  373. cv2.imshow("img_np", img_np)
  374. cv2.waitKey(0)
  375. elif code in [6]:
  376. file_json = {"base64pic": file_base64, "code": code}
  377. result = json.loads(request_post(captcha_url, file_json))
  378. print("result", result)
  379. if result.get("success"):
  380. equation_result = result.get("predict")
  381. print(equation_result)
  382. elif code in [7]:
  383. file_json = {"base64pic": file_base64, "code": code}
  384. result = json.loads(request_post(captcha_url, file_json))
  385. print("result", result)
  386. if result.get("success"):
  387. classify_result = result.get("predict")
  388. print(classify_result)
  389. elif code in [8]:
  390. file_json = {"base64pic": file_base64, "code": code}
  391. result = json.loads(request_post(captcha_url, file_json))
  392. print("result", result)
  393. if result.get("success"):
  394. chars = result.get("predict")
  395. print("chars", chars)
  396. else:
  397. print("failed!")
  398. if __name__ == "__main__":
  399. # app.run(host='127.0.0.1', port=17054, debug=False)
  400. test_interface()