12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- from odps.udf import annotate
- from odps.udf import BaseUDTF,BaseUDAF
- import platform
def getPlatform():
    """Return the platform identification string of the running interpreter.

    Thin wrapper around ``platform.platform()``; takes no arguments.
    """
    return platform.platform()
@annotate('->string')
class f_getPlatform(object):
    """Zero-argument function, registered as ``'->string'``, exposing ``getPlatform``."""

    def evaluate(self):
        """Return the string produced by the module-level ``getPlatform`` helper."""
        platform_name = getPlatform()
        return platform_name
@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    """Table function that breaks a "-"-separated filemd5 value into pieces.

    One row is forwarded per input value:
    ``(original value, text before the first "-", integer part count)``.
    """

    def process(self, filemd5):
        """Forward ``(filemd5, prefix, parts)`` for a single input value.

        ``prefix`` is the text before the first "-". ``parts`` is the
        integer value of the second "-"-separated segment, or 0 when the
        value contains no "-" at all.
        """
        segments = filemd5.split("-")
        # split() always yields at least one segment, so "more than one"
        # is the exact complement of the "no dash" case.
        parts = int(segments[1]) if len(segments) > 1 else 0
        self.forward(filemd5, segments[0], parts)
@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    """Aggregate that gathers ``(filemd5, part)`` pairs into one JSON array.

    The result is a JSON-encoded list of the collected filemd5 values,
    ordered by their accompanying ``part`` number (ascending).
    """

    def __init__(self):
        # Publish ``json`` as a module-level name so ``terminate`` can use it.
        # The ``global`` declaration must precede the statement that binds
        # the name within this block, hence it comes before the import.
        global json
        import json

    def new_buffer(self):
        """Return a fresh aggregation buffer: a list holding one empty list."""
        return [[]]

    def iterate(self, buffer, filemd5, part):
        """Record one ``[filemd5, part]`` pair in the buffer."""
        buffer[0].append([filemd5, part])

    def merge(self, buffer, pbuffer):
        """Append every pair collected in ``pbuffer`` to ``buffer``."""
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        """Return the collected filemd5 values, ordered by part, as JSON text.

        The pairs stored in ``buffer[0]`` are sorted in place by ``part``;
        list.sort is stable, so pairs with equal ``part`` keep their
        collection order.
        """
        pairs = buffer[0]
        pairs.sort(key=lambda pair: pair[1])
        return json.dumps([pair[0] for pair in pairs])
@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    """Table function that spreads a JSON array over nine string columns.

    Entries of the decoded array fill the columns from the left; columns
    without a matching entry receive a random 19-character hex string.
    """

    def __init__(self):
        # Publish ``json`` and ``uuid4`` as module-level names for ``process``.
        # The ``global`` declaration must precede the statements that bind
        # the names within this block, hence it comes before the imports.
        global json, uuid4
        import json
        from uuid import uuid4

    def process(self, filemd5s):
        """Forward one nine-column row built from the JSON text ``filemd5s``.

        Raises IndexError when the decoded sequence has more than nine
        entries, since there is no tenth output column to place them in.
        """
        # Nine placeholders, one per output column declared in the
        # ``annotate`` signature; each is the first 19 hex digits of a
        # fresh uuid4.
        columns = [uuid4().hex[:19] for _ in range(9)]
        for position, filemd5 in enumerate(json.loads(filemd5s)):
            columns[position] = filemd5
        self.forward(*columns)
|