#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) IBM Corporation 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
simple_cnn: a simple Convolutional Neural Network (CNN) designed specifically to solve the MNIST and CIFAR10 \
datasets. To be taken as an illustrative example.
"""
__author__ = "Younes Bouhadjar & Vincent Marois"
import torch
import numpy as np
import torch.nn as nn
from miprometheus.models.model import Model
class SimpleConvNet(Model):
    """
    A simple 2-layer CNN designed specifically to solve the ``MNIST`` & ``CIFAR10`` datasets.

    The layer parameters are not hardcoded, so the user can adjust them for their application
    and see their impact on the model's behavior.
    """

    def __init__(self, params, problem_default_values_=None):
        """
        Constructor of the ``SimpleConvNet``.

        The overall structure of this CNN is as follows:

            Conv1 -> MaxPool1 -> ReLu -> Conv2 -> MaxPool2 -> ReLu (-> flatten) -> Linear1 -> Linear2 -> Linear3

        The parameters that the user can change are:

            - For Conv1 & Conv2: number of output channels, kernel size, stride and padding.
            - For MaxPool1 & MaxPool2: kernel size.
            - For Linear3: the number of classes is read from ``problem_default_values_``.

        Linear1 maps the flattened features to 120 nodes, Linear2 is fixed to 120 -> 84 and
        Linear3 is 84 -> ``num_classes``.

        .. note::

            ``dilation=1``, ``groups=1`` & ``bias=True`` are passed to ``nn.Conv2d``. ``nn.MaxPool2d`` is
            only given its ``kernel_size``; its other arguments keep their defaults.

        .. note::

            This class only reads ``height``, ``width``, ``num_channels`` and ``num_classes`` from
            ``problem_default_values_``. Any padding or up-scaling of the images is not done here;
            presumably the ``Problem`` class applies it and already reflects it in these values
            (to be confirmed against the ``Problem`` implementation).

        :param params: dict of parameters (read from configuration ``.yaml`` file). Must provide the keys
            ``conv1`` & ``conv2`` (each with ``out_channels``, ``kernel_size``, ``stride``, ``padding``)
            and ``maxpool1`` & ``maxpool2`` (each with ``kernel_size``).
        :type params: utils.ParamInterface

        :param problem_default_values_: default values coming from the ``Problem`` class
            (DEFAULT: ``None``, treated as an empty dict).
        :type problem_default_values_: dict

        :raises SystemExit: if one of the required keys is missing from ``problem_default_values_``.
        """
        # A ``{}`` default would be one dict instance shared by every call; use None as the sentinel instead.
        if problem_default_values_ is None:
            problem_default_values_ = {}

        # call base constructor.
        super(SimpleConvNet, self).__init__(params, problem_default_values_)

        # retrieve the Conv1 parameters
        self.out_channels_conv1 = params['conv1']['out_channels']
        self.kernel_size_conv1 = params['conv1']['kernel_size']
        self.stride_conv1 = params['conv1']['stride']
        self.padding_conv1 = params['conv1']['padding']

        # retrieve the Conv2 parameters
        self.out_channels_conv2 = params['conv2']['out_channels']
        self.kernel_size_conv2 = params['conv2']['kernel_size']
        self.stride_conv2 = params['conv2']['stride']
        self.padding_conv2 = params['conv2']['padding']

        # retrieve the MaxPool1 & MaxPool2 parameters
        self.kernel_size_maxpool1 = params['maxpool1']['kernel_size']
        self.kernel_size_maxpool2 = params['maxpool2']['kernel_size']

        # model name
        self.name = 'SimpleConvNet'

        # get image information from the problem class
        try:
            self.height = problem_default_values_['height']
            self.width = problem_default_values_['width']
            self.num_classes = problem_default_values_['num_classes']
            self.num_channels = problem_default_values_['num_channels']
        except KeyError:
            self.logger.warning("Couldn't retrieve one or more value(s) from problem_default_values")
            # Same exception as the ``exit(-1)`` helper, but does not depend on the ``site`` module.
            raise SystemExit(-1)

        self.data_definitions = {'images': {'size': [-1, self.num_channels, self.height, self.width],
                                            'type': [torch.Tensor]},
                                 'targets': {'size': [-1, 1], 'type': [torch.Tensor]}
                                 }

        # The spatial size of each layer's output is computed with ``_output_size`` from the input size (W),
        # the kernel size (F), the stride (S) and the zero padding (P): floor(((W - F + 2P) / S) + 1).
        # doc for nn.Conv2D: https://pytorch.org/docs/stable/nn.html#torch.nn.Conv2d
        # doc for nn.MaxPool2D: https://pytorch.org/docs/stable/nn.html#torch.nn.MaxPool2d

        # ----------------------------------------------------
        # Conv1
        self.conv1 = nn.Conv2d(in_channels=self.num_channels,
                               out_channels=self.out_channels_conv1,
                               kernel_size=self.kernel_size_conv1,
                               stride=self.stride_conv1,
                               padding=self.padding_conv1,
                               dilation=1,
                               groups=1,
                               bias=True)

        self.width_features_conv1 = self._output_size(
            self.width, self.kernel_size_conv1, self.stride_conv1, self.padding_conv1)
        self.height_features_conv1 = self._output_size(
            self.height, self.kernel_size_conv1, self.stride_conv1, self.padding_conv1)

        # ----------------------------------------------------
        # MaxPool1 (stride & padding are read back from the layer, i.e. whatever nn.MaxPool2d resolved them to)
        self.maxpool1 = nn.MaxPool2d(kernel_size=self.kernel_size_maxpool1)

        self.width_features_maxpool1 = self._output_size(
            self.width_features_conv1, self.maxpool1.kernel_size, self.maxpool1.stride, self.maxpool1.padding)
        self.height_features_maxpool1 = self._output_size(
            self.height_features_conv1, self.maxpool1.kernel_size, self.maxpool1.stride, self.maxpool1.padding)

        # ----------------------------------------------------
        # Conv2
        self.conv2 = nn.Conv2d(in_channels=self.out_channels_conv1,
                               out_channels=self.out_channels_conv2,
                               kernel_size=self.kernel_size_conv2,
                               stride=self.stride_conv2,
                               padding=self.padding_conv2,
                               dilation=1,
                               groups=1,
                               bias=True)

        self.width_features_conv2 = self._output_size(
            self.width_features_maxpool1, self.kernel_size_conv2, self.stride_conv2, self.padding_conv2)
        self.height_features_conv2 = self._output_size(
            self.height_features_maxpool1, self.kernel_size_conv2, self.stride_conv2, self.padding_conv2)

        # ----------------------------------------------------
        # MaxPool2
        self.maxpool2 = nn.MaxPool2d(kernel_size=self.kernel_size_maxpool2)

        self.width_features_maxpool2 = self._output_size(
            self.width_features_conv2, self.maxpool2.kernel_size, self.maxpool2.stride, self.maxpool2.padding)
        self.height_features_maxpool2 = self._output_size(
            self.height_features_conv2, self.maxpool2.kernel_size, self.maxpool2.stride, self.maxpool2.padding)

        # ----------------------------------------------------
        # Linear layers
        self.linear1 = nn.Linear(in_features=self._num_flat_features(), out_features=120)
        self.linear2 = nn.Linear(in_features=120, out_features=84)
        self.linear3 = nn.Linear(in_features=84, out_features=self.num_classes)

        # log some info.
        # NOTE: the spatial sizes are logged in (width, height) order, whereas ``data_definitions``
        # declares the images as [N, channels, height, width].
        self.logger.info('Computed output shape of each layer:')
        self.logger.info('Input: [N, {}, {}, {}]'.format(self.num_channels, self.width, self.height))
        self.logger.info('Conv1: [N, {}, {}, {}]'.format(self.out_channels_conv1, self.width_features_conv1,
                                                         self.height_features_conv1))
        self.logger.info('MaxPool1: [N, {}, {}, {}]'.format(self.out_channels_conv1, self.width_features_maxpool1,
                                                            self.height_features_maxpool1))
        self.logger.info('Conv2: [N, {}, {}, {}]'.format(self.out_channels_conv2, self.width_features_conv2,
                                                         self.height_features_conv2))
        self.logger.info('MaxPool2: [N, {}, {}, {}]'.format(self.out_channels_conv2, self.width_features_maxpool2,
                                                            self.height_features_maxpool2))
        self.logger.info('Flatten: [N, {}]'.format(self.out_channels_conv2 * self.width_features_maxpool2 *
                                                   self.height_features_maxpool2))
        self.logger.info('Linear1: [N, {}]'.format(self.linear1.out_features))
        self.logger.info('Linear2: [N, {}]'.format(self.linear2.out_features))
        self.logger.info('Linear3: [N, {}]'.format(self.linear3.out_features))

        # Placeholders for the feature maps stored by ``forward`` and displayed by ``plot``.
        if self.app_state.visualize:
            self.output_conv1 = []
            self.output_conv2 = []

    @staticmethod
    def _output_size(input_size, kernel_size, stride, padding):
        """
        Computes the size of one spatial dimension after a convolution or pooling layer.

        :param input_size: size of the input along that dimension (W).
        :param kernel_size: size of the kernel / receptive field (F).
        :param stride: stride with which the kernel is applied (S).
        :param padding: amount of zero padding on each border (P).

        :return: ``floor(((W - F + 2P) / S) + 1)`` as returned by ``np.floor`` (i.e. a float, not an int).
        """
        return np.floor(((input_size - kernel_size + 2 * padding) / stride) + 1)

    def _num_flat_features(self):
        """
        :return: number of features per sample once the output of MaxPool2 is flattened (int).
        """
        return int(self.out_channels_conv2 * self.width_features_maxpool2 * self.height_features_maxpool2)

    def forward(self, data_dict):
        """
        Forward pass of the ``SimpleConvNet`` model.

        When ``self.app_state.visualize`` is set, the outputs of Conv1 & Conv2 are kept in
        ``self.output_conv1`` & ``self.output_conv2`` so that ``plot`` can display them.

        :param data_dict: DataDict({'images','targets', 'targets_label'}), where:

            - images: [batch_size, num_channels, height, width] (as declared in ``self.data_definitions``),
            - targets [batch_size]

        :return: Predictions [batch_size, num_classes]
        """
        # get images
        images = data_dict['images']

        # apply Convolutional layer 1
        out_conv1 = self.conv1(images)
        if self.app_state.visualize:
            self.output_conv1 = out_conv1

        # apply max_pooling and relu
        out_maxpool1 = torch.nn.functional.relu(self.maxpool1(out_conv1))

        # apply Convolutional layer 2
        out_conv2 = self.conv2(out_maxpool1)
        if self.app_state.visualize:
            self.output_conv2 = out_conv2

        # apply max_pooling and relu
        out_maxpool2 = torch.nn.functional.relu(self.maxpool2(out_conv2))

        # flatten for the linear layers
        x = out_maxpool2.view(-1, self._num_flat_features())

        # apply 3 linear layers (no activation on the last one: raw class scores are returned)
        x = torch.nn.functional.relu(self.linear1(x))
        x = torch.nn.functional.relu(self.linear2(x))
        x = self.linear3(x)

        return x

    @staticmethod
    def _show_feature_maps(feature_maps, title):
        """
        Draws every feature map of one sample in a new matplotlib figure (does not call ``show``).

        :param feature_maps: output of a convolutional layer for a single sample, indexed by channel first.
        :type feature_maps: torch.tensor

        :param title: title of the figure.
        :type title: str
        """
        # Import the submodules explicitly: ``import matplotlib`` alone does not load them.
        import matplotlib.pyplot as plt
        from matplotlib import gridspec

        # Move to host memory first, ``numpy()`` cannot be called on a CUDA tensor.
        maps = feature_maps.detach().cpu().numpy()

        figure = plt.figure()
        grid_size = int(np.sqrt(len(maps))) + 1
        grid = gridspec.GridSpec(grid_size, grid_size)

        for index, feature_map in enumerate(maps):
            ax = plt.subplot(grid[index])
            ax.imshow(feature_map)

        figure.suptitle(title)

    def plot(self, data_dict, predictions, sample_number=0):
        """
        Simple plot - shows the ``Problem``'s image with the target & actual predicted class,
        followed by the feature maps of Conv1 & Conv2 stored by the last call to ``forward``.

        The feature maps are taken at index ``sample_number``, so ``data_dict`` & ``predictions``
        are expected to come from that same last forward pass.

        :param data_dict: DataDict({'images','targets', 'targets_label'})
        :type data_dict: utils.DataDict

        :param predictions: Predictions of the ``SimpleConvNet``.
        :type predictions: torch.tensor

        :param sample_number: Index of the sample in batch (DEFAULT: 0).
        :type sample_number: int

        :return: ``False`` if visualization is disabled, ``None`` otherwise.
        """
        # Check if we are supposed to visualize at all.
        if not self.app_state.visualize:
            return False

        # Import the submodule explicitly: ``import matplotlib`` alone does not load ``pyplot``.
        import matplotlib.pyplot as plt

        # unpack data_dict
        images = data_dict['images']
        targets = data_dict['targets']

        # Get sample.
        image = images[sample_number].cpu().detach().numpy()
        target = targets[sample_number].cpu().detach().numpy()
        prediction = predictions[sample_number].cpu().detach().numpy()

        # Reshape image.
        if image.shape[0] == 1:
            # This is single channel image - get rid of that dimension
            image = np.squeeze(image, axis=0)
        else:
            # More channels - move channels to axis2
            # (X : array_like, shape (n, m) or (n, m, 3) or (n, m, 4))
            image = image.transpose(1, 2, 0)

        # Show data.
        plt.title('Prediction: Class # {} (Target: Class # {})'.format(
            np.argmax(prediction), target))
        plt.imshow(image, interpolation='nearest', aspect='auto')

        # Show the feature maps of Conv1 & Conv2 for the very sample displayed above.
        self._show_feature_maps(self.output_conv1[sample_number], 'feature maps of Conv1')
        self._show_feature_maps(self.output_conv2[sample_number], 'feature maps of Conv2')

        # Plot!
        plt.show()
if __name__ == '__main__':
    # Demo: build an MNIST problem, instantiate the SimpleConvNet on top of it and plot its predictions.

    # Set visualization (must happen before the model is built, as its constructor reads the flag).
    from miprometheus.utils.app_state import AppState
    AppState().visualize = True

    from miprometheus.utils.param_interface import ParamInterface
    from torch.utils.data import DataLoader
    from miprometheus.problems.image_to_class.mnist import MNIST

    problem_params = ParamInterface()
    problem_params.add_config_params({'use_train_data': True,
                                      'root_dir': '~/data/mnist',
                                      'padding': [0, 0, 0, 0],
                                      'up_scaling': False})
    batch_size = 64

    # create problem
    problem = MNIST(problem_params)
    print('Problem {} instantiated.'.format(problem.name))

    # instantiate DataLoader object
    dataloader = DataLoader(problem, batch_size=batch_size, collate_fn=problem.collate_fn)

    # Test base model.
    model_params = ParamInterface()
    model_params.add_config_params({'conv1': {'out_channels': 6,
                                              'kernel_size': 5,
                                              'stride': 1,
                                              'padding': 0},
                                    'conv2': {'out_channels': 16,
                                              'kernel_size': 5,
                                              'stride': 1,
                                              'padding': 0},
                                    'maxpool1': {'kernel_size': 2},
                                    'maxpool2': {'kernel_size': 2}})

    # model
    model = SimpleConvNet(model_params, problem.default_values)
    print('Model {} instantiated.'.format(model.name))

    # perform handshaking between the model (SimpleConvNet) & the problem (MNIST)
    model.handshake_definitions(problem.data_definitions)

    # iterate over the batches
    for i_batch, sample in enumerate(dataloader):
        print('Sample # {} - {}'.format(i_batch, sample['images'].shape), type(sample))
        logits = model(sample)
        print(logits.shape)

        # Plot it and stop as soon as plot() returns a truthy value.
        if model.plot(sample, logits):
            break