
Workflow for pre-downloading attachments and setting status

fangjiasheng 1 year ago
parent
commit
53e239c30c
2 changed files with 309 additions and 1 deletion
  1. +1 -1
      BaseDataMaintenance/common/multiThread.py
  2. +308 -0
      BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py

+ 1 - 1
BaseDataMaintenance/common/multiThread.py

@@ -46,7 +46,7 @@ class _taskHandler(threading.Thread):
     def run(self):
         while(True):
             try:
-                logging.info("handler task queue size is %d need_stop %s thread_id:%d"%(self.task_queue.qsize(),str(self.need_stop),threading.get_ident()))
+                logging.info("%s handler task queue size is %d need_stop %s thread_id:%d"%(self.task_handler.__name__, self.task_queue.qsize(),str(self.need_stop),threading.get_ident()))
                 item = self.task_queue.get(True,timeout=5)
 
                 self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
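The amended log line tags every worker message with the handler function's name, so interleaved logs from different handlers can be told apart. A minimal sketch of the effect (hypothetical echo_handler; the constructor arguments mirror the call sites in the new script below):

    import queue

    from BaseDataMaintenance.common.multiThread import MultiThreadHandler

    def echo_handler(item, result_queue):
        # hypothetical handler: pass each item straight through
        result_queue.put(item)

    task_queue, result_queue = queue.Queue(), queue.Queue()
    for i in range(3):
        task_queue.put(i)
    MultiThreadHandler(task_queue=task_queue, task_handler=echo_handler,
                       result_queue=result_queue, thread_count=2).run()
    # each worker thread now logs lines like:
    # echo_handler handler task queue size is 3 need_stop False thread_id:140234...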

+ 308 - 0
BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py

@@ -0,0 +1,308 @@
+import json
+import logging
+import os
+import queue
+import sys
+import time
+import pandas as pd
+import numpy as np
+
+# make the package importable when run as a script, before any package imports
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../")
+
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.common.ossUtils import downloadFile
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from BaseDataMaintenance.maintenance.dataflow import Dataflow
+from BaseDataMaintenance.model.ots.attachment import attachment, attachment_filemd5, attachment_status
+from BaseDataMaintenance.model.ots.document import Document, document_docid, document_partitionkey, document_status
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+
+def read_file(_file):
+    # read the input file (csv or xlsx)
+    if _file[-3:] == 'csv':
+        df = pd.read_csv(_file)
+    elif _file[-4:] == 'xlsx':
+        df = pd.read_excel(_file, engine='openpyxl')
+    else:
+        raise TypeError('Only csv or xlsx files are supported!')
+    return df
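+# Usage sketch (hypothetical file names): read_file('list.csv') dispatches to
+# pd.read_csv and read_file('list.xlsx') to pd.read_excel; any other suffix
+# raises TypeError.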
+
+
+def get_md5_path_from_file(md5_path_file):
+    # read the input file
+    df = read_file(md5_path_file)
+
+    # validate and dedup the md5/path rows
+    md5_col_name = "filemd5"
+    path_col_name = "path"
+    if md5_col_name not in df.columns or path_col_name not in df.columns:
+        raise AttributeError('Dataframe is missing required columns! ' + md5_col_name + ' ' + path_col_name)
+
+    md5_path_list = df[[md5_col_name, path_col_name]].values.tolist()
+    temp_list = []
+    for line in md5_path_list:
+        if line in temp_list:
+            continue
+
+        md5, path = line
+        if md5 in ['', np.nan, None]:
+            continue
+        if path in ['', np.nan, None]:
+            continue
+        if len(md5) != 32:
+            continue
+        temp_list.append(line)
+    md5_path_list = temp_list
+    md5_path_list.sort(key=lambda x: x[0])
+    logging.info('get md5 & path finished!')
+    return md5_path_list
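+# Worked example (hypothetical rows): given
+#   [['0cc175b9c0f1b6a831c399e269772661', 'test/a.pdf'], ['bad-md5', 'test/b.pdf']]
+# only the first row survives the filter above: both fields are non-empty and
+# its md5 is exactly 32 characters long.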
+
+
+def get_docid_partitionkey_from_file(docid_partitionkey_file):
+    # read the input file
+    df = read_file(docid_partitionkey_file)
+
+    # validate and dedup the docid/partitionkey rows
+    docid_col_name = "docid"
+    partitionkey_col_name = "partitionkey"
+    page_attachments_col_name = 'page_attachments'
+    if docid_col_name not in df.columns or partitionkey_col_name not in df.columns:
+        raise AttributeError('Dataframe is missing required columns! ' + docid_col_name + ' ' + partitionkey_col_name)
+
+    docid_md5_dict = {}
+    if page_attachments_col_name in df.columns:
+        docid_md5_list = df[[docid_col_name, page_attachments_col_name]].values.tolist()
+        for docid, page_attachments in docid_md5_list:
+            if docid in ['', np.nan, None]:
+                continue
+            if page_attachments in ['', np.nan, None]:
+                continue
+            try:
+                docid = int(docid)
+                atts = json.loads(page_attachments)
+            except:
+                continue
+
+            md5_list = []
+            for att in atts:
+                md5_list.append(att.get('fileMd5'))
+            docid_md5_dict[docid] = md5_list
+
+    docid_partitionkey_list = df[[docid_col_name, partitionkey_col_name]].values.tolist()
+    temp_list = []
+    for line in docid_partitionkey_list:
+        if line in temp_list:
+            continue
+
+        docid, partitionkey = line
+        if docid in ['', np.nan, None]:
+            continue
+        if partitionkey in ['', np.nan, None]:
+            continue
+        try:
+            docid = int(docid)
+            partitionkey = int(partitionkey)
+            line = [docid, partitionkey]
+        except:
+            continue
+
+        temp_list.append(line)
+    docid_partitionkey_list = temp_list
+    docid_partitionkey_list.sort(key=lambda x: x[0])
+    logging.info('get docid & partitionkey finished!')
+    return docid_partitionkey_list, docid_md5_dict
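+# Example page_attachments cell (hypothetical value):
+#   '[{"fileMd5": "0cc175b9c0f1b6a831c399e269772661"}]'
+# json.loads turns it into a list of dicts, and each entry's fileMd5 is
+# collected into docid_md5_dict[docid] above.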
+
+
+def download_attachment(md5_path_list):
+    flow = Dataflow()
+    bucket = flow.bucket
+
+    success_md5_list = []
+    continue_md5_list = []
+    start_time = time.time()
+    index = 0
+    for md5, obj_path in md5_path_list:
+        if index % 100 == 0:
+            logging.info('Loop ' + str(index) + ' ' + str(time.time()-start_time))
+            start_time = time.time()
+        index += 1
+
+        # build the local file path
+        relative_path = obj_path[5:].replace("//","/")
+        localpath = "/FileInfo/%s"%(relative_path)
+        if not os.path.exists(localpath):
+            if not os.path.exists(os.path.dirname(localpath)):
+                os.makedirs(os.path.dirname(localpath))
+        else:
+            logging.info('md5 continue ' + md5 + ' ' + obj_path)
+            continue_md5_list.append(md5)
+            continue
+
+        # download
+        try:
+            download_success = downloadFile(bucket, obj_path, localpath)
+        except Exception as e:
+            download_success = False
+
+        if download_success:
+            success_md5_list.append(md5)
+        else:
+            logging.info('download failed! ' + md5 + ' ' + obj_path)
+
+    logging.info('download attachments finished! all/success/continue/fail ' + ' ' +
+          str(len(md5_path_list)) + '/' + str(len(success_md5_list)) + '/' + str(len(continue_md5_list)) + '/' + str(len(md5_path_list)-len(success_md5_list)-len(continue_md5_list)))
+
+    return success_md5_list
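+# Path mapping example (hypothetical obj_path; obj_path[5:] assumes a fixed
+# 5-character store prefix such as 'test/'):
+#   obj_path      = 'test/2024/01/ab.pdf'
+#   relative_path = '2024/01/ab.pdf'
+#   localpath     = '/FileInfo/2024/01/ab.pdf'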
+
+
+flow = Dataflow()
+bucket = flow.bucket
+def download_attachment_mp(args, queue):
+    md5, obj_path = args
+    # build the local file path
+    relative_path = obj_path[5:].replace("//","/")
+    localpath = "/FileInfo/%s"%(relative_path)
+    if not os.path.exists(localpath):
+        if not os.path.exists(os.path.dirname(localpath)):
+            os.makedirs(os.path.dirname(localpath))
+    else:
+        logging.info('md5 continue ' + md5 + ' ' + obj_path)
+        return
+
+    # download
+    try:
+        download_success = downloadFile(bucket, obj_path, localpath)
+    except Exception as e:
+        download_success = False
+    if not download_success:
+        logging.info('download failed! ' + md5 + ' ' + obj_path)
+
+
+def set_status(docid_list, md5_list):
+    ots_client = getConnect_ots()
+
+    index = 0
+    start_time = time.time()
+    for md5 in md5_list:
+        if index % 1000 == 0:
+            logging.info('Loop ' + str(index) + ' ' + str(time.time()-start_time))
+            start_time = time.time()
+        ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
+        ots_attach.update_row(ots_client)
+        index += 1
+
+    logging.info('update attachment status finished!')
+
+    index = 0
+    start_time = time.time()
+    for docid, p_key in docid_list:
+        if index % 1000 == 0:
+            logging.info('Loop ' + str(index) + ' ' + str(time.time()-start_time))
+            start_time = time.time()
+        ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 0})
+        ots_doc.update_row(ots_client)
+        index += 1
+
+    logging.info('update document status finished!')
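+# (Assumption: status 0 marks a row for re-processing by the downstream
+# dataflow, so resetting attachment and document status re-queues them.)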
+
+
+ots_client = getConnect_ots()
+def set_status_document(args, queue):
+    docid, p_key = args
+    # print(docid, p_key)
+    ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 1})
+    # ots_doc.update_row(ots_client)  # write left disabled (dry run)
+
+
+def set_status_attachment(args, queue):
+    md5, path = args
+    # print(md5, path)
+    ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
+    # ots_attach.update_row(ots_client)  # write left disabled (dry run)
+
+
+def rerun_main(file_path, set_docid=True, set_md5=True):
+    # split the comma-joined argument into the two file names
+    docid_file_path, md5_file_path = file_path.split(',')
+    logging.info('file ' + docid_file_path + ' ' + md5_file_path)
+
+    # read the files
+    docid_key_list = []
+    docid_md5_dict = {}
+    if set_docid:
+        docid_key_list, docid_md5_dict = get_docid_partitionkey_from_file(docid_file_path)
+        print('len(docid_key_list)', len(docid_key_list))
+        print(docid_key_list[:10])
+    md5_path_list = []
+    if set_md5:
+        md5_path_list = get_md5_path_from_file(md5_file_path)
+        print('len(md5_path_list)', len(md5_path_list))
+        print(md5_path_list[:10])
+
+    # run the batches with multiple threads
+    batch = 1000
+    used_md5_set = set()  # set for O(1) membership checks
+    if md5_path_list and docid_key_list:
+        start_time = time.time()
+        for i in range(0, len(docid_key_list), batch):
+            logging.info('Start batch ' + str(i) + ' to ' + str(i+batch))
+
+            # take one batch of docids
+            if i + batch < len(docid_key_list):
+                sub_docid_key_list = docid_key_list[i:i+batch]
+            else:
+                sub_docid_key_list = docid_key_list[i:]
+
+            # collect the MD5s attached to these docids
+            sub_md5_list = []
+            for docid, key in sub_docid_key_list:
+                if docid_md5_dict.get(docid):
+                    sub_md5_list += docid_md5_dict.get(docid)
+
+            # keep paths whose MD5 belongs to this batch and has not been used yet
+            sub_md5_path_list = []
+            for key in md5_path_list:
+                if key[0] in sub_md5_list and key[0] not in used_md5_set:
+                    sub_md5_path_list.append(key)
+                    used_md5_set.add(key[0])
+
+            # download the attachments by path
+            task_queue = queue.Queue()
+            result_queue = queue.Queue()
+            for k in sub_md5_path_list:
+                task_queue.put(k)
+            a = MultiThreadHandler(task_queue=task_queue, task_handler=download_attachment_mp,
+                                   result_queue=result_queue, thread_count=1)
+            a.run()
+
+            # set attachment status
+            task_queue = queue.Queue()
+            result_queue = queue.Queue()
+            for k in sub_md5_path_list:
+                task_queue.put(k)
+            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_attachment,
+                                   result_queue=result_queue, thread_count=10)
+            a.run()
+
+            # set document status
+            task_queue = queue.Queue()
+            result_queue = queue.Queue()
+            for k in sub_docid_key_list:
+                task_queue.put(k)
+            a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_document,
+                                   result_queue=result_queue, thread_count=10)
+            a.run()
+            logging.info('Finish batch ' + str(i) + ' to ' + str(i+batch) + ' ' + str(time.time()-start_time))
+            time.sleep(5)
+            break  # NOTE: stops after the first batch; remove to process the full list
+
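+# Note on batching: Python slicing clamps at the end of the list, so
+# docid_key_list[i:i+batch] alone would also be safe for the final partial
+# batch; the explicit boundary check above is kept for readability.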
+
+if __name__ == '__main__':
+    # docid_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码_240104.xlsx'
+    # md5_path = 'D:\\BIDI_DOC\\比地_文档\\重跑附件乱码md5_240104_3.xlsx'
+    docid_path = '/data/python/BaseDataMaintenance/重跑附件乱码_240104.xlsx'
+    md5_path = '/data/python/BaseDataMaintenance/重跑附件乱码md5_240104_3.xlsx'
+
+    rerun_main(docid_path + ',' + md5_path)
+
+