diff --git a/python/ray/rllib/agent.py b/python/ray/rllib/agent.py index f1ac6573c..fd2fac08f 100644 --- a/python/ray/rllib/agent.py +++ b/python/ray/rllib/agent.py @@ -4,24 +4,18 @@ from __future__ import print_function from datetime import datetime -import json import logging import numpy as np import os import pickle -import sys import tempfile import time import uuid import tensorflow as tf +from ray.tune.logger import UnifiedLogger from ray.tune.result import TrainingResult -if sys.version_info[0] == 2: - import cStringIO as StringIO -elif sys.version_info[0] == 3: - import io as StringIO - logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -39,24 +33,18 @@ class Agent(object): """ _allow_unknown_configs = False + _default_logdir = "/tmp/ray" def __init__( - self, env_creator, config, local_dir='/tmp/ray', - upload_dir=None, experiment_tag=None): + self, env_creator, config, logger_creator=None): """Initialize an RLLib agent. Args: env_creator (str|func): Name of the OpenAI gym environment to train against, or a function that creates such an env. - config (obj): Algorithm-specific configuration data. - local_dir (str): Directory where results and temporary files will - be placed. - upload_dir (str): Optional remote URI like s3://bucketname/ where - results will be uploaded. - experiment_tag (str): Optional string containing extra metadata - about the experiment, e.g. a summary of parameters. This string - will be included in the logdir path and when displaying agent - progress. + config (dict): Algorithm-specific configuration data. + logger_creator (func): Function that creates a ray.tune.Logger + object. If unspecified, a default logger is created. """ self._initialize_ok = False self._experiment_id = uuid.uuid4().hex @@ -79,40 +67,20 @@ class Agent(object): "Unknown agent config `{}`, " "all agent configs: {}".format(k, self.config.keys())) self.config.update(config) - self.config.update({ - "experiment_tag": experiment_tag, - "alg": self._agent_name, - "env_name": env_name, - "experiment_id": self._experiment_id, - }) - logdir_suffix = "{}_{}_{}".format( - env_name, - self._agent_name, - experiment_tag or datetime.today().strftime("%Y-%m-%d_%H-%M-%S")) - - if not os.path.exists(local_dir): - os.makedirs(local_dir) - - self.logdir = tempfile.mkdtemp(prefix=logdir_suffix, dir=local_dir) - - if upload_dir: - log_upload_uri = os.path.join(upload_dir, logdir_suffix) + if logger_creator: + self._result_logger = logger_creator(self.config) + self.logdir = self._result_logger.logdir else: - log_upload_uri = None - - # TODO(ekl) consider inlining config into the result jsons - config_out = os.path.join(self.logdir, "config.json") - with open(config_out, "w") as f: - json.dump(self.config, f, sort_keys=True, cls=_Encoder) - logger.info( - "%s agent created with logdir '%s' and upload uri '%s'", - self.__class__.__name__, self.logdir, log_upload_uri) - - self._result_logger = _Logger( - os.path.join(self.logdir, "result.json"), - log_upload_uri and os.path.join(log_upload_uri, "result.json")) - self._file_writer = tf.summary.FileWriter(self.logdir) + logdir_suffix = "{}_{}_{}".format( + env_name, + self._agent_name, + datetime.today().strftime("%Y-%m-%d_%H-%M-%S")) + if not os.path.exists(self._default_logdir): + os.makedirs(self._default_logdir) + self.logdir = tempfile.mkdtemp( + prefix=logdir_suffix, dir=self._default_logdir) + self._result_logger = UnifiedLogger(self.config, self.logdir, None) self._iteration = 0 self._time_total = 0.0 @@ -161,29 +129,10 @@ class Agent(object): pid=os.getpid(), hostname=os.uname()[1]) - self._log_result(result) + self._result_logger.on_result(result) return result - def _log_result(self, result): - """Appends the given result to this agent's log dir.""" - - # We need to use a custom json serializer class so that NaNs get - # encoded as null as required by Athena. - json.dump(result._asdict(), self._result_logger, cls=_Encoder) - self._result_logger.write("\n") - attrs_to_log = [ - "time_this_iter_s", "mean_loss", "mean_accuracy", - "episode_reward_mean", "episode_len_mean"] - values = [] - for attr in attrs_to_log: - if getattr(result, attr) is not None: - values.append(tf.Summary.Value( - tag="ray/tune/{}".format(attr), - simple_value=getattr(result, attr))) - train_stats = tf.Summary(value=values) - self._file_writer.add_summary(train_stats, result.training_iteration) - def save(self): """Saves the current model state to a checkpoint. @@ -214,7 +163,7 @@ class Agent(object): def stop(self): """Releases all resources used by this agent.""" - self._file_writer.close() + self._result_logger.close() def compute_action(self, observation): """Computes an action using the current trained policy.""" @@ -255,61 +204,6 @@ class Agent(object): raise NotImplementedError -class _Encoder(json.JSONEncoder): - - def __init__(self, nan_str="null", **kwargs): - super(_Encoder, self).__init__(**kwargs) - self.nan_str = nan_str - - def iterencode(self, o, _one_shot=False): - if self.ensure_ascii: - _encoder = json.encoder.encode_basestring_ascii - else: - _encoder = json.encoder.encode_basestring - - def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str): - return repr(o) if not np.isnan(o) else nan_str - - _iterencode = json.encoder._make_iterencode( - None, self.default, _encoder, self.indent, floatstr, - self.key_separator, self.item_separator, self.sort_keys, - self.skipkeys, _one_shot) - return _iterencode(o, 0) - - def default(self, value): - if np.isnan(value): - return None - if np.issubdtype(value, float): - return float(value) - if np.issubdtype(value, int): - return int(value) - - -class _Logger(object): - """Writing small amounts of data to S3 with real-time updates. - """ - - def __init__(self, local_file, uri=None): - self.local_out = open(local_file, "w") - self.result_buffer = StringIO.StringIO() - self.uri = uri - if self.uri: - import smart_open - self.smart_open = smart_open.smart_open - - def write(self, b): - self.local_out.write(b) - self.local_out.flush() - # TODO(pcm): At the moment we are writing the whole results output from - # the beginning in each iteration. This will write O(n^2) bytes where n - # is the number of bytes printed so far. Fix this! This should at least - # only write the last 5MBs (S3 chunksize). - if self.uri: - with self.smart_open(self.uri, "w") as f: - self.result_buffer.write(b) - f.write(self.result_buffer.getvalue()) - - class _MockAgent(Agent): """Mock agent for use in tests""" diff --git a/python/ray/tune/logger.py b/python/ray/tune/logger.py new file mode 100644 index 000000000..6faefc79d --- /dev/null +++ b/python/ray/tune/logger.py @@ -0,0 +1,164 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import csv +import json +import numpy as np +import os +import sys +import tensorflow as tf + +from ray.tune.result import TrainingResult + +if sys.version_info[0] == 2: + import cStringIO as StringIO +elif sys.version_info[0] == 3: + import io as StringIO + + +class Logger(object): + """Logging interface for ray.tune; specialized implementations follow. + + By default, the UnifiedLogger implementation is used which logs results in + multiple formats (TensorBoard, rllab/viskit, plain json) at once. + """ + + _attrs_to_log = [ + "time_this_iter_s", "mean_loss", "mean_accuracy", + "episode_reward_mean", "episode_len_mean"] + + def __init__(self, config, logdir, upload_uri=None): + self.config = config + self.logdir = logdir + self.uri = upload_uri + self._init() + + def _init(self): + pass + + def on_result(self, result): + """Given a result, appends it to the existing log.""" + + raise NotImplementedError + + def close(self): + """Releases all resources used by this logger.""" + + pass + + +class UnifiedLogger(Logger): + """Unified result logger for TensorBoard, rllab/viskit, plain json.""" + + def _init(self): + self._loggers = [] + for cls in [_JsonLogger, _TFLogger, _VisKitLogger]: + self._loggers.append(cls(self.config, self.logdir, self.uri)) + print("Unified logger created with logdir '{}'".format(self.logdir)) + + def on_result(self, result): + for logger in self._loggers: + logger.on_result(result) + + def close(self): + for logger in self._loggers: + logger.close() + + +class NoopLogger(Logger): + def on_result(self, result): + pass + + +class _JsonLogger(Logger): + def _init(self): + config_out = os.path.join(self.logdir, "params.json") + with open(config_out, "w") as f: + json.dump(self.config, f, sort_keys=True, cls=_CustomEncoder) + local_file = os.path.join(self.logdir, "result.json") + self.local_out = open(local_file, "w") + if self.uri: + self.result_buffer = StringIO.StringIO() + import smart_open + self.smart_open = smart_open.smart_open + + def on_result(self, result): + json.dump(result._asdict(), self, cls=_CustomEncoder) + self.write("\n") + + def write(self, b): + self.local_out.write(b) + self.local_out.flush() + # TODO(pcm): At the moment we are writing the whole results output from + # the beginning in each iteration. This will write O(n^2) bytes where n + # is the number of bytes printed so far. Fix this! This should at least + # only write the last 5MBs (S3 chunksize). + if self.uri: + with self.smart_open(self.uri, "w") as f: + self.result_buffer.write(b) + f.write(self.result_buffer.getvalue()) + + def close(self): + self.local_out.close() + + +class _TFLogger(Logger): + def _init(self): + self._file_writer = tf.summary.FileWriter(self.logdir) + + def on_result(self, result): + values = [] + for attr in Logger._attrs_to_log: + if getattr(result, attr) is not None: + values.append(tf.Summary.Value( + tag="ray/tune/{}".format(attr), + simple_value=getattr(result, attr))) + train_stats = tf.Summary(value=values) + self._file_writer.add_summary(train_stats, result.training_iteration) + + def close(self): + self._file_writer.close() + + +class _VisKitLogger(Logger): + def _init(self): + # Note that we assume params.json was already created by JsonLogger + self._file = open(os.path.join(self.logdir, "progress.csv"), "w") + self._csv_out = csv.DictWriter(self._file, TrainingResult._fields) + self._csv_out.writeheader() + + def on_result(self, result): + self._csv_out.writerow(result._asdict()) + + def close(self): + self._file.close() + + +class _CustomEncoder(json.JSONEncoder): + def __init__(self, nan_str="null", **kwargs): + super(_CustomEncoder, self).__init__(**kwargs) + self.nan_str = nan_str + + def iterencode(self, o, _one_shot=False): + if self.ensure_ascii: + _encoder = json.encoder.encode_basestring_ascii + else: + _encoder = json.encoder.encode_basestring + + def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str): + return repr(o) if not np.isnan(o) else nan_str + + _iterencode = json.encoder._make_iterencode( + None, self.default, _encoder, self.indent, floatstr, + self.key_separator, self.item_separator, self.sort_keys, + self.skipkeys, _one_shot) + return _iterencode(o, 0) + + def default(self, value): + if np.isnan(value): + return None + if np.issubdtype(value, float): + return float(value) + if np.issubdtype(value, int): + return int(value) diff --git a/python/ray/tune/script_runner.py b/python/ray/tune/script_runner.py index bdf3b133d..5de6b8675 100644 --- a/python/ray/tune/script_runner.py +++ b/python/ray/tune/script_runner.py @@ -159,7 +159,8 @@ class ScriptRunner(Agent): self._last_reported_timestep = result.timesteps_total self._last_reported_time = now self._iteration += 1 - self._log_result(result) + + self._result_logger.on_result(result) return result diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 013cbc004..a77f4a0f6 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -2,12 +2,14 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import tempfile import traceback import ray import os from collections import namedtuple from ray.rllib.agent import get_agent_class +from ray.tune.logger import NoopLogger, UnifiedLogger class Resources( @@ -92,6 +94,8 @@ class Trial(object): self.agent = None self.status = Trial.PENDING self.location = None + self.logdir = None + self.result_logger = None def start(self): """Starts this trial. @@ -104,7 +108,7 @@ class Trial(object): if self._checkpoint_path: self.restore_from_path(path=self._checkpoint_path) - def stop(self, error=False): + def stop(self, error=False, stop_logger=True): """Stops this trial. Stops this trial, releasing all allocating resources. If stopping the @@ -126,10 +130,11 @@ class Trial(object): stop_tasks.append(self.agent.stop.remote()) stop_tasks.append(self.agent.__ray_terminate__.remote( self.agent._ray_actor_id.id())) + # TODO(ekl) seems like wait hangs when killing actors _, unfinished = ray.wait( - stop_tasks, num_returns=2, timeout=10000) + stop_tasks, num_returns=2, timeout=250) if unfinished: - print(("Stopping %s Actor was unsuccessful, " + print(("Stopping %s Actor timed out, " "but moving on...") % self) except Exception: print("Error stopping agent:", traceback.format_exc()) @@ -137,6 +142,10 @@ class Trial(object): finally: self.agent = None + if stop_logger and self.result_logger: + self.result_logger.close() + self.result_logger = None + def pause(self): """We want to release resources (specifically GPUs) when pausing an experiment. This results in a state similar to TERMINATED.""" @@ -144,7 +153,7 @@ class Trial(object): assert self.status == Trial.RUNNING, self.status try: self.checkpoint() - self.stop() + self.stop(stop_logger=False) self.status = Trial.PAUSED except Exception: print("Error pausing agent:", traceback.format_exc()) @@ -250,9 +259,19 @@ class Trial(object): cls = ray.remote( num_cpus=self.resources.driver_cpu_limit, num_gpus=self.resources.driver_gpu_limit)(agent_cls) + if not self.result_logger: + if not os.path.exists(self.local_dir): + os.makedirs(self.local_dir) + self.logdir = tempfile.mkdtemp( + prefix=str(self), dir=self.local_dir) + self.result_logger = UnifiedLogger( + self.config, self.logdir, self.upload_dir) + remote_logdir = self.logdir + # Logging for trials is handled centrally by TrialRunner, so + # configure the remote agent to use a noop-logger. self.agent = cls.remote( - self.env_creator, self.config, self.local_dir, self.upload_dir, - experiment_tag=self.experiment_tag) + self.env_creator, self.config, + lambda config: NoopLogger(config, remote_logdir)) def __str__(self): identifier = '{}_{}'.format(self.alg, self.env_name) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 2c8b2a385..8eef585e9 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -159,6 +159,7 @@ class TrialRunner(object): del self._running[result_id] try: result = ray.get(result_id) + trial.result_logger.on_result(result) print("result", result) trial.last_result = result self._total_time += result.time_this_iter_s diff --git a/python/ray/tune/trial_scheduler.py b/python/ray/tune/trial_scheduler.py index 7ac0d3dce..3076c2854 100644 --- a/python/ray/tune/trial_scheduler.py +++ b/python/ray/tune/trial_scheduler.py @@ -1,5 +1,6 @@ from __future__ import absolute_import from __future__ import division +from __future__ import print_function from ray.tune.trial import Trial