#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) IBM Corporation 2018
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
grid_worker.py:
- Contains the definition of the :py:class:`miprometheus.grid_workers.GridWorker` class, \
base for all grid workers, such as :py:class:`miprometheus.grid_workers.GridTrainerCPU` \
& :py:class:`miprometheus.grid_workers.GridAnalyzer`.
- These grid workers do not inherit from :py:class:`miprometheus.workers.Worker`, as they are different \
in behavior. Rather, they reuse the base workers to manage grid of experiments (by calling them using the commands).
- This class also contains the definition of the default command line arguments of the grid workers.
"""
__author__ = "Vincent Marois & Tomasz Kornuta"
import os
import psutil
import logging
import argparse
from abc import abstractmethod
from miprometheus.utils.app_state import AppState
from miprometheus.utils.param_interface import ParamInterface
[docs]class GridWorker(object):
"""
Base abstract class for the grid workers.
All grid workers should subclass it and override the relevant methods.
"""
[docs] def __init__(self, name="GridWorker", use_gpu=False):
"""
Base constructor for all grid workers:
- Initializes the AppState singleton:
>>> self.app_state = AppState()
- Initializes the Parameter Registry:
>>> self.params = ParamInterface()
- Defines the logger:
>>> self.logger = logging.getLogger(name=self.name)
- Creates parser and adds default worker command line arguments (you can display them with ``--h``).
:param name: Name of the worker (DEFAULT: "GridWorker").
:type name: str
:param use_gpu: Indicates whether the worker should use GPU or not. Value coming from the subclasses \
(e.g. :py:class:`miprometheus.grid_workers.GridTrainerCPU` vs \
:py:class:`miprometheus.grid_workers.GridTrainerGPU`) (DEFAULT: False).
:type use_gpu: bool
"""
# Call base constructor.
super(GridWorker, self).__init__()
# Set worker name.
self.name = name
# Initialize the application state singleton.
self.app_state = AppState()
self.app_state.use_CUDA = use_gpu
# Initialize parameter interface/registry.
self.params = ParamInterface()
# Load the default logger configuration.
logger_config = {'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '[%(asctime)s] - %(levelname)s - %(name)s >>> %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'}},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'simple',
'stream': 'ext://sys.stdout'}},
'root': {'level': 'DEBUG',
'handlers': ['console']}}
logging.config.dictConfig(logger_config)
# Create the Logger, set its label and logging level.
self.logger = logging.getLogger(name=self.name)
# Create parser with a list of runtime arguments.
self.parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
# Add arguments to the specific parser.
# These arguments will be shared by all grid workers.
self.parser.add_argument('--expdir',
dest='expdir',
type=str,
default="./experiments",
help='Path to the directory where the experiments folders '
'will be / are stored. Affects all grid experiments.'
' (DEFAULT: ./experiments)')
self.parser.add_argument('--ll',
action='store',
dest='log_level',
type=str,
default='INFO',
choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'],
help="Log level for the experiments. (Default: INFO)")
self.parser.add_argument('--li',
dest='logging_interval',
default=100,
type=int,
help='Statistics logging interval. Will impact logging to the logger and exporting to '
'TensorBoard for the experiments. Do not affect the grid worker itself. '
'Writing to the csv file is not impacted (interval of 1).'
' (Default: 100, i.e. logs every 100 episodes).')
self.parser.add_argument('--agree',
dest='user_confirm',
action='store_true',
help='Request user confirmation before starting the grid experiment.'
' (Default: False)')
[docs] def setup_grid_experiment(self):
"""
Setups the overall grid of experiments.
Base method:
- Parses command line arguments,
- Sets the 3 default sections (training / validation / test) for the param registry, \
sets seeds to unspecified and disable multiprocessing.
.. note::
Child classes should override this method, but still call its parent to draw the basic functionality \
implemented here.
"""
# Parse arguments.
self.flags, self.unparsed = self.parser.parse_known_args()
# Set logger depending on the settings.
self.logger.setLevel(getattr(logging, self.flags.log_level.upper(), None))
# add empty sections
self.params.add_default_params({"training": {}})
self.params.add_default_params({"validation": {}})
self.params.add_default_params({"testing": {}})
# set seeds to undefined (-1) and deactivate multiprocessing for `DataLoader`.
# It is important not to set the seeds here as they would be identical for all experiments.
self.params["training"].add_default_params({"seed_numpy": -1,
"seed_torch": -1,
"dataloader": {'num_workers': 0}})
self.params["validation"].add_default_params({"dataloader": {'num_workers': 0}})
self.params["testing"].add_default_params({"seed_numpy": -1,
"seed_torch": -1,
"dataloader": {'num_workers': 0}})
[docs] @abstractmethod
def run_grid_experiment(self):
"""
Main function of the :py:class:`miprometheus.grid_workers.GridWorker`, which essentially maps \
an experiment to available core or device.
.. note::
Abstract. Should be implemented in the subclasses.
"""
[docs] def get_available_cpus(self):
"""
Returns the number of available CPUs on the current machine.
"""
# Check scheduler for number of available cpus - if OS offers that!
if hasattr(os, 'sched_getaffinity'):
return len(os.sched_getaffinity(0))
proc = psutil.Process()
# cpu_affinity() is only available on Linux, Windows and FreeBSD
if hasattr(proc, 'cpu_affinity'):
return len(proc.cpu_affinity())
# Simply return CPU count
return psutil.cpu_count()