"""MaxCompute (ODPS) Python UDFs for splitting, grouping and spreading file MD5 strings."""
import platform

from odps.udf import annotate
from odps.udf import BaseUDTF, BaseUDAF


def getPlatform():
    """Return the platform description string reported by ``platform.platform()``."""
    return platform.platform()


@annotate('->string')
class f_getPlatform(object):
    """Scalar UDF that takes no arguments and returns the platform string."""

    def evaluate(self):
        """Return the value of :func:`getPlatform`."""
        return getPlatform()


@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    """UDTF that splits a ``"<md5>-<parts>"`` string into its components."""

    def process(self, filemd5):
        """Forward ``(filemd5, md5_without_suffix, parts)`` for one input row.

        The input is split on ``"-"``. The first piece is forwarded as the
        stripped MD5; the second piece, when present, is converted with
        ``int`` and forwarded as the part count. An input without ``"-"``
        yields a part count of ``0``.

        Raises:
            AttributeError: if ``filemd5`` is ``None`` (it has no ``split``).
            ValueError: if the piece after the first ``"-"`` is not an integer.
        """
        pieces = filemd5.split("-")
        parts = int(pieces[1]) if len(pieces) > 1 else 0
        self.forward(filemd5, pieces[0], parts)


@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    """UDAF that collects ``(filemd5, part)`` pairs and returns the MD5s as a
    JSON array ordered by ``part``."""

    def __init__(self):
        # Publish ``json`` as a module-level name for the other methods.
        # The ``global`` declaration must textually precede the import that
        # binds the name; binding first and declaring afterwards violates the
        # language reference even though CPython does not always reject it.
        global json
        import json

    def new_buffer(self):
        """Return a fresh aggregation buffer: a one-element list holding the
        list of collected ``[filemd5, part]`` pairs."""
        return [[]]

    def iterate(self, buffer, filemd5, part):
        """Append one ``[filemd5, part]`` pair to the buffer."""
        buffer[0].append([filemd5, part])

    def merge(self, buffer, pbuffer):
        """Fold the pairs of the partial buffer ``pbuffer`` into ``buffer``."""
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        """Return a JSON array of the collected MD5s sorted by their part value.

        The sort is stable, so pairs with equal ``part`` keep their
        collection order.
        """
        ordered = sorted(buffer[0], key=lambda pair: pair[1])
        return json.dumps([pair[0] for pair in ordered])


@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    """UDTF that spreads a JSON array of MD5 strings over nine output columns."""

    # Number of output columns declared in the ``annotate`` signature above.
    _COLUMNS = 9
    # Length of the random hex placeholder written to columns without a value.
    _PLACEHOLDER_LEN = 19

    def __init__(self):
        # Publish ``json`` and ``uuid4`` as module-level names for ``process``.
        # ``global`` is declared before the imports bind the names (see the
        # language reference on the ``global`` statement).
        global json, uuid4
        import json
        from uuid import uuid4

    def process(self, filemd5s):
        """Forward one row of nine strings built from the JSON array ``filemd5s``.

        Column ``i`` receives element ``i`` of the decoded array. Columns
        beyond the end of the array receive a random 19-character hex
        placeholder taken from ``uuid4().hex``.
        NOTE(review): presumably the random placeholders keep empty columns
        from matching each other downstream -- confirm with the consumer.

        Raises:
            IndexError: if the decoded array has more than nine elements.
        """
        columns = [uuid4().hex[:self._PLACEHOLDER_LEN]
                   for _ in range(self._COLUMNS)]
        for position, md5 in enumerate(json.loads(filemd5s)):
            columns[position] = md5
        self.forward(*columns)