3.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import time
  2. import logging
  3. import traceback
  4. from swf.export import SVGExporter
  5. from swf.movie import SWF
  6. import re
  7. import codecs
  8. import base64
  9. import os
  10. import oss2
  11. import io
  12. def swf2images(filepath,export_dir):
  13. if not os.path.exists(export_dir):
  14. os.mkdir(export_dir)
  15. if type(filepath)==io.BytesIO:
  16. swf_file = SWF(filepath)
  17. svg_exporter = SVGExporter()
  18. svg = swf_file.export(svg_exporter)
  19. else:
  20. with open(filepath, 'rb') as f:
  21. swf_file = SWF(f)
  22. svg_exporter = SVGExporter()
  23. svg = swf_file.export(svg_exporter)
  24. # with open('swf_export.jpg', 'wb') as f:
  25. # f.write(svg.read())
  26. swf_str = str(svg.getvalue(), encoding='utf-8')
  27. # print(swf_str)
  28. # 正则匹配图片的信息位置
  29. result0 = re.finditer('<image id=(.[^>]*)', swf_str)
  30. image_bytes_list = []
  31. i = 0
  32. image_path_list = []
  33. for r in result0:
  34. i += 1
  35. # 截取图片信息所在位置
  36. swf_str0 = swf_str[r.span()[0]:r.span()[1]+1]
  37. # 正则匹配得到图片的base64编码
  38. result1 = re.search('xlink:href="data:(.[^>]*)', swf_str0)
  39. swf_str1 = swf_str0[result1.span()[0]:result1.span()[1]]
  40. reg1_prefix = 'b\''
  41. result1 = re.search(reg1_prefix + '(.[^\']*)', swf_str1)
  42. swf_str1 = swf_str1[result1.span()[0]+len(reg1_prefix):result1.span()[1]]
  43. # base64_str -> base64_bytes -> no "\\" base64_bytes -> bytes -> image
  44. base64_bytes_with_double = bytes(swf_str1, "utf-8")
  45. base64_bytes = codecs.escape_decode(base64_bytes_with_double, "hex-escape")[0]
  46. image_bytes = base64.b64decode(base64_bytes)
  47. image_bytes_list.append(image_bytes)
  48. image_path = os.path.join(export_dir,"swf_page_%d.png"%(i))
  49. with open(image_path, 'wb') as f:
  50. f.write(image_bytes)
  51. def uploadFileByPath(bucket,filepath,uploadpath,headers=None):
  52. try:
  53. start_time = time.time()
  54. logging.info("uploading file of %s"%filepath)
  55. with open(filepath,"rb") as f:
  56. bucket.put_object(uploadpath,f,headers=headers)
  57. logging.info("upload file of %s takes %ds"%(filepath,time.time()-start_time))
  58. return True
  59. except Exception as e:
  60. traceback.print_exc()
  61. logging.info("upload object failed of %s"%(filepath))
  62. return False
  63. def transformSWF(bucket,attachment_hub_url,objectPath,localpath,swf_dir):
  64. swf_urls = []
  65. try:
  66. swf2images(localpath,swf_dir)
  67. list_files = os.listdir(swf_dir)
  68. list_files.sort(key=lambda x:x)
  69. headers = dict()
  70. headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
  71. for _file in list_files:
  72. swf_localpath = "%s/%s"%(swf_dir,_file)
  73. swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
  74. uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
  75. _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
  76. swf_urls.append(_url)
  77. os.remove(swf_localpath)
  78. except Exception as e:
  79. traceback.print_exc()
  80. return swf_urls
  81. if __name__ == '__main__':
  82. a = b'1234ab'
  83. import io
  84. b = io.BytesIO(a)
  85. print(type(b)==io.BytesIO)
  86. print(b.tell())
  87. import sys
  88. sys.argv.append(" ")