import ast
import contextlib
import logging
import math

import numpy as np
import torch
import torch.nn as nn

from botr.yolov8.yolo_utils import make_anchors, dist2bbox, make_divisible


def autopad(k, p=None, d=1):  # kernel, padding, dilation
    """Pad to 'same' shape outputs."""
    if d > 1:
        k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k]  # actual kernel size
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p
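

# Editor's sketch (not part of the original module): a few concrete autopad values,
# assuming only the definition above. Odd kernels at stride 1 keep H and W unchanged.
def _demo_autopad():
    assert autopad(3) == 1  # 3x3 kernel -> pad 1 ('same' output)
    assert autopad(5) == 2  # 5x5 kernel -> pad 2
    assert autopad(3, d=2) == 2  # dilation 2 widens the effective kernel to 5x5
    assert autopad((3, 5)) == [1, 2]  # per-dimension padding for rectangular kernels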


class Conv(nn.Module):
    """Standard convolution with args (ch_in, ch_out, kernel, stride, padding, groups, dilation, activation)."""

    default_act = nn.SiLU()  # default activation

    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True):
        """Initialize Conv layer with given arguments including activation."""
        super().__init__()
        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()

    def forward(self, x):
        """Apply convolution, batch normalization and activation to input tensor."""
        return self.act(self.bn(self.conv(x)))

    def forward_fuse(self, x):
        """Apply convolution and activation only (used after BatchNorm has been folded into the conv)."""
        return self.act(self.conv(x))
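

# Editor's sketch (not part of the original module): minimal Conv usage. Conv is
# Conv2d + BatchNorm2d + SiLU; with k=3, s=2 and autopad, spatial size halves.
def _demo_conv():
    m = Conv(3, 16, k=3, s=2)
    x = torch.randn(1, 3, 640, 640)
    assert m(x).shape == (1, 16, 320, 320)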


class Conv2(Conv):
    """Simplified RepConv module with Conv fusing."""

    def __init__(self, c1, c2, k=3, s=1, p=None, g=1, d=1, act=True):
        """Initialize Conv2 with a parallel 1x1 branch alongside the main kxk convolution."""
        super().__init__(c1, c2, k, s, p, g=g, d=d, act=act)
        self.cv2 = nn.Conv2d(c1, c2, 1, s, autopad(1, p, d), groups=g, dilation=d, bias=False)  # add 1x1 conv

    def forward(self, x):
        """Apply both parallel convolutions, then batch normalization and activation."""
        return self.act(self.bn(self.conv(x) + self.cv2(x)))

    def forward_fuse(self, x):
        """Apply the fused convolution (1x1 branch folded in), batch normalization and activation."""
        return self.act(self.bn(self.conv(x)))

    def fuse_convs(self):
        """Fuse parallel convolutions by adding the 1x1 kernel into the center of the kxk kernel."""
        w = torch.zeros_like(self.conv.weight.data)
        i = [x // 2 for x in w.shape[2:]]
        w[:, :, i[0]:i[0] + 1, i[1]:i[1] + 1] = self.cv2.weight.data.clone()
        self.conv.weight.data += w
        self.__delattr__('cv2')
        self.forward = self.forward_fuse  # cv2 is gone; route subsequent calls through the fused path
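

# Editor's sketch (not part of the original module): check that Conv2.fuse_convs()
# preserves outputs. eval() freezes BatchNorm statistics so the comparison is
# deterministic; after fusing, calls are routed through forward_fuse.
def _demo_conv2_fuse():
    m = Conv2(8, 8, k=3).eval()
    x = torch.randn(1, 8, 32, 32)
    with torch.no_grad():
        y_parallel = m(x)
        m.fuse_convs()
        y_fused = m(x)
    assert torch.allclose(y_parallel, y_fused, atol=1e-5)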


class DWConv(Conv):
    """Depth-wise convolution."""

    def __init__(self, c1, c2, k=1, s=1, d=1, act=True):  # ch_in, ch_out, kernel, stride, dilation, activation
        super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), d=d, act=act)


class ConvTranspose(nn.Module):
    """Convolution transpose 2d layer."""

    default_act = nn.SiLU()  # default activation

    def __init__(self, c1, c2, k=2, s=2, p=0, bn=True, act=True):
        """Initialize ConvTranspose2d layer with batch normalization and activation function."""
        super().__init__()
        self.conv_transpose = nn.ConvTranspose2d(c1, c2, k, s, p, bias=not bn)
        self.bn = nn.BatchNorm2d(c2) if bn else nn.Identity()
        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()

    def forward(self, x):
        """Applies transposed convolution, batch normalization and activation to input."""
        return self.act(self.bn(self.conv_transpose(x)))

    def forward_fuse(self, x):
        """Applies activation and transposed convolution only (used after BatchNorm has been folded in)."""
        return self.act(self.conv_transpose(x))


class RepConv(nn.Module):
    """
    RepConv is a basic rep-style block with separate training and deploy structures.

    Based on https://github.com/DingXiaoH/RepVGG/blob/main/repvgg.py
    """

    default_act = nn.SiLU()  # default activation

    def __init__(self, c1, c2, k=3, s=1, p=1, g=1, d=1, act=True, bn=False, deploy=False):
        """Initialize RepConv with a 3x3 branch, a 1x1 branch and an optional identity BatchNorm branch."""
        super().__init__()
        assert k == 3 and p == 1
        self.g = g
        self.c1 = c1
        self.c2 = c2
        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
        self.bn = nn.BatchNorm2d(num_features=c1) if bn and c2 == c1 and s == 1 else None
        self.conv1 = Conv(c1, c2, k, s, p=p, g=g, act=False)
        self.conv2 = Conv(c1, c2, 1, s, p=(p - k // 2), g=g, act=False)

    def forward_fuse(self, x):
        """Forward pass through the single fused convolution (deploy mode)."""
        return self.act(self.conv(x))

    def forward(self, x):
        """Forward pass through the parallel 3x3, 1x1 and identity branches (training mode)."""
        id_out = 0 if self.bn is None else self.bn(x)
        return self.act(self.conv1(x) + self.conv2(x) + id_out)

    def get_equivalent_kernel_bias(self):
        """Return the single 3x3 kernel and bias equivalent to the sum of all branches."""
        kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1)
        kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2)
        kernelid, biasid = self._fuse_bn_tensor(self.bn)
        return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1) + kernelid, bias3x3 + bias1x1 + biasid

    def _avg_to_3x3_tensor(self, avgp):
        """Convert an AvgPool2d layer to an equivalent conv kernel (unused by the branches defined above)."""
        channels = self.c1
        groups = self.g
        kernel_size = avgp.kernel_size
        input_dim = channels // groups
        k = torch.zeros((channels, input_dim, kernel_size, kernel_size))
        k[np.arange(channels), np.tile(np.arange(input_dim), groups), :, :] = 1.0 / kernel_size ** 2
        return k

    def _pad_1x1_to_3x3_tensor(self, kernel1x1):
        """Zero-pad a 1x1 kernel to 3x3 so it can be added to the 3x3 branch."""
        if kernel1x1 is None:
            return 0
        return torch.nn.functional.pad(kernel1x1, [1, 1, 1, 1])

    def _fuse_bn_tensor(self, branch):
        """Fold a branch's BatchNorm into an equivalent (kernel, bias) pair."""
        if branch is None:
            return 0, 0
        if isinstance(branch, Conv):
            kernel = branch.conv.weight
            running_mean = branch.bn.running_mean
            running_var = branch.bn.running_var
            gamma = branch.bn.weight
            beta = branch.bn.bias
            eps = branch.bn.eps
        elif isinstance(branch, nn.BatchNorm2d):
            if not hasattr(self, 'id_tensor'):  # identity branch expressed as a 3x3 kernel with 1 at the center
                input_dim = self.c1 // self.g
                kernel_value = np.zeros((self.c1, input_dim, 3, 3), dtype=np.float32)
                for i in range(self.c1):
                    kernel_value[i, i % input_dim, 1, 1] = 1
                self.id_tensor = torch.from_numpy(kernel_value).to(branch.weight.device)
            kernel = self.id_tensor
            running_mean = branch.running_mean
            running_var = branch.running_var
            gamma = branch.weight
            beta = branch.bias
            eps = branch.eps
        std = (running_var + eps).sqrt()
        t = (gamma / std).reshape(-1, 1, 1, 1)
        return kernel * t, beta - running_mean * gamma / std

    def fuse_convs(self):
        """Fuse all branches into a single 3x3 conv; callers should switch to forward_fuse afterwards."""
        if hasattr(self, 'conv'):
            return
        kernel, bias = self.get_equivalent_kernel_bias()
        self.conv = nn.Conv2d(in_channels=self.conv1.conv.in_channels,
                              out_channels=self.conv1.conv.out_channels,
                              kernel_size=self.conv1.conv.kernel_size,
                              stride=self.conv1.conv.stride,
                              padding=self.conv1.conv.padding,
                              dilation=self.conv1.conv.dilation,
                              groups=self.conv1.conv.groups,
                              bias=True).requires_grad_(False)
        self.conv.weight.data = kernel
        self.conv.bias.data = bias
        for para in self.parameters():
            para.detach_()
        self.__delattr__('conv1')
        self.__delattr__('conv2')
        if hasattr(self, 'nm'):
            self.__delattr__('nm')
        if hasattr(self, 'bn'):
            self.__delattr__('bn')
        if hasattr(self, 'id_tensor'):
            self.__delattr__('id_tensor')
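

# Editor's sketch (not part of the original module): RepConv reparameterization.
# fuse_convs() folds the 3x3, 1x1 and identity-BN branches into one 3x3 conv;
# the caller is expected to rebind forward to forward_fuse, as modelled here.
def _demo_repconv_fuse():
    m = RepConv(8, 8, bn=True).eval()
    x = torch.randn(1, 8, 32, 32)
    with torch.no_grad():
        y_train = m(x)
        m.fuse_convs()
        m.forward = m.forward_fuse
        y_deploy = m(x)
    assert torch.allclose(y_train, y_deploy, atol=1e-5)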


class DFL(nn.Module):
    """
    Integral module of Distribution Focal Loss (DFL).

    Proposed in Generalized Focal Loss: https://ieeexplore.ieee.org/document/9792391
    """

    def __init__(self, c1=16):
        """Initialize a fixed 1x1 convolution whose weights are the bin indices 0..c1-1."""
        super().__init__()
        self.conv = nn.Conv2d(c1, 1, 1, bias=False).requires_grad_(False)
        x = torch.arange(c1, dtype=torch.float)
        self.conv.weight.data[:] = nn.Parameter(x.view(1, c1, 1, 1))
        self.c1 = c1

    def forward(self, x):
        """Decode each box side as the expected value (softmax-weighted bin index) of its c1-bin distribution."""
        b, c, a = x.shape  # batch, channels, anchors
        return self.conv(x.view(b, 4, self.c1, a).transpose(2, 1).softmax(1)).view(b, 4, a)
        # return self.conv(x.view(b, self.c1, 4, a).softmax(1)).view(b, 4, a)
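

# Editor's sketch (not part of the original module): DFL decodes each box side as
# the expectation of a softmax distribution over c1 bins, so logits concentrated
# on bin 5 decode to (approximately) 5.0.
def _demo_dfl():
    c1 = 16
    dfl = DFL(c1)
    logits = torch.full((1, 4 * c1, 1), -10.0)  # batch=1, 4 sides, 1 anchor
    logits[0, 5, 0] = 10.0  # first side: nearly all mass on bin 5
    out = dfl(logits)  # shape (1, 4, 1)
    assert abs(out[0, 0, 0].item() - 5.0) < 1e-2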


class Concat(nn.Module):
    """Concatenate a list of tensors along a given dimension."""

    def __init__(self, dimension=1):
        """Store the dimension along which inputs will be concatenated."""
        super().__init__()
        self.d = dimension

    def forward(self, x):
        """Concatenate the list of input tensors along self.d."""
        return torch.cat(x, self.d)


class SPPF(nn.Module):
    """Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher."""

    def __init__(self, c1, c2, k=5):  # equivalent to SPP(k=(5, 9, 13))
        super().__init__()
        c_ = c1 // 2  # hidden channels
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c_ * 4, c2, 1, 1)
        self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2)

    def forward(self, x):
        """Apply three chained max-pools and concatenate all scales before the output projection."""
        x = self.cv1(x)
        y1 = self.m(x)
        y2 = self.m(y1)
        return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1))
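

# Editor's sketch (not part of the original module): two chained 5x5 max-pools
# cover a 9x9 window and three cover 13x13, which is why SPPF(k=5) matches the
# older SPP(k=(5, 9, 13)) while re-using intermediate pooling results.
def _demo_sppf():
    m = SPPF(64, 64)
    x = torch.randn(1, 64, 20, 20)
    assert m(x).shape == (1, 64, 20, 20)  # channels and spatial size preserved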


class Bottleneck(nn.Module):
    """Standard bottleneck."""

    def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 3), e=0.5):  # ch_in, ch_out, shortcut, groups, kernels, expand
        super().__init__()
        c_ = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, c_, k[0], 1)
        self.cv2 = Conv(c_, c2, k[1], 1, g=g)
        self.add = shortcut and c1 == c2

    def forward(self, x):
        """Apply the two convolutions, with a residual shortcut when input and output shapes match."""
        return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x))


class C2f(nn.Module):
    """CSP Bottleneck with 2 convolutions."""

    def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5):  # ch_in, ch_out, number, shortcut, groups, expansion
        super().__init__()
        self.c = int(c2 * e)  # hidden channels
        self.cv1 = Conv(c1, 2 * self.c, 1, 1)
        self.cv2 = Conv((2 + n) * self.c, c2, 1)  # optional act=FReLU(c2)
        self.m = nn.ModuleList(Bottleneck(self.c, self.c, shortcut, g, k=((3, 3), (3, 3)), e=1.0) for _ in range(n))

    def forward(self, x):
        """Forward pass through C2f layer."""
        y = list(self.cv1(x).chunk(2, 1))
        y.extend(m(y[-1]) for m in self.m)
        return self.cv2(torch.cat(y, 1))

    def forward_split(self, x):
        """Forward pass using split() instead of chunk()."""
        y = list(self.cv1(x).split((self.c, self.c), 1))
        y.extend(m(y[-1]) for m in self.m)
        return self.cv2(torch.cat(y, 1))
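

# Editor's sketch (not part of the original module): C2f channel bookkeeping.
# cv1 expands to 2*c hidden channels, the chunked halves plus n bottleneck
# outputs give (2 + n) * c channels, and cv2 projects back to c2.
def _demo_c2f():
    m = C2f(64, 128, n=2, shortcut=True)  # c = 64, cat width = (2 + 2) * 64 = 256
    x = torch.randn(1, 64, 40, 40)
    assert m(x).shape == (1, 128, 40, 40)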


class Detect(nn.Module):
    """YOLOv8 Detect head for detection models."""

    dynamic = False  # force grid reconstruction
    export = False  # export mode
    shape = None
    anchors = torch.empty(0)  # init
    strides = torch.empty(0)  # init

    def __init__(self, nc=80, ch=()):  # detection layer
        super().__init__()
        self.nc = nc  # number of classes
        self.nl = len(ch)  # number of detection layers
        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
        self.no = nc + self.reg_max * 4  # number of outputs per anchor
        self.stride = torch.zeros(self.nl)  # strides computed during build
        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], self.nc)  # channels
        self.cv2 = nn.ModuleList(
            nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch)
        self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()

    def forward(self, x):
        """Concatenate and return predicted bounding boxes and class probabilities."""
        shape = x[0].shape  # BCHW
        for i in range(self.nl):
            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
        if self.training:
            return x
        elif self.dynamic or self.shape != shape:
            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
            self.shape = shape
        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
        # NOTE: self.format is expected to be set externally by an exporter when self.export is True
        if self.export and self.format in ('saved_model', 'pb', 'tflite', 'edgetpu', 'tfjs'):  # avoid TF FlexSplitV ops
            box = x_cat[:, :self.reg_max * 4]
            cls = x_cat[:, self.reg_max * 4:]
        else:
            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
        dbox = dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1) * self.strides
        y = torch.cat((dbox, cls.sigmoid()), 1)
        return y if self.export else (y, x)

    def bias_init(self):
        """Initialize Detect() biases. WARNING: requires strides to be available."""
        m = self  # self.model[-1]  # Detect() module
        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
        for a, b, s in zip(m.cv2, m.cv3, m.stride):
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[:m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
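

# Editor's sketch (not part of the original module): a three-scale Detect head in
# training mode returns one map per level with self.no = nc + 4 * reg_max channels.
# The strides used here are the usual P3/P4/P5 values, normally set by the builder.
def _demo_detect():
    head = Detect(nc=80, ch=(128, 256, 512))
    head.stride = torch.tensor([8., 16., 32.])  # assumed; computed during model build
    head.train()
    feats = [torch.randn(1, c, s, s) for c, s in zip((128, 256, 512), (80, 40, 20))]
    out = head(feats)
    assert out[0].shape == (1, 80 + 4 * 16, 80, 80)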


def fuse_conv_and_bn(conv, bn):
    """Fuse Conv2d() and BatchNorm2d() layers. https://tehnokv.com/posts/fusing-batchnorm-and-conv/"""
    fusedconv = nn.Conv2d(conv.in_channels,
                          conv.out_channels,
                          kernel_size=conv.kernel_size,
                          stride=conv.stride,
                          padding=conv.padding,
                          dilation=conv.dilation,
                          groups=conv.groups,
                          bias=True).requires_grad_(False).to(conv.weight.device)

    # Prepare filters
    w_conv = conv.weight.clone().view(conv.out_channels, -1)
    w_bn = torch.diag(bn.weight.div(torch.sqrt(bn.eps + bn.running_var)))
    fusedconv.weight.copy_(torch.mm(w_bn, w_conv).view(fusedconv.weight.shape))

    # Prepare spatial bias
    b_conv = torch.zeros(conv.weight.size(0), device=conv.weight.device) if conv.bias is None else conv.bias
    b_bn = bn.bias - bn.weight.mul(bn.running_mean).div(torch.sqrt(bn.running_var + bn.eps))
    fusedconv.bias.copy_(torch.mm(w_bn, b_conv.reshape(-1, 1)).reshape(-1) + b_bn)

    return fusedconv
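

# Editor's sketch (not part of the original module): folding BatchNorm into the
# preceding Conv2d should leave eval-mode outputs unchanged. Running statistics
# are perturbed first so the check is not trivially passing on an identity BN.
def _demo_fuse_conv_bn():
    conv, bn = nn.Conv2d(8, 16, 3, padding=1, bias=False), nn.BatchNorm2d(16)
    bn.running_mean.uniform_(-1.0, 1.0)
    bn.running_var.uniform_(0.5, 2.0)
    conv.eval()
    bn.eval()
    x = torch.randn(1, 8, 16, 16)
    with torch.no_grad():
        assert torch.allclose(fuse_conv_and_bn(conv, bn)(x), bn(conv(x)), atol=1e-5)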


def fuse_deconv_and_bn(deconv, bn):
    """Fuse ConvTranspose2d() and BatchNorm2d() layers."""
    fuseddconv = nn.ConvTranspose2d(deconv.in_channels,
                                    deconv.out_channels,
                                    kernel_size=deconv.kernel_size,
                                    stride=deconv.stride,
                                    padding=deconv.padding,
                                    output_padding=deconv.output_padding,
                                    dilation=deconv.dilation,
                                    groups=deconv.groups,
                                    bias=True).requires_grad_(False).to(deconv.weight.device)

    # Prepare filters
    w_deconv = deconv.weight.clone().view(deconv.out_channels, -1)
    w_bn = torch.diag(bn.weight.div(torch.sqrt(bn.eps + bn.running_var)))
    fuseddconv.weight.copy_(torch.mm(w_bn, w_deconv).view(fuseddconv.weight.shape))

    # Prepare spatial bias
    b_conv = torch.zeros(deconv.weight.size(1), device=deconv.weight.device) if deconv.bias is None else deconv.bias
    b_bn = bn.bias - bn.weight.mul(bn.running_mean).div(torch.sqrt(bn.running_var + bn.eps))
    fuseddconv.bias.copy_(torch.mm(w_bn, b_conv.reshape(-1, 1)).reshape(-1) + b_bn)

    return fuseddconv


def parse_model(d, ch):
    """Parse a YOLO model.yaml dictionary into a PyTorch model."""
    # Args
    max_channels = float('inf')
    nc, act, scales = (d.get(x) for x in ('nc', 'act', 'scales'))
    depth, width, kpt_shape = (d.get(x, 1.0) for x in ('depth_multiple', 'width_multiple', 'kpt_shape'))
    if scales:
        scale = d.get('scale')
        if not scale:
            scale = tuple(scales.keys())[0]
            logging.warning(f"WARNING ⚠️ no model scale passed. Assuming scale='{scale}'.")
        depth, width, max_channels = scales[scale]
    if act:
        Conv.default_act = eval(act)  # redefine default activation, i.e. Conv.default_act = nn.SiLU()

    ch = [ch]
    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']):  # from, number, module, args
        m = getattr(torch.nn, m[3:]) if 'nn.' in m else globals()[m]  # get module
        for j, a in enumerate(args):
            if isinstance(a, str):
                with contextlib.suppress(ValueError):
                    args[j] = locals()[a] if a in locals() else ast.literal_eval(a)
        n = max(round(n * depth), 1) if n > 1 else n  # depth gain
        if m in (Conv, ConvTranspose, Bottleneck, SPPF, DWConv, C2f, nn.ConvTranspose2d):
            c1, c2 = ch[f], args[0]
            if c2 != nc:  # if c2 not equal to number of classes (i.e. for Classify() output)
                c2 = make_divisible(min(c2, max_channels) * width, 8)
            args = [c1, c2, *args[1:]]
            if m in (C2f,):
                args.insert(2, n)  # number of repeats
                n = 1
        elif m is nn.BatchNorm2d:
            args = [ch[f]]
        elif m is Concat:
            c2 = sum(ch[x] for x in f)
        elif m in (Detect,):
            args.append([ch[x] for x in f])
        else:
            c2 = ch[f]
        m_ = nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args)  # module
        t = str(m)[8:-2].replace('__main__.', '')  # module type
        m_.np = sum(x.numel() for x in m_.parameters())  # number of parameters (attached to the instance, not the class)
        m_.i, m_.f, m_.type = i, f, t  # attach index, 'from' index, type
        save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1)  # append to savelist
        layers.append(m_)
        if i == 0:
            ch = []
        ch.append(c2)
    return nn.Sequential(*layers), sorted(save)
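

# Editor's sketch (not part of the original module): a minimal, hypothetical model
# dict in the layout parse_model expects -- a tiny Conv/C2f backbone plus a
# single-scale Detect head. Real YOLOv8 configs are much larger; this only
# exercises the parser's channel propagation and savelist logic.
def _demo_parse_model():
    d = {
        'nc': 80,
        'depth_multiple': 1.0,
        'width_multiple': 1.0,
        'backbone': [
            [-1, 1, 'Conv', [32, 3, 2]],  # 0: P1/2
            [-1, 1, 'Conv', [64, 3, 2]],  # 1: P2/4
            [-1, 1, 'C2f', [64, True]],   # 2
        ],
        'head': [
            [[2], 1, 'Detect', ['nc']],   # detect over the last feature map
        ],
    }
    model, save = parse_model(d, ch=3)
    assert save == [2]  # Detect reads layer 2, so it lands on the savelist
    return model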