12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import sys,os
- sys.path.append(os.path.dirname(__file__)+"/..")
- from multiprocessing import Queue
- import pandas as pd
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.common.ossUtils import test_download
- from BaseDataMaintenance.dataSource.interface import getAttachDealInterface
- from BaseDataMaintenance.common.Utils import *
- import time,base64
- from threading import Thread
- import traceback
def test_xls_doc():
    """Replay attachments listed in ``attachmentProcessTime2.xlsx`` through the interface.

    Reads the ``filemd5`` and ``filetype`` columns of the workbook located next
    to this file and queues at most the first 500 rows: image/PDF-like file
    types go to the OCR queue, everything else to the general queue. It then
    starts 20 worker threads; each worker downloads an attachment,
    base64-encodes it, submits it to ``getAttachDealInterface`` and logs the
    download and recognition times.

    Each worker exits once both queues time out as empty in the same pass, so
    the function returns after every queued item has been attempted instead of
    blocking forever in ``join()``.
    """
    # Local import: Queue.get() signals a timeout by raising queue.Empty.
    from queue import Empty

    max_items = 500
    worker_count = 20
    poll_timeout = 0.2
    ocr_filetypes = ("bmp", "jpeg", "jpg", "png", "swf", "pdf", "tif")

    df = pd.read_excel("%s/%s" % (os.path.dirname(__file__), "attachmentProcessTime2.xlsx"))
    _queue = Queue()
    _queue_ocr = Queue()

    for index, (filemd5, filetype) in enumerate(zip(df["filemd5"], df["filetype"])):
        if index >= max_items:
            break
        _dict = {"filemd5": filemd5, "filetype": filetype}
        if filetype in ocr_filetypes:
            _queue_ocr.put(_dict)
        else:
            _queue.put(_dict)

    print("_queue size:%d" % (_queue.qsize()))
    print("_queue_ocr size:%d" % (_queue_ocr.qsize()))

    def _handle(item, result_queue):
        """Download one attachment, send it to the interface and log the timings.

        ``result_queue`` is unused here.
        NOTE(review): presumably kept to match the handler signature expected
        by MultiThreadHandler - confirm before removing.
        """
        filemd5 = item.get("filemd5")
        filetype = item.get("filetype")

        d_start_time = time.time()
        _path = test_download(filemd5)
        time_download = time.time() - d_start_time

        # Context manager so the downloaded file is closed even if read() fails.
        with open(_path, "rb") as f:
            _data_base64 = base64.b64encode(f.read())

        # Call the interface to process the attachment.
        start_time = time.time()
        _success, _html, _ = getAttachDealInterface(_data_base64, filetype)
        reg_time = time.time() - start_time

        # `_html or ""` guards the size computation in case the interface
        # returns no HTML on failure (TODO confirm its failure contract).
        log("process filemd5:%s of type:%s download:%ds recognize:%ds result:%s rec_size:%d"
            % (filemd5, filetype, time_download, reg_time, str(_success), len(_html or "")))

    def _take_and_handle(source_queue, error_prefix):
        """Handle one item from ``source_queue``; return False if it was empty."""
        try:
            item = source_queue.get(True, timeout=poll_timeout)
        except Empty:
            # An empty queue is the normal end-of-work signal, not an error.
            return False
        try:
            _handle(item, None)
        except Exception as e:
            # One bad attachment must not kill the worker; report and move on.
            log("%s:%s" % (error_prefix, str(e)))
            traceback.print_exc()
        return True

    def _process():
        """Worker loop: alternate between both queues until both are empty."""
        while True:
            got_ocr = _take_and_handle(_queue_ocr, "ocr queue error")
            got_other = _take_and_handle(_queue, "queue error")
            if not (got_ocr or got_other):
                break

    list_thread = [Thread(target=_process) for _ in range(worker_count)]
    for t in list_thread:
        t.start()
    for t in list_thread:
        t.join()
# Script entry point: run test_xls_doc only when this file is executed directly.
if __name__ == "__main__":
    test_xls_doc()
|