import json
import logging
import os
import queue
import sys
import time

import pandas as pd
import numpy as np

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../")

from BaseDataMaintenance.common.ossUtils import downloadFile
from BaseDataMaintenance.dataSource.source import getConnect_ots
from BaseDataMaintenance.maintenance.dataflow import Dataflow
from BaseDataMaintenance.model.ots.attachment import attachment, attachment_filemd5, attachment_status
from BaseDataMaintenance.model.ots.document import Document, document_docid, document_partitionkey, document_status
from BaseDataMaintenance.common.multiThread import MultiThreadHandler

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def read_file(_file):
    # read a csv/xlsx file into a DataFrame
    if _file.endswith('.csv'):
        df = pd.read_csv(_file)
    elif _file.endswith('.xlsx'):
        df = pd.read_excel(_file, engine='openpyxl')
    else:
        raise TypeError('Only csv or xlsx files are supported!')
    return df
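
# Usage sketch (the filename is hypothetical; only csv/xlsx inputs are accepted):
#   df = read_file('/tmp/md5_paths.xlsx')
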
def get_md5_path_from_file(md5_path_file):
    # read the file
    df = read_file(md5_path_file)
    # validate and de-duplicate the md5/path rows
    md5_col_name = "filemd5"
    path_col_name = "path"
    if md5_col_name not in df.columns or path_col_name not in df.columns:
        raise AttributeError('DataFrame is missing required columns! ' + md5_col_name + ' ' + path_col_name)
    md5_path_list = df[[md5_col_name, path_col_name]].values.tolist()
    temp_list = []
    for line in md5_path_list:
        if line in temp_list:
            continue
        md5, path = line
        # pd.isna catches the NaN values that a plain `in ['', np.nan, None]` check can miss
        if md5 in ['', np.nan, None] or pd.isna(md5):
            continue
        if path in ['', np.nan, None] or pd.isna(path):
            continue
        # keep only well-formed 32-character md5 strings
        if not isinstance(md5, str) or len(md5) != 32:
            continue
        temp_list.append(line)
    md5_path_list = temp_list
    md5_path_list.sort(key=lambda x: x[0])
    logging.info('get md5 & path finish!')
    return md5_path_list
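
# A minimal, hypothetical illustration of the input shape expected above: a
# spreadsheet with 'filemd5' and 'path' columns. The rows below exercise the
# validation rules (32-character md5, non-empty path, duplicates dropped).
def _demo_md5_path_frame():
    return pd.DataFrame({
        "filemd5": ["d41d8cd98f00b204e9800998ecf8427e",  # valid, kept
                    "d41d8cd98f00b204e9800998ecf8427e",  # duplicate of the row above, dropped
                    "not-a-real-md5"],                   # wrong length, skipped
        "path": ["attach/demo/a.pdf", "attach/demo/a.pdf", "attach/demo/b.pdf"],
    })
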
def get_docid_partitionkey_from_file(docid_partitionkey_file):
    # read the file
    df = read_file(docid_partitionkey_file)
    # validate docid / partitionkey
    docid_col_name = "docid"
    partitionkey_col_name = "partitionkey"
    page_attachments_col_name = 'page_attachments'
    if docid_col_name not in df.columns or partitionkey_col_name not in df.columns:
        raise AttributeError('DataFrame is missing required columns! ' + docid_col_name + ' ' + partitionkey_col_name)
    # map each docid to the md5s of its page attachments, when that column exists
    docid_md5_dict = {}
    if page_attachments_col_name in df.columns:
        docid_md5_list = df[[docid_col_name, page_attachments_col_name]].values.tolist()
        for docid, page_attachments in docid_md5_list:
            if docid in ['', np.nan, None] or pd.isna(docid):
                continue
            if page_attachments in ['', np.nan, None] or pd.isna(page_attachments):
                continue
            try:
                docid = int(docid)
                atts = json.loads(page_attachments)
            except Exception:
                continue
            md5_list = []
            for att in atts:
                md5_list.append(att.get('fileMd5'))
            docid_md5_dict[docid] = md5_list
    docid_partitionkey_list = df[[docid_col_name, partitionkey_col_name]].values.tolist()
    temp_list = []
    for line in docid_partitionkey_list:
        if line in temp_list:
            continue
        docid, partitionkey = line
        if docid in ['', np.nan, None] or pd.isna(docid):
            continue
        if partitionkey in ['', np.nan, None] or pd.isna(partitionkey):
            continue
        try:
            docid = int(docid)
            partitionkey = int(partitionkey)
            line = [docid, partitionkey]
        except Exception:
            continue
        temp_list.append(line)
    docid_partitionkey_list = temp_list
    docid_partitionkey_list.sort(key=lambda x: x[0])
    logging.info('get docid & partitionkey finish!')
    return docid_partitionkey_list, docid_md5_dict
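
# Hypothetical illustration of the docid spreadsheet: 'page_attachments' holds
# a JSON list whose items carry a 'fileMd5' key, which is what maps each docid
# to its attachment md5s above. All values are made up.
def _demo_docid_frame():
    return pd.DataFrame({
        "docid": [123456789],
        "partitionkey": [190],
        "page_attachments": [json.dumps([{"fileMd5": "d41d8cd98f00b204e9800998ecf8427e"}])],
    })
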
def download_attachment(md5_path_list):
    flow = Dataflow()
    bucket = flow.bucket
    success_md5_list = []
    continue_md5_list = []
    start_time = time.time()
    index = 0
    for md5, obj_path in md5_path_list:
        if index % 100 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        index += 1
        # build the local path
        relative_path = obj_path[5:].replace("//", "/")
        localpath = "/FileInfo/%s" % relative_path
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
        else:
            # the file is already on disk; skip it
            logging.info('md5 continue ' + md5 + ' ' + obj_path)
            continue_md5_list.append(md5)
            continue
        # download
        try:
            download_success = downloadFile(bucket, obj_path, localpath)
        except Exception:
            download_success = False
        if download_success:
            success_md5_list.append(md5)
        else:
            logging.info('download failed! %s %s', md5, obj_path)
    # fail = all - success - skipped
    logging.info('download attachments finish! all/success/continue/fail ' +
                 str(len(md5_path_list)) + '/' + str(len(success_md5_list)) + '/' +
                 str(len(continue_md5_list)) + '/' +
                 str(len(md5_path_list) - len(success_md5_list) - len(continue_md5_list)))
    return success_md5_list
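
# Usage sketch for the single-threaded variant; the pair below is hypothetical,
# and obj_path[5:] above assumes the object path carries a 5-character prefix
# before the bucket-relative part:
#   download_attachment([('d41d8cd98f00b204e9800998ecf8427e',
#                         'xxxx/attach/demo/a.pdf')])
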
# module-level OSS client shared by the multi-threaded workers below
flow = Dataflow()
bucket = flow.bucket
def download_attachment_mp(args, queue):
    md5, obj_path = args
    # build the local path
    relative_path = obj_path[5:].replace("//", "/")
    localpath = "/FileInfo/%s" % relative_path
    try:
        if not os.path.exists(localpath):
            if not os.path.exists(os.path.dirname(localpath)):
                os.makedirs(os.path.dirname(localpath))
        else:
            # the file is already on disk; skip it
            logging.info('md5 continue ' + md5 + ' ' + obj_path)
            return
    except Exception:
        pass
    # download
    try:
        download_success = downloadFile(bucket, obj_path, localpath)
    except Exception:
        download_success = False
    # the original discarded the result silently; at least log failures
    if not download_success:
        logging.info('download failed! %s %s', md5, obj_path)
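
# Minimal sketch of driving the worker above with MultiThreadHandler, the same
# pattern rerun_main() uses below; pairs are (md5, object path) tuples.
def _demo_run_download_workers(md5_path_pairs, thread_count=4):
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for pair in md5_path_pairs:
        task_queue.put(pair)
    handler = MultiThreadHandler(task_queue=task_queue,
                                 task_handler=download_attachment_mp,
                                 result_queue=result_queue,
                                 thread_count=thread_count)
    handler.run()
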
def set_status(docid_list, md5_list):
    ots_client = getConnect_ots()
    index = 0
    start_time = time.time()
    for md5 in md5_list:
        if index % 1000 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
        ots_attach.update_row(ots_client)
        index += 1
    logging.info('update attachment status finish!')
    index = 0
    start_time = time.time()
    for docid, p_key in docid_list:
        if index % 1000 == 0:
            logging.info('Loop ' + str(index) + ' ' + str(time.time() - start_time))
            start_time = time.time()
        ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 0})
        ots_doc.update_row(ots_client)
        index += 1
    logging.info('update document status finish!')
# module-level OTS client shared by the status-update workers below
ots_client = getConnect_ots()
def set_status_document(args, queue):
    docid, p_key = args
    # print(docid, p_key)
    ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 1})
    ots_doc.update_row(ots_client)

def set_status_attachment(args, queue):
    md5, path = args
    # print(md5, path)
    ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
    ots_attach.update_row(ots_client)
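
# Both setters above plug into the same task-queue pattern; a single direct
# call also works, e.g. (docid/partitionkey values hypothetical):
#   set_status_document((123456789, 190), queue.Queue())
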
def rerun_main(file_path, set_docid=True, set_md5=True):
    # split the two comma-joined file names
    docid_file_path, md5_file_path = file_path.split(',')
    logging.info('file ' + docid_file_path + ' ' + md5_file_path)
    # read the files
    docid_key_list = []
    docid_md5_dict = {}
    if set_docid:
        docid_key_list, docid_md5_dict = get_docid_partitionkey_from_file(docid_file_path)
        print('len(docid_key_list)', len(docid_key_list))
        print(docid_key_list[:10])
    md5_path_list = []
    if set_md5:
        md5_path_list = get_md5_path_from_file(md5_file_path)
        print('len(md5_path_list)', len(md5_path_list))
        print(md5_path_list[:10])
    # process in batches, each step fanned out over a thread pool
    batch = 100
    # resume offset left over from a previous run; set to 0 to process from the start
    start_index = 1800
    used_md5_list = []
    if md5_path_list and docid_key_list:
        start_time = time.time()
        for i in range(start_index, len(docid_key_list), batch):
            logging.info('Start batch ' + str(i) + ' to ' + str(i + batch))
            # take one batch of docids
            if i + batch < len(docid_key_list):
                sub_docid_key_list = docid_key_list[i:i + batch]
            else:
                sub_docid_key_list = docid_key_list[i:]
            # collect the md5s referenced by these docids
            sub_md5_list = []
            for docid, key in sub_docid_key_list:
                if docid_md5_dict.get(docid):
                    sub_md5_list += docid_md5_dict.get(docid)
            # keep the not-yet-used md5/path pairs that belong to this batch
            sub_md5_path_list = []
            for key in md5_path_list:
                if key[0] in sub_md5_list and key[0] not in used_md5_list:
                    sub_md5_path_list.append(key)
                    used_md5_list.append(key[0])
            # download the attachments for these paths
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_md5_path_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=download_attachment_mp,
                                   result_queue=result_queue, thread_count=4)
            a.run()
            # set attachment status
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_md5_path_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_attachment,
                                   result_queue=result_queue, thread_count=10)
            a.run()
            # set document status
            task_queue = queue.Queue()
            result_queue = queue.Queue()
            for k in sub_docid_key_list:
                task_queue.put(k)
            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_document,
                                   result_queue=result_queue, thread_count=10)
            a.run()
            logging.info('Finish batch ' + str(i) + ' to ' + str(i + batch) + ' ' + str(time.time() - start_time))
            time.sleep(10)
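
# rerun_main() expects the two spreadsheet paths joined by a comma, e.g.
# (paths hypothetical):
#   rerun_main('/tmp/docids.xlsx,/tmp/md5s.xlsx')
# Note: the batch loop only runs when both set_docid and set_md5 are True,
# since it needs both docid_key_list and md5_path_list.
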
if __name__ == '__main__':
    # docid_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码_240104.xlsx'
    # md5_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码md5_240104_3.xlsx'
    docid_path = '/data/python/BaseDataMaintenance/重跑附件乱码_240104.xlsx'
    md5_path = '/data/python/BaseDataMaintenance/重跑附件乱码md5_240104_3.xlsx'
    rerun_main(docid_path + ',' + md5_path)