123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import time
- import logging
- import traceback
- from swf.export import SVGExporter
- from swf.movie import SWF
- import re
- import codecs
- import base64
- import os
- import oss2
- import io
def swf2images(filepath, export_dir):
    """Extract the images embedded in an SWF file and write them to disk.

    The SWF is exported to SVG with ``SVGExporter``. Every ``<image id=...>``
    element of that SVG whose ``xlink:href`` holds an inline ``data:`` URI is
    base64-decoded and written to ``export_dir`` as ``swf_page_<n>.png``,
    where ``n`` is the 1-based position of the ``<image>`` element in the SVG.
    Elements without an inline payload are skipped with a warning.

    Args:
        filepath: Path of the SWF file, or an already opened binary file-like
            object (for example ``io.BytesIO``) holding the SWF data.
        export_dir: Directory that receives the images. It is created,
            including missing parent directories, when it does not exist.

    Raises:
        OSError: If ``filepath`` cannot be opened or an image file cannot be
            written.
    """
    # exist_ok avoids the check-then-create race and makedirs also handles
    # nested target directories, which a bare os.mkdir() cannot create.
    os.makedirs(export_dir, exist_ok=True)

    # Any readable stream is handed to the parser directly; everything else
    # is treated as a filesystem path.
    if hasattr(filepath, "read"):
        svg = SWF(filepath).export(SVGExporter())
    else:
        with open(filepath, 'rb') as swf_stream:
            svg = SWF(swf_stream).export(SVGExporter())

    svg_text = str(svg.getvalue(), encoding='utf-8')

    image_tag_re = re.compile(r'<image id=(.[^>]*)')
    data_uri_re = re.compile(r'xlink:href="data:(.[^>]*)')
    # The payload is looked up as the text of a Python bytes literal
    # (b'...') inside the data URI -- presumably the exporter formats a bytes
    # object straight into the attribute; confirm against the swf package.
    payload_re = re.compile(r"b'(.[^']*)")

    for index, tag_match in enumerate(image_tag_re.finditer(svg_text), start=1):
        # Include the character right after the match (normally the closing
        # '>') so the tag text is complete.
        tag_text = svg_text[tag_match.start():tag_match.end() + 1]

        uri_match = data_uri_re.search(tag_text)
        payload_match = payload_re.search(uri_match.group(0)) if uri_match else None
        if payload_match is None:
            logging.warning("image %d has no inline base64 payload, skipped", index)
            continue

        # The literal still contains backslash escapes (e.g. "\\n"); resolve
        # them to real bytes before base64-decoding.
        escaped_payload = payload_match.group(1).encode("utf-8")
        base64_bytes = codecs.escape_decode(escaped_payload, "hex-escape")[0]
        image_bytes = base64.b64decode(base64_bytes)

        image_path = os.path.join(export_dir, "swf_page_%d.png" % index)
        with open(image_path, 'wb') as image_file:
            image_file.write(image_bytes)
def uploadFileByPath(bucket, filepath, uploadpath, headers=None):
    """Upload a local file to a storage bucket.

    Args:
        bucket: Object exposing ``put_object(key, data, headers=...)``
            (presumably an ``oss2.Bucket`` -- confirm against callers).
        filepath: Path of the local file to upload; opened in binary mode.
        uploadpath: Key under which the file is stored in the bucket.
        headers: Optional headers passed through to ``put_object``.

    Returns:
        bool: True when the upload finished, False when opening the file or
        the upload itself raised an exception.
    """
    start_time = time.time()
    logging.info("uploading file of %s", filepath)
    try:
        with open(filepath, "rb") as f:
            bucket.put_object(uploadpath, f, headers=headers)
    except Exception:
        # Best-effort contract: report failure through the return value, but
        # log at ERROR level with the traceback so it is not lost under the
        # default WARNING threshold.
        logging.exception("upload object failed of %s", filepath)
        return False
    logging.info("upload file of %s takes %ds", filepath, time.time() - start_time)
    return True
def _natural_sort_key(name):
    """Split *name* into text/integer chunks so "page_2" sorts before "page_10"."""
    chunks = re.split(r"(\d+)", name)
    # re.split with a capturing group alternates text (even positions) and
    # digit runs (odd positions), so chunks at the same position always have
    # the same type and compare safely.
    return [int(chunk) if pos % 2 else chunk for pos, chunk in enumerate(chunks)]


def transformSWF(bucket, attachment_hub_url, objectPath, localpath, swf_dir):
    """Convert an SWF into images, upload them and return their URLs.

    The images are extracted into ``swf_dir`` with ``swf2images``; every file
    found in ``swf_dir`` is then uploaded with a public-read ACL under the
    prefix derived from ``objectPath`` and removed locally afterwards.

    Args:
        bucket: Bucket object forwarded to ``uploadFileByPath``.
        attachment_hub_url: Base URL prepended to each uploaded object key.
        objectPath: Object key of the SWF; the part before its first "." is
            used as the key prefix of the images.
        localpath: Local SWF source forwarded to ``swf2images``.
        swf_dir: Working directory for the extracted images.

    Returns:
        list: URLs of the successfully uploaded images in page order. The
        list is empty or partial when an error occurred; errors are logged,
        not raised.
    """
    swf_urls = []
    try:
        swf2images(localpath, swf_dir)

        headers = {"x-oss-object-acl": oss2.OBJECT_ACL_PUBLIC_READ}
        # NOTE(review): cutting at the FIRST "." also truncates keys with a
        # dot in a directory or file stem ("a.b/c.swf" -> "a"); kept as is
        # because changing it would alter existing object keys.
        object_prefix = objectPath.split(".")[0]

        # Natural ordering keeps swf_page_10.png after swf_page_2.png, so the
        # URLs come back in page order.
        for _file in sorted(os.listdir(swf_dir), key=_natural_sort_key):
            swf_localpath = "%s/%s" % (swf_dir, _file)
            swf_objectPath = "%s/%s" % (object_prefix, _file)

            if uploadFileByPath(bucket, swf_localpath, swf_objectPath, headers):
                swf_urls.append("%s/%s" % (attachment_hub_url, swf_objectPath))
            else:
                # Do not hand out a URL for an object that was never stored.
                logging.warning("skip url of %s: upload failed", swf_localpath)

            # Always clean up so stale files are not re-uploaded by a later
            # call that reuses the same working directory.
            os.remove(swf_localpath)
    except Exception:
        logging.exception("transform swf failed of %s", localpath)
    return swf_urls
if __name__ == '__main__':
    # Manual scratch check: a fresh BytesIO is detected by its exact type and
    # starts with its position at offset 0.
    import io
    import sys

    a = b'1234ab'
    b = io.BytesIO(a)
    is_bytes_io = type(b) == io.BytesIO
    print(is_bytes_io)
    position = b.tell()
    print(position)

    # Appends a single-space placeholder argument to the process arguments.
    sys.argv.append(" ")
|