# encoding=utf8
import argparse
import base64
import io
import json
import pickle
import sys
import os
import threading
import numpy as np
import redis
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import time
import traceback
os.environ['FLAGS_eager_delete_tensor_gb'] = '0'
from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes, \
    to_share_memory, from_share_memory, get_np_type, namespace_to_dict, get_share_memory_list, get_ip_port, \
    release_share_memory, get_share_memory, close_share_memory_list
from flask import Flask, request
from format_convert import _global
from ocr.tools.infer import utility
from ocr.ppocr.utils.logging import get_logger

logger = get_logger()

# interface configuration
app = Flask(__name__)
ocr_model_dir = os.path.dirname(os.path.abspath(__file__)) + "/model/2.0/"
lock = threading.RLock()
ip_port_dict = get_ip_port()
ip = 'http://127.0.0.1'
ocr_port_list = ip_port_dict.get(ip).get("ocr")
otr_port_list = ip_port_dict.get(ip).get("otr")

# redis_db = redis.StrictRedis(host='127.0.0.1', port='6379',
#                              db=1, password='bidi123456', health_check_interval=300)
redis_db = None


# @app.route('/ocr', methods=['POST'])
def _ocr_gpu_flask():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        log("judge request.form " + str(time.time()-start_time))
        start_time1 = time.time()
        result = pickle.loads(base64.b64decode(request.form.get("data")))
        inputs = result.get("inputs")
        # decompress the numpy input (sent as compressed .npz bytes)
        decompressed_array = io.BytesIO()
        decompressed_array.write(inputs)
        decompressed_array.seek(0)
        inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
        log("inputs.shape " + str(inputs.shape))
        args = result.get("args")
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        _md5 = result.get("md5")
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-start_time1))
        # fetch the matching predictor, creating it on first use
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")
        # set the model input and run it
        input_tensor.copy_from_cpu(inputs)
        with lock:
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)
        # fetch the model output
        outputs = []
        for output_tensor in output_tensors:
            output = output_tensor.copy_to_cpu()
            outputs.append(output)
        preds = outputs[0]
        # compress the numpy output before sending it back
        compressed_array = io.BytesIO()
        np.savez_compressed(compressed_array, preds)
        compressed_array.seek(0)
        preds = compressed_array.read()
        # release memory
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        return base64.b64encode(pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return base64.b64encode(pickle.dumps({"preds": None, "gpu_time": 0., "elapse": finish_time}))
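
# --- Usage sketch (illustrative, not part of the service) ---
# A minimal client for _ocr_gpu_flask above, assuming the route is enabled
# and reachable at `server_url`; the payload layout mirrors what the
# handler unpacks. `call_ocr_flask` and `server_url` are hypothetical.
#
# import requests
#
# def call_ocr_flask(inputs, args, predictor_type, model_type, _md5,
#                    server_url="http://127.0.0.1:17000/ocr"):
#     # compress the input array exactly as the handler decompresses it
#     buf = io.BytesIO()
#     np.savez_compressed(buf, inputs)
#     buf.seek(0)
#     data = base64.b64encode(pickle.dumps({
#         "inputs": buf.read(), "args": args, "predictor_type": predictor_type,
#         "model_type": model_type, "md5": _md5}))
#     resp = requests.post(server_url, data={"data": data})
#     result = pickle.loads(base64.b64decode(resp.content))
#     preds = result.get("preds")
#     if preds is not None:
#         # the handler returns compressed .npz bytes; restore the array
#         preds = np.load(io.BytesIO(preds), allow_pickle=True)['arr_0']
#     return preds, result.get("gpu_time"), result.get("elapse")
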
def _ocr_gpu_redis():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    while True:
        start_time = time.time()
        try:
            # busy-wait until the producer queue has data
            if redis_db.llen("producer_ocr") == 0:
                continue
            log("judge llen " + str(time.time()-start_time))
            _time = time.time()
            result = redis_db.lpop("producer_ocr")
            if result is None:
                continue
            result = pickle.loads(result)
            log("from producer_ocr time " + str(time.time() - _time))
            _time = time.time()
            inputs = result.get("inputs")
            # # decompress the numpy input
            # decompressed_array = io.BytesIO()
            # decompressed_array.write(inputs)
            # decompressed_array.seek(0)
            # inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
            # log("inputs.shape " + str(inputs.shape))
            # log("numpy decompress " + str(time.time()-_time))
            args = result.get("args")
            _uuid = result.get("uuid")
            predictor_type = result.get("predictor_type")
            model_type = result.get("model_type")
            _md5 = result.get("md5")
            _global.update({"md5": _md5})
            log("read data " + str(time.time()-_time))
            # fetch the matching predictor, creating it on first use
            if globals().get(predictor_type) is None:
                start_time1 = time.time()
                log("=== init " + model_type + " " + predictor_type + " model ===")
                predictor, input_tensor, output_tensors = \
                    utility.create_predictor(args, predictor_type, logger)
                globals().update({predictor_type: {"predictor": predictor,
                                                   "input_tensor": input_tensor,
                                                   "output_tensors": output_tensors}})
                log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
            else:
                predictor = globals().get(predictor_type).get("predictor")
                input_tensor = globals().get(predictor_type).get("input_tensor")
                output_tensors = globals().get(predictor_type).get("output_tensors")
            # set the model input and run it
            input_tensor.copy_from_cpu(inputs)
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)
            # fetch the model output
            _time = time.time()
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            preds = outputs[0]
            log("output_tensors " + str(time.time()-_time))
            # # compress the numpy output
            # _time = time.time()
            # compressed_array = io.BytesIO()
            # np.savez_compressed(compressed_array, preds)
            # compressed_array.seek(0)
            # preds = compressed_array.read()
            # log("numpy compress " + str(time.time()-_time))
            # write the result back to redis, keyed by the request uuid
            finish_time = round(float(time.time()-start_time), 2)
            _time = time.time()
            redis_db.hset("consumer_ocr", _uuid, pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
            log("to consumer_ocr " + str(time.time()-_time))
            # release memory
            predictor.clear_intermediate_tensor()
            predictor.try_shrink_memory()
            log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        except Exception:
            traceback.print_exc()
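
# --- Usage sketch (illustrative, not part of the service) ---
# A minimal producer for _ocr_gpu_redis above, assuming a StrictRedis
# client configured like the commented-out `redis_db` at the top of this
# file. The field names mirror what the consumer reads; `poll_ocr_result`
# and its timeout are hypothetical.
#
# import uuid
#
# def poll_ocr_result(redis_db, inputs, args, predictor_type, model_type,
#                     _md5, timeout=30.):
#     _uuid = str(uuid.uuid1())
#     # enqueue the request; inputs is a plain numpy array (no compression,
#     # matching the commented-out decompress block in the consumer)
#     redis_db.rpush("producer_ocr", pickle.dumps({
#         "inputs": inputs, "args": args, "uuid": _uuid,
#         "predictor_type": predictor_type, "model_type": model_type,
#         "md5": _md5}))
#     deadline = time.time() + timeout
#     while time.time() < deadline:
#         result = redis_db.hget("consumer_ocr", _uuid)
#         if result is not None:
#             redis_db.hdel("consumer_ocr", _uuid)
#             return pickle.loads(result)
#         time.sleep(0.05)
#     return None
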
# @app.route('/ocr', methods=['POST'])
def _ocr_gpu_flask_sm():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    gpu_time = 0.  # default so the except branch cannot hit a NameError
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        log("judge request.form " + str(time.time()-start_time))
        _time = time.time()
        result = json.loads(request.form.get("data"))
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        args = result.get("args")
        args = namespace_to_dict(args, reverse=True)
        _md5 = result.get("md5")
        sm_name = result.get("sm_name")
        sm_shape = result.get("sm_shape")
        sm_dtype = result.get("sm_dtype")
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))
        # read the input array from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
        else:
            log("from_share_memory failed!")
            raise Exception("no sm_name in request")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))
        # fetch the matching predictor, creating it on first use
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")
        _time = time.time()
        with lock:
            # set the model input
            input_tensor.copy_from_cpu(inputs)
            # run
            predictor.run()
            # fetch the model output
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
        preds = outputs[0]
        gpu_time = round(float(time.time()-_time), 2)
        log("gpu_time " + str(gpu_time))
        _shape = preds.shape
        _dtype = str(preds.dtype)
        # wait until the previous result has been read (zeroed by the
        # client) before unlinking its shared memory block
        _time = time.time()
        while True:
            shm = globals().get("shm")
            if shm is None:
                break
            last_shape = globals().get("last_shape")
            sm_data = np.ndarray(last_shape, dtype=sm_dtype, buffer=shm.buf)
            if (sm_data == np.zeros(last_shape)).all():
                try:
                    _time1 = time.time()
                    shm.close()
                    shm.unlink()
                    log("release share memory " + str(time.time()-_time1))
                except FileNotFoundError:
                    log("share memory " + shm.name + " not exists!")
                break
        log("wait for share memory being read " + str(time.time()-_time))
        # put the result into shared memory
        _time = time.time()
        shm = to_share_memory(preds)
        globals().update({"shm": shm})
        globals().update({"last_shape": _shape})
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))
        # release memory
        _time = time.time()
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        log("ocr shrink memory " + str(time.time()-_time))
        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        return json.dumps({"gpu_time": gpu_time, "elapse": finish_time,
                           "sm_name": shm.name, "sm_shape": _shape, "sm_dtype": _dtype})
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return json.dumps({"gpu_time": gpu_time, "elapse": finish_time,
                           "sm_name": None, "sm_shape": None, "sm_dtype": None})
def _ocr():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    sm_list_name = "sml_ocr_" + str(port)
    try:
        # initialize the models
        for predictor_type in ["det", "cls", "rec"]:
            args = init_ocr_args()
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            if predictor_type == "det":
                inputs = np.zeros((1, 3, 1024, 1024), dtype=np.float32)
            else:
                inputs = np.zeros((30, 3, 32, 64), dtype=np.float32)
            # warm the model up by running it once
            input_tensor.copy_from_cpu(inputs)
            predictor.run()
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            log("finish init predictor " + predictor_type)
        # loop until new data arrives in the shared memory list
        # full_sm_list = globals().get("sm_list")
        while True:
            try:
                full_sm_list = get_share_memory_list(sm_list_name=sm_list_name)
            except FileNotFoundError:
                full_sm_list = get_share_memory_list(sm_list_name=sm_list_name, list_size=10)
            try:
                if full_sm_list[0] == "1" and full_sm_list[-1] == "1":
                    log("full_sm_list[0] " + full_sm_list[0])
                    log("full_sm_list[-1] " + full_sm_list[-1])
                    log("full_sm_list[1] " + full_sm_list[1])
                    log("wait for " + str(time.time()-start_time))
                    break
            except ValueError:
                continue
        start_time = time.time()
        _time = time.time()
        _md5 = full_sm_list[1]
        model_type = full_sm_list[2]
        predictor_type = full_sm_list[3]
        args = full_sm_list[4]
        args = namespace_to_dict(eval(args), reverse=True)
        sm_name = full_sm_list[5]
        sm_shape = full_sm_list[6]
        sm_shape = eval(sm_shape)
        sm_dtype = full_sm_list[7]
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))
        # read the input array from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
            log(predictor_type + " inputs shape " + str(inputs.shape))
        else:
            log("from_share_memory failed!")
            raise Exception("no sm_name in request")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))
        # fetch the matching predictor, creating it on first use
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")
        _time = time.time()
        with lock:
            # set the model input
            input_tensor.copy_from_cpu(inputs)
            # run
            predictor.run()
            # fetch the model output
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
        preds = outputs[0]
        gpu_time = round(float(time.time()-_time), 2)
        log("gpu_time " + str(gpu_time))
        # put the result into shared memory
        _time = time.time()
        # release any earlier share memory block with the same name first
        release_share_memory(get_share_memory(sm_name))
        # write to shared memory
        shm = to_share_memory(preds, sm_name)
        full_sm_list[5] = shm.name
        full_sm_list[6] = str(preds.shape)
        full_sm_list[7] = str(preds.dtype)
        full_sm_list[8] = str(gpu_time)
        full_sm_list[-1] = "0"
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))
        # detach from the shared memory list
        close_share_memory_list(full_sm_list)
        # release memory
        _time = time.time()
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        log("ocr shrink memory " + str(time.time()-_time))
        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        raise
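
# --- Protocol sketch (illustrative, inferred from _ocr above) ---
# The shared memory list "sml_ocr_<port>" has 10 slots. A producer fills
# slots 1-7, sets slots 0 and -1 to "1", then polls slot -1 for "0" and
# reads the result back out of slots 5-8:
#
#   slot 0   request flag, "1" while a request is pending
#   slot 1   md5 of the source document
#   slot 2   model_type
#   slot 3   predictor_type ("det" / "cls" / "rec")
#   slot 4   repr of the args dict (eval'd, then converted to a Namespace)
#   slot 5   input shared memory block name; overwritten with the result's
#   slot 6   input shape string; overwritten with the result's shape
#   slot 7   input dtype string; overwritten with the result's dtype
#   slot 8   gpu_time string, written back by the worker
#   slot -1  done flag, set to "0" by the worker when the result is ready
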
def init_ocr_args():
    return argparse.Namespace(
        use_gpu=True,
        ir_optim=True,
        use_tensorrt=False,
        gpu_mem=8000,
        image_dir='',
        det_algorithm='DB',
        det_model_dir=ocr_model_dir+"det",
        det_limit_side_len=1280,
        det_limit_type='max',
        det_db_thresh=0.1,
        # lower det_db_box_thresh if text lines are being missed
        det_db_box_thresh=0.1,
        # det_db_unclip_ratio controls how tightly boxes hug the text
        det_db_unclip_ratio=2.5,
        # dilate the detected text regions
        use_dilation=False,
        det_east_score_thresh=0.8,
        det_east_cover_thresh=0.1,
        det_east_nms_thresh=0.2,
        rec_algorithm='CRNN',
        rec_model_dir=ocr_model_dir+"rec/ch",
        rec_image_shape="3, 32, 1000",
        rec_char_type='ch',
        rec_batch_num=30,
        max_text_length=128,
        rec_char_dict_path='./ppocr/utils/ppocr_keys_v1.txt',
        use_space_char=True,
        drop_score=0.5,
        cls_model_dir=ocr_model_dir+"cls",
        cls_image_shape="3, 32, 1000",
        label_list=['0', '180'],
        cls_batch_num=30,
        cls_thresh=0.9,
        enable_mkldnn=False,
        use_zero_copy_run=True,
        use_pdserving=False,
        lang='ch',
        det=True,
        rec=True,
        use_angle_cls=False)


if __name__ == '__main__':
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
        using_gpu_index = 0
    elif len(sys.argv) == 3:
        port = int(sys.argv[1])
        using_gpu_index = int(sys.argv[2])
    else:
        port = 17000
        using_gpu_index = 0
    _global._init()
    _global.update({"port": str(port)})
    globals().update({"port": str(port)})
    ip = get_intranet_ip()
    os.environ['CUDA_VISIBLE_DEVICES'] = str(using_gpu_index)
    # app.run(host='0.0.0.0', port=port, processes=1, threaded=False, debug=False)
    # app.run()
    # log("OCR running "+str(port))
    while True:
        _ocr()
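
# --- Launch sketch (illustrative) ---
# Run one worker per (port, GPU) pair, e.g.:
#   python ocr_gpu_interface.py 17000 0
# which serves the shared memory list "sml_ocr_17000" on GPU 0
# (pinned via CUDA_VISIBLE_DEVICES).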