[rllib] Document "v2" APIs (#2316)

* re

* wip

* wip

* a3c working

* torch support

* pg works

* lint

* rm v2

* consumer id

* clean up pg

* clean up more

* fix python 2.7

* tf session management

* docs

* dqn wip

* fix compile

* dqn

* apex runs

* up

* imports

* ddpg

* quotes

* fix tests

* fix last r

* fix tests

* lint

* pass checkpoint restore

* kwar

* nits

* policy graph

* fix yapf

* com

* class

* pyt

* vectorization

* update

* test cpe

* unit test

* fix ddpg2

* changes

* wip

* args

* faster test

* common

* fix

* add alg option

* batch mode and policy serving

* multi serving test

* todo

* wip

* serving test

* doc async env

* num envs

* comments

* thread

* remove init hook

* update

* fix ppo

* comments1

* fix

* updates

* add jenkins tests

* fix

* fix pytorch

* fix

* fixes

* fix a3c policy

* fix squeeze

* fix trunc on apex

* fix squeezing for real

* update

* remove horizon test for now

* multiagent wip

* update

* fix race condition

* fix ma

* t

* doc

* st

* wip

* example

* wip

* working

* cartpole

* wip

* batch wip

* fix bug

* make other_batches None default

* working

* debug

* nit

* warn

* comments

* fix ppo

* fix obs filter

* update

* wip

* tf

* update

* fix

* cleanup

* cleanup

* spacing

* model

* fix

* dqn

* fix ddpg

* doc

* keep names

* update

* fix

* com

* docs

* clarify model outputs

* Update torch_policy_graph.py

* fix obs filter

* pass thru worker index

* fix

* rename

* vlad torch comments

* fix log action

* debug name

* fix lstm

* remove unused ddpg net

* remove conv net

* revert lstm

* wip

* wip

* cast

* wip

* works

* fix a3c

* works

* lstm util test

* doc

* clean up

* update

* fix lstm check

* move to end

* fix sphinx

* fix cmd

* remove bad doc

* envs

* vec

* doc prep

* models

* rl

* alg

* up

* clarify

* copy

* async sa

* fix

* comments

* fix a3c conf

* tune lstm

* fix reshape

* fix

* back to 16

* tuned a3c update

* update

* tuned

* optional

* merge

* wip

* fix up

* move pg class

* rename env

* wip

* update

* tip

* alg

* readme

* fix catalog

* readme

* doc

* context

* remove prep

* comma

* add env

* link to paper

* paper

* update

* rnn

* update

* wip

* clean up ev creation

* fix

* fix

* fix

* fix lint

* up

* no comma

* ma

* Update run_multi_node_tests.sh

* fix

* sphinx is stupid

* sphinx is stupid

* clarify torch graph

* no horizon

* fix config

* sb

* Update test_optimizers.py
Eric Liang
2018-07-01 00:05:08 -07:00
committed by GitHub
parent 762bdf646e
commit 8aa56c12e6
113 changed files with 1148 additions and 1141 deletions
+1 -1
@@ -22,7 +22,7 @@ def _internal_kv_put(key, value, overwrite=False):
This only has an effect if the key does not already have a value.
Returns
Returns:
already_exists (bool): whether the value already exists.
"""
+21 -18
@@ -1,22 +1,25 @@
Ray RLlib: Scalable Reinforcement Learning
==========================================
RLlib: Scalable Reinforcement Learning
======================================
Ray RLlib is an RL execution toolkit built on the Ray distributed execution framework. See the `user documentation <http://ray.readthedocs.io/en/latest/rllib.html>`__ and `paper <https://arxiv.org/abs/1712.09381>`__.
RLlib is an open-source library for reinforcement learning that offers both a collection of reference algorithms and scalable primitives for composing new ones.
RLlib includes the following reference algorithms:
For an overview of RLlib, see the `documentation <http://ray.readthedocs.io/en/latest/rllib.html>`__.
- Proximal Policy Optimization (`PPO <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ppo>`__) which is a proximal variant of `TRPO <https://arxiv.org/abs/1502.05477>`__.
If you've found RLlib useful for your research, you can cite the `paper <https://arxiv.org/abs/1712.09381>`__ as follows:
- Policy Gradients (`PG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/pg>`__).
- Asynchronous Advantage Actor-Critic (`A3C <https://github.com/ray-project/ray/tree/master/python/ray/rllib/a3c>`__).
- Deep Q Networks (`DQN <https://github.com/ray-project/ray/tree/master/python/ray/rllib/dqn>`__).
- Deep Deterministic Policy Gradients (`DDPG <https://github.com/ray-project/ray/tree/master/python/ray/rllib/ddpg>`__).
- Ape-X Distributed Prioritized Experience Replay, including both `DQN <https://github.com/ray-project/ray/blob/master/python/ray/rllib/dqn/apex.py>`__ and `DDPG <https://github.com/ray-project/ray/blob/master/python/ray/rllib/ddpg/apex.py>`__ variants.
- Evolution Strategies (`ES <https://github.com/ray-project/ray/tree/master/python/ray/rllib/es>`__), as described in `this paper <https://arxiv.org/abs/1703.03864>`__.
These algorithms can be run on any OpenAI Gym MDP, including custom ones written and registered by the user.
```
@inproceedings{liang2018rllib,
Author = {Eric Liang and
Richard Liaw and
Robert Nishihara and
Philipp Moritz and
Roy Fox and
Ken Goldberg and
Joseph E. Gonzalez and
Michael I. Jordan and
Ion Stoica},
Title = {{RLlib}: Abstractions for Distributed Reinforcement Learning},
Booktitle = {International Conference on Machine Learning ({ICML})},
Year = {2018}
}
```
+10 -9
@@ -6,20 +6,21 @@ from __future__ import print_function
# This file is imported from the tune module in order to register RLlib agents.
from ray.tune.registry import register_trainable
from ray.rllib.utils.policy_graph import PolicyGraph
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.utils.async_vector_env import AsyncVectorEnv
from ray.rllib.utils.vector_env import VectorEnv
from ray.rllib.utils.serving_env import ServingEnv
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.env.async_vector_env import AsyncVectorEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.evaluation.sample_batch import SampleBatch
def _register_all():
for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG",
"APEX_DDPG", "__fake", "__sigmoid_fake_data",
"__parameter_tuning"]:
from ray.rllib.agent import get_agent_class
from ray.rllib.agents.agent import get_agent_class
register_trainable(key, get_agent_class(key))
@@ -27,5 +28,5 @@ _register_all()
__all__ = [
"PolicyGraph", "TFPolicyGraph", "CommonPolicyEvaluator", "SampleBatch",
"AsyncVectorEnv", "VectorEnv", "ServingEnv",
"AsyncVectorEnv", "MultiAgentEnv", "VectorEnv", "ServingEnv",
]
-3
@@ -1,3 +0,0 @@
from ray.rllib.a3c.a3c import A3CAgent, DEFAULT_CONFIG
__all__ = ["A3CAgent", "DEFAULT_CONFIG"]
+3
@@ -0,0 +1,3 @@
from ray.rllib.agents.agent import Agent, with_common_config
__all__ = ["Agent", "with_common_config"]
+3
@@ -0,0 +1,3 @@
from ray.rllib.agents.a3c.a3c import A3CAgent, DEFAULT_CONFIG
__all__ = ["A3CAgent", "DEFAULT_CONFIG"]
@@ -6,26 +6,17 @@ import pickle
import os
import ray
from ray.rllib.agent import Agent
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.utils import FilterManager
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
collect_metrics
from ray.rllib.evaluation.metrics import collect_metrics
from ray.tune.trial import Resources
DEFAULT_CONFIG = {
# Number of workers (excluding master)
"num_workers": 2,
# Number of environments to evaluate vectorwise per worker.
"num_envs": 1,
DEFAULT_CONFIG = with_common_config({
# Size of rollout batch
"batch_size": 10,
"sample_batch_size": 10,
# Use PyTorch as backend - no LSTM support
"use_pytorch": False,
# Which observation filter to apply to the observation
"observation_filter": "NoFilter",
# Discount factor of MDP
"gamma": 0.99,
# GAE(gamma) parameter
"lambda": 1.0,
# Max global norm for each gradient calculated by worker
@@ -40,6 +31,8 @@ DEFAULT_CONFIG = {
"use_gpu_for_workers": False,
# Whether to emit extra summary stats
"summarize": False,
# Workers sample async
"sample_async": True,
# Model and preprocessor options
"model": {
# Use LSTM model. Requires TF.
@@ -55,23 +48,25 @@ DEFAULT_CONFIG = {
# (Image statespace) - Converts image shape to (C, dim, dim)
"channel_major": False,
},
# Configure TF for single-process operation
"tf_session_args": {
"intra_op_parallelism_threads": 1,
"inter_op_parallelism_threads": 1,
"gpu_options": {
"allow_growth": True,
},
},
# Arguments to pass to the rllib optimizer
"optimizer": {
# Number of gradients applied for each `train` step
"grads_per_step": 100,
},
# Arguments to pass to the env creator
"env_config": {},
# === Multiagent ===
"multiagent": {
"policy_graphs": {},
"policy_mapping_fn": None,
},
}
})
class A3CAgent(Agent):
"""A3C implementations in TensorFlow and PyTorch."""
_agent_name = "A3C"
_default_config = DEFAULT_CONFIG
@@ -86,51 +81,18 @@ class A3CAgent(Agent):
def _init(self):
if self.config["use_pytorch"]:
from ray.rllib.a3c.a3c_torch_policy import A3CTorchPolicyGraph
self.policy_cls = A3CTorchPolicyGraph
from ray.rllib.agents.a3c.a3c_torch_policy import \
A3CTorchPolicyGraph
policy_cls = A3CTorchPolicyGraph
else:
from ray.rllib.a3c.a3c_tf_policy import A3CPolicyGraph
self.policy_cls = A3CPolicyGraph
if self.config["use_pytorch"]:
session_creator = None
else:
import tensorflow as tf
def session_creator():
return tf.Session(
config=tf.ConfigProto(
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1,
gpu_options=tf.GPUOptions(allow_growth=True)))
remote_cls = CommonPolicyEvaluator.as_remote(
num_gpus=1 if self.config["use_gpu_for_workers"] else 0)
self.local_evaluator = CommonPolicyEvaluator(
self.env_creator,
self.config["multiagent"]["policy_graphs"] or self.policy_cls,
policy_mapping_fn=self.config["multiagent"]["policy_mapping_fn"],
batch_steps=self.config["batch_size"],
batch_mode="truncate_episodes",
tf_session_creator=session_creator,
env_config=self.config["env_config"],
model_config=self.config["model"], policy_config=self.config,
num_envs=self.config["num_envs"])
self.remote_evaluators = [
remote_cls.remote(
self.env_creator,
self.config["multiagent"]["policy_graphs"] or self.policy_cls,
policy_mapping_fn=(
self.config["multiagent"]["policy_mapping_fn"]),
batch_steps=self.config["batch_size"],
batch_mode="truncate_episodes", sample_async=True,
tf_session_creator=session_creator,
env_config=self.config["env_config"],
model_config=self.config["model"], policy_config=self.config,
num_envs=self.config["num_envs"],
worker_index=i+1)
for i in range(self.config["num_workers"])]
from ray.rllib.agents.a3c.a3c_tf_policy import A3CPolicyGraph
policy_cls = A3CPolicyGraph
self.local_evaluator = self.make_local_evaluator(
self.env_creator, policy_cls)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, policy_cls, self.config["num_workers"],
{"num_gpus": 1 if self.config["use_gpu_for_workers"] else 0})
self.optimizer = AsyncGradientsOptimizer(
self.config["optimizer"], self.local_evaluator,
self.remote_evaluators)
@@ -168,12 +130,3 @@ class A3CAgent(Agent):
for a, o in zip(self.remote_evaluators, extra_data["remote_state"])
])
self.local_evaluator.restore(extra_data["local_state"])
def compute_action(self, observation, state=None):
if state is None:
state = []
obs = self.local_evaluator.filters["default"](
observation, update=False)
return self.local_evaluator.for_policy(
lambda p: p.compute_single_action(
obs, state, is_training=False)[0])
@@ -7,8 +7,8 @@ import gym
import ray
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.postprocessing import compute_advantages
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.models.misc import linear, normc_initializer
from ray.rllib.models.catalog import ModelCatalog
@@ -32,7 +32,7 @@ class A3CLoss(object):
class A3CPolicyGraph(TFPolicyGraph):
def __init__(self, observation_space, action_space, config):
config = dict(ray.rllib.a3c.a3c.DEFAULT_CONFIG, **config)
config = dict(ray.rllib.agents.a3c.a3c.DEFAULT_CONFIG, **config)
self.config = config
self.sess = tf.get_default_session()
@@ -9,8 +9,8 @@ from torch import nn
import ray
from ray.rllib.models.pytorch.misc import var_to_np
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.torch_policy_graph import TorchPolicyGraph
from ray.rllib.evaluation.postprocessing import compute_advantages
from ray.rllib.evaluation.torch_policy_graph import TorchPolicyGraph
class A3CLoss(nn.Module):
@@ -40,7 +40,7 @@ class A3CTorchPolicyGraph(TorchPolicyGraph):
"""A simple, non-recurrent PyTorch policy example."""
def __init__(self, obs_space, action_space, config):
config = dict(ray.rllib.a3c.a3c.DEFAULT_CONFIG, **config)
config = dict(ray.rllib.agents.a3c.a3c.DEFAULT_CONFIG, **config)
self.config = config
_, self.logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"])
@@ -2,19 +2,62 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
import copy
import json
import numpy as np
import os
import pickle
import tensorflow as tf
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.tune.registry import ENV_CREATOR, _global_registry
from ray.tune.result import TrainingResult
from ray.tune.trainable import Trainable
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
COMMON_CONFIG = {
# Discount factor of the MDP
"gamma": 0.99,
# Number of steps after which the rollout gets cut
"horizon": None,
# Number of environments to evaluate vectorwise per worker.
"num_envs": 1,
# Number of actors used for parallelism
"num_workers": 2,
# Default sample batch size
"sample_batch_size": 200,
# Whether to rollout "complete_episodes" or "truncate_episodes"
"batch_mode": "truncate_episodes",
# Whether to use a background thread for sampling (slightly off-policy)
"sample_async": False,
# Which observation filter to apply to the observation
"observation_filter": "NoFilter",
# Whether to use rllib or deepmind preprocessors
"preprocessor_pref": "rllib",
# Arguments to pass to the env creator
"env_config": {},
# Arguments to pass to model
"model": {},
# Arguments to pass to the rllib optimizer
"optimizer": {},
# Override default TF session args if non-empty
"tf_session_args": {},
# Whether to LZ4 compress observations
"compress_observations": False,
# === Multiagent ===
"multiagent": {
"policy_graphs": {},
"policy_mapping_fn": None,
},
}
def with_common_config(extra_config):
"""Returns the given config dict merged with common agent confs."""
config = copy.deepcopy(COMMON_CONFIG)
config.update(extra_config)
return config
def _deep_update(original, new_dict, new_keys_allowed, whitelist):
@@ -62,6 +105,47 @@ class Agent(Trainable):
_allow_unknown_subkeys = [
"tf_session_args", "env_config", "model", "optimizer", "multiagent"]
def make_local_evaluator(self, env_creator, policy_graph):
"""Convenience method to return configured local evaluator."""
return self._make_evaluator(
CommonPolicyEvaluator, env_creator, policy_graph, 0)
def make_remote_evaluators(
self, env_creator, policy_graph, count, remote_args):
"""Convenience method to return a number of remote evaluators."""
cls = CommonPolicyEvaluator.as_remote(**remote_args).remote
return [
self._make_evaluator(cls, env_creator, policy_graph, i+1)
for i in range(count)]
def _make_evaluator(self, cls, env_creator, policy_graph, worker_index):
config = self.config
def session_creator():
return tf.Session(
config=tf.ConfigProto(**config["tf_session_args"]))
return cls(
env_creator,
self.config["multiagent"]["policy_graphs"] or policy_graph,
policy_mapping_fn=self.config["multiagent"]["policy_mapping_fn"],
tf_session_creator=(
session_creator if config["tf_session_args"] else None),
batch_steps=config["sample_batch_size"],
batch_mode=config["batch_mode"],
episode_horizon=config["horizon"],
preprocessor_pref=config["preprocessor_pref"],
sample_async=config["sample_async"],
compress_observations=config["compress_observations"],
num_envs=config["num_envs"],
observation_filter=config["observation_filter"],
env_config=config["env_config"],
model_config=config["model"],
policy_config=config,
worker_index=worker_index)
@classmethod
def resource_help(cls, config):
return (
@@ -116,11 +200,6 @@ class Agent(Trainable):
raise NotImplementedError
def compute_action(self, observation):
"""Computes an action using the current trained policy."""
raise NotImplementedError
@property
def iteration(self):
"""Current training iter, auto-incremented with each train() call."""
@@ -139,6 +218,17 @@ class Agent(Trainable):
raise NotImplementedError
def compute_action(self, observation, state=None):
"""Computes an action using the current trained policy."""
if state is None:
state = []
obs = self.local_evaluator.filters["default"](
observation, update=False)
return self.local_evaluator.for_policy(
lambda p: p.compute_single_action(
obs, state, is_training=False)[0])
class _MockAgent(Agent):
"""Mock agent for use in tests"""
@@ -228,31 +318,31 @@ def get_agent_class(alg):
"""Returns the class of a known agent given its name."""
if alg == "DDPG":
from ray.rllib import ddpg
from ray.rllib.agents import ddpg
return ddpg.DDPGAgent
elif alg == "APEX_DDPG":
from ray.rllib import ddpg
from ray.rllib.agents import ddpg
return ddpg.ApexDDPGAgent
elif alg == "PPO":
from ray.rllib import ppo
from ray.rllib.agents import ppo
return ppo.PPOAgent
elif alg == "ES":
from ray.rllib import es
from ray.rllib.agents import es
return es.ESAgent
elif alg == "DQN":
from ray.rllib import dqn
from ray.rllib.agents import dqn
return dqn.DQNAgent
elif alg == "APEX":
from ray.rllib import dqn
from ray.rllib.agents import dqn
return dqn.ApexAgent
elif alg == "A3C":
from ray.rllib import a3c
from ray.rllib.agents import a3c
return a3c.A3CAgent
elif alg == "BC":
from ray.rllib import bc
from ray.rllib.agents import bc
return bc.BCAgent
elif alg == "PG":
from ray.rllib import pg
from ray.rllib.agents import pg
return pg.PGAgent
elif alg == "script":
from ray.tune import script_runner
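
The `with_common_config` helper introduced in this file builds each agent's defaults by deep-copying `COMMON_CONFIG` and then calling `dict.update`. That is a top-level merge: a nested dict such as `"model"` supplied by an agent replaces the common entry wholesale instead of being merged key by key. A minimal sketch of that behavior, assuming a trimmed-down `COMMON_CONFIG` (the subset of keys and the override values are chosen for illustration only):

```python
import copy

# Trimmed copy of the common defaults from the diff; the real dict has more keys.
COMMON_CONFIG = {
    "gamma": 0.99,
    "num_workers": 2,
    "sample_batch_size": 200,
    "model": {},
    "multiagent": {"policy_graphs": {}, "policy_mapping_fn": None},
}


def with_common_config(extra_config):
    """Returns the given config dict merged with common agent confs."""
    config = copy.deepcopy(COMMON_CONFIG)
    # Top-level keys only: a nested dict in extra_config replaces the default.
    config.update(extra_config)
    return config


# Illustrative overrides, loosely modeled on the A3C defaults in this commit.
a3c_like = with_common_config({
    "sample_batch_size": 10,
    "model": {"use_lstm": False},
})

print(a3c_like["sample_batch_size"])  # overridden by the agent
print(a3c_like["gamma"])              # inherited from COMMON_CONFIG
print(a3c_like["model"])              # replaced as a whole

# The deep copy keeps per-agent mutations from leaking into the shared defaults.
a3c_like["multiagent"]["policy_graphs"]["p0"] = "placeholder"
print(COMMON_CONFIG["multiagent"]["policy_graphs"])
```

Because the merge is shallow, an agent that overrides `"model"` or `"optimizer"` has to spell out every nested key it relies on.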
+3
@@ -0,0 +1,3 @@
from ray.rllib.agents.bc.bc import BCAgent, DEFAULT_CONFIG
__all__ = ["BCAgent", "DEFAULT_CONFIG"]
@@ -3,9 +3,9 @@ from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.agent import Agent
from ray.rllib.bc.bc_evaluator import BCEvaluator, GPURemoteBCEvaluator, \
RemoteBCEvaluator
from ray.rllib.agents.agent import Agent
from ray.rllib.agents.bc.bc_evaluator import BCEvaluator, \
GPURemoteBCEvaluator, RemoteBCEvaluator
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.tune.result import TrainingResult
from ray.tune.trial import Resources
@@ -6,10 +6,10 @@ import pickle
from six.moves import queue
import ray
from ray.rllib.bc.experience_dataset import ExperienceDataset
from ray.rllib.bc.policy import BCPolicy
from ray.rllib.agents.bc.experience_dataset import ExperienceDataset
from ray.rllib.agents.bc.policy import BCPolicy
from ray.rllib.evaluation.interface import PolicyEvaluator
from ray.rllib.models import ModelCatalog
from ray.rllib.optimizers import PolicyEvaluator
class BCEvaluator(PolicyEvaluator):
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.ddpg.apex import ApexDDPGAgent
from ray.rllib.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG
from ray.rllib.agents.ddpg.apex import ApexDDPGAgent
from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG
__all__ = ["DDPGAgent", "ApexDDPGAgent", "DEFAULT_CONFIG"]
@@ -2,16 +2,16 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG
from ray.rllib.agents.ddpg.ddpg import DDPGAgent, DEFAULT_CONFIG as DDPG_CONFIG
from ray.utils import merge_dicts
APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
DDPG_CONFIG,
{
"optimizer_class": "AsyncSamplesOptimizer",
"optimizer_config":
"optimizer":
merge_dicts(
DDPG_CONFIG["optimizer_config"], {
DDPG_CONFIG["optimizer"], {
"max_weight_sync_delay": 400,
"num_replay_buffer_shards": 4,
"debug": False
@@ -2,9 +2,10 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.dqn.common.schedules import ConstantSchedule, LinearSchedule
from ray.rllib.dqn.dqn import DQNAgent
from ray.rllib.ddpg.ddpg_policy_graph import DDPGPolicyGraph
from ray.rllib.agents.agent import with_common_config
from ray.rllib.agents.dqn.dqn import DQNAgent
from ray.rllib.agents.ddpg.ddpg_policy_graph import DDPGPolicyGraph
from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule
OPTIMIZER_SHARED_CONFIGS = [
"buffer_size", "prioritized_replay", "prioritized_replay_alpha",
@@ -12,7 +13,7 @@ OPTIMIZER_SHARED_CONFIGS = [
"train_batch_size", "learning_starts", "clip_rewards"
]
DEFAULT_CONFIG = {
DEFAULT_CONFIG = with_common_config({
# === Model ===
# Hidden layer sizes of the policy network
"actor_hiddens": [64, 64],
@@ -24,12 +25,6 @@ DEFAULT_CONFIG = {
"critic_hidden_activation": "relu",
# N-step Q learning
"n_step": 1,
# Config options to pass to the model constructor
"model": {},
# Discount factor for the MDP
"gamma": 0.99,
# Arguments to pass to the env creator
"env_config": {},
# === Exploration ===
# Max num timesteps for annealing schedules. Exploration is annealed from
@@ -99,30 +94,21 @@ DEFAULT_CONFIG = {
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Number of environments to evaluate vectorwise per worker.
"num_envs": 1,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Optimizer class to use.
"optimizer_class": "SyncReplayOptimizer",
# Config to pass to the optimizer.
"optimizer_config": {},
# Whether to use a distribution of epsilons across workers for exploration.
"per_worker_exploration": False,
# Whether to compute priorities on workers.
"worker_side_prioritization": False,
# === Multiagent ===
"multiagent": {
"policy_graphs": {},
"policy_mapping_fn": None,
},
}
})
class DDPGAgent(DQNAgent):
"""DDPG implementation in TensorFlow."""
_agent_name = "DDPG"
_default_config = DEFAULT_CONFIG
_policy_graph = DDPGPolicyGraph
@@ -8,11 +8,11 @@ import tensorflow as tf
import tensorflow.contrib.layers as layers
import ray
from ray.rllib.dqn.dqn_policy_graph import _huber_loss, _minimize_and_clip, \
_scope_vars, _postprocess_dqn
from ray.rllib.agents.dqn.dqn_policy_graph import _huber_loss, \
_minimize_and_clip, _scope_vars, _postprocess_dqn
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
A_SCOPE = "a_func"
@@ -113,7 +113,7 @@ class ActorCriticLoss(object):
class DDPGPolicyGraph(TFPolicyGraph):
def __init__(self, observation_space, action_space, config):
config = dict(ray.rllib.ddpg.ddpg.DEFAULT_CONFIG, **config)
config = dict(ray.rllib.agents.ddpg.ddpg.DEFAULT_CONFIG, **config)
if not isinstance(action_space, Box):
raise UnsupportedSpaceException(
"Action space {} is not supported for DDPG.".format(
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.dqn.apex import ApexAgent
from ray.rllib.dqn.dqn import DQNAgent, DEFAULT_CONFIG
from ray.rllib.agents.dqn.apex import ApexAgent
from ray.rllib.agents.dqn.dqn import DQNAgent, DEFAULT_CONFIG
__all__ = ["ApexAgent", "DQNAgent", "DEFAULT_CONFIG"]
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG
from ray.rllib.agents.dqn.dqn import DQNAgent, DEFAULT_CONFIG as DQN_CONFIG
from ray.tune.trial import Resources
from ray.utils import merge_dicts
@@ -10,9 +10,9 @@ APEX_DEFAULT_CONFIG = merge_dicts(
DQN_CONFIG,
{
"optimizer_class": "AsyncSamplesOptimizer",
"optimizer_config":
"optimizer":
merge_dicts(
DQN_CONFIG["optimizer_config"], {
DQN_CONFIG["optimizer"], {
"max_weight_sync_delay": 400,
"num_replay_buffer_shards": 4,
"debug": False
@@ -47,7 +47,7 @@ class ApexAgent(DQNAgent):
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
return Resources(
cpu=1 + cf["optimizer_config"]["num_replay_buffer_shards"],
cpu=1 + cf["optimizer"]["num_replay_buffer_shards"],
gpu=cf["gpu"] and 1 or 0,
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
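
After the rename from `optimizer_config` to `optimizer`, the Ape-X resource request reads the replay shard count from `cf["optimizer"]`. The arithmetic can be sketched without Ray as follows; the `Resources` namedtuple is a stand-in for `ray.tune.trial.Resources`, `apex_resource_request` is a hypothetical free function, and the worker count is an illustrative value rather than a default taken from the diff:

```python
from collections import namedtuple

# Stand-in for ray.tune.trial.Resources so the sketch runs without Ray.
Resources = namedtuple("Resources", ["cpu", "gpu", "extra_cpu", "extra_gpu"])


def apex_resource_request(cf):
    """Hypothetical helper mirroring ApexAgent.default_resource_request."""
    return Resources(
        # One CPU for the driver plus one per replay buffer shard.
        cpu=1 + cf["optimizer"]["num_replay_buffer_shards"],
        gpu=1 if cf["gpu"] else 0,
        extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
        extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])


request = apex_resource_request({
    "gpu": True,
    "num_workers": 32,  # illustrative value
    "num_cpus_per_worker": 1,
    "num_gpus_per_worker": 0,
    "optimizer": {"num_replay_buffer_shards": 4},
})
print(request)
```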
@@ -7,11 +7,10 @@ import os
import ray
from ray.rllib import optimizers
from ray.rllib.dqn.common.schedules import ConstantSchedule, LinearSchedule
from ray.rllib.dqn.dqn_policy_graph import DQNPolicyGraph
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
collect_metrics
from ray.rllib.agent import Agent
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.agents.dqn.dqn_policy_graph import DQNPolicyGraph
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule
from ray.tune.trial import Resources
@@ -20,7 +19,7 @@ OPTIMIZER_SHARED_CONFIGS = [
"prioritized_replay_beta", "prioritized_replay_eps", "sample_batch_size",
"train_batch_size", "learning_starts", "clip_rewards"]
DEFAULT_CONFIG = {
DEFAULT_CONFIG = with_common_config({
# === Model ===
# Whether to use dueling dqn
"dueling": True,
@@ -30,12 +29,8 @@ DEFAULT_CONFIG = {
"hiddens": [256],
# N-step Q learning
"n_step": 1,
# Config options to pass to the model constructor
"model": {},
# Discount factor for the MDP
"gamma": 0.99,
# Arguments to pass to the env creator
"env_config": {},
# Whether to use rllib or deepmind preprocessors
"preprocessor_pref": "deepmind",
# === Exploration ===
# Max num timesteps for annealing schedules. Exploration is annealed from
@@ -66,6 +61,8 @@ DEFAULT_CONFIG = {
"prioritized_replay_eps": 1e-6,
# Whether to clip rewards to [-1, 1] prior to adding to the replay buffer.
"clip_rewards": True,
# Whether to LZ4 compress observations
"compress_observations": True,
# === Optimization ===
# Learning rate for adam optimizer
@@ -89,30 +86,22 @@ DEFAULT_CONFIG = {
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Number of environments to evaluate vectorwise per worker.
"num_envs": 1,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Optimizer class to use.
"optimizer_class": "SyncReplayOptimizer",
# Config to pass to the optimizer.
"optimizer_config": {},
# Whether to use a distribution of epsilons across workers for exploration.
"per_worker_exploration": False,
# Whether to compute priorities on workers.
"worker_side_prioritization": False,
# === Multiagent ===
"multiagent": {
"policy_graphs": {},
"policy_mapping_fn": None,
},
}
})
class DQNAgent(Agent):
"""DQN implementation in TensorFlow."""
_agent_name = "DQN"
_default_config = DEFAULT_CONFIG
_policy_graph = DQNPolicyGraph
@@ -126,32 +115,10 @@ class DQNAgent(Agent):
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
# Update effective batch size to include n-step
adjusted_batch_size = (
self.config["sample_batch_size"] + self.config["n_step"] - 1)
self.local_evaluator = CommonPolicyEvaluator(
self.env_creator,
self.config["multiagent"]["policy_graphs"] or self._policy_graph,
policy_mapping_fn=self.config["multiagent"]["policy_mapping_fn"],
batch_steps=adjusted_batch_size,
batch_mode="truncate_episodes", preprocessor_pref="deepmind",
compress_observations=True,
env_config=self.config["env_config"],
model_config=self.config["model"], policy_config=self.config,
num_envs=self.config["num_envs"])
remote_cls = CommonPolicyEvaluator.as_remote(
num_cpus=self.config["num_cpus_per_worker"],
num_gpus=self.config["num_gpus_per_worker"])
self.remote_evaluators = [
remote_cls.remote(
self.env_creator, self._policy_graph,
batch_steps=adjusted_batch_size,
batch_mode="truncate_episodes", preprocessor_pref="deepmind",
compress_observations=True,
env_config=self.config["env_config"],
model_config=self.config["model"], policy_config=self.config,
num_envs=self.config["num_envs"],
worker_index=i+1)
for i in range(self.config["num_workers"])]
self.config["sample_batch_size"] = adjusted_batch_size
self.exploration0 = self._make_exploration_schedule(0)
self.explorations = [
@@ -159,11 +126,17 @@ class DQNAgent(Agent):
for i in range(self.config["num_workers"])]
for k in OPTIMIZER_SHARED_CONFIGS:
if k not in self.config["optimizer_config"]:
self.config["optimizer_config"][k] = self.config[k]
if k not in self.config["optimizer"]:
self.config["optimizer"][k] = self.config[k]
self.local_evaluator = self.make_local_evaluator(
self.env_creator, self._policy_graph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, self._policy_graph, self.config["num_workers"],
{"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"]})
self.optimizer = getattr(optimizers, self.config["optimizer_class"])(
self.config["optimizer_config"], self.local_evaluator,
self.config["optimizer"], self.local_evaluator,
self.remote_evaluators)
self.last_target_update_ts = 0
@@ -247,10 +220,3 @@ class DQNAgent(Agent):
self.optimizer.restore(extra_data[2])
self.num_target_updates = extra_data[3]
self.last_target_update_ts = extra_data[4]
def compute_action(self, observation, state=None):
if state is None:
state = []
return self.local_evaluator.for_policy(
lambda p: p.compute_single_action(
observation, state, is_training=False)[0])
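
The reworked `DQNAgent._init` in this file does two pieces of config bookkeeping before the evaluators are built. It widens `sample_batch_size` by `n_step - 1`, and it copies each key in `OPTIMIZER_SHARED_CONFIGS` into `config["optimizer"]` unless that key is already set there. A standalone sketch of that logic on plain dicts, where `prepare_dqn_config` is a hypothetical helper name (not a function in RLlib), the key list is shortened, and the input values are illustrative:

```python
import copy

# Shortened for illustration; the diff lists more shared keys.
OPTIMIZER_SHARED_CONFIGS = [
    "sample_batch_size", "train_batch_size", "learning_starts"]


def prepare_dqn_config(config):
    """Hypothetical helper mirroring the bookkeeping in DQNAgent._init."""
    config = copy.deepcopy(config)
    # Update the effective batch size to include n-step.
    config["sample_batch_size"] = (
        config["sample_batch_size"] + config["n_step"] - 1)
    # Share top-level settings with the optimizer unless already set there.
    for k in OPTIMIZER_SHARED_CONFIGS:
        if k not in config["optimizer"]:
            config["optimizer"][k] = config[k]
    return config


cfg = prepare_dqn_config({
    "n_step": 3,
    "sample_batch_size": 4,
    "train_batch_size": 32,
    "learning_starts": 1000,
    "optimizer": {"train_batch_size": 64},
})
print(cfg["sample_batch_size"])
print(cfg["optimizer"])
```

Since the batch size is adjusted before the copy loop runs, the optimizer receives the widened value, while a key that is already present under `"optimizer"` is left untouched.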
@@ -9,9 +9,9 @@ import tensorflow.contrib.layers as layers
import ray
from ray.rllib.models import ModelCatalog
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.evaluation.sample_batch import SampleBatch
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
Q_SCOPE = "q_func"
@@ -79,7 +79,7 @@ class QLoss(object):
class DQNPolicyGraph(TFPolicyGraph):
def __init__(self, observation_space, action_space, config):
config = dict(ray.rllib.dqn.dqn.DEFAULT_CONFIG, **config)
config = dict(ray.rllib.agents.dqn.dqn.DEFAULT_CONFIG, **config)
if not isinstance(action_space, Discrete):
raise UnsupportedSpaceException(
"Action space {} is not supported for DQN.".format(
+3
@@ -0,0 +1,3 @@
from ray.rllib.agents.es.es import (ESAgent, DEFAULT_CONFIG)
__all__ = ["ESAgent", "DEFAULT_CONFIG"]
@@ -12,13 +12,13 @@ import pickle
import time
import ray
from ray.rllib import agent
from ray.rllib.agents import Agent
from ray.tune.trial import Resources
from ray.rllib.es import optimizers
from ray.rllib.es import policies
from ray.rllib.es import tabular_logger as tlogger
from ray.rllib.es import utils
from ray.rllib.agents.es import optimizers
from ray.rllib.agents.es import policies
from ray.rllib.agents.es import tabular_logger as tlogger
from ray.rllib.agents.es import utils
Result = namedtuple("Result", [
@@ -134,7 +134,9 @@ class Worker(object):
eval_lengths=eval_lengths)
class ESAgent(agent.Agent):
class ESAgent(Agent):
"""Large-scale implementation of Evolution Strategies in Ray."""
_agent_name = "ES"
_default_config = DEFAULT_CONFIG
@@ -0,0 +1,3 @@
from ray.rllib.agents.pg.pg import PGAgent, DEFAULT_CONFIG
__all__ = ["PGAgent", "DEFAULT_CONFIG"]
@@ -0,0 +1,54 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.agents.agent import Agent, with_common_config
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.tune.trial import Resources
DEFAULT_CONFIG = with_common_config({
# No remote workers by default
"num_workers": 0,
# Learning rate
"lr": 0.0004,
# Override model config
"model": {
# Use LSTM model.
"use_lstm": False,
# Max seq length for LSTM training.
"max_seq_len": 20,
},
})
class PGAgent(Agent):
"""Simple policy gradient agent.
This is an example agent to show how to implement algorithms in RLlib.
In most cases, you will probably want to use the PPO agent instead.
"""
_agent_name = "PG"
_default_config = DEFAULT_CONFIG
@classmethod
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"])
def _init(self):
self.local_evaluator = self.make_local_evaluator(
self.env_creator, PGPolicyGraph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, PGPolicyGraph, self.config["num_workers"], {})
self.optimizer = SyncSamplesOptimizer(
self.config["optimizer"], self.local_evaluator,
self.remote_evaluators)
def _train(self):
self.optimizer.step()
return collect_metrics(
self.optimizer.local_evaluator, self.optimizer.remote_evaluators)
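Note on the `dict(cls._default_config, **config)` expression used in `default_resource_request` above: a minimal sketch of what that merge does, in plain Python. The `DEFAULTS` dict and the `merge_config` helper are hypothetical stand-ins for illustration (they are not RLlib's `with_common_config`). The point is that the merge is shallow, so a user-supplied nested `"model"` dict replaces the default one as a whole.

```python
# Hypothetical defaults, loosely modelled on the PG config in this diff.
DEFAULTS = {
    "num_workers": 0,
    "lr": 0.0004,
    "model": {"use_lstm": False, "max_seq_len": 20},
}


def merge_config(defaults, overrides):
    """Shallow merge, the same shape as dict(cls._default_config, **config)."""
    return dict(defaults, **overrides)


merged = merge_config(
    DEFAULTS, {"num_workers": 4, "model": {"use_lstm": True}})

# Top-level keys are overridden one by one.
print(merged["num_workers"])
print(merged["lr"])
# The nested "model" dict is replaced whole, so the default "max_seq_len"
# is not carried over into the merged config.
print("max_seq_len" in merged["model"])
# The defaults themselves are left untouched.
print(DEFAULTS["num_workers"])
```

If nested defaults need to survive a partial override, the caller would have to merge the inner dicts explicitly; whether the `Agent` base class does so elsewhere is not visible in this diff.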
@@ -6,8 +6,8 @@ import tensorflow as tf
import ray
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.postprocessing import compute_advantages
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
class PGLoss(object):
@@ -17,7 +17,7 @@ class PGLoss(object):
class PGPolicyGraph(TFPolicyGraph):
def __init__(self, obs_space, action_space, config):
config = dict(ray.rllib.pg.pg.DEFAULT_CONFIG, **config)
config = dict(ray.rllib.agents.pg.pg.DEFAULT_CONFIG, **config)
self.config = config
# Setup policy
@@ -0,0 +1,3 @@
from ray.rllib.agents.ppo.ppo import (PPOAgent, DEFAULT_CONFIG)
__all__ = ["PPOAgent", "DEFAULT_CONFIG"]
@@ -5,22 +5,16 @@ from __future__ import print_function
import os
import numpy as np
import pickle
import tensorflow as tf
import ray
from ray.tune.trial import Resources
from ray.rllib.agent import Agent
from ray.rllib.utils.common_policy_evaluator import (
CommonPolicyEvaluator, collect_metrics)
from ray.rllib.agents import Agent, with_common_config
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicyGraph
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.utils import FilterManager
from ray.rllib.ppo.ppo_tf_policy import PPOTFPolicyGraph
from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer
from ray.tune.trial import Resources
DEFAULT_CONFIG = {
# Discount factor of the MDP
"gamma": 0.995,
# Number of steps after which the rollout gets cut
"horizon": 2000,
DEFAULT_CONFIG = with_common_config({
# If true, use the Generalized Advantage Estimator (GAE)
# with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
"use_gae": True,
@@ -28,22 +22,12 @@ DEFAULT_CONFIG = {
"lambda": 1.0,
# Initial coefficient for KL divergence
"kl_coeff": 0.2,
# Number of timesteps collected for each SGD round
"timesteps_per_batch": 4000,
# Number of SGD iterations in each outer loop
"num_sgd_iter": 30,
# Stepsize of SGD
"sgd_stepsize": 5e-5,
# TODO(pcm): Expose the choice between gpus and cpus
# as a command line argument.
"devices": ["/cpu:%d" % i for i in range(4)],
"tf_session_args": {
"device_count": {"CPU": 4},
"log_device_placement": False,
"allow_soft_placement": True,
"intra_op_parallelism_threads": 1,
"inter_op_parallelism_threads": 1,
},
# Batch size for policy evaluations for rollouts
"rollout_batchsize": 1,
# Total SGD batch size across all devices for SGD
"sgd_batchsize": 128,
# Coefficient of the value function loss
@@ -54,82 +38,41 @@ DEFAULT_CONFIG = {
"clip_param": 0.3,
# Target value for KL divergence
"kl_target": 0.01,
# Config params to pass to the model
"model": {"free_log_std": False},
# Which observation filter to apply to the observation
"observation_filter": "MeanStdFilter",
# If >1, adds frameskip
"extra_frameskip": 1,
# Number of timesteps collected in each outer loop
"timesteps_per_batch": 4000,
# Each tasks performs rollouts until at least this
# number of steps is obtained
"min_steps_per_task": 200,
# Number of actors used to collect the rollouts
"num_workers": 2,
# Number of GPUs to use for SGD
"num_gpus": 0,
# Whether to allocate GPUs for workers (if > 0).
"num_gpus_per_worker": 0,
# Whether to allocate CPUs for workers (if > 0).
"num_cpus_per_worker": 1,
# Dump TensorFlow timeline after this many SGD minibatches
"full_trace_nth_sgd_batch": -1,
# Whether to profile data loading
"full_trace_data_load": False,
# Outer loop iteration index when we drop into the TensorFlow debugger
"tf_debug_iteration": -1,
# If this is True, the TensorFlow debugger is invoked if an Inf or NaN
# is detected
"tf_debug_inf_or_nan": False,
# If True, we write tensorflow logs and checkpoints
"write_logs": True,
# Arguments to pass to the env creator
"env_config": {},
}
# Whether to rollout "complete_episodes" or "truncate_episodes"
"batch_mode": "complete_episodes",
# Which observation filter to apply to the observation
"observation_filter": "MeanStdFilter",
})
class PPOAgent(Agent):
"""Multi-GPU optimized implementation of PPO in TensorFlow."""
_agent_name = "PPO"
_default_config = DEFAULT_CONFIG
_default_policy_graph = PPOTFPolicyGraph
@classmethod
def default_resource_request(cls, config):
cf = dict(cls._default_config, **config)
return Resources(
cpu=1,
gpu=len([d for d in cf["devices"] if "gpu" in d.lower()]),
gpu=cf["num_gpus"],
extra_cpu=cf["num_cpus_per_worker"] * cf["num_workers"],
extra_gpu=cf["num_gpus_per_worker"] * cf["num_workers"])
def _init(self):
def session_creator():
return tf.Session(
config=tf.ConfigProto(**self.config["tf_session_args"]))
self.local_evaluator = CommonPolicyEvaluator(
self.env_creator,
self._default_policy_graph,
tf_session_creator=session_creator,
batch_mode="complete_episodes",
observation_filter=self.config["observation_filter"],
env_config=self.config["env_config"],
model_config=self.config["model"],
policy_config=self.config
)
RemoteEvaluator = CommonPolicyEvaluator.as_remote(
num_cpus=self.config["num_cpus_per_worker"],
num_gpus=self.config["num_gpus_per_worker"])
self.remote_evaluators = [
RemoteEvaluator.remote(
self.env_creator,
self._default_policy_graph,
batch_mode="complete_episodes",
observation_filter=self.config["observation_filter"],
env_config=self.config["env_config"],
model_config=self.config["model"],
policy_config=self.config
)
for _ in range(self.config["num_workers"])]
self.local_evaluator = self.make_local_evaluator(
self.env_creator, PPOTFPolicyGraph)
self.remote_evaluators = self.make_remote_evaluators(
self.env_creator, PPOTFPolicyGraph, self.config["num_workers"],
{"num_cpus": self.config["num_cpus_per_worker"],
"num_gpus": self.config["num_gpus_per_worker"]})
self.optimizer = LocalMultiGPUOptimizer(
{"sgd_batch_size": self.config["sgd_batchsize"],
"sgd_stepsize": self.config["sgd_stepsize"],
@@ -137,10 +80,6 @@ class PPOAgent(Agent):
"timesteps_per_batch": self.config["timesteps_per_batch"]},
self.local_evaluator, self.remote_evaluators)
# TODO(rliaw): Push into Policy Graph
with self.local_evaluator.tf_sess.graph.as_default():
self.saver = tf.train.Saver()
def _train(self):
def postprocess_samples(batch):
# Divide by the maximum of value.std() and 1e-4
@@ -183,10 +122,8 @@ class PPOAgent(Agent):
ev.__ray_terminate__.remote()
def _save(self, checkpoint_dir):
checkpoint_path = self.saver.save(
self.local_evaluator.tf_sess,
os.path.join(checkpoint_dir, "checkpoint"),
global_step=self.iteration)
checkpoint_path = os.path.join(checkpoint_dir,
"checkpoint-{}".format(self.iteration))
agent_state = ray.get(
[a.save.remote() for a in self.remote_evaluators])
extra_data = [
@@ -196,18 +133,8 @@ class PPOAgent(Agent):
return checkpoint_path
def _restore(self, checkpoint_path):
self.saver.restore(self.local_evaluator.tf_sess, checkpoint_path)
extra_data = pickle.load(open(checkpoint_path + ".extra_data", "rb"))
self.local_evaluator.restore(extra_data[0])
ray.get([
a.restore.remote(o)
for (a, o) in zip(self.remote_evaluators, extra_data[1])])
def compute_action(self, observation, state=None):
if state is None:
state = []
obs = self.local_evaluator.filters["default"](
observation, update=False)
return self.local_evaluator.for_policy(
lambda p: p.compute_single_action(
obs, state, is_training=False)[0])
@@ -4,9 +4,9 @@ from __future__ import print_function
import tensorflow as tf
from ray.rllib.evaluation.postprocessing import compute_advantages
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
class PPOLoss(object):
@@ -120,6 +120,7 @@ class PPOTFPolicyGraph(TFPolicyGraph):
("logprobs", logprobs_ph),
("vf_preds", vf_preds_ph)
]
# TODO(ekl) feed RNN states in here
# KL Coefficient
self.kl_coeff = tf.get_variable(
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.optimizers import SampleBatch
from ray.rllib.evaluation.sample_batch import SampleBatch
def collect_samples(agents, timesteps_per_batch):
@@ -8,7 +8,7 @@ import tensorflow as tf
from numpy.testing import assert_allclose
from ray.rllib.models.action_dist import Categorical
from ray.rllib.ppo.utils import flatten, concatenate
from ray.rllib.agents.ppo.utils import flatten, concatenate
# TODO(ekl): move to rllib/models dir
@@ -1,3 +0,0 @@
from ray.rllib.bc.bc import BCAgent, DEFAULT_CONFIG
__all__ = ["BCAgent", "DEFAULT_CONFIG"]
@@ -0,0 +1,9 @@
from ray.rllib.env.async_vector_env import AsyncVectorEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.env.env_context import EnvContext
__all__ = [
"AsyncVectorEnv", "MultiAgentEnv", "ServingEnv", "VectorEnv", "EnvContext"
]
@@ -2,9 +2,9 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.utils.serving_env import ServingEnv
from ray.rllib.utils.vector_env import VectorEnv
from ray.rllib.utils.multi_agent_env import MultiAgentEnv
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class AsyncVectorEnv(object):
@@ -84,7 +84,8 @@ class AsyncVectorEnv(object):
The returns are two-level dicts mapping from env_id to a dict of
agent_id to values. The number of agents and envs can vary over time.
Returns:
Returns
-------
obs (dict): New observations for each ready agent.
rewards (dict): Reward values for each ready agent. If the
episode is just started, the value will be None.
@@ -95,6 +96,7 @@ class AsyncVectorEnv(object):
that happens, there will be an entry in this dict that contains
the taken action. There is no need to send_actions() for agents
that have already chosen off-policy actions.
"""
raise NotImplementedError
@@ -6,7 +6,8 @@ from __future__ import print_function
class MultiAgentEnv(object):
"""An environment that hosts multiple independent agents.
Agents are identified by (string) agent ids.
Agents are identified by (string) agent ids. Note that these "agents" here
are not to be confused with RLlib agents.
Examples:
>>> env = MyMultiAgentEnv()
@@ -49,7 +50,8 @@ class MultiAgentEnv(object):
The returns are dicts mapping from agent_id strings to values. The
number of agents in the env can vary over time.
Returns:
Returns
-------
obs (dict): New observations for each ready agent.
rewards (dict): Reward values for each ready agent. If the
episode is just started, the value will be None.
@@ -14,7 +14,7 @@ class VectorEnv(object):
Attributes:
action_space (gym.Space): Action space of individual envs.
observation_space (gym.Space): Observation space of individual envs.
num_envs (int): Number of envs to batch over.
num_envs (int): Number of envs in this vector env.
"""
@staticmethod
@@ -1,3 +0,0 @@
from ray.rllib.es.es import (ESAgent, DEFAULT_CONFIG)
__all__ = ["ESAgent", "DEFAULT_CONFIG"]
@@ -0,0 +1,14 @@
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.evaluation.interface import PolicyEvaluator
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.torch_policy_graph import TorchPolicyGraph
from ray.rllib.evaluation.sample_batch import SampleBatch, MultiAgentBatch, \
SampleBatchBuilder, MultiAgentSampleBatchBuilder
from ray.rllib.evaluation.sampler import SyncSampler, AsyncSampler
__all__ = [
"PolicyEvaluator", "CommonPolicyEvaluator", "PolicyGraph", "TFPolicyGraph",
"TorchPolicyGraph", "SampleBatch", "MultiAgentBatch", "SampleBatchBuilder",
"MultiAgentSampleBatchBuilder", "SyncSampler", "AsyncSampler",
]
@@ -2,112 +2,75 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import gym
import numpy as np
import pickle
import tensorflow as tf
import ray
from ray.rllib.models import ModelCatalog
from ray.rllib.optimizers.policy_evaluator import PolicyEvaluator
from ray.rllib.optimizers.sample_batch import MultiAgentBatch, \
from ray.rllib.env.async_vector_env import AsyncVectorEnv
from ray.rllib.env.atari_wrappers import wrap_deepmind, is_atari
from ray.rllib.env.env_context import EnvContext
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.evaluation.interface import PolicyEvaluator
from ray.rllib.evaluation.sample_batch import MultiAgentBatch, \
DEFAULT_POLICY_ID
from ray.rllib.utils.async_vector_env import AsyncVectorEnv
from ray.rllib.utils.atari_wrappers import wrap_deepmind, is_atari
from ray.rllib.evaluation.sampler import AsyncSampler, SyncSampler
from ray.rllib.utils.compression import pack
from ray.rllib.utils.env_context import EnvContext
from ray.rllib.utils.filter import get_filter
from ray.rllib.utils.multi_agent_env import MultiAgentEnv
from ray.rllib.utils.policy_graph import PolicyGraph
from ray.rllib.utils.sampler import AsyncSampler, SyncSampler
from ray.rllib.utils.serving_env import ServingEnv
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.utils.tf_run_builder import TFRunBuilder
from ray.rllib.utils.vector_env import VectorEnv
from ray.tune.result import TrainingResult
def collect_metrics(local_evaluator, remote_evaluators=[]):
"""Gathers episode metrics from CommonPolicyEvaluator instances."""
episode_rewards = []
episode_lengths = []
policy_rewards = collections.defaultdict(list)
metric_lists = ray.get(
[a.apply.remote(lambda ev: ev.sampler.get_metrics())
for a in remote_evaluators])
metric_lists.append(local_evaluator.sampler.get_metrics())
for metrics in metric_lists:
for episode in metrics:
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
for (_, policy_id), reward in episode.agent_rewards.items():
policy_rewards[policy_id].append(reward)
if episode_rewards:
min_reward = min(episode_rewards)
max_reward = max(episode_rewards)
else:
min_reward = float('nan')
max_reward = float('nan')
avg_reward = np.mean(episode_rewards)
avg_length = np.mean(episode_lengths)
timesteps = np.sum(episode_lengths)
for policy_id, rewards in policy_rewards.copy().items():
policy_rewards[policy_id] = np.mean(rewards)
return TrainingResult(
episode_reward_max=max_reward,
episode_reward_min=min_reward,
episode_reward_mean=avg_reward,
episode_len_mean=avg_length,
episodes_total=len(episode_lengths),
timesteps_this_iter=timesteps,
policy_reward_mean=dict(policy_rewards))
class CommonPolicyEvaluator(PolicyEvaluator):
"""Policy evaluator implementation that operates on a rllib.PolicyGraph.
"""Common ``PolicyEvaluator`` implementation that wraps a ``PolicyGraph``.
TODO: multi-gpu
This class wraps a policy graph instance and an environment class to
collect experiences from the environment. You can create many replicas of
this class as Ray actors to scale RL training.
This class supports vectorized and multi-agent policy evaluation (e.g.,
VectorEnv, MultiAgentEnv, etc.)
Examples:
# Create a policy evaluator and using it to collect experiences.
>>> # Create a policy evaluator and use it to collect experiences.
>>> evaluator = CommonPolicyEvaluator(
env_creator=lambda _: gym.make("CartPole-v0"),
policy_graph=PGPolicyGraph)
... env_creator=lambda _: gym.make("CartPole-v0"),
... policy_graph=PGPolicyGraph)
>>> print(evaluator.sample())
SampleBatch({
"obs": [[...]], "actions": [[...]], "rewards": [[...]],
"dones": [[...]], "new_obs": [[...]]})
# Creating policy evaluators using optimizer_cls.make().
>>> # Creating policy evaluators using optimizer_cls.make().
>>> optimizer = SyncSamplesOptimizer.make(
evaluator_cls=CommonPolicyEvaluator,
evaluator_args={
"env_creator": lambda _: gym.make("CartPole-v0"),
"policy_graph": PGPolicyGraph,
},
num_workers=10)
... evaluator_cls=CommonPolicyEvaluator,
... evaluator_args={
... "env_creator": lambda _: gym.make("CartPole-v0"),
... "policy_graph": PGPolicyGraph,
... },
... num_workers=10)
>>> for _ in range(10): optimizer.step()
# Creating a multi-agent policy evaluator
>>> # Creating a multi-agent policy evaluator
>>> evaluator = CommonPolicyEvaluator(
env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
policy_graph={
# Use an ensemble of two policies for car agents
"car_policy1":
(PGPolicyGraph, Box(...), Discrete(...), {"gamma": 0.99}),
"car_policy2":
(PGPolicyGraph, Box(...), Discrete(...), {"gamma": 0.95}),
# Use a single shared policy for all traffic lights
"traffic_light_policy":
(PGPolicyGraph, Box(...), Discrete(...), {}),
},
policy_mapping_fn=lambda agent_id:
random.choice(["car_policy1", "car_policy2"])
if agent_id.startswith("car_") else "traffic_light_policy")
... env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
... policy_graphs={
... # Use an ensemble of two policies for car agents
... "car_policy1":
... (PGPolicyGraph, Box(...), Discrete(...), {"gamma": 0.99}),
... "car_policy2":
... (PGPolicyGraph, Box(...), Discrete(...), {"gamma": 0.95}),
... # Use a single shared policy for all traffic lights
... "traffic_light_policy":
... (PGPolicyGraph, Box(...), Discrete(...), {}),
... },
... policy_mapping_fn=lambda agent_id:
... random.choice(["car_policy1", "car_policy2"])
... if agent_id.startswith("car_") else "traffic_light_policy")
>>> print(evaluator.sample().keys())
MultiAgentBatch({
"car_policy1": SampleBatch(...),
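Note on the `policy_mapping_fn` in the multi-agent docstring example above: a minimal standalone sketch of such a mapping function, assuming only what the docstring shows (agent ids are strings, car agents start with `car_`). The `rng` parameter and the `group_by_policy` helper are hypothetical additions for illustration and are not part of `CommonPolicyEvaluator`.

```python
import collections
import random

CAR_POLICIES = ["car_policy1", "car_policy2"]


def policy_mapping_fn(agent_id, rng=random):
    """Map an agent id to a policy id, as in the docstring example.

    Car agents are assigned to one of the two ensemble policies at random;
    every other agent shares the single traffic light policy.
    """
    if agent_id.startswith("car_"):
        return rng.choice(CAR_POLICIES)
    return "traffic_light_policy"


def group_by_policy(agent_ids, rng=random):
    """Hypothetical helper: bucket agent ids by their assigned policy id."""
    groups = collections.defaultdict(list)
    for agent_id in agent_ids:
        groups[policy_mapping_fn(agent_id, rng)].append(agent_id)
    return dict(groups)


# A seeded generator keeps the random car assignment reproducible.
groups = group_by_policy(
    ["car_0", "car_1", "light_0", "light_1"], rng=random.Random(0))
print(sorted(groups))
```

Each key of the resulting dict corresponds to one policy id, which is the same keying the `MultiAgentBatch` output in the docstring uses.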
@@ -6,12 +6,9 @@ import os
class PolicyEvaluator(object):
"""Algorithms implement this interface to leverage policy optimizers.
"""This is the interface between policy optimizers and policy evaluation.
Policy evaluators are the "data plane" of an algorithm.
Any algorithm that implements Evaluator can plug in any PolicyOptimizer,
e.g. async SGD, Ape-X, local multi-GPU SGD, etc.
See also: CommonPolicyEvaluator
"""
def sample(self):
@@ -21,7 +18,7 @@ class PolicyEvaluator(object):
Returns:
SampleBatch|MultiAgentBatch: A columnar batch of experiences
(e.g., tensors), or a multi-agent batch.
(e.g., tensors), or a multi-agent batch.
Examples:
>>> print(ev.sample())
@@ -37,9 +34,9 @@ class PolicyEvaluator(object):
Returns:
(grads, info): A list of gradients that can be applied on a
compatible evaluator. In the multi-agent case, returns a dict
of gradients keyed by policy graph ids. An info dictionary of
extra metadata is also returned.
compatible evaluator. In the multi-agent case, returns a dict
of gradients keyed by policy graph ids. An info dictionary of
extra metadata is also returned.
Examples:
>>> batch = ev.sample()
@@ -0,0 +1,48 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import collections
import ray
from ray.tune.result import TrainingResult
def collect_metrics(local_evaluator, remote_evaluators=[]):
"""Gathers episode metrics from CommonPolicyEvaluator instances."""
episode_rewards = []
episode_lengths = []
policy_rewards = collections.defaultdict(list)
metric_lists = ray.get(
[a.apply.remote(lambda ev: ev.sampler.get_metrics())
for a in remote_evaluators])
metric_lists.append(local_evaluator.sampler.get_metrics())
for metrics in metric_lists:
for episode in metrics:
episode_lengths.append(episode.episode_length)
episode_rewards.append(episode.episode_reward)
for (_, policy_id), reward in episode.agent_rewards.items():
policy_rewards[policy_id].append(reward)
if episode_rewards:
min_reward = min(episode_rewards)
max_reward = max(episode_rewards)
else:
min_reward = float('nan')
max_reward = float('nan')
avg_reward = np.mean(episode_rewards)
avg_length = np.mean(episode_lengths)
timesteps = np.sum(episode_lengths)
for policy_id, rewards in policy_rewards.copy().items():
policy_rewards[policy_id] = np.mean(rewards)
return TrainingResult(
episode_reward_max=max_reward,
episode_reward_min=min_reward,
episode_reward_mean=avg_reward,
episode_len_mean=avg_length,
episodes_total=len(episode_lengths),
timesteps_this_iter=timesteps,
policy_reward_mean=dict(policy_rewards))
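Note on `collect_metrics` above: a minimal sketch of the same aggregation without Ray, to make the reduction easier to follow. The `Episode` namedtuple, the `summarize` function, and the plain dict return value are hypothetical stand-ins for the sampler's episode records and for `TrainingResult`. Unlike the code in the diff, this sketch also returns NaN for the means when no episodes were collected, instead of calling `np.mean` on an empty list.

```python
import collections

import numpy as np

# Hypothetical stand-in for the records returned by sampler.get_metrics().
Episode = collections.namedtuple(
    "Episode", ["episode_length", "episode_reward", "agent_rewards"])


def summarize(metric_lists):
    """Reduce per-evaluator episode lists into one summary dict."""
    episode_rewards = []
    episode_lengths = []
    policy_rewards = collections.defaultdict(list)
    for metrics in metric_lists:
        for episode in metrics:
            episode_lengths.append(episode.episode_length)
            episode_rewards.append(episode.episode_reward)
            # agent_rewards is keyed by (agent_id, policy_id) pairs.
            for (_, policy_id), reward in episode.agent_rewards.items():
                policy_rewards[policy_id].append(reward)
    nan = float("nan")
    has_data = bool(episode_rewards)
    return {
        "episode_reward_max": max(episode_rewards) if has_data else nan,
        "episode_reward_min": min(episode_rewards) if has_data else nan,
        "episode_reward_mean":
            float(np.mean(episode_rewards)) if has_data else nan,
        "episode_len_mean":
            float(np.mean(episode_lengths)) if has_data else nan,
        "episodes_total": len(episode_lengths),
        "timesteps_this_iter": int(np.sum(episode_lengths)),
        "policy_reward_mean": {
            pid: float(np.mean(rs)) for pid, rs in policy_rewards.items()},
    }


# Two evaluators, one episode each; two agents share policy "p0".
batches = [
    [Episode(10, 1.0, {("a0", "p0"): 1.0})],
    [Episode(20, 3.0, {("a1", "p0"): 2.0, ("a2", "p1"): 1.0})],
]
result = summarize(batches)
print(result)
```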
@@ -4,7 +4,7 @@ from __future__ import print_function
import numpy as np
import scipy.signal
from ray.rllib.optimizers import SampleBatch
from ray.rllib.evaluation.sample_batch import SampleBatch
def discount(x, gamma):
@@ -7,9 +7,9 @@ import numpy as np
import six.moves.queue as queue
import threading
from ray.rllib.optimizers.sample_batch import MultiAgentSampleBatchBuilder, \
from ray.rllib.evaluation.sample_batch import MultiAgentSampleBatchBuilder, \
MultiAgentBatch
from ray.rllib.utils.async_vector_env import AsyncVectorEnv
from ray.rllib.env.async_vector_env import AsyncVectorEnv
from ray.rllib.utils.tf_run_builder import TFRunBuilder
@@ -5,8 +5,8 @@ from __future__ import print_function
import tensorflow as tf
import ray
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.models.lstm import chop_into_sequences
from ray.rllib.utils.policy_graph import PolicyGraph
from ray.rllib.utils.tf_run_builder import TFRunBuilder
@@ -5,11 +5,14 @@ from __future__ import print_function
import numpy as np
from threading import Lock
import torch
import torch.nn.functional as F
try:
import torch
import torch.nn.functional as F
from ray.rllib.models.pytorch.misc import var_to_np
except ImportError:
pass # soft dep
from ray.rllib.models.pytorch.misc import var_to_np
from ray.rllib.utils.policy_graph import PolicyGraph
from ray.rllib.evaluation.policy_graph import PolicyGraph
class TorchPolicyGraph(PolicyGraph):
@@ -35,11 +38,12 @@ class TorchPolicyGraph(PolicyGraph):
observation_space (gym.Space): observation space of the policy.
action_space (gym.Space): action space of the policy.
model (nn.Module): PyTorch policy module. Given observations as
input, this module must a list of outputs where the first item
are action logits, and the remainder can be any value.
input, this module must return a list of outputs where the
first item is action logits, and the rest can be any value.
loss (nn.Module): Loss defined as a PyTorch module. The inputs for
this module are defined by the `loss_inputs` param. This module
returns a single scalar loss.
returns a single scalar loss. Note that this module should
internally be using the model module.
loss_inputs (list): List of SampleBatch columns that will be
passed to the loss module's forward() function when computing
the loss. For example, ["obs", "action", "advantages"].
@@ -7,7 +7,7 @@ import gym
from gym.envs.registration import register
import ray
import ray.rllib.ppo as ppo
import ray.rllib.agents.ppo as ppo
from ray.tune.registry import register_env
env_name = "MultiAgentMountainCarEnv"
@@ -7,7 +7,7 @@ import gym
from gym.envs.registration import register
import ray
import ray.rllib.ppo as ppo
import ray.rllib.agents.ppo as ppo
from ray.tune.registry import register_env
env_name = "MultiAgentPendulumEnv"
@@ -18,8 +18,8 @@ import gym
import random
import ray
from ray.rllib.pg.pg import PGAgent
from ray.rllib.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.agents.pg.pg import PGAgent
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.test.test_multi_agent_env import MultiCartpole
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
@@ -13,8 +13,8 @@ import os
from gym import spaces
import ray
from ray.rllib.dqn import DQNAgent
from ray.rllib.utils.serving_env import ServingEnv
from ray.rllib.agents.dqn import DQNAgent
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.utils.policy_server import PolicyServer
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
@@ -2,11 +2,11 @@ from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.action_dist import (ActionDistribution, Categorical,
DiagGaussian, Deterministic)
from ray.rllib.models.model import Model
from ray.rllib.models.preprocessors import Preprocessor
from ray.rllib.models.fcnet import FullyConnectedNetwork
from ray.rllib.models.lstm import LSTM
from ray.rllib.models.multiagentfcnet import MultiAgentFullyConnectedNetwork
__all__ = ["ActionDistribution", "Categorical",
"DiagGaussian", "Deterministic", "ModelCatalog", "Model",
"FullyConnectedNetwork", "LSTM", "MultiAgentFullyConnectedNetwork"]
"Preprocessor", "FullyConnectedNetwork", "LSTM"]
@@ -1,15 +1,12 @@
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.async_samples_optimizer import AsyncSamplesOptimizer
from ray.rllib.optimizers.async_gradients_optimizer import \
AsyncGradientsOptimizer
from ray.rllib.optimizers.sync_samples_optimizer import SyncSamplesOptimizer
from ray.rllib.optimizers.sync_replay_optimizer import SyncReplayOptimizer
from ray.rllib.optimizers.multi_gpu_optimizer import LocalMultiGPUOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch, MultiAgentBatch
from ray.rllib.optimizers.policy_evaluator import PolicyEvaluator, \
TFMultiGPUSupport
__all__ = [
"AsyncSamplesOptimizer", "AsyncGradientsOptimizer", "SyncSamplesOptimizer",
"SyncReplayOptimizer", "LocalMultiGPUOptimizer", "SampleBatch",
"PolicyEvaluator", "TFMultiGPUSupport", "MultiAgentBatch"]
"PolicyOptimizer", "AsyncSamplesOptimizer", "AsyncGradientsOptimizer",
"SyncSamplesOptimizer", "SyncReplayOptimizer", "LocalMultiGPUOptimizer"]
@@ -17,7 +17,7 @@ from six.moves import queue
import ray
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.replay_buffer import PrioritizedReplayBuffer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.evaluation.sample_batch import SampleBatch
from ray.rllib.utils.actors import TaskPool, create_colocated
from ray.rllib.utils.timer import TimerStat
from ray.rllib.utils.window_stat import WindowStat
@@ -8,9 +8,9 @@ import os
import tensorflow as tf
import ray
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.multi_gpu_impl import LocalSyncParallelOptimizer
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
from ray.rllib.utils.timer import TimerStat
@@ -87,7 +87,7 @@ class LocalMultiGPUOptimizer(PolicyOptimizer):
with self.sample_timer:
if self.remote_evaluators:
# TODO(rliaw): remove when refactoring
from ray.rllib.ppo.rollout import collect_samples
from ray.rllib.agents.ppo.rollout import collect_samples
samples = collect_samples(self.remote_evaluators,
self.timesteps_per_batch)
else:
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.optimizers.sample_batch import MultiAgentBatch
from ray.rllib.evaluation.sample_batch import MultiAgentBatch
class PolicyOptimizer(object):
@@ -31,34 +31,6 @@ class PolicyOptimizer(object):
evaluators created by this optimizer.
"""
@classmethod
def make(
cls, evaluator_cls, evaluator_args, num_workers, optimizer_config,
evaluator_resources={"num_cpus": None}):
"""Create evaluators and an optimizer instance using those evaluators.
Args:
evaluator_cls (class): Python class of the evaluators to create.
evaluator_args (list|dict): Constructor args for the evaluators.
num_workers (int): Number of remote evaluators to create in
addition to a local evaluator. This can be zero or greater.
optimizer_config (dict): Keyword arguments to pass to the
optimizer class constructor.
"""
remote_cls = ray.remote(**evaluator_resources)(evaluator_cls)
if isinstance(evaluator_args, list):
local_evaluator = evaluator_cls(*evaluator_args)
remote_evaluators = [
remote_cls.remote(*evaluator_args)
for _ in range(num_workers)]
else:
local_evaluator = evaluator_cls(**evaluator_args)
remote_evaluators = [
remote_cls.remote(worker_index=i+1, **evaluator_args)
for i in range(num_workers)]
return cls(optimizer_config, local_evaluator, remote_evaluators)
def __init__(self, config, local_evaluator, remote_evaluators):
"""Create an optimizer instance.
@@ -9,7 +9,7 @@ import ray
from ray.rllib.optimizers.replay_buffer import ReplayBuffer, \
PrioritizedReplayBuffer
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \
from ray.rllib.evaluation.sample_batch import SampleBatch, DEFAULT_POLICY_ID, \
MultiAgentBatch
from ray.rllib.utils.compression import pack_if_needed
from ray.rllib.utils.filter import RunningStat
@@ -4,7 +4,7 @@ from __future__ import print_function
import ray
from ray.rllib.optimizers.policy_optimizer import PolicyOptimizer
from ray.rllib.optimizers.sample_batch import SampleBatch
from ray.rllib.evaluation.sample_batch import SampleBatch
from ray.rllib.utils.filter import RunningStat
from ray.rllib.utils.timer import TimerStat
@@ -1,3 +0,0 @@
from ray.rllib.pg.pg import PGAgent, DEFAULT_CONFIG
__all__ = ["PGAgent", "DEFAULT_CONFIG"]
@@ -1,86 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from ray.rllib.agent import Agent
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
    collect_metrics
from ray.tune.trial import Resources

DEFAULT_CONFIG = {
    # Number of workers (excluding master)
    "num_workers": 0,
    # Number of environments to evaluate vectorwise per worker.
    "num_envs": 1,
    # Size of rollout batch
    "batch_size": 512,
    # Discount factor of MDP
    "gamma": 0.99,
    # Number of steps after which the rollout gets cut
    "horizon": 500,
    # Learning rate
    "lr": 0.0004,
    # Arguments to pass to the rllib optimizer
    "optimizer": {},
    # Model parameters
    "model": {"fcnet_hiddens": [128, 128], "max_seq_len": 20},
    # Arguments to pass to the env creator
    "env_config": {},
    # === Multiagent ===
    "multiagent": {
        "policy_graphs": {},
        "policy_mapping_fn": None,
    },
}

class PGAgent(Agent):
    """Simple policy gradient agent.

    This is an example agent to show how to implement algorithms in RLlib.
    In most cases, you will probably want to use the PPO agent instead.
    """

    _agent_name = "PG"
    _default_config = DEFAULT_CONFIG

    @classmethod
    def default_resource_request(cls, config):
        cf = dict(cls._default_config, **config)
        return Resources(cpu=1, gpu=0, extra_cpu=cf["num_workers"])

    def _init(self):
        self.optimizer = SyncSamplesOptimizer.make(
            evaluator_cls=CommonPolicyEvaluator,
            evaluator_args={
                "env_creator": self.env_creator,
                "policy_graph": (
                    self.config["multiagent"]["policy_graphs"] or
                    PGPolicyGraph),
                "policy_mapping_fn":
                    self.config["multiagent"]["policy_mapping_fn"],
                "batch_steps": self.config["batch_size"],
                "batch_mode": "truncate_episodes",
                "model_config": self.config["model"],
                "env_config": self.config["env_config"],
                "policy_config": self.config,
                "num_envs": self.config["num_envs"],
            },
            num_workers=self.config["num_workers"],
            optimizer_config=self.config["optimizer"])

    def _train(self):
        self.optimizer.step()
        return collect_metrics(
            self.optimizer.local_evaluator, self.optimizer.remote_evaluators)

    def compute_action(self, observation, state=None):
        if state is None:
            state = []
        return self.local_evaluator.for_policy(
            lambda p: p.compute_single_action(
                observation, state, is_training=False)[0])
-3
@@ -1,3 +0,0 @@
from ray.rllib.ppo.ppo import (PPOAgent, DEFAULT_CONFIG)
__all__ = ["PPOAgent", "DEFAULT_CONFIG"]
+2 -2
@@ -11,8 +11,8 @@ import pickle
import gym
import ray
from ray.rllib.agent import get_agent_class
from ray.rllib.dqn.common.wrappers import wrap_dqn
from ray.rllib.agents.agent import get_agent_class
from ray.rllib.agents.dqn.common.wrappers import wrap_dqn
from ray.rllib.models import ModelCatalog
EXAMPLE_USAGE = """
+1 -1
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import numpy as np
from ray.rllib.optimizers import SampleBatch
from ray.rllib.evaluation import SampleBatch
from ray.rllib.utils.filter import MeanStdFilter
@@ -7,7 +7,7 @@ from __future__ import print_function
import numpy as np
import ray
from ray.rllib.agent import get_agent_class
from ray.rllib.agents.agent import get_agent_class
def get_mean_action(alg, obs):
@@ -7,12 +7,12 @@ import time
import unittest
import ray
from ray.rllib.pg import PGAgent
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
collect_metrics
from ray.rllib.utils.policy_graph import PolicyGraph
from ray.rllib.utils.postprocessing import compute_advantages
from ray.rllib.utils.vector_env import VectorEnv
from ray.rllib.agents.pg import PGAgent
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.postprocessing import compute_advantages
from ray.rllib.env.vector_env import VectorEnv
from ray.tune.registry import register_env
@@ -101,7 +101,8 @@ class TestCommonPolicyEvaluator(unittest.TestCase):
    def testQueryEvaluators(self):
        register_env("test", lambda _: gym.make("CartPole-v0"))
        pg = PGAgent(env="test", config={"num_workers": 2, "batch_size": 5})
        pg = PGAgent(
            env="test", config={"num_workers": 2, "sample_batch_size": 5})
        results = pg.optimizer.foreach_evaluator(lambda ev: ev.batch_steps)
        results2 = pg.optimizer.foreach_evaluator_with_index(
            lambda ev, i: (i, ev.batch_steps))
+1 -1
@@ -4,7 +4,7 @@ from __future__ import print_function
import unittest
from ray.rllib.dqn.dqn_policy_graph import adjust_nstep
from ray.rllib.agents.dqn.dqn_policy_graph import adjust_nstep
class DQNTest(unittest.TestCase):
@@ -7,17 +7,17 @@ import random
import unittest
import ray
from ray.rllib.pg import PGAgent
from ray.rllib.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.dqn.dqn_policy_graph import DQNPolicyGraph
from ray.rllib.agents.pg import PGAgent
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.agents.dqn.dqn_policy_graph import DQNPolicyGraph
from ray.rllib.optimizers import SyncSamplesOptimizer, \
SyncReplayOptimizer, AsyncGradientsOptimizer
from ray.rllib.test.test_common_policy_evaluator import MockEnv, MockEnv2, \
MockPolicyGraph
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
collect_metrics
from ray.rllib.utils.async_vector_env import _MultiAgentEnvToAsync
from ray.rllib.utils.multi_agent_env import MultiAgentEnv
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.evaluation.metrics import collect_metrics
from ray.rllib.env.async_vector_env import _MultiAgentEnvToAsync
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.tune.registry import register_env
+2 -1
@@ -8,7 +8,8 @@ import numpy as np
import ray
from ray.rllib.test.mock_evaluator import _MockEvaluator
from ray.rllib.optimizers import AsyncGradientsOptimizer, SampleBatch
from ray.rllib.optimizers import AsyncGradientsOptimizer
from ray.rllib.evaluation import SampleBatch
class AsyncOptimizerTest(unittest.TestCase):
+4 -4
@@ -9,10 +9,10 @@ import unittest
import uuid
import ray
from ray.rllib.dqn import DQNAgent
from ray.rllib.pg import PGAgent
from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.utils.serving_env import ServingEnv
from ray.rllib.agents.dqn import DQNAgent
from ray.rllib.agents.pg import PGAgent
from ray.rllib.evaluation.common_policy_evaluator import CommonPolicyEvaluator
from ray.rllib.env.serving_env import ServingEnv
from ray.rllib.test.test_common_policy_evaluator import BadPolicyGraph, \
MockPolicyGraph, MockEnv
from ray.tune.registry import register_env
@@ -7,7 +7,7 @@ from gym.envs.registration import EnvSpec
import numpy as np
import ray
from ray.rllib.agent import get_agent_class
from ray.rllib.agents.agent import get_agent_class
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.tune.registry import register_env
@@ -95,7 +95,6 @@ class ModelSupportedSpaces(unittest.TestCase):
        check_support(
            "PPO",
            {"num_workers": 1, "num_sgd_iter": 1, "timesteps_per_batch": 1,
             "devices": ["/cpu:0"], "min_steps_per_task": 1,
             "sgd_batchsize": 1},
            stats)
        check_support(
@@ -1,4 +1,12 @@
hopper-ppo:
    env: Hopper-v1
    run: PPO
    config: {"gamma": 0.995, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 160000, "num_workers": 64}
    config:
        gamma: 0.995
        kl_coeff: 1.0
        num_sgd_iter: 20
        sgd_stepsize: .0001
        sgd_batchsize: 32768
        timesteps_per_batch: 160000
        num_workers: 64
        num_gpus: 4
@@ -3,5 +3,17 @@ humanoid-ppo-gae:
    run: PPO
    stop:
        episode_reward_mean: 6000
    config: {"lambda": 0.95, "clip_param": 0.2, "kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "horizon": 5000, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "write_logs": false}
    config:
        gamma: 0.995
        lambda: 0.95
        clip_param: 0.2
        kl_coeff: 1.0
        num_sgd_iter: 20
        sgd_stepsize: .0001
        sgd_batchsize: 32768
        horizon: 5000
        timesteps_per_batch: 320000
        model:
            free_log_std: true
        num_workers: 64
        num_gpus: 4
@@ -2,5 +2,16 @@ humanoid-ppo:
    env: Humanoid-v1
    run: PPO
    stop:
        episode_reward_mean: 6000
    config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64, "model": {"free_log_std": true}, "use_gae": false}
        episode_reward_mean: 6000
    config:
        gamma: 0.995
        kl_coeff: 1.0
        num_sgd_iter: 20
        sgd_stepsize: .0001
        sgd_batchsize: 32768
        timesteps_per_batch: 320000
        model:
            free_log_std: true
        use_gae: false
        num_workers: 64
        num_gpus: 4
@@ -3,7 +3,7 @@ pong-a3c-pytorch-cnn:
    run: A3C
    config:
        num_workers: 16
        batch_size: 20
        sample_batch_size: 20
        use_pytorch: true
        vf_loss_coeff: 0.5
        entropy_coeff: -0.01
@@ -5,7 +5,7 @@ pong-a3c:
    run: A3C
    config:
        num_workers: 16
        batch_size: 20
        sample_batch_size: 20
        use_pytorch: false
        vf_loss_coeff: 0.5
        entropy_coeff: -0.01
@@ -14,4 +14,4 @@ pong-deterministic-ppo:
gamma: 0.99
num_workers: 4
num_sgd_iter: 20
devices: ["/gpu:0"]
num_gpus: 1
@@ -1,4 +1,11 @@
walker2d-v1-ppo:
    env: Walker2d-v1
    run: PPO
    config: {"kl_coeff": 1.0, "num_sgd_iter": 20, "sgd_stepsize": .0001, "sgd_batchsize": 32768, "devices": ["/gpu:0", "/gpu:1", "/gpu:2", "/gpu:3"], "tf_session_args": {"device_count": {"GPU": 4}, "log_device_placement": false, "allow_soft_placement": true}, "timesteps_per_batch": 320000, "num_workers": 64}
    config:
        kl_coeff: 1.0
        num_sgd_iter: 20
        sgd_stepsize: .0001
        sgd_batchsize: 32768
        timesteps_per_batch: 320000
        num_workers: 64
        num_gpus: 4
+9 -1
@@ -1,3 +1,11 @@
from ray.rllib.utils.filter_manager import FilterManager
from ray.rllib.utils.filter import Filter
from ray.rllib.utils.policy_client import PolicyClient
from ray.rllib.utils.policy_server import PolicyServer
__all__ = ["FilterManager"]
__all__ = [
    "Filter",
    "FilterManager",
    "PolicyClient",
    "PolicyServer",
]
+1 -1
@@ -13,7 +13,7 @@ except ImportError:
class PolicyClient(object):
"""Client to interact with a RLlib policy server."""
"""REST client to interact with an RLlib policy server."""
START_EPISODE = "START_EPISODE"
GET_ACTION = "GET_ACTION"
+28
@@ -18,6 +18,34 @@ elif sys.version_info[0] == 3:
class PolicyServer(ThreadingMixIn, HTTPServer):
"""REST server that can be launched from a ServingEnv.
This launches a multi-threaded server that listens on the specified host
and port to serve policy requests and forward experiences to RLlib.
Examples:
>>> class CartpoleServing(ServingEnv):
        def __init__(self):
            ServingEnv.__init__(
                self, spaces.Discrete(2),
                spaces.Box(low=-10, high=10, shape=(4,)))
        def run(self):
            server = PolicyServer(self, "localhost", 8900)
            server.serve_forever()
>>> register_env("srv", lambda _: CartpoleServing())
>>> pg = PGAgent(env="srv", config={"num_workers": 0})
>>> while True:
        pg.train()
>>> client = PolicyClient("localhost:8900")
>>> eps_id = client.start_episode()
>>> action = client.get_action(eps_id, obs)
>>> ...
>>> client.log_returns(eps_id, reward)
>>> ...
>>> client.log_returns(eps_id, reward)
"""
def __init__(self, serving_env, address, port):
handler = _make_handler(serving_env)
HTTPServer.__init__(self, (address, port), handler)