# Source code for miprometheus.problems.image_text_to_class.sort_of_clevr

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) IBM Corporation 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
sort_of_clevr.py: ``Sort-of-CLEVR`` is a simplified version of the ``CLEVR`` dataset.

"""
__author__ = "Tomasz Kornuta & Vincent Marois"

import os
import h5py
import numpy as np
from PIL import Image, ImageDraw
import tqdm

import torch
from miprometheus.utils.data_dict import DataDict
from miprometheus.problems.image_text_to_class.image_text_to_class_problem import ImageTextToClassProblem, ObjectRepresentation


class SortOfCLEVR(ImageTextToClassProblem):
    """
    ``Sort-of-CLEVR`` is a simple VQA problem, where the goal is to answer a
    question regarding a given image. Implementation of the generation is
    inspired by: https://github.com/gitlimlab/Relation-Network-Tensorflow

    Improvements:

        - Generates scenes with a varying number of objects (2-6)
        - More types of intra- and inter-relational questions
        - More natural interpretation of questions

    The following entries are read from ``params``:

    :param data_folder: folder where to look for or save the file containing the dataset
    :type data_folder: str

    :param split: Indicates either ``train``, ``test`` or ``val`` (only used to build the filename).
    :type split: str

    :param img_size: Size (height and width, in pixels) of the images to generate.
    :type img_size: int

    :param size: How many samples to generate.
    :type size: int

    :param regenerate: Whether to regenerate the dataset
    :type regenerate: bool

    .. note::

        When generating the dataset, this class:

            - First verifies if a file with a matching filename already exists in the ``data_folder``.
              The filename follows the following template:

                >>> filename = '<split>_<size>_<img_size>.hy'

            - If such a file exists, it is used as the dataset. If not, it is created and then used.
            - If ``regenerate`` is ``True``, the file is recreated regardless if one with the matching
              filename already exists or not.

    .. note::

        The following is set by default:

            >>> params = {'data_folder': '~/data/sort-of-clevr/',
            >>>           'split': 'train',
            >>>           'regenerate': False,
            >>>           'size': 10000,
            >>>           'img_size': 128}

    """

    # Lookup tables shared by the ``*2str`` decoders. Plain dicts are kept so
    # that an unknown index still raises ``KeyError``.
    _COLOR_NAMES = {
        0: 'blue',
        1: 'green',
        2: 'red',
        3: 'yellow',
        4: 'magenta',
        5: 'cyan',
    }

    _SHAPE_NAMES = {
        0: 'rectangle',
        1: 'circle',
    }

    _QUESTION_TEMPLATES = {
        0: 'What is the shape of the {} object?',
        1: 'Is the {} {} closer to the bottom of the image?',
        2: 'Is the {} {} closer to the left side of the image?',
        3: 'What is the shape of the object nearest to the {} {}?',
        4: 'What is the shape of the object farthest from the {} {}?',
        5: 'What is the color of the object nearest to the {} {}?',
        6: 'What is the color of the object farthest from the {} {}?',
        # 7: 'How many objects have the same shape as the {} {}?,
    }

    _ANSWER_LABELS = {
        # 0-5 colors
        0: 'blue',
        1: 'green',
        2: 'red',
        3: 'yellow',
        4: 'magenta',
        5: 'cyan',
        # 6-7 shapes
        6: 'rectangle',
        7: 'circle',
        # 8-9 yes/no
        8: 'yes',
        9: 'no',
    }

    def __init__(self, params):
        """
        Initializes ``Sort-of-CLEVR`` problem, calls base class ``ImageTextToClassProblem``
        initialization, sets properties using the provided parameters, then loads
        (or generates) the dataset file.

        :param params: Dictionary of parameters (read from configuration ``.yaml`` file).

        """
        # Call base class constructors.
        super(SortOfCLEVR, self).__init__(params)

        # problem name
        self.name = 'Sort-of-CLEVR'

        # Set default parameters.
        self.params.add_default_params({'data_folder': '~/data/sort-of-clevr/',
                                        'split': 'train',
                                        'regenerate': False,
                                        'size': 10000,
                                        'img_size': 128})

        # parse params
        self.img_size = params["img_size"]
        self.dataset_size = params["size"]
        self.regenerate = params.get("regenerate", False)

        # Set general color properties.
        self.BG_COLOR = (180, 180, 150)
        self.COLOR = [
            (0, 0, 210),    # 'blue'
            (0, 210, 0),    # 'green'
            (210, 0, 0),    # 'red'
            (150, 150, 0),  # 'yellow'
            (150, 0, 150),  # 'magenta'
            (0, 150, 150),  # 'cyan'
            # add more colors here if needed
        ]

        # Other hardcoded parameters.
        self.NUM_SHAPES = 2
        self.NUM_COLORS = len(self.COLOR)
        self.NUM_QUESTIONS = 7
        # Objects are characterised by colors, so cannot have more objects than
        # colors.
        self.MAX_NUM_OBJECTS = min(6, self.NUM_COLORS)
        self.GRID_SIZE = 4

        # Get absolute path.
        data_folder = os.path.expanduser(params['data_folder'])

        # Create the folder if it doesn't exist. makedirs() also creates the
        # missing parents (e.g. ``~/data``), which a plain mkdir() cannot do.
        if not os.path.isdir(data_folder):
            self.logger.warning('Indicated data_folder does not exist, creating it.')
            os.makedirs(data_folder, exist_ok=True)

        # construct the dataset filename from 3 values:
        # set: either 'train', 'test' or 'val'
        # dataset size
        # image size
        data_filename = '{}_{}_{}.hy'.format(params['split'], str(self.dataset_size), str(self.img_size))

        # Widths of the encoded answers / questions, derived from the same
        # constants that size the generated matrices (10 and 13 by default).
        num_classes = self.NUM_COLORS + self.NUM_SHAPES + 2
        question_size = self.NUM_COLORS + self.NUM_QUESTIONS

        # define the default_values dict: holds parameters values that a model may need.
        self.default_values = {'height': self.img_size,
                               'width': self.img_size,
                               'num_channels': 3,
                               'num_classes': num_classes,
                               'question_size': question_size}

        # define the data_definitions dict: holds a description of the DataDict content
        self.data_definitions = {'images': {'size': [-1, 3, self.img_size, self.img_size], 'type': [torch.Tensor]},
                                 'questions': {'size': [-1, question_size], 'type': [torch.Tensor]},
                                 'targets_classes': {'size': [-1, num_classes], 'type': [torch.Tensor]},
                                 'targets': {'size': [-1], 'type': [torch.Tensor]},
                                 'scenes_description': {'size': [-1, -1], 'type': [list, str]},
                                 }

        # Load or generate the dataset.
        self.load_dataset(data_folder, data_filename)

        self.length = self.dataset_size

    def load_dataset(self, data_folder, data_filename):
        """
        Makes sure the HDF5-encoded dataset file exists, generating it if needed.

        .. note::

            Sets ``self.filename`` to ``data_folder/data_filename``. If ``self.regenerate``
            is set, the file is always (re)generated. Otherwise an existing file with
            that name is reused as is, and a missing one is generated.

        :param data_folder: folder containing (or receiving) the dataset file.
        :type data_folder: str

        :param data_filename: name of the dataset file inside ``data_folder``.
        :type data_filename: str

        """
        # name of the file to look for or create
        self.filename = os.path.join(data_folder, data_filename)

        if self.regenerate:
            self.logger.warning('Regenerate is set to true: regenerating the dataset from scratch, '
                                'without looking for an existing one.')
            self.generate_h5py_dataset(self.filename)

        elif os.path.isfile(self.filename):
            # regenerate is false and the file already exists: only its name is checked.
            self.logger.warning('Found file {}, using it as the dataset as it matches the filename template.'.format(self.filename))

        else:
            # the file doesn't exist, we need to create it.
            self.logger.warning('File {} not found on disk, generating a new dataset.'.format(self.filename))
            self.generate_h5py_dataset(self.filename)

    def generate_h5py_dataset(self, filename):
        """
        Generates a whole new ``Sort-of-CLEVR`` dataset and saves it in the form of
        a HDF5 file.

        Each sample is stored in its own group, named after the sample index, and
        holding the ``image``, ``question``, ``answer`` and ``scene_description``
        entries. All questions generated for one scene share the same image.

        :param filename: name of the file containing the samples.
        :type filename: str

        """
        # Both the HDF5 file and the progress bar are context-managed so that
        # they are released even if the generation fails half-way.
        with h5py.File(filename, 'w') as h5_file, \
                tqdm.tqdm(total=self.dataset_size, unit=" samples",
                          unit_scale=True, unit_divisor=1000) as progress:
            progress.set_postfix(file=filename, refresh=False)

            count = 0
            while count < self.dataset_size:
                # Generate the scene.
                objects = self.generate_scene_representation()

                # Generate corresponding image, questions and answers.
                image = self.generate_image(objects)
                questions = self.generate_question_matrix(objects)
                answers = self.generate_answer_matrix(objects)
                # The description is identical for every question of the scene.
                description = self.scene2str(objects)

                # Iterate through all questions generated for a given scene
                # (one row of each matrix per question).
                for question, answer in zip(questions, answers):
                    # Create new group.
                    group = h5_file.create_group(str(count))

                    # Set data.
                    group['image'] = image
                    group['question'] = question
                    group['answer'] = answer
                    group['scene_description'] = description

                    # Increment counter.
                    count += 1
                    progress.update()

                    # Check whether we generated the required number of samples
                    if count >= self.dataset_size:
                        break

        self.logger.info('Generated dataset with {} samples and saved to {}'.format(self.dataset_size, filename))

    def __getitem__(self, index):
        """
        Getter method to access the dataset and return a sample.

        .. warning::

            **HDF5 does not support multi threaded data access with num_workers > 1 on the data loading.**

            A way around this is to move every call for opening the HDF5 file to this ``__getitem__`` method.

            See https://discuss.pytorch.org/t/hdf5-multi-threaded-alternative/6189/9 for more info.

        :param index: index of the sample to return.

        :return: DataDict({'images','questions', 'targets_classes', 'targets', 'scenes_description'}), with:

            - images: image scaled by 1/255, with the axes of the stored array reversed
            - questions: encoded question (``float32``)
            - targets_classes: one-hot encoded answer (``float32``)
            - targets: index of the answer (``argmax`` of ``targets_classes``)
            - scenes_description: Scene description.

        """
        # Read everything while the file is open, then close it right away:
        # the file is reopened on every call (see warning above), so leaving
        # it open would leak one handle per sample.
        with h5py.File(self.filename, 'r') as data:
            sample = data[str(index)]
            # ``dataset[()]`` reads the whole dataset (replacement for the
            # removed ``Dataset.value`` attribute).
            image = sample['image'][()]
            question = sample['question'][()]
            answer = sample['answer'][()]
            description = sample['scene_description'][()]

        # Depending on the h5py version, strings are read back as bytes.
        if isinstance(description, bytes):
            description = description.decode('utf-8')

        data_dict = DataDict({key: None for key in self.data_definitions.keys()})
        data_dict['images'] = (image / 255).transpose(2, 1, 0)
        data_dict['questions'] = question.astype(np.float32)
        data_dict['targets_classes'] = answer.astype(np.float32)
        data_dict['targets'] = np.argmax(data_dict['targets_classes'])
        data_dict['scenes_description'] = description

        return data_dict

    def collate_fn(self, batch):
        """
        Combines a list of ``DataDict`` (retrieved with ``__getitem__`` ) into a batch.

        .. note::

            This function delegates to the parent class ``collate_fn`` and pairs the
            collated values, in order, with the keys of ``self.data_definitions``,
            returning them as a ``DataDict``.

        :param batch: list of individual ``DataDict`` samples to combine.

        :return: ``DataDict({'images','questions', 'targets_classes', 'targets', 'scenes_description'})``
            containing the batch.

        """
        collated = super(SortOfCLEVR, self).collate_fn(batch)
        return DataDict(dict(zip(self.data_definitions.keys(), collated.values())))

    def color2str(self, color_index):
        """
        Decodes the specified color index and returns it as a string.

        :param color_index: Index of the color.
        :type color_index: int

        :return: color name as a string.

        :raises KeyError: if the index does not correspond to a known color.

        """
        return self._COLOR_NAMES[color_index]

    def shape2str(self, shape_index):
        """
        Decodes the specified shape index and returns it as a string.

        :param shape_index: Index of the shape (0: rectangle, 1: circle).
        :type shape_index: int

        :return: shape name as a string.

        :raises KeyError: if the index does not correspond to a known shape.

        """
        return self._SHAPE_NAMES[shape_index]

    def question_type_template(self, question_index):
        """
        Decodes the specified question index and returns the corresponding string template.

        :param question_index: Index of the question type.
        :type question_index: int

        :return: corresponding string template.

        :raises KeyError: if the index does not correspond to a known question type.

        """
        return self._QUESTION_TEMPLATES[question_index]

    def question2str(self, encoded_question):
        """
        Decodes the encoded question, i.e. produces a human-understandable string.

        :param encoded_question: Concatenation of two one-hot vectors:

            - The first one (``NUM_COLORS`` entries) denotes the object of interest (its color),
            - The second one denotes the question type.

        :type encoded_question: tensor

        :return: The question as a human-understandable string.

        """
        # "Decode" the color and question-type parts of the vector.
        color = np.argmax(encoded_question[:self.NUM_COLORS])
        question_code = np.argmax(encoded_question[self.NUM_COLORS:])

        # Return the question as a string.
        return self.question_type_template(question_code).format(self.color2str(color), 'object')

    def answer2str(self, encoded_answer):
        """
        Decodes the answer and returns the corresponding label.

        :param encoded_answer: Index of the answer (0-5: colors, 6-7: shapes, 8-9: yes/no).
        :type encoded_answer: int or numpy scalar

        :return: answer label.

        :raises KeyError: if the index does not correspond to a known answer.

        """
        # Cast to a plain int so that numpy scalars / 0-d arrays work as keys.
        return self._ANSWER_LABELS[int(np.floor(encoded_answer))]

    def scene2str(self, objects):
        """
        Returns a string containing the shape, color and position of every object forming the scene.

        :param objects: List of objects - abstract scene representation.
        :type objects: list

        :return: Str containing the scene description.

        """
        entries = ['{} {} at ({}, {}) | '.format(self.color2str(obj.color),
                                                 self.shape2str(obj.shape),
                                                 obj.x, obj.y)
                   for obj in objects]
        return '| ' + ''.join(entries)

    def generate_scene_representation(self):
        """
        Generates the scene representation.

        Between 2 and ``MAX_NUM_OBJECTS`` objects are drawn, each with a distinct
        color, a random shape and a distinct cell of the ``GRID_SIZE x GRID_SIZE``
        grid (plus a jitter of at most 2 pixels on each axis).

        :return: List of objects - abstract scene representation.

        """
        # Number of objects - no more than the number of colors.
        # (randint's upper bound is exclusive, hence the + 1.)
        num_objects = np.random.randint(2, self.MAX_NUM_OBJECTS + 1)

        # Shuffle "grid positions".
        grid_positions = np.arange(self.GRID_SIZE * self.GRID_SIZE)
        np.random.shuffle(grid_positions)

        # Size of a "grid block".
        block_size = int(self.img_size * 0.9 / self.GRID_SIZE)

        # Shuffle colors.
        colors = np.arange(self.NUM_COLORS)
        np.random.shuffle(colors)
        colors = colors[:num_objects]

        # Generate shapes.
        shapes = (np.random.rand(num_objects) < 0.5).astype(int)

        # List of objects present in the scene.
        objects = []

        # zip() stops after num_objects items (the length of colors / shapes).
        for grid_position, color, shape in zip(grid_positions, colors, shapes):
            # Calculate object positions depending on "grid positions"
            # (the row index is flipped).
            x = grid_position % self.GRID_SIZE
            y = self.GRID_SIZE - grid_position // self.GRID_SIZE - 1

            # Calculate "image coordinates": center of the cell +/- 2 pixels.
            x_img = (x + 0.5) * block_size + np.random.randint(-2, 3)
            y_img = (y + 0.5) * block_size + np.random.randint(-2, 3)

            # Add object to list.
            objects.append(ObjectRepresentation(x_img, y_img, color, shape))

        return objects

    def generate_image(self, objects):
        """
        Generates the image on the basis of a given scene representation.

        :param objects: List of objects - abstract scene representation.
        :type objects: list

        :return: ``np.array`` containing the generated image.

        """
        img_size = self.img_size
        # Half of the side of an object: 70% of a grid block, divided by 2.
        shape_size = int((img_size * 0.9 / self.GRID_SIZE) * 0.7 / 2)

        # Generate image [img_size, img_size, 3]
        img = Image.new('RGB', (img_size, img_size), color=self.BG_COLOR)
        drawer = ImageDraw.Draw(img)

        for obj in objects:
            # Bounding box of the object, centered on its position.
            position = (obj.x - shape_size, obj.y - shape_size,
                        obj.x + shape_size, obj.y + shape_size)
            # Shape 1 is a circle, anything else a rectangle.
            draw = drawer.ellipse if obj.shape == 1 else drawer.rectangle
            draw(position, fill=self.COLOR[obj.color])

        # Cast to np.
        return np.array(img)

    def generate_question_matrix(self, objects):
        """
        Generates the questions matrix: [# of objects * # of Q, # of color + # of Q].

        This matrix contains all possible questions for a given scene representation:
        for every object, one row per question type, each row being the one-hot color
        of the object followed by the one-hot question type.

        :param objects: List of objects - abstract scene representation.
        :type objects: list

        :return: the questions matrix (boolean ``np.array``)

        """
        num_questions = self.NUM_QUESTIONS
        # ``bool`` instead of the removed ``np.bool`` alias.
        Q = np.zeros((len(objects) * num_questions, self.NUM_COLORS + num_questions), dtype=bool)

        for i, obj in enumerate(objects):
            rows = slice(i * num_questions, (i + 1) * num_questions)
            # Same color in every question about this object...
            Q[rows, obj.color] = True
            # ...combined with each question type exactly once.
            Q[rows, self.NUM_COLORS:] = np.diag(np.ones(num_questions))

        return Q

    def generate_answer_matrix(self, objects):
        """
        Generates the answers matrix: [# of objects * # of Q, # of color + 4]

        `# of color + 4` = [color 1, color 2, ... , rectangle, circle, yes, no]

        Rows follow the same order as the rows of the questions matrix.

        :param objects: List of objects - abstract scene representation.
        :type objects: list

        :return: the answer matrix (boolean ``np.array``)

        """
        num_questions = self.NUM_QUESTIONS
        # Columns of the non-color answers.
        rectangle_col = self.NUM_COLORS
        circle_col = self.NUM_COLORS + 1
        yes_col = self.NUM_COLORS + 2
        no_col = self.NUM_COLORS + 3
        half_size = int(self.img_size / 2)

        # ``bool`` instead of the removed ``np.bool`` alias.
        A = np.zeros((len(objects) * num_questions, self.NUM_COLORS + 4), dtype=bool)

        for i, obj in enumerate(objects):
            first_row = i * num_questions

            # Q1: circle or rectangle?
            A[first_row, circle_col if obj.shape else rectangle_col] = True

            # Q2: bottom? (image y axis points downwards)
            A[first_row + 1, yes_col if obj.y > half_size else no_col] = True

            # Q3: left?
            A[first_row + 2, yes_col if obj.x < half_size else no_col] = True

            # Calculate (squared) distances to every object, itself included.
            distances = np.array([(obj.x - other_obj.x) ** 2 + (obj.y - other_obj.y) ** 2
                                  for other_obj in objects])
            idx = distances.argsort()
            # Ids of closest and most distant objects. idx[0] is skipped: it is
            # expected to be the object itself (distance 0).
            nearest = objects[idx[1]]
            farthest = objects[idx[-1]]

            # Q4: the shape of the nearest object
            A[first_row + 3, self.NUM_COLORS + nearest.shape] = True

            # Q5: the shape of the farthest object
            A[first_row + 4, self.NUM_COLORS + farthest.shape] = True

            # Q6: the color of the nearest object
            A[first_row + 5, nearest.color] = True

            # Q7: the color of the farthest object
            A[first_row + 6, farthest.color] = True

        return A

    def show_sample(self, data_dict, sample=0):
        """
        Show a sample of the current DataDict.

        :param data_dict: DataDict({'images','questions', 'targets_classes', 'targets', 'scenes_description'})
        :type data_dict: DataDict

        :param sample: sample index to visualize.
        :type sample: int

        """
        import matplotlib.pyplot as plt

        # Access by key rather than by position, so that extra entries in the
        # DataDict do not break the unpacking.
        # Reverse the axes again, undoing the transposition done in __getitem__.
        image = data_dict['images'][sample].numpy().transpose(2, 1, 0)
        question = data_dict['questions'][sample].numpy()
        answer = data_dict['targets'][sample].numpy()
        scenes_description = data_dict['scenes_description']

        # Print scene description.
        self.logger.info("Scene description :\n {}".format(scenes_description[sample]))
        self.logger.info("Question :\n {} ({})".format(question, self.question2str(question)))
        self.logger.info("Answer :\n {} ({})".format(answer, self.answer2str(answer)))

        # Generate figure.
        plt.figure(1)
        plt.title('Q: {}'.format(self.question2str(question)))
        plt.xlabel('A: {}'.format(self.answer2str(answer)))
        plt.imshow(image, interpolation='nearest', aspect='auto')

        # Plot!
        plt.show()

    def plot_preprocessing(self, data_dict, logits):
        """
        Allows for some data preprocessing before the model creates a plot for
        visualization during training or inference.

        Adds the ``targets_string`` and ``questions_string`` entries (decoded answers
        and questions) to the DataDict.

        :param data_dict: DataDict({'images','questions', 'targets_classes', 'targets', 'scenes_description'})

        :param logits: Predictions of the model.
        :type logits: Tensor

        :return: the DataDict (converted to numpy) and the list of predicted answer labels.

        """
        # move DataDict to cpu and detach it from the graph
        data_dict = data_dict.cpu().detach().numpy()
        logits = logits.cpu().detach().numpy()

        # Access by key rather than by position.
        questions = data_dict['questions']
        targets_index = data_dict['targets']
        batch_size = data_dict['targets_classes'].shape[0]

        # Convert to string
        answers_string = [self.answer2str(targets_index[batch_num]) for batch_num in range(batch_size)]
        questions_string = [self.question2str(questions[batch_num]) for batch_num in range(batch_size)]
        prediction = [self.answer2str(np.argmax(logits[batch_num])) for batch_num in range(batch_size)]

        data_dict['targets_string'] = answers_string
        data_dict['questions_string'] = questions_string

        return data_dict, prediction
if __name__ == "__main__":
    # Tests SortOfCLEVR - generates and displays a sample.
    import time

    from torch.utils.data import DataLoader
    from miprometheus.utils.param_interface import ParamInterface

    # "Loaded parameters": nothing is set, i.e. the default values are used.
    default_params = ParamInterface()

    # Create the problem.
    problem = SortOfCLEVR(default_params)

    batch_size = 64
    epoch_size = problem.get_epoch_size(batch_size)
    print('Number of episodes to run to cover the set once: {}'.format(epoch_size))

    # Fetch a single sample through __getitem__.
    first_sample = problem[0]
    print(repr(first_sample))
    print('__getitem__ works.')

    # Wrap a DataLoader on top of this Dataset subclass.
    loader = DataLoader(dataset=problem,
                        collate_fn=problem.collate_fn,
                        batch_size=batch_size,
                        shuffle=True,
                        num_workers=0)

    # Time one full pass over the dataset (useful to compare numbers of workers).
    start = time.time()
    for batch_index, batch in enumerate(loader):
        print('Batch # {} - {}'.format(batch_index, type(batch)))
    elapsed = time.time() - start

    print('Number of workers: {}'.format(loader.num_workers))
    print('time taken to exhaust the dataset for a batch size of {}: {}s'.format(batch_size, elapsed))

    # Display single sample (0) from a fresh batch.
    batch = next(iter(loader))
    problem.show_sample(batch, 0)

    print('Unit test completed')