From cf0894d396113fdf87da4e8b9d28fc669fca2d24 Mon Sep 17 00:00:00 2001 From: Michael Luo Date: Tue, 23 Jun 2020 09:48:23 -0700 Subject: [PATCH] [rllib] MAML Agent (#8862) * Halfway done with transferring MAML to new Ray * MAML Beta Out * Debugging MAML atm * Distributed Execution * Pendulum Mass Working * All experiments complete * Cleaned up codebase * Travis CI * Travis CI * Tests * Merged conflicts * Fixed variance bug conflict * Comment resolved * Apply suggestions from code review fixed test_maml * Update rllib/agents/maml/tests/test_maml.py * asdf * Fix testing Co-authored-by: Sven Mika --- rllib/BUILD | 8 + rllib/agents/maml/__init__.py | 6 + rllib/agents/maml/maml.py | 226 +++++++++ rllib/agents/maml/maml_tf_policy.py | 427 ++++++++++++++++++ rllib/agents/maml/tests/test_maml.py | 44 ++ rllib/agents/registry.py | 6 + rllib/dyna.yaml | 17 + rllib/examples/env/ant_rand_goal.py | 73 +++ rllib/examples/env/halfcheetah_rand_direc.py | 62 +++ rllib/examples/env/pendulum_mass.py | 28 ++ .../maml/ant-rand-goal-maml.yaml | 26 ++ .../maml/halfcheetah-rand-direc-maml.yaml | 25 + .../maml/pendulum-mass-maml.yaml | 26 ++ 13 files changed, 974 insertions(+) create mode 100644 rllib/agents/maml/__init__.py create mode 100644 rllib/agents/maml/maml.py create mode 100644 rllib/agents/maml/maml_tf_policy.py create mode 100644 rllib/agents/maml/tests/test_maml.py create mode 100644 rllib/dyna.yaml create mode 100644 rllib/examples/env/ant_rand_goal.py create mode 100644 rllib/examples/env/halfcheetah_rand_direc.py create mode 100644 rllib/examples/env/pendulum_mass.py create mode 100644 rllib/tuned_examples/maml/ant-rand-goal-maml.yaml create mode 100644 rllib/tuned_examples/maml/halfcheetah-rand-direc-maml.yaml create mode 100644 rllib/tuned_examples/maml/pendulum-mass-maml.yaml diff --git a/rllib/BUILD b/rllib/BUILD index bf57d4e68..c0e2db1b1 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -471,6 +471,14 @@ py_test( srcs = ["agents/marwil/tests/test_marwil.py"] ) +# MAMLTrainer +py_test( + name = "test_maml", + tags = ["agents_dir"], + size = "small", + srcs = ["agents/maml/tests/test_maml.py"] +) + # PGTrainer py_test( name = "test_pg", diff --git a/rllib/agents/maml/__init__.py b/rllib/agents/maml/__init__.py new file mode 100644 index 000000000..d559945b3 --- /dev/null +++ b/rllib/agents/maml/__init__.py @@ -0,0 +1,6 @@ +from ray.rllib.agents.maml.maml import MAMLTrainer, DEFAULT_CONFIG + +__all__ = [ + "MAMLTrainer", + "DEFAULT_CONFIG", +] diff --git a/rllib/agents/maml/maml.py b/rllib/agents/maml/maml.py new file mode 100644 index 000000000..77cd4a851 --- /dev/null +++ b/rllib/agents/maml/maml.py @@ -0,0 +1,226 @@ +import logging + +import numpy as np +from ray.rllib.utils.sgd import standardized +from ray.rllib.agents import with_common_config +from ray.rllib.agents.maml.maml_tf_policy import MAMLTFPolicy +from ray.rllib.agents.trainer_template import build_trainer +from typing import List +from ray.rllib.evaluation.metrics import get_learner_stats +from ray.rllib.execution.common import STEPS_SAMPLED_COUNTER, \ + STEPS_TRAINED_COUNTER, LEARNER_INFO, _get_shared_metrics +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.execution.metric_ops import CollectMetrics +from ray.util.iter import from_actors +from ray.rllib.utils.types import SampleBatchType + +logger = logging.getLogger(__name__) + +# yapf: disable +# __sphinx_doc_begin__ +DEFAULT_CONFIG = with_common_config({ + # If true, use the Generalized Advantage Estimator (GAE) + # with a value function, see https://arxiv.org/pdf/1506.02438.pdf. + "use_gae": True, + # GAE(lambda) parameter + "lambda": 1.0, + # Initial coefficient for KL divergence + "kl_coeff": 0.0005, + # Size of batches collected from each worker + "rollout_fragment_length": 200, + # Stepsize of SGD + "lr": 1e-3, + # Share layers for value function + "vf_share_layers": False, + # Coefficient of the value function loss + "vf_loss_coeff": 0.5, + # Coefficient of the entropy regularizer + "entropy_coeff": 0.0, + # PPO clip parameter + "clip_param": 0.3, + # Clip param for the value function. Note that this is sensitive to the + # scale of the rewards. If your expected V is large, increase this. + "vf_clip_param": 10.0, + # If specified, clip the global norm of gradients by this amount + "grad_clip": None, + # Target value for KL divergence + "kl_target": 0.01, + # Whether to rollout "complete_episodes" or "truncate_episodes" + "batch_mode": "complete_episodes", + # Which observation filter to apply to the observation + "observation_filter": "NoFilter", + # Number of Inner adaptation steps for the MAML algorithm + "inner_adaptation_steps": 1, + # Number of MAML steps per meta-update iteration (PPO steps) + "maml_optimizer_steps": 5, + # Inner Adaptation Step size + "inner_lr": 0.1, +}) +# __sphinx_doc_end__ +# yapf: enable + + +# @mluo: TODO +def set_worker_tasks(workers): + n_tasks = len(workers.remote_workers()) + tasks = workers.local_worker().foreach_env(lambda x: x)[0].sample_tasks( + n_tasks) + for i, worker in enumerate(workers.remote_workers()): + worker.foreach_env.remote(lambda env: env.set_task(tasks[i])) + + +class InnerAdaptationSteps: + def __init__(self, workers, inner_adaptation_steps, metric_gen): + self.workers = workers + self.n = inner_adaptation_steps + self.buffer = [] + self.split = [] + self.metrics = {} + self.metric_gen = metric_gen + + def __call__(self, samples: List[SampleBatchType]): + samples, split_lst = self.post_process_samples(samples) + self.buffer.extend(samples) + self.split.append(split_lst) + self.post_process_metrics() + if len(self.split) > self.n: + out = SampleBatch.concat_samples(self.buffer) + out["split"] = np.array(self.split) + self.buffer = [] + self.split = [] + + # Metrics Reporting + metrics = _get_shared_metrics() + metrics.counters[STEPS_SAMPLED_COUNTER] += out.count + + # Reporting Adaptation Rew Diff + ep_rew_pre = self.metrics["episode_reward_mean"] + ep_rew_post = self.metrics["episode_reward_mean_adapt_" + + str(self.n)] + self.metrics["adaptation_delta"] = ep_rew_post - ep_rew_pre + return [(out, self.metrics)] + else: + self.inner_adaptation_step(samples) + return [] + + def post_process_samples(self, samples): + split_lst = [] + for sample in samples: + sample["advantages"] = standardized(sample["advantages"]) + split_lst.append(sample.count) + return samples, split_lst + + def inner_adaptation_step(self, samples): + for i, e in enumerate(self.workers.remote_workers()): + e.learn_on_batch.remote(samples[i]) + + def post_process_metrics(self): + # Obtain Current Dataset Metrics and filter out + name = "_adapt_" + str(len(self.split) - 1) if len( + self.split) > 1 else "" + res = self.metric_gen.__call__(None) + + self.metrics["episode_reward_max" + + str(name)] = res["episode_reward_max"] + self.metrics["episode_reward_mean" + + str(name)] = res["episode_reward_mean"] + self.metrics["episode_reward_min" + + str(name)] = res["episode_reward_min"] + + +class MetaUpdate: + def __init__(self, workers, maml_steps, metric_gen): + self.workers = workers + self.maml_optimizer_steps = maml_steps + self.metric_gen = metric_gen + + def __call__(self, data_tuple): + # Metaupdate Step + samples = data_tuple[0] + adapt_metrics_dict = data_tuple[1] + for i in range(self.maml_optimizer_steps): + fetches = self.workers.local_worker().learn_on_batch(samples) + fetches = get_learner_stats(fetches) + + # Sync workers with meta policy + self.workers.sync_weights() + + # Set worker tasks + set_worker_tasks(self.workers) + + # Update KLS + def update(pi, pi_id): + assert "inner_kl" not in fetches, ( + "inner_kl should be nested under policy id key", fetches) + if pi_id in fetches: + assert "inner_kl" in fetches[pi_id], (fetches, pi_id) + pi.update_kls(fetches[pi_id]["inner_kl"]) + else: + logger.warning("No data for {}, not updating kl".format(pi_id)) + + self.workers.local_worker().foreach_trainable_policy(update) + + # Modify Reporting Metrics + metrics = _get_shared_metrics() + metrics.info[LEARNER_INFO] = fetches + metrics.counters[STEPS_TRAINED_COUNTER] += samples.count + + res = self.metric_gen.__call__(None) + res.update(adapt_metrics_dict) + + return res + + +def execution_plan(workers, config): + # Sync workers with meta policy + workers.sync_weights() + + # Samples and sets worker tasks + set_worker_tasks(workers) + + # Metric Collector + metric_collect = CollectMetrics( + workers, + min_history=config["metrics_smoothing_episodes"], + timeout_seconds=config["collect_metrics_timeout"]) + + # Iterator for Inner Adaptation Data gathering (from pre->post adaptation) + rollouts = from_actors(workers.remote_workers()) + rollouts = rollouts.batch_across_shards() + rollouts = rollouts.combine( + InnerAdaptationSteps(workers, config["inner_adaptation_steps"], + metric_collect)) + + # Metaupdate Step + train_op = rollouts.for_each( + MetaUpdate(workers, config["maml_optimizer_steps"], metric_collect)) + return train_op + + +def get_policy_class(config): + # @mluo: TODO + if config["framework"] == "torch": + raise ValueError("MAML not implemented in Pytorch yet") + return MAMLTFPolicy + + +def validate_config(config): + if config["inner_adaptation_steps"] <= 0: + raise ValueError("Inner Adaptation Steps must be >=1.") + if config["maml_optimizer_steps"] <= 0: + raise ValueError("PPO steps for meta-update needs to be >=0") + if config["entropy_coeff"] < 0: + raise ValueError("entropy_coeff must be >=0") + if config["batch_mode"] != "complete_episodes": + raise ValueError("truncate_episodes not supported") + if config["num_workers"] <= 0: + raise ValueError("Must have at least 1 worker/task.") + + +MAMLTrainer = build_trainer( + name="MAML", + default_config=DEFAULT_CONFIG, + default_policy=MAMLTFPolicy, + get_policy_class=get_policy_class, + execution_plan=execution_plan, + validate_config=validate_config) diff --git a/rllib/agents/maml/maml_tf_policy.py b/rllib/agents/maml/maml_tf_policy.py new file mode 100644 index 000000000..541cab675 --- /dev/null +++ b/rllib/agents/maml/maml_tf_policy.py @@ -0,0 +1,427 @@ +import logging + +import ray +from ray.rllib.evaluation.postprocessing import Postprocessing +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils import try_import_tf +from ray.rllib.agents.ppo.ppo_tf_policy import postprocess_ppo_gae, \ + vf_preds_fetches, clip_gradients, setup_config, ValueNetworkMixin +from ray.rllib.utils.framework import get_activation_fn + +tf = try_import_tf() + +logger = logging.getLogger(__name__) + + +def PPOLoss(dist_class, + actions, + curr_logits, + behaviour_logits, + advantages, + value_fn, + value_targets, + vf_preds, + cur_kl_coeff, + entropy_coeff, + clip_param, + vf_clip_param, + vf_loss_coeff, + clip_loss=False): + def surrogate_loss(actions, curr_dist, prev_dist, advantages, clip_param, + clip_loss): + pi_new_logp = curr_dist.logp(actions) + pi_old_logp = prev_dist.logp(actions) + + logp_ratio = tf.exp(pi_new_logp - pi_old_logp) + if clip_loss: + return tf.minimum( + advantages * logp_ratio, + advantages * tf.clip_by_value(logp_ratio, 1 - clip_param, + 1 + clip_param)) + return advantages * logp_ratio + + def kl_loss(curr_dist, prev_dist): + return prev_dist.kl(curr_dist) + + def entropy_loss(dist): + return dist.entropy() + + def vf_loss(value_fn, value_targets, vf_preds, vf_clip_param=0.1): + # GAE Value Function Loss + vf_loss1 = tf.square(value_fn - value_targets) + vf_clipped = vf_preds + tf.clip_by_value(value_fn - vf_preds, + -vf_clip_param, vf_clip_param) + vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss = tf.maximum(vf_loss1, vf_loss2) + return vf_loss + + pi_new_dist = dist_class(curr_logits, None) + pi_old_dist = dist_class(behaviour_logits, None) + + surr_loss = tf.reduce_mean( + surrogate_loss(actions, pi_new_dist, pi_old_dist, advantages, + clip_param, clip_loss)) + kl_loss = tf.reduce_mean(kl_loss(pi_new_dist, pi_old_dist)) + vf_loss = tf.reduce_mean( + vf_loss(value_fn, value_targets, vf_preds, vf_clip_param)) + entropy_loss = tf.reduce_mean(entropy_loss(pi_new_dist)) + + total_loss = -surr_loss + cur_kl_coeff * kl_loss + total_loss += vf_loss_coeff * vf_loss - entropy_coeff * entropy_loss + return total_loss, surr_loss, kl_loss, vf_loss, entropy_loss + + +# This is the computation graph for workers (inner adaptation steps) +class WorkerLoss(object): + def __init__(self, + dist_class, + actions, + curr_logits, + behaviour_logits, + advantages, + value_fn, + value_targets, + vf_preds, + cur_kl_coeff, + entropy_coeff, + clip_param, + vf_clip_param, + vf_loss_coeff, + clip_loss=False): + self.loss, surr_loss, kl_loss, vf_loss, ent_loss = PPOLoss( + dist_class=dist_class, + actions=actions, + curr_logits=curr_logits, + behaviour_logits=behaviour_logits, + advantages=advantages, + value_fn=value_fn, + value_targets=value_targets, + vf_preds=vf_preds, + cur_kl_coeff=cur_kl_coeff, + entropy_coeff=entropy_coeff, + clip_param=clip_param, + vf_clip_param=vf_clip_param, + vf_loss_coeff=vf_loss_coeff, + clip_loss=clip_loss) + self.loss = tf.Print(self.loss, ["Worker Adapt Loss", self.loss]) + + +# This is the Meta-Update computation graph for main (meta-update step) +class MAMLLoss(object): + def __init__(self, + model, + config, + dist_class, + value_targets, + advantages, + actions, + behaviour_logits, + vf_preds, + cur_kl_coeff, + policy_vars, + obs, + num_tasks, + split, + inner_adaptation_steps=1, + entropy_coeff=0, + clip_param=0.3, + vf_clip_param=0.1, + vf_loss_coeff=1.0, + use_gae=True): + + self.config = config + self.num_tasks = num_tasks + self.inner_adaptation_steps = inner_adaptation_steps + self.clip_param = clip_param + self.dist_class = dist_class + self.cur_kl_coeff = cur_kl_coeff + + # Split episode tensors into [inner_adaptation_steps+1, num_tasks, -1] + self.obs = self.split_placeholders(obs, split) + self.actions = self.split_placeholders(actions, split) + self.behaviour_logits = self.split_placeholders( + behaviour_logits, split) + self.advantages = self.split_placeholders(advantages, split) + self.value_targets = self.split_placeholders(value_targets, split) + self.vf_preds = self.split_placeholders(vf_preds, split) + + # Construct name to tensor dictionary for easier indexing + self.policy_vars = {} + for var in policy_vars: + self.policy_vars[var.name] = var + + # Calculate pi_new for PPO + pi_new_logits, current_policy_vars, value_fns = [], [], [] + for i in range(self.num_tasks): + pi_new, value_fn = self.feed_forward( + self.obs[0][i], + self.policy_vars, + policy_config=config["model"]) + pi_new_logits.append(pi_new) + value_fns.append(value_fn) + current_policy_vars.append(self.policy_vars) + + inner_kls = [] + inner_ppo_loss = [] + + # Recompute weights for inner-adaptation (same weights as workers) + for step in range(self.inner_adaptation_steps): + kls = [] + for i in range(self.num_tasks): + # PPO Loss Function (only Surrogate) + ppo_loss, _, kl_loss, _, _ = PPOLoss( + dist_class=dist_class, + actions=self.actions[step][i], + curr_logits=pi_new_logits[i], + behaviour_logits=self.behaviour_logits[step][i], + advantages=self.advantages[step][i], + value_fn=value_fns[i], + value_targets=self.value_targets[step][i], + vf_preds=self.vf_preds[step][i], + cur_kl_coeff=0.0, + entropy_coeff=entropy_coeff, + clip_param=clip_param, + vf_clip_param=vf_clip_param, + vf_loss_coeff=vf_loss_coeff, + clip_loss=False) + adapted_policy_vars = self.compute_updated_variables( + ppo_loss, current_policy_vars[i]) + pi_new_logits[i], value_fns[i] = self.feed_forward( + self.obs[step + 1][i], + adapted_policy_vars, + policy_config=config["model"]) + current_policy_vars[i] = adapted_policy_vars + kls.append(kl_loss) + inner_ppo_loss.append(ppo_loss) + + self.kls = kls + inner_kls.append(kls) + + mean_inner_kl = tf.stack( + [tf.reduce_mean(tf.stack(inner_kl)) for inner_kl in inner_kls]) + self.mean_inner_kl = mean_inner_kl + + ppo_obj = [] + for i in range(self.num_tasks): + ppo_loss, surr_loss, kl_loss, val_loss, entropy_loss = PPOLoss( + dist_class=dist_class, + actions=self.actions[self.inner_adaptation_steps][i], + curr_logits=pi_new_logits[i], + behaviour_logits=self.behaviour_logits[ + self.inner_adaptation_steps][i], + advantages=self.advantages[self.inner_adaptation_steps][i], + value_fn=value_fns[i], + value_targets=self.value_targets[self.inner_adaptation_steps][ + i], + vf_preds=self.vf_preds[self.inner_adaptation_steps][i], + cur_kl_coeff=0.0, + entropy_coeff=entropy_coeff, + clip_param=clip_param, + vf_clip_param=vf_clip_param, + vf_loss_coeff=vf_loss_coeff, + clip_loss=True) + ppo_obj.append(ppo_loss) + self.mean_policy_loss = surr_loss + self.mean_kl = kl_loss + self.mean_vf_loss = val_loss + self.mean_entropy = entropy_loss + self.inner_kl_loss = tf.reduce_mean( + tf.multiply(self.cur_kl_coeff, mean_inner_kl)) + self.loss = tf.reduce_mean(tf.stack(ppo_obj, + axis=0)) + self.inner_kl_loss + self.loss = tf.Print( + self.loss, + ["Meta-Loss", self.loss, "Inner KL", self.mean_inner_kl]) + + def feed_forward(self, obs, policy_vars, policy_config): + # Hacky for now, reconstruct FC network with adapted weights + # @mluo: TODO for any network + def fc_network(inp, network_vars, hidden_nonlinearity, + output_nonlinearity, policy_config): + bias_added = False + x = inp + for name, param in network_vars.items(): + if "kernel" in name: + x = tf.matmul(x, param) + elif "bias" in name: + x = tf.add(x, param) + bias_added = True + else: + raise NameError + + if bias_added: + if "out" not in name: + x = hidden_nonlinearity(x) + elif "out" in name: + x = output_nonlinearity(x) + else: + raise NameError + bias_added = False + return x + + policyn_vars = {} + valuen_vars = {} + log_std = None + for name, param in policy_vars.items(): + if "value" in name: + valuen_vars[name] = param + elif "log_std" in name: + log_std = param + else: + policyn_vars[name] = param + + output_nonlinearity = tf.identity + hidden_nonlinearity = get_activation_fn( + policy_config["fcnet_activation"]) + + pi_new_logits = fc_network(obs, policyn_vars, hidden_nonlinearity, + output_nonlinearity, policy_config) + if log_std is not None: + pi_new_logits = tf.concat( + [pi_new_logits, 0.0 * pi_new_logits + log_std], 1) + value_fn = fc_network(obs, valuen_vars, hidden_nonlinearity, + output_nonlinearity, policy_config) + + return pi_new_logits, tf.reshape(value_fn, [-1]) + + def compute_updated_variables(self, loss, network_vars): + grad = tf.gradients(loss, list(network_vars.values())) + adapted_vars = {} + for i, tup in enumerate(network_vars.items()): + name, var = tup + if grad[i] is None: + adapted_vars[name] = var + else: + adapted_vars[name] = var - self.config["inner_lr"] * grad[i] + return adapted_vars + + def split_placeholders(self, placeholder, split): + inner_placeholder_list = tf.split( + placeholder, tf.math.reduce_sum(split, axis=1), axis=0) + placeholder_list = [] + for index, split_placeholder in enumerate(inner_placeholder_list): + placeholder_list.append( + tf.split(split_placeholder, split[index], axis=0)) + return placeholder_list + + +def maml_loss(policy, model, dist_class, train_batch): + logits, state = model.from_batch(train_batch) + + policy._loss_input_dict["split"] = tf.placeholder( + tf.int32, + name="Meta-Update-Splitting", + shape=(policy.config["inner_adaptation_steps"] + 1, + policy.config["num_workers"])) + policy.cur_lr = policy.config["lr"] + + if policy.config["worker_index"]: + policy.loss_obj = WorkerLoss( + dist_class=dist_class, + actions=train_batch[SampleBatch.ACTIONS], + curr_logits=logits, + behaviour_logits=train_batch[SampleBatch.ACTION_DIST_INPUTS], + advantages=train_batch[Postprocessing.ADVANTAGES], + value_fn=model.value_function(), + value_targets=train_batch[Postprocessing.VALUE_TARGETS], + vf_preds=train_batch[SampleBatch.VF_PREDS], + cur_kl_coeff=0.0, + entropy_coeff=policy.config["entropy_coeff"], + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], + clip_loss=False) + else: + policy.var_list = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, + tf.get_variable_scope().name) + policy.loss_obj = MAMLLoss( + model=model, + dist_class=dist_class, + value_targets=train_batch[Postprocessing.VALUE_TARGETS], + advantages=train_batch[Postprocessing.ADVANTAGES], + actions=train_batch[SampleBatch.ACTIONS], + behaviour_logits=train_batch[SampleBatch.ACTION_DIST_INPUTS], + vf_preds=train_batch[SampleBatch.VF_PREDS], + cur_kl_coeff=policy.kl_coeff, + policy_vars=policy.var_list, + obs=train_batch[SampleBatch.CUR_OBS], + num_tasks=policy.config["num_workers"], + split=train_batch["split"], + config=policy.config, + inner_adaptation_steps=policy.config["inner_adaptation_steps"], + entropy_coeff=policy.config["entropy_coeff"], + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], + use_gae=policy.config["use_gae"]) + + return policy.loss_obj.loss + + +def maml_stats(policy, train_batch): + if policy.config["worker_index"]: + return {"worker_loss": policy.loss_obj.loss} + else: + return { + "cur_kl_coeff": tf.cast(policy.kl_coeff, tf.float64), + "cur_lr": tf.cast(policy.cur_lr, tf.float64), + "total_loss": policy.loss_obj.loss, + "policy_loss": policy.loss_obj.mean_policy_loss, + "vf_loss": policy.loss_obj.mean_vf_loss, + "kl": policy.loss_obj.mean_kl, + "inner_kl": policy.loss_obj.mean_inner_kl, + "entropy": policy.loss_obj.mean_entropy, + } + + +class KLCoeffMixin: + def __init__(self, config): + self.kl_coeff_val = [config["kl_coeff"] + ] * config["inner_adaptation_steps"] + self.kl_target = self.config["kl_target"] + self.kl_coeff = tf.get_variable( + initializer=tf.constant_initializer(self.kl_coeff_val), + name="kl_coeff", + shape=(config["inner_adaptation_steps"]), + trainable=False, + dtype=tf.float32) + + def update_kls(self, sampled_kls): + for i, kl in enumerate(sampled_kls): + if kl < self.kl_target / 1.5: + self.kl_coeff_val[i] *= 0.5 + elif kl > 1.5 * self.kl_target: + self.kl_coeff_val[i] *= 2.0 + print(self.kl_coeff_val) + self.kl_coeff.load(self.kl_coeff_val, session=self.get_session()) + return self.kl_coeff_val + + +def maml_optimizer_fn(policy, config): + """ + Workers use simple SGD for inner adaptation + Meta-Policy uses Adam optimizer for meta-update + """ + if not config["worker_index"]: + return tf.train.AdamOptimizer(learning_rate=config["lr"]) + return tf.train.GradientDescentOptimizer(learning_rate=config["inner_lr"]) + + +def setup_mixins(policy, obs_space, action_space, config): + ValueNetworkMixin.__init__(policy, obs_space, action_space, config) + KLCoeffMixin.__init__(policy, config) + + +MAMLTFPolicy = build_tf_policy( + name="MAMLTFPolicy", + get_default_config=lambda: ray.rllib.agents.maml.maml.DEFAULT_CONFIG, + loss_fn=maml_loss, + stats_fn=maml_stats, + optimizer_fn=maml_optimizer_fn, + extra_action_fetches_fn=vf_preds_fetches, + postprocess_fn=postprocess_ppo_gae, + gradients_fn=clip_gradients, + before_init=setup_config, + before_loss_init=setup_mixins, + mixins=[KLCoeffMixin]) diff --git a/rllib/agents/maml/tests/test_maml.py b/rllib/agents/maml/tests/test_maml.py new file mode 100644 index 000000000..98b37ca78 --- /dev/null +++ b/rllib/agents/maml/tests/test_maml.py @@ -0,0 +1,44 @@ +import unittest + +import ray +import ray.rllib.agents.maml as maml +from ray.rllib.utils.framework import try_import_tf +from ray.rllib.utils.test_utils import check_compute_single_action, \ + framework_iterator + +tf = try_import_tf() + + +class TestMAML(unittest.TestCase): + @classmethod + def setUpClass(cls): + ray.init() + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + def test_maml_compilation(self): + """Test whether a MAMLTrainer can be built with all frameworks.""" + config = maml.DEFAULT_CONFIG.copy() + config["num_workers"] = 1 + config["horizon"] = 200 + config["rollout_fragment_length"] = 200 + num_iterations = 1 + + # Test for tf framework (torch not implemented yet). + for _ in framework_iterator(config, frameworks=("tf")): + trainer = maml.MAMLTrainer( + config=config, + env="ray.rllib.examples.env.pendulum_mass.PendulumMassEnv") + for i in range(num_iterations): + trainer.train() + check_compute_single_action( + trainer, include_prev_action_reward=True) + trainer.stop() + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/rllib/agents/registry.py b/rllib/agents/registry.py index 1478237cd..98be4a61b 100644 --- a/rllib/agents/registry.py +++ b/rllib/agents/registry.py @@ -95,6 +95,11 @@ def _import_marwil(): return marwil.MARWILTrainer +def _import_maml(): + from ray.rllib.agents import maml + return maml.MAMLTrainer + + ALGORITHMS = { "SAC": _import_sac, "DDPG": _import_ddpg, @@ -114,6 +119,7 @@ ALGORITHMS = { "APPO": _import_appo, "DDPPO": _import_ddppo, "MARWIL": _import_marwil, + "MAML": _import_maml, } diff --git a/rllib/dyna.yaml b/rllib/dyna.yaml new file mode 100644 index 000000000..7b18a53e2 --- /dev/null +++ b/rllib/dyna.yaml @@ -0,0 +1,17 @@ +dynamics-dyna: + env: + grid_search: + - HalfCheetah-v2 + - Humanoid-v2 + - Ant-v2 + - Hopper-v2 + run: DYNA + local_dir: ~/dyna_results + stop: + training_iteration: 4000 + config: + # Works for both torch and tf. + framework: torch + rollout_fragment_length: 200 + train_batch_size: 1000 + num_workers: 1 diff --git a/rllib/examples/env/ant_rand_goal.py b/rllib/examples/env/ant_rand_goal.py new file mode 100644 index 000000000..76cd8a136 --- /dev/null +++ b/rllib/examples/env/ant_rand_goal.py @@ -0,0 +1,73 @@ +import numpy as np +import gym +from gym.envs.mujoco.mujoco_env import MujocoEnv + + +class AntRandGoalEnv(gym.utils.EzPickle, MujocoEnv): + """Ant Environment that randomizes goals as tasks + + Goals are randomly sampled 2D positions + """ + + def __init__(self): + self.set_task(self.sample_tasks(1)[0]) + MujocoEnv.__init__(self, "ant.xml", 5) + gym.utils.EzPickle.__init__(self) + + def sample_tasks(self, n_tasks): + # Samples a goal position (2x1 position ector) + a = np.random.random(n_tasks) * 2 * np.pi + r = 3 * np.random.random(n_tasks)**0.5 + return np.stack((r * np.cos(a), r * np.sin(a)), axis=-1) + + def set_task(self, task): + """ + Args: + task: task of the meta-learning environment + """ + self.goal_pos = task + + def get_task(self): + """ + Returns: + task: task of the meta-learning environment + """ + return self.goal_pos + + def step(self, a): + self.do_simulation(a, self.frame_skip) + xposafter = self.get_body_com("torso") + goal_reward = -np.sum(np.abs( + xposafter[:2] - self.goal_pos)) # make it happy, not suicidal + ctrl_cost = .1 * np.square(a).sum() + contact_cost = 0.5 * 1e-3 * np.sum( + np.square(np.clip(self.sim.data.cfrc_ext, -1, 1))) + # survive_reward = 1.0 + survive_reward = 0.0 + reward = goal_reward - ctrl_cost - contact_cost + survive_reward + # notdone = np.isfinite(state).all() and 1.0 >= state[2] >= 0. + # done = not notdone + done = False + ob = self._get_obs() + return ob, reward, done, dict( + reward_forward=goal_reward, + reward_ctrl=-ctrl_cost, + reward_contact=-contact_cost, + reward_survive=survive_reward) + + def _get_obs(self): + return np.concatenate([ + self.sim.data.qpos.flat, + self.sim.data.qvel.flat, + np.clip(self.sim.data.cfrc_ext, -1, 1).flat, + ]) + + def reset_model(self): + qpos = self.init_qpos + self.np_random.uniform( + size=self.model.nq, low=-.1, high=.1) + qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1 + self.set_state(qpos, qvel) + return self._get_obs() + + def viewer_setup(self): + self.viewer.cam.distance = self.model.stat.extent * 0.5 diff --git a/rllib/examples/env/halfcheetah_rand_direc.py b/rllib/examples/env/halfcheetah_rand_direc.py new file mode 100644 index 000000000..07c02a30e --- /dev/null +++ b/rllib/examples/env/halfcheetah_rand_direc.py @@ -0,0 +1,62 @@ +import numpy as np +import gym +from gym.envs.mujoco.mujoco_env import MujocoEnv + + +class HalfCheetahRandDirecEnv(MujocoEnv, gym.utils.EzPickle): + """HalfCheetah Environment with two diff tasks, moving forwards or backwards + + Direction is defined as a scalar: +1.0 (forwards) or -1.0 (backwards) + """ + + def __init__(self, goal_direction=None): + self.goal_direction = goal_direction if goal_direction else 1.0 + MujocoEnv.__init__(self, "half_cheetah.xml", 5) + gym.utils.EzPickle.__init__(self, goal_direction) + + def sample_tasks(self, n_tasks): + # For fwd/bwd env, goal direc is backwards if - 1.0, forwards if + 1.0 + return np.random.choice((-1.0, 1.0), (n_tasks, )) + + def set_task(self, task): + """ + Args: + task: task of the meta-learning environment + """ + self.goal_direction = task + + def get_task(self): + """ + Returns: + task: task of the meta-learning environment + """ + return self.goal_direction + + def step(self, action): + xposbefore = self.sim.data.qpos[0] + self.do_simulation(action, self.frame_skip) + xposafter = self.sim.data.qpos[0] + ob = self._get_obs() + reward_ctrl = -0.5 * 0.1 * np.square(action).sum() + reward_run = self.goal_direction * (xposafter - xposbefore) / self.dt + reward = reward_ctrl + reward_run + done = False + return ob, reward, done, dict( + reward_run=reward_run, reward_ctrl=reward_ctrl) + + def _get_obs(self): + return np.concatenate([ + self.sim.data.qpos.flat[1:], + self.sim.data.qvel.flat, + ]) + + def reset_model(self): + qpos = self.init_qpos + self.np_random.uniform( + low=-.1, high=.1, size=self.model.nq) + qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1 + self.set_state(qpos, qvel) + obs = self._get_obs() + return obs + + def viewer_setup(self): + self.viewer.cam.distance = self.model.stat.extent * 0.5 diff --git a/rllib/examples/env/pendulum_mass.py b/rllib/examples/env/pendulum_mass.py new file mode 100644 index 000000000..e29359baa --- /dev/null +++ b/rllib/examples/env/pendulum_mass.py @@ -0,0 +1,28 @@ +import numpy as np +import gym +from gym.envs.classic_control.pendulum import PendulumEnv + + +class PendulumMassEnv(PendulumEnv, gym.utils.EzPickle): + """PendulumMassEnv varies the weight of the pendulum + + Tasks are defined to be weight uniformly sampled between [0.5,2] + """ + + def sample_tasks(self, n_tasks): + # Mass is a random float between 0.5 and 2 + return np.random.uniform(low=0.5, high=2.0, size=(n_tasks, )) + + def set_task(self, task): + """ + Args: + task: task of the meta-learning environment + """ + self.m = task + + def get_task(self): + """ + Returns: + task: task of the meta-learning environment + """ + return self.m diff --git a/rllib/tuned_examples/maml/ant-rand-goal-maml.yaml b/rllib/tuned_examples/maml/ant-rand-goal-maml.yaml new file mode 100644 index 000000000..92c5bc385 --- /dev/null +++ b/rllib/tuned_examples/maml/ant-rand-goal-maml.yaml @@ -0,0 +1,26 @@ +ant-rand-goal-maml: + env: ray.rllib.examples.env.ant_rand_goal.AntRandGoalEnv + run: MAML + stop: + training_iteration: 1000 + config: + horizon: 200 + rollout_fragment_length: 200 + num_envs_per_worker: 20 + inner_adaptation_steps: 2 + maml_optimizer_steps: 5 + gamma: 0.99 + lambda: 1.0 + lr: 0.001 + vf_loss_coeff: 0.5 + clip_param: 0.3 + kl_target: 0.01 + kl_coeff: 0.0005 + num_workers: 32 + num_gpus: 1 + inner_lr: 0.03 + explore: True + clip_actions: False + model: + fcnet_hiddens: [64, 64] + free_log_std: True diff --git a/rllib/tuned_examples/maml/halfcheetah-rand-direc-maml.yaml b/rllib/tuned_examples/maml/halfcheetah-rand-direc-maml.yaml new file mode 100644 index 000000000..232aa5c80 --- /dev/null +++ b/rllib/tuned_examples/maml/halfcheetah-rand-direc-maml.yaml @@ -0,0 +1,25 @@ +halfcheetah-rand-direc-maml: + env: ray.rllib.examples.env.halfcheetah_rand_direc.HalfCheetahRandDirecEnv + run: MAML + stop: + training_iteration: 1000 + config: + horizon: 100 + rollout_fragment_length: 100 + num_envs_per_worker: 20 + inner_adaptation_steps: 1 + maml_optimizer_steps: 5 + gamma: 0.99 + lambda: 1.0 + lr: 0.001 + vf_loss_coeff: 0.5 + clip_param: 0.3 + kl_target: 0.01 + kl_coeff: 0.0005 + num_workers: 31 + num_gpus: 1 + inner_lr: 0.1 + clip_actions: False + model: + fcnet_hiddens: [64, 64] + free_log_std: True diff --git a/rllib/tuned_examples/maml/pendulum-mass-maml.yaml b/rllib/tuned_examples/maml/pendulum-mass-maml.yaml new file mode 100644 index 000000000..faf63db72 --- /dev/null +++ b/rllib/tuned_examples/maml/pendulum-mass-maml.yaml @@ -0,0 +1,26 @@ +pendulum-mass-maml: + env: ray.rllib.examples.env.pendulum_mass.PendulumMassEnv + run: MAML + stop: + training_iteration: 500 + config: + horizon: 200 + rollout_fragment_length: 200 + num_envs_per_worker: 10 + inner_adaptation_steps: 1 + maml_optimizer_steps: 5 + gamma: 0.99 + lambda: 1.0 + lr: 0.001 + vf_loss_coeff: 0.5 + clip_param: 0.3 + kl_target: 0.01 + kl_coeff: 0.001 + num_workers: 20 + num_gpus: 1 + inner_lr: 0.03 + explore: True + clip_actions: False + model: + fcnet_hiddens: [64, 64] + free_log_std: True