exportAttachment.py

# -*- coding: utf-8 -*-
import sys
import os
sys.path.append("../")
import pandas as pd
from dataSource.source import *
import json
from utils.multiThread import MultiThreadHandler
import queue
from utils.Utils import *
from dataSource.pool import ConnectorPool
import re
from tablestore import *
import traceback
from utils.hashUtil import aesCipher
from export.exportEnterprise import getDictEnterprise,getOneContact
import oss2
from dataSource.ossUtils import *

def exportAttachment():
    ots_client = getConnect_ots()
    columns = ["path","swfUrls","crtime","link_status","size"]
    # export every attachment row created after 2022-09-18
    bool_query = BoolQuery(must_queries=[
        RangeQuery('crtime','2022-09-18 00:00:00')
        # TermQuery("filetype","swf"),
        # TermQuery("status",10),
        # BoolQuery(must_not_queries=[RangeQuery("link_status",1)])
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                   SearchQuery(bool_query,sort=Sort([FieldSort("crtime",SortOrder.ASC)]),limit=100,get_total_count=True),
                                                                   columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    df_data = {"filemd5":[],"path":[],"crtime":[],"size":[]}
    list_dict = getRow_ots(rows)
    for _dict in list_dict:
        for k,v in df_data.items():
            v.append(_dict.get(k))
    _count = len(list_dict)
    # keep paging with next_token until the index is exhausted
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search("attachment","attachment_index",
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        _count += len(list_dict)
        print("%d/%d"%(_count,total_count))
        for _dict in list_dict:
            for k,v in df_data.items():
                v.append(_dict.get(k))
        # if _count>=10000:
        #     break
    log("================%d"%(len(df_data["path"])))
    # echo filemd5/path for each exported row
    for i in range(len(df_data["path"])):
        print("%s %s"%(df_data["filemd5"][i],df_data["path"][i]))
    df = pd.DataFrame(df_data)
    df.to_excel("../data/%s_attach_export.xlsx"%(getCurrent_date("%Y-%m-%d_%H%M%S")))
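
# A minimal sketch of the same tablestore paging pattern in isolation, for
# reference; the scan_rows helper name is an assumption, not part of this module:
#
#   def scan_rows(ots_client, table, index, query, columns):
#       rows, token, total, ok = ots_client.search(table, index,
#           SearchQuery(query, sort=Sort([FieldSort("crtime", SortOrder.ASC)]),
#                       limit=100, get_total_count=True),
#           columns_to_get=ColumnsToGet(columns, ColumnReturnType.SPECIFIED))
#       yield from getRow_ots(rows)
#       while token:
#           rows, token, total, ok = ots_client.search(table, index,
#               SearchQuery(query, next_token=token, limit=100, get_total_count=True),
#               columns_to_get=ColumnsToGet(columns, ColumnReturnType.SPECIFIED))
#           yield from getRow_ots(rows)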

common_bucket = None

def getBucket():
    global common_bucket
    if common_bucket is None:
        auth = getAuth()
        # use the internal OSS endpoint when it is reachable, otherwise the public one
        check_url = "oss-cn-hangzhou-internal.aliyuncs.com"
        if check_net(check_url):
            bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        else:
            bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
        attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
        log("bucket_url:%s"%(bucket_url))
        attachment_bucket_name = "attachment-hub"
        common_bucket = oss2.Bucket(auth,bucket_url,attachment_bucket_name)
    return common_bucket
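
# A usage sketch (the object key below is hypothetical, not a real attachment;
# assumes getAuth() returns valid oss2 credentials for the attachment-hub bucket):
#
#   bucket = getBucket()
#   bucket.get_object_to_file("attach/example.pdf", "example.pdf")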

def downloadAtta():
    filename = "金融机构数据导出.xlsx"  # "financial institution data export"
    df = pd.read_excel(filename)
    list_docid = list(set(df["docid"]))
    ots_client = getConnect_ots()
    bucket = getBucket()
    task_queue = queue.Queue()
    for _docid in list_docid:
        task_queue.put(_docid)
    def _handle(item,result_queue,ots_client):
        # documents are sharded over 500 partitions, keyed by docid
        consumed,return_row,next_token = ots_client.get_row("document",[("partitionkey",int(item%500+1)),("docid",int(item))],["page_attachments"])
        _dict = getRow_ots_primary(return_row)
        _page_attachments = json.loads(_dict.get("page_attachments","[]"))
        _index = 0
        for _pa in _page_attachments:
            _index += 1
            _filemd5 = _pa.get("fileMd5")
            consumed,return_row,next_token = ots_client.get_row("attachment",[("filemd5",_filemd5)],["path","filetype"])
            _dict = getRow_ots_primary(return_row)
            _path = _dict.get("path")
            _filetype = _dict.get("filetype")
            localpath = "attach/%d_%d.%s"%(item,_index,_filetype)
            if os.path.exists(localpath):
                continue
            downloadFile(bucket,_path,localpath)
    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
    mt.run()
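
# downloadAtta() expects the Excel file to provide a "docid" column. MultiThreadHandler
# is a project-local helper: it drains task_queue with 30 worker threads and calls
# _handle(item, result_queue, ots_client=...) per docid. A single-threaded equivalent,
# sketched for reference only:
#
#   while not task_queue.empty():
#       _handle(task_queue.get(), None, ots_client)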

def analysisFile():
    # count "swf of docid" log lines per minute, keyed by each line's first
    # 16 characters (its minute-resolution timestamp prefix)
    filename = "ap1.log"
    dict_minute = {}
    _count = 0
    with open(filename,"r") as f:
        for _line in f:
            _count += 1
            if re.search("swf of docid",_line) is not None:
                _minute = _line[:16]
                if _minute not in dict_minute:
                    dict_minute[_minute] = 0
                dict_minute[_minute] += 1
    log("all line counts:%d"%_count)
    for k in sorted(dict_minute.keys()):
        print(k,dict_minute[k])
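
# A line this counter would match might look like the following (format assumed
# from the "swf of docid" marker and the 16-character minute prefix, not taken
# from a real log):
#
#   2022-09-18 09:15:02 INFO convert swf of docid 123456 succeed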

if __name__=="__main__":
    exportAttachment()
    # downloadAtta()
    # analysisFile()