BaseModel.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. from BaseDataMaintenance.common.Utils import *
  2. from tablestore import *
  3. import traceback
  4. class BaseModel():
  5. def __init__(self):
  6. self.COLUMN_MAX_SIZE = 2*1024*1024
  7. self.all_columns = []
  8. def getProperties(self):
  9. return self.__dict__
  10. def setProperties(self,k,v):
  11. if k in self.__dict__:
  12. self.__dict__[k] = v
  13. def getPrimary_keys(self):
  14. raise NotImplementedError
  15. def setValue(self,k,v,isColumn=True):
  16. if "all_columns" not in self.__dict__:
  17. self.all_columns = []
  18. self.__dict__[k] = v
  19. if isColumn:
  20. if k not in (set(self.all_columns)):
  21. self.all_columns.append(k)
  22. else:
  23. if k in set(self.all_columns):
  24. self.all_columns.remove(k)
  25. def getAll_columns(self):
  26. return list(self.__dict__.keys())
  27. def getAttribute_keys(self):
  28. return list(set(self.all_columns)-set(self.getPrimary_keys()))
  29. def getAttribute_turple(self):
  30. _list = []
  31. for _key in self.getAttribute_keys():
  32. if _key=="all_columns":
  33. continue
  34. _v = self.getProperties().get(_key)
  35. if _v is not None and _v!="":
  36. if isinstance(_v,list):
  37. _v = json.dumps(_v)
  38. _list.append((_key,_v))
  39. return _list
  40. def getPrimaryKey_turple(self):
  41. _list = []
  42. for _key in self.getPrimary_keys():
  43. _list.append((_key,self.getProperties().get(_key)))
  44. return _list
  45. @staticmethod
  46. def search(ots_client,table_name,key_tuple,columns_to_get):
  47. try:
  48. # 调用get_row接口查询,最后一个参数值1表示只需要返回一个版本的值。
  49. consumed, return_row, next_token = ots_client.get_row(table_name, key_tuple, columns_to_get, None, 1)
  50. if return_row is not None:
  51. _dict = getRow_ots_primary(return_row)
  52. return _dict
  53. return None
  54. # 客户端异常,一般为参数错误或者网络异常。
  55. except OTSClientError as e:
  56. traceback.print_exc()
  57. log("%s get row failed, http_status:%s, error_message:%s" % (table_name,str(e.get_http_status()), str(e.get_error_message())))
  58. # 服务端异常,一般为参数错误或者流控错误。
  59. except OTSServiceError as e:
  60. traceback.print_exc()
  61. log("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
  62. def fix_columns(self,ots_client,columns_to_fix,_flag):
  63. _dict = self.search(ots_client,self.table_name,self.getPrimaryKey_turple(),columns_to_fix)
  64. if _dict is not None:
  65. for k,v in _dict.items():
  66. self.setValue(k,v,_flag)
  67. return True
  def delete_row(self, ots_client):
    """Delete this object's row from ``self.table_name``.

    Args:
      ots_client: client object exposing ``delete_row``.

    Returns:
      ``True`` when the delete call completed, ``False`` when any exception
      was raised (the failure is logged, not re-raised).
    """
    primary_key = self.getPrimaryKey_turple()
    row = Row(primary_key)
    try:
      _consumed, _returned = ots_client.delete_row(self.table_name, row, None)
    # Client-side error, usually bad parameters or a network problem.
    except OTSClientError as e:
      # The messages used to say "update row failed", which was misleading in
      # a delete path; %s keeps the handler safe whatever the status type is.
      log("delete row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
      return False
    # Server-side error, usually bad parameters or throttling.
    except OTSServiceError as e:
      log("delete row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
      return False
    except Exception as e:
      log(str(e))
      return False
    return True
  86. def exists_row(self,ots_client):
  87. primary_key = self.getPrimaryKey_turple()
  88. try:
  89. consumed, return_row, next_token = ots_client.get_row(self.table_name, primary_key, self.getPrimary_keys(), None, 1)
  90. if return_row is not None:
  91. return True
  92. except Exception as e:
  93. traceback.print_exc()
  94. log("get row faild of %s"%(str(primary_key)))
  95. return False
  def update_row(self, ots_client, retrytimes=3):
    """Write this object's attribute columns to its row, retrying on failure.

    Args:
      ots_client: client object exposing ``update_row``.
      retrytimes: maximum number of attempts.

    Returns:
      ``True`` as soon as one attempt succeeds, ``False`` when every attempt
      raised (each failure is printed and/or logged, never re-raised).
    """
    row = Row(self.getPrimaryKey_turple(), {'PUT': self.getAttribute_turple()})
    condition = Condition('IGNORE')
    for _attempt in range(retrytimes):
      try:
        _consumed, _returned = ots_client.update_row(self.table_name, row, condition)
      # Client-side error, usually bad parameters or a network problem.
      except OTSClientError as e:
        traceback.print_exc()
        log("update row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
      # Server-side error, usually bad parameters or throttling.
      except OTSServiceError as e:
        traceback.print_exc()
        log("update row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
      except Exception as e:
        # Any other failure is only printed; the loop moves to the next try.
        traceback.print_exc()
      else:
        return True
    return False