#encoding:GBK
import sys
import os
sys.path.append("../")
import pandas as pd
from dataSource.source import *
import json
from utils.multiThread import MultiThreadHandler
import queue
from utils.Utils import *
from dataSource.pool import ConnectorPool
import re
from tablestore import *
import traceback
from utils.hashUtil import aesCipher
from export.exportEnterprise import getDictEnterprise,getOneContact


def exportAttachment():
    # Export attachment metadata (filemd5/path/crtime/size) created since
    # 2022-09-18 from the OTS "attachment" table into an Excel file.
    ots_client = getConnect_ots()
    columns = ["path","swfUrls","crtime","link_status","size"]

    bool_query = BoolQuery(must_queries=[
        RangeQuery('crtime','2022-09-18 00:00:00'),
        # TermQuery("filetype","swf"),
        # TermQuery("status",10),
        # BoolQuery(must_not_queries=[RangeQuery("link_status",1)])
    ])

    # First page, sorted by crtime ascending; later pages are fetched with
    # the next_token returned by each search call.
    rows,next_token,total_count,is_all_succeed = ots_client.search(
        "attachment","attachment_index",
        SearchQuery(bool_query,sort=Sort([FieldSort("crtime",SortOrder.ASC)]),limit=100,get_total_count=True),
        columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))

    df_data = {"filemd5":[],"path":[],"crtime":[],"size":[]}
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        for k,v in df_data.items():
            v.append(_dict.get(k))
    _count = len(list_dict)

    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(
            "attachment","attachment_index",
            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        _count += len(list_dict)
        print("%d/%d"%(_count,total_count))
        for _dict in list_dict:
            for k,v in df_data.items():
                v.append(_dict.get(k))
        # if _count>=10000:
        #     break

    log("================%d"%(len(df_data["path"])))
    for i in range(len(df_data["path"])):
        print("%s %s"%(df_data["filemd5"][i],df_data["path"][i]))
    df = pd.DataFrame(df_data)
    df.to_excel("../data/%s_attach_export.xlsx"%(getCurrent_date("%Y-%m-%d_%H%M%S")))


import oss2

common_bucket = None

def getBucket():
    # Lazily create a shared OSS bucket handle, preferring the intranet
    # endpoint when it is reachable.
    global common_bucket
    if common_bucket is None:
        auth = getAuth()
        check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
        if check_net(check_url):
            bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        log("bucket_url:%s"%(bucket_url))
        attachment_bucket_name = "attachment-hub"
        common_bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    return common_bucket


from dataSource.ossUtils import *

def downloadAtta():
    # Download every attachment of the docids listed in the input Excel file
    # ("金融机构数据导出.xlsx", i.e. "financial institution data export")
    # into the local attach/ directory, using 30 worker threads.
    filename = "金融机构数据导出.xlsx"
    df = pd.read_excel(filename)
    list_docid = list(set(df["docid"]))
    ots_client = getConnect_ots()
    bucket = getBucket()
    task_queue = queue.Queue()
    for _docid in list_docid:
        task_queue.put(_docid)

    def _handle(item,result_queue,ots_client):
        # partitionkey is derived from the docid (500 partitions, 1-based)
        consumed,return_row,next_token = ots_client.get_row("document",[("partitionkey",int(item%500+1)),("docid",int(item))],["page_attachments"])
        _dict = getRow_ots_primary(return_row)
        _page_attachments = json.loads(_dict.get("page_attachments","[]"))
        _index = 0
        for _pa in _page_attachments:
            _index += 1  # was "_index = 1", which made every attachment overwrite the first
            _filemd5 = _pa.get("fileMd5")
            consumed,return_row,next_token = ots_client.get_row("attachment",[("filemd5",_filemd5)],["path","filetype"])
            _dict = getRow_ots_primary(return_row)
            _path = _dict.get("path")
            _filetype = _dict.get("filetype")
            localpath = "attach/%d_%d.%s"%(item,_index,_filetype)
            if os.path.exists(localpath):
                continue
            downloadFile(bucket,_path,localpath)

    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()


def analisysFile():
    # Count "swf of docid" log lines per minute in ap1.log; the first 16
    # characters of each line ("YYYY-MM-DD HH:MM") are the minute bucket.
    filename = "ap1.log"
    dict_minute = {}
    _count = 0
    with open(filename,"r") as f:
        for _line in f:
            _count += 1
            if re.search("swf of docid",_line) is not None:
                _minute = _line[:16]
                if _minute not in dict_minute:
                    dict_minute[_minute] = 0
                dict_minute[_minute] += 1
    log("all line counts:%d"%_count)
    for k in sorted(dict_minute.keys()):
        print(k,dict_minute[k])


if __name__=="__main__":
    exportAttachment()
    # downloadAtta()
    # analisysFile()
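
# A minimal smoke-test sketch for downloadAtta(), assuming only what the code
# above reads: an Excel file named "金融机构数据导出.xlsx" with a "docid" column,
# and a local attach/ directory for the downloads. The helper name and the two
# docids are hypothetical placeholders, not real data; the function is defined
# but never called, so it has no effect on the script's normal runs.
def _make_downloadAtta_input_example():
    os.makedirs("attach",exist_ok=True)                 # downloadAtta() writes here
    pd.DataFrame({"docid":[100000001,100000002]}) \
        .to_excel("金融机构数据导出.xlsx",index=False)    # the filename downloadAtta() opens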