[rllib] Support parallel, parameterized evaluation (#6981)

* eval api

* update

* sync eval filters

* sync fix

* docs

* update

* docs

* update

* link

* nit

* doc updates

* format
This commit is contained in:
Eric Liang
2020-02-01 22:12:12 -08:00
committed by GitHub
parent b9ad79d66f
commit fbc545c03b
8 changed files with 381 additions and 46 deletions
+3
View File
@@ -1,3 +1,6 @@
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/rllib/examples/custom_eval.py --custom-eval
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/rllib/tests/test_catalog.py
+4
View File
@@ -212,6 +212,10 @@ You can mix supervised losses into any RLlib algorithm through custom models. Fo
**PyTorch**: There is no explicit API for adding losses to custom torch models. However, you can modify the loss in the policy definition directly. Like for TF models, offline datasets can be incorporated by creating an input reader and calling ``reader.next()`` in the loss forward pass.
Self-Supervised Model Losses
----------------------------
You can also use the ``custom_loss()`` API to add in self-supervised losses such as VAE reconstruction loss and L2-regularization.
Variable-length / Parametric Action Spaces
------------------------------------------
+1 -1
View File
@@ -13,7 +13,7 @@ Example: Training on previously saved experiences
.. note::
For custom models and enviroments, you'll need to use the `Python API <rllib-training.html#python-api>`__.
For custom models and enviroments, you'll need to use the `Python API <rllib-training.html#basic-python-api>`__.
In this example, we will save batches of experiences generated during online training to disk, and then leverage this saved data to train a policy offline using DQN. First, we run a simple policy gradient algorithm for 100k steps with ``"output": "/tmp/cartpole-out"`` to tell RLlib to write simulation outputs to the ``/tmp/cartpole-out`` directory.
+11 -3
View File
@@ -4,6 +4,9 @@ RLlib Table of Contents
Training APIs
-------------
* `Command-line <rllib-training.html>`__
- `Evaluating Trained Policies <rllib-training.html#evaluating-trained-policies>`__
* `Configuration <rllib-training.html#configuration>`__
- `Specifying Parameters <rllib-training.html#specifying-parameters>`__
@@ -14,9 +17,7 @@ Training APIs
- `Tuned Examples <rllib-training.html#tuned-examples>`__
* `Python API <rllib-training.html#python-api>`__
- `Custom Training Workflows <rllib-training.html#custom-training-workflows>`__
* `Basic Python API <rllib-training.html#basic-python-api>`__
- `Computing Actions <rllib-training.html#computing-actions>`__
@@ -24,10 +25,16 @@ Training APIs
- `Accessing Model State <rllib-training.html#accessing-model-state>`__
* `Advanced Python APIs <rllib-training.html#advanced-python-apis>`__
- `Custom Training Workflows <rllib-training.html#custom-training-workflows>`__
- `Global Coordination <rllib-training.html#global-coordination>`__
- `Callbacks and Custom Metrics <rllib-training.html#callbacks-and-custom-metrics>`__
- `Customized Evaluation During Training <rllib-training.html#customized-evaluation-during-training>`__
- `Rewriting Trajectories <rllib-training.html#rewriting-trajectories>`__
- `Curriculum Learning <rllib-training.html#curriculum-learning>`__
@@ -64,6 +71,7 @@ Models, Preprocessors, and Action Distributions
* `Custom Preprocessors <rllib-models.html#custom-preprocessors>`__
* `Custom Action Distributions <rllib-models.html#custom-action-distributions>`__
* `Supervised Model Losses <rllib-models.html#supervised-model-losses>`__
* `Self-Supervised Model Losses <rllib-models.html#self-supervised-model-losses>`__
* `Variable-length / Parametric Action Spaces <rllib-models.html#variable-length-parametric-action-spaces>`__
* `Autoregressive Action Distributions <rllib-models.html#autoregressive-action-distributions>`__
+76 -9
View File
@@ -61,6 +61,8 @@ and renders its behavior in the environment specified by ``--env``.
(Type ``rllib rollout --help`` to see the available evaluation options.)
For more advanced evaluation functionality, refer to `Customized Evaluation During Training <#customized-evaluation-during-training>`__.
Configuration
-------------
@@ -107,8 +109,8 @@ You can run these with the ``rllib train`` command as follows:
rllib train -f /path/to/tuned/example.yaml
Python API
----------
Basic Python API
----------------
The Python API provides the needed flexibility for applying RLlib to new problems. You will need to use this API if you wish to use `custom environments, preprocessors, or models <rllib-models.html>`__ with RLlib.
@@ -177,13 +179,6 @@ Tune will schedule the trials to run in parallel on your Ray cluster:
- PPO_CartPole-v0_0_lr=0.01: RUNNING [pid=21940], 16 s, 4013 ts, 22 rew
- PPO_CartPole-v0_1_lr=0.001: RUNNING [pid=21942], 27 s, 8111 ts, 54.7 rew
Custom Training Workflows
~~~~~~~~~~~~~~~~~~~~~~~~~
In the `basic training example <https://github.com/ray-project/ray/blob/master/rllib/examples/custom_env.py>`__, Tune will call ``train()`` on your trainer once per training iteration and report the new training results. Sometimes, it is desirable to have full control over training, but still run inside Tune. Tune supports `custom trainable functions <tune-usage.html#trainable-api>`__ that can be used to implement `custom training workflows (example) <https://github.com/ray-project/ray/blob/master/rllib/examples/custom_train_fn.py>`__.
For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/rllib/examples/rollout_worker_custom_workflow.py>`__.
Computing Actions
~~~~~~~~~~~~~~~~~
@@ -431,6 +426,16 @@ Similar to accessing policy state, you may want to get a reference to the underl
This is especially useful when used with `custom model classes <rllib-models.html>`__.
Advanced Python APIs
--------------------
Custom Training Workflows
~~~~~~~~~~~~~~~~~~~~~~~~~
In the `basic training example <https://github.com/ray-project/ray/blob/master/rllib/examples/custom_env.py>`__, Tune will call ``train()`` on your trainer once per training iteration and report the new training results. Sometimes, it is desirable to have full control over training, but still run inside Tune. Tune supports `custom trainable functions <tune-usage.html#trainable-api>`__ that can be used to implement `custom training workflows (example) <https://github.com/ray-project/ray/blob/master/rllib/examples/custom_train_fn.py>`__.
For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/rllib/examples/rollout_worker_custom_workflow.py>`__.
Global Coordination
~~~~~~~~~~~~~~~~~~~
Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *named actors* (learn more about Ray actors `here <actors.html>`__). As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program:
@@ -515,6 +520,68 @@ Custom metrics can be accessed and visualized like any other training result:
.. image:: custom_metric.png
Customized Evaluation During Training
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RLlib will report online training rewards, however in some cases you may want to compute
rewards with different settings (e.g., with exploration turned off, or on a specific set
of environment configurations). You can evaluate policies during training by setting one
or more of the ``evaluation_interval``, ``evaluation_num_episodes``, ``evaluation_config``,
``evaluation_num_workers``, and ``custom_eval_function`` configs
(see `trainer.py <https://github.com/ray-project/ray/blob/master/rllib/agents/trainer.py>`__ for further documentation).
There is an end to end example of how to set up custom online evaluation in `custom_eval.py <https://github.com/ray-project/ray/blob/master/rllib/examples/custom_eval.py>`__. Note that if you only want to eval your policy at the end of training, you can set ``evaluation_interval: N``, where ``N`` is the number of training iterations before stopping.
Below are some examples of how the custom evaluation metrics are reported nested under the ``evaluation`` key of normal training results:
.. code-block:: bash
------------------------------------------------------------------------
Sample output for `python custom_eval.py`
------------------------------------------------------------------------
INFO trainer.py:623 -- Evaluating current policy for 10 episodes.
INFO trainer.py:650 -- Running round 0 of parallel evaluation (2/10 episodes)
INFO trainer.py:650 -- Running round 1 of parallel evaluation (4/10 episodes)
INFO trainer.py:650 -- Running round 2 of parallel evaluation (6/10 episodes)
INFO trainer.py:650 -- Running round 3 of parallel evaluation (8/10 episodes)
INFO trainer.py:650 -- Running round 4 of parallel evaluation (10/10 episodes)
Result for PG_SimpleCorridor_2c6b27dc:
...
evaluation:
custom_metrics: {}
episode_len_mean: 15.864661654135338
episode_reward_max: 1.0
episode_reward_mean: 0.49624060150375937
episode_reward_min: 0.0
episodes_this_iter: 133
.. code-block:: bash
------------------------------------------------------------------------
Sample output for `python custom_eval.py --custom-eval`
------------------------------------------------------------------------
INFO trainer.py:631 -- Running custom eval function <function ...>
Update corridor length to 4
Update corridor length to 7
Custom evaluation round 1
Custom evaluation round 2
Custom evaluation round 3
Custom evaluation round 4
Result for PG_SimpleCorridor_0de4e686:
...
evaluation:
custom_metrics: {}
episode_len_mean: 9.15695067264574
episode_reward_max: 1.0
episode_reward_mean: 0.9596412556053812
episode_reward_min: 0.0
episodes_this_iter: 223
foo: 1
Rewriting Trajectories
~~~~~~~~~~~~~~~~~~~~~~
+84 -31
View File
@@ -1,6 +1,7 @@
from datetime import datetime
import copy
import logging
import math
import os
import pickle
import six
@@ -170,13 +171,30 @@ COMMON_CONFIG = {
# Note that evaluation is currently not parallelized, and that for Ape-X
# metrics are already only reported for the lowest epsilon workers.
"evaluation_interval": None,
# Number of episodes to run per evaluation period.
# Number of episodes to run per evaluation period. If using multiple
# evaluation workers, we will run at least this many episodes total.
"evaluation_num_episodes": 10,
# Extra arguments to pass to evaluation workers.
# Typical usage is to pass extra args to evaluation env creator
# and to disable exploration by computing deterministic actions
# TODO(kismuz): implement determ. actions and include relevant keys hints
"evaluation_config": {},
"evaluation_config": {
# Example: overriding env_config, exploration, etc:
# "env_config": {...},
# "exploration_fraction": 0,
# "exploration_final_eps": 0,
},
# Number of parallel workers to use for evaluation. Note that this is set
# to zero by default, which means evaluation will be run in the trainer
# process. If you increase this, it will increase the Ray resource usage
# of the trainer since evaluation workers are created separately from
# rollout workers.
"evaluation_num_workers": 0,
# Customize the evaluation method. This must be a function of signature
# (trainer: Trainer, eval_workers: WorkerSet) -> metrics: dict. See the
# Trainer._evaluate() method to see the default implementation. The
# trainer guarantees all eval workers have the latest policy state before
# this function is called.
"custom_eval_function": None,
# === Advanced Rollout Settings ===
# Use a background thread for sampling (slightly off-policy, usually not
@@ -408,17 +426,18 @@ class Trainer(Trainable):
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
Trainer._validate_config(cf)
num_workers = cf["num_workers"] + cf["evaluation_num_workers"]
# TODO(ekl): add custom resources here once tune supports them
return Resources(
cpu=cf["num_cpus_for_driver"],
gpu=cf["num_gpus"],
memory=cf["memory"],
object_store_memory=cf["object_store_memory"],
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"],
extra_memory=cf["memory_per_worker"] * cf["num_workers"],
extra_cpu=cf["num_cpus_per_worker"] * num_workers,
extra_gpu=cf["num_gpus_per_worker"] * num_workers,
extra_memory=cf["memory_per_worker"] * num_workers,
extra_object_store_memory=cf["object_store_memory_per_worker"] *
cf["num_workers"])
num_workers)
@override(Trainable)
@PublicAPI
@@ -456,29 +475,32 @@ class Trainer(Trainable):
if result is None:
raise RuntimeError("Failed to recover from worker crash")
if (self.config.get("observation_filter", "NoFilter") != "NoFilter"
and hasattr(self, "workers")
and isinstance(self.workers, WorkerSet)):
FilterManager.synchronize(
self.workers.local_worker().filters,
self.workers.remote_workers(),
update_remote=self.config["synchronize_filters"])
logger.debug("synchronized filters: {}".format(
self.workers.local_worker().filters))
if hasattr(self, "workers") and isinstance(self.workers, WorkerSet):
self._sync_filters_if_needed(self.workers)
if self._has_policy_optimizer():
result["num_healthy_workers"] = len(
self.optimizer.workers.remote_workers())
if self.config["evaluation_interval"]:
if self._iteration % self.config["evaluation_interval"] == 0:
evaluation_metrics = self._evaluate()
assert isinstance(evaluation_metrics, dict), \
"_evaluate() needs to return a dict."
result.update(evaluation_metrics)
if self.config["evaluation_interval"] == 1 or (
self._iteration > 0 and self.config["evaluation_interval"]
and self._iteration % self.config["evaluation_interval"] == 0):
evaluation_metrics = self._evaluate()
assert isinstance(evaluation_metrics, dict), \
"_evaluate() needs to return a dict."
result.update(evaluation_metrics)
return result
def _sync_filters_if_needed(self, workers):
if self.config.get("observation_filter", "NoFilter") != "NoFilter":
FilterManager.synchronize(
workers.local_worker().filters,
workers.remote_workers(),
update_remote=self.config["synchronize_filters"])
logger.debug("synchronized filters: {}".format(
workers.local_worker().filters))
@override(Trainable)
def _log_result(self, result):
if self.config["callbacks"].get("on_train_result"):
@@ -548,8 +570,8 @@ class Trainer(Trainable):
self.env_creator,
self._policy,
merge_dicts(self.config, extra_config),
num_workers=0)
self.evaluation_metrics = self._evaluate()
num_workers=self.config["evaluation_num_workers"])
self.evaluation_metrics = {}
@override(Trainable)
def _stop(self):
@@ -600,15 +622,46 @@ class Trainer(Trainable):
"overrides, since the results will be the "
"same as reported during normal policy evaluation.")
logger.info("Evaluating current policy for {} episodes".format(
self.config["evaluation_num_episodes"]))
self._before_evaluate()
self.evaluation_workers.local_worker().restore(
self.workers.local_worker().save())
for _ in range(self.config["evaluation_num_episodes"]):
self.evaluation_workers.local_worker().sample()
metrics = collect_metrics(self.evaluation_workers.local_worker())
# Broadcast the new policy weights to all evaluation workers.
logger.info("Synchronizing weights to evaluation workers.")
weights = ray.put(self.workers.local_worker().save())
self.evaluation_workers.foreach_worker(
lambda w: w.restore(ray.get(weights)))
self._sync_filters_if_needed(self.evaluation_workers)
if self.config["custom_eval_function"]:
logger.info("Running custom eval function {}".format(
self.config["custom_eval_function"]))
metrics = self.config["custom_eval_function"](
self, self.evaluation_workers)
if not metrics or not isinstance(metrics, dict):
raise ValueError("Custom eval function must return "
"dict of metrics, got {}.".format(metrics))
else:
logger.info("Evaluating current policy for {} episodes.".format(
self.config["evaluation_num_episodes"]))
if self.config["evaluation_num_workers"] == 0:
for _ in range(self.config["evaluation_num_episodes"]):
self.evaluation_workers.local_worker().sample()
else:
num_rounds = int(
math.ceil(self.config["evaluation_num_episodes"] /
self.config["evaluation_num_workers"]))
num_workers = len(self.evaluation_workers.remote_workers())
num_episodes = num_rounds * num_workers
for i in range(num_rounds):
logger.info("Running round {} of parallel evaluation "
"({}/{} episodes)".format(
i, (i + 1) * num_workers, num_episodes))
ray.get([
w.sample.remote()
for w in self.evaluation_workers.remote_workers()
])
metrics = collect_metrics(self.evaluation_workers.local_worker(),
self.evaluation_workers.remote_workers())
return {"evaluation": metrics}
@DeveloperAPI
+6 -2
View File
@@ -81,14 +81,18 @@ def collect_episodes(local_worker=None,
@DeveloperAPI
def summarize_episodes(episodes, new_episodes):
def summarize_episodes(episodes, new_episodes=None):
"""Summarizes a set of episode metrics tuples.
Arguments:
episodes: smoothed set of episodes including historical ones
new_episodes: just the new episodes in this iteration
new_episodes: just the new episodes in this iteration. This must be
a subset of `episodes`. If None, assumes all episodes are new.
"""
if new_episodes is None:
new_episodes = episodes
episodes, estimates = _partition(episodes)
new_episodes, _ = _partition(new_episodes)
+196
View File
@@ -0,0 +1,196 @@
"""Example of customizing evaluation with RLlib.
Pass --custom-eval to run with a custom evaluation function too.
Here we define a custom evaluation method that runs a specific sweep of env
parameters (SimpleCorridor corridor lengths).
------------------------------------------------------------------------
Sample output for `python custom_eval.py`
------------------------------------------------------------------------
INFO trainer.py:623 -- Evaluating current policy for 10 episodes.
INFO trainer.py:650 -- Running round 0 of parallel evaluation (2/10 episodes)
INFO trainer.py:650 -- Running round 1 of parallel evaluation (4/10 episodes)
INFO trainer.py:650 -- Running round 2 of parallel evaluation (6/10 episodes)
INFO trainer.py:650 -- Running round 3 of parallel evaluation (8/10 episodes)
INFO trainer.py:650 -- Running round 4 of parallel evaluation (10/10 episodes)
Result for PG_SimpleCorridor_2c6b27dc:
...
evaluation:
custom_metrics: {}
episode_len_mean: 15.864661654135338
episode_reward_max: 1.0
episode_reward_mean: 0.49624060150375937
episode_reward_min: 0.0
episodes_this_iter: 133
off_policy_estimator: {}
policy_reward_max: {}
policy_reward_mean: {}
policy_reward_min: {}
sampler_perf:
mean_env_wait_ms: 0.0362923321333299
mean_inference_ms: 0.6319202064080927
mean_processing_ms: 0.14143652169068222
------------------------------------------------------------------------
Sample output for `python custom_eval.py --custom-eval`
------------------------------------------------------------------------
INFO trainer.py:631 -- Running custom eval function <function ...>
Update corridor length to 4
Update corridor length to 7
Custom evaluation round 1
Custom evaluation round 2
Custom evaluation round 3
Custom evaluation round 4
Result for PG_SimpleCorridor_0de4e686:
...
evaluation:
custom_metrics: {}
episode_len_mean: 9.15695067264574
episode_reward_max: 1.0
episode_reward_mean: 0.9596412556053812
episode_reward_min: 0.0
episodes_this_iter: 223
foo: 1
off_policy_estimator: {}
policy_reward_max: {}
policy_reward_mean: {}
policy_reward_min: {}
sampler_perf:
mean_env_wait_ms: 0.03423667269562796
mean_inference_ms: 0.5654563161491506
mean_processing_ms: 0.14494765630060774
"""
import argparse
import numpy as np
import gym
from gym.spaces import Discrete, Box
import ray
from ray import tune
from ray.rllib.evaluation.metrics import collect_episodes, summarize_episodes
parser = argparse.ArgumentParser()
parser.add_argument("--custom-eval", action="store_true")
args = parser.parse_args()
def custom_eval_function(trainer, eval_workers):
"""Example of a custom evaluation function.
Arguments:
trainer (Trainer): trainer class to evaluate.
eval_workers (WorkerSet): evaluation workers.
Returns:
metrics (dict): evaluation metrics dict.
"""
# We configured 2 eval workers in the training config.
worker_1, worker_2 = eval_workers.remote_workers()
# Set different env settings for each worker. Here we use a fixed config,
# which also could have been computed in each worker by looking at
# env_config.worker_index (printed in SimpleCorridor class above).
worker_1.foreach_env.remote(lambda env: env.set_corridor_length(4))
worker_2.foreach_env.remote(lambda env: env.set_corridor_length(7))
for i in range(5):
print("Custom evaluation round", i)
# Calling .sample() runs exactly one episode per worker due to how the
# eval workers are configured.
ray.get([w.sample.remote() for w in eval_workers.remote_workers()])
# Collect the accumulated episodes on the workers, and then summarize the
# episode stats into a metrics dict.
episodes, _ = collect_episodes(
remote_workers=eval_workers.remote_workers(), timeout_seconds=99999)
# You can compute metrics from the episodes manually, or use the
# convenient `summarize_episodes()` utility:
metrics = summarize_episodes(episodes)
# Note that the above two statements are the equivalent of:
# metrics = collect_metrics(eval_workers.local_worker(),
# eval_workers.remote_workers())
# You can also put custom values in the metrics dict.
metrics["foo"] = 1
return metrics
class SimpleCorridor(gym.Env):
"""Custom env we use for this example."""
def __init__(self, env_config):
self.end_pos = env_config["corridor_length"]
self.cur_pos = 0
self.action_space = Discrete(2)
self.observation_space = Box(0.0, 9999, shape=(1, ), dtype=np.float32)
print("Created env for worker index", env_config.worker_index,
"with corridor length", self.end_pos)
def set_corridor_length(self, length):
print("Update corridor length to", length)
self.end_pos = length
def reset(self):
self.cur_pos = 0
return [self.cur_pos]
def step(self, action):
assert action in [0, 1], action
if action == 0 and self.cur_pos > 0:
self.cur_pos -= 1
elif action == 1:
self.cur_pos += 1
done = self.cur_pos >= self.end_pos
return [self.cur_pos], 1 if done else 0, done, {}
if __name__ == "__main__":
if args.custom_eval:
eval_fn = custom_eval_function
else:
eval_fn = None
tune.run(
"PG",
stop={
"training_iteration": 10,
},
config={
"env": SimpleCorridor,
"env_config": {
"corridor_length": 10,
},
"horizon": 20,
"log_level": "INFO",
# Training rollouts will be collected using just the learner
# process, but evaluation will be done in parallel with two
# workers. Hence, this run will use 3 CPUs total (1 for the
# learner + 2 more for evaluation workers).
"num_workers": 0,
"evaluation_num_workers": 2,
# Optional custom eval function.
"custom_eval_function": eval_fn,
# Enable evaluation, once per training iteration.
"evaluation_interval": 1,
# Run 10 episodes each time evaluation runs.
"evaluation_num_episodes": 10,
# Override the env config for evaluation.
"evaluation_config": {
"env_config": {
# Evaluate using LONGER corridor than trained on.
"corridor_length": 5,
},
},
})