[tune] [rllib] Centralized driver logging (#1208)

* logger v2

* add logger

* lint

* todo

* viskit works now

* doc

* remove none check

* fix timeout

* Missing Numpy for Sigmoid data
This commit is contained in:
Eric Liang
2017-11-15 22:11:47 -08:00
committed by GitHub
parent e066bcf633
commit 009f59defc
6 changed files with 213 additions and 133 deletions
+20 -126
View File
@@ -4,24 +4,18 @@ from __future__ import print_function
from datetime import datetime
import json
import logging
import numpy as np
import os
import pickle
import sys
import tempfile
import time
import uuid
import tensorflow as tf
from ray.tune.logger import UnifiedLogger
from ray.tune.result import TrainingResult
if sys.version_info[0] == 2:
import cStringIO as StringIO
elif sys.version_info[0] == 3:
import io as StringIO
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -39,24 +33,18 @@ class Agent(object):
"""
_allow_unknown_configs = False
_default_logdir = "/tmp/ray"
def __init__(
self, env_creator, config, local_dir='/tmp/ray',
upload_dir=None, experiment_tag=None):
self, env_creator, config, logger_creator=None):
"""Initialize an RLLib agent.
Args:
env_creator (str|func): Name of the OpenAI gym environment to train
against, or a function that creates such an env.
config (obj): Algorithm-specific configuration data.
local_dir (str): Directory where results and temporary files will
be placed.
upload_dir (str): Optional remote URI like s3://bucketname/ where
results will be uploaded.
experiment_tag (str): Optional string containing extra metadata
about the experiment, e.g. a summary of parameters. This string
will be included in the logdir path and when displaying agent
progress.
config (dict): Algorithm-specific configuration data.
logger_creator (func): Function that creates a ray.tune.Logger
object. If unspecified, a default logger is created.
"""
self._initialize_ok = False
self._experiment_id = uuid.uuid4().hex
@@ -79,40 +67,20 @@ class Agent(object):
"Unknown agent config `{}`, "
"all agent configs: {}".format(k, self.config.keys()))
self.config.update(config)
self.config.update({
"experiment_tag": experiment_tag,
"alg": self._agent_name,
"env_name": env_name,
"experiment_id": self._experiment_id,
})
logdir_suffix = "{}_{}_{}".format(
env_name,
self._agent_name,
experiment_tag or datetime.today().strftime("%Y-%m-%d_%H-%M-%S"))
if not os.path.exists(local_dir):
os.makedirs(local_dir)
self.logdir = tempfile.mkdtemp(prefix=logdir_suffix, dir=local_dir)
if upload_dir:
log_upload_uri = os.path.join(upload_dir, logdir_suffix)
if logger_creator:
self._result_logger = logger_creator(self.config)
self.logdir = self._result_logger.logdir
else:
log_upload_uri = None
# TODO(ekl) consider inlining config into the result jsons
config_out = os.path.join(self.logdir, "config.json")
with open(config_out, "w") as f:
json.dump(self.config, f, sort_keys=True, cls=_Encoder)
logger.info(
"%s agent created with logdir '%s' and upload uri '%s'",
self.__class__.__name__, self.logdir, log_upload_uri)
self._result_logger = _Logger(
os.path.join(self.logdir, "result.json"),
log_upload_uri and os.path.join(log_upload_uri, "result.json"))
self._file_writer = tf.summary.FileWriter(self.logdir)
logdir_suffix = "{}_{}_{}".format(
env_name,
self._agent_name,
datetime.today().strftime("%Y-%m-%d_%H-%M-%S"))
if not os.path.exists(self._default_logdir):
os.makedirs(self._default_logdir)
self.logdir = tempfile.mkdtemp(
prefix=logdir_suffix, dir=self._default_logdir)
self._result_logger = UnifiedLogger(self.config, self.logdir, None)
self._iteration = 0
self._time_total = 0.0
@@ -161,29 +129,10 @@ class Agent(object):
pid=os.getpid(),
hostname=os.uname()[1])
self._log_result(result)
self._result_logger.on_result(result)
return result
def _log_result(self, result):
"""Appends the given result to this agent's log dir."""
# We need to use a custom json serializer class so that NaNs get
# encoded as null as required by Athena.
json.dump(result._asdict(), self._result_logger, cls=_Encoder)
self._result_logger.write("\n")
attrs_to_log = [
"time_this_iter_s", "mean_loss", "mean_accuracy",
"episode_reward_mean", "episode_len_mean"]
values = []
for attr in attrs_to_log:
if getattr(result, attr) is not None:
values.append(tf.Summary.Value(
tag="ray/tune/{}".format(attr),
simple_value=getattr(result, attr)))
train_stats = tf.Summary(value=values)
self._file_writer.add_summary(train_stats, result.training_iteration)
def save(self):
"""Saves the current model state to a checkpoint.
@@ -214,7 +163,7 @@ class Agent(object):
def stop(self):
"""Releases all resources used by this agent."""
self._file_writer.close()
self._result_logger.close()
def compute_action(self, observation):
"""Computes an action using the current trained policy."""
@@ -255,61 +204,6 @@ class Agent(object):
raise NotImplementedError
class _Encoder(json.JSONEncoder):
def __init__(self, nan_str="null", **kwargs):
super(_Encoder, self).__init__(**kwargs)
self.nan_str = nan_str
def iterencode(self, o, _one_shot=False):
if self.ensure_ascii:
_encoder = json.encoder.encode_basestring_ascii
else:
_encoder = json.encoder.encode_basestring
def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str):
return repr(o) if not np.isnan(o) else nan_str
_iterencode = json.encoder._make_iterencode(
None, self.default, _encoder, self.indent, floatstr,
self.key_separator, self.item_separator, self.sort_keys,
self.skipkeys, _one_shot)
return _iterencode(o, 0)
def default(self, value):
if np.isnan(value):
return None
if np.issubdtype(value, float):
return float(value)
if np.issubdtype(value, int):
return int(value)
class _Logger(object):
"""Writing small amounts of data to S3 with real-time updates.
"""
def __init__(self, local_file, uri=None):
self.local_out = open(local_file, "w")
self.result_buffer = StringIO.StringIO()
self.uri = uri
if self.uri:
import smart_open
self.smart_open = smart_open.smart_open
def write(self, b):
self.local_out.write(b)
self.local_out.flush()
# TODO(pcm): At the moment we are writing the whole results output from
# the beginning in each iteration. This will write O(n^2) bytes where n
# is the number of bytes printed so far. Fix this! This should at least
# only write the last 5MBs (S3 chunksize).
if self.uri:
with self.smart_open(self.uri, "w") as f:
self.result_buffer.write(b)
f.write(self.result_buffer.getvalue())
class _MockAgent(Agent):
"""Mock agent for use in tests"""
+164
View File
@@ -0,0 +1,164 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import csv
import json
import numpy as np
import os
import sys
import tensorflow as tf
from ray.tune.result import TrainingResult
if sys.version_info[0] == 2:
import cStringIO as StringIO
elif sys.version_info[0] == 3:
import io as StringIO
class Logger(object):
"""Logging interface for ray.tune; specialized implementations follow.
By default, the UnifiedLogger implementation is used which logs results in
multiple formats (TensorBoard, rllab/viskit, plain json) at once.
"""
_attrs_to_log = [
"time_this_iter_s", "mean_loss", "mean_accuracy",
"episode_reward_mean", "episode_len_mean"]
def __init__(self, config, logdir, upload_uri=None):
self.config = config
self.logdir = logdir
self.uri = upload_uri
self._init()
def _init(self):
pass
def on_result(self, result):
"""Given a result, appends it to the existing log."""
raise NotImplementedError
def close(self):
"""Releases all resources used by this logger."""
pass
class UnifiedLogger(Logger):
"""Unified result logger for TensorBoard, rllab/viskit, plain json."""
def _init(self):
self._loggers = []
for cls in [_JsonLogger, _TFLogger, _VisKitLogger]:
self._loggers.append(cls(self.config, self.logdir, self.uri))
print("Unified logger created with logdir '{}'".format(self.logdir))
def on_result(self, result):
for logger in self._loggers:
logger.on_result(result)
def close(self):
for logger in self._loggers:
logger.close()
class NoopLogger(Logger):
def on_result(self, result):
pass
class _JsonLogger(Logger):
def _init(self):
config_out = os.path.join(self.logdir, "params.json")
with open(config_out, "w") as f:
json.dump(self.config, f, sort_keys=True, cls=_CustomEncoder)
local_file = os.path.join(self.logdir, "result.json")
self.local_out = open(local_file, "w")
if self.uri:
self.result_buffer = StringIO.StringIO()
import smart_open
self.smart_open = smart_open.smart_open
def on_result(self, result):
json.dump(result._asdict(), self, cls=_CustomEncoder)
self.write("\n")
def write(self, b):
self.local_out.write(b)
self.local_out.flush()
# TODO(pcm): At the moment we are writing the whole results output from
# the beginning in each iteration. This will write O(n^2) bytes where n
# is the number of bytes printed so far. Fix this! This should at least
# only write the last 5MBs (S3 chunksize).
if self.uri:
with self.smart_open(self.uri, "w") as f:
self.result_buffer.write(b)
f.write(self.result_buffer.getvalue())
def close(self):
self.local_out.close()
class _TFLogger(Logger):
def _init(self):
self._file_writer = tf.summary.FileWriter(self.logdir)
def on_result(self, result):
values = []
for attr in Logger._attrs_to_log:
if getattr(result, attr) is not None:
values.append(tf.Summary.Value(
tag="ray/tune/{}".format(attr),
simple_value=getattr(result, attr)))
train_stats = tf.Summary(value=values)
self._file_writer.add_summary(train_stats, result.training_iteration)
def close(self):
self._file_writer.close()
class _VisKitLogger(Logger):
def _init(self):
# Note that we assume params.json was already created by JsonLogger
self._file = open(os.path.join(self.logdir, "progress.csv"), "w")
self._csv_out = csv.DictWriter(self._file, TrainingResult._fields)
self._csv_out.writeheader()
def on_result(self, result):
self._csv_out.writerow(result._asdict())
def close(self):
self._file.close()
class _CustomEncoder(json.JSONEncoder):
def __init__(self, nan_str="null", **kwargs):
super(_CustomEncoder, self).__init__(**kwargs)
self.nan_str = nan_str
def iterencode(self, o, _one_shot=False):
if self.ensure_ascii:
_encoder = json.encoder.encode_basestring_ascii
else:
_encoder = json.encoder.encode_basestring
def floatstr(o, allow_nan=self.allow_nan, nan_str=self.nan_str):
return repr(o) if not np.isnan(o) else nan_str
_iterencode = json.encoder._make_iterencode(
None, self.default, _encoder, self.indent, floatstr,
self.key_separator, self.item_separator, self.sort_keys,
self.skipkeys, _one_shot)
return _iterencode(o, 0)
def default(self, value):
if np.isnan(value):
return None
if np.issubdtype(value, float):
return float(value)
if np.issubdtype(value, int):
return int(value)
+2 -1
View File
@@ -159,7 +159,8 @@ class ScriptRunner(Agent):
self._last_reported_timestep = result.timesteps_total
self._last_reported_time = now
self._iteration += 1
self._log_result(result)
self._result_logger.on_result(result)
return result
+25 -6
View File
@@ -2,12 +2,14 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tempfile
import traceback
import ray
import os
from collections import namedtuple
from ray.rllib.agent import get_agent_class
from ray.tune.logger import NoopLogger, UnifiedLogger
class Resources(
@@ -92,6 +94,8 @@ class Trial(object):
self.agent = None
self.status = Trial.PENDING
self.location = None
self.logdir = None
self.result_logger = None
def start(self):
"""Starts this trial.
@@ -104,7 +108,7 @@ class Trial(object):
if self._checkpoint_path:
self.restore_from_path(path=self._checkpoint_path)
def stop(self, error=False):
def stop(self, error=False, stop_logger=True):
"""Stops this trial.
Stops this trial, releasing all allocating resources. If stopping the
@@ -126,10 +130,11 @@ class Trial(object):
stop_tasks.append(self.agent.stop.remote())
stop_tasks.append(self.agent.__ray_terminate__.remote(
self.agent._ray_actor_id.id()))
# TODO(ekl) seems like wait hangs when killing actors
_, unfinished = ray.wait(
stop_tasks, num_returns=2, timeout=10000)
stop_tasks, num_returns=2, timeout=250)
if unfinished:
print(("Stopping %s Actor was unsuccessful, "
print(("Stopping %s Actor timed out, "
"but moving on...") % self)
except Exception:
print("Error stopping agent:", traceback.format_exc())
@@ -137,6 +142,10 @@ class Trial(object):
finally:
self.agent = None
if stop_logger and self.result_logger:
self.result_logger.close()
self.result_logger = None
def pause(self):
"""We want to release resources (specifically GPUs) when pausing an
experiment. This results in a state similar to TERMINATED."""
@@ -144,7 +153,7 @@ class Trial(object):
assert self.status == Trial.RUNNING, self.status
try:
self.checkpoint()
self.stop()
self.stop(stop_logger=False)
self.status = Trial.PAUSED
except Exception:
print("Error pausing agent:", traceback.format_exc())
@@ -250,9 +259,19 @@ class Trial(object):
cls = ray.remote(
num_cpus=self.resources.driver_cpu_limit,
num_gpus=self.resources.driver_gpu_limit)(agent_cls)
if not self.result_logger:
if not os.path.exists(self.local_dir):
os.makedirs(self.local_dir)
self.logdir = tempfile.mkdtemp(
prefix=str(self), dir=self.local_dir)
self.result_logger = UnifiedLogger(
self.config, self.logdir, self.upload_dir)
remote_logdir = self.logdir
# Logging for trials is handled centrally by TrialRunner, so
# configure the remote agent to use a noop-logger.
self.agent = cls.remote(
self.env_creator, self.config, self.local_dir, self.upload_dir,
experiment_tag=self.experiment_tag)
self.env_creator, self.config,
lambda config: NoopLogger(config, remote_logdir))
def __str__(self):
identifier = '{}_{}'.format(self.alg, self.env_name)
+1
View File
@@ -159,6 +159,7 @@ class TrialRunner(object):
del self._running[result_id]
try:
result = ray.get(result_id)
trial.result_logger.on_result(result)
print("result", result)
trial.last_result = result
self._total_time += result.time_this_iter_s
+1
View File
@@ -1,5 +1,6 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.tune.trial import Trial