diff --git a/python/ray/rllib/__init__.py b/python/ray/rllib/__init__.py index db9f52687..b3155f2dc 100644 --- a/python/ray/rllib/__init__.py +++ b/python/ray/rllib/__init__.py @@ -2,6 +2,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging + # Note: do not introduce unnecessary library dependencies here, e.g. gym. # This file is imported from the tune module in order to register RLlib agents. from ray.tune.registry import register_trainable @@ -16,6 +18,17 @@ from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator from ray.rllib.evaluation.sample_batch import SampleBatch +def _setup_logger(): + logger = logging.getLogger("ray.rllib") + handler = logging.StreamHandler() + handler.setFormatter( + logging.Formatter( + "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s" + )) + logger.addHandler(handler) + logger.propagate = False + + def _register_all(): for key in [ @@ -27,6 +40,7 @@ def _register_all(): register_trainable(key, get_agent_class(key)) +_setup_logger() _register_all() __all__ = [ diff --git a/python/ray/rllib/agents/a3c/a3c.py b/python/ray/rllib/agents/a3c/a3c.py index 55f179ade..5788f3381 100644 --- a/python/ray/rllib/agents/a3c/a3c.py +++ b/python/ray/rllib/agents/a3c/a3c.py @@ -10,6 +10,7 @@ from ray.rllib.optimizers import AsyncGradientsOptimizer from ray.rllib.utils import merge_dicts from ray.tune.trial import Resources +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ # Size of rollout batch @@ -36,8 +37,8 @@ DEFAULT_CONFIG = with_common_config({ # sample_batch_size by up to 5x due to async buffering of batches. 
"sample_async": True, }) - # __sphinx_doc_end__ +# yapf: enable class A3CAgent(Agent): diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 17d719851..c082c1d28 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -4,6 +4,7 @@ from __future__ import print_function import copy import os +import logging import pickle import tempfile from datetime import datetime @@ -19,12 +20,38 @@ from ray.tune.trainable import Trainable from ray.tune.logger import UnifiedLogger from ray.tune.result import DEFAULT_RESULTS_DIR +# yapf: disable # __sphinx_doc_begin__ COMMON_CONFIG = { + # === Debugging === + # Whether to write episode stats and videos to the agent log dir + "monitor": False, + # Set the RLlib log level for the agent process and its remote evaluators + "log_level": "INFO", + + # === Policy === + # Arguments to pass to model. See models/catalog.py for a full list of the + # available model options. + "model": MODEL_DEFAULTS, + # Arguments to pass to the policy optimizer. These vary by optimizer. + "optimizer": {}, + + # === Environment === # Discount factor of the MDP "gamma": 0.99, # Number of steps after which the episode is forced to terminate "horizon": None, + # Arguments to pass to the env creator + "env_config": {}, + # Environment name can also be passed via config + "env": None, + # Whether to clip rewards prior to experience postprocessing. Setting to + # None means clip for Atari only. + "clip_rewards": None, + # Whether to use rllib or deepmind preprocessors by default + "preprocessor_pref": "deepmind", + + # === Execution === # Number of environments to evaluate vectorwise per worker. "num_envs_per_worker": 1, # Number of actors used for parallelism @@ -42,20 +69,6 @@ COMMON_CONFIG = { "observation_filter": "NoFilter", # Whether to synchronize the statistics of remote filters. "synchronize_filters": True, - # Whether to clip rewards prior to experience postprocessing. 
Setting to - # None means clip for Atari only. - "clip_rewards": None, - # Whether to use rllib or deepmind preprocessors - "preprocessor_pref": "deepmind", - # Arguments to pass to the env creator - "env_config": {}, - # Environment name can also be passed via config - "env": None, - # Arguments to pass to model. See models/catalog.py for a full list of the - # available model options. - "model": MODEL_DEFAULTS, - # Arguments to pass to the policy optimizer. These vary by optimizer. - "optimizer": {}, # Configure TF for single-process operation by default "tf_session_args": { # note: parallelism_threads is set to auto for the local evaluator @@ -72,8 +85,6 @@ COMMON_CONFIG = { }, # Whether to LZ4 compress observations "compress_observations": False, - # Whether to write episode stats and videos to the agent log dir - "monitor": False, # Allocate a fraction of a GPU instead of one (e.g., 0.3 GPUs) "gpu_fraction": 1, @@ -88,8 +99,8 @@ COMMON_CONFIG = { "policies_to_train": None, }, } - # __sphinx_doc_end__ +# yapf: enable def with_common_config(extra_config): @@ -170,7 +181,8 @@ class Agent(Trainable): model_config=config["model"], policy_config=config, worker_index=worker_index, - monitor_path=self.logdir if config["monitor"] else None) + monitor_path=self.logdir if config["monitor"] else None, + log_level=config["log_level"]) @classmethod def resource_help(cls, config): @@ -197,13 +209,12 @@ class Agent(Trainable): # Agents allow env ids to be passed directly to the constructor. 
self._env_id = env or config.get("env") - if not self._env_id: - raise ValueError("Must specify env (str) when creating agent") # Create a default logger creator if no logger_creator is specified if logger_creator is None: timestr = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") - logdir_prefix = '_'.join([self._agent_name, self._env_id, timestr]) + logdir_prefix = "{}_{}_{}".format( + self._agent_name, self._env_id, timestr) def default_logger_creator(config): """Creates a Unified logger with a default logdir prefix @@ -256,6 +267,8 @@ class Agent(Trainable): self._allow_unknown_configs, self._allow_unknown_subkeys) self.config = merged_config + if self.config.get("log_level"): + logging.getLogger("ray.rllib").setLevel(self.config["log_level"]) # TODO(ekl) setting the graph is unnecessary for PyTorch agents with tf.Graph().as_default(): diff --git a/python/ray/rllib/agents/ars/ars.py b/python/ray/rllib/agents/ars/ars.py index 0c9af4dda..67e87057f 100644 --- a/python/ray/rllib/agents/ars/ars.py +++ b/python/ray/rllib/agents/ars/ars.py @@ -7,6 +7,7 @@ from __future__ import division from __future__ import print_function from collections import namedtuple +import logging import numpy as np import time @@ -16,14 +17,16 @@ from ray.tune.trial import Resources from ray.rllib.agents.ars import optimizers from ray.rllib.agents.ars import policies -from ray.rllib.agents.es import tabular_logger as tlogger from ray.rllib.agents.ars import utils +logger = logging.getLogger(__name__) + Result = namedtuple("Result", [ "noise_indices", "noisy_returns", "sign_noisy_returns", "noisy_lengths", "eval_returns", "eval_lengths" ]) +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ "noise_stdev": 0.02, # std deviation of parameter noise @@ -38,6 +41,7 @@ DEFAULT_CONFIG = with_common_config({ "offset": 0, }) # __sphinx_doc_end__ +# yapf: enable @ray.remote @@ -163,12 +167,12 @@ class ARSAgent(Agent): self.report_length = self.config["report_length"] # Create
the shared noise table. - print("Creating shared noise table.") + logger.info("Creating shared noise table.") noise_id = create_shared_noise.remote(self.config["noise_size"]) self.noise = SharedNoiseTable(ray.get(noise_id)) # Create the actors. - print("Creating actors.") + logger.info("Creating actors.") self.workers = [ Worker.remote(self.config, self.env_creator, noise_id) for _ in range(self.config["num_workers"]) @@ -182,8 +186,9 @@ class ARSAgent(Agent): num_episodes, num_timesteps = 0, 0 results = [] while num_episodes < min_episodes: - print("Collected {} episodes {} timesteps so far this iter".format( - num_episodes, num_timesteps)) + logger.info( + "Collected {} episodes {} timesteps so far this iter".format( + num_episodes, num_timesteps)) rollout_ids = [ worker.do_rollouts.remote(theta_id) for worker in self.workers ] @@ -263,7 +268,6 @@ class ARSAgent(Agent): g /= np.std(noisy_returns) assert (g.shape == (self.policy.num_params, ) and g.dtype == np.float32) - print('the number of policy params is, ', self.policy.num_params) # Compute the new weights theta. theta, update_ratio = self.optimizer.update(-g) # Set the new weights in the local copy of the policy. 
@@ -272,18 +276,9 @@ class ARSAgent(Agent): if len(all_eval_returns) > 0: self.reward_list.append(eval_returns.mean()) - tlogger.record_tabular("NoisyEpRewMean", noisy_returns.mean()) - tlogger.record_tabular("NoisyEpRewStd", noisy_returns.std()) - tlogger.record_tabular("NoisyEpLenMean", noisy_lengths.mean()) - - tlogger.record_tabular("WeightsNorm", float(np.square(theta).sum())) - tlogger.record_tabular("WeightsStd", float(np.std(theta))) - tlogger.record_tabular("Grad2Norm", float(np.sqrt(np.square(g).sum()))) - tlogger.record_tabular("UpdateRatio", float(update_ratio)) - tlogger.dump_tabular() - info = { "weights_norm": np.square(theta).sum(), + "weights_std": np.std(theta), "grad_norm": np.square(g).sum(), "update_ratio": update_ratio, "episodes_this_iter": noisy_lengths.size, diff --git a/python/ray/rllib/agents/ddpg/ddpg.py b/python/ray/rllib/agents/ddpg/ddpg.py index c35fdaa71..ed58718b4 100644 --- a/python/ray/rllib/agents/ddpg/ddpg.py +++ b/python/ray/rllib/agents/ddpg/ddpg.py @@ -13,6 +13,7 @@ OPTIMIZER_SHARED_CONFIGS = [ "train_batch_size", "learning_starts" ] +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ # === Model === @@ -108,8 +109,8 @@ DEFAULT_CONFIG = with_common_config({ # Prevent iterations from going lower than this time span "min_iter_time_s": 1, }) - # __sphinx_doc_end__ +# yapf: enable class DDPGAgent(DQNAgent): diff --git a/python/ray/rllib/agents/dqn/apex.py b/python/ray/rllib/agents/dqn/apex.py index 052d0fd3e..ac8ec4490 100644 --- a/python/ray/rllib/agents/dqn/apex.py +++ b/python/ray/rllib/agents/dqn/apex.py @@ -6,6 +6,7 @@ from ray.rllib.agents.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG from ray.rllib.utils import merge_dicts from ray.tune.trial import Resources +# yapf: disable # __sphinx_doc_begin__ APEX_DEFAULT_CONFIG = merge_dicts( DQN_CONFIG, # see also the options in dqn.py, which are also supported @@ -31,8 +32,8 @@ APEX_DEFAULT_CONFIG = merge_dicts( "min_iter_time_s": 30, }, ) - # 
__sphinx_doc_end__ +# yapf: enable class ApexAgent(DQNAgent): diff --git a/python/ray/rllib/agents/dqn/dqn.py b/python/ray/rllib/agents/dqn/dqn.py index f86b286ce..312040777 100644 --- a/python/ray/rllib/agents/dqn/dqn.py +++ b/python/ray/rllib/agents/dqn/dqn.py @@ -20,6 +20,7 @@ OPTIMIZER_SHARED_CONFIGS = [ "learning_starts" ] +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ # === Model === @@ -116,8 +117,8 @@ DEFAULT_CONFIG = with_common_config({ # Prevent iterations from going lower than this time span "min_iter_time_s": 1, }) - # __sphinx_doc_end__ +# yapf: enable class DQNAgent(Agent): diff --git a/python/ray/rllib/agents/es/es.py b/python/ray/rllib/agents/es/es.py index 2fe63c71a..ed2ed1869 100644 --- a/python/ray/rllib/agents/es/es.py +++ b/python/ray/rllib/agents/es/es.py @@ -6,6 +6,7 @@ from __future__ import division from __future__ import print_function from collections import namedtuple +import logging import numpy as np import time @@ -15,15 +16,17 @@ from ray.tune.trial import Resources from ray.rllib.agents.es import optimizers from ray.rllib.agents.es import policies -from ray.rllib.agents.es import tabular_logger as tlogger from ray.rllib.agents.es import utils from ray.rllib.utils import merge_dicts +logger = logging.getLogger(__name__) + Result = namedtuple("Result", [ "noise_indices", "noisy_returns", "sign_noisy_returns", "noisy_lengths", "eval_returns", "eval_lengths" ]) +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ "l2_coeff": 0.005, @@ -39,6 +42,7 @@ DEFAULT_CONFIG = with_common_config({ "report_length": 10, }) # __sphinx_doc_end__ +# yapf: enable @ray.remote @@ -169,12 +173,12 @@ class ESAgent(Agent): self.report_length = self.config["report_length"] # Create the shared noise table. 
- print("Creating shared noise table.") + logger.info("Creating shared noise table.") noise_id = create_shared_noise.remote(self.config["noise_size"]) self.noise = SharedNoiseTable(ray.get(noise_id)) # Create the actors. - print("Creating actors.") + logger.info("Creating actors.") self.workers = [ Worker.remote(self.config, policy_params, self.env_creator, noise_id) for _ in range(self.config["num_workers"]) @@ -188,8 +192,9 @@ class ESAgent(Agent): num_episodes, num_timesteps = 0, 0 results = [] while num_episodes < min_episodes or num_timesteps < min_timesteps: - print("Collected {} episodes {} timesteps so far this iter".format( - num_episodes, num_timesteps)) + logger.info( + "Collected {} episodes {} timesteps so far this iter".format( + num_episodes, num_timesteps)) rollout_ids = [ worker.do_rollouts.remote(theta_id) for worker in self.workers ] @@ -269,21 +274,6 @@ class ESAgent(Agent): if len(all_eval_returns) > 0: self.reward_list.append(np.mean(eval_returns)) - tlogger.record_tabular("EvalEpRewStd", eval_returns.std()) - tlogger.record_tabular("EvalEpLenMean", eval_lengths.mean()) - - tlogger.record_tabular("EpRewMean", noisy_returns.mean()) - tlogger.record_tabular("EpRewStd", noisy_returns.std()) - tlogger.record_tabular("EpLenMean", noisy_lengths.mean()) - - tlogger.record_tabular("Norm", float(np.square(theta).sum())) - tlogger.record_tabular("GradNorm", float(np.square(g).sum())) - tlogger.record_tabular("UpdateRatio", float(update_ratio)) - - tlogger.record_tabular("EpisodesThisIter", noisy_lengths.size) - tlogger.record_tabular("EpisodesSoFar", self.episodes_so_far) - tlogger.dump_tabular() - info = { "weights_norm": np.square(theta).sum(), "grad_norm": np.square(g).sum(), diff --git a/python/ray/rllib/agents/es/tabular_logger.py b/python/ray/rllib/agents/es/tabular_logger.py deleted file mode 100644 index 1463e59e0..000000000 --- a/python/ray/rllib/agents/es/tabular_logger.py +++ /dev/null @@ -1,229 +0,0 @@ -# Code in this file is copied and 
adapted from -# https://github.com/openai/evolution-strategies-starter. - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from collections import OrderedDict -import os -import sys -import time - -import tensorflow as tf -from tensorflow.core.util import event_pb2 -from tensorflow.python import pywrap_tensorflow -from tensorflow.python.util import compat - -DEBUG = 10 -INFO = 20 -WARN = 30 -ERROR = 40 - -DISABLED = 50 - - -class TbWriter(object): - """Based on SummaryWriter, but changed to allow for a different prefix.""" - - def __init__(self, dir, prefix): - self.dir = dir - # Start at 1, because EvWriter automatically generates an object with - # step = 0. - self.step = 1 - self.evwriter = pywrap_tensorflow.EventsWriter( - compat.as_bytes(os.path.join(dir, prefix))) - - def write_values(self, key2val): - summary = tf.Summary(value=[ - tf.Summary.Value(tag=k, simple_value=float(v)) - for (k, v) in key2val.items() - ]) - event = event_pb2.Event(wall_time=time.time(), summary=summary) - event.step = self.step - self.evwriter.WriteEvent(event) - self.evwriter.Flush() - self.step += 1 - - def close(self): - self.evwriter.Close() - - -# API - - -def start(dir): - if _Logger.CURRENT is not _Logger.DEFAULT: - sys.stderr.write("WARNING: You asked to start logging (dir=%s), but " - "you never stopped the previous logger (dir=%s)." - "\n" % (dir, _Logger.CURRENT.dir)) - _Logger.CURRENT = _Logger(dir=dir) - - -def stop(): - if _Logger.CURRENT is _Logger.DEFAULT: - sys.stderr.write("WARNING: You asked to stop logging, but you never " - "started any previous logger." - "\n" % (dir, _Logger.CURRENT.dir)) - return - _Logger.CURRENT.close() - _Logger.CURRENT = _Logger.DEFAULT - - -def record_tabular(key, val): - """Log a value of some diagnostic. - - Call this once for each diagnostic quantity, each iteration. 
- """ - _Logger.CURRENT.record_tabular(key, val) - - -def dump_tabular(): - """Write all of the diagnostics from the current iteration.""" - _Logger.CURRENT.dump_tabular() - - -def log(*args, **kwargs): - """Write the sequence of args, with no separators. - - This is written to the console and output files (if you've configured an - output file). - """ - level = kwargs['level'] if 'level' in kwargs else INFO - _Logger.CURRENT.log(*args, level=level) - - -def debug(*args): - log(*args, level=DEBUG) - - -def info(*args): - log(*args, level=INFO) - - -def warn(*args): - log(*args, level=WARN) - - -def error(*args): - log(*args, level=ERROR) - - -def set_level(level): - """ - Set logging threshold on current logger. - """ - _Logger.CURRENT.set_level(level) - - -def get_dir(): - """ - Get directory that log files are being written to. - will be None if there is no output directory (i.e., if you didn't call - start) - """ - return _Logger.CURRENT.get_dir() - - -def get_expt_dir(): - sys.stderr.write("get_expt_dir() is Deprecated. Switch to get_dir()\n") - return get_dir() - - -# Backend - - -class _Logger(object): - # A logger with no output files. (See right below class definition) so that - # you can still log to the terminal without setting up any output files. - DEFAULT = None - # Current logger being used by the free functions above. - CURRENT = None - - def __init__(self, dir=None): - self.name2val = OrderedDict() # Values this iteration. - self.level = INFO - self.dir = dir - self.text_outputs = [sys.stdout] - if dir is not None: - os.makedirs(dir, exist_ok=True) - self.text_outputs.append(open(os.path.join(dir, "log.txt"), "w")) - self.tbwriter = TbWriter(dir=dir, prefix="events") - else: - self.tbwriter = None - - # Logging API, forwarded - - def record_tabular(self, key, val): - self.name2val[key] = val - - def dump_tabular(self): - # Create strings for printing. 
- key2str = OrderedDict() - for (key, val) in self.name2val.items(): - if hasattr(val, "__float__"): - valstr = "%-8.3g" % val - else: - valstr = val - key2str[self._truncate(key)] = self._truncate(valstr) - keywidth = max(map(len, key2str.keys())) - valwidth = max(map(len, key2str.values())) - # Write to all text outputs - self._write_text("-" * (keywidth + valwidth + 7), "\n") - for (key, val) in key2str.items(): - self._write_text("| ", key, " " * (keywidth - len(key)), " | ", - val, " " * (valwidth - len(val)), " |\n") - self._write_text("-" * (keywidth + valwidth + 7), "\n") - for f in self.text_outputs: - try: - f.flush() - except OSError: - sys.stderr.write('Warning! OSError when flushing.\n') - # Write to tensorboard - if self.tbwriter is not None: - self.tbwriter.write_values(self.name2val) - self.name2val.clear() - - def log(self, *args, **kwargs): - level = kwargs['level'] if 'level' in kwargs else INFO - if self.level <= level: - self._do_log(*args) - - # Configuration - - def set_level(self, level): - self.level = level - - def get_dir(self): - return self.dir - - def close(self): - for f in self.text_outputs[1:]: - f.close() - if self.tbwriter: - self.tbwriter.close() - - # Misc - - def _do_log(self, *args): - self._write_text(*args + ('\n', )) - for f in self.text_outputs: - try: - f.flush() - except OSError: - print('Warning! OSError when flushing.') - - def _write_text(self, *strings): - for f in self.text_outputs: - for string in strings: - f.write(string) - - def _truncate(self, s): - if len(s) > 33: - return s[:30] + "..." 
- else: - return s - - -_Logger.DEFAULT = _Logger() -_Logger.CURRENT = _Logger.DEFAULT diff --git a/python/ray/rllib/agents/impala/impala.py b/python/ray/rllib/agents/impala/impala.py index a303643f5..fe53a5fb5 100644 --- a/python/ray/rllib/agents/impala/impala.py +++ b/python/ray/rllib/agents/impala/impala.py @@ -23,6 +23,7 @@ OPTIMIZER_SHARED_CONFIGS = [ "max_sample_requests_in_flight_per_worker", ] +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ # V-trace params (see vtrace.py). @@ -65,8 +66,8 @@ DEFAULT_CONFIG = with_common_config({ "vf_loss_coeff": 0.5, "entropy_coeff": -0.01, }) - # __sphinx_doc_end__ +# yapf: enable class ImpalaAgent(Agent): diff --git a/python/ray/rllib/agents/pg/pg.py b/python/ray/rllib/agents/pg/pg.py index edc24ca1b..8ef5170bb 100644 --- a/python/ray/rllib/agents/pg/pg.py +++ b/python/ray/rllib/agents/pg/pg.py @@ -8,6 +8,7 @@ from ray.rllib.optimizers import SyncSamplesOptimizer from ray.rllib.utils import merge_dicts from ray.tune.trial import Resources +# yapf: disable # __sphinx_doc_begin__ DEFAULT_CONFIG = with_common_config({ # No remote workers by default @@ -15,8 +16,8 @@ DEFAULT_CONFIG = with_common_config({ # Learning rate "lr": 0.0004, }) - # __sphinx_doc_end__ +# yapf: enable class PGAgent(Agent): diff --git a/python/ray/rllib/agents/ppo/ppo.py b/python/ray/rllib/agents/ppo/ppo.py index 0527bcddd..480bf66e4 100644 --- a/python/ray/rllib/agents/ppo/ppo.py +++ b/python/ray/rllib/agents/ppo/ppo.py @@ -2,12 +2,17 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging + from ray.rllib.agents import Agent, with_common_config from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph from ray.rllib.utils import merge_dicts from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer from ray.tune.trial import Resources +logger = logging.getLogger(__name__) + +# yapf: disable # __sphinx_doc_begin__ 
DEFAULT_CONFIG = with_common_config({ # If true, use the Generalized Advantage Estimator (GAE) @@ -55,8 +60,8 @@ DEFAULT_CONFIG = with_common_config({ # Use the sync samples optimizer instead of the multi-gpu one "simple_optimizer": False, }) - # __sphinx_doc_end__ +# yapf: enable class PPOAgent(Agent): @@ -111,7 +116,7 @@ class PPOAgent(Agent): if waste_ratio > 1.5: raise ValueError(msg) else: - print("Warning: " + msg) + logger.warning(msg) if self.config["sgd_minibatch_size"] > self.config["train_batch_size"]: raise ValueError( "Minibatch size {} must be <= train batch size {}.".format( diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py index db88eb759..0578c3417 100644 --- a/python/ray/rllib/evaluation/policy_evaluator.py +++ b/python/ray/rllib/evaluation/policy_evaluator.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import gym +import logging import pickle import tensorflow as tf @@ -99,7 +100,8 @@ class PolicyEvaluator(EvaluatorInterface): model_config=None, policy_config=None, worker_index=0, - monitor_path=None): + monitor_path=None, + log_level=None): """Initialize a policy evaluator. Arguments: @@ -158,8 +160,12 @@ class PolicyEvaluator(EvaluatorInterface): through EnvContext so that envs can be configured per worker. monitor_path (str): Write out episode stats and videos to this directory if specified. + log_level (str): Set the root log level on creation.
""" + if log_level: + logging.getLogger("ray.rllib").setLevel(log_level) + env_context = EnvContext(env_config or {}, worker_index) policy_config = policy_config or {} self.policy_config = policy_config diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index 2f90dcbab..85d5386b1 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function from collections import defaultdict, namedtuple +import logging import numpy as np import six.moves.queue as queue import threading @@ -16,6 +17,8 @@ from ray.rllib.env.atari_wrappers import get_wrapper_by_cls, MonitorEnv from ray.rllib.models.action_dist import TupleActions from ray.rllib.utils.tf_run_builder import TFRunBuilder +logger = logging.getLogger(__name__) + RolloutMetrics = namedtuple( "RolloutMetrics", ["episode_length", "episode_reward", "agent_rewards"]) @@ -221,7 +224,7 @@ def _env_runner(async_vector_env, horizon = ( async_vector_env.get_unwrapped()[0].spec.max_episode_steps) except Exception: - print("*** WARNING ***: no episode horizon specified, assuming inf") + logger.warning("no episode horizon specified, assuming inf") if not horizon: horizon = float("inf") diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index 1c55cb79e..4c0a20f77 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import gym +import logging import numpy as np import tensorflow as tf from functools import partial @@ -21,6 +22,9 @@ from ray.rllib.models.fcnet import FullyConnectedNetwork from ray.rllib.models.visionnet import VisionNetwork from ray.rllib.models.lstm import LSTM +logger = logging.getLogger(__name__) + +# yapf: disable # __sphinx_doc_begin__ MODEL_DEFAULTS = { # === Built-in options === @@ -67,8 +71,8 @@
MODEL_DEFAULTS = { # Extra options to pass to the custom classes "custom_options": {}, } - # __sphinx_doc_end__ +# yapf: enable class ModelCatalog(object): @@ -200,7 +204,7 @@ class ModelCatalog(object): seq_lens): if options.get("custom_model"): model = options["custom_model"] - print("Using custom model {}".format(model)) + logger.info("Using custom model {}".format(model)) return _global_registry.get(RLLIB_MODEL, model)( input_dict, obs_space, @@ -238,7 +242,7 @@ class ModelCatalog(object): options = options or MODEL_DEFAULTS if options.get("custom_model"): model = options["custom_model"] - print("Using custom torch model {}".format(model)) + logger.info("Using custom torch model {}".format(model)) return _global_registry.get(RLLIB_MODEL, model)( input_shape, num_outputs, options) @@ -271,7 +275,7 @@ class ModelCatalog(object): if options.get("custom_preprocessor"): preprocessor = options["custom_preprocessor"] - print("Using custom preprocessor {}".format(preprocessor)) + logger.info("Using custom preprocessor {}".format(preprocessor)) return _global_registry.get(RLLIB_PREPROCESSOR, preprocessor)( env.observation_space, options) diff --git a/python/ray/rllib/models/preprocessors.py b/python/ray/rllib/models/preprocessors.py index b7e084062..8144b5706 100644 --- a/python/ray/rllib/models/preprocessors.py +++ b/python/ray/rllib/models/preprocessors.py @@ -1,13 +1,17 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function + import cv2 +import logging import numpy as np import gym ATARI_OBS_SHAPE = (210, 160, 3) ATARI_RAM_OBS_SHAPE = (128, ) +logger = logging.getLogger(__name__) + class Preprocessor(object): """Defines an abstract observation preprocessor function. 
@@ -128,7 +132,7 @@ class TupleFlatteningPreprocessor(Preprocessor): self.preprocessors = [] for i in range(len(self._obs_space.spaces)): space = self._obs_space.spaces[i] - print("Creating sub-preprocessor for", space) + logger.info("Creating sub-preprocessor for {}".format(space)) preprocessor = get_preprocessor(space)(space, self._options) self.preprocessors.append(preprocessor) size += preprocessor.size @@ -153,7 +157,7 @@ class DictFlatteningPreprocessor(Preprocessor): size = 0 self.preprocessors = [] for space in self._obs_space.spaces.values(): - print("Creating sub-preprocessor for", space) + logger.info("Creating sub-preprocessor for {}".format(space)) preprocessor = get_preprocessor(space)(space, self._options) self.preprocessors.append(preprocessor) size += preprocessor.size diff --git a/python/ray/rllib/models/pytorch/fcnet.py b/python/ray/rllib/models/pytorch/fcnet.py index e8f50da2f..f69cb7ca2 100644 --- a/python/ray/rllib/models/pytorch/fcnet.py +++ b/python/ray/rllib/models/pytorch/fcnet.py @@ -2,10 +2,14 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging + from ray.rllib.models.pytorch.model import Model, SlimFC from ray.rllib.models.pytorch.misc import normc_initializer import torch.nn as nn +logger = logging.getLogger(__name__) + class FullyConnectedNetwork(Model): """TODO(rliaw): Logits, Value should both be contained here""" @@ -19,7 +23,7 @@ class FullyConnectedNetwork(Model): activation = nn.Tanh elif fcnet_activation == "relu": activation = nn.ReLU - print("Constructing fcnet {} {}".format(hiddens, activation)) + logger.info("Constructing fcnet {} {}".format(hiddens, activation)) layers = [] last_layer_size = inputs diff --git a/python/ray/rllib/optimizers/async_samples_optimizer.py b/python/ray/rllib/optimizers/async_samples_optimizer.py index 69f5e849b..5ad6bd809 100644 --- a/python/ray/rllib/optimizers/async_samples_optimizer.py +++ 
b/python/ray/rllib/optimizers/async_samples_optimizer.py @@ -6,6 +6,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import numpy as np import random import time @@ -20,6 +21,8 @@ from ray.rllib.utils.actors import TaskPool from ray.rllib.utils.timer import TimerStat from ray.rllib.utils.window_stat import WindowStat +logger = logging.getLogger(__name__) + LEARNER_QUEUE_MAX_SIZE = 16 NUM_DATA_LOAD_THREADS = 16 @@ -84,7 +87,7 @@ class TFMultiGPULearner(LearnerThread): self.devices = ["/cpu:0"] else: self.devices = ["/gpu:{}".format(i) for i in range(num_gpus)] - print("TFMultiGPULearner devices", self.devices) + logger.info("TFMultiGPULearner devices {}".format(self.devices)) assert self.train_batch_size % len(self.devices) == 0 assert self.train_batch_size >= len(self.devices), "batch too small" self.policy = self.local_evaluator.policy_map["default"] @@ -199,7 +202,7 @@ class AsyncSamplesOptimizer(PolicyOptimizer): self.sample_batch_size = sample_batch_size if num_gpus > 1 or num_parallel_data_loaders > 1: - print( + logger.info( "Enabling multi-GPU mode, {} GPUs, {} parallel loaders".format( num_gpus, num_parallel_data_loaders)) if train_batch_size // max(1, num_gpus) % ( diff --git a/python/ray/rllib/optimizers/multi_gpu_optimizer.py b/python/ray/rllib/optimizers/multi_gpu_optimizer.py index 4595415a1..7e01ee904 100644 --- a/python/ray/rllib/optimizers/multi_gpu_optimizer.py +++ b/python/ray/rllib/optimizers/multi_gpu_optimizer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import numpy as np from collections import defaultdict import tensorflow as tf @@ -12,6 +13,8 @@ from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer from ray.rllib.utils.timer import TimerStat +logger = logging.getLogger(__name__) 
+ class LocalMultiGPUOptimizer(PolicyOptimizer): """A synchronous optimizer that uses multiple local GPUs. @@ -53,7 +56,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): self.update_weights_timer = TimerStat() self.standardize_fields = standardize_fields - print("LocalMultiGPUOptimizer devices", self.devices) + logger.info("LocalMultiGPUOptimizer devices {}".format(self.devices)) if set(self.local_evaluator.policy_map.keys()) != {"default"}: raise ValueError( @@ -126,7 +129,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): with self.grad_timer: num_batches = ( int(tuples_per_device) // int(self.per_device_batch_size)) - print("== sgd epochs ==") + logger.debug("== sgd epochs ==") for i in range(self.num_sgd_iter): iter_extra_fetches = defaultdict(list) permutation = np.random.permutation(num_batches) @@ -136,7 +139,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer): permutation[batch_index] * self.per_device_batch_size) for k, v in batch_fetches.items(): iter_extra_fetches[k].append(v) - print(i, _averaged(iter_extra_fetches)) + logger.debug("{} {}".format(i, _averaged(iter_extra_fetches))) self.num_steps_sampled += samples.count self.num_steps_trained += samples.count diff --git a/python/ray/rllib/optimizers/policy_optimizer.py b/python/ray/rllib/optimizers/policy_optimizer.py index 21fcf5f0b..9d83140e9 100644 --- a/python/ray/rllib/optimizers/policy_optimizer.py +++ b/python/ray/rllib/optimizers/policy_optimizer.py @@ -2,11 +2,15 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging + import ray from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator from ray.rllib.evaluation.metrics import collect_episodes, summarize_episodes from ray.rllib.evaluation.sample_batch import MultiAgentBatch +logger = logging.getLogger(__name__) + class PolicyOptimizer(object): """Policy optimizers encapsulate distributed RL optimization strategies. 
diff --git a/python/ray/rllib/optimizers/sync_samples_optimizer.py b/python/ray/rllib/optimizers/sync_samples_optimizer.py index 20922ff54..38d5269f0 100644 --- a/python/ray/rllib/optimizers/sync_samples_optimizer.py +++ b/python/ray/rllib/optimizers/sync_samples_optimizer.py @@ -3,11 +3,14 @@ from __future__ import division from __future__ import print_function import ray +import logging from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer from ray.rllib.evaluation.sample_batch import SampleBatch from ray.rllib.utils.filter import RunningStat from ray.rllib.utils.timer import TimerStat +logger = logging.getLogger(__name__) + class SyncSamplesOptimizer(PolicyOptimizer): """A simple synchronous RL optimizer. @@ -52,7 +55,7 @@ class SyncSamplesOptimizer(PolicyOptimizer): if "stats" in fetches: self.learner_stats = fetches["stats"] if self.num_sgd_iter > 1: - print(i, fetches) + logger.debug("{} {}".format(i, fetches)) self.grad_timer.push_units_processed(samples.count) self.num_steps_sampled += samples.count diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index e865feb43..487c3595e 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -2,9 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import os import ray +logger = logging.getLogger(__name__) + class TaskPool(object): """Helper class for tracking the status of many in-flight actor tasks.""" @@ -80,11 +83,12 @@ def split_colocated(actors): def try_create_colocated(cls, args, count): actors = [cls.remote(*args) for _ in range(count)] local, _ = split_colocated(actors) - print("Got {} colocated actors of {}".format(len(local), count)) + logger.info("Got {} colocated actors of {}".format(len(local), count)) return local def create_colocated(cls, args, count): + logger.info("Trying to create {} colocated actors".format(count)) ok = [] i = 1 while len(ok) < 
count and i < 10: diff --git a/python/ray/rllib/utils/compression.py b/python/ray/rllib/utils/compression.py index 5f28455ee..aed0dd598 100644 --- a/python/ray/rllib/utils/compression.py +++ b/python/ray/rllib/utils/compression.py @@ -2,18 +2,21 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import time import base64 import numpy as np import pyarrow +logger = logging.getLogger(__name__) + try: import lz4.frame LZ4_ENABLED = True except ImportError: - print("WARNING: lz4 not available, disabling sample compression. " - "This will significantly impact RLlib performance. " - "To install lz4, run `pip install lz4`.") + logger.warn("lz4 not available, disabling sample compression. " + "This will significantly impact RLlib performance. " + "To install lz4, run `pip install lz4`.") LZ4_ENABLED = False diff --git a/python/ray/rllib/utils/policy_client.py b/python/ray/rllib/utils/policy_client.py index 901dc983b..1bb4b5e13 100644 --- a/python/ray/rllib/utils/policy_client.py +++ b/python/ray/rllib/utils/policy_client.py @@ -2,14 +2,17 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import pickle +logger = logging.getLogger(__name__) + try: import requests # `requests` is not part of stdlib. except ImportError: requests = None - print("Couldn't import `requests` library. Be sure to install it on" - " the client side.") + logger.warn("Couldn't import `requests` library. 
Be sure to install it on" + " the client side.") class PolicyClient(object): @@ -109,8 +112,7 @@ class PolicyClient(object): payload = pickle.dumps(data) response = requests.post(self._address, data=payload) if response.status_code != 200: - print("Request failed", data) - print(response.text) + logger.error("Request failed {}: {}".format(response.text, data)) response.raise_for_status() parsed = pickle.loads(response.content) return parsed diff --git a/python/ray/rllib/utils/tf_run_builder.py b/python/ray/rllib/utils/tf_run_builder.py index 2ea3ba7b8..ce1c58279 100644 --- a/python/ray/rllib/utils/tf_run_builder.py +++ b/python/ray/rllib/utils/tf_run_builder.py @@ -2,12 +2,15 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import logging import os import time import tensorflow as tf from tensorflow.python.client import timeline +logger = logging.getLogger(__name__) + class TFRunBuilder(object): """Used to incrementally build up a TensorFlow run. @@ -43,7 +46,7 @@ class TFRunBuilder(object): self.session, self.fetches, self.debug_name, self.feed_dict, os.environ.get("TF_TIMELINE_DIR")) except Exception as e: - print("Error fetching: {}, feed_dict={}".format( + logger.error("Error fetching: {}, feed_dict={}".format( self.fetches, self.feed_dict)) raise e if isinstance(to_fetch, int): @@ -76,8 +79,8 @@ def run_timeline(sess, ops, debug_name, feed_dict={}, timeline_dir=None): debug_name, os.getpid(), _count)) _count += 1 trace_file = open(outf, "w") - print("Wrote tf timeline ({} s) to {}".format(time.time() - start, - os.path.abspath(outf))) + logger.info("Wrote tf timeline ({} s) to {}".format( + time.time() - start, os.path.abspath(outf))) trace_file.write(trace.generate_chrome_trace_format()) else: fetches = sess.run(ops, feed_dict=feed_dict)