torchsight.models.retinanet module

RetinaNet module.

This module contains an implementation of each of the modules needed in the RetinaNet architecture.

RetinaNet original paper: https://arxiv.org/pdf/1708.02002.pdf

- Feature pyramid: based on a ResNet backbone, it takes the output of each layer and generates a pyramid of features. Original paper: https://arxiv.org/pdf/1612.03144.pdf
- Regression: a module to compute the regression values for each bounding box.
- Classification: a module to classify the class of each bounding box.

This code is heavily inspired by yhenon's implementation: https://github.com/yhenon/pytorch-retinanet
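A minimal usage sketch (assuming the package is importable as `torchsight.models.retinanet` and that the ResNet backbone weights can be loaded); it shows the different outputs of the training and evaluation modes documented below:

import torch

from torchsight.models.retinanet import RetinaNet  # assumed import path

# Build a detector for 3 classes with a ResNet-18 backbone.
model = RetinaNet(classes=3, resnet=18, pretrained=False)
model = model.to('cpu')  # the custom to() also moves the Anchors module

images = torch.rand(2, 3, 512, 512)  # a fake batch of two 512x512 RGB images

# Training mode: raw anchors, regression values and per-class probabilities.
model.train()
anchors, regressions, classifications = model(images)

# Evaluation mode: score thresholding + NMS, one detections tensor per image.
model.eval(threshold=0.5, iou_threshold=0.5)
with torch.no_grad():
    detections_per_image = model(images)  # list of tensors with shape (num detections, 6)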
Source code
"""Retinanet module.
This module contains an implementation of each of the modules needed in the
Retinanet architecture.
Retinanet original paper:
https://arxiv.org/pdf/1708.02002.pdf
- Feature pyramid: Based on a ResNet backbone, takes the output of each layer
and generates a pyramid of features.
Original paper:
https://arxiv.org/pdf/1612.03144.pdf
- Regression: A module to compute the regressions for each bounding box.
- Classification: A module to classify the class of each bounding box.
This code is heavily inspired by yhenon:
https://github.com/yhenon/pytorch-retinanet
"""
import torch
from torch import nn
from .anchors import Anchors
from .resnet import resnet18, resnet34, resnet50, resnet101, resnet152
class FeaturePyramid(nn.Module):
"""Feature pyramid network.
It takes the natural architecture of a convolutional network to generate a pyramid
of features with little extra effort.
It merges the outputs of each one of the ResNet layers with the previous one to give
more semantically meaning (the deepest the layer the more semantic meaning it has)
and the strong localization features of the first layers.
For more information please read the original paper:
https://arxiv.org/pdf/1612.03144.pdf
This implementation also used the exact provided at RetinaNet paper:
https://arxiv.org/pdf/1708.02002.pdf
Each time that a decision in the code mention "the paper" is the RetinaNet paper.
"""
strides = [8, 16, 32, 64, 128] # The stride applied to obtain each output
def __init__(self, resnet=18, features=256, levels=None, pretrained=True):
"""Initialize the network.
It initializes a ResNet backbone with the given depth and uses 'features' as the number of
channels for each feature map of the pyramid.
Args:
resnet (int): Indicates the depth of the resnet backbone.
features (int): Indicates the depth (number of channels) of each feature map
of the pyramid.
levels (list of int): The numbers of the layers in the FPN to get their feature maps.
If None is given it will return all the levels from 3 to 7.
If some level is not present it won't return that feature map level of the pyramid.
pretrained (bool): Indicates if the backbone must be pretrained.
"""
super(FeaturePyramid, self).__init__()
if levels is None:
levels = [3, 4, 5, 6, 7]
self.levels = levels
if resnet == 18:
self.backbone = resnet18(pretrained)
elif resnet == 34:
self.backbone = resnet34(pretrained)
elif resnet == 50:
self.backbone = resnet50(pretrained)
elif resnet == 101:
self.backbone = resnet101(pretrained)
elif resnet == 152:
self.backbone = resnet152(pretrained)
else:
raise ValueError('Invalid ResNet depth: {}'.format(resnet))
# The paper names the output of each layer C_x, where x is the number of the layer
# The ResNet output feature maps have these depths for each layer
c5_depth = self.backbone.output_channels[0]
c4_depth = self.backbone.output_channels[1]
c3_depth = self.backbone.output_channels[2]
# The paper names the pyramid feature maps P_x. We start from the last output, the most semantically rich one.
# Conv 1x1 to set features dimension and conv 3x3 to generate feature map
self.p5_conv1 = nn.Conv2d(c5_depth, features, kernel_size=1, stride=1, padding=0)
self.p5_conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
# Upsample to sum with c_4
self.p5_upsample = lambda input: nn.functional.interpolate(input, scale_factor=2, mode='nearest')
self.p4_conv1 = nn.Conv2d(c4_depth, features, kernel_size=1, stride=1, padding=0)
self.p4_conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
self.p4_upsample = lambda input: nn.functional.interpolate(input, scale_factor=2, mode='nearest')
# We don't need to upsample c3 because we are not using c2
self.p3_conv1 = nn.Conv2d(c3_depth, features, kernel_size=1, stride=1, padding=0)
self.p3_conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
# In the paper they also generate the p6 and p7 feature maps
# p6 is obtained via a 3x3 convolution with stride 2 over c5
self.p6_conv = nn.Conv2d(c5_depth, features, kernel_size=3, stride=2, padding=1)
# p7 is computed by applying ReLU followed by a 3x3 convolution with stride 2 on p6
self.p7_relu = nn.ReLU()
self.p7_conv = nn.Conv2d(features, features, kernel_size=3, stride=2, padding=1)
def forward(self, images):
"""Generate the Feature Pyramid given the images as tensor.
Args:
images (torch.Tensor): A tensor with shape (batch size, 3, height, width).
Returns:
list: A list with the feature map for each requested level (by default levels 3 to 7).
Shapes:
- p3: (batch size, features, height / 8, width / 8)
- p4: (batch size, features, height / 16, width / 16)
- p5: (batch size, features, height / 32, width / 32)
- p6: (batch size, features, height / 64, width / 64)
- p7: (batch size, features, height / 128, width / 128)
"""
# Bottom-up pathway with resnet backbone
c5, c4, c3 = self.backbone(images)
# Top-down pathway and lateral connections
p5 = self.p5_conv1(c5)
p5_upsampled = self.p5_upsample(p5)
p5 = self.p5_conv2(p5)
p4 = self.p4_conv1(c4)
p4 = p4 + p5_upsampled
p4_upsampled = self.p4_upsample(p4)
p4 = self.p4_conv2(p4)
p3 = self.p3_conv1(c3)
p3 = p3 + p4_upsampled
p3 = self.p3_conv2(p3)
features = {3: p3, 4: p4, 5: p5}
if 6 in self.levels or 7 in self.levels:  # p6 is also needed to compute p7
p6 = self.p6_conv(c5)
features[6] = p6
if 7 in self.levels:
p7 = self.p7_relu(p6)
p7 = self.p7_conv(p7)
features[7] = p7
return [features[i] for i in self.levels]
class SubModule(nn.Module):
"""Base class for the regression and classification submodules."""
def __init__(self, in_channels, outputs, anchors=9, features=256):
"""Initialize the components of the network.
Args:
in_channels (int): Indicates the number of features (or channels) of the feature map.
outputs (int): The number of outputs per anchor.
anchors (int, optional): Indicates the number of anchors per location in the feature map.
features (int, optional): Indicates the number of features that the conv layers must have.
"""
super(SubModule, self).__init__()
self.conv1 = nn.Conv2d(in_channels, features, kernel_size=3, stride=1, padding=1)
self.act1 = nn.ReLU()
self.conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
self.act2 = nn.ReLU()
self.conv3 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
self.act3 = nn.ReLU()
self.conv4 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
self.act4 = nn.ReLU()
self.last_conv = nn.Conv2d(features, outputs * anchors, kernel_size=3, stride=1, padding=1)
def forward(self, feature_map):
"""Generates the outputs for each anchor and location in the feature map.
Args:
feature_map (torch.Tensor): A tensor with shape (batch size, in_channels, width, height).
Returns:
torch.Tensor: The tensor with outputs values for each location and anchor in the feature map.
Shape:
(batch size, outputs * anchors, width, height)
"""
out = self.conv1(feature_map)
out = self.act1(out)
out = self.conv2(out)
out = self.act2(out)
out = self.conv3(out)
out = self.act3(out)
out = self.conv4(out)
out = self.act4(out)
out = self.last_conv(out)
return out
class Regression(SubModule):
"""Regression submodule of RetinaNet.
It generates, given a feature map, a tensor with the regression values for each
anchor.
"""
def __init__(self, in_channels, anchors=9, features=256):
"""Initialize the components of the network.
Args:
in_channels (int): Indicates the number of features (or channels) of the feature map.
anchors (int, optional): Indicates the number of anchors (i.e. bounding boxes) per location
in the feature map.
features (int, optional): Indicates the number of features that the conv layers must have.
"""
super(Regression, self).__init__(in_channels, outputs=4, anchors=anchors, features=features)
def forward(self, feature_map):
"""Generates the bounding box regression for each anchor and location in the feature map.
Args:
feature_map (torch.Tensor): A tensor with shape (batch size, in_channels, height, width).
Returns:
torch.Tensor: The tensor with bounding boxes values for the feature map.
Shape:
(batch size, height * width * number of anchors, 4)
"""
out = super(Regression, self).forward(feature_map)
# Now, out has shape (batch size, 4 * number of anchors, height, width)
out = out.permute(0, 2, 3, 1).contiguous() # Set regression values as the last dimension
return out.view(out.shape[0], -1, 4) # Change the shape of the tensor
class Classification(SubModule):
"""Classification submodule of RetinaNet.
It generates, given a feature map, a tensor with the probability of each class.
"""
def __init__(self, in_channels, classes, anchors=9, features=256, prior=0.01):
"""Initialize the network.
Args:
in_channels (int): The number of channels of the feature map.
classes (int): Indicates the number of classes to predict.
anchors (int, optional): The number of anchors per location in the feature map.
features (int, optional): Indicates the number of inner features that the conv layers must have.
prior (float, optional): The prior probability used to initialize the bias of the last conv layer,
as in the paper.
"""
super(Classification, self).__init__(in_channels, outputs=classes, anchors=anchors, features=features)
# Initialize the bias of the last conv layer with `bias = -log((1 - pi) / pi)` as in the paper,
# so that the initial sigmoid output for every anchor is approximately `prior`
torch.nn.init.constant_(self.last_conv.bias, -float(torch.log(torch.Tensor([(1 - prior) / prior]))))
self.classes = classes
self.activation = nn.Sigmoid()
def forward(self, feature_map):
"""Generates the probabilities for each class for each anchor for each location in the feature map.
Args:
feature_map (torch.Tensor): A tensor with shape (batch size, in_channels, height, width).
Returns:
torch.Tensor: The tensor with the probability of each class for each anchor and location in the
feature map. Shape: (batch size, height * width * anchors, classes)
"""
out = super(Classification, self).forward(feature_map)
out = self.activation(out)
# Now out has shape (batch size, classes * anchors, height, width)
out = out.permute(0, 2, 3, 1).contiguous() # Move the outputs to the last dim
return out.view(out.shape[0], -1, self.classes)
class RetinaNet(nn.Module):
"""RetinaNet network.
This network is for object detection, so its outputs are the regression values for the bounding
boxes of each anchor and the probabilities of each class for each anchor.
In training mode it returns all the predicted values (bounding boxes and class probabilities for all
the anchors), while in evaluation mode it applies Non-Maximum Suppression and returns, per image,
the relevant detections as a tensor with shape (N, 6), where N is the number of detections and the
6 values are x1, y1, x2, y2, the class' label and its probability.
"""
def __init__(self, classes, resnet=18, features=None, anchors=None, fpn_levels=None, pretrained=True, device=None):
"""Initialize the network.
Arguments:
classes (int): The number of classes to detect.
resnet (int, optional): The depth of the resnet backbone for the Feature Pyramid Network.
features (dict, optional): The dict that indicates the features for each module of the network.
anchors (dict, optional): The dict with the 'sizes', 'scales' and 'ratios' sequences to initialize
the Anchors module.
fpn_levels (list of int): The numbers of the layers in the FPN to get their feature maps.
If None is given it will return all the levels from 3 to 7.
If some level is not present it won't return that feature map level of the pyramid.
pretrained (bool, optional): Whether the resnet backbone of the FPN must be pretrained on the ImageNet dataset.
This pretraining is provided by the torchvision package.
device (str, optional): The device where the module will run.
"""
super(RetinaNet, self).__init__()
if features is None:
features = {'pyramid': 256, 'regression': 256, 'classification': 256}
if anchors is None:
anchors = {'sizes': [32, 64, 128, 256, 512],
'scales': [2 ** 0, 2 ** (1/3), 2 ** (2/3)],
'ratios': [0.5, 1, 2]}
self.device = device if device is not None else 'cuda:0' if torch.cuda.is_available() else 'cpu'
# Modules
self.fpn = FeaturePyramid(
resnet=resnet, features=features['pyramid'], levels=fpn_levels, pretrained=pretrained
)
if len(anchors['sizes']) != 5:
raise ValueError("anchors['sizes'] must have length 5 to match the 5 levels of the FPN")
self.anchors = Anchors(anchors['sizes'], anchors['scales'], anchors['ratios'],
self.fpn.strides, device=self.device)
self.regression = Regression(in_channels=features['pyramid'],
anchors=self.anchors.n_anchors,
features=features['regression'])
self.classification = self.get_classification_module(in_channels=features['pyramid'],
classes=classes,
anchors=self.anchors.n_anchors,
features=features['classification'])
# Set the base threshold for evaluating mode
self.threshold = 0.6
self.iou_threshold = 0.5
self.evaluate_loss = False
def to(self, device):
"""Move the network to the given device and update the Anchors module that also needs to change
its device to anchors generation.
Arguments:
device (str): The device to where to move the network. Example: 'cpu' or 'cuda:0'.
"""
self.device = device
self.anchors.to(device)
return super(RetinaNet, self).to(device)
def get_classification_module(self, in_channels, classes, anchors, features):
"""Get the classification module to generate the probability of each anchor
to be any class.
Why a getter? Because it allows subclasses to override this method and compute the
classification with another module.
Arguments:
in_channels (int): The number of channels of the feature map.
classes (int): Indicates the number of classes to predict.
anchors (int, optional): The number of anchors per location in the feature map.
features (int, optional): Indicates the number of inner features that the conv layers must have.
Returns:
torch.nn.Module: The classification module for the net.
"""
return Classification(in_channels=in_channels, classes=classes, anchors=anchors, features=features)
def eval(self, threshold=None, iou_threshold=None, loss=False):
"""Set the model in the evaluation mode. Keep only bounding boxes with predictions with score
over threshold.
Args:
threshold (float): The threshold to keep only bounding boxes with a class' probability over it.
iou_threshold (float): If two bounding boxes has Intersection Over Union more than this
threshold they are detecting the same object.
loss (bool): If true the forward pass does not do NMS and return the same output as if it is
training. Why? To compute the loss over the validation dataset for example.
"""
if threshold is not None:
self.threshold = threshold
if iou_threshold is not None:
self.iou_threshold = iou_threshold
self.evaluate_loss = loss
return super(RetinaNet, self).eval()
def classify(self, feature_maps):
"""Perform the classification of the feature maps.
Arguments:
feature_maps (tuple): The feature maps generated by the FPN.
Returns:
torch.Tensor: The classification probability for each anchor.
Shape:
`(batch size, number of anchors, number of classes)`
"""
return torch.cat([self.classification(feature_map) for feature_map in feature_maps], dim=1)
def forward(self, images):
"""Forward pass of the network. Returns the anchors and the probability for each class per anchor.
In training mode (calling `model.train()`) it returns all the anchors ans classes' probabilities
but in evaluation mode (calling `model.eval()`) it applies Non-Maximum Supresion to keep only
the predictions that do not collide.
On evaluation mode we cannot return only two tensors (bounding boxes and classifications) because
different images could have different amounts of predictions over the threshold so we cannot keep
all them in a single tensor.
To avoid this problem in evaluation mode it returns a sequence of (bounding_boxes, classifications)
for each image.
Args:
images (torch.Tensor): Tensor with the batch of images.
Shape:
(batch size, channels, height, width)
Returns:
In training mode:
torch.Tensor: A tensor with the base anchors.
Shape:
(batch size, total anchors, 4)
torch.Tensor: A tensor with the regression values to adjust the anchors to a bounding box.
Shape:
(batch size, total anchors, 4)
torch.Tensor: A tensor with the probability of each class for each anchor.
Shape:
(batch size, total anchors, number of classes)
In evaluation mode:
list of torch.Tensor: One detections tensor per image with shape (number of detections, 6),
where the 6 values are x1, y1, x2, y2, the class' label and its probability.
"""
feature_maps = self.fpn(images)
# Get the regressions with shape (batch size, total anchors, 4) and the classifications with shape (batch size, total anchors, classes)
regressions = torch.cat([self.regression(feature_map) for feature_map in feature_maps], dim=1)
classifications = self.classify(feature_maps)
del feature_maps
# Transform the anchors to bounding boxes
anchors = self.anchors(images)
if self.training:
return anchors, regressions, classifications
if self.evaluate_loss:
return anchors.detach(), regressions.detach(), classifications.detach()
anchors, regressions, classifications = anchors.detach(), regressions.detach(), classifications.detach()
return self.transform(images, anchors, regressions, classifications)
def transform(self, images, anchors, regressions, classifications):
"""Transform, clip and apply NMS to the predictions.
Arguments:
images (torch.Tensor): The images that passed through the network. Useful for clipping
the predicted bounding boxes.
Shape:
(batch size, channels, height, width)
anchors (torch.Tensor): The base anchors generated for the images.
Shape:
(batch size, amount of anchors, 4)
regressions (torch.Tensor): The regression values to transform and adjust the bounding boxes.
Shape:
(batch size, amount of anchors, 4)
classifications (torch.Tensor): The probabilities for each class for each anchor.
Shape:
(batch size, amount of anchors, amount of classes)
Returns:
list of torch.Tensor: One detections tensor per image with shape (number of detections, 6):
x1, y1, x2, y2, class' label and probability.
"""
bounding_boxes = self.anchors.transform(anchors, regressions).detach()
del regressions
# Clip the boxes to fit in the image
bounding_boxes = self.anchors.clip(images, bounding_boxes).detach()
# Generate a sequence of (bounding_boxes, classifications) for each image
return [self.make_predictions(*self.nms(bounding_boxes[index], classifications[index]))
for index in range(images.shape[0])]
@staticmethod
def make_predictions(boxes, classifications):
"""Join the boxes and the classification probabilities and make a single predictions per bounding box.
Arguments:
boxes (torch.Tensor): The bounding boxes generated for the image.
Shape: `(num of boxes, 4)`.
classifications (torch.Tensor): The probability for each class.
Shape: `(num of boxes, num of classes)`.
Returns:
torch.Tensor: A single tensor with the box coordinates, label and probability for each predicted
bounding box. Shape: (num of boxes, 6).
"""
detections = torch.zeros(boxes.shape[0], 6)
if boxes.shape[0] > 0:
detections[:, :4] = boxes
probs, labels = classifications.max(dim=1)
detections[:, 4] = labels
detections[:, 5] = probs
return detections
def nms(self, boxes, classifications):
"""Apply Non-Maximum Suppression over the detections to remove bounding boxes that are detecting
the same object.
Args:
boxes (torch.Tensor): Tensor with the bounding boxes.
Shape:
(total anchors, 4)
classifications (torch.Tensor): Tensor with the scores for each class for each anchor.
Shape:
(total anchors, number of classes)
Returns:
torch.Tensor: The bounding boxes to keep.
torch.Tensor: The probabilities for each class for each kept bounding box.
"""
# Get the max score of any class for each anchor
scores, _ = classifications.max(dim=1) # Shape (total anchors,)
# Keep only the bounding boxes and classifications over the threshold
scores_over_threshold = scores > self.threshold
boxes = boxes[scores_over_threshold, :]
classifications = classifications[scores_over_threshold, :]
# If there are no detections with a score over the threshold return empty tensors
if classifications.shape[0] == 0:
return torch.zeros((0)).to(self.device), torch.zeros((0)).to(self.device)
# Update the scores to match only the kept boxes
scores = classifications.max(dim=1)[0]
# Start the picked indexes list empty
picked = []
# Get the coordinates
x1 = boxes[:, 0].detach()
y1 = boxes[:, 1].detach()
x2 = boxes[:, 2].detach()
y2 = boxes[:, 3].detach()
# Compute the area of the bounding boxes
areas = ((x2 - x1 + 1) * (y2 - y1 + 1)).detach()
# Get the indexes of the detections sorted by score (lowest score first)
_, indexes = scores.sort()
del scores
while indexes.shape[0] > 0:  # > 0 (not > 1) so the last remaining box is also kept
# Take the last index (highest score) and add it to the picked
actual = int(indexes[-1])
picked.append(actual)
# We need to find the overlap of the bounding boxes with the actual picked bounding box
# Find the largest (more to the bottom-right) (x,y) coordinates for the start
# (top-left) of the bounding box between the actual and all of the others with lower score
xx1 = torch.max(x1[actual], x1[indexes[:-1]])
yy1 = torch.max(y1[actual], y1[indexes[:-1]])
# Find the smallest (more to the top-left) (x,y) coordinates for the end (bottom-right)
# of the bounding box
xx2 = torch.min(x2[actual], x2[indexes[:-1]])
yy2 = torch.min(y2[actual], y2[indexes[:-1]])
# Compute width and height to compute the intersection over union
w = torch.max(torch.Tensor([0]).to(self.device), xx2 - xx1 + 1)
h = torch.max(torch.Tensor([0]).to(self.device), yy2 - yy1 + 1)
del xx1, yy1, xx2, yy2
intersection = (w * h)
del h, w
union = areas[actual] + areas[indexes[:-1]] - intersection
iou = intersection / union
# Delete the last index
indexes = indexes[:-1]
# Keep only the indexes that have an overlap lower than the threshold
indexes = indexes[iou < self.iou_threshold].detach()
# Return the filtered bounding boxes and classifications
return boxes[picked].detach(), classifications[picked].detach()
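To make the thresholds concrete, here is a small worked example of the nms method above (a sketch only; it assumes `model` is a RetinaNet instance with the default `threshold=0.6` and `iou_threshold=0.5`):

import torch

boxes = torch.tensor([[0., 0., 100., 100.],        # box A
                      [10., 10., 110., 110.],      # box B, heavily overlapping A
                      [200., 200., 260., 260.]])   # box C, far away from A and B
classifications = torch.tensor([[0.90, 0.05],      # scores for 2 classes per box
                                [0.70, 0.10],
                                [0.80, 0.15]])

kept_boxes, kept_classifications = model.nms(boxes.to(model.device),
                                             classifications.to(model.device))
# All three boxes pass the score threshold (max scores 0.90, 0.70, 0.80 > 0.6), but B is
# suppressed because its IoU with the higher scoring A is about 0.68 > 0.5, so A and C remain.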
Classes
class Classification (ancestors: SubModule, torch.nn.modules.module.Module)
Classification submodule of RetinaNet.
It generates, given a feature map, a tensor with the probability of each class.
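A quick sketch of the head's behaviour (assuming `Classification` is imported from `torchsight.models.retinanet`): the prior initialization makes the initial per-class probabilities roughly equal to `prior`, and the forward pass flattens the map into one row per anchor per location:

import torch

prior = 0.01
bias = -torch.log(torch.tensor((1 - prior) / prior))  # the initialization described in the paper
print(torch.sigmoid(bias))                            # ~0.01, i.e. roughly `prior`

head = Classification(in_channels=256, classes=80, anchors=9, features=256)
feature_map = torch.rand(2, 256, 64, 64)   # (batch, channels, height, width)
probs = head(feature_map)
print(probs.shape)                         # torch.Size([2, 36864, 80]) = (2, 64 * 64 * 9, 80)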
Methods
def __init__(self, in_channels, classes, anchors=9, features=256, prior=0.01)
Initialize the network.

Args
    in_channels : int
        The number of channels of the feature map.
    classes : int
        Indicates the number of classes to predict.
    anchors : int, optional
        The number of anchors per location in the feature map.
    features : int, optional
        Indicates the number of inner features that the conv layers must have.
    prior : float, optional
        The prior probability used to initialize the bias of the last conv layer, as in the paper.
def forward(self, feature_map)
Generates the probabilities for each class for each anchor for each location in the feature map.

Args
    feature_map : torch.Tensor
        A tensor with shape (batch size, in_channels, height, width).

Returns
    torch.Tensor
        The tensor with the probability of each class for each anchor and location in the feature map. Shape: (batch size, height * width * anchors, classes)
class FeaturePyramid (ancestors: torch.nn.modules.module.Module)
Feature pyramid network.

It uses the natural architecture of a convolutional network to generate a pyramid of features with little extra effort.

It merges the outputs of each one of the ResNet layers with the previous one to give more semantic meaning (the deeper the layer, the more semantic meaning it has) combined with the strong localization features of the first layers.

For more information please read the original paper: https://arxiv.org/pdf/1612.03144.pdf

This implementation also follows the exact setup described in the RetinaNet paper: https://arxiv.org/pdf/1708.02002.pdf

Whenever a comment in the code mentions "the paper" it refers to the RetinaNet paper.
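A minimal sketch of the pyramid's outputs (assuming `FeaturePyramid` is imported from `torchsight.models.retinanet`; `pretrained=False` avoids downloading the backbone weights):

import torch

fpn = FeaturePyramid(resnet=18, features=256, pretrained=False)

images = torch.rand(1, 3, 256, 256)  # height and width should be divisible by 128
for level, feature_map in zip(fpn.levels, fpn(images)):
    print(level, feature_map.shape)  # 256 channels, spatial size divided by the level's stride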
Class variables
var strides
    The stride applied to obtain each feature map of the pyramid: [8, 16, 32, 64, 128].
Methods
def __init__(self, resnet=18, features=256, levels=None, pretrained=True)
Initialize the network.

It initializes a ResNet backbone with the given depth and uses 'features' as the number of channels for each feature map of the pyramid.

Args
    resnet : int
        Indicates the depth of the resnet backbone.
    features : int
        Indicates the depth (number of channels) of each feature map of the pyramid.
    levels : list of int
        The numbers of the layers in the FPN to get their feature maps. If None is given it will return all the levels from 3 to 7. If some level is not present it won't return that feature map level of the pyramid.
    pretrained : bool
        Indicates if the backbone must be pretrained.
def forward(self, images)
Generate the Feature Pyramid given the images as tensor.

Args
    images : torch.Tensor
        A tensor with shape (batch size, 3, height, width).

Returns
    list
        A list with the feature map for each requested level (by default levels 3 to 7). Shapes:
        - p3: (batch size, features, height / 8, width / 8)
        - p4: (batch size, features, height / 16, width / 16)
        - p5: (batch size, features, height / 32, width / 32)
        - p6: (batch size, features, height / 64, width / 64)
        - p7: (batch size, features, height / 128, width / 128)
class Regression (ancestors: SubModule, torch.nn.modules.module.Module)
Regression submodule of RetinaNet.

It generates, given a feature map, a tensor with the regression values for each anchor.
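A short sketch of the reshaping done by the head (assuming `Regression` is imported from this module): the raw (batch, 4 * anchors, height, width) map is rearranged so that the last dimension holds the 4 regression values of a single anchor at a single location:

import torch

head = Regression(in_channels=256, anchors=9, features=256)
feature_map = torch.rand(2, 256, 32, 32)
deltas = head(feature_map)
print(deltas.shape)  # torch.Size([2, 9216, 4]) = (2, 32 * 32 * 9, 4)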
Methods
def __init__(self, in_channels, anchors=9, features=256)
Initialize the components of the network.

Args
    in_channels : int
        Indicates the number of features (or channels) of the feature map.
    anchors : int, optional
        Indicates the number of anchors (i.e. bounding boxes) per location in the feature map.
    features : int, optional
        Indicates the number of features that the conv layers must have.
def forward(self, feature_map)
Generates the bounding box regression for each anchor and location in the feature map.

Args
    feature_map : torch.Tensor
        A tensor with shape (batch size, in_channels, height, width).

Returns
    torch.Tensor
        The tensor with bounding box regression values for the feature map. Shape: (batch size, height * width * number of anchors, 4)
class RetinaNet (ancestors: torch.nn.modules.module.Module)
RetinaNet network.

This network is for object detection, so its outputs are the regression values for the bounding boxes of each anchor and the probabilities of each class for each anchor.

In training mode it returns all the predicted values (bounding boxes and class probabilities for all the anchors), while in evaluation mode it applies Non-Maximum Suppression and returns, per image, the relevant detections as a tensor with shape (N, 6), where N is the number of detections and the 6 values are x1, y1, x2, y2, the class' label and its probability.
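Besides direct instantiation, the `get_classification_module` getter exists so a subclass can swap in a different classification head while reusing the rest of the architecture. A hypothetical sketch (the subclass below is an assumed example, not part of the library; it assumes `RetinaNet` and `Classification` are imported from this module):

class MyRetinaNet(RetinaNet):
    """A RetinaNet variant that could plug in a custom classification head."""

    def get_classification_module(self, in_channels, classes, anchors, features):
        # Any module returning (batch, height * width * anchors, classes) works here;
        # this example simply reuses the default head.
        return Classification(in_channels=in_channels, classes=classes,
                              anchors=anchors, features=features)

model = MyRetinaNet(classes=10, resnet=18, pretrained=False)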
Static methods
def make_predictions(boxes, classifications)
Join the boxes and the classification probabilities to make a single prediction per bounding box.

Arguments
    boxes : torch.Tensor
        The bounding boxes generated for the image. Shape: (num of boxes, 4).
    classifications : torch.Tensor
        The probability for each class. Shape: (num of boxes, num of classes).

Returns
    torch.Tensor
        A single tensor with the box coordinates, label and probability for each predicted bounding box. Shape: (num of boxes, 6).
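A tiny sketch of the resulting layout (assuming `RetinaNet` is imported from this module); each row is [x1, y1, x2, y2, label, probability]:

import torch

boxes = torch.tensor([[10., 20., 110., 220.]])
classifications = torch.tensor([[0.10, 0.80, 0.05]])  # probabilities for 3 classes
detections = RetinaNet.make_predictions(boxes, classifications)
print(detections)  # one row: [10, 20, 110, 220, 1, 0.8]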
Methods
def __init__(self, classes, resnet=18, features=None, anchors=None, fpn_levels=None, pretrained=True, device=None)
Initialize the network.

Arguments
    classes : int
        The number of classes to detect.
    resnet : int, optional
        The depth of the resnet backbone for the Feature Pyramid Network.
    features : dict, optional
        The dict that indicates the number of features for each module of the network.
    anchors : dict, optional
        The dict with the 'sizes', 'scales' and 'ratios' sequences to initialize the Anchors module (see the sketch after this list).
    fpn_levels : list of int, optional
        The numbers of the layers in the FPN to get their feature maps. If None is given it will return all the levels from 3 to 7. If some level is not present it won't return that feature map level of the pyramid.
    pretrained : bool, optional
        Whether the resnet backbone of the FPN must be pretrained on the ImageNet dataset. This pretraining is provided by the torchvision package.
    device : str, optional
        The device where the module will run.
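For reference, a sketch of passing custom `features` and `anchors` dicts (the values below mirror the defaults shown in the source above; any other configuration follows the same structure, assuming `RetinaNet` is imported from this module):

features = {'pyramid': 256, 'regression': 256, 'classification': 256}
anchors = {'sizes': [32, 64, 128, 256, 512],               # one base size per FPN level (must be 5)
           'scales': [2 ** 0, 2 ** (1 / 3), 2 ** (2 / 3)],
           'ratios': [0.5, 1, 2]}

model = RetinaNet(classes=20, resnet=50, features=features, anchors=anchors, pretrained=False)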
def classify(self, feature_maps)
-
Perform the classification of the feature maps.
Arguments
feature_maps
:tuple
- A tuple with the feature maps generated by the FPN backbone.
Returns
torch.Tensor: The classification probability for each anchor. Shape:
(batch size, number of anchors, number of classes)
Source code
def classify(self, feature_maps):
    """Perform the classification of the feature maps.

    Arguments:
        feature_maps (tuple): A tuple with the feature maps generated by the FPN backbone.

    Returns:
        torch.Tensor: The classification probability for each anchor.
            Shape: `(batch size, number of anchors, number of classes)`
    """
    return torch.cat([self.classification(feature_map) for feature_map in feature_maps], dim=1)
def eval(self, threshold=None, iou_threshold=None, loss=False)
-
Set the model in evaluation mode. Keeps only the bounding boxes whose predicted score is over the threshold.
Args
threshold
:float
- The threshold to keep only bounding boxes with a class' probability over it.
iou_threshold
:float
- If two bounding boxes have an Intersection over Union higher than this threshold, they are considered to be detecting the same object.
loss
:bool
- If True, the forward pass does not apply NMS and returns the same output as in training mode. Useful, for example, to compute the loss over the validation dataset.
Source code
def eval(self, threshold=None, iou_threshold=None, loss=False):
    """Set the model in evaluation mode. Keep only the bounding boxes whose score is over the threshold.

    Args:
        threshold (float): The threshold to keep only bounding boxes with a class probability over it.
        iou_threshold (float): If two bounding boxes have an Intersection over Union higher than this
            threshold, they are considered to be detecting the same object.
        loss (bool): If True, the forward pass does not apply NMS and returns the same output as in
            training mode. Useful, for example, to compute the loss over the validation dataset.
    """
    if threshold is not None:
        self.threshold = threshold
    if iou_threshold is not None:
        self.iou_threshold = iou_threshold

    self.evaluate_loss = loss

    return super(RetinaNet, self).eval()
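A short usage sketch, assuming model is a RetinaNet instance as constructed above:

model.eval(threshold=0.5, iou_threshold=0.4)  # drop predictions below 0.5, suppress overlaps above IoU 0.4
model.eval(loss=True)                         # keep the raw outputs, e.g. to compute a validation loss
model.train()                                 # back to training mode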
def forward(self, images)
-
Forward pass of the network. Returns the anchors and the probability for each class per anchor.
In training mode (calling model.train()) it returns all the anchors and classes' probabilities, but in evaluation mode (calling model.eval()) it applies Non-Maximum Suppression to keep only the predictions that do not collide.
In evaluation mode we cannot return only two tensors (bounding boxes and classifications) because different images could have different amounts of predictions over the threshold, so we cannot keep all of them in a single tensor. To avoid this problem, in evaluation mode it returns a sequence with the detections for each image.
Args
images
:torch.Tensor
- Tensor with the batch of images. Shape: (batch size, channels, height, width)
Returns
In training mode:
torch.Tensor: A tensor with the base anchors. Shape: (batch size, total anchors, 4)
torch.Tensor: A tensor with the regression values to adjust the anchors to a bounding box. Shape: (batch size, total anchors, 4)
torch.Tensor: A tensor with the probability of each class for each anchor. Shape: (batch size, total anchors, number of classes)
In evaluation mode:
sequence
- A sequence with one detections tensor per image, as generated by make_predictions. Each tensor has shape (num of predictions, 6): the 4 bounding box coordinates, the predicted label and its probability.
Source code
def forward(self, images):
    """Forward pass of the network. Returns the anchors and the probability for each class per anchor.

    In training mode (calling `model.train()`) it returns all the anchors and classes' probabilities,
    but in evaluation mode (calling `model.eval()`) it applies Non-Maximum Suppression to keep only
    the predictions that do not collide.

    In evaluation mode we cannot return only two tensors (bounding boxes and classifications) because
    different images could have different amounts of predictions over the threshold, so we cannot keep
    all of them in a single tensor. To avoid this problem, in evaluation mode it returns the detections
    for each image.

    Args:
        images (torch.Tensor): Tensor with the batch of images. Shape: (batch size, channels, height, width)

    Returns:
        In training mode:
            torch.Tensor: A tensor with the base anchors. Shape: (batch size, total anchors, 4)
            torch.Tensor: A tensor with the regression values to adjust the anchors to a bounding box.
                Shape: (batch size, total anchors, 4)
            torch.Tensor: A tensor with the probability of each class for each anchor.
                Shape: (batch size, total anchors, number of classes)
        In evaluation mode:
            list of torch.Tensor: One detections tensor per image, as generated by `make_predictions`.
                Shape of each: (num of predictions, 6)
    """
    feature_maps = self.fpn(images)

    # Get the regression values (batch size, total anchors, 4) and the classification
    # values (batch size, total anchors, number of classes)
    regressions = torch.cat([self.regression(feature_map) for feature_map in feature_maps], dim=1)
    classifications = self.classify(feature_maps)
    del feature_maps

    # Transform the anchors to bounding boxes
    anchors = self.anchors(images)

    if self.training:
        return anchors, regressions, classifications

    if self.evaluate_loss:
        return anchors.detach(), regressions.detach(), classifications.detach()

    anchors, regressions, classifications = anchors.detach(), regressions.detach(), classifications.detach()

    return self.transform(images, anchors, regressions, classifications)
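A sketch of both modes, assuming model was built as in the __init__ example above and using a random batch whose side is divisible by the FPN strides:

import torch

images = torch.rand(2, 3, 512, 512)  # hypothetical batch of two 512x512 RGB images

model.train()
anchors, regressions, classifications = model(images)
# anchors, regressions: (2, total anchors, 4); classifications: (2, total anchors, classes)

model.eval(threshold=0.6, iou_threshold=0.5)
with torch.no_grad():
    detections = model(images)
for per_image in detections:
    print(per_image.shape)  # (num predictions, 6) for each image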
def get_classification_module(self, in_channels, classes, anchors, features)
-
Get the classification module to generate the probability of each anchor to be any class.
Why a getter? Because it allows subclasses to override this method and compute the classification with another module.
Arguments
in_channels
:int
- The number of channels of the feature map.
classes
:int
- Indicates the number of classes to predict.
anchors
:int
, optional- The number of anchors per location in the feature map.
features
:int
, optional- Indicates the number of inner features that the conv layers must have.
Returns
torch.nn.Module: The classification module for the net.
Source code
def get_classification_module(self, in_channels, classes, anchors, features):
    """Get the classification module that generates the probability of each anchor to be any class.

    Why a getter? Because it allows subclasses to override this method and compute the classification
    with another module.

    Arguments:
        in_channels (int): The number of channels of the feature map.
        classes (int): Indicates the number of classes to predict.
        anchors (int, optional): The number of anchors per location in the feature map.
        features (int, optional): Indicates the number of inner features that the conv layers must have.

    Returns:
        torch.nn.Module: The classification module for the net.
    """
    return Classification(in_channels=in_channels, classes=classes, anchors=anchors, features=features)
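Because the classification head is created through this getter, a subclass can swap it out. A hypothetical sketch, where MyClassification stands in for any module with the same constructor arguments as Classification:

class MyRetinaNet(RetinaNet):
    def get_classification_module(self, in_channels, classes, anchors, features):
        # MyClassification is a hypothetical replacement head with the same signature as Classification
        return MyClassification(in_channels=in_channels, classes=classes,
                                anchors=anchors, features=features)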
def nms(self, boxes, classifications)
-
Apply Non-Maximum Suppression over the detections to remove bounding boxes that are detecting the same object.
Args
boxes
:torch.Tensor
- Tensor with the bounding boxes. Shape: (total anchors, 4)
classifications
:torch.Tensor
- Tensor with the scores for each class for each anchor. Shape: (total anchors, number of classes)
Returns
torch.Tensor: The bounding boxes to keep.
torch.Tensor: The probabilities for each class for each kept bounding box.
Source code
def nms(self, boxes, classifications):
    """Apply Non-Maximum Suppression over the detections to remove bounding boxes that are detecting
    the same object.

    Args:
        boxes (torch.Tensor): Tensor with the bounding boxes. Shape: (total anchors, 4)
        classifications (torch.Tensor): Tensor with the scores for each class for each anchor.
            Shape: (total anchors, number of classes)

    Returns:
        torch.Tensor: The bounding boxes to keep.
        torch.Tensor: The probabilities for each class for each kept bounding box.
    """
    # Get the max score of any class for each anchor
    scores, _ = classifications.max(dim=1)  # Shape (total anchors,)

    # Keep only the bounding boxes and classifications over the threshold
    scores_over_threshold = scores > self.threshold
    boxes = boxes[scores_over_threshold, :]
    classifications = classifications[scores_over_threshold, :]

    # If there aren't detections over the threshold return empty tensors
    if not classifications.shape[0] > 0:
        return torch.zeros((0)).to(self.device), torch.zeros((0)).to(self.device)

    # Update the scores to keep only those of the kept boxes
    scores = classifications.max(dim=1)[0]

    # Start the picked indexes list empty
    picked = []

    # Get the coordinates
    x1 = boxes[:, 0].detach()
    y1 = boxes[:, 1].detach()
    x2 = boxes[:, 2].detach()
    y2 = boxes[:, 3].detach()

    # Compute the area of the bounding boxes
    areas = ((x2 - x1 + 1) * (y2 - y1 + 1)).detach()

    # Get the indexes of the detections sorted by score (lowest score first)
    _, indexes = scores.sort()
    del scores

    # Process every remaining index, including the last one
    while indexes.shape[0] > 0:
        # Take the last index (highest score) and add it to the picked
        actual = int(indexes[-1])
        picked.append(actual)

        # We need to find the overlap of the bounding boxes with the actual picked bounding box

        # Find the largest (more to the bottom-right) (x, y) coordinates for the start (top-left)
        # of the bounding box between the actual and all of the others with lower score
        xx1 = torch.max(x1[actual], x1[indexes[:-1]])
        yy1 = torch.max(y1[actual], y1[indexes[:-1]])
        # Find the smallest (more to the top-left) (x, y) coordinates for the end (bottom-right)
        # of the bounding box
        xx2 = torch.min(x2[actual], x2[indexes[:-1]])
        yy2 = torch.min(y2[actual], y2[indexes[:-1]])

        # Compute width and height to compute the intersection over union
        w = torch.max(torch.Tensor([0]).to(self.device), xx2 - xx1 + 1)
        h = torch.max(torch.Tensor([0]).to(self.device), yy2 - yy1 + 1)
        del xx1, yy1, xx2, yy2

        intersection = (w * h)
        del h, w
        union = areas[actual] + areas[indexes[:-1]] - intersection
        iou = intersection / union

        # Delete the last index
        indexes = indexes[:-1]
        # Keep only the indexes that have overlap lower than the threshold
        indexes = indexes[iou < self.iou_threshold].detach()

    # Return the filtered bounding boxes and classifications
    return boxes[picked].detach(), classifications[picked].detach()
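The suppression decision hinges on the IoU formula used inside the loop. A standalone sketch with two toy boxes (illustrative numbers only), following the same "+ 1" convention as the code above:

import torch

# Two boxes in (x1, y1, x2, y2) format
a = torch.tensor([0., 0., 10., 10.])
b = torch.tensor([5., 5., 15., 15.])

xx1, yy1 = torch.max(a[0], b[0]), torch.max(a[1], b[1])
xx2, yy2 = torch.min(a[2], b[2]), torch.min(a[3], b[3])
w = torch.clamp(xx2 - xx1 + 1, min=0)
h = torch.clamp(yy2 - yy1 + 1, min=0)
intersection = w * h                                    # 6 * 6 = 36
area_a = (a[2] - a[0] + 1) * (a[3] - a[1] + 1)          # 121
area_b = (b[2] - b[0] + 1) * (b[3] - b[1] + 1)          # 121
iou = intersection / (area_a + area_b - intersection)   # 36 / 206 ≈ 0.17

With the default iou_threshold of 0.5, these two boxes would both survive, since their IoU is only about 0.17.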
def to(self, device)
-
Move the network to the given device and update the Anchors module, which also needs to know the device for anchor generation.
Arguments
device
:str
- The device to move the network to. Example: 'cpu' or 'cuda:0'.
Source code
def to(self, device):
    """Move the network to the given device and update the Anchors module, which also needs to know
    the device for anchor generation.

    Arguments:
        device (str): The device to move the network to. Example: 'cpu' or 'cuda:0'.
    """
    self.device = device
    self.anchors.to(device)

    return super(RetinaNet, self).to(device)
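Usage sketch, assuming model is a RetinaNet instance:

import torch

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
model = model.to(device)  # moves the weights and updates the Anchors module's device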
def transform(self, images, anchors, regressions, classifications)
-
Transform, clip and apply NMS to the predictions.
Arguments
images
:torch.Tensor
- The images that passed through the network. Useful for clipping the predicted bounding boxes. Shape: (batch size, channels, height, width)
anchors
:torch.Tensor
- The base anchors generated for the images. Shape: (batch size, amount of anchors, 4)
regressions
:torch.Tensor
- The regression values to transform and adjust the bounding boxes. Shape: (batch size, amount of anchors, 4)
classifications
:torch.Tensor
- The probabilities for each class for each anchor. Shape: (batch size, amount of anchors, amount of classes)
Returns
sequence
- A sequence with one detections tensor per image, as generated by make_predictions. Each tensor has shape (num of predictions, 6): the 4 bounding box coordinates, the predicted label and its probability.
Source code
def transform(self, images, anchors, regressions, classifications):
    """Transform, clip and apply NMS to the predictions.

    Arguments:
        images (torch.Tensor): The images that passed through the network. Useful for clipping the
            predicted bounding boxes. Shape: (batch size, channels, height, width)
        anchors (torch.Tensor): The base anchors generated for the images.
            Shape: (batch size, amount of anchors, 4)
        regressions (torch.Tensor): The regression values to transform and adjust the bounding boxes.
            Shape: (batch size, amount of anchors, 4)
        classifications (torch.Tensor): The probabilities for each class for each anchor.
            Shape: (batch size, amount of anchors, amount of classes)

    Returns:
        list of torch.Tensor: One detections tensor per image, as generated by `make_predictions`.
            Shape of each: (num of predictions, 6) with the bounding box coordinates, the label
            and its probability.
    """
    bounding_boxes = self.anchors.transform(anchors, regressions).detach()
    del regressions

    # Clip the boxes to fit in the image
    bounding_boxes = self.anchors.clip(images, bounding_boxes).detach()

    # Generate the detections for each image
    return [self.make_predictions(*self.nms(bounding_boxes[index], classifications[index]))
            for index in range(images.shape[0])]
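A sketch of calling transform manually on raw outputs obtained with eval(loss=True), continuing the hypothetical model and images from the forward example above:

model.eval(loss=True)                  # get the raw, detached outputs
with torch.no_grad():
    anchors, regressions, classifications = model(images)
model.eval(loss=False)                 # restore the normal evaluation behaviour

detections = model.transform(images, anchors, regressions, classifications)
# detections is a list with one (num predictions, 6) tensor per image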
class SubModule (ancestors: torch.nn.modules.module.Module)
-
Base class for the regression and classification submodules.
Source code
class SubModule(nn.Module):
    """Base class for the regression and classification submodules."""

    def __init__(self, in_channels, outputs, anchors=9, features=256):
        """Initialize the components of the network.

        Args:
            in_channels (int): Indicates the number of features (or channels) of the feature map.
            outputs (int): The number of outputs per anchor.
            anchors (int, optional): Indicates the number of anchors per location in the feature map.
            features (int, optional): Indicates the number of features that the conv layers must have.
        """
        super(SubModule, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, features, kernel_size=3, stride=1, padding=1)
        self.act1 = nn.ReLU()
        self.conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
        self.act2 = nn.ReLU()
        self.conv3 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
        self.act3 = nn.ReLU()
        self.conv4 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
        self.act4 = nn.ReLU()

        self.last_conv = nn.Conv2d(features, outputs * anchors, kernel_size=3, stride=1, padding=1)

    def forward(self, feature_map):
        """Generate the outputs for each anchor and location in the feature map.

        Args:
            feature_map (torch.Tensor): A tensor with shape (batch size, in_channels, width, height).

        Returns:
            torch.Tensor: The tensor with the output values for each location and anchor in the feature map.
                Shape: (batch size, outputs * anchors, width, height)
        """
        out = self.conv1(feature_map)
        out = self.act1(out)
        out = self.conv2(out)
        out = self.act2(out)
        out = self.conv3(out)
        out = self.act3(out)
        out = self.conv4(out)
        out = self.act4(out)
        out = self.last_conv(out)

        return out
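A quick shape check of this shared head, assuming SubModule is in scope. With 9 anchors and 4 outputs per anchor (the regression case) the channel dimension of the output becomes 36:

import torch

head = SubModule(in_channels=256, outputs=4, anchors=9, features=256)  # e.g. a regression head
feature_map = torch.rand(1, 256, 32, 32)   # a hypothetical P3-sized feature map
out = head(feature_map)
print(out.shape)  # torch.Size([1, 36, 32, 32]) -> 4 values for each of the 9 anchors per location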
Subclasses
Methods
def __init__(self, in_channels, outputs, anchors=9, features=256)
-
Initialize the components of the network.
Args
in_channels
:int
- Indicates the number of features (or channels) of the feature map.
outputs
:int
- The number of outputs per anchor.
anchors
:int
, optional- Indicates the number of anchors per location in the feature map.
features
:int
, optional- Indicates the number of features that the conv layers must have.
Source code
def __init__(self, in_channels, outputs, anchors=9, features=256):
    """Initialize the components of the network.

    Args:
        in_channels (int): Indicates the number of features (or channels) of the feature map.
        outputs (int): The number of outputs per anchor.
        anchors (int, optional): Indicates the number of anchors per location in the feature map.
        features (int, optional): Indicates the number of features that the conv layers must have.
    """
    super(SubModule, self).__init__()

    self.conv1 = nn.Conv2d(in_channels, features, kernel_size=3, stride=1, padding=1)
    self.act1 = nn.ReLU()
    self.conv2 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
    self.act2 = nn.ReLU()
    self.conv3 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
    self.act3 = nn.ReLU()
    self.conv4 = nn.Conv2d(features, features, kernel_size=3, stride=1, padding=1)
    self.act4 = nn.ReLU()

    self.last_conv = nn.Conv2d(features, outputs * anchors, kernel_size=3, stride=1, padding=1)
def forward(self, feature_map)
-
Generates the outputs for each anchor and location in the feature map.
Args
feature_map
:torch.Tensor
- A tensor with shape (batch size, in_channels, width, height).
Returns
torch.Tensor: The tensor with the output values for each location and anchor in the feature map. Shape: (batch size, outputs * anchors, width, height)
Source code
def forward(self, feature_map):
    """Generate the outputs for each anchor and location in the feature map.

    Args:
        feature_map (torch.Tensor): A tensor with shape (batch size, in_channels, width, height).

    Returns:
        torch.Tensor: The tensor with the output values for each location and anchor in the feature map.
            Shape: (batch size, outputs * anchors, width, height)
    """
    out = self.conv1(feature_map)
    out = self.act1(out)
    out = self.conv2(out)
    out = self.act2(out)
    out = self.conv3(out)
    out = self.act3(out)
    out = self.conv4(out)
    out = self.act4(out)
    out = self.last_conv(out)

    return out