Source code for miprometheus.problems.seq_to_seq.algorithmic.algorithmic_seq_to_seq_problem

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) IBM Corporation 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
algorithmic_seq_to_seq_problem.py: abstract base class for algorithmic sequential problems.

"""
__author__ = "Tomasz Kornuta, Younes Bouhadjar, Vincent Marois"

from abc import abstractmethod
import numpy as np
import torch
import torch.nn as nn

from miprometheus.problems.seq_to_seq.seq_to_seq_problem import SeqToSeqProblem
from miprometheus.utils.loss.masked_bce_with_logits_loss import MaskedBCEWithLogitsLoss


class AlgorithmicSeqToSeqProblem(SeqToSeqProblem):
    """
    Base class for algorithmic sequential problems.

    Provides some basic features useful in all problems of such nature.

    .. note::

        All derived classes will provide two operation modes:

            - "optimized": ``__getitem__`` in fact does nothing (returns the index), \
              whereas ``collate_fn`` generates the whole batch.

            - "not_optimized": ``__getitem__`` generates a single sample, while \
              ``collate_fn`` collates them.

        The advantage of the "not_optimized" mode is that a single batch will contain sequences of varying length.
        This mode is around 10 times slower though.

    .. warning::

        In both cases the derived classes will work as true data generators, \
        and not really care about the indices provided from the list. As a result, \
        each epoch will contain newly generated, thus different, samples (for the same indices).

    .. warning::

        The "optimized" mode is not suited to be used with many dataloader workers, i.e. \
        setting num_workers > 0 will in fact slow down the whole generation (by 3-4 times!).

    """
    def __init__(self, params):
        """
        Initializes the problem object. Calls the base ``SeqToSeqProblem`` constructor.

        Sets ``nn.BCEWithLogitsLoss()`` as the default loss function.

        :param params: Dictionary of parameters (read from the configuration ``.yaml`` file).

        """
        # Call base constructor.
        super(AlgorithmicSeqToSeqProblem, self).__init__(params)

        # "Default" problem name.
        self.name = 'AlgorithmicSeqToSeqProblem'

        # Set default loss function - binary cross entropy with logits (masked or not).
        if self.use_mask:
            self.loss_function = MaskedBCEWithLogitsLoss()
        else:
            self.loss_function = nn.BCEWithLogitsLoss()

        # Extract "standard" list of parameters for algorithmic tasks.
        # Number of bits in one element.
        self.control_bits = params['control_bits']
        self.data_bits = params['data_bits']

        # Set the two main bits: store and recall.
        self.store_bit = 0
        self.recall_bit = 1

        # Min and max lengths of a single subsequence (number of elements).
        self.min_sequence_length = params['min_sequence_length']
        self.max_sequence_length = params['max_sequence_length']

        # Set default values for all Algorithmic Seq2Seq problems.
        self.default_values = {
            # Size of the input item, in this case the number of bits.
            'input_item_size': self.control_bits + self.data_bits,
            # Size of the output item.
            # Valid for most algorithmic tasks, must be overwritten by e.g. equality/symmetry,
            # which for every input item return a single bit of information.
            'output_item_size': self.data_bits,
            # Number of the bit that indicates the beginning of the input sequence (store).
            'store_bit': self.store_bit,
            # Number of the bit that indicates the beginning of the target sequence (recall).
            'recall_bit': self.recall_bit,
            }

        # Set the data_definitions dict for all Algorithmic Seq2Seq problems.
        self.data_definitions = {
            'sequences': {'size': [-1, -1, -1], 'type': [torch.Tensor]},
            'targets': {'size': [-1, -1, -1], 'type': [torch.Tensor]},
            'masks': {'size': [-1, -1, 1], 'type': [torch.Tensor]},
            'sequences_length': {'size': [-1, 1], 'type': [torch.Tensor]},
            'num_subsequences': {'size': [-1, 1], 'type': [torch.Tensor]},
            }

        # Set the default size of the dataset.
        # TODO: Should derive the actual theoretical limit instead of an arbitrary one.
        self.params.add_default_params({'size': 1000})
        self.length = params['size']

        # Add parameter denoting the 0-1 distribution (DEFAULT: 0.5, i.e. equal).
        self.params.add_default_params({'bias': 0.5})
        self.bias = params['bias']

        # Use "additional" control lines.
        self.params.add_default_params({'use_control_lines': True})
        self.use_control_lines = params['use_control_lines']

        # Random control lines.
        self.params.add_default_params({'randomize_control_lines': True})
        self.randomize_control_lines = params['randomize_control_lines']

        # Set the default data generation mode.
        self.params.add_default_params({'generation_mode': 'optimized'})
        gen_mode = params['generation_mode']
        if gen_mode == 'optimized':
            # "Attach" __getitem__ and collate_fn - the whole batch is generated at once (optimized).
            setattr(self.__class__, '__getitem__', staticmethod(self.do_not_generate_sample))
            setattr(self.__class__, 'collate_fn', staticmethod(self.collate_by_batch_generation))
        else:
            # "Attach" __getitem__ and collate_fn - samples are generated one by one (slower).
            setattr(self.__class__, '__getitem__', staticmethod(self.generate_sample_ignore_index))
            setattr(self.__class__, 'collate_fn', staticmethod(self.collate_samples_from_batch))
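    # Illustrative configuration sketch (comment only, assuming the parameter names read above):
    #
    #   params.add_config_params({'control_bits': 2, 'data_bits': 8,
    #                             'min_sequence_length': 1, 'max_sequence_length': 10,
    #                             'generation_mode': 'not_optimized'})
    #
    # With 'not_optimized', __getitem__ generates individual samples and collate_fn pads them,
    # so a batch may mix sequence lengths; with the default 'optimized', __getitem__ only returns
    # the index and collate_fn generates the whole batch with a single shared length.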
    def pad_collate_tensor_list(self, tensor_list, max_seq_len=-1):
        """
        Collates a list of 2D tensors with varying dimension 0 ("sequence length"),
        padding them with zeros along that dimension.

        :param tensor_list: list [BATCH_SIZE] of tensors [SEQ_LEN, DATA_SIZE] to be padded.
        :param max_seq_len: max sequence length (DEFAULT: -1 means that it will be recalculated on the fly).

        :return: 3D padded tensor [BATCH_SIZE, MAX_SEQ_LEN, DATA_SIZE].

        """
        # Get batch size.
        batch_size = len(tensor_list)

        if max_seq_len < 0:
            # Get max total length.
            max_seq_len = max([t.shape[0] for t in tensor_list])

        # Collate tensors - pad each of them separately.
        collated_tensors = torch.zeros(size=(batch_size, max_seq_len, tensor_list[0].shape[-1]))
        for i, t in enumerate(tensor_list):
            # Version 1: pad.
            #ten_pad = max_seq_len - t.shape[0]
            # (padLeft, padRight, padTop, padBottom)
            #pad = torch.nn.ZeroPad2d((0, 0, 0, ten_pad))
            #collated_tensors[i, :, :] = pad(t)

            # Version 2: copy into the zero-initialized (i.e. padded) slice.
            ten_len = t.shape[0]
            collated_tensors[i, :ten_len] = t

        return collated_tensors
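    # Illustrative example (comment only, assuming ``problem`` is an instance of a derived problem class):
    #
    #   a = torch.ones(3, 10)
    #   b = torch.ones(5, 10)
    #   problem.pad_collate_tensor_list([a, b]).shape  # -> torch.Size([2, 5, 10])
    #
    # The shorter tensor ``a`` is zero-padded along dimension 0 up to the batch maximum of 5.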
    @abstractmethod
    def generate_batch(self, batch_size):
        """
        Generates a batch of samples of size ``batch_size`` on-the-fly.

        .. note::

            To be implemented in the derived algorithmic problem classes.

        :param batch_size: Size of the batch to be returned.

        :return: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}), with:

            - sequences: [BATCH_SIZE, 2*SEQ_LENGTH+2, CONTROL_BITS+DATA_BITS],
            - sequences_length: [BATCH_SIZE, 1] (the same random value between self.min_sequence_length and self.max_sequence_length),
            - targets: [BATCH_SIZE, 2*SEQ_LENGTH+2, DATA_BITS],
            - masks: [BATCH_SIZE, 2*SEQ_LENGTH+2, 1],
            - num_subsequences: [BATCH_SIZE, 1].

        """
    def generate_sample_ignore_index(self, index):
        """
        Returns one individual sample generated on-the-fly.

        .. note::

            The sequence length is drawn randomly between ``self.min_sequence_length`` and \
            ``self.max_sequence_length``.

        .. warning::

            As the name of the method suggests, the index will in fact be ignored during generation.

        :param index: index of the sample to be returned (IGNORED).

        :return: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}), with:

            - sequences: [2*SEQ_LENGTH+2, CONTROL_BITS+DATA_BITS],
            - sequences_length: [1] (random value between self.min_sequence_length and self.max_sequence_length),
            - targets: [2*SEQ_LENGTH+2, DATA_BITS],
            - masks: [2*SEQ_LENGTH+2, 1],
            - num_subsequences: [1].

        """
        # Generate a batch of size 1.
        data_dict = self.generate_batch(1)

        # Squeeze the batch dimension.
        for key in self.data_definitions.keys():
            data_dict[key] = data_dict[key].squeeze(0)

        return data_dict
    def collate_samples_from_batch(self, batch_of_dicts):
        """
        Collates a batch from samples generated on-the-fly by ``__getitem__``.

        :param batch_of_dicts: list of DataDicts retrieved by ``__getitem__``, each containing tensors, numbers, \
        dicts or lists.

        :return: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}), with:

            - sequences: [BATCH_SIZE, 2*MAX_SEQ_LENGTH+2, CONTROL_BITS+DATA_BITS],
            - sequences_length: [BATCH_SIZE, 1] (random values between self.min_sequence_length and self.max_sequence_length),
            - targets: [BATCH_SIZE, 2*MAX_SEQ_LENGTH+2, DATA_BITS],
            - masks: [BATCH_SIZE, 2*MAX_SEQ_LENGTH+2, 1],
            - num_subsequences: [BATCH_SIZE, 1].

        """
        # Get max total (input + markers + output) length.
        max_batch_total_len = max([d['sequences'].shape[0] for d in batch_of_dicts])

        # Collate sequences - pad each of them separately.
        collated_sequences = self.pad_collate_tensor_list(
            [d['sequences'] for d in batch_of_dicts], max_batch_total_len)

        # Collate masks.
        collated_masks = self.pad_collate_tensor_list(
            [d['masks'] for d in batch_of_dicts], max_batch_total_len)

        # Collate targets.
        collated_targets = self.pad_collate_tensor_list(
            [d['targets'] for d in batch_of_dicts], max_batch_total_len)

        # Collate sequence lengths.
        collated_lengths = torch.tensor([d['sequences_length'] for d in batch_of_dicts])

        # Collate numbers of subsequences.
        collated_num_subsequences = torch.tensor([d['num_subsequences'] for d in batch_of_dicts])

        # Return data_dict.
        data_dict = self.create_data_dict()
        data_dict['sequences'] = collated_sequences
        data_dict['sequences_length'] = collated_lengths
        data_dict['targets'] = collated_targets
        data_dict['masks'] = collated_masks
        data_dict['num_subsequences'] = collated_num_subsequences

        return data_dict
    def do_not_generate_sample(self, index):
        """
        Method used as ``__getitem__`` in the "optimized" mode. It simply returns back the received index.
        The whole generation is done in ``collate_fn`` (i.e. ``collate_by_batch_generation``).

        .. warning::

            As the name of the method suggests, the method does not generate the sample.

        :param index: index of the sample to be returned (IGNORED).

        :return: index.

        """
        return index
    def collate_by_batch_generation(self, batch):
        """
        Generates a batch of samples on-the-fly.

        .. warning::

            The indices returned by ``__getitem__`` are simply not used in this function. Instead, ``collate_fn`` \
            generates a whole batch of samples on-the-fly, relying on the underlying ``generate_batch`` \
            method, with all sequences having the same (randomly selected) length.

        :param batch: list of indices returned by ``__getitem__`` (only its length is used).

        :return: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}), with:

            - sequences: [BATCH_SIZE, 2*SEQ_LENGTH+2, CONTROL_BITS+DATA_BITS],
            - sequences_length: [BATCH_SIZE, 1] (the same random value between self.min_sequence_length and self.max_sequence_length),
            - targets: [BATCH_SIZE, 2*SEQ_LENGTH+2, DATA_BITS],
            - masks: [BATCH_SIZE, 2*SEQ_LENGTH+2, 1],
            - num_subsequences: [BATCH_SIZE, 1].

        """
        # Generate a batch of the size equal to the length of the received list of indices.
        data_dict = self.generate_batch(len(batch))

        return data_dict
    def set_max_length(self, max_length):
        """
        Sets the maximum sequence length (property).

        :param max_length: Length to be saved as max.

        """
        self.max_sequence_length = max_length
    def curriculum_learning_initialize(self, curriculum_params):
        """
        Initializes curriculum learning - simply saves the curriculum parameters.

        .. note::

            This method can be overwritten in the derived classes.

        :param curriculum_params: Interface to parameters accessing the curriculum learning view of the registry tree.

        """
        # Save params.
        self.curriculum_params = curriculum_params

        # Inform the user.
        epoch_size = self.get_epoch_size(self.params["batch_size"])
        self.logger.info("Initializing curriculum learning! Will activate when all samples are exhausted "
                         "(every {} episodes when using batch of size {})".format(epoch_size, self.params["batch_size"]))
    def curriculum_learning_update_params(self, episode):
        """
        Updates problem parameters according to curriculum learning. In the \
        case of algorithmic sequential problems, it updates the max sequence \
        length, depending on configuration parameters.

        :param episode: Number of the current episode.
        :type episode: int

        :return: Boolean informing whether curriculum learning is finished (or wasn't active at all).

        """
        # Curriculum learning stop condition.
        curric_done = True
        try:
            # Read curriculum learning parameters.
            max_max_length = self.params['max_sequence_length']
            initial_max_sequence_length = self.curriculum_params['initial_max_sequence_length']
            epoch_size = self.get_epoch_size(self.params["batch_size"])

            # Curriculum learning goes from the initial max length to the
            # max length in steps of size 1.
            max_length = initial_max_sequence_length + ((episode + 1) // epoch_size)
            if max_length > max_max_length:
                max_length = max_max_length
            else:
                curric_done = False

            # Change max length.
            self.max_sequence_length = max_length

        except KeyError:
            pass

        # Return information whether we finished CL (i.e. reached max sequence length).
        return curric_done
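    # Worked example (comment only, based on the formula above): with
    # initial_max_sequence_length = 3, epoch_size = 1000 and max_sequence_length = 10,
    # episode 2499 yields max_length = 3 + 2500 // 1000 = 5; the returned flag stays True
    # only once 3 + (episode + 1) // 1000 exceeds 10, i.e. from episode 7999 onwards.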
    def calculate_accuracy(self, data_dict, logits):
        """
        Calculates the accuracy, i.e. the mean agreement between the (rounded) outputs and the targets.

        .. warning::

            Applies the mask to both logits and targets.

        :param data_dict: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}).

        :param logits: Predictions of the model.
        :type logits: tensor

        :return: Accuracy.

        """
        # Check whether the mask should be used - if so, apply it.
        if self.use_mask:
            return self.loss_function.masked_accuracy(
                logits, data_dict['targets'], data_dict['masks'])
        else:
            return (1 - torch.abs(torch.round(torch.nn.functional.sigmoid(logits)) - data_dict['targets'])).mean()
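    # Worked example for the unmasked branch (comment only): with targets [1., 0.] and
    # logits [2.0, -3.0], sigmoid gives roughly [0.88, 0.05], rounding gives [1., 0.],
    # so the accuracy is (1 - |[1., 0.] - [1., 0.]|).mean() = 1.0; one wrong bit out of
    # two would instead give 0.5.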
    def add_ctrl(self, seq, ctrl, pos):
        """
        Adds control channels to a sequence.

        :param seq: Sequence to which the control channels are added.
        :type seq: array_like

        :param ctrl: Elements to add.
        :type ctrl: array_like

        :param pos: Object that defines the index or indices before which ctrl is inserted.
        :type pos: int, slice or sequence of ints

        :return: updated sequence.

        """
        return np.insert(seq, pos, ctrl, axis=-1)
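    # Illustrative example (comment only, assuming ``problem`` is an instance of a derived problem class):
    #
    #   seq = np.zeros((1, 4, 8))                    # batch of 1, 4 items, 8 data bits
    #   problem.add_ctrl(seq, ctrl=1, pos=0).shape   # -> (1, 4, 9)
    #
    # A single control channel filled with ones is inserted before index 0 of the last axis,
    # i.e. prepended to the data bits of every item.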
    def augment(self, seq, markers, ctrl_start=None, add_marker_data=False, add_marker_dummy=True):
        """
        Creates an augmented sequence as well as an end marker and a dummy sequence.

        :param seq: Sequence.
        :type seq: array_like

        :param markers: (ctrl_data, ctrl_dummy, pos).
        :type markers: tuple

        :param ctrl_start: Control channel values used for the start marker (prepended to the data when ``add_marker_data`` is True).
        :type ctrl_start: array_like

        :param add_marker_data: Whether to add a marker before the data.
        :type add_marker_data: bool

        :param add_marker_dummy: Whether to add a marker before the dummy.
        :type add_marker_dummy: bool

        :return: [augmented_sequence, dummy].

        """
        ctrl_data, ctrl_dummy, pos = markers

        w = self.add_ctrl(seq, ctrl_data, pos)
        start = self.add_ctrl(
            np.zeros((seq.shape[0], 1, seq.shape[2])), ctrl_start, pos)
        if add_marker_data:
            w = np.concatenate((start, w), axis=1)

        start_dummy = self.add_ctrl(
            np.zeros((seq.shape[0], 1, seq.shape[2])), ctrl_dummy, pos)
        ctrl_data_select = np.zeros(len(ctrl_data))
        dummy = self.add_ctrl(np.zeros_like(seq), ctrl_data_select, pos)

        if add_marker_dummy:
            dummy = np.concatenate((start_dummy, dummy), axis=1)

        return [w, dummy]
    def add_statistics(self, stat_col):
        """
        Adds accuracy, seq_length and max_seq_length statistics to a ``StatisticsCollector``.

        :param stat_col: Statistics collector.
        :type stat_col: ``StatisticsCollector``

        """
        # Add basic statistics.
        super(AlgorithmicSeqToSeqProblem, self).add_statistics(stat_col)

        stat_col.add_statistic('acc', '{:12.10f}')
        stat_col.add_statistic('seq_length', '{:d}')
        #stat_col.add_statistic('num_subseq', '{:d}')
        stat_col.add_statistic('max_seq_length', '{:d}')
        stat_col.add_statistic('batch_size', '{:06d}')
    def collect_statistics(self, stat_col, data_dict, logits):
        """
        Collects accuracy, seq_length and max_seq_length.

        :param stat_col: Statistics collector.
        :type stat_col: ``StatisticsCollector``

        :param data_dict: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}).
        :type data_dict: DataDict

        :param logits: Predictions of the model.
        :type logits: tensor

        """
        # Collect basic statistics.
        super(AlgorithmicSeqToSeqProblem, self).collect_statistics(stat_col, data_dict, logits)

        stat_col['acc'] = self.calculate_accuracy(data_dict, logits)
        stat_col['seq_length'] = max(data_dict['sequences_length']).item()
        #stat_col['num_subseq'] = data_dict['num_subsequences']
        stat_col['max_seq_length'] = self.max_sequence_length
        stat_col['batch_size'] = logits.shape[0]  # Batch major.
    def add_aggregators(self, stat_agg):
        """
        Adds problem-dependent statistical aggregators to the ``StatisticsAggregator``.

        :param stat_agg: ``StatisticsAggregator``.

        """
        # Add basic aggregators.
        super(AlgorithmicSeqToSeqProblem, self).add_aggregators(stat_agg)

        stat_agg.add_aggregator('acc', '{:12.10f}')  # represents the average accuracy
        stat_agg.add_aggregator('acc_min', '{:12.10f}')
        stat_agg.add_aggregator('acc_max', '{:12.10f}')
        stat_agg.add_aggregator('acc_std', '{:12.10f}')
        stat_agg.add_aggregator('samples_aggregated', '{:06d}')
    def aggregate_statistics(self, stat_col, stat_agg):
        """
        Aggregates the statistics collected by the ``StatisticsCollector`` and adds the results to the
        ``StatisticsAggregator``.

        :param stat_col: ``StatisticsCollector``.

        :param stat_agg: ``StatisticsAggregator``.

        """
        # Aggregate base statistics.
        super(AlgorithmicSeqToSeqProblem, self).aggregate_statistics(stat_col, stat_agg)

        stat_agg['acc_min'] = min(stat_col['acc'])
        stat_agg['acc_max'] = max(stat_col['acc'])
        stat_agg['acc'] = torch.mean(torch.tensor(stat_col['acc']))
        stat_agg['acc_std'] = 0.0 if len(stat_col['acc']) <= 1 else torch.std(torch.tensor(stat_col['acc']))
        stat_agg['samples_aggregated'] = sum(stat_col['batch_size'])
    def show_sample(self, data_dict, sample=0):
        """
        Shows the sample (both input and target sequences) using ``matplotlib``.
        Elementary visualization.

        :param data_dict: DataDict({'sequences', 'sequences_length', 'targets', 'masks', 'num_subsequences'}).
        :type data_dict: DataDict

        :param sample: Number of the sample in the batch (DEFAULT: 0).
        :type sample: int

        """
        import matplotlib.pyplot
        import matplotlib.ticker

        # Generate "canvas".
        fig, (ax1, ax2, ax3) = matplotlib.pyplot.subplots(3, 1, sharex=True, sharey=False,
                                                          gridspec_kw={
                                                              'width_ratios': [data_dict['sequences'].shape[1]],
                                                              'height_ratios': [10, 10, 1]})
        # Set ticks.
        ax1.xaxis.set_major_locator(matplotlib.ticker.MaxNLocator(integer=True))
        ax1.yaxis.set_major_locator(matplotlib.ticker.MaxNLocator(integer=True))
        ax2.yaxis.set_major_locator(matplotlib.ticker.MaxNLocator(integer=True))
        ax3.yaxis.set_major_locator(matplotlib.ticker.NullLocator())

        # Set labels.
        ax1.set_title('Inputs')
        ax1.set_ylabel('Control/Data bits')
        ax2.set_title('Targets')
        ax2.set_ylabel('Data bits')
        ax3.set_title('Target mask')
        ax3.set_ylabel('Mask bit')
        ax3.set_xlabel('Item number', fontname='Times New Roman', fontsize=13)

        # Show data.
        ax1.imshow(np.transpose(data_dict['sequences'][sample, :, :], [1, 0]),
                   interpolation='nearest', aspect='auto')
        ax2.imshow(np.transpose(data_dict['targets'][sample, :, :], [1, 0]),
                   interpolation='nearest', aspect='auto')
        ax3.imshow(np.transpose(data_dict['masks'][sample, :, :], [1, 0]),
                   interpolation='nearest', aspect='auto')

        # Plot!
        matplotlib.pyplot.tight_layout()
        matplotlib.pyplot.show()
if __name__ == '__main__':
    from miprometheus.utils.param_interface import ParamInterface

    params = ParamInterface()
    params.add_config_params({'control_bits': 2,
                              'data_bits': 8,
                              'min_sequence_length': 1,
                              'max_sequence_length': 10})

    # Equivalent to AlgorithmicSeqToSeqProblem(params).__getitem__(index=0).
    sample = AlgorithmicSeqToSeqProblem(params)[0]
    print(repr(sample))