attachmentRec.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. from odps.udf import annotate
  2. from odps.udf import BaseUDTF,BaseUDAF
  3. import platform
  4. def getPlatform():
  5. return platform.platform()
  6. @annotate('->string')
  7. class f_getPlatform(object):
  8. def evaluate(self):
  9. return getPlatform()
  10. @annotate('string->string,string,bigint')
  11. class f_strip_filemd5(BaseUDTF):
  12. def process(self,filemd5):
  13. split_filemd5 = filemd5.split("-")
  14. filemd5_strip = split_filemd5[0]
  15. if len(split_filemd5)==1:
  16. parts = 0
  17. else:
  18. parts = int(split_filemd5[1])
  19. self.forward(filemd5,filemd5_strip,parts)
  20. @annotate('string,bigint->string')
  21. class f_group_filemd5(BaseUDAF):
  22. def __init__(self):
  23. import json
  24. global json
  25. def new_buffer(self):
  26. return [[]]
  27. def iterate(self, buffer,filemd5,part):
  28. buffer[0].append([filemd5,part])
  29. def merge(self, buffer, pbuffer):
  30. buffer[0].extend(pbuffer[0])
  31. def terminate(self, buffer):
  32. list_group = buffer[0]
  33. list_group.sort(key=lambda x:x[1])
  34. list_filemd5 = []
  35. for item in list_group:
  36. list_filemd5.append(item[0])
  37. return json.dumps(list_filemd5)
  38. @annotate('string->string,string,string,string,string,string,string,string,string')
  39. class f_split_filemd5(BaseUDTF):
  40. def __init__(self):
  41. import json
  42. from uuid import uuid4
  43. global json,uuid4
  44. def process(self,filemd5s):
  45. list_result = [uuid4().hex[:19] for i in range(9)]
  46. list_filemd5 = json.loads(filemd5s)
  47. for i in range(len(list_filemd5)):
  48. list_result[i] = list_filemd5[i]
  49. self.forward(list_result[0],list_result[1],list_result[2],list_result[3],list_result[4],
  50. list_result[5],list_result[6],list_result[7],list_result[8])