# encoding: GBK
import sys
import os
sys.path.append("../")

import json
import queue
import re
import traceback

import pandas as pd
import oss2
from tablestore import *

from dataSource.source import *
from dataSource.pool import ConnectorPool
from dataSource.ossUtils import *
from utils.multiThread import MultiThreadHandler
from utils.Utils import *
from utils.hashUtil import aesCipher
from export.exportEnterprise import getDictEnterprise, getOneContact


def exportAttachment():
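    """Export attachment rows newer than a cutoff from Tablestore to Excel.

    Scans the attachment_index search index ordered by crtime, pages through
    all hits with next_token, and writes the filemd5/path/crtime/size columns
    to a timestamped file under ../data/.
    """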
    ots_client = getConnect_ots()
    columns = ["path", "swfUrls", "crtime", "link_status", "size"]
    bool_query = BoolQuery(must_queries=[
        RangeQuery('crtime', '2022-09-18 00:00:00'),
        # TermQuery("filetype","swf"),
        # TermQuery("status",10),
        # BoolQuery(must_not_queries=[RangeQuery("link_status",1)])
    ])
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "attachment", "attachment_index",
        SearchQuery(bool_query, sort=Sort([FieldSort("crtime", SortOrder.ASC)]), limit=100, get_total_count=True),
        columns_to_get=ColumnsToGet(columns, ColumnReturnType.SPECIFIED))
    df_data = {"filemd5": [], "path": [], "crtime": [], "size": []}
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        for k, v in df_data.items():
            v.append(_dict.get(k))
    _count = len(list_dict)
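    # Page through the remaining hits: Tablestore returns at most `limit`
    # rows per search call, plus a next_token until the result set is drained.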
    while True:
        if not next_token:
            break
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "attachment", "attachment_index",
            SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
            columns_to_get=ColumnsToGet(columns, ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        _count += len(list_dict)
        print("%d/%d" % (_count, total_count))
        for _dict in list_dict:
            for k, v in df_data.items():
                v.append(_dict.get(k))
        # if _count>=10000:
        #     break
    log("================%d" % (len(df_data["path"])))
    # print one line per row (the original inner loop over df_data.keys()
    # never used the key and printed every row once per column)
    for i in range(len(df_data["path"])):
        print("%s %s" % (df_data["filemd5"][i], df_data["path"][i]))
    df = pd.DataFrame(df_data)
    df.to_excel("../data/%s_attach_export.xlsx" % (getCurrent_date("%Y-%m-%d_%H%M%S")))


common_bucket = None


def getBucket():
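    """Return a lazily created, process-wide oss2.Bucket for attachment-hub.

    Prefers the internal Hangzhou endpoint when it is reachable and falls
    back to the public one otherwise.
    """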
    global common_bucket
    if common_bucket is None:
        auth = getAuth()
        # use the internal endpoint when it is reachable, otherwise the public one
        check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
        if check_net(check_url):
            bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        # public URL prefix of the bucket (kept for reference, not used below)
        attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        log("bucket_url:%s" % (bucket_url))
        attachment_bucket_name = "attachment-hub"
        common_bucket = oss2.Bucket(auth, bucket_url, attachment_bucket_name)
    return common_bucket


def downloadAtta():
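    """Download every attachment of the docids listed in the input Excel file.

    For each docid, reads page_attachments from the document table, resolves
    each fileMd5 against the attachment table, and downloads the file from
    OSS into attach/ unless it already exists locally.
    """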
- filename = "½ðÈÚ»ú¹¹Êý¾Ýµ¼³ö.xlsx"
    df = pd.read_excel(filename)
    list_docid = list(set(df["docid"]))
    ots_client = getConnect_ots()
    bucket = getBucket()
    task_queue = queue.Queue()
    for _docid in list_docid:
        task_queue.put(_docid)
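    # worker: one docid per task; the document table is sharded over 500
    # partition keys (partitionkey = docid % 500 + 1)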
    def _handle(item, result_queue, ots_client):
        consumed, return_row, next_token = ots_client.get_row("document", [("partitionkey", int(item % 500 + 1)), ("docid", int(item))], ["page_attachments"])
        _dict = getRow_ots_primary(return_row)
        _page_attachments = json.loads(_dict.get("page_attachments", "[]"))
        _index = 0
        for _pa in _page_attachments:
            _index += 1  # was `_index = 1`, which gave every attachment the same local name
            _filemd5 = _pa.get("fileMd5")
            consumed, return_row, next_token = ots_client.get_row("attachment", [("filemd5", _filemd5)], ["path", "filetype"])
            _dict = getRow_ots_primary(return_row)
            _path = _dict.get("path")
            _filetype = _dict.get("filetype")
            localpath = "attach/%d_%d.%s" % (item, _index, _filetype)
            if os.path.exists(localpath):
                continue
            downloadFile(bucket, _path, localpath)
    mt = MultiThreadHandler(task_queue, _handle, None, 30, ots_client=ots_client)
    mt.run()


def analysisFile():
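    """Count 'swf of docid' lines per minute in ap1.log and print the histogram."""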
- filename = "ap1.log"
- dict_minute = {}
- _count = 0
- with open(filename,"r") as f:
- while True:
- _line = f.readline()
- if not _line:
- break
- _count += 1
- if re.search("swf of docid",_line) is not None:
- _minute = _line[:16]
- if _minute not in dict_minute:
- dict_minute[_minute] = 0
- dict_minute[_minute] += 1
- log("all line counts:%d"%_count)
- keys = list(dict_minute.keys())
- keys.sort(key=lambda x:x)
- for k in keys:
- print(k,dict_minute[k])


if __name__ == "__main__":
    exportAttachment()
    # downloadAtta()
    # analysisFile()