captcha_interface.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. # encoding:utf-8
  2. import base64
  3. import json
  4. import logging
  5. import os
  6. import random
  7. import sys
  8. import time
  9. import traceback
  10. from glob import glob
  11. import numpy as np
  12. import cv2
  13. os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  14. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
  15. from flask import Flask, request
  16. from utils import request_post, bytes2np, np2bytes, base64_decode, image_to_str, str_to_image
  17. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  18. bdr_url = "http://127.0.0.1:17055/bdr"
  19. chd_url = "http://127.0.0.1:17056/chd"
  20. chr_url = "http://127.0.0.1:17057/chr"
  21. cho_url = "http://127.0.0.1:17058/cho"
  22. pzd_url = "http://127.0.0.1:17059/pzd"
  23. ced_url = "http://127.0.0.1:17060/ced"
  24. cer_url = "http://127.0.0.1:17061/cer"
  25. captcha_url = "http://127.0.0.1:17054/captcha"
  26. # 验证码类型:
  27. # 1: 拖拽还原拼图
  28. # 2: 拖拽还原图片
  29. # 3: 点击中文(带提示文字图片)
  30. # 4: 点击中文(按语序)
  31. # 5: 点击中文(带提示文字)
  32. # 6: 计算算式(带中文)
  33. # 接口配置
  34. app = Flask(__name__)
  35. @app.route('/captcha', methods=['POST'])
  36. def captcha():
  37. start_time = time.time()
  38. logging.info("into captcha_interface captcha")
  39. try:
  40. # 接收网络数据
  41. if not request.form:
  42. logging.info("captcha no data!")
  43. return json.dumps({"success": False, "cost": time.time()-start_time})
  44. base64_data = request.form.get("base64pic")
  45. base64_data2 = request.form.get("base64pic2")
  46. tip_char = request.form.get("char")
  47. code = int(request.form.get("code"))
  48. if tip_char:
  49. logging.info("tip_char " + str(tip_char))
  50. logging.info("code " + str(code))
  51. logging.info("captcha_interface get data time" + str(time.time()-start_time))
  52. if base64_data2 is not None:
  53. result = get_captcha_result([base64_data, base64_data2], code)
  54. elif tip_char is not None:
  55. result = get_captcha_result([base64_data, tip_char], code)
  56. else:
  57. result = get_captcha_result([base64_data], code)
  58. if result is None:
  59. logging.info("success False")
  60. return json.dumps({"success": False, "cost": time.time()-start_time})
  61. logging.info("success True")
  62. return json.dumps({"predict": result, "success": True, "cost": time.time()-start_time})
  63. except:
  64. traceback.print_exc()
  65. logging.info("success False")
  66. return json.dumps({"success": False, "cost": time.time()-start_time})
  67. finally:
  68. logging.info("captcha interface finish time " + str(time.time()-start_time))
  69. def get_captcha_result(base64_list, code):
  70. """
  71. 验证码类型:
  72. 1: 拖拽还原拼图
  73. 2: 拖拽还原图片
  74. 3: 点击中文(带提示文字图片)
  75. 4: 点击中文(按语序)
  76. 5: 点击中文(带提示文字)
  77. 6: 计算算式(带中文)
  78. :param base64_list: base64格式图片列表
  79. :param code: 验证码类型
  80. :return:
  81. """
  82. if code == 1:
  83. result = json.loads(request_post(pzd_url, prepare_data(base64_list[0], "pzd")))
  84. if result.get("success"):
  85. predict = result.get("data")
  86. logging.info("code " + str(code) + " pzd " + "predict " + str(predict))
  87. return predict
  88. elif code == 2:
  89. result = json.loads(request_post(bdr_url, prepare_data(base64_list[0], "bdr")))
  90. if result.get("success"):
  91. predict = result.get("data")
  92. logging.info("code " + str(code) + " bdr " + "predict " + str(predict))
  93. return predict
  94. elif code == 3:
  95. # detect tips
  96. result = json.loads(request_post(chd_url, prepare_data(base64_list[1], "chd2")))
  97. if result.get("success"):
  98. box_list_tips = result.get("data")
  99. else:
  100. return None
  101. logging.info("code " + str(code) + " chd2 " + "predict " + str(box_list_tips))
  102. if len(box_list_tips) == 0:
  103. return None
  104. # recognize tips
  105. result = json.loads(request_post(chr_url, prepare_data([base64_list[1], box_list_tips], "chr")))
  106. if result.get("success"):
  107. char_list_tips = result.get("data")
  108. else:
  109. return None
  110. logging.info("code " + str(code) + " chr " + "predict " + str(char_list_tips))
  111. if len(char_list_tips) != len(box_list_tips):
  112. return None
  113. # detect
  114. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  115. if result.get("success"):
  116. box_list = result.get("data")
  117. else:
  118. return None
  119. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  120. if len(box_list) == 0:
  121. return None
  122. # recognize
  123. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  124. if result.get("success"):
  125. char_list = result.get("data")
  126. else:
  127. return None
  128. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  129. if len(char_list) != len(box_list):
  130. return None
  131. for c in char_list_tips:
  132. if c not in char_list:
  133. return None
  134. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  135. predict = [_dict.get(x) for x in char_list_tips]
  136. return predict
  137. elif code == 4:
  138. # detect
  139. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  140. if result.get("success"):
  141. box_list = result.get("data")
  142. else:
  143. return None
  144. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  145. if len(box_list) == 0:
  146. return None
  147. # recognize
  148. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  149. if result.get("success"):
  150. char_list = result.get("data")
  151. else:
  152. return None
  153. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  154. if len(char_list) != len(box_list):
  155. return None
  156. # order
  157. result = json.loads(request_post(cho_url, prepare_data(char_list, "cho")))
  158. if result.get("success"):
  159. ordered_char_list = result.get("data")
  160. else:
  161. return None
  162. logging.info("code " + str(code) + " cho " + "predict " + str(ordered_char_list))
  163. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  164. predict = [_dict.get(x) for x in ordered_char_list]
  165. return predict
  166. elif code == 5:
  167. # detect
  168. result = json.loads(request_post(chd_url, prepare_data(base64_list[0], "chd")))
  169. if result.get("success"):
  170. box_list = result.get("data")
  171. else:
  172. return None
  173. logging.info("code " + str(code) + " chd " + "predict " + str(box_list))
  174. if len(box_list) == 0:
  175. return None
  176. # recognize
  177. result = json.loads(request_post(chr_url, prepare_data([base64_list[0], box_list], "chr")))
  178. if result.get("success"):
  179. char_list = result.get("data")
  180. else:
  181. return None
  182. logging.info("code " + str(code) + " chr " + "predict " + str(char_list))
  183. if len(char_list) != len(box_list):
  184. return None
  185. tip_char = base64_list[1]
  186. _dict = {char_list[i]: box_list[i] for i in range(len(box_list))}
  187. predict = [_dict.get(x) for x in tip_char]
  188. # guess
  189. if predict.count(None) == 1:
  190. guess_list = []
  191. for c in char_list:
  192. if c not in tip_char:
  193. guess_list.append(_dict.get(c))
  194. # print("guess_list", guess_list)
  195. predict[predict.index(None)] = random.choice(guess_list)
  196. return predict
  197. if None in predict:
  198. return None
  199. else:
  200. return predict
  201. elif code == 6:
  202. # denoise
  203. result = json.loads(request_post(ced_url, prepare_data(base64_list[0], "ced")))
  204. if result.get("success"):
  205. image_np = str_to_image(result.get("data"))
  206. else:
  207. return None
  208. logging.info("code " + str(code) + " ced " + "predict success")
  209. # recognize
  210. result = json.loads(request_post(cer_url, prepare_data([image_np], "cer")))
  211. if result.get("success"):
  212. equation_result = result.get("data")
  213. else:
  214. return None
  215. logging.info("code " + str(code) + " cer " + "predict " + str(equation_result))
  216. return equation_result
  217. return None
  218. def prepare_data(data, _type):
  219. if _type in ['pzd', 'bdr', 'chd', 'ced']:
  220. return {"data": data}
  221. elif _type in ['chd2']:
  222. return {"data": data, "tips": 1}
  223. elif _type in ['chr']:
  224. image_base64 = data[0]
  225. box_list = data[1]
  226. image_bytes = base64_decode(image_base64)
  227. image_np = bytes2np(image_bytes)
  228. str_list = []
  229. for box in box_list:
  230. file_str = image_to_str(image_np[box[1]:box[3], box[0]:box[2], :])
  231. # file_bytes = np2bytes(image_np[box[1]:box[3], box[0]:box[2], :])
  232. # file_base64 = base64.b64encode(file_bytes)
  233. # file_str = file_base64.decode("utf-8")
  234. str_list.append(file_str)
  235. return {"data": json.dumps(str_list)}
  236. elif _type in ['cho']:
  237. return {"data": json.dumps(data)}
  238. elif _type in ['cer']:
  239. data = image_to_str(data[0])
  240. return {"data": json.dumps(data)}
  241. else:
  242. raise
  243. def test_interface(from_remote=True):
  244. # captcha_url = "http://192.168.2.103:17054/captcha"
  245. captcha_url = "http://127.0.0.1:17054/captcha"
  246. # paths = glob("D:/Project/captcha/data/test/yolo_15.jpg")
  247. # paths = glob("D:/Project/captcha/data/test/FileInfo1021/19584571-511d-11ed-93ac-b4b5b67760ae_3.jpg")
  248. paths = glob("yolo_15.jpg")
  249. for file_path in paths:
  250. img_np = cv2.imread(file_path)
  251. file_bytes = np2bytes(img_np)
  252. file_base64 = base64.b64encode(file_bytes)
  253. code = 1
  254. if from_remote:
  255. if code in [1, 4]:
  256. file_json = {"base64pic": file_base64, "code": code}
  257. result = json.loads(request_post(captcha_url, file_json))
  258. print("result", result)
  259. if result.get("success"):
  260. out_boxes = result.get("predict")
  261. print("out_boxes", out_boxes)
  262. for box in out_boxes:
  263. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  264. cv2.imshow("img_np", img_np)
  265. cv2.waitKey(0)
  266. else:
  267. print("failed!")
  268. elif code in [2]:
  269. file_json = {"base64pic": file_base64, "code": code}
  270. result = json.loads(request_post(captcha_url, file_json))
  271. print("result", result)
  272. if result.get("success"):
  273. w = int(result.get("predict"))
  274. print("w", w)
  275. img_new = np.concatenate([img_np[:, w:, :], img_np[:, :w, :]], axis=1)
  276. cv2.imshow("img_np", img_np)
  277. cv2.imshow("img_new", img_new)
  278. cv2.waitKey(0)
  279. else:
  280. print("failed!")
  281. elif code in [3]:
  282. file_base64_2 = cv2.imread("../dev/click_captcha/data/test/yolo_3.jpg")
  283. cv2.imshow("file_base64_2", file_base64_2)
  284. cv2.waitKey(0)
  285. file_base64_2 = np2bytes(file_base64_2)
  286. file_base64_2 = base64.b64encode(file_base64_2)
  287. file_json = {"base64pic": file_base64, "base64pic2": file_base64_2, "code": code}
  288. result = json.loads(request_post(captcha_url, file_json))
  289. print("result", result)
  290. if result.get("success"):
  291. out_boxes = result.get("predict")
  292. print("out_boxes", out_boxes)
  293. for box in out_boxes:
  294. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  295. cv2.imshow("img_np", img_np)
  296. cv2.waitKey(0)
  297. else:
  298. print("failed!")
  299. elif code in [5]:
  300. tip_char = "蠢贱榨"
  301. file_json = {"base64pic": file_base64, "char": tip_char, "code": code}
  302. result = json.loads(request_post(captcha_url, file_json))
  303. print("result", result)
  304. if result.get("success"):
  305. out_boxes = result.get("predict")
  306. print("out_boxes", out_boxes)
  307. for box in out_boxes:
  308. cv2.rectangle(img_np, (box[0], box[1]), (box[2], box[3]), (0, 0, 255))
  309. cv2.imshow("img_np", img_np)
  310. cv2.waitKey(0)
  311. elif code in [6]:
  312. file_json = {"base64pic": file_base64, "code": code}
  313. result = json.loads(request_post(captcha_url, file_json))
  314. print("result", result)
  315. if result.get("success"):
  316. equation_result = result.get("predict")
  317. print(equation_result)
  318. if __name__ == "__main__":
  319. # app.run(host='127.0.0.1', port=17054, debug=False)
  320. test_interface()