MADDPG implementation in RLlib (#5348)

This commit is contained in:
Wonseok Jeon
2019-08-06 19:22:06 -04:00
committed by Eric Liang
parent 094ec7adbc
commit 281829e712
13 changed files with 736 additions and 23 deletions
+3
View File
@@ -428,6 +428,9 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/rllib/contrib/random_agent/random_agent.py
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=contrib/MADDPG
docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=PG
+13
View File
@@ -300,6 +300,19 @@ Tuned examples: `Two-step game <https://github.com/ray-project/ray/blob/master/r
:start-after: __sphinx_doc_begin__
:end-before: __sphinx_doc_end__
Multi-Agent Actor Critic (contrib/MADDPG)
-----------------------------------------
`[paper] <https://arxiv.org/abs/1706.02275>`__ `[implementation] <https://github.com/ray-project/ray/blob/master/rllib/contrib/maddpg/maddpg.py>`__ MADDPG is a specialized multi-agent algorithm. Code here is adapted from https://github.com/openai/maddpg to integrate with RLlib multi-agent APIs. Please check `wsjeon/maddpg-rllib <https://github.com/wsjeon/maddpg-rllib>`__ for examples and more information.
**MADDPG-specific configs** (see also `common configs <rllib-training.html#common-parameters>`__):
Tuned examples: `Multi-Agent Particle Environment <https://github.com/wsjeon/maddpg-rllib/tree/master/plots>`__, `Two-step game <https://github.com/ray-project/ray/blob/master/rllib/examples/twostep_game.py>`__
.. literalinclude:: ../../rllib/contrib/maddpg/maddpg.py
:language: python
:start-after: __sphinx_doc_begin__
:end-before: __sphinx_doc_end__
Advantage Re-Weighted Imitation Learning (MARWIL)
-------------------------------------------------
+1
View File
@@ -84,6 +84,7 @@ Algorithms
* Multi-agent specific
- `QMIX Monotonic Value Factorisation (QMIX, VDN, IQN) <rllib-algorithms.html#qmix-monotonic-value-factorisation-qmix-vdn-iqn>`__
- `Multi-Agent Actor Critic (contrib/MADDPG) <rllib-algorithms.html#multi-agent-actor-critic-contrib-maddpg>`__
* Offline
+1 -1
View File
@@ -27,4 +27,4 @@ If you've found RLlib useful for your research, you can cite the [paper](https:/
Development Install
-------------------
You can develop RLlib locally without needing to compile Ray by using the [setup-dev.py](https://github.com/ray-project/ray/blob/master/python/ray/setup-dev.py) script. This sets up links between the ``rllib`` dir in your git repo and the one bundled with the ``ray`` package. When using this script, make sure that your git branch is in sync with the installed Ray binaries (i.e., you are up-to-date on `master <https://github.com/ray-project/ray>`__ and have the latest [wheel](https://ray.readthedocs.io/en/latest/installation.html) installed.)
You can develop RLlib locally without needing to compile Ray by using the [setup-dev.py](https://github.com/ray-project/ray/blob/master/python/ray/setup-dev.py) script. This sets up links between the ``rllib`` dir in your git repo and the one bundled with the ``ray`` package. When using this script, make sure that your git branch is in sync with the installed Ray binaries (i.e., you are up-to-date on [master](https://github.com/ray-project/ray) and have the latest [wheel](https://ray.readthedocs.io/en/latest/installation.html) installed.)
+29 -10
View File
@@ -7,18 +7,18 @@ import sys
# Note: do not introduce unnecessary library dependencies here, e.g. gym.
# This file is imported from the tune module in order to register RLlib agents.
from ray.tune.registry import register_trainable
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.env.base_env import BaseEnv
from ray.rllib.env.external_env import ExternalEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.env.external_env import ExternalEnv
from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import TFPolicy
from ray.tune.registry import register_trainable
def _setup_logger():
@@ -38,14 +38,33 @@ def _setup_logger():
def _register_all():
from ray.rllib.agents.registry import ALGORITHMS
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.registry import ALGORITHMS, get_agent_class
from ray.rllib.contrib.registry import CONTRIBUTED_ALGORITHMS
for key in list(ALGORITHMS.keys()) + list(CONTRIBUTED_ALGORITHMS.keys(
)) + ["__fake", "__sigmoid_fake_data", "__parameter_tuning"]:
from ray.rllib.agents.registry import get_agent_class
register_trainable(key, get_agent_class(key))
def _see_contrib(name):
"""Returns dummy agent class warning algo is in contrib/."""
class _SeeContrib(Trainer):
_name = "SeeContrib"
_default_config = with_common_config({})
def _setup(self, config):
raise NameError(
"Please run `contrib/{}` instead.".format(name))
return _SeeContrib
# also register the aliases minus contrib/ to give a good error message
for key in list(CONTRIBUTED_ALGORITHMS.keys()):
assert key.startswith("contrib/")
alias = key.split("/", 1)[1]
register_trainable(alias, _see_contrib(alias))
_setup_logger()
_register_all()
+4
View File
@@ -0,0 +1,4 @@
# Implementation of MADDPG in RLLib
Please check [wsjeon/maddpg-rllib](https://github.com/wsjeon/maddpg-rllib) for more information.
+7
View File
@@ -0,0 +1,7 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.rllib.contrib.maddpg.maddpg import MADDPGTrainer, DEFAULT_CONFIG
__all__ = ["MADDPGTrainer", "DEFAULT_CONFIG"]
+183
View File
@@ -0,0 +1,183 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
"""Contributed port of MADDPG from OpenAI baselines.
The implementation has a couple assumptions:
- The number of agents is fixed and known upfront.
- Each agent is bound to a policy of the same name.
- Discrete actions are sent as logits (pre-softmax).
For a minimal example, see twostep_game.py, and the README for how to run
with the multi-agent particle envs.
"""
import logging
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer
from ray.rllib.contrib.maddpg.maddpg_policy import MADDPGTFPolicy
from ray.rllib.optimizers import SyncReplayOptimizer
from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# === Settings for each individual policy ===
# ID of the agent controlled by this policy
"agent_id": None,
# Use a local critic for this policy.
"use_local_critic": False,
# === Evaluation ===
# Evaluation interval
"evaluation_interval": None,
# Number of episodes to run per evaluation period.
"evaluation_num_episodes": 10,
# === Model ===
# Apply a state preprocessor with spec given by the "model" config option
# (like other RL algorithms). This is mostly useful if you have a weird
# observation shape, like an image. Disabled by default.
"use_state_preprocessor": False,
# Postprocess the policy network model output with these hidden layers. If
# use_state_preprocessor is False, then these will be the *only* hidden
# layers in the network.
"actor_hiddens": [64, 64],
# Hidden layers activation of the postprocessing stage of the policy
# network
"actor_hidden_activation": "relu",
# Postprocess the critic network model output with these hidden layers;
# again, if use_state_preprocessor is True, then the state will be
# preprocessed by the model specified with the "model" config option first.
"critic_hiddens": [64, 64],
# Hidden layers activation of the postprocessing state of the critic.
"critic_hidden_activation": "relu",
# N-step Q learning
"n_step": 1,
# Algorithm for good policies
"good_policy": "maddpg",
# Algorithm for adversary policies
"adv_policy": "maddpg",
# === Replay buffer ===
# Size of the replay buffer. Note that if async_updates is set, then
# each worker will have a replay buffer of this size.
"buffer_size": int(1e6),
# Observation compression. Note that compression makes simulation slow in
# MPE.
"compress_observations": False,
# === Optimization ===
# Learning rate for the critic (Q-function) optimizer.
"critic_lr": 1e-2,
# Learning rate for the actor (policy) optimizer.
"actor_lr": 1e-2,
# Update the target network every `target_network_update_freq` steps.
"target_network_update_freq": 0,
# Update the target by \tau * policy + (1-\tau) * target_policy
"tau": 0.01,
# Weights for feature regularization for the actor
"actor_feature_reg": 0.001,
# If not None, clip gradients during optimization at this value
"grad_norm_clipping": 0.5,
# How many steps of the model to sample before learning starts.
"learning_starts": 1024 * 25,
# Update the replay buffer with this many samples at once. Note that this
# setting applies per-worker if num_workers > 1.
"sample_batch_size": 100,
# Size of a batched sampled from replay buffer for training. Note that
# if async_updates is set, then each worker returns gradients for a
# batch of this size.
"train_batch_size": 1024,
# Number of env steps to optimize for before returning
"timesteps_per_iteration": 0,
# === Parallelism ===
# Number of workers for collecting samples with. This only makes sense
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 1,
# Prevent iterations from going lower than this time span
"min_iter_time_s": 0,
})
# __sphinx_doc_end__
# yapf: enable
def set_global_timestep(trainer):
global_timestep = trainer.optimizer.num_steps_sampled
trainer.train_start_timestep = global_timestep
def before_learn_on_batch(multi_agent_batch, policies, train_batch_size):
samples = {}
# Modify keys.
for pid, p in policies.items():
i = p.config["agent_id"]
keys = multi_agent_batch.policy_batches[pid].data.keys()
keys = ["_".join([k, str(i)]) for k in keys]
samples.update(
dict(
zip(keys,
multi_agent_batch.policy_batches[pid].data.values())))
# Make ops and feed_dict to get "new_obs" from target action sampler.
new_obs_ph_n = [p.new_obs_ph for p in policies.values()]
new_obs_n = list()
for k, v in samples.items():
if "new_obs" in k:
new_obs_n.append(v)
target_act_sampler_n = [p.target_act_sampler for p in policies.values()]
feed_dict = dict(zip(new_obs_ph_n, new_obs_n))
new_act_n = p.sess.run(target_act_sampler_n, feed_dict)
samples.update(
{"new_actions_%d" % i: new_act
for i, new_act in enumerate(new_act_n)})
# Share samples among agents.
policy_batches = {pid: SampleBatch(samples) for pid in policies.keys()}
return MultiAgentBatch(policy_batches, train_batch_size)
def make_optimizer(workers, config):
return SyncReplayOptimizer(
workers,
learning_starts=config["learning_starts"],
buffer_size=config["buffer_size"],
train_batch_size=config["train_batch_size"],
before_learn_on_batch=before_learn_on_batch,
synchronize_sampling=True,
prioritized_replay=False)
def add_trainer_metrics(trainer, result):
global_timestep = trainer.optimizer.num_steps_sampled
result.update(
timesteps_this_iter=global_timestep - trainer.train_start_timestep,
info=dict({
"num_target_updates": trainer.state["num_target_updates"],
}, **trainer.optimizer.stats()))
def collect_metrics(trainer):
result = trainer.collect_metrics()
return result
MADDPGTrainer = GenericOffPolicyTrainer.with_updates(
name="MADDPG",
default_config=DEFAULT_CONFIG,
default_policy=MADDPGTFPolicy,
before_init=None,
before_train_step=set_global_timestep,
make_policy_optimizer=make_optimizer,
after_train_result=add_trainer_metrics,
collect_metrics_fn=collect_metrics,
before_evaluate_fn=None)
+377
View File
@@ -0,0 +1,377 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
from ray.rllib.agents.dqn.dqn_policy import minimize_and_clip, _adjust_nstep
from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.annotations import override
from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.utils import try_import_tf
import logging
from gym.spaces import Box, Discrete
import numpy as np
logger = logging.getLogger(__name__)
tf = try_import_tf()
class MADDPGPostprocessing(object):
"""Implements agentwise termination signal and n-step learning."""
@override(Policy)
def postprocess_trajectory(self,
sample_batch,
other_agent_batches=None,
episode=None):
# FIXME: Get done from info is required since agentwise done is not
# supported now.
sample_batch.data["dones"] = self.get_done_from_info(
sample_batch.data["infos"])
# N-step Q adjustments
if self.config["n_step"] > 1:
_adjust_nstep(self.config["n_step"], self.config["gamma"],
sample_batch[SampleBatch.CUR_OBS],
sample_batch[SampleBatch.ACTIONS],
sample_batch[SampleBatch.REWARDS],
sample_batch[SampleBatch.NEXT_OBS],
sample_batch[SampleBatch.DONES])
return sample_batch
class MADDPGTFPolicy(MADDPGPostprocessing, TFPolicy):
def __init__(self, obs_space, act_space, config):
# _____ Initial Configuration
self.config = config = dict(ray.rllib.contrib.maddpg.DEFAULT_CONFIG,
**config)
self.global_step = tf.train.get_or_create_global_step()
# FIXME: Get done from info is required since agentwise done is not
# supported now.
self.get_done_from_info = np.vectorize(
lambda info: info.get("done", False))
agent_id = config["agent_id"]
if agent_id is None:
raise ValueError("Must set `agent_id` in the policy config.")
if type(agent_id) is not int:
raise ValueError("Agent ids must be integers for MADDPG.")
# _____ Environment Setting
def _make_continuous_space(space):
if isinstance(space, Box):
return space
elif isinstance(space, Discrete):
return Box(
low=np.zeros((space.n, )), high=np.ones((space.n, )))
else:
raise UnsupportedSpaceException(
"Space {} is not supported.".format(space))
obs_space_n = [
_make_continuous_space(space)
for _, (_, space, _,
_) in sorted(config["multiagent"]["policies"].items())
]
act_space_n = [
_make_continuous_space(space)
for _, (_, _, space,
_) in sorted(config["multiagent"]["policies"].items())
]
# _____ Placeholders
# Placeholders for policy evaluation and updates
def _make_ph_n(space_n, name=""):
return [
tf.placeholder(
tf.float32,
shape=(None, ) + space.shape,
name=name + "_%d" % i) for i, space in enumerate(space_n)
]
obs_ph_n = _make_ph_n(obs_space_n, "obs")
act_ph_n = _make_ph_n(act_space_n, "actions")
new_obs_ph_n = _make_ph_n(obs_space_n, "new_obs")
new_act_ph_n = _make_ph_n(act_space_n, "new_actions")
rew_ph = tf.placeholder(
tf.float32, shape=None, name="rewards_{}".format(agent_id))
done_ph = tf.placeholder(
tf.float32, shape=None, name="dones_{}".format(agent_id))
if config["use_local_critic"]:
obs_space_n, act_space_n = [obs_space_n[agent_id]], [
act_space_n[agent_id]
]
obs_ph_n, act_ph_n = [obs_ph_n[agent_id]], [act_ph_n[agent_id]]
new_obs_ph_n, new_act_ph_n = [new_obs_ph_n[agent_id]], [
new_act_ph_n[agent_id]
]
agent_id = 0
# _____ Value Network
# Build critic network for t.
critic, _, critic_model_n, critic_vars = self._build_critic_network(
obs_ph_n,
act_ph_n,
obs_space_n,
act_space_n,
hiddens=config["critic_hiddens"],
activation=getattr(tf.nn, config["critic_hidden_activation"]),
scope="critic")
# Build critic network for t + 1.
target_critic, _, _, target_critic_vars = self._build_critic_network(
new_obs_ph_n,
new_act_ph_n,
obs_space_n,
act_space_n,
hiddens=config["critic_hiddens"],
activation=getattr(tf.nn, config["critic_hidden_activation"]),
scope="target_critic")
# Build critic loss.
td_error = tf.subtract(
tf.stop_gradient(
rew_ph + (1.0 - done_ph) *
(config["gamma"]**config["n_step"]) * target_critic[:, 0]),
critic[:, 0])
critic_loss = tf.reduce_mean(td_error**2)
# _____ Policy Network
# Build actor network for t.
act_sampler, actor_feature, actor_model, actor_vars = (
self._build_actor_network(
obs_ph_n[agent_id],
obs_space_n[agent_id],
act_space_n[agent_id],
hiddens=config["actor_hiddens"],
activation=getattr(tf.nn, config["actor_hidden_activation"]),
scope="actor"))
# Build actor network for t + 1.
self.new_obs_ph = new_obs_ph_n[agent_id]
self.target_act_sampler, _, _, target_actor_vars = (
self._build_actor_network(
self.new_obs_ph,
obs_space_n[agent_id],
act_space_n[agent_id],
hiddens=config["actor_hiddens"],
activation=getattr(tf.nn, config["actor_hidden_activation"]),
scope="target_actor"))
# Build actor loss.
act_n = act_ph_n.copy()
act_n[agent_id] = act_sampler
critic, _, _, _ = self._build_critic_network(
obs_ph_n,
act_n,
obs_space_n,
act_space_n,
hiddens=config["critic_hiddens"],
activation=getattr(tf.nn, config["critic_hidden_activation"]),
scope="critic")
actor_loss = -tf.reduce_mean(critic)
if config["actor_feature_reg"] is not None:
actor_loss += config["actor_feature_reg"] * tf.reduce_mean(
actor_feature**2)
# _____ Losses
self.losses = {"critic": critic_loss, "actor": actor_loss}
# _____ Optimizers
self.optimizers = {
"critic": tf.train.AdamOptimizer(config["critic_lr"]),
"actor": tf.train.AdamOptimizer(config["actor_lr"])
}
# _____ Build variable update ops.
self.tau = tf.placeholder_with_default(
config["tau"], shape=(), name="tau")
def _make_target_update_op(vs, target_vs, tau):
return [
target_v.assign(tau * v + (1.0 - tau) * target_v)
for v, target_v in zip(vs, target_vs)
]
self.update_target_vars = _make_target_update_op(
critic_vars + actor_vars, target_critic_vars + target_actor_vars,
self.tau)
def _make_set_weight_op(variables):
vs = list()
for v in variables.values():
vs += v
phs = [
tf.placeholder(
tf.float32,
shape=v.get_shape(),
name=v.name.split(":")[0] + "_ph") for v in vs
]
return tf.group(*[v.assign(ph) for v, ph in zip(vs, phs)]), phs
self.vars = {
"critic": critic_vars,
"actor": actor_vars,
"target_critic": target_critic_vars,
"target_actor": target_actor_vars
}
self.update_vars, self.vars_ph = _make_set_weight_op(self.vars)
# _____ TensorFlow Initialization
self.sess = tf.get_default_session()
def _make_loss_inputs(placeholders):
return [(ph.name.split("/")[-1].split(":")[0], ph)
for ph in placeholders]
loss_inputs = _make_loss_inputs(obs_ph_n + act_ph_n + new_obs_ph_n +
new_act_ph_n + [rew_ph, done_ph])
TFPolicy.__init__(
self,
obs_space,
act_space,
self.sess,
obs_input=obs_ph_n[agent_id],
action_sampler=act_sampler,
loss=actor_loss + critic_loss,
loss_inputs=loss_inputs)
self.sess.run(tf.global_variables_initializer())
# Hard initial update
self.update_target(1.0)
@override(TFPolicy)
def optimizer(self):
return None
@override(TFPolicy)
def gradients(self, optimizer, loss):
if self.config["grad_norm_clipping"] is not None:
self.gvs = {
k: minimize_and_clip(optimizer, self.losses[k], self.vars[k],
self.config["grad_norm_clipping"])
for k, optimizer in self.optimizers.items()
}
else:
self.gvs = {
k: optimizer.compute_gradients(self.losses[k], self.vars[k])
for k, optimizer in self.optimizers.items()
}
return self.gvs["critic"] + self.gvs["actor"]
@override(TFPolicy)
def build_apply_op(self, optimizer, grads_and_vars):
critic_apply_op = self.optimizers["critic"].apply_gradients(
self.gvs["critic"])
with tf.control_dependencies([tf.assign_add(self.global_step, 1)]):
with tf.control_dependencies([critic_apply_op]):
actor_apply_op = self.optimizers["actor"].apply_gradients(
self.gvs["actor"])
return actor_apply_op
@override(TFPolicy)
def extra_compute_action_feed_dict(self):
return {}
@override(TFPolicy)
def extra_compute_grad_fetches(self):
return {LEARNER_STATS_KEY: {}}
@override(TFPolicy)
def get_weights(self):
var_list = []
for var in self.vars.values():
var_list += var
return self.sess.run(var_list)
@override(TFPolicy)
def set_weights(self, weights):
self.sess.run(
self.update_vars, feed_dict=dict(zip(self.vars_ph, weights)))
@override(Policy)
def get_state(self):
return TFPolicy.get_state(self)
@override(Policy)
def set_state(self, state):
TFPolicy.set_state(self, state)
def _build_critic_network(self,
obs_n,
act_n,
obs_space_n,
act_space_n,
hiddens,
activation=None,
scope=None):
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE) as scope:
if self.config["use_state_preprocessor"]:
model_n = [
ModelCatalog.get_model({
"obs": obs,
"is_training": self._get_is_training_placeholder(),
}, obs_space, act_space, 1, self.config["model"])
for obs, obs_space, act_space in zip(
obs_n, obs_space_n, act_space_n)
]
out_n = [model.last_layer for model in model_n]
out = tf.concat(out_n + act_n, axis=1)
else:
model_n = [None] * len(obs_n)
out = tf.concat(obs_n + act_n, axis=1)
for hidden in hiddens:
out = tf.layers.dense(out, units=hidden, activation=activation)
feature = out
out = tf.layers.dense(feature, units=1, activation=None)
return out, feature, model_n, tf.global_variables(scope.name)
def _build_actor_network(self,
obs,
obs_space,
act_space,
hiddens,
activation=None,
scope=None):
from tensorflow.contrib.distributions import RelaxedOneHotCategorical
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE) as scope:
if self.config["use_state_preprocessor"]:
model = ModelCatalog.get_model({
"obs": obs,
"is_training": self._get_is_training_placeholder(),
}, obs_space, act_space, 1, self.config["model"])
out = model.last_layer
else:
model = None
out = obs
for hidden in hiddens:
out = tf.layers.dense(out, units=hidden, activation=activation)
feature = tf.layers.dense(
out, units=act_space.shape[0], activation=None)
sampler = RelaxedOneHotCategorical(
temperature=1.0, logits=feature).sample()
return sampler, feature, model, tf.global_variables(scope.name)
def update_target(self, tau=None):
if tau is not None:
self.sess.run(self.update_target_vars, {self.tau: tau})
else:
self.sess.run(self.update_target_vars)
+6
View File
@@ -10,6 +10,12 @@ def _import_random_agent():
return RandomAgent
def _import_maddpg():
from ray.rllib.contrib import maddpg
return maddpg.MADDPGTrainer
CONTRIBUTED_ALGORITHMS = {
"contrib/RandomAgent": _import_random_agent,
"contrib/MADDPG": _import_maddpg,
}
+54 -8
View File
@@ -6,6 +6,7 @@ from __future__ import print_function
import argparse
from gym.spaces import Tuple, Discrete
import numpy as np
import ray
from ray import tune
@@ -26,14 +27,24 @@ class TwoStepGame(MultiAgentEnv):
def __init__(self, env_config):
self.state = None
self.agent_1 = 0
self.agent_2 = 1
# MADDPG emits action logits instead of actual discrete actions
self.actions_are_logits = env_config.get("actions_are_logits", False)
def reset(self):
self.state = 0
return {"agent_1": self.state, "agent_2": self.state + 3}
return {self.agent_1: self.state, self.agent_2: self.state + 3}
def step(self, action_dict):
if self.actions_are_logits:
action_dict = {
k: np.random.choice([0, 1], p=v)
for k, v in action_dict.items()
}
if self.state == 0:
action = action_dict["agent_1"]
action = action_dict[self.agent_1]
assert action in [0, 1], action
if action == 0:
self.state = 1
@@ -45,16 +56,21 @@ class TwoStepGame(MultiAgentEnv):
global_rew = 7
done = True
else:
if action_dict["agent_1"] == 0 and action_dict["agent_2"] == 0:
if action_dict[self.agent_1] == 0 and action_dict[self.
agent_2] == 0:
global_rew = 0
elif action_dict["agent_1"] == 1 and action_dict["agent_2"] == 1:
elif action_dict[self.agent_1] == 1 and action_dict[self.
agent_2] == 1:
global_rew = 8
else:
global_rew = 1
done = True
rewards = {"agent_1": global_rew / 2.0, "agent_2": global_rew / 2.0}
obs = {"agent_1": self.state, "agent_2": self.state + 3}
rewards = {
self.agent_1: global_rew / 2.0,
self.agent_2: global_rew / 2.0
}
obs = {self.agent_1: self.state, self.agent_2: self.state + 3}
dones = {"__all__": done}
infos = {}
return obs, rewards, dones, infos
@@ -64,7 +80,7 @@ if __name__ == "__main__":
args = parser.parse_args()
grouping = {
"group_1": ["agent_1", "agent_2"],
"group_1": [0, 1],
}
obs_space = Tuple([
TwoStepGame.observation_space,
@@ -79,7 +95,37 @@ if __name__ == "__main__":
lambda config: TwoStepGame(config).with_agent_groups(
grouping, obs_space=obs_space, act_space=act_space))
if args.run == "QMIX":
if args.run == "contrib/MADDPG":
obs_space_dict = {
"agent_1": TwoStepGame.observation_space,
"agent_2": TwoStepGame.observation_space,
}
act_space_dict = {
"agent_1": TwoStepGame.action_space,
"agent_2": TwoStepGame.action_space,
}
config = {
"learning_starts": 100,
"env_config": {
"actions_are_logits": True,
},
"multiagent": {
"policies": {
"pol1": (None, TwoStepGame.observation_space,
TwoStepGame.action_space, {
"agent_id": 0,
}),
"pol2": (None, TwoStepGame.observation_space,
TwoStepGame.action_space, {
"agent_id": 1,
}),
},
"policy_mapping_fn": tune.function(
lambda x: "pol1" if x == 0 else "pol2"),
},
}
group = False
elif args.run == "QMIX":
config = {
"sample_batch_size": 4,
"train_batch_size": 32,
+33
View File
@@ -68,6 +68,18 @@ class ReplayBuffer(object):
return (np.array(obses_t), np.array(actions), np.array(rewards),
np.array(obses_tp1), np.array(dones))
@DeveloperAPI
def sample_idxes(self, batch_size):
return [
random.randint(0,
len(self._storage) - 1) for _ in range(batch_size)
]
@DeveloperAPI
def sample_with_idxes(self, idxes):
self._num_sampled += len(idxes)
return self._encode_sample(idxes)
@DeveloperAPI
def sample(self, batch_size):
"""Sample a batch of experiences.
@@ -164,6 +176,27 @@ class PrioritizedReplayBuffer(ReplayBuffer):
res.append(idx)
return res
@DeveloperAPI
def sample_idxes(self, batch_size):
return self._sample_proportional(batch_size)
@DeveloperAPI
def sample_with_idxes(self, idxes, beta):
assert beta > 0
self._num_sampled += len(idxes)
weights = []
p_min = self._it_min.min() / self._it_sum.sum()
max_weight = (p_min * len(self._storage))**(-beta)
for idx in idxes:
p_sample = self._it_sum[idx] / self._it_sum.sum()
weight = (p_sample * len(self._storage))**(-beta)
weights.append(weight / max_weight)
weights = np.array(weights)
encoded_sample = self._encode_sample(idxes)
return tuple(list(encoded_sample) + [weights, idxes])
@DeveloperAPI
def sample(self, batch_size, beta):
"""Sample a batch of experiences.
+25 -4
View File
@@ -41,7 +41,9 @@ class SyncReplayOptimizer(PolicyOptimizer):
beta_annealing_fraction=0.2,
final_prioritized_replay_beta=0.4,
train_batch_size=32,
sample_batch_size=4):
sample_batch_size=4,
before_learn_on_batch=None,
synchronize_sampling=False):
"""Initialize an sync replay optimizer.
Arguments:
@@ -59,6 +61,10 @@ class SyncReplayOptimizer(PolicyOptimizer):
final_prioritized_replay_beta (float): final value of beta
train_batch_size (int): size of batches to learn on
sample_batch_size (int): size of batches to sample from workers
before_learn_on_batch (function): callback to run before passing
the sampled batch to learn on
synchronize_sampling (bool): whether to sample the experiences for
all policies with the same indices (used in MADDPG).
"""
PolicyOptimizer.__init__(self, workers)
@@ -71,6 +77,8 @@ class SyncReplayOptimizer(PolicyOptimizer):
final_p=final_prioritized_replay_beta)
self.prioritized_replay_eps = prioritized_replay_eps
self.train_batch_size = train_batch_size
self.before_learn_on_batch = before_learn_on_batch
self.synchronize_sampling = synchronize_sampling
# Stats
self.update_weights_timer = TimerStat()
@@ -154,6 +162,11 @@ class SyncReplayOptimizer(PolicyOptimizer):
samples = self._replay()
with self.grad_timer:
if self.before_learn_on_batch:
samples = self.before_learn_on_batch(
samples,
self.workers.local_worker().policy_map,
self.train_batch_size)
info_dict = self.workers.local_worker().learn_on_batch(samples)
for policy_id, info in info_dict.items():
self.learner_stats[policy_id] = get_learner_stats(info)
@@ -171,17 +184,25 @@ class SyncReplayOptimizer(PolicyOptimizer):
def _replay(self):
samples = {}
idxes = None
with self.replay_timer:
for policy_id, replay_buffer in self.replay_buffers.items():
if self.synchronize_sampling:
if idxes is None:
idxes = replay_buffer.sample_idxes(
self.train_batch_size)
else:
idxes = replay_buffer.sample_idxes(self.train_batch_size)
if isinstance(replay_buffer, PrioritizedReplayBuffer):
(obses_t, actions, rewards, obses_tp1, dones, weights,
batch_indexes) = replay_buffer.sample(
self.train_batch_size,
batch_indexes) = replay_buffer.sample_with_idxes(
idxes,
beta=self.prioritized_replay_beta.value(
self.num_steps_trained))
else:
(obses_t, actions, rewards, obses_tp1,
dones) = replay_buffer.sample(self.train_batch_size)
dones) = replay_buffer.sample_with_idxes(idxes)
weights = np.ones_like(rewards)
batch_indexes = -np.ones_like(rewards)
samples[policy_id] = SampleBatch({