123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- from BaseDataMaintenance.common.Utils import *
- from tablestore import *
- import traceback
class BaseModel:
    """Base class for row models stored through a Tablestore (OTS) client.

    Values live directly in the instance ``__dict__``; ``all_columns`` tracks
    which of those attribute names are treated as table columns.  Subclasses
    must implement :meth:`getPrimary_keys`.  The row-level helpers
    (``fix_columns``, ``delete_row``, ``exists_row``, ``update_row``) read
    ``self.table_name``, which this base class never assigns -- presumably
    subclasses set it; confirm against the concrete models.
    """

    def __init__(self):
        # 2 MiB.  Not referenced anywhere inside this class; presumably a
        # per-column size limit used by subclasses -- TODO confirm.
        self.COLUMN_MAX_SIZE = 2 * 1024 * 1024
        # Attribute names registered as table columns, in registration order.
        self.all_columns = []

    def getProperties(self):
        """Return the live instance ``__dict__`` (not a copy)."""
        return self.__dict__

    def setProperties(self, k, v):
        """Overwrite attribute ``k`` with ``v`` only if ``k`` already exists.

        Unknown names are silently ignored; ``all_columns`` is not touched.
        """
        if k in self.__dict__:
            self.__dict__[k] = v

    def getPrimary_keys(self):
        """Return the primary-key column names; subclasses must override.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def setValue(self, k, v, isColumn=True):
        """Set attribute ``k`` to ``v`` and update its column registration.

        Args:
            k: attribute / column name.
            v: value to store.
            isColumn: when true, ``k`` is registered in ``all_columns`` (once);
                when false, ``k`` is removed from ``all_columns`` if present.
        """
        # Guard for subclasses whose __init__ did not call BaseModel.__init__.
        if "all_columns" not in self.__dict__:
            self.all_columns = []
        self.__dict__[k] = v
        if isColumn:
            if k not in self.all_columns:
                self.all_columns.append(k)
        elif k in self.all_columns:
            self.all_columns.remove(k)

    def getAll_columns(self):
        """Return the names of every instance attribute.

        Note that this lists all ``__dict__`` keys (including bookkeeping ones
        such as ``all_columns`` and ``COLUMN_MAX_SIZE``), not just the names
        registered in ``all_columns``.
        """
        return list(self.__dict__.keys())

    def getAttribute_keys(self):
        """Return registered column names that are not primary keys.

        Names are de-duplicated and returned in registration order so the
        column order of generated rows is deterministic.
        """
        primary_keys = set(self.getPrimary_keys())
        return [
            key
            for key in dict.fromkeys(self.all_columns)
            if key not in primary_keys
        ]

    def getAttribute_turple(self):
        """Return ``(name, value)`` pairs for the non-primary-key columns.

        Columns whose value is ``None`` or ``""`` are omitted, and list values
        are serialized with ``json.dumps``.
        """
        properties = self.getProperties()
        pairs = []
        for key in self.getAttribute_keys():
            # The bookkeeping list itself must never be written as a column.
            if key == "all_columns":
                continue
            value = properties.get(key)
            if value is not None and value != "":
                if isinstance(value, list):
                    value = json.dumps(value)
                pairs.append((key, value))
        return pairs

    def getPrimaryKey_turple(self):
        """Return ``(name, value)`` pairs for the primary keys, in key order.

        A primary key that has not been set yields ``None`` as its value.
        """
        properties = self.getProperties()
        return [(key, properties.get(key)) for key in self.getPrimary_keys()]

    @staticmethod
    def search(ots_client, table_name, key_tuple, columns_to_get):
        """Fetch one row by primary key.

        Args:
            ots_client: client object exposing ``get_row``.
            table_name: name of the table to read.
            key_tuple: primary key as a list of ``(name, value)`` pairs.
            columns_to_get: column names to fetch.

        Returns:
            The result of ``getRow_ots_primary(row)`` when a row is returned,
            otherwise ``None`` (also after a logged OTS client/service error).
        """
        try:
            # Query via get_row; the final argument 1 requests only a single
            # version of each value.
            consumed, return_row, next_token = ots_client.get_row(
                table_name, key_tuple, columns_to_get, None, 1
            )
            if return_row is not None:
                return getRow_ots_primary(return_row)
            return None
        # Client-side error, typically a parameter error or network failure.
        except OTSClientError as e:
            traceback.print_exc()
            log("%s get row failed, http_status:%s, error_message:%s" % (table_name, str(e.get_http_status()), str(e.get_error_message())))
        # Server-side error, typically a parameter error or throttling.
        except OTSServiceError as e:
            traceback.print_exc()
            # The status is passed through str(), so it must be formatted with
            # %s; %d would raise TypeError and hide the original error.
            log("get row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
        return None

    def fix_columns(self, ots_client, columns_to_fix, _flag):
        """Load ``columns_to_fix`` from the stored row into this instance.

        Each fetched value is applied with ``setValue(k, v, _flag)``.

        Returns:
            ``True`` when the row was found, otherwise ``None``.
        """
        found = self.search(
            ots_client, self.table_name, self.getPrimaryKey_turple(), columns_to_fix
        )
        if found is None:
            return None
        for k, v in found.items():
            self.setValue(k, v, _flag)
        return True

    def delete_row(self, ots_client):
        """Delete the row identified by this instance's primary key.

        Returns:
            ``True`` on success, ``False`` if any exception was raised (the
            error is logged, not propagated).
        """
        row = Row(self.getPrimaryKey_turple())
        try:
            ots_client.delete_row(self.table_name, row, None)
        # Client-side error, typically a parameter error or network failure.
        except OTSClientError as e:
            log("delete row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
            return False
        # Server-side error, typically a parameter error or throttling.
        except OTSServiceError as e:
            log("delete row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
            return False
        except Exception as e:
            log(str(e))
            return False
        return True

    def exists_row(self, ots_client):
        """Return ``True`` if a row with this primary key is returned.

        Only the primary-key columns are requested.  Any exception is logged
        and reported as ``False``.
        """
        primary_key = self.getPrimaryKey_turple()
        try:
            consumed, return_row, next_token = ots_client.get_row(
                self.table_name, primary_key, self.getPrimary_keys(), None, 1
            )
            if return_row is not None:
                return True
        except Exception:
            traceback.print_exc()
            log("get row faild of %s" % (str(primary_key)))
        return False

    def update_row(self, ots_client, retrytimes=3):
        """Write the non-empty attribute columns with a ``PUT`` update.

        The update uses ``Condition('IGNORE')`` and is attempted up to
        ``retrytimes`` times; errors are logged and the next attempt follows
        immediately.

        Returns:
            ``True`` as soon as one attempt succeeds, ``False`` if every
            attempt failed (or ``retrytimes`` is not positive).
        """
        update_of_attribute_columns = {"PUT": self.getAttribute_turple()}
        row = Row(self.getPrimaryKey_turple(), update_of_attribute_columns)
        condition = Condition("IGNORE")
        for _ in range(retrytimes):
            try:
                ots_client.update_row(self.table_name, row, condition)
                return True
            # Client-side error, typically a parameter error or network failure.
            except OTSClientError as e:
                traceback.print_exc()
                log("update row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
            # Server-side error, typically a parameter error or throttling.
            except OTSServiceError as e:
                traceback.print_exc()
                log("update row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
            except Exception:
                # Best effort: record the traceback and retry.
                traceback.print_exc()
        return False
|