# product_parameter.py
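"""
Maintenance job that extracts product parameter text from bid attachments.

A producer scans the document_product table in OTS for rows whose
parameter_status is still pending, resolves the attachment list of the
corresponding document, and groups the rows by docid into a work queue.
Consumer threads download each attachment from OSS, convert it to HTML
(cached in redis by filemd5), run the product parameter extraction over the
parsed document tree, and write the extracted text and the final
parameter_status back to the row.
"""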

from apscheduler.schedulers.blocking import BlockingScheduler
from tablestore import *
from BaseDataMaintenance.dataSource.source import getConnect_ots,getAuth,is_internal
from BaseDataMaintenance.dataSource.interface import *
from multiprocessing import Queue as PQueue
from multiprocessing import Process
from BaseDataMaintenance.model.ots.document_product import *
from BaseDataMaintenance.model.ots.attachment import *
from BaseDataMaintenance.model.ots.document import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.ossUtils import *
from BaseDataMaintenance.maintenance.product.htmlparser import *
from BaseDataMaintenance.maintenance.product.productUtils import pool_product
import oss2
import redis
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize

# Explicit stdlib/third-party imports for names used below; they may duplicate
# names already provided by the wildcard imports above.
import base64
import json
import os
import time
import traceback
from bs4 import BeautifulSoup

# parameter_status lifecycle of a document_product row
parameter_status_no_bidfile = -1       # no usable attachments or product names
parameter_status_to_process = 0        # waiting to be processed
parameter_status_process_succeed = 1   # parameter text extracted and saved
parameter_status_process_failed = 2    # parameters detected but no text could be saved
parameter_status_process_jump = 3      # skipped (e.g. unsupported filetype)
parameter_status_not_found = 4         # attachments parsed, no parameters found
parameter_status_to_process_his = 100  # historical rows waiting to be processed

class Product_Attachment_Processor():

    def __init__(self,):
        self.ots_client = getConnect_ots()
        # queue of products waiting for parameter extraction, and queue of finished ids
        self.product_attachment_queue = PQueue()
        self.product_attachment_processed_queue = PQueue()
        self.product_attachment_queue_size = 50
        # ids currently queued or being processed, to avoid scheduling duplicates
        self.set_product_attachment = set()
        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        self.auth = getAuth()
        oss2.defaults.connection_pool_size = 100
        oss2.defaults.multiget_num_threads = 20
        if is_internal:
            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        log("bucket_url:%s"%(self.bucket_url))
        self.attachment_bucket_name = "attachment-hub"
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
        self.current_path = os.path.dirname(__file__)
        self.download_path = "%s/%s"%(self.current_path,"download")
        self.test_url = "http://192.168.2.102:15011/convert"

    def getAttachments(self,docid):
        """Return the attachment list (filemd5/classification/filetype) of a document as a JSON string."""
        list_atta = []
        partitionkey = docid%500+1
        doc = Document({document_partitionkey:partitionkey,
                        document_docid:docid})
        doc.fix_columns(self.ots_client,[document_attachment_path],True)
        page_attachments = doc.getProperties().get(document_attachment_path)
        if page_attachments is not None and page_attachments!="":
            attachments = json.loads(page_attachments)
            for _a in attachments:
                _filemd5 = _a.get(document_attachment_path_filemd5)
                _da = {attachment_filemd5:_filemd5}
                _attach = attachment(_da)
                if _attach.fix_columns(self.ots_client,[attachment_classification,attachment_filetype],True):
                    _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
                    _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
                    list_atta.append(_da)
        return json.dumps(list_atta,ensure_ascii=False)

    def process_parameters_producer(self,):
        # refill the work queue only while the dataflow_attachment MQ backlog is small
        attachment_size = getQueueSize("dataflow_attachment")
        if attachment_size<100:
            # drop ids that have finished processing so they can be scheduled again later
            while 1:
                try:
                    _id = self.product_attachment_processed_queue.get(False)
                    if _id in self.set_product_attachment:
                        self.set_product_attachment.remove(_id)
                except Exception as e:
                    break
            _qsize = self.product_attachment_queue.qsize()
            log("product_attachment_queue %d"%(_qsize))
            if _qsize>self.product_attachment_queue_size/3:
                return
            bool_query = BoolQuery(should_queries=[
                # TermQuery(DOCUMENT_PRODUCT_DOCID,305253400)
                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process),
                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process_his),
                BoolQuery(must_not_queries=[ExistsQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS)])
            ])
            list_id = []
            dict_docid_list = {}
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",sort_order=SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                                ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            _count = 0
            for data in list_data:
                _id = data.get(DOCUMENT_PRODUCT_ID)
                list_id.append(_id)
                if _id in self.set_product_attachment:
                    continue
                self.set_product_attachment.add(_id)
                docid = data.get(DOCUMENT_PRODUCT_DOCID)
                if docid not in dict_docid_list:
                    dict_docid_list[docid] = []
                dict_docid_list[docid].append(data)
                _count += 1
            while next_token:
                if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
                    break
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
                list_data = getRow_ots(rows)
                for data in list_data:
                    _id = data.get(DOCUMENT_PRODUCT_ID)
                    list_id.append(_id)
                    if _id in self.set_product_attachment:
                        continue
                    self.set_product_attachment.add(_id)
                    docid = data.get(DOCUMENT_PRODUCT_DOCID)
                    if docid not in dict_docid_list:
                        dict_docid_list[docid] = []
                    dict_docid_list[docid].append(data)
                    _count += 1
            # products of the same document share one resolved attachment list
            for k,v in dict_docid_list.items():
                if v[0].get(DOCUMENT_PRODUCT_ATTACHMENTS) is None:
                    _attachments = self.getAttachments(v[0].get(DOCUMENT_PRODUCT_DOCID))
                    for _v in v:
                        _v[DOCUMENT_PRODUCT_ATTACHMENTS] = _attachments
                self.product_attachment_queue.put(v)
            _qsize = self.product_attachment_queue.qsize()
            log("after product_attachment_queue %d"%(_qsize))
            # self.set_product_attachment = set(list_id[-20000:])

    def get_whole_html(self,_filemd5):
        # return the converted html of an attachment, cached in redis under "filemd5:<md5>"
        atta = attachment({attachment_filemd5:_filemd5})
        _html = ""
        db = redis.Redis(connection_pool=pool_product)
        _key = "filemd5:%s"%(_filemd5)
        _cache_html = None
        try:
            _cache_html = db.get(_key)
        except Exception as e:
            logger.info("get redis cache html error")
        if _cache_html is not None:
            _html = _cache_html
        else:
            if atta.fix_columns(self.ots_client,[attachment_path,attachment_filetype,attachment_size],True):
                objectPath = atta.getProperties().get(attachment_path)
                _filetype = atta.getProperties().get(attachment_filetype)
                _size = atta.getProperties().get(attachment_size,0)
                if _size<=0 or _size>=20*1024*1024:
                    return _html
                # not supported on windows
                # if _filetype in ("doc","xls"):
                #     if len(list_filemd5)==1:
                #         dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_jump,True)
                #         dp.update_row(self.ots_client)
                #         return
                #     else:
                #         continue
                localpath = "%s/%s.%s"%(self.download_path,_filemd5,_filetype)
                localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
                download_succeed = False
                try:
                    if not os.path.exists(localpath):
                        download_succeed = downloadFile(self.bucket,objectPath,localpath)
                    else:
                        download_succeed = True
                except Exception as e:
                    download_succeed = False
                if download_succeed:
                    try:
                        start_time = time.time()
                        # reuse a previously converted local html file if present
                        if os.path.exists(localhtml):
                            _html = open(localhtml,"r",encoding="utf8").read()
                            _success = True
                        if len(_html)>10:
                            _success = True
                        else:
                            # convert the downloaded attachment to html through the attachment interface
                            _data_base64 = base64.b64encode(open(localpath,"rb").read())
                            _success,_html,swf_images,classification = getAttachDealInterface(_data_base64,_filetype,kwargs={'page_no': '1,-1',"max_bytes":"-1","timeout":6000},timeout=6000)
                            if _success:
                                db.set(_key,_html,24*60*60)
                        # save for debug
                        # localhtml = "%s/%s.%s"%(self.download_path,_filemd5,"html")
                        # with open(localhtml,"w",encoding="utf8") as f:
                        #     f.write(_html)
                    except ConnectionError as e1:
                        if time.time()-start_time>5000:
                            db.set(_key,_html,24*60*60)
                        else:
                            raise e1
                    except Exception as e:
                        traceback.print_exc()
                    finally:
                        try:
                            if os.path.exists(localpath):
                                os.remove(localpath)
                        except Exception as e:
                            pass
            else:
                log("attachment %s not exists"%_filemd5)
        return _html

    def process_parameters_handler(self,list_item,result_queue):
        # extract parameter text for every product of one document from its attachments
        for item in list_item:
            try:
                attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
                product_name = item.get(DOCUMENT_PRODUCT_NAME)
                product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
                list_product = []
                log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
                if product_original_name is not None:
                    _l = product_original_name.split("_")
                    _l.reverse()
                    list_product.extend(_l)
                if product_name is not None:
                    list_product.append(product_name)
                list_product = list(set(list_product))
                dp = Document_product(item)
                if attachments is None or attachments=="" or len(list_product)==0:
                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                    dp.update_row(self.ots_client)
                    return
                if isinstance(attachments,str):
                    list_attachment = json.loads(attachments)
                elif isinstance(attachments,list):
                    list_attachment = attachments
                else:
                    log("attachment type error")
                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                    dp.update_row(self.ots_client)
                    return
                # bid documents ("招标文件") first, then purchase lists ("采购清单"), then the rest
                list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
                list_filemd5 = [a.get("filemd5","") for a in list_attachment]
                _find = False
                _success = False
                list_text = []
                for _filemd5 in list_filemd5:
                    _html = self.get_whole_html(_filemd5)
                    if len(_html)>5:
                        # parse the html twice (with and without tree fixing) and collect candidate texts
                        pd = ParseDocument(_html,True)
                        for _product in list_product:
                            pd.fix_tree(_product)
                            list_data = pd.tree
                            _text,_count = extract_product_parameters(list_data,_product)
                            if _count>0:
                                _find = True
                            if _text is not None:
                                list_text.append(_text)
                        pd = ParseDocument(_html,False)
                        for _product in list_product:
                            pd.fix_tree(_product)
                            list_data = pd.tree
                            _text,_count = extract_product_parameters(list_data,_product)
                            if _count>0:
                                _find = True
                            if _text is not None:
                                list_text.append(_text)
                    else:
                        log("product attachment process filemd5 %s has no content"%(_filemd5))
                    if len(list_text)>0:
                        _html = getBestProductText(list_text,'',[])
                        _html = format_text(_html)
                        _soup = BeautifulSoup(_html,"lxml")
                        _text = _soup.get_text()
                        logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_HTML,_html,True)
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
                        dp.setValue(DOCUMENT_PRODUCT_IS_PARAMETER,1,True)
                        dp.update_row(self.ots_client)
                        _success = True
                        break
                if not _success:
                    if not _find:
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
                        dp.update_row(self.ots_client)
                    else:
                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
                        dp.update_row(self.ots_client)
            except Exception as e:
                traceback.print_exc()
            finally:
                self.product_attachment_processed_queue.put(item.get(DOCUMENT_PRODUCT_ID))

    def start_process(self):
        # run the producer once, then process the queue with a pool of threads
        self.process_parameters_producer()
        thread_count = 7
        mt = MultiThreadHandler(self.product_attachment_queue,self.process_parameters_handler,None,thread_count,need_stop=False,restart=True)
        mt.run()

    def process_parameters_consumer(self,):
        # process_count = 3
        # list_process = []
        # for i in range(process_count):
        #     p = Process(target=self.start_process)
        #     list_process.append(p)
        # for p in list_process:
        #     p.start()
        # for p in list_process:
        #     p.join()
        self.start_process()

    def start_process_parameters(self):
        scheduler = BlockingScheduler()
        scheduler.add_job(self.process_parameters_producer,"cron",second="*/20")
        scheduler.add_job(self.process_parameters_consumer,"cron",second="*/30")
        scheduler.start()


def start_process_parameters():
    pap = Product_Attachment_Processor()
    pap.start_process_parameters()

def change_parameters_status():
    # maintenance helper: re-queue rows (e.g. failed ones) by resetting parameter_status
    # back to "to process" and clearing the stored parameter text
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[
                               RangeQuery("parameter_status",-1)
                           ],
                           must_not_queries=[
                               TermQuery("parameter_status",parameter_status_to_process),
                               TermQuery("parameter_status",parameter_status_process_succeed),
                               TermQuery("parameter_status",parameter_status_process_jump),
                               TermQuery("parameter_status",parameter_status_no_bidfile),
                               TermQuery("parameter_status",parameter_status_not_found),
                           ])
    list_data = []
    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("parameter_status")]),limit=100,get_total_count=True),
                                                                   ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
    list_data.extend(getRow_ots(rows))
    print("total_count",total_count)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       ColumnsToGet([DOCUMENT_PRODUCT_ID],return_type=ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))

    from queue import Queue
    task_queue = Queue()
    for data in list_data:
        task_queue.put(data)

    def _handle(data,result_queue):
        dp = Document_product(data)
        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process,True)
        dp.setValue(DOCUMENT_PRODUCT_PARAMETER,"",True)
        dp.update_row(ots_client)

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

if __name__ == '__main__':
    # start_process_parameters()
    # change_parameters_status()

    # debug: dump the stored parameter of a single product row to a.html
    ots_client = getConnect_ots()
    a = Document_product({DOCUMENT_PRODUCT_ID:"00000d8f94ba32d840c21fc9343ce4fb"})
    a.fix_columns(ots_client,[DOCUMENT_PRODUCT_PARAMETER,DOCUMENT_PRODUCT_IS_PARAMETER],True)
    with open("a.html","w",encoding="utf8") as f:
        f.write(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
    print(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
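
# Typical entry points (left commented out in __main__ above):
#   start_process_parameters()  - schedules the producer every 20s and the consumer every 30s
#   change_parameters_status()  - resets selected rows back to parameter_status_to_process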