# download_attachment_and_set_status_rerun.py
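"""
Rerun helper: reads a docid/partitionkey spreadsheet and a filemd5/path
spreadsheet, re-downloads the referenced attachments from OSS in batches,
and resets the attachment and document status fields in OTS so the
records get processed again.
"""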

import json
import logging
import os
import queue
import sys
import time

import pandas as pd
import numpy as np

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../")

from BaseDataMaintenance.common.ossUtils import downloadFile
from BaseDataMaintenance.dataSource.source import getConnect_ots
from BaseDataMaintenance.maintenance.dataflow import Dataflow
from BaseDataMaintenance.model.ots.attachment import attachment, attachment_filemd5, attachment_status
from BaseDataMaintenance.model.ots.document import Document, document_docid, document_partitionkey, document_status
from BaseDataMaintenance.common.multiThread import MultiThreadHandler

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def read_file(_file):
    # Read a csv or xlsx file into a DataFrame.
    if _file.endswith('csv'):
        df = pd.read_csv(_file)
    elif _file.endswith('xlsx'):
        df = pd.read_excel(_file, engine='openpyxl')
    else:
        raise TypeError('Only csv or xlsx files are supported!')
    return df
def get_md5_path_from_file(md5_path_file):
    # Read the file.
    df = read_file(md5_path_file)
    # Validate the md5/path columns.
    md5_col_name = "filemd5"
    path_col_name = "path"
    if md5_col_name not in df.columns or path_col_name not in df.columns:
        raise AttributeError('Dataframe is missing required columns! ' + md5_col_name + ' ' + path_col_name)
    md5_path_list = df[[md5_col_name, path_col_name]].values.tolist()
    temp_list = []
    for line in md5_path_list:
        if line in temp_list:
            continue
        md5, path = line
        # pd.isna covers both None and NaN; values.tolist() yields fresh float
        # NaNs, so an identity check against np.nan would miss them.
        if pd.isna(md5) or md5 == '':
            continue
        if pd.isna(path) or path == '':
            continue
        if not isinstance(md5, str) or len(md5) != 32:
            continue
        temp_list.append(line)
    md5_path_list = temp_list
    md5_path_list.sort(key=lambda x: x[0])
    logging.info('get md5 & path finish!')
    return md5_path_list
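# Input sketch for get_md5_path_from_file above (illustrative, not real data):
# a "filemd5" column holding 32-character hex strings and a "path" column
# holding OSS object paths.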
def get_docid_partitionkey_from_file(docid_partitionkey_file):
    # Read the file.
    df = read_file(docid_partitionkey_file)
    # Validate the docid/partitionkey columns.
    docid_col_name = "docid"
    partitionkey_col_name = "partitionkey"
    page_attachments_col_name = 'page_attachments'
    if docid_col_name not in df.columns or partitionkey_col_name not in df.columns:
        raise AttributeError('Dataframe is missing required columns! ' + docid_col_name + ' ' + partitionkey_col_name)
    # Map each docid to the fileMd5 values from its page_attachments JSON, if present.
    docid_md5_dict = {}
    if page_attachments_col_name in df.columns:
        docid_md5_list = df[[docid_col_name, page_attachments_col_name]].values.tolist()
        for docid, page_attachments in docid_md5_list:
            if pd.isna(docid) or docid == '':
                continue
            if pd.isna(page_attachments) or page_attachments == '':
                continue
            try:
                docid = int(docid)
                atts = json.loads(page_attachments)
            except Exception:
                continue
            md5_list = []
            for att in atts:
                md5_list.append(att.get('fileMd5'))
            docid_md5_dict[docid] = md5_list
    docid_partitionkey_list = df[[docid_col_name, partitionkey_col_name]].values.tolist()
    temp_list = []
    for line in docid_partitionkey_list:
        if line in temp_list:
            continue
        docid, partitionkey = line
        if pd.isna(docid) or docid == '':
            continue
        if pd.isna(partitionkey) or partitionkey == '':
            continue
        try:
            docid = int(docid)
            partitionkey = int(partitionkey)
            line = [docid, partitionkey]
        except Exception:
            continue
        temp_list.append(line)
    docid_partitionkey_list = temp_list
    docid_partitionkey_list.sort(key=lambda x: x[0])
    logging.info('get docid & partitionkey finish!')
    return docid_partitionkey_list, docid_md5_dict
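# Input sketch for get_docid_partitionkey_from_file above (illustrative):
# integer "docid" and "partitionkey" columns, plus an optional
# "page_attachments" column holding a JSON array such as [{"fileMd5": "..."}]
# from which the per-docid MD5 list is built.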
def download_attachment(md5_path_list):
    flow = Dataflow()
    bucket = flow.bucket
    success_md5_list = []
    continue_md5_list = []
    start_time = time.time()
    index = 0
    for md5, obj_path in md5_path_list:
        if index % 100 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        index += 1
        # Build the local path from the object path (dropping its first 5 characters).
        relative_path = obj_path[5:].replace("//", "/")
        localpath = "/FileInfo/%s" % (relative_path)
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
        else:
            # Already downloaded; skip.
            logging.info('md5 continue ' + md5 + ' ' + obj_path)
            continue_md5_list.append(md5)
            continue
        # Download from the OSS bucket.
        try:
            download_success = downloadFile(bucket, obj_path, localpath)
        except Exception:
            download_success = False
        if download_success:
            success_md5_list.append(md5)
        else:
            logging.info('download failed! ' + md5 + ' ' + obj_path)
    logging.info('download attachments finish! all/success/continue/fail ' + ' ' +
                 str(len(md5_path_list)) + '/' + str(len(success_md5_list)) + '/' +
                 str(len(continue_md5_list)) + '/' +
                 str(len(md5_path_list) - len(continue_md5_list) - len(success_md5_list)))
    return success_md5_list
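# Note: rerun_main below drives the multithreaded variant download_attachment_mp;
# this single-threaded version is not called in this script and appears to be
# kept as a standalone alternative.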
# Module-level client objects shared by the multithreaded task handlers below.
flow = Dataflow()
bucket = flow.bucket
def download_attachment_mp(args, queue):
    md5, obj_path = args
    # Build the local path from the object path (dropping its first 5 characters).
    relative_path = obj_path[5:].replace("//", "/")
    localpath = "/FileInfo/%s" % (relative_path)
    try:
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
        else:
            # Already downloaded; skip.
            logging.info('md5 continue ' + md5 + ' ' + obj_path)
            return
    except Exception:
        pass
    # Download from the OSS bucket.
    try:
        download_success = downloadFile(bucket, obj_path, localpath)
    except Exception:
        download_success = False
    if not download_success:
        logging.info('download failed! ' + md5 + ' ' + obj_path)
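# Judging by the handler signatures used in this script, MultiThreadHandler
# appears to invoke each task handler as handler(task, result_queue), hence the
# (args, queue) signature; these handlers report nothing back, so the result
# queue goes unused.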
def set_status(docid_list, md5_list):
    ots_client = getConnect_ots()
    # Reset attachment status.
    index = 0
    start_time = time.time()
    for md5 in md5_list:
        if index % 1000 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
        ots_attach.update_row(ots_client)
        index += 1
    logging.info('update attachment status finish!')
    # Reset document status.
    index = 0
    start_time = time.time()
    for docid, p_key in docid_list:
        if index % 1000 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 0})
        ots_doc.update_row(ots_client)
        index += 1
    logging.info('update document status finish!')
# Module-level OTS client shared by the multithreaded task handlers below.
ots_client = getConnect_ots()
def set_status_document(args, queue):
    docid, p_key = args
    # print(docid, p_key)
    ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 1})
    ots_doc.update_row(ots_client)

def set_status_attachment(args, queue):
    md5, path = args
    # print(md5, path)
    ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
    ots_attach.update_row(ots_client)
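# Note: set_status_document writes document_status 1, while the batch helper
# set_status above writes 0; the meaning of each status value is defined by the
# OTS models and is not documented in this script.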
def rerun_main(file_path, set_docid=True, set_md5=True):
    # Split the two comma-separated file names.
    docid_file_path, md5_file_path = file_path.split(',')
    logging.info('file ' + docid_file_path + ' ' + md5_file_path)
    # Read both files.
    docid_key_list = []
    docid_md5_dict = {}
    if set_docid:
        docid_key_list, docid_md5_dict = get_docid_partitionkey_from_file(docid_file_path)
        print('len(docid_key_list)', len(docid_key_list))
        print(docid_key_list[:10])
    md5_path_list = []
    if set_md5:
        md5_path_list = get_md5_path_from_file(md5_file_path)
        print('len(md5_path_list)', len(md5_path_list))
        print(md5_path_list[:10])
    # Process in batches with multiple threads.
    batch = 100
    start_index = 1800  # resume offset from a previous run; set to 0 to start from the beginning
    used_md5_list = []
    if md5_path_list and docid_key_list:
        start_time = time.time()
        for i in range(start_index, len(docid_key_list), batch):
            logging.info('Start batch ' + str(i) + ' to ' + str(i + batch))
            # Take one batch of docids.
            if i + batch < len(docid_key_list):
                sub_docid_key_list = docid_key_list[i:i + batch]
            else:
                sub_docid_key_list = docid_key_list[i:]
            # Collect the MD5s attached to these docids.
            sub_md5_list = []
            for docid, key in sub_docid_key_list:
                if docid_md5_dict.get(docid):
                    sub_md5_list += docid_md5_dict.get(docid)
            # Keep only the paths whose MD5 is in this batch and not yet used.
            sub_md5_path_list = []
            for key in md5_path_list:
                if key[0] in sub_md5_list and key[0] not in used_md5_list:
                    sub_md5_path_list.append(key)
                    used_md5_list.append(key[0])
            # Download the attachments for these paths.
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_md5_path_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=download_attachment_mp,
                                   result_queue=result_queue, thread_count=4)
            a.run()
            # Set attachment status.
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_md5_path_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_attachment,
                                   result_queue=result_queue, thread_count=10)
            a.run()
            # Set document status.
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_docid_key_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_document,
                                   result_queue=result_queue, thread_count=10)
            a.run()
            logging.info('Finish batch ' + str(i) + ' to ' + str(i + batch) + ' ' + str(time.time() - start_time))
            time.sleep(10)
if __name__ == '__main__':
    # docid_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码_240104.xlsx'
    # md5_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码md5_240104_3.xlsx'
    docid_path = '/data/python/BaseDataMaintenance/重跑附件乱码_240104.xlsx'
    md5_path = '/data/python/BaseDataMaintenance/重跑附件乱码md5_240104_3.xlsx'
    rerun_main(docid_path + ',' + md5_path)