import logging
import math
import re
import time
from pathlib import Path
from types import SimpleNamespace

import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision
import yaml
from PIL import Image


def yaml_load(file='data.yaml', append_filename=False):
    """
    Load YAML data from a file.

    Args:
        file (str, optional): File name. Default is 'data.yaml'.
        append_filename (bool): Add the YAML filename to the YAML dictionary. Default is False.

    Returns:
        dict: YAML data and file name.
    """
    with open(file, errors='ignore', encoding='utf-8') as f:
        s = f.read()  # string

        # Remove special characters
        if not s.isprintable():
            s = re.sub(r'[^\x09\x0A\x0D\x20-\x7E\x85\xA0-\uD7FF\uE000-\uFFFD\U00010000-\U0010ffff]+', '', s)

        # Add YAML filename to dict and return
        return {**yaml.safe_load(s), 'yaml_file': str(file)} if append_filename else yaml.safe_load(s)
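
# Example (illustrative sketch) for yaml_load; assumes a local 'data.yaml' exists with the usual
# dataset keys (e.g. 'nc', 'names'):
#   >>> cfg = yaml_load('data.yaml', append_filename=True)
#   >>> cfg['yaml_file']
#   'data.yaml'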


def smart_inference_mode():
    """Applies torch.inference_mode() decorator if torch>=1.9.0 else torch.no_grad() decorator."""

    def decorate(fn):
        """Applies the appropriate torch decorator for inference mode based on the torch version."""
        major, minor = (int(v) for v in re.findall(r'\d+', torch.__version__)[:2])
        torch_1_9 = (major, minor) >= (1, 9)  # tuple comparison also handles torch 2.x
        return (torch.inference_mode if torch_1_9 else torch.no_grad)()(fn)

    return decorate
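
# Example (illustrative sketch) for smart_inference_mode: wrapping an inference helper so no
# gradients are tracked. 'run_model' is a hypothetical function, not part of this module.
#   >>> @smart_inference_mode()
#   ... def run_model(model, x):
#   ...     return model(x)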


def make_anchors(feats, strides, grid_cell_offset=0.5):
    """Generate anchors from features."""
    anchor_points, stride_tensor = [], []
    assert feats is not None
    dtype, device = feats[0].dtype, feats[0].device
    major, minor = (int(v) for v in re.findall(r'\d+', torch.__version__)[:2])
    torch_1_10 = (major, minor) >= (1, 10)  # torch>=1.10 requires explicit meshgrid indexing
    for i, stride in enumerate(strides):
        _, _, h, w = feats[i].shape
        sx = torch.arange(end=w, device=device, dtype=dtype) + grid_cell_offset  # shift x
        sy = torch.arange(end=h, device=device, dtype=dtype) + grid_cell_offset  # shift y
        sy, sx = torch.meshgrid(sy, sx, indexing='ij') if torch_1_10 else torch.meshgrid(sy, sx)
        anchor_points.append(torch.stack((sx, sy), -1).view(-1, 2))
        stride_tensor.append(torch.full((h * w, 1), stride, dtype=dtype, device=device))
    return torch.cat(anchor_points), torch.cat(stride_tensor)
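
# Example (illustrative sketch) for make_anchors: a single 1x1x2x2 feature map at stride 8 yields
# four grid-cell centres offset by 0.5 and a matching column of strides.
#   >>> pts, strides = make_anchors([torch.zeros(1, 1, 2, 2)], [8])
#   >>> pts
#   tensor([[0.5000, 0.5000],
#           [1.5000, 0.5000],
#           [0.5000, 1.5000],
#           [1.5000, 1.5000]])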


def dist2bbox(distance, anchor_points, xywh=True, dim=-1):
    """Transform distance (ltrb) to box (xywh or xyxy)."""
    lt, rb = distance.chunk(2, dim)
    x1y1 = anchor_points - lt
    x2y2 = anchor_points + rb
    if xywh:
        c_xy = (x1y1 + x2y2) / 2
        wh = x2y2 - x1y1
        return torch.cat((c_xy, wh), dim)  # xywh bbox
    return torch.cat((x1y1, x2y2), dim)  # xyxy bbox
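
# Example (illustrative sketch) for dist2bbox: an anchor at (10, 10) with ltrb distances
# (2, 2, 4, 4) gives a box centred at (11, 11) with width and height 6.
#   >>> dist2bbox(torch.tensor([[2., 2., 4., 4.]]), torch.tensor([[10., 10.]]))
#   tensor([[11., 11.,  6.,  6.]])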


def attempt_load_one_weight(weight, device=None, inplace=True):
    """Load a single model's weights."""
    from botr.yolov8.module import Detect
    from botr.yolov8.model import DetectionModel

    model = DetectionModel()
    ckpt = model.load_state_dict(torch.load(weight))  # note: load_state_dict returns missing/unexpected keys
    model.to(device).float()
    model = model.fuse().eval()  # model in eval mode

    # Module compatibility updates
    for m in model.modules():
        t = type(m)
        if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect):
            m.inplace = inplace  # torch 1.7.0 compatibility
        elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'):
            m.recompute_scale_factor = None  # torch 1.11.0 compatibility

    # Return model and ckpt
    return model, ckpt


def xywh2xyxy(x):
    """
    Convert bounding box coordinates from (x, y, width, height) format to (x1, y1, x2, y2) format, where (x1, y1) is
    the top-left corner and (x2, y2) is the bottom-right corner.

    Args:
        x (np.ndarray | torch.Tensor): The input bounding box coordinates in (x, y, width, height) format.

    Returns:
        y (np.ndarray | torch.Tensor): The bounding box coordinates in (x1, y1, x2, y2) format.
    """
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[..., 0] = x[..., 0] - x[..., 2] / 2  # top left x
    y[..., 1] = x[..., 1] - x[..., 3] / 2  # top left y
    y[..., 2] = x[..., 0] + x[..., 2] / 2  # bottom right x
    y[..., 3] = x[..., 1] + x[..., 3] / 2  # bottom right y
    return y
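
# Example (illustrative sketch) for xywh2xyxy: a 10x10 box centred at (50, 50).
#   >>> xywh2xyxy(np.array([[50., 50., 10., 10.]]))
#   array([[45., 45., 55., 55.]])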


def box_iou(box1, box2, eps=1e-7):
    """
    Calculate intersection-over-union (IoU) of boxes. Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Based on https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py

    Args:
        box1 (torch.Tensor): A tensor of shape (N, 4) representing N bounding boxes.
        box2 (torch.Tensor): A tensor of shape (M, 4) representing M bounding boxes.
        eps (float, optional): A small value to avoid division by zero. Defaults to 1e-7.

    Returns:
        (torch.Tensor): An NxM tensor containing the pairwise IoU values for every element in box1 and box2.
    """
    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp_(0).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps)
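
# Example (illustrative sketch) for box_iou: two 10x10 boxes offset by 5 px overlap on a
# 5x10 region, so IoU = 50 / (100 + 100 - 50) ≈ 0.333.
#   >>> box_iou(torch.tensor([[0., 0., 10., 10.]]), torch.tensor([[5., 0., 15., 10.]]))
#   tensor([[0.3333]])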


def clip_boxes(boxes, shape):
    """
    Clip bounding boxes to the boundaries of an image of the given shape (height, width), in place.

    Args:
        boxes (torch.Tensor | np.ndarray): the bounding boxes to clip
        shape (tuple): the shape of the image
    """
    if isinstance(boxes, torch.Tensor):  # faster individually
        boxes[..., 0].clamp_(0, shape[1])  # x1
        boxes[..., 1].clamp_(0, shape[0])  # y1
        boxes[..., 2].clamp_(0, shape[1])  # x2
        boxes[..., 3].clamp_(0, shape[0])  # y2
    else:  # np.ndarray (faster grouped)
        boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1])  # x1, x2
        boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0])  # y1, y2
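
# Example (illustrative sketch) for clip_boxes: clipping a box that spills outside a 100x100
# image; the array is modified in place.
#   >>> b = np.array([[-5., 10., 120., 90.]])
#   >>> clip_boxes(b, (100, 100))
#   >>> b
#   array([[  0.,  10., 100.,  90.]])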


def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    """
    Rescales bounding boxes (in the format of xyxy) from the shape of the image they were originally specified in
    (img1_shape) to the shape of a different image (img0_shape).

    Args:
        img1_shape (tuple): The shape of the image that the bounding boxes are for, in the format of (height, width).
        boxes (torch.Tensor): the bounding boxes of the objects in the image, in the format of (x1, y1, x2, y2)
        img0_shape (tuple): the shape of the target image, in the format of (height, width).
        ratio_pad (tuple): a tuple of (ratio, pad) for scaling the boxes. If not provided, the ratio and pad will be
            calculated based on the size difference between the two images.

    Returns:
        boxes (torch.Tensor): The scaled bounding boxes, in the format of (x1, y1, x2, y2)
    """
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = round((img1_shape[1] - img0_shape[1] * gain) / 2 - 0.1), round(
            (img1_shape[0] - img0_shape[0] * gain) / 2 - 0.1)  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    boxes[..., [0, 2]] -= pad[0]  # x padding
    boxes[..., [1, 3]] -= pad[1]  # y padding
    boxes[..., :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes
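
# Example (illustrative sketch) for scale_boxes: mapping a detection from a 640x640 letterboxed
# image back to a 480x640 (h, w) original. The 80 px of vertical padding is removed.
#   >>> b = torch.tensor([[100., 180., 200., 280.]])
#   >>> scale_boxes((640, 640), b, (480, 640))
#   tensor([[100., 100., 200., 200.]])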


def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,
        labels=(),
        max_det=300,
        nc=0,  # number of classes (optional)
        max_time_img=0.05,
        max_nms=30000,
        max_wh=7680,
):
    """
    Perform non-maximum suppression (NMS) on a set of boxes, with support for masks and multiple labels per box.

    Args:
        prediction (torch.Tensor): A tensor of shape (batch_size, num_classes + 4 + num_masks, num_boxes)
            containing the predicted boxes, classes, and masks. The tensor should be in the format
            output by a model, such as YOLO.
        conf_thres (float): The confidence threshold below which boxes will be filtered out.
            Valid values are between 0.0 and 1.0.
        iou_thres (float): The IoU threshold below which boxes will be filtered out during NMS.
            Valid values are between 0.0 and 1.0.
        classes (List[int]): A list of class indices to consider. If None, all classes will be considered.
        agnostic (bool): If True, the model is agnostic to the number of classes, and all
            classes will be considered as one.
        multi_label (bool): If True, each box may have multiple labels.
        labels (List[List[Union[int, float, torch.Tensor]]]): A list of lists, where each inner
            list contains the apriori labels for a given image. The list should be in the format
            output by a dataloader, with each label being a tuple of (class_index, x1, y1, x2, y2).
        max_det (int): The maximum number of boxes to keep after NMS.
        nc (int, optional): The number of classes output by the model. Any indices after this will be considered masks.
        max_time_img (float): The maximum time (seconds) for processing one image.
        max_nms (int): The maximum number of boxes passed into torchvision.ops.nms().
        max_wh (int): The maximum box width and height in pixels.

    Returns:
        (List[torch.Tensor]): A list of length batch_size, where each element is a tensor of
            shape (num_boxes, 6 + num_masks) containing the kept boxes, with columns
            (x1, y1, x2, y2, confidence, class, mask1, mask2, ...).
    """
    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
    if isinstance(prediction, (list, tuple)):  # YOLOv8 model in validation mode, output = (inference_out, loss_out)
        prediction = prediction[0]  # select only inference output

    device = prediction.device
    mps = 'mps' in device.type  # Apple MPS
    if mps:  # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = nc or (prediction.shape[1] - 4)  # number of classes
    nm = prediction.shape[1] - nc - 4  # number of masks
    mi = 4 + nc  # mask start index
    xc = prediction[:, 4:mi].amax(1) > conf_thres  # candidates

    # Settings
    # min_wh = 2  # (pixels) minimum box width and height
    time_limit = 0.5 + max_time_img * bs  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[:, 2:4] < min_wh) | (x[:, 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x.transpose(0, -1)[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 4), device=x.device)  # same column count as x
            v[:, :4] = lb[:, 1:5]  # box
            v[range(len(lb)), lb[:, 0].long() + 4] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Detections matrix nx6 (xyxy, conf, cls)
        box, cls, mask = x.split((4, nc, nm), 1)
        box = xywh2xyxy(box)  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        if multi_label:
            i, j = (cls > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 4 + j, None], j[:, None].float(), mask[i]), 1)
        else:  # best class only
            conf, j = cls.max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence and remove excess boxes

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # Update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # IoU matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)
        if (time.time() - t) > time_limit:
            logging.warning(f'WARNING ⚠️ NMS time limit {time_limit:.3f}s exceeded')
            break  # time limit exceeded

    return output
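
# Example (illustrative sketch) for non_max_suppression: a raw (1, 4 + nc, num_boxes) prediction
# with two overlapping same-class boxes collapses to a single (x1, y1, x2, y2, conf, cls) row.
#   >>> pred = torch.zeros(1, 5, 2)                            # batch=1, nc=1, 2 candidate boxes
#   >>> pred[0, :4, 0] = torch.tensor([50., 50., 20., 20.])    # box 1 (xywh)
#   >>> pred[0, :4, 1] = torch.tensor([51., 51., 20., 20.])    # box 2 (xywh)
#   >>> pred[0, 4, :] = torch.tensor([0.9, 0.8])               # class scores
#   >>> out = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45)
#   >>> out[0].shape
#   torch.Size([1, 6])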


def make_divisible(x, divisor):
    """Returns x rounded up to the nearest multiple of divisor."""
    if isinstance(divisor, torch.Tensor):
        divisor = int(divisor.max())  # to int
    return math.ceil(x / divisor) * divisor
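
# Example (illustrative sketch) for make_divisible: rounding a channel count up to a multiple of 8.
#   >>> make_divisible(100, 8)
#   104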


def initialize_weights(model):
    """Initialize model module defaults (BatchNorm eps/momentum, in-place activations)."""
    for m in model.modules():
        t = type(m)
        if t is nn.Conv2d:
            pass  # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
        elif t is nn.BatchNorm2d:
            m.eps = 1e-3
            m.momentum = 0.03
        elif t in [nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU]:
            m.inplace = True


def get_num_params(model):
    """Return the total number of parameters in a YOLO model."""
    return sum(x.numel() for x in model.parameters())


def get_num_gradients(model):
    """Return the total number of parameters with gradients in a YOLO model."""
    return sum(x.numel() for x in model.parameters() if x.requires_grad)


class LetterBox:
    """Resize image and padding for detection, instance segmentation, pose."""

    def __init__(self, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, stride=32):
        """Initialize LetterBox object with specific parameters."""
        self.new_shape = new_shape
        self.auto = auto
        self.scaleFill = scaleFill
        self.scaleup = scaleup
        self.stride = stride

    def __call__(self, labels=None, image=None):
        """Return updated labels and image with added border."""
        if labels is None:
            labels = {}
        img = labels.get('img') if image is None else image
        shape = img.shape[:2]  # current shape [height, width]
        new_shape = labels.pop('rect_shape', self.new_shape)
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)

        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not self.scaleup:  # only scale down, do not scale up (for better val mAP)
            r = min(r, 1.0)

        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
        if self.auto:  # minimum rectangle
            dw, dh = np.mod(dw, self.stride), np.mod(dh, self.stride)  # wh padding
        elif self.scaleFill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

        dw /= 2  # divide padding into 2 sides
        dh /= 2
        if labels.get('ratio_pad'):
            labels['ratio_pad'] = (labels['ratio_pad'], (dw, dh))  # for evaluation

        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT,
                                 value=(114, 114, 114))  # add border

        if len(labels):
            labels = self._update_labels(labels, ratio, dw, dh)
            labels['img'] = img
            labels['resized_shape'] = new_shape
            return labels
        else:
            return img

    def _update_labels(self, labels, ratio, padw, padh):
        """Update labels."""
        labels['instances'].convert_bbox(format='xyxy')
        labels['instances'].denormalize(*labels['img'].shape[:2][::-1])
        labels['instances'].scale(*ratio)
        labels['instances'].add_padding(padw, padh)
        return labels
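
# Example (illustrative sketch) for LetterBox: resizing a 480x640 BGR image to 640x640 adds 80 px
# of grey padding above and below while keeping the aspect ratio.
#   >>> im = np.zeros((480, 640, 3), dtype=np.uint8)
#   >>> out = LetterBox(new_shape=(640, 640))(image=im)
#   >>> out.shape
#   (640, 640, 3)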


class LoadPilAndNumpy:

    def __init__(self, im0, imgsz=640):
        """Initialize PIL and Numpy Dataloader."""
        if not isinstance(im0, list):
            im0 = [im0]
        # Generate fake paths for in-memory images
        self.paths = [getattr(im, 'filename', f'image{i}.jpg') for i, im in enumerate(im0)]
        self.im0 = [self._single_check(im) for im in im0]
        self.imgsz = imgsz
        self.mode = 'image'
        self.bs = len(self.im0)
        self.source_type = ''

    @staticmethod
    def _single_check(im):
        """Validate and format an image to a contiguous BGR numpy array."""
        assert isinstance(im, (Image.Image, np.ndarray)), f'Expected PIL/np.ndarray image type, but got {type(im)}'
        if isinstance(im, Image.Image):
            if im.mode != 'RGB':
                im = im.convert('RGB')
            im = np.asarray(im)[:, :, ::-1]  # RGB to BGR
            im = np.ascontiguousarray(im)  # contiguous
        return im

    def __len__(self):
        """Returns the length of the 'im0' attribute."""
        return len(self.im0)

    def __next__(self):
        """Returns paths, images, None and an empty string for a single batch, then stops."""
        if self.count == 1:  # loop only once as it's batch inference
            raise StopIteration
        self.count += 1
        return self.paths, self.im0, None, ''

    def __iter__(self):
        """Enables iteration for class LoadPilAndNumpy."""
        self.count = 0
        return self