# product_parameter.py
from apscheduler.schedulers.blocking import BlockingScheduler
from tablestore import *
from BaseDataMaintenance.dataSource.source import getConnect_ots,getAuth,is_internal
from BaseDataMaintenance.dataSource.interface import *
from multiprocessing import Queue as PQueue
from multiprocessing import Process
from BaseDataMaintenance.model.ots.document_product import *
from BaseDataMaintenance.model.ots.attachment import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.maintenance.product.htmlparser import *
from BaseDataMaintenance.maintenance.product.productUtils import pool_product
import oss2
from BaseDataMaintenance.common.multiThread import MultiThreadHandler

# Life-cycle codes stored in the "parameter_status" column of Document_product rows.
parameter_status_no_bidfile = -1       # no attachments / no product name to search with
parameter_status_to_process = 0        # waiting to be picked up by the producer
parameter_status_process_succeed = 1   # parameter text extracted and written back
parameter_status_process_failed = 2    # parameters detected but no text was saved
parameter_status_process_jump = 3      # deliberately skipped (see the commented-out doc/xls handling)
parameter_status_not_found = 4         # attachments parsed, no parameters matched

import redis
from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
  23. class Product_Attachment_Processor():
  24. def __init__(self,):
  25. self.ots_client = getConnect_ots()
  26. self.product_attachment_queue = PQueue()
  27. self.product_attachment_queue_size = 50
  28. self.set_product_attachment = set()
  29. self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
  30. self.auth = getAuth()
  31. oss2.defaults.connection_pool_size = 100
  32. oss2.defaults.multiget_num_threads = 20
  33. if is_internal:
  34. self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
  35. else:
  36. self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
  37. log("bucket_url:%s"%(self.bucket_url))
  38. self.attachment_bucket_name = "attachment-hub"
  39. self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
  40. self.current_path = os.path.dirname(__file__)
  41. self.download_path = "%s/%s"%(self.current_path,"download")
  42. self.test_url="http://192.168.2.102:15011/convert"
  43. def process_parameters_producer(self,):
  44. attachment_size = getQueueSize("dataflow_attachment")
  45. if attachment_size<100:
  46. _qsize = self.product_attachment_queue.qsize()
  47. log("product_attachment_queue %d"%(_qsize))
  48. if _qsize>self.product_attachment_queue_size/3:
  49. return
  50. bool_query = BoolQuery(must_queries=[
  51. TermQuery("parameter_status",parameter_status_to_process)
  52. ])
  53. list_id = []
  54. dict_docid_list = {}
  55. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  56. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
  57. ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
  58. list_data = getRow_ots(rows)
  59. _count = 0
  60. for data in list_data:
  61. _id = data.get(DOCUMENT_PRODUCT_ID)
  62. list_id.append(_id)
  63. if _id in self.set_product_attachment:
  64. continue
  65. docid = data.get(DOCUMENT_PRODUCT_DOCID)
  66. if docid not in dict_docid_list:
  67. dict_docid_list[docid] = []
  68. dict_docid_list[docid].append(data)
  69. _count += 1
  70. while next_token:
  71. if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
  72. break
  73. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  74. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  75. ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
  76. list_data = getRow_ots(rows)
  77. for data in list_data:
  78. _id = data.get(DOCUMENT_PRODUCT_ID)
  79. list_id.append(_id)
  80. if _id in self.set_product_attachment:
  81. continue
  82. docid = data.get(DOCUMENT_PRODUCT_DOCID)
  83. if docid not in dict_docid_list:
  84. dict_docid_list[docid] = []
  85. dict_docid_list[docid].append(data)
  86. _count += 1
  87. for k,v in dict_docid_list.items():
  88. self.product_attachment_queue.put(v)
  89. _qsize = self.product_attachment_queue.qsize()
  90. log("after product_attachment_queue %d"%(_qsize))
  91. self.set_product_attachment = set(list_id)
  92. def get_whole_html(self,_filemd5):
  93. atta = attachment({attachment_filemd5:_filemd5})
  94. _html = ""
  95. db = redis.Redis(connection_pool=pool_product)
  96. _key = "filemd5:%s"%(_filemd5)
  97. _cache_html = None
  98. try:
  99. _cache_html = db.get(_key)
  100. except Exception as e:
  101. logger.info("get redis cache html error")
  102. if _cache_html is not None:
  103. _html = _cache_html
  104. else:
  105. if atta.fix_columns(self.ots_client,[attachment_path,attachment_filetype,attachment_size],True):
  106. objectPath = atta.getProperties().get(attachment_path)
  107. _filetype = atta.getProperties().get(attachment_filetype)
  108. _size = atta.getProperties().get(attachment_size,0)
  109. if _size<=0 or _size>=20*1024*1024:
  110. return _html
  111. # not supported on windows
  112. # if _filetype in ("doc","xls"):
  113. # if len(list_filemd5)==1:
  114. # dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_jump,True)
  115. # dp.update_row(self.ots_client)
  116. # return
  117. # else:
  118. # continue
  119. localpath = "%s/%s.%s"%(self.download_path,_filemd5,_filetype)
  120. localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
  121. download_succeed = False
  122. try:
  123. if not os.path.exists(localpath):
  124. download_succeed = downloadFile(self.bucket,objectPath,localpath)
  125. else:
  126. download_succeed = True
  127. except Exception as e:
  128. download_succeed = False
  129. if download_succeed:
  130. try:
  131. start_time = time.time()
  132. if os.path.exists(localhtml):
  133. _html = open(localhtml,"r",encoding="utf8").read()
  134. _success = True
  135. if len(_html)>10:
  136. _success = True
  137. else:
  138. _data_base64 = base64.b64encode(open(localpath,"rb").read())
  139. _success,_html,swf_images,classification = getAttachDealInterface(_data_base64,_filetype,kwargs={'page_no': '1,-1',"max_bytes":"-1","timeout":6000},timeout=6000)
  140. if _success:
  141. db.set(_key,_html,24*60*60)
  142. # save for dubug
  143. # localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
  144. # with open(localhtml,"w",encoding="utf8") as f:
  145. # f.write(_html)
  146. except ConnectionError as e1:
  147. if time.time()-start_time>5000:
  148. db.set(_key,_html,24*60*60)
  149. else:
  150. raise e1
  151. except Exception as e:
  152. traceback.print_exc()
  153. finally:
  154. try:
  155. if os.path.exists(localpath):
  156. os.remove(localpath)
  157. pass
  158. except Exception as e:
  159. pass
  160. else:
  161. log("attachment %s not exists"%_filemd5)
  162. return _html
  163. def process_parameters_handler(self,list_item,result_queue):
  164. for item in list_item:
  165. attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
  166. product_name = item.get(DOCUMENT_PRODUCT_NAME)
  167. product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
  168. list_product = []
  169. log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
  170. if product_original_name is not None:
  171. _l = product_original_name.split("_")
  172. _l.reverse()
  173. list_product.extend(_l)
  174. if product_name is not None:
  175. list_product.append(product_name)
  176. list_product = list(set(list_product))
  177. dp = Document_product(item)
  178. if attachments is None or attachments=="" or len(list_product)==0:
  179. dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile)
  180. dp.update_row(self.ots_client)
  181. return
  182. list_attachment = json.loads(attachments)
  183. list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
  184. list_filemd5 = [a.get("filemd5","") for a in list_attachment]
  185. _find = False
  186. _success = False
  187. list_text = []
  188. for _filemd5 in list_filemd5:
  189. _html = self.get_whole_html(_filemd5)
  190. if len(_html)>5:
  191. pd = ParseDocument(_html,True)
  192. for _product in list_product:
  193. pd.fix_tree(_product)
  194. list_data = pd.tree
  195. _text,_count = extract_product_parameters(list_data,_product)
  196. if _count>0:
  197. _find = True
  198. if _text is not None:
  199. list_text.append(_text)
  200. pd = ParseDocument(_html,False)
  201. for _product in list_product:
  202. pd.fix_tree(_product)
  203. list_data = pd.tree
  204. _text,_count = extract_product_parameters(list_data,_product)
  205. if _count>0:
  206. _find = True
  207. if _text is not None:
  208. list_text.append(_text)
  209. else:
  210. log("product attachment process filemd5 %s has no content"%(_filemd5))
  211. if len(list_text)>0:
  212. _text = getBestProductText(list_text,'',[])
  213. logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
  214. dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
  215. dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
  216. dp.update_row(self.ots_client)
  217. _success = True
  218. break
  219. if not _success:
  220. if not _find:
  221. dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
  222. dp.update_row(self.ots_client)
  223. else:
  224. dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
  225. dp.update_row(self.ots_client)
  226. def start_process(self):
  227. self.process_parameters_producer()
  228. thread_count = 7
  229. mt = MultiThreadHandler(self.product_attachment_queue,self.process_parameters_handler,None,thread_count,need_stop=False,restart=True)
  230. mt.run()
  231. def process_parameters_comsumer(self,):
  232. # process_count = 3
  233. # list_process = []
  234. # for i in range(process_count):
  235. # p = Process(target=self.start_process)
  236. # list_process.append(p)
  237. # for p in list_process:
  238. # p.start()
  239. # for p in list_process:
  240. # p.join()
  241. self.start_process()
  242. def start_process_parameters(self):
  243. scheduler = BlockingScheduler()
  244. scheduler.add_job(self.process_parameters_producer,"cron",second="*/20")
  245. scheduler.add_job(self.process_parameters_comsumer,"cron",second="*/30")
  246. scheduler.start()
  247. def start_process_parameters():
  248. pap = Product_Attachment_Processor()
  249. pap.start_process_parameters()
  250. def change_parameters_status():
  251. ots_client =getConnect_ots()
  252. bool_query = BoolQuery(must_queries=[
  253. RangeQuery("parameter_status",-1)
  254. ],
  255. must_not_queries=[
  256. TermQuery("parameter_status",parameter_status_to_process),
  257. TermQuery("parameter_status",parameter_status_process_succeed),
  258. TermQuery("parameter_status",parameter_status_process_jump),
  259. TermQuery("parameter_status",parameter_status_no_bidfile),
  260. TermQuery("parameter_status",parameter_status_not_found),
  261. ])
  262. list_data = []
  263. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  264. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("parameter_status")]),limit=100,get_total_count=True),
  265. ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
  266. list_data.extend(getRow_ots(rows))
  267. print("total_count",total_count)
  268. while next_token:
  269. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  270. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  271. ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
  272. list_data.extend(getRow_ots(rows))
  273. from queue import Queue
  274. task_queue = Queue()
  275. for data in list_data:
  276. task_queue.put(data)
  277. def _handle(data,result_queue):
  278. dp = Document_product(data)
  279. dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process,True)
  280. dp.setValue(DOCUMENT_PRODUCT_PARAMETER,"",True)
  281. dp.update_row(ots_client)
  282. mt = MultiThreadHandler(task_queue,_handle,None,30)
  283. mt.run()
if __name__ == '__main__':
    # Default entry point: run the scheduled producer/consumer loop.
    start_process_parameters()
    # Alternative maintenance entry point: reset stale statuses.
    # change_parameters_status()