# attachmentRec.py
#coding:utf8
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF, BaseUDAF
import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import os

def log(msg):
    logging.info(msg)

# Configure a dependency package (e.g. pandas) from a cached archive resource
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    log("add path:%s" % (dir_names[0]))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])
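
# A hedged usage sketch (resource and package names are illustrative): the
# archive must first be uploaded as a MaxCompute resource, e.g.
#   add archive pandas_env.zip -f;
# and listed among the function's resources; a UDF can then import from it:
#
#   @annotate('->string')
#   class f_pandas_version(object):
#       def __init__(self):
#           include_package_path("pandas_env.zip")  # makes the unpacked archive importable
#       def evaluate(self):
#           import pandas
#           return pandas.__version__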

# A "RuntimeError: xxx has been blocked by sandbox" may be raised here, because
# libraries containing C extensions are blocked by the sandbox; setting
# `set odps.isolation.session.enable = true` lifts the restriction.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

# Copy a cached .so resource into the working directory so it can be loaded
def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()

# Initialize the business data package. Because of resource upload size limits,
# Python version mismatches and inconsistent archive extraction, the package is
# unpacked and imported manually (multi-part zips are concatenated first).
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
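
# Hedged example (file and package names are illustrative): a package uploaded
# as several split zip resources is reassembled and made importable.
#
#   init_env(["mypkg.zip.001", "mypkg.zip.002"], "local_package")
#   import mypkg  # resolved from ./local_package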

import platform

def getPlatform():
    return platform.platform()

@annotate('->string')
class f_getPlatform(object):
    def evaluate(self):
        return getPlatform()

@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    # Split a "md5-parts" identifier into the original value, the bare md5 and
    # the part count (0 when no part suffix is present)
    def process(self, filemd5):
        split_filemd5 = filemd5.split("-")
        filemd5_strip = split_filemd5[0]
        if len(split_filemd5) == 1:
            parts = 0
        else:
            parts = int(split_filemd5[1])
        self.forward(filemd5, filemd5_strip, parts)
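
# Hedged example: the input "d41d8cd9-3" forwards one row
# ("d41d8cd9-3", "d41d8cd9", 3); an input without "-" forwards parts = 0.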

@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    def __init__(self):
        import json
        global json
    def new_buffer(self):
        return [[]]
    def iterate(self, buffer, filemd5, part):
        buffer[0].append([filemd5, part])
    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
    def terminate(self, buffer):
        # Emit the grouped filemd5 values ordered by part index as a JSON array
        list_group = buffer[0]
        list_group.sort(key=lambda x: x[1])
        list_filemd5 = []
        for item in list_group:
            list_filemd5.append(item[0])
        return json.dumps(list_filemd5)
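
# Hedged SQL sketch (table and column names are illustrative): combined with
# f_strip_filemd5, the UDAF rebuilds the ordered part list per stripped md5.
#
#   -- SELECT filemd5_strip, f_group_filemd5(filemd5, parts)
#   -- FROM parts_table GROUP BY filemd5_strip;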

@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    def __init__(self):
        import json
        from uuid import uuid4
        global json, uuid4
    def process(self, filemd5s):
        # Fan a JSON array of filemd5 values out into nine columns, padding the
        # unused columns with random placeholders
        list_filemd5 = json.loads(filemd5s)
        list_result = [uuid4().hex[:19] for i in range(max(9, len(list_filemd5)))]
        logging.info(str(list_filemd5))
        for i in range(len(list_filemd5)):
            list_result[i] = list_filemd5[i]
        self.forward(list_result[0], list_result[1], list_result[2], list_result[3], list_result[4],
                     list_result[5], list_result[6], list_result[7], list_result[8])
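
# Hedged example: '["a", "b"]' forwards ("a", "b", r3, ..., r9) where r3..r9
# are random 19-char placeholders; only the first nine entries are forwarded.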

def downloadFile(bucket, objectPath, localPath):
    try:
        start_time = time.time()
        # bucket.get_object_to_file(objectPath, localPath)
        oss2.resumable_download(bucket, objectPath, localPath,
                                store=oss2.ResumableDownloadStore(root="/home/admin"),
                                multiget_threshold=200*1024,
                                part_size=200*1024,
                                num_threads=5)
        log("download %s takes %d" % (objectPath, time.time()-start_time))
        return True
    except Exception as e:
        log("download object failed of %s: %s" % (objectPath, str(e)))
        return False

@annotate('->string')
class f_test_download(BaseUDTF):
    def __init__(self):
        include_package_path("oss_env.zip")
        import json
        from uuid import uuid4
        import logging
        import oss2
        global json, uuid4, oss2
        self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        self.attachment_bucket_name = "attachment-hub"
        self.auth = oss2.Auth("LTAI4FyUT7ZcQFZPjVtw5y9b", "2zscfFTvy3JWavtCeCOthLxF8bDNH3")
        self.bucket = oss2.Bucket(self.auth, self.bucket_url, self.attachment_bucket_name)
    def process(self):
        downloadFile(self.bucket, "049c/20210701/2021-07-01/03755/1625135745231.zip", "/home/admin/1.pdf")

@annotate('string->string')
class f_test_exit(BaseUDTF):
    def __init__(self):
        import json
        from uuid import uuid4
        import logging
    def process(self, s):
        # Sleep and log periodically to keep the worker heartbeat alive
        for i in range(3):
            time.sleep(10)
            log("jump heart")
        self.forward("1")

@annotate('bigint->string')
class f_getRandomStr(object):
    def __init__(self):
        import random
        global random
        self.result_s = ""
    def evaluate(self, count):
        # Generate a random lowercase string of the given length once, cache it,
        # then overwrite a few random positions on every subsequent call
        if self.result_s == "":
            list_c = [chr(ord('a')+i) for i in range(26)]
            result_s = ""
            for i in range(count):
                index = random.randint(0, len(list_c)-1)
                result_s += list_c[index]
            self.result_s = result_s
        for i in range(count//200):
            index = random.randint(0, len(self.result_s)-1)
            index_1 = random.randint(0, len(self.result_s)-1)
            self.result_s = self.result_s[:index] + self.result_s[index_1:index_1+1] + self.result_s[index+1:]
        return self.result_s
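
# Hedged example: f_getRandomStr(1000) returns a cached 1000-char lowercase
# string with count // 200 = 5 characters overwritten per call.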

@annotate('string->string')
class f_extract_pageAttachments(BaseUDTF):
    def __init__(self):
        include_package_path("envs_py37.env.zip")
        import json
        from uuid import uuid4
        from bs4 import BeautifulSoup
        import logging
        global json, BeautifulSoup
    def process(self, _html):
        if _html is not None:
            page_attachments = self.extract_pageAttachments(_html)
            if len(page_attachments) > 0:
                self.forward(json.dumps(page_attachments, ensure_ascii=False))
    def extract_pageAttachments(self, _html):
        # Collect <a>/<img> nodes whose text or link carries an attachment
        # suffix, skipping links back to www.bidizhaobiao.com itself
        fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
        _soup = BeautifulSoup(_html, "lxml")
        list_a = _soup.find_all("a")
        list_img = _soup.find_all("img")
        page_attachments = []
        for _a in list_a:
            _text = _a.get_text()
            _url = _a.attrs.get("href", "")
            if _url.find("http://www.bidizhaobiao.com") >= 0:
                continue
            is_attach = False
            for suf in fileSuffix:
                if _text.find(suf) >= 0 or _url.find(suf) >= 0:
                    is_attach = True
            if is_attach:
                page_attachments.append({"fileLink": _url, "fileTitle": _text})
        for _img in list_img:
            _text = _img.get_text()
            _url = _img.attrs.get("src", "")
            if _url.find("http://www.bidizhaobiao.com") >= 0:
                continue
            is_attach = False
            for suf in fileSuffix:
                if _text.find(suf) >= 0 or _url.find(suf) >= 0:
                    is_attach = True
            if is_attach:
                page_attachments.append({"fileLink": _url, "fileTitle": _text})
        return page_attachments
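
# Hedged example (URL is illustrative): the input
# '<a href="http://files.example.com/notice.zip">notice</a>' forwards
# '[{"fileLink": "http://files.example.com/notice.zip", "fileTitle": "notice"}]'.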