#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) IBM Corporation 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
grid_tester_cpu.py:
- This file contains the implementation of a worker running the :py:class:`miprometheus.workers.Tester` \
on the results of a ``GridTrainer`` using CPUs.
- The main input is a list of directories for each problem/model e.g. `experiments/serial_recall/dnc`, \
and executes on every run of the model in that directory.
"""
__author__ = "Tomasz Kornuta & Vincent Marois"
import os
import shutil
import subprocess
from functools import partial
from multiprocessing.pool import ThreadPool
from miprometheus.grid_workers.grid_worker import GridWorker
[docs]class GridTesterCPU(GridWorker):
"""
Implementation of the Grid Tester running on CPUs.
Reuses the :py:class:`miprometheus.workers.Tester` to start one test experiment.
"""
[docs] def __init__(self, name="GridTesterCPU", use_gpu=False):
"""
Constructor for the :py:class:`miprometheus.grid_workers.GridTesterCPU`:
- Calls the base constructor to set the worker's name and add default command lines arguments,
- Adds some ``GridTester`` specific command line arguments.
:param name: Name of the worker (DEFAULT: "GridTesterCPU").
:type name: str
:param use_gpu: Indicates whether the worker should use GPU or not.
:type use_gpu: bool
"""
# call base constructor
super(GridTesterCPU, self).__init__(name=name,use_gpu=use_gpu)
# Get number_of_repetitions
self.parser.add_argument('--repeat',
dest='experiment_repetitions',
type=int,
default=1,
help='Number of experiment repetitions to run for each model (DEFAULT=1).')
# Get number_of_repetitions
self.parser.add_argument('--max_concur_runs',
dest='max_concurrent_runs',
type=int,
default=-1,
help='Value limiting the number of concurrently running experiments.'
'The set limit will be truncated by number of available CPUs/GPUs.'
' (DEFAULT=-1, meaning that it will be set to the number of CPUs/GPUs)')
[docs] def setup_grid_experiment(self):
"""
Setups the overall grid of experiments:
- Calls :py:func:`GridWorker.setup_grid_experiment()` to parse arguments,
- Recursively creates the paths to the experiments folders, verifying that they are valid (e.g. \
they contain a saved model, `model_best.pt`).
"""
super(GridTesterCPU, self).setup_grid_experiment()
# Check the presence of mip-tester script.
if shutil.which('mip-tester') is None:
self.logger.error("Cannot localize the 'mip-tester' script! (hint: please use setup.py to install it)")
exit(-1)
self.experiment_rootdir = self.flags.expdir
# Get grid settings.
experiment_repetitions = self.flags.experiment_repetitions
self.max_concurrent_runs = self.flags.max_concurrent_runs
# get all sub-directories paths in expdir, repeating according to flags.experiment_repetitions
self.experiments_list = []
for _ in range(experiment_repetitions):
for root, dirs, _ in os.walk(self.experiment_rootdir, topdown=True):
for name in dirs:
self.experiments_list.append(os.path.join(root, name))
# Keep only the folders that contain best_model.pt in model subdirectory.
# We assume that training configuration is there as well.
self.experiments_list = [elem for elem in self.experiments_list
if os.path.isfile(elem + '/model_best.pt')]
# Check if these are 'valid' folders, e.g. they contain a saved model
if len(self.experiments_list) == 0:
self.logger.error("There are no models in {} directory!".format(self.experiment_rootdir))
exit(-2)
# List folders.
exp_str = "Found the following models in {} directory:\n".format(self.experiment_rootdir)
exp_str += '='*80 + '\n'
for exp in self.experiments_list:
exp_str += " - {}/model_best.pt\n".format(exp)
exp_str += '='*80 + '\n'
self.logger.info(exp_str)
self.logger.info('Number of experiments to run: {}'.format(len(self.experiments_list)))
self.experiments_done = 0
# Ask for confirmation - optional.
if self.flags.user_confirm:
try:
input('Press <Enter> to confirm and start the grid of experiments\n')
except KeyboardInterrupt:
exit(0)
[docs] def run_grid_experiment(self):
"""
Main function of the :py:class:`miprometheus.grid_workers.GridTesterCPU`.
Maps the grid experiments to CPU cores in the limit of the maximum concurrent runs allowed or maximum\
available cores.
"""
try:
# Check max number of child processes.
if self.max_concurrent_runs <= 0: # We need at least one process!
max_processes = self.get_available_cpus()
else:
# Take into account the minimum value.
max_processes = min(self.get_available_cpus(), self.max_concurrent_runs)
self.logger.info('Spanning experiments using {} CPU(s) concurrently'.format(max_processes))
# Run in as many threads as there are CPUs available to the script.
with ThreadPool(processes=max_processes) as pool:
func = partial(GridTesterCPU.run_experiment, self, prefix="")
pool.map(func, self.experiments_list)
self.logger.info('Grid testing finished')
except KeyboardInterrupt:
self.logger.info('Grid testing interrupted!')
[docs] def run_experiment(self, experiment_path: str, prefix=""):
"""
Runs a test on the specified model (experiment_path) using the :py:class:`miprometheus.workers.Tester`.
:param experiment_path: Path to an experiment folder containing a trained model.
:type experiment_path: str
:param prefix: Prefix to position before the command string (e.g. 'cuda-gpupick -n 1'). Optional.
:type prefix: str
..note::
- Visualization is deactivated to avoid any user interaction.
- Command-line arguments such as the logging interval (``--li``) and log level (``--ll``) are passed \
to the :py:class:`miprometheus.workers.Tester`.
"""
try:
path_to_model = os.path.join(experiment_path, 'model_best.pt')
self.logger.warning(path_to_model)
# Run the test
command_str = "{}mip-tester --model {} --li {} --ll {}".format(
prefix, path_to_model,
self.flags.logging_interval,
self.flags.log_level)
# Add gpu flag if required.
if self.app_state.use_CUDA:
command_str += " --gpu "
self.logger.info("Starting: {}".format(command_str))
with open(os.devnull, 'w') as devnull:
result = subprocess.run(command_str.split(" "), stdout=devnull)
self.experiments_done += 1
self.logger.info("Finished: {}".format(command_str))
self.logger.info(
'Number of experiments done: {}/{}.'.format(self.experiments_done, len(self.experiments_list)))
if result.returncode != 0:
self.logger.info("Testing exited with code: {}".format(result.returncode))
except KeyboardInterrupt:
self.logger.info('Grid testing interrupted!')
def main():
"""
Entry point function for the :py:class:`miprometheus.grid_workers.GridTesterCPU`.
"""
grid_tester_cpu = GridTesterCPU()
# parse args, load configuration and create all required objects.
grid_tester_cpu.setup_grid_experiment()
# GO!
grid_tester_cpu.run_grid_experiment()
if __name__ == '__main__':
main()