convert_need_interface.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # encoding=utf8
  2. import base64
  3. import inspect
  4. import json
  5. import logging
  6. import os
  7. import pickle
  8. import random
  9. import sys
  10. import time
  11. import uuid
  12. import zlib
  13. import redis
  14. from werkzeug.exceptions import NotFound
  15. sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
  16. import traceback
  17. import requests
  18. from format_convert import _global
  19. from format_convert.utils import get_platform, get_sequential_data, judge_error_code, request_post, get_ip_port, \
  20. get_intranet_ip, get_logger, log, memory_decorator
  21. from ocr.ocr_interface import ocr, OcrModels
  22. from otr.otr_interface import otr, OtrModels
  23. from format_convert.libreoffice_interface import office_convert
# Select how the model/office helpers below are reached by default:
# FROM_REMOTE is the default `from_remote` argument of the from_*_interface
# functions (False -> call the models in-process, True -> call the HTTP services).
if get_platform() == "Windows":
    FROM_REMOTE = False
    # Manual debug switch: when True, build zeroed load-balancing counters for
    # every configured IP and point OCR/OTR at fixed ports on 127.0.0.1.
    only_test_ocr = False
    if only_test_ocr:
        ip_port_flag = {}
        ip_port_dict = get_ip_port()
        for _k in ip_port_dict.keys():
            ip_port_flag.update({_k: {"ocr": 0,
                                      "otr": 0,
                                      "convert": 0,
                                      "office": 0
                                      }})
        _global.update({"ip_port_flag": ip_port_flag})
        ip_port_dict["http://127.0.0.1"]["ocr"] = ["17000"]
        ip_port_dict["http://127.0.0.1"]["otr"] = ["18000"]
        _global.update({"ip_port": ip_port_dict})
else:
    FROM_REMOTE = True
# Connect to the redis database (currently disabled; from_gpu_interface_redis
# cannot work while redis_db is None).
# NOTE(review): the commented-out line below contains a hard-coded password;
# move credentials to configuration before re-enabling it.
# redis_db = redis.StrictRedis(host='192.168.2.103', port='6379',
# db=1, password='bidi123456', health_check_interval=300)
redis_db = None
  46. def _interface(_dict, time_out=60, retry_times=3, use_zlib=False):
  47. try:
  48. # 重试
  49. model_type = _dict.get("model_type")
  50. while retry_times:
  51. ip_port = interface_pool(model_type)
  52. if judge_error_code(ip_port):
  53. return ip_port
  54. _url = ip_port + "/" + model_type
  55. base64_stream = base64.b64encode(pickle.dumps(_dict))
  56. # if use_zlib:
  57. # base64_stream = zlib.compress(base64_stream)
  58. r = pickle.loads(base64.b64decode(request_post(_url, {"data": base64_stream, "model_type": model_type}, time_out=time_out, use_zlib=use_zlib)))
  59. log("get _interface return")
  60. if type(r) == list:
  61. # 接口连不上换个端口重试
  62. if retry_times <= 1:
  63. return r
  64. else:
  65. retry_times -= 1
  66. log("retry post _interface... left times " + str(retry_times) + " " + model_type)
  67. continue
  68. if judge_error_code(r):
  69. return r
  70. return r
  71. break
  72. except TimeoutError:
  73. return [-5]
  74. except requests.exceptions.ConnectionError as e:
  75. return [-2]
  76. def from_office_interface(src_path, dest_path, target_format, retry_times=1, from_remote=FROM_REMOTE):
  77. try:
  78. # Win10跳出超时装饰器
  79. # if get_platform() == "Windows":
  80. # # origin_office_convert = office_convert.__wrapped__
  81. # # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times)
  82. # file_path = office_convert(src_path, dest_path, target_format, retry_times)
  83. # else:
  84. # # 将装饰器包装为一个类,否则多进程Pickle会报错 it's not the same object as xxx 问题,
  85. # # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError)
  86. # # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times)
  87. #
  88. # file_path = office_convert(src_path, dest_path, target_format, retry_times)
  89. if from_remote:
  90. # 重试
  91. retry_times_1 = 1
  92. retry_times_2 = 2
  93. while retry_times_1 and retry_times_2:
  94. # _ip = ip_pool("soffice", _random=True)
  95. # _port = port_pool("soffice", _random=True)
  96. # _ip = interface_ip_list[0]
  97. # _port = "16002"
  98. # _ip, _port = interface_pool("soffice")
  99. # ip_port = from_schedule_interface("office")
  100. ip_port = interface_pool_gunicorn("office")
  101. if judge_error_code(ip_port):
  102. return ip_port
  103. _url = ip_port + "/soffice"
  104. with open(src_path, "rb") as f:
  105. file_bytes = f.read()
  106. base64_stream = base64.b64encode(file_bytes)
  107. start_time = time.time()
  108. r = json.loads(request_post(_url, {"src_path": src_path,
  109. "dest_path": dest_path,
  110. "file": base64_stream,
  111. "target_format": target_format,
  112. "retry_times": retry_times}, time_out=25))
  113. log("get interface return")
  114. log("office use time " + str(time.time()-start_time))
  115. if type(r) == list:
  116. # 接口连不上换个端口重试
  117. if retry_times_1 <= 1:
  118. return r
  119. else:
  120. retry_times_1 -= 1
  121. log("retry post office_interface... left times " + str(retry_times_1))
  122. continue
  123. file_str = r.get("data")
  124. if judge_error_code(file_str):
  125. if retry_times_2 <= 1:
  126. return file_str
  127. else:
  128. retry_times_2 -= 1
  129. continue
  130. file_bytes = eval(file_str)
  131. uid1 = src_path.split(os.sep)[-1].split(".")[0]
  132. file_path = dest_path + uid1 + "." + target_format
  133. if not os.path.exists(os.path.dirname(file_path)):
  134. os.makedirs(os.path.dirname(file_path), mode=0o777)
  135. with open(file_path, "wb") as f:
  136. f.write(file_bytes)
  137. break
  138. else:
  139. file_path = office_convert(src_path, dest_path, target_format, retry_times)
  140. if judge_error_code(file_path):
  141. return file_path
  142. return file_path
  143. except TimeoutError:
  144. log("from_office_interface timeout error!")
  145. return [-5]
  146. except:
  147. log("from_office_interface error!")
  148. print("from_office_interface", traceback.print_exc())
  149. return [-1]
  150. def from_ocr_interface(image_stream, is_table=False, from_remote=FROM_REMOTE):
  151. log("into from_ocr_interface")
  152. try:
  153. base64_stream = base64.b64encode(image_stream)
  154. # 调用接口
  155. try:
  156. if from_remote:
  157. retry_times_1 = 3
  158. # 重试
  159. while retry_times_1:
  160. # _ip = ip_pool("ocr", _random=True)
  161. # _port = port_pool("ocr", _random=True)
  162. # if _ip == interface_ip_list[1]:
  163. # _port = ocr_port_list[0]
  164. # _ip, _port = interface_pool("ocr")
  165. # ip_port = _ip + ":" + _port
  166. # ip_port = from_schedule_interface("ocr")
  167. ip_port = interface_pool_gunicorn("ocr")
  168. if judge_error_code(ip_port):
  169. return ip_port
  170. _url = ip_port + "/ocr"
  171. r = json.loads(request_post(_url, {"data": base64_stream,
  172. "md5": _global.get("md5")},
  173. time_out=60))
  174. log("get interface return")
  175. if type(r) == list:
  176. # 接口连不上换个端口重试
  177. if retry_times_1 <= 1:
  178. if is_table:
  179. return r, r
  180. else:
  181. return r
  182. else:
  183. retry_times_1 -= 1
  184. log("retry post ocr_interface... left times " + str(retry_times_1))
  185. continue
  186. if judge_error_code(r):
  187. return r
  188. break
  189. else:
  190. if globals().get("global_ocr_model") is None:
  191. print("=========== init ocr model ===========")
  192. globals().update({"global_ocr_model": OcrModels().get_model()})
  193. r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model"))
  194. except TimeoutError:
  195. if is_table:
  196. return [-5], [-5]
  197. else:
  198. return [-5]
  199. except requests.exceptions.ConnectionError as e:
  200. if is_table:
  201. return [-2], [-2]
  202. else:
  203. return [-2]
  204. _dict = r
  205. text_list = eval(_dict.get("text"))
  206. bbox_list = eval(_dict.get("bbox"))
  207. if text_list is None:
  208. text_list = []
  209. if bbox_list is None:
  210. bbox_list = []
  211. if is_table:
  212. return text_list, bbox_list
  213. else:
  214. if text_list and bbox_list:
  215. text = get_sequential_data(text_list, bbox_list, html=True)
  216. if judge_error_code(text):
  217. return text
  218. else:
  219. text = ""
  220. return text
  221. except Exception as e:
  222. log("from_ocr_interface error!")
  223. # print("from_ocr_interface", e, global_type)
  224. if is_table:
  225. return [-1], [-1]
  226. else:
  227. return [-1]
  228. def from_gpu_interface_flask(_dict, model_type, predictor_type):
  229. log("into from_gpu_interface")
  230. start_time = time.time()
  231. try:
  232. # 调用接口
  233. _dict.update({"predictor_type": predictor_type, "model_type": model_type})
  234. if model_type == "ocr":
  235. use_zlib = True
  236. else:
  237. use_zlib = False
  238. result = _interface(_dict, time_out=30, retry_times=2, use_zlib=use_zlib)
  239. log("from_gpu_interface finish size " + str(sys.getsizeof(_dict)) + " time " + str(time.time()-start_time))
  240. return result
  241. except Exception as e:
  242. log("from_gpu_interface error!")
  243. log("from_gpu_interface failed " + str(time.time()-start_time))
  244. traceback.print_exc()
  245. return [-2]
  246. def from_gpu_interface_redis(_dict, model_type, predictor_type):
  247. log("into from_gpu_interface")
  248. start_time = time.time()
  249. try:
  250. # 调用接口
  251. _uuid = uuid.uuid1().hex
  252. _dict.update({"predictor_type": predictor_type, "model_type": model_type,
  253. "uuid": _uuid})
  254. _time = time.time()
  255. log("pickle.dumps(_dict)" + str(_dict))
  256. redis_db.rpush("producer_"+model_type, pickle.dumps(_dict))
  257. log("producer_" + model_type + " len " + str(redis_db.llen("producer_" + model_type)))
  258. log("to producer_" + model_type + " time " + str(time.time()-_time))
  259. _time = time.time()
  260. time_out = 300
  261. while True:
  262. time.sleep(0.2)
  263. if time.time() - _time > time_out:
  264. raise Exception
  265. if redis_db.hexists("consumer_"+model_type, _uuid):
  266. time1 = time.time()
  267. result = redis_db.hget("consumer_"+model_type, _uuid)
  268. log("from consumer_"+model_type + " time " + str(time.time()-time1))
  269. break
  270. result = pickle.loads(result)
  271. log("from_gpu_interface finish - size " + str(sys.getsizeof(_dict)) + " - time " + str(time.time()-start_time))
  272. return result
  273. except Exception as e:
  274. log("from_gpu_interface error!")
  275. log("from_gpu_interface failed " + str(time.time()-start_time))
  276. traceback.print_exc()
  277. return [-2]
  278. def from_otr_interface2(image_stream):
  279. log("into from_otr_interface")
  280. try:
  281. base64_stream = base64.b64encode(image_stream)
  282. # 调用接口
  283. try:
  284. if globals().get("global_otr_model") is None:
  285. globals().update({"global_otr_model": OtrModels().get_model()})
  286. print("=========== init otr model ===========")
  287. r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"))
  288. except TimeoutError:
  289. return [-5], [-5], [-5], [-5], [-5]
  290. except requests.exceptions.ConnectionError as e:
  291. log("from_otr_interface")
  292. print("from_otr_interface", traceback.print_exc())
  293. return [-2], [-2], [-2], [-2], [-2]
  294. # 处理结果
  295. _dict = r
  296. points = eval(_dict.get("points"))
  297. split_lines = eval(_dict.get("split_lines"))
  298. bboxes = eval(_dict.get("bboxes"))
  299. outline_points = eval(_dict.get("outline_points"))
  300. lines = eval(_dict.get("lines"))
  301. # print("from_otr_interface len(bboxes)", len(bboxes))
  302. if points is None:
  303. points = []
  304. if split_lines is None:
  305. split_lines = []
  306. if bboxes is None:
  307. bboxes = []
  308. if outline_points is None:
  309. outline_points = []
  310. if lines is None:
  311. lines = []
  312. return points, split_lines, bboxes, outline_points, lines
  313. except Exception as e:
  314. log("from_otr_interface error!")
  315. print("from_otr_interface", traceback.print_exc())
  316. return [-1], [-1], [-1], [-1], [-1]
  317. def from_otr_interface(image_stream, is_from_pdf=False, from_remote=FROM_REMOTE):
  318. log("into from_otr_interface")
  319. try:
  320. base64_stream = base64.b64encode(image_stream)
  321. # 调用接口
  322. try:
  323. if from_remote:
  324. retry_times_1 = 3
  325. # 重试
  326. while retry_times_1:
  327. # _ip = ip_pool("otr", _random=True)
  328. # _port = port_pool("otr", _random=True)
  329. # if _ip == interface_ip_list[1]:
  330. # _port = otr_port_list[0]
  331. ip_port = interface_pool_gunicorn("otr")
  332. # ip_port = from_schedule_interface("otr")
  333. if judge_error_code(ip_port):
  334. return ip_port
  335. _url = ip_port + "/otr"
  336. r = json.loads(request_post(_url, {"data": base64_stream,
  337. "is_from_pdf": is_from_pdf,
  338. "md5": _global.get("md5")}, time_out=60))
  339. log("get interface return")
  340. if type(r) == list:
  341. # 接口连不上换个端口重试
  342. if retry_times_1 <= 1:
  343. return r
  344. else:
  345. retry_times_1 -= 1
  346. log("retry post otr_interface... left times " + str(retry_times_1))
  347. continue
  348. if judge_error_code(r):
  349. return r
  350. break
  351. else:
  352. if globals().get("global_otr_model") is None:
  353. print("=========== init otr model ===========")
  354. globals().update({"global_otr_model": OtrModels().get_model()})
  355. r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"), is_from_pdf=is_from_pdf)
  356. # r = otr(data=base64_stream, otr_model=None, is_from_pdf=is_from_pdf)
  357. except TimeoutError:
  358. return [-5]
  359. except requests.exceptions.ConnectionError as e:
  360. log("from_otr_interface")
  361. print("from_otr_interface", traceback.print_exc())
  362. return [-2]
  363. # 处理结果
  364. _dict = r
  365. list_line = eval(_dict.get("list_line"))
  366. return list_line
  367. except Exception as e:
  368. log("from_otr_interface error!")
  369. print("from_otr_interface", traceback.print_exc())
  370. return [-1]
  371. # def from_schedule_interface(interface_type):
  372. # try:
  373. # _ip = "http://" + get_intranet_ip()
  374. # _port = ip_port_dict.get(_ip).get("schedule")[0]
  375. # _url = _ip + ":" + _port + "/schedule"
  376. # data = {"interface_type": interface_type}
  377. # result = json.loads(request_post(_url, data, time_out=10)).get("data")
  378. # if judge_error_code(result):
  379. # return result
  380. # _ip, _port = result
  381. # log("from_schedule_interface " + _ip + " " + _port)
  382. # return _ip + ":" + _port
  383. # except requests.exceptions.ConnectionError as e:
  384. # log("from_schedule_interface ConnectionError")
  385. # return [-2]
  386. # except:
  387. # log("from_schedule_interface error!")
  388. # traceback.print_exc()
  389. # return [-1]
  390. def interface_pool(interface_type, use_gunicorn=True):
  391. ip_port_flag = _global.get("ip_port_flag")
  392. ip_port_dict = _global.get("ip_port")
  393. try:
  394. if use_gunicorn:
  395. _ip = "http://127.0.0.1"
  396. _port = ip_port_dict.get(_ip).get(interface_type)[0]
  397. ip_port = _ip + ":" + str(_port)
  398. log(ip_port)
  399. return ip_port
  400. # 负载均衡, 选取ip
  401. interface_load_list = []
  402. for _ip in ip_port_flag.keys():
  403. if ip_port_dict.get(_ip).get(interface_type):
  404. load_scale = ip_port_flag.get(_ip).get(interface_type) / len(ip_port_dict.get(_ip).get(interface_type))
  405. interface_load_list.append([_ip, load_scale])
  406. if not interface_load_list:
  407. raise NotFound
  408. interface_load_list.sort(key=lambda x: x[-1])
  409. _ip = interface_load_list[0][0]
  410. # 负载均衡, 选取port
  411. ip_type_cnt = ip_port_flag.get(_ip).get(interface_type)
  412. ip_type_total = len(ip_port_dict.get(_ip).get(interface_type))
  413. if ip_type_cnt == 0:
  414. ip_type_cnt = random.randint(0, ip_type_total-1)
  415. port_index = ip_type_cnt % ip_type_total
  416. _port = ip_port_dict.get(_ip).get(interface_type)[port_index]
  417. # 更新flag
  418. current_flag = ip_type_cnt
  419. if current_flag >= 10000:
  420. ip_port_flag[_ip][interface_type] = 0
  421. else:
  422. ip_port_flag[_ip][interface_type] = current_flag + 1
  423. _global.update({"ip_port_flag": ip_port_flag})
  424. # log(str(_global.get("ip_port_flag")))
  425. ip_port = _ip + ":" + str(_port)
  426. log(ip_port)
  427. return ip_port
  428. except NotFound:
  429. log("cannot read ip from config! checkout config")
  430. return [-2]
  431. except:
  432. traceback.print_exc()
  433. return [-1]
  434. def interface_pool_gunicorn(interface_type):
  435. ip_flag_list = _global.get("ip_flag")
  436. ip_port_flag_dict = _global.get("ip_port_flag")
  437. ip_port_dict = _global.get("ip_port")
  438. try:
  439. if ip_flag_list is None or ip_port_dict is None or ip_port_flag_dict is None:
  440. raise NotFound
  441. if interface_type == "office":
  442. _ip = "http://127.0.0.1"
  443. # 选取端口
  444. port_list = ip_port_dict.get(_ip).get(interface_type)
  445. ip_type_cnt = ip_port_flag_dict.get(_ip).get(interface_type)
  446. if ip_type_cnt == 0:
  447. _port = port_list[random.randint(0, len(port_list)-1)]
  448. else:
  449. _port = port_list[ip_type_cnt % len(port_list)]
  450. # 更新flag
  451. if ip_port_flag_dict.get(_ip).get(interface_type) >= 10000:
  452. ip_port_flag_dict[_ip][interface_type] = 0
  453. else:
  454. ip_port_flag_dict[_ip][interface_type] += 1
  455. _global.update({"ip_port_flag": ip_port_flag_dict})
  456. else:
  457. # 负载均衡, 选取ip
  458. ip_flag_list.sort(key=lambda x: x[1])
  459. if ip_flag_list[-1][1] == 0:
  460. ip_index = random.randint(0, len(ip_flag_list)-1)
  461. else:
  462. ip_index = 0
  463. _ip = ip_flag_list[ip_index][0]
  464. # 选取端口, 使用gunicorn则直接选第一个
  465. _port = ip_port_dict.get(_ip).get(interface_type)[0]
  466. # 更新flag
  467. if ip_flag_list[ip_index][1] >= 10000:
  468. ip_flag_list[ip_index][1] = 0
  469. else:
  470. ip_flag_list[ip_index][1] += + 1
  471. _global.update({"ip_flag": ip_flag_list})
  472. ip_port = _ip + ":" + str(_port)
  473. log(ip_port)
  474. return ip_port
  475. except NotFound:
  476. log("ip_flag or ip_port_dict is None! checkout config")
  477. return [-2]
  478. except:
  479. traceback.print_exc()
  480. return [-1]
  481. # def interface_pool(interface_type):
  482. # try:
  483. # ip_port_dict = _global.get("ip_port")
  484. # ip_list = list(ip_port_dict.keys())
  485. # _ip = random.choice(ip_list)
  486. # if interface_type != 'office':
  487. # _port = ip_port_dict.get(_ip).get(interface_type)[0]
  488. # else:
  489. # _port = random.choice(ip_port_dict.get(_ip).get(interface_type))
  490. # log(_ip + ":" + _port)
  491. # return _ip + ":" + _port
  492. # except Exception as e:
  493. # traceback.print_exc()
  494. # return [-1]
  495. # def ip_pool(interface_type, _random=False):
  496. # ip_flag_name = interface_type + '_ip_flag'
  497. # ip_flag = globals().get(ip_flag_name)
  498. # if ip_flag is None:
  499. # if _random:
  500. # _r = random.randint(0, len(interface_ip_list)-1)
  501. # ip_flag = _r
  502. # globals().update({ip_flag_name: ip_flag})
  503. # ip_index = _r
  504. # else:
  505. # ip_flag = 0
  506. # globals().update({ip_flag_name: ip_flag})
  507. # ip_index = 0
  508. # else:
  509. # ip_index = ip_flag % len(interface_ip_list)
  510. # ip_flag += 1
  511. #
  512. # if ip_flag >= 10000:
  513. # ip_flag = 0
  514. # globals().update({ip_flag_name: ip_flag})
  515. #
  516. # log("ip_pool " + interface_type + " " + str(ip_flag) + " " + str(interface_ip_list[ip_index]))
  517. # return interface_ip_list[ip_index]
  518. #
  519. #
  520. # def port_pool(interface_type, _random=False):
  521. # port_flag_name = interface_type + '_port_flag'
  522. #
  523. # port_flag = globals().get(port_flag_name)
  524. # if port_flag is None:
  525. # if _random:
  526. # if interface_type == "ocr":
  527. # _r = random.randint(0, len(ocr_port_list)-1)
  528. # elif interface_type == "otr":
  529. # _r = random.randint(0, len(otr_port_list)-1)
  530. # else:
  531. # _r = random.randint(0, len(soffice_port_list)-1)
  532. # port_flag = _r
  533. # globals().update({port_flag_name: port_flag})
  534. # port_index = _r
  535. # else:
  536. # port_flag = 0
  537. # globals().update({port_flag_name: port_flag})
  538. # port_index = 0
  539. # else:
  540. # if interface_type == "ocr":
  541. # port_index = port_flag % len(ocr_port_list)
  542. # elif interface_type == "otr":
  543. # port_index = port_flag % len(otr_port_list)
  544. # else:
  545. # port_index = port_flag % len(soffice_port_list)
  546. # port_flag += 1
  547. #
  548. # if port_flag >= 10000:
  549. # port_flag = 0
  550. # globals().update({port_flag_name: port_flag})
  551. #
  552. # if interface_type == "ocr":
  553. # log("port_pool " + interface_type + " " + str(port_flag) + " " + ocr_port_list[port_index])
  554. # return ocr_port_list[port_index]
  555. # elif interface_type == "otr":
  556. # log("port_pool " + interface_type + " " + str(port_flag) + " " + otr_port_list[port_index])
  557. # return otr_port_list[port_index]
  558. # else:
  559. # log("port_pool " + interface_type + " " + str(port_flag) + " " + soffice_port_list[port_index])
  560. # return soffice_port_list[port_index]