test_convert_interface.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import sys,os
  2. sys.path.append(os.path.dirname(__file__)+"/..")
  3. from multiprocessing import Queue
  4. import pandas as pd
  5. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  6. from BaseDataMaintenance.common.ossUtils import test_download
  7. from BaseDataMaintenance.dataSource.interface import getAttachDealInterface
  8. from BaseDataMaintenance.common.Utils import *
  9. import time,base64
  10. from threading import Thread
  11. import traceback
  12. def test_xls_doc():
  13. df = pd.read_excel("%s/%s"%(os.path.dirname(__file__),"attachmentProcessTime2.xlsx"))
  14. _queue = Queue()
  15. _queue_ocr = Queue()
  16. _count = 0
  17. for filemd5,filetype in zip(df["filemd5"],df["filetype"]):
  18. _count += 1
  19. if _count>500:
  20. break
  21. _dict = {"filemd5":filemd5,"filetype":filetype}
  22. if filetype in ["bmp","jpeg","jpg","png","swf","pdf","tif"]:
  23. _queue_ocr.put(_dict)
  24. else:
  25. _queue.put(_dict)
  26. print("_queue size:%d"%(_queue.qsize()))
  27. print("_queue_ocr size:%d"%(_queue_ocr.qsize()))
  28. def _handle(item,result_queue):
  29. d_start_time = time.time()
  30. _path = test_download(item.get("filemd5"))
  31. time_download = time.time()-d_start_time
  32. _data_base64 = base64.b64encode(open(_path,"rb").read())
  33. #调用接口处理结果
  34. start_time = time.time()
  35. _success,_html,_ = getAttachDealInterface(_data_base64,item.get("filetype"))
  36. reg_time = time.time()-start_time
  37. log("process filemd5:%s of type:%s download:%ds recognize:%ds result:%s rec_size:%d"%(item.get("filemd5"),item.get("filetype"),time_download,reg_time,str(_success),len(_html)))
  38. def _process():
  39. while 1:
  40. try:
  41. item = _queue_ocr.get(True,timeout=0.2)
  42. _handle(item,None)
  43. except Exception as e:
  44. log("ocr queue error:%s"%(str(e)))
  45. traceback.print_exc()
  46. pass
  47. try:
  48. item = _queue.get(True,timeout=0.2)
  49. _handle(item,None)
  50. except Exception as e:
  51. pass
  52. # mt = MultiThreadHandler(_queue,_handle,None,20)
  53. # mt.run()
  54. list_thread = []
  55. for i in range(20):
  56. list_thread.append(Thread(target=_process))
  57. for t in list_thread:
  58. t.start()
  59. for t in list_thread:
  60. t.join()
  61. if __name__ == '__main__':
  62. test_xls_doc()