torch_rec_model.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. import torch
  2. import torch.nn as nn
  3. import numpy as np
  4. import math
  5. from torch.nn import functional as F
  6. class HSwish(nn.Module):
  7. def forward(self, x):
  8. out = x * F.relu6(x + 3, inplace=True) / 6
  9. return out
  10. class ConvBNACT(nn.Module):
  11. def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, act=None):
  12. super().__init__()
  13. self.conv = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size,
  14. stride=stride, padding=padding, groups=groups,
  15. bias=False)
  16. self.bn = nn.BatchNorm2d(out_channels)
  17. if act == 'relu':
  18. self.act = nn.ReLU()
  19. elif act == 'hard_swish':
  20. self.act = HSwish()
  21. elif act is None:
  22. self.act = None
  23. def forward(self, x):
  24. x = self.conv(x)
  25. x = self.bn(x)
  26. if self.act is not None:
  27. x = self.act(x)
  28. return x
  29. class ConvBNACTWithPool(nn.Module):
  30. def __init__(self, in_channels, out_channels, kernel_size, stride=1, groups=1, act=None):
  31. super().__init__()
  32. self.pool = nn.AvgPool2d(kernel_size=stride, stride=stride, padding=0, ceil_mode=True)
  33. self.conv = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=1,
  34. padding=(kernel_size - 1) // 2,
  35. groups=groups,
  36. bias=False)
  37. self.bn = nn.BatchNorm2d(out_channels)
  38. if act is None:
  39. self.act = None
  40. else:
  41. self.act = nn.ReLU()
  42. def forward(self, x):
  43. x = self.pool(x)
  44. x = self.conv(x)
  45. x = self.bn(x)
  46. if self.act is not None:
  47. x = self.act(x)
  48. return x
  49. class ShortCut(nn.Module):
  50. def __init__(self, in_channels, out_channels, stride, name, if_first=False):
  51. super().__init__()
  52. assert name is not None, 'shortcut must have name'
  53. self.name = name
  54. if in_channels != out_channels or stride[0] != 1:
  55. if if_first:
  56. self.conv = ConvBNACT(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride,
  57. padding=0, groups=1, act=None)
  58. else:
  59. self.conv = ConvBNACTWithPool(in_channels=in_channels, out_channels=out_channels, kernel_size=1,
  60. stride=stride, groups=1, act=None)
  61. elif if_first:
  62. self.conv = ConvBNACT(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride,
  63. padding=0, groups=1, act=None)
  64. else:
  65. self.conv = None
  66. def forward(self, x):
  67. if self.conv is not None:
  68. x = self.conv(x)
  69. return x
  70. class BasicBlock(nn.Module):
  71. def __init__(self, in_channels, out_channels, stride, if_first, name):
  72. super().__init__()
  73. assert name is not None, 'block must have name'
  74. self.name = name
  75. self.conv0 = ConvBNACT(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride,
  76. padding=1, groups=1, act='relu')
  77. self.conv1 = ConvBNACT(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1,
  78. groups=1, act=None)
  79. self.shortcut = ShortCut(in_channels=in_channels, out_channels=out_channels, stride=stride,
  80. name=f'{name}_branch1', if_first=if_first, )
  81. self.relu = nn.ReLU()
  82. self.output_channels = out_channels
  83. def forward(self, x):
  84. y = self.conv0(x)
  85. y = self.conv1(y)
  86. y = y + self.shortcut(x)
  87. return self.relu(y)
  88. class BottleneckBlock(nn.Module):
  89. def __init__(self, in_channels, out_channels, stride, if_first, name):
  90. super().__init__()
  91. assert name is not None, 'bottleneck must have name'
  92. self.name = name
  93. self.conv0 = ConvBNACT(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1, padding=0,
  94. groups=1, act='relu')
  95. self.conv1 = ConvBNACT(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=stride,
  96. padding=1, groups=1, act='relu')
  97. self.conv2 = ConvBNACT(in_channels=out_channels, out_channels=out_channels * 4, kernel_size=1, stride=1,
  98. padding=0, groups=1, act=None)
  99. self.shortcut = ShortCut(in_channels=in_channels, out_channels=out_channels * 4, stride=stride,
  100. if_first=if_first, name=f'{name}_branch1')
  101. self.relu = nn.ReLU()
  102. self.output_channels = out_channels * 4
  103. def forward(self, x):
  104. y = self.conv0(x)
  105. y = self.conv1(y)
  106. y = self.conv2(y)
  107. y = y + self.shortcut(x)
  108. return self.relu(y)
  109. class ResNet(nn.Module):
  110. def __init__(self, in_channels, layers, **kwargs):
  111. super().__init__()
  112. supported_layers = {
  113. 18: {'depth': [2, 2, 2, 2], 'block_class': BasicBlock},
  114. 34: {'depth': [3, 4, 6, 3], 'block_class': BasicBlock},
  115. 50: {'depth': [3, 4, 6, 3], 'block_class': BottleneckBlock},
  116. 101: {'depth': [3, 4, 23, 3], 'block_class': BottleneckBlock},
  117. 152: {'depth': [3, 8, 36, 3], 'block_class': BottleneckBlock},
  118. 200: {'depth': [3, 12, 48, 3], 'block_class': BottleneckBlock}
  119. }
  120. assert layers in supported_layers, "supported layers are {} but input layer is {}".format(supported_layers,
  121. layers)
  122. depth = supported_layers[layers]['depth']
  123. block_class = supported_layers[layers]['block_class']
  124. num_filters = [64, 128, 256, 512]
  125. self.conv1 = nn.Sequential(
  126. ConvBNACT(in_channels=in_channels, out_channels=32, kernel_size=3, stride=1, padding=1, act='relu'),
  127. ConvBNACT(in_channels=32, out_channels=32, kernel_size=3, stride=1, act='relu', padding=1),
  128. ConvBNACT(in_channels=32, out_channels=64, kernel_size=3, stride=1, act='relu', padding=1)
  129. )
  130. self.pool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
  131. self.stages = nn.ModuleList()
  132. in_ch = 64
  133. for block_index in range(len(depth)):
  134. block_list = []
  135. for i in range(depth[block_index]):
  136. if layers >= 50:
  137. if layers in [101, 152, 200] and block_index == 2:
  138. if i == 0:
  139. conv_name = "res" + str(block_index + 2) + "a"
  140. else:
  141. conv_name = "res" + str(block_index + 2) + "b" + str(i)
  142. else:
  143. conv_name = "res" + str(block_index + 2) + chr(97 + i)
  144. else:
  145. conv_name = f'res{str(block_index + 2)}{chr(97 + i)}'
  146. if i == 0 and block_index != 0:
  147. stride = (2, 1)
  148. else:
  149. stride = (1, 1)
  150. block_list.append(block_class(in_channels=in_ch, out_channels=num_filters[block_index],
  151. stride=stride,
  152. if_first=block_index == i == 0, name=conv_name))
  153. in_ch = block_list[-1].output_channels
  154. self.stages.append(nn.Sequential(*block_list))
  155. self.out_channels = in_ch
  156. self.out = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
  157. def forward(self, x):
  158. x = self.conv1(x)
  159. x = self.pool1(x)
  160. for stage in self.stages:
  161. x = stage(x)
  162. x = self.out(x)
  163. return x
  164. class Im2Seq(nn.Module):
  165. def __init__(self, in_channels, **kwargs):
  166. super().__init__()
  167. self.out_channels = in_channels
  168. def forward(self, x):
  169. B, C, H, W = x.shape
  170. assert H == 1
  171. x = x.reshape(B, C, H * W)
  172. x = x.permute((0, 2, 1))
  173. return x
  174. class CTCHead(nn.Module):
  175. def __init__(self,
  176. in_channels=192,
  177. out_channels=6624,
  178. fc_decay=0.0004,
  179. mid_channels=None,
  180. return_feats=False,
  181. **kwargs):
  182. super(CTCHead, self).__init__()
  183. if mid_channels is None:
  184. self.fc = nn.Linear(
  185. in_channels,
  186. out_channels)
  187. else:
  188. self.fc1 = nn.Linear(
  189. in_channels,
  190. mid_channels)
  191. self.fc2 = nn.Linear(
  192. mid_channels,
  193. out_channels)
  194. self.in_channels = in_channels
  195. self.out_channels = out_channels
  196. self.mid_channels = mid_channels
  197. self.return_feats = return_feats
  198. self.apply(self._init_weights)
  199. print('---------model weight inits-----------')
  200. def _init_weights(self, m):
  201. if isinstance(m, nn.Linear):
  202. stdv = 1.0 / math.sqrt(self.in_channels * 1.0)
  203. nn.init.uniform_(m.weight, -stdv, stdv)
  204. nn.init.uniform_(m.bias, -stdv, stdv)
  205. def forward(self, x):
  206. if self.mid_channels is None:
  207. predicts = self.fc(x)
  208. else:
  209. x = self.fc1(x)
  210. predicts = self.fc2(x)
  211. if self.return_feats:
  212. result = (predicts, x)
  213. else:
  214. result = (predicts, None)
  215. return result[0]
  216. class EncoderWithRNN(nn.Module):
  217. def __init__(self, in_channels,**kwargs):
  218. super(EncoderWithRNN, self).__init__()
  219. hidden_size = kwargs.get('hidden_size', 256)
  220. self.out_channels = hidden_size * 2
  221. self.lstm = nn.LSTM(in_channels, hidden_size, bidirectional=True, num_layers=2,batch_first=True)
  222. def forward(self, x):
  223. self.lstm.flatten_parameters()
  224. x, _ = self.lstm(x)
  225. return x
  226. class SequenceEncoder(nn.Module):
  227. def __init__(self, in_channels, encoder_type='rnn', **kwargs):
  228. super(SequenceEncoder, self).__init__()
  229. self.encoder_reshape = Im2Seq(in_channels)
  230. self.out_channels = self.encoder_reshape.out_channels
  231. if encoder_type == 'reshape':
  232. self.only_reshape = True
  233. else:
  234. support_encoder_dict = {
  235. 'reshape': Im2Seq,
  236. 'rnn': EncoderWithRNN
  237. }
  238. assert encoder_type in support_encoder_dict, '{} must in {}'.format(
  239. encoder_type, support_encoder_dict.keys())
  240. self.encoder = support_encoder_dict[encoder_type](
  241. self.encoder_reshape.out_channels,**kwargs)
  242. self.out_channels = self.encoder.out_channels
  243. self.only_reshape = False
  244. def forward(self, x):
  245. x = self.encoder_reshape(x)
  246. if not self.only_reshape:
  247. x = self.encoder(x)
  248. return x
  249. class Rec_ResNet_34(nn.Module):
  250. def __init__(self,class_nums=None):
  251. super(Rec_ResNet_34, self).__init__()
  252. self.backbone = ResNet(in_channels=3,layers=34)
  253. hidden_size = 256
  254. self.neck = SequenceEncoder(in_channels=512,encoder_type='rnn',hidden_size=hidden_size)
  255. if class_nums:
  256. self.class_nums = class_nums
  257. else:
  258. self.class_nums = 7551
  259. self.head = CTCHead(in_channels=hidden_size*2,out_channels=self.class_nums + 1,mid_channels=None)
  260. # self.head = CTCHead(in_channels=2304,out_channels=7546,mid_channels=200)
  261. def forward(self, x):
  262. x = self.backbone(x)
  263. x = self.neck(x)
  264. x = self.head(x)
  265. return x