diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh index c26335150..9ad5db1f2 100644 --- a/ci/jenkins_tests/run_rllib_tests.sh +++ b/ci/jenkins_tests/run_rllib_tests.sh @@ -428,6 +428,9 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/contrib/random_agent/random_agent.py +docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=contrib/MADDPG + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ /ray/ci/suppress_output python /ray/rllib/examples/twostep_game.py --stop=2000 --run=PG diff --git a/doc/source/rllib-algorithms.rst b/doc/source/rllib-algorithms.rst index ca3d19bd1..26e29bd92 100644 --- a/doc/source/rllib-algorithms.rst +++ b/doc/source/rllib-algorithms.rst @@ -300,6 +300,19 @@ Tuned examples: `Two-step game `__ `[implementation] `__ MADDPG is a specialized multi-agent algorithm. Code here is adapted from https://github.com/openai/maddpg to integrate with RLlib multi-agent APIs. Please check `wsjeon/maddpg-rllib `__ for examples and more information. + +**MADDPG-specific configs** (see also `common configs `__): + +Tuned examples: `Multi-Agent Particle Environment `__, `Two-step game `__ + +.. literalinclude:: ../../rllib/contrib/maddpg/maddpg.py + :language: python + :start-after: __sphinx_doc_begin__ + :end-before: __sphinx_doc_end__ + Advantage Re-Weighted Imitation Learning (MARWIL) ------------------------------------------------- diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index a64da784e..aa16e02d3 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -84,6 +84,7 @@ Algorithms * Multi-agent specific - `QMIX Monotonic Value Factorisation (QMIX, VDN, IQN) `__ + - `Multi-Agent Actor Critic (contrib/MADDPG) `__ * Offline diff --git a/rllib/README.md b/rllib/README.md index 667062c63..fb19cbeeb 100644 --- a/rllib/README.md +++ b/rllib/README.md @@ -27,4 +27,4 @@ If you've found RLlib useful for your research, you can cite the [paper](https:/ Development Install ------------------- -You can develop RLlib locally without needing to compile Ray by using the [setup-dev.py](https://github.com/ray-project/ray/blob/master/python/ray/setup-dev.py) script. This sets up links between the ``rllib`` dir in your git repo and the one bundled with the ``ray`` package. When using this script, make sure that your git branch is in sync with the installed Ray binaries (i.e., you are up-to-date on `master `__ and have the latest [wheel](https://ray.readthedocs.io/en/latest/installation.html) installed.) +You can develop RLlib locally without needing to compile Ray by using the [setup-dev.py](https://github.com/ray-project/ray/blob/master/python/ray/setup-dev.py) script. This sets up links between the ``rllib`` dir in your git repo and the one bundled with the ``ray`` package. When using this script, make sure that your git branch is in sync with the installed Ray binaries (i.e., you are up-to-date on [master](https://github.com/ray-project/ray) and have the latest [wheel](https://ray.readthedocs.io/en/latest/installation.html) installed.) diff --git a/rllib/__init__.py b/rllib/__init__.py index 0824e9995..a65debdc6 100644 --- a/rllib/__init__.py +++ b/rllib/__init__.py @@ -7,18 +7,18 @@ import sys # Note: do not introduce unnecessary library dependencies here, e.g. gym. # This file is imported from the tune module in order to register RLlib agents. -from ray.tune.registry import register_trainable - -from ray.rllib.evaluation.policy_graph import PolicyGraph -from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph -from ray.rllib.evaluation.rollout_worker import RolloutWorker from ray.rllib.env.base_env import BaseEnv +from ray.rllib.env.external_env import ExternalEnv from ray.rllib.env.multi_agent_env import MultiAgentEnv from ray.rllib.env.vector_env import VectorEnv -from ray.rllib.env.external_env import ExternalEnv +from ray.rllib.evaluation.policy_graph import PolicyGraph +from ray.rllib.evaluation.rollout_worker import RolloutWorker +from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph from ray.rllib.policy.policy import Policy -from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.tf_policy import TFPolicy + +from ray.tune.registry import register_trainable def _setup_logger(): @@ -38,14 +38,33 @@ def _setup_logger(): def _register_all(): - - from ray.rllib.agents.registry import ALGORITHMS + from ray.rllib.agents.trainer import Trainer, with_common_config + from ray.rllib.agents.registry import ALGORITHMS, get_agent_class from ray.rllib.contrib.registry import CONTRIBUTED_ALGORITHMS + for key in list(ALGORITHMS.keys()) + list(CONTRIBUTED_ALGORITHMS.keys( )) + ["__fake", "__sigmoid_fake_data", "__parameter_tuning"]: - from ray.rllib.agents.registry import get_agent_class register_trainable(key, get_agent_class(key)) + def _see_contrib(name): + """Returns dummy agent class warning algo is in contrib/.""" + + class _SeeContrib(Trainer): + _name = "SeeContrib" + _default_config = with_common_config({}) + + def _setup(self, config): + raise NameError( + "Please run `contrib/{}` instead.".format(name)) + + return _SeeContrib + + # also register the aliases minus contrib/ to give a good error message + for key in list(CONTRIBUTED_ALGORITHMS.keys()): + assert key.startswith("contrib/") + alias = key.split("/", 1)[1] + register_trainable(alias, _see_contrib(alias)) + _setup_logger() _register_all() diff --git a/rllib/contrib/maddpg/README.md b/rllib/contrib/maddpg/README.md new file mode 100644 index 000000000..636eeae64 --- /dev/null +++ b/rllib/contrib/maddpg/README.md @@ -0,0 +1,4 @@ +# Implementation of MADDPG in RLLib + +Please check [wsjeon/maddpg-rllib](https://github.com/wsjeon/maddpg-rllib) for more information. + diff --git a/rllib/contrib/maddpg/__init__.py b/rllib/contrib/maddpg/__init__.py new file mode 100644 index 000000000..90586a267 --- /dev/null +++ b/rllib/contrib/maddpg/__init__.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from ray.rllib.contrib.maddpg.maddpg import MADDPGTrainer, DEFAULT_CONFIG + +__all__ = ["MADDPGTrainer", "DEFAULT_CONFIG"] diff --git a/rllib/contrib/maddpg/maddpg.py b/rllib/contrib/maddpg/maddpg.py new file mode 100644 index 000000000..74f0cd2e4 --- /dev/null +++ b/rllib/contrib/maddpg/maddpg.py @@ -0,0 +1,183 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +"""Contributed port of MADDPG from OpenAI baselines. + +The implementation has a couple assumptions: +- The number of agents is fixed and known upfront. +- Each agent is bound to a policy of the same name. +- Discrete actions are sent as logits (pre-softmax). + +For a minimal example, see twostep_game.py, and the README for how to run +with the multi-agent particle envs. +""" + +import logging + +from ray.rllib.agents.trainer import with_common_config +from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer +from ray.rllib.contrib.maddpg.maddpg_policy import MADDPGTFPolicy +from ray.rllib.optimizers import SyncReplayOptimizer +from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +# yapf: disable +# __sphinx_doc_begin__ +DEFAULT_CONFIG = with_common_config({ + # === Settings for each individual policy === + # ID of the agent controlled by this policy + "agent_id": None, + # Use a local critic for this policy. + "use_local_critic": False, + + # === Evaluation === + # Evaluation interval + "evaluation_interval": None, + # Number of episodes to run per evaluation period. + "evaluation_num_episodes": 10, + + # === Model === + # Apply a state preprocessor with spec given by the "model" config option + # (like other RL algorithms). This is mostly useful if you have a weird + # observation shape, like an image. Disabled by default. + "use_state_preprocessor": False, + # Postprocess the policy network model output with these hidden layers. If + # use_state_preprocessor is False, then these will be the *only* hidden + # layers in the network. + "actor_hiddens": [64, 64], + # Hidden layers activation of the postprocessing stage of the policy + # network + "actor_hidden_activation": "relu", + # Postprocess the critic network model output with these hidden layers; + # again, if use_state_preprocessor is True, then the state will be + # preprocessed by the model specified with the "model" config option first. + "critic_hiddens": [64, 64], + # Hidden layers activation of the postprocessing state of the critic. + "critic_hidden_activation": "relu", + # N-step Q learning + "n_step": 1, + # Algorithm for good policies + "good_policy": "maddpg", + # Algorithm for adversary policies + "adv_policy": "maddpg", + + # === Replay buffer === + # Size of the replay buffer. Note that if async_updates is set, then + # each worker will have a replay buffer of this size. + "buffer_size": int(1e6), + # Observation compression. Note that compression makes simulation slow in + # MPE. + "compress_observations": False, + + # === Optimization === + # Learning rate for the critic (Q-function) optimizer. + "critic_lr": 1e-2, + # Learning rate for the actor (policy) optimizer. + "actor_lr": 1e-2, + # Update the target network every `target_network_update_freq` steps. + "target_network_update_freq": 0, + # Update the target by \tau * policy + (1-\tau) * target_policy + "tau": 0.01, + # Weights for feature regularization for the actor + "actor_feature_reg": 0.001, + # If not None, clip gradients during optimization at this value + "grad_norm_clipping": 0.5, + # How many steps of the model to sample before learning starts. + "learning_starts": 1024 * 25, + # Update the replay buffer with this many samples at once. Note that this + # setting applies per-worker if num_workers > 1. + "sample_batch_size": 100, + # Size of a batched sampled from replay buffer for training. Note that + # if async_updates is set, then each worker returns gradients for a + # batch of this size. + "train_batch_size": 1024, + # Number of env steps to optimize for before returning + "timesteps_per_iteration": 0, + + # === Parallelism === + # Number of workers for collecting samples with. This only makes sense + # to increase if your environment is particularly slow to sample, or if + # you're using the Async or Ape-X optimizers. + "num_workers": 1, + # Prevent iterations from going lower than this time span + "min_iter_time_s": 0, +}) +# __sphinx_doc_end__ +# yapf: enable + + +def set_global_timestep(trainer): + global_timestep = trainer.optimizer.num_steps_sampled + trainer.train_start_timestep = global_timestep + + +def before_learn_on_batch(multi_agent_batch, policies, train_batch_size): + samples = {} + + # Modify keys. + for pid, p in policies.items(): + i = p.config["agent_id"] + keys = multi_agent_batch.policy_batches[pid].data.keys() + keys = ["_".join([k, str(i)]) for k in keys] + samples.update( + dict( + zip(keys, + multi_agent_batch.policy_batches[pid].data.values()))) + + # Make ops and feed_dict to get "new_obs" from target action sampler. + new_obs_ph_n = [p.new_obs_ph for p in policies.values()] + new_obs_n = list() + for k, v in samples.items(): + if "new_obs" in k: + new_obs_n.append(v) + + target_act_sampler_n = [p.target_act_sampler for p in policies.values()] + feed_dict = dict(zip(new_obs_ph_n, new_obs_n)) + + new_act_n = p.sess.run(target_act_sampler_n, feed_dict) + samples.update( + {"new_actions_%d" % i: new_act + for i, new_act in enumerate(new_act_n)}) + + # Share samples among agents. + policy_batches = {pid: SampleBatch(samples) for pid in policies.keys()} + return MultiAgentBatch(policy_batches, train_batch_size) + + +def make_optimizer(workers, config): + return SyncReplayOptimizer( + workers, + learning_starts=config["learning_starts"], + buffer_size=config["buffer_size"], + train_batch_size=config["train_batch_size"], + before_learn_on_batch=before_learn_on_batch, + synchronize_sampling=True, + prioritized_replay=False) + + +def add_trainer_metrics(trainer, result): + global_timestep = trainer.optimizer.num_steps_sampled + result.update( + timesteps_this_iter=global_timestep - trainer.train_start_timestep, + info=dict({ + "num_target_updates": trainer.state["num_target_updates"], + }, **trainer.optimizer.stats())) + + +def collect_metrics(trainer): + result = trainer.collect_metrics() + return result + + +MADDPGTrainer = GenericOffPolicyTrainer.with_updates( + name="MADDPG", + default_config=DEFAULT_CONFIG, + default_policy=MADDPGTFPolicy, + before_init=None, + before_train_step=set_global_timestep, + make_policy_optimizer=make_optimizer, + after_train_result=add_trainer_metrics, + collect_metrics_fn=collect_metrics, + before_evaluate_fn=None) diff --git a/rllib/contrib/maddpg/maddpg_policy.py b/rllib/contrib/maddpg/maddpg_policy.py new file mode 100644 index 000000000..bae7c1d7d --- /dev/null +++ b/rllib/contrib/maddpg/maddpg_policy.py @@ -0,0 +1,377 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray +from ray.rllib.agents.dqn.dqn_policy import minimize_and_clip, _adjust_nstep +from ray.rllib.evaluation.metrics import LEARNER_STATS_KEY +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.models import ModelCatalog +from ray.rllib.utils.annotations import override +from ray.rllib.utils.error import UnsupportedSpaceException +from ray.rllib.policy.policy import Policy +from ray.rllib.policy.tf_policy import TFPolicy +from ray.rllib.utils import try_import_tf + +import logging +from gym.spaces import Box, Discrete +import numpy as np + +logger = logging.getLogger(__name__) + +tf = try_import_tf() + + +class MADDPGPostprocessing(object): + """Implements agentwise termination signal and n-step learning.""" + + @override(Policy) + def postprocess_trajectory(self, + sample_batch, + other_agent_batches=None, + episode=None): + # FIXME: Get done from info is required since agentwise done is not + # supported now. + sample_batch.data["dones"] = self.get_done_from_info( + sample_batch.data["infos"]) + + # N-step Q adjustments + if self.config["n_step"] > 1: + _adjust_nstep(self.config["n_step"], self.config["gamma"], + sample_batch[SampleBatch.CUR_OBS], + sample_batch[SampleBatch.ACTIONS], + sample_batch[SampleBatch.REWARDS], + sample_batch[SampleBatch.NEXT_OBS], + sample_batch[SampleBatch.DONES]) + + return sample_batch + + +class MADDPGTFPolicy(MADDPGPostprocessing, TFPolicy): + def __init__(self, obs_space, act_space, config): + # _____ Initial Configuration + self.config = config = dict(ray.rllib.contrib.maddpg.DEFAULT_CONFIG, + **config) + self.global_step = tf.train.get_or_create_global_step() + + # FIXME: Get done from info is required since agentwise done is not + # supported now. + self.get_done_from_info = np.vectorize( + lambda info: info.get("done", False)) + + agent_id = config["agent_id"] + if agent_id is None: + raise ValueError("Must set `agent_id` in the policy config.") + if type(agent_id) is not int: + raise ValueError("Agent ids must be integers for MADDPG.") + + # _____ Environment Setting + def _make_continuous_space(space): + if isinstance(space, Box): + return space + elif isinstance(space, Discrete): + return Box( + low=np.zeros((space.n, )), high=np.ones((space.n, ))) + else: + raise UnsupportedSpaceException( + "Space {} is not supported.".format(space)) + + obs_space_n = [ + _make_continuous_space(space) + for _, (_, space, _, + _) in sorted(config["multiagent"]["policies"].items()) + ] + act_space_n = [ + _make_continuous_space(space) + for _, (_, _, space, + _) in sorted(config["multiagent"]["policies"].items()) + ] + + # _____ Placeholders + # Placeholders for policy evaluation and updates + def _make_ph_n(space_n, name=""): + return [ + tf.placeholder( + tf.float32, + shape=(None, ) + space.shape, + name=name + "_%d" % i) for i, space in enumerate(space_n) + ] + + obs_ph_n = _make_ph_n(obs_space_n, "obs") + act_ph_n = _make_ph_n(act_space_n, "actions") + new_obs_ph_n = _make_ph_n(obs_space_n, "new_obs") + new_act_ph_n = _make_ph_n(act_space_n, "new_actions") + rew_ph = tf.placeholder( + tf.float32, shape=None, name="rewards_{}".format(agent_id)) + done_ph = tf.placeholder( + tf.float32, shape=None, name="dones_{}".format(agent_id)) + + if config["use_local_critic"]: + obs_space_n, act_space_n = [obs_space_n[agent_id]], [ + act_space_n[agent_id] + ] + obs_ph_n, act_ph_n = [obs_ph_n[agent_id]], [act_ph_n[agent_id]] + new_obs_ph_n, new_act_ph_n = [new_obs_ph_n[agent_id]], [ + new_act_ph_n[agent_id] + ] + agent_id = 0 + + # _____ Value Network + # Build critic network for t. + critic, _, critic_model_n, critic_vars = self._build_critic_network( + obs_ph_n, + act_ph_n, + obs_space_n, + act_space_n, + hiddens=config["critic_hiddens"], + activation=getattr(tf.nn, config["critic_hidden_activation"]), + scope="critic") + + # Build critic network for t + 1. + target_critic, _, _, target_critic_vars = self._build_critic_network( + new_obs_ph_n, + new_act_ph_n, + obs_space_n, + act_space_n, + hiddens=config["critic_hiddens"], + activation=getattr(tf.nn, config["critic_hidden_activation"]), + scope="target_critic") + + # Build critic loss. + td_error = tf.subtract( + tf.stop_gradient( + rew_ph + (1.0 - done_ph) * + (config["gamma"]**config["n_step"]) * target_critic[:, 0]), + critic[:, 0]) + critic_loss = tf.reduce_mean(td_error**2) + + # _____ Policy Network + # Build actor network for t. + act_sampler, actor_feature, actor_model, actor_vars = ( + self._build_actor_network( + obs_ph_n[agent_id], + obs_space_n[agent_id], + act_space_n[agent_id], + hiddens=config["actor_hiddens"], + activation=getattr(tf.nn, config["actor_hidden_activation"]), + scope="actor")) + + # Build actor network for t + 1. + self.new_obs_ph = new_obs_ph_n[agent_id] + self.target_act_sampler, _, _, target_actor_vars = ( + self._build_actor_network( + self.new_obs_ph, + obs_space_n[agent_id], + act_space_n[agent_id], + hiddens=config["actor_hiddens"], + activation=getattr(tf.nn, config["actor_hidden_activation"]), + scope="target_actor")) + + # Build actor loss. + act_n = act_ph_n.copy() + act_n[agent_id] = act_sampler + critic, _, _, _ = self._build_critic_network( + obs_ph_n, + act_n, + obs_space_n, + act_space_n, + hiddens=config["critic_hiddens"], + activation=getattr(tf.nn, config["critic_hidden_activation"]), + scope="critic") + actor_loss = -tf.reduce_mean(critic) + if config["actor_feature_reg"] is not None: + actor_loss += config["actor_feature_reg"] * tf.reduce_mean( + actor_feature**2) + + # _____ Losses + self.losses = {"critic": critic_loss, "actor": actor_loss} + + # _____ Optimizers + self.optimizers = { + "critic": tf.train.AdamOptimizer(config["critic_lr"]), + "actor": tf.train.AdamOptimizer(config["actor_lr"]) + } + + # _____ Build variable update ops. + self.tau = tf.placeholder_with_default( + config["tau"], shape=(), name="tau") + + def _make_target_update_op(vs, target_vs, tau): + return [ + target_v.assign(tau * v + (1.0 - tau) * target_v) + for v, target_v in zip(vs, target_vs) + ] + + self.update_target_vars = _make_target_update_op( + critic_vars + actor_vars, target_critic_vars + target_actor_vars, + self.tau) + + def _make_set_weight_op(variables): + vs = list() + for v in variables.values(): + vs += v + phs = [ + tf.placeholder( + tf.float32, + shape=v.get_shape(), + name=v.name.split(":")[0] + "_ph") for v in vs + ] + return tf.group(*[v.assign(ph) for v, ph in zip(vs, phs)]), phs + + self.vars = { + "critic": critic_vars, + "actor": actor_vars, + "target_critic": target_critic_vars, + "target_actor": target_actor_vars + } + self.update_vars, self.vars_ph = _make_set_weight_op(self.vars) + + # _____ TensorFlow Initialization + + self.sess = tf.get_default_session() + + def _make_loss_inputs(placeholders): + return [(ph.name.split("/")[-1].split(":")[0], ph) + for ph in placeholders] + + loss_inputs = _make_loss_inputs(obs_ph_n + act_ph_n + new_obs_ph_n + + new_act_ph_n + [rew_ph, done_ph]) + + TFPolicy.__init__( + self, + obs_space, + act_space, + self.sess, + obs_input=obs_ph_n[agent_id], + action_sampler=act_sampler, + loss=actor_loss + critic_loss, + loss_inputs=loss_inputs) + + self.sess.run(tf.global_variables_initializer()) + + # Hard initial update + self.update_target(1.0) + + @override(TFPolicy) + def optimizer(self): + return None + + @override(TFPolicy) + def gradients(self, optimizer, loss): + if self.config["grad_norm_clipping"] is not None: + self.gvs = { + k: minimize_and_clip(optimizer, self.losses[k], self.vars[k], + self.config["grad_norm_clipping"]) + for k, optimizer in self.optimizers.items() + } + else: + self.gvs = { + k: optimizer.compute_gradients(self.losses[k], self.vars[k]) + for k, optimizer in self.optimizers.items() + } + return self.gvs["critic"] + self.gvs["actor"] + + @override(TFPolicy) + def build_apply_op(self, optimizer, grads_and_vars): + critic_apply_op = self.optimizers["critic"].apply_gradients( + self.gvs["critic"]) + + with tf.control_dependencies([tf.assign_add(self.global_step, 1)]): + with tf.control_dependencies([critic_apply_op]): + actor_apply_op = self.optimizers["actor"].apply_gradients( + self.gvs["actor"]) + + return actor_apply_op + + @override(TFPolicy) + def extra_compute_action_feed_dict(self): + return {} + + @override(TFPolicy) + def extra_compute_grad_fetches(self): + return {LEARNER_STATS_KEY: {}} + + @override(TFPolicy) + def get_weights(self): + var_list = [] + for var in self.vars.values(): + var_list += var + return self.sess.run(var_list) + + @override(TFPolicy) + def set_weights(self, weights): + self.sess.run( + self.update_vars, feed_dict=dict(zip(self.vars_ph, weights))) + + @override(Policy) + def get_state(self): + return TFPolicy.get_state(self) + + @override(Policy) + def set_state(self, state): + TFPolicy.set_state(self, state) + + def _build_critic_network(self, + obs_n, + act_n, + obs_space_n, + act_space_n, + hiddens, + activation=None, + scope=None): + with tf.variable_scope(scope, reuse=tf.AUTO_REUSE) as scope: + if self.config["use_state_preprocessor"]: + model_n = [ + ModelCatalog.get_model({ + "obs": obs, + "is_training": self._get_is_training_placeholder(), + }, obs_space, act_space, 1, self.config["model"]) + for obs, obs_space, act_space in zip( + obs_n, obs_space_n, act_space_n) + ] + out_n = [model.last_layer for model in model_n] + out = tf.concat(out_n + act_n, axis=1) + else: + model_n = [None] * len(obs_n) + out = tf.concat(obs_n + act_n, axis=1) + + for hidden in hiddens: + out = tf.layers.dense(out, units=hidden, activation=activation) + feature = out + out = tf.layers.dense(feature, units=1, activation=None) + + return out, feature, model_n, tf.global_variables(scope.name) + + def _build_actor_network(self, + obs, + obs_space, + act_space, + hiddens, + activation=None, + scope=None): + from tensorflow.contrib.distributions import RelaxedOneHotCategorical + with tf.variable_scope(scope, reuse=tf.AUTO_REUSE) as scope: + if self.config["use_state_preprocessor"]: + model = ModelCatalog.get_model({ + "obs": obs, + "is_training": self._get_is_training_placeholder(), + }, obs_space, act_space, 1, self.config["model"]) + out = model.last_layer + else: + model = None + out = obs + + for hidden in hiddens: + out = tf.layers.dense(out, units=hidden, activation=activation) + feature = tf.layers.dense( + out, units=act_space.shape[0], activation=None) + sampler = RelaxedOneHotCategorical( + temperature=1.0, logits=feature).sample() + + return sampler, feature, model, tf.global_variables(scope.name) + + def update_target(self, tau=None): + if tau is not None: + self.sess.run(self.update_target_vars, {self.tau: tau}) + else: + self.sess.run(self.update_target_vars) diff --git a/rllib/contrib/registry.py b/rllib/contrib/registry.py index 650a4429d..340cd701c 100644 --- a/rllib/contrib/registry.py +++ b/rllib/contrib/registry.py @@ -10,6 +10,12 @@ def _import_random_agent(): return RandomAgent +def _import_maddpg(): + from ray.rllib.contrib import maddpg + return maddpg.MADDPGTrainer + + CONTRIBUTED_ALGORITHMS = { "contrib/RandomAgent": _import_random_agent, + "contrib/MADDPG": _import_maddpg, } diff --git a/rllib/examples/twostep_game.py b/rllib/examples/twostep_game.py index 577e834e5..b0f530919 100644 --- a/rllib/examples/twostep_game.py +++ b/rllib/examples/twostep_game.py @@ -6,6 +6,7 @@ from __future__ import print_function import argparse from gym.spaces import Tuple, Discrete +import numpy as np import ray from ray import tune @@ -26,14 +27,24 @@ class TwoStepGame(MultiAgentEnv): def __init__(self, env_config): self.state = None + self.agent_1 = 0 + self.agent_2 = 1 + # MADDPG emits action logits instead of actual discrete actions + self.actions_are_logits = env_config.get("actions_are_logits", False) def reset(self): self.state = 0 - return {"agent_1": self.state, "agent_2": self.state + 3} + return {self.agent_1: self.state, self.agent_2: self.state + 3} def step(self, action_dict): + if self.actions_are_logits: + action_dict = { + k: np.random.choice([0, 1], p=v) + for k, v in action_dict.items() + } + if self.state == 0: - action = action_dict["agent_1"] + action = action_dict[self.agent_1] assert action in [0, 1], action if action == 0: self.state = 1 @@ -45,16 +56,21 @@ class TwoStepGame(MultiAgentEnv): global_rew = 7 done = True else: - if action_dict["agent_1"] == 0 and action_dict["agent_2"] == 0: + if action_dict[self.agent_1] == 0 and action_dict[self. + agent_2] == 0: global_rew = 0 - elif action_dict["agent_1"] == 1 and action_dict["agent_2"] == 1: + elif action_dict[self.agent_1] == 1 and action_dict[self. + agent_2] == 1: global_rew = 8 else: global_rew = 1 done = True - rewards = {"agent_1": global_rew / 2.0, "agent_2": global_rew / 2.0} - obs = {"agent_1": self.state, "agent_2": self.state + 3} + rewards = { + self.agent_1: global_rew / 2.0, + self.agent_2: global_rew / 2.0 + } + obs = {self.agent_1: self.state, self.agent_2: self.state + 3} dones = {"__all__": done} infos = {} return obs, rewards, dones, infos @@ -64,7 +80,7 @@ if __name__ == "__main__": args = parser.parse_args() grouping = { - "group_1": ["agent_1", "agent_2"], + "group_1": [0, 1], } obs_space = Tuple([ TwoStepGame.observation_space, @@ -79,7 +95,37 @@ if __name__ == "__main__": lambda config: TwoStepGame(config).with_agent_groups( grouping, obs_space=obs_space, act_space=act_space)) - if args.run == "QMIX": + if args.run == "contrib/MADDPG": + obs_space_dict = { + "agent_1": TwoStepGame.observation_space, + "agent_2": TwoStepGame.observation_space, + } + act_space_dict = { + "agent_1": TwoStepGame.action_space, + "agent_2": TwoStepGame.action_space, + } + config = { + "learning_starts": 100, + "env_config": { + "actions_are_logits": True, + }, + "multiagent": { + "policies": { + "pol1": (None, TwoStepGame.observation_space, + TwoStepGame.action_space, { + "agent_id": 0, + }), + "pol2": (None, TwoStepGame.observation_space, + TwoStepGame.action_space, { + "agent_id": 1, + }), + }, + "policy_mapping_fn": tune.function( + lambda x: "pol1" if x == 0 else "pol2"), + }, + } + group = False + elif args.run == "QMIX": config = { "sample_batch_size": 4, "train_batch_size": 32, diff --git a/rllib/optimizers/replay_buffer.py b/rllib/optimizers/replay_buffer.py index 273e4cd85..1012a5b76 100644 --- a/rllib/optimizers/replay_buffer.py +++ b/rllib/optimizers/replay_buffer.py @@ -68,6 +68,18 @@ class ReplayBuffer(object): return (np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)) + @DeveloperAPI + def sample_idxes(self, batch_size): + return [ + random.randint(0, + len(self._storage) - 1) for _ in range(batch_size) + ] + + @DeveloperAPI + def sample_with_idxes(self, idxes): + self._num_sampled += len(idxes) + return self._encode_sample(idxes) + @DeveloperAPI def sample(self, batch_size): """Sample a batch of experiences. @@ -164,6 +176,27 @@ class PrioritizedReplayBuffer(ReplayBuffer): res.append(idx) return res + @DeveloperAPI + def sample_idxes(self, batch_size): + return self._sample_proportional(batch_size) + + @DeveloperAPI + def sample_with_idxes(self, idxes, beta): + assert beta > 0 + self._num_sampled += len(idxes) + + weights = [] + p_min = self._it_min.min() / self._it_sum.sum() + max_weight = (p_min * len(self._storage))**(-beta) + + for idx in idxes: + p_sample = self._it_sum[idx] / self._it_sum.sum() + weight = (p_sample * len(self._storage))**(-beta) + weights.append(weight / max_weight) + weights = np.array(weights) + encoded_sample = self._encode_sample(idxes) + return tuple(list(encoded_sample) + [weights, idxes]) + @DeveloperAPI def sample(self, batch_size, beta): """Sample a batch of experiences. diff --git a/rllib/optimizers/sync_replay_optimizer.py b/rllib/optimizers/sync_replay_optimizer.py index 9cb2eb746..bbd3e4f49 100644 --- a/rllib/optimizers/sync_replay_optimizer.py +++ b/rllib/optimizers/sync_replay_optimizer.py @@ -41,7 +41,9 @@ class SyncReplayOptimizer(PolicyOptimizer): beta_annealing_fraction=0.2, final_prioritized_replay_beta=0.4, train_batch_size=32, - sample_batch_size=4): + sample_batch_size=4, + before_learn_on_batch=None, + synchronize_sampling=False): """Initialize an sync replay optimizer. Arguments: @@ -59,6 +61,10 @@ class SyncReplayOptimizer(PolicyOptimizer): final_prioritized_replay_beta (float): final value of beta train_batch_size (int): size of batches to learn on sample_batch_size (int): size of batches to sample from workers + before_learn_on_batch (function): callback to run before passing + the sampled batch to learn on + synchronize_sampling (bool): whether to sample the experiences for + all policies with the same indices (used in MADDPG). """ PolicyOptimizer.__init__(self, workers) @@ -71,6 +77,8 @@ class SyncReplayOptimizer(PolicyOptimizer): final_p=final_prioritized_replay_beta) self.prioritized_replay_eps = prioritized_replay_eps self.train_batch_size = train_batch_size + self.before_learn_on_batch = before_learn_on_batch + self.synchronize_sampling = synchronize_sampling # Stats self.update_weights_timer = TimerStat() @@ -154,6 +162,11 @@ class SyncReplayOptimizer(PolicyOptimizer): samples = self._replay() with self.grad_timer: + if self.before_learn_on_batch: + samples = self.before_learn_on_batch( + samples, + self.workers.local_worker().policy_map, + self.train_batch_size) info_dict = self.workers.local_worker().learn_on_batch(samples) for policy_id, info in info_dict.items(): self.learner_stats[policy_id] = get_learner_stats(info) @@ -171,17 +184,25 @@ class SyncReplayOptimizer(PolicyOptimizer): def _replay(self): samples = {} + idxes = None with self.replay_timer: for policy_id, replay_buffer in self.replay_buffers.items(): + if self.synchronize_sampling: + if idxes is None: + idxes = replay_buffer.sample_idxes( + self.train_batch_size) + else: + idxes = replay_buffer.sample_idxes(self.train_batch_size) + if isinstance(replay_buffer, PrioritizedReplayBuffer): (obses_t, actions, rewards, obses_tp1, dones, weights, - batch_indexes) = replay_buffer.sample( - self.train_batch_size, + batch_indexes) = replay_buffer.sample_with_idxes( + idxes, beta=self.prioritized_replay_beta.value( self.num_steps_trained)) else: (obses_t, actions, rewards, obses_tp1, - dones) = replay_buffer.sample(self.train_batch_size) + dones) = replay_buffer.sample_with_idxes(idxes) weights = np.ones_like(rewards) batch_indexes = -np.ones_like(rewards) samples[policy_id] = SampleBatch({