mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 22:53:20 +08:00
[rllib] example and docs on how to use parametric actions with DQN / PG algorithms (#3384)
This commit is contained in:
@@ -30,16 +30,21 @@ class QNetwork(object):
|
||||
sigma0=0.5):
|
||||
self.model = model
|
||||
with tf.variable_scope("action_value"):
|
||||
action_out = model.last_layer
|
||||
for i in range(len(hiddens)):
|
||||
if use_noisy:
|
||||
action_out = self.noisy_layer("hidden_%d" % i, action_out,
|
||||
hiddens[i], sigma0)
|
||||
else:
|
||||
action_out = layers.fully_connected(
|
||||
action_out,
|
||||
num_outputs=hiddens[i],
|
||||
activation_fn=tf.nn.relu)
|
||||
if hiddens:
|
||||
action_out = model.last_layer
|
||||
for i in range(len(hiddens)):
|
||||
if use_noisy:
|
||||
action_out = self.noisy_layer(
|
||||
"hidden_%d" % i, action_out, hiddens[i], sigma0)
|
||||
else:
|
||||
action_out = layers.fully_connected(
|
||||
action_out,
|
||||
num_outputs=hiddens[i],
|
||||
activation_fn=tf.nn.relu)
|
||||
else:
|
||||
# Avoid postprocessing the outputs. This enables custom models
|
||||
# to be used for parametric action DQN.
|
||||
action_out = model.outputs
|
||||
if use_noisy:
|
||||
action_scores = self.noisy_layer(
|
||||
"output",
|
||||
@@ -47,11 +52,13 @@ class QNetwork(object):
|
||||
num_actions * num_atoms,
|
||||
sigma0,
|
||||
non_linear=False)
|
||||
else:
|
||||
elif hiddens:
|
||||
action_scores = layers.fully_connected(
|
||||
action_out,
|
||||
num_outputs=num_actions * num_atoms,
|
||||
activation_fn=None)
|
||||
else:
|
||||
action_scores = model.outputs
|
||||
if num_atoms > 1:
|
||||
# Distributional Q-learning uses a discrete support z
|
||||
# to represent the action value distribution
|
||||
@@ -107,7 +114,7 @@ class QNetwork(object):
|
||||
self.logits = support_logits_per_action
|
||||
self.dist = support_prob_per_action
|
||||
else:
|
||||
action_scores_mean = tf.reduce_mean(action_scores, 1)
|
||||
action_scores_mean = _reduce_mean_ignore_inf(action_scores, 1)
|
||||
action_scores_centered = action_scores - tf.expand_dims(
|
||||
action_scores_mean, 1)
|
||||
self.value = state_score + action_scores_centered
|
||||
@@ -176,11 +183,15 @@ class QValuePolicy(object):
|
||||
def __init__(self, q_values, observations, num_actions, stochastic, eps):
|
||||
deterministic_actions = tf.argmax(q_values, axis=1)
|
||||
batch_size = tf.shape(observations)[0]
|
||||
random_actions = tf.random_uniform(
|
||||
tf.stack([batch_size]),
|
||||
minval=0,
|
||||
maxval=num_actions,
|
||||
dtype=tf.int64)
|
||||
|
||||
# Special case masked out actions (q_value ~= -inf) so that we don't
|
||||
# even consider them for exploration.
|
||||
random_valid_action_logits = tf.where(
|
||||
tf.equal(q_values, tf.float32.min),
|
||||
tf.ones_like(q_values) * tf.float32.min, tf.ones_like(q_values))
|
||||
random_actions = tf.squeeze(
|
||||
tf.multinomial(random_valid_action_logits, 1), axis=1)
|
||||
|
||||
chose_random = tf.random_uniform(
|
||||
tf.stack([batch_size]), minval=0, maxval=1, dtype=tf.float32) < eps
|
||||
stochastic_actions = tf.where(chose_random, random_actions,
|
||||
@@ -368,8 +379,8 @@ class DQNPolicyGraph(TFPolicyGraph):
|
||||
qnet = QNetwork(
|
||||
ModelCatalog.get_model({
|
||||
"obs": obs
|
||||
}, space, 1, self.config["model"]), self.num_actions,
|
||||
self.config["dueling"], self.config["hiddens"],
|
||||
}, space, self.num_actions, self.config["model"]),
|
||||
self.num_actions, self.config["dueling"], self.config["hiddens"],
|
||||
self.config["noisy"], self.config["num_atoms"],
|
||||
self.config["v_min"], self.config["v_max"], self.config["sigma0"])
|
||||
return qnet.value, qnet.logits, qnet.dist, qnet.model
|
||||
@@ -507,6 +518,14 @@ def _postprocess_dqn(policy_graph, sample_batch):
|
||||
return batch
|
||||
|
||||
|
||||
def _reduce_mean_ignore_inf(x, axis):
    """Mean of `x` along `axis`, skipping entries used as "-inf" markers.

    An entry counts as masked when it equals tf.float32.min; masked entries
    contribute neither to the sum nor to the element count.

    NOTE(review): if every entry along `axis` is masked, the count is zero
    and the division below is by zero -- confirm callers never hit this.
    """
    is_valid = tf.not_equal(x, tf.float32.min)
    # Number of unmasked entries per reduced slice.
    valid_count = tf.reduce_sum(tf.cast(is_valid, tf.float32), axis)
    # Replace masked entries by zero so they do not affect the sum.
    valid_total = tf.reduce_sum(tf.where(is_valid, x, tf.zeros_like(x)), axis)
    return valid_total / valid_count
|
||||
|
||||
|
||||
def _huber_loss(x, delta=1.0):
|
||||
"""Reference: https://en.wikipedia.org/wiki/Huber_loss"""
|
||||
return tf.where(
|
||||
|
||||
@@ -110,6 +110,11 @@ class PPOAgent(Agent):
|
||||
and not self.config["simple_optimizer"]):
|
||||
logger.warn("forcing simple_optimizer=True in multi-agent mode")
|
||||
self.config["simple_optimizer"] = True
|
||||
if self.config["observation_filter"] != "NoFilter":
|
||||
# TODO(ekl): consider setting the default to be NoFilter
|
||||
logger.warn(
|
||||
"By default, observations will be normalized with {}".format(
|
||||
self.config["observation_filter"]))
|
||||
|
||||
def _train(self):
|
||||
prev_steps = self.optimizer.num_steps_sampled
|
||||
|
||||
@@ -0,0 +1,196 @@
|
||||
"""Example of handling variable length and/or parametric action spaces.
|
||||
|
||||
This is a toy example of the action-embedding based approach for handling large
|
||||
discrete action spaces (potentially infinite in size), similar to how
|
||||
OpenAI Five works:
|
||||
|
||||
https://neuro.cs.ut.ee/the-use-of-embeddings-in-openai-five/
|
||||
|
||||
This currently works with RLlib's policy gradient style algorithms
|
||||
(e.g., PG, PPO, IMPALA, A2C) and also DQN.
|
||||
|
||||
Note that since the model outputs now include "-inf" (tf.float32.min)
|
||||
values, not all algorithm options are supported at the moment. For example,
|
||||
algorithms might crash if they don't properly ignore the -inf action scores.
|
||||
Working configurations are given below.
|
||||
"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
import random
|
||||
import numpy as np
|
||||
import gym
|
||||
from gym.spaces import Box, Discrete, Dict
|
||||
import tensorflow as tf
|
||||
import tensorflow.contrib.slim as slim
|
||||
|
||||
import ray
|
||||
from ray.rllib.models import Model, ModelCatalog
|
||||
from ray.rllib.models.misc import normc_initializer
|
||||
from ray.tune import run_experiments
|
||||
from ray.tune.registry import register_env
|
||||
|
||||
# Command-line options for the example script.
parser = argparse.ArgumentParser(
    description="Parametric / variable-length action space example.")
parser.add_argument(
    "--stop",
    type=int,
    default=200,
    help="Stop training once episode_reward_mean reaches this value.")
parser.add_argument(
    "--run",
    type=str,
    default="PPO",
    help="Name of the RLlib algorithm to run, e.g. PPO or DQN.")
|
||||
|
||||
|
||||
class ParametricActionCartpole(gym.Env):
    """Parametric action version of CartPole.

    In this env there are only ever two valid actions, but we pretend there are
    actually up to `max_avail_actions` actions that can be taken, and the two
    valid actions are randomly hidden among this set.

    At each step, we emit a dict of:
        - the actual cart observation
        - a mask of valid actions (e.g., [0, 0, 1, 0, 0, 1] for 6 max avail)
        - the list of action embeddings (w/ zeroes for invalid actions) (e.g.,
            [[0, 0],
             [0, 0],
             [-0.2322, -0.2569],
             [0, 0],
             [0, 0],
             [0.7878, 1.2297]] for max_avail_actions=6)

    In a real environment, the action embeddings would be larger than two
    units of course, and also there would be a variable number of valid actions
    per step instead of always [LEFT, RIGHT].

    Args:
        max_avail_actions (int): Size of the pretend discrete action set.
            Must be at least 2 so that LEFT and RIGHT get distinct slots.

    Raises:
        ValueError: If `max_avail_actions` is smaller than 2.
    """

    # The embeddings are Gaussian draws, which are not confined to [-1, 1].
    # They are clipped to +/- this bound and the observation space declares
    # the same bound, so emitted observations always lie inside the space.
    _EMBED_BOUND = 10.0

    def __init__(self, max_avail_actions):
        if max_avail_actions < 2:
            # Fail here rather than later inside random.sample().
            raise ValueError(
                "max_avail_actions must be at least 2 to hold LEFT and RIGHT",
                max_avail_actions)
        bound = self._EMBED_BOUND
        # Use simple random 2-unit action embeddings for [LEFT, RIGHT]
        self.left_action_embed = np.clip(np.random.randn(2), -bound, bound)
        self.right_action_embed = np.clip(np.random.randn(2), -bound, bound)
        self.action_space = Discrete(max_avail_actions)
        self.wrapped = gym.make("CartPole-v0")
        self.observation_space = Dict({
            "action_mask": Box(0, 1, shape=(max_avail_actions, )),
            "avail_actions": Box(-bound, bound, shape=(max_avail_actions, 2)),
            "cart": self.wrapped.observation_space,
        })

    def update_avail_actions(self):
        """Re-draw which two action slots currently mean LEFT and RIGHT."""
        num_actions = self.action_space.n
        # Fresh arrays every call: all-zero embeddings / mask for invalid slots.
        self.action_assignments = np.zeros((num_actions, 2), dtype=np.float32)
        self.action_mask = np.zeros(num_actions, dtype=np.float32)
        self.left_idx, self.right_idx = random.sample(range(num_actions), 2)
        self.action_assignments[self.left_idx] = self.left_action_embed
        self.action_assignments[self.right_idx] = self.right_action_embed
        self.action_mask[self.left_idx] = 1
        self.action_mask[self.right_idx] = 1

    def _make_obs(self, cart_obs):
        """Bundle the wrapped env's observation with the current action info."""
        return {
            "action_mask": self.action_mask,
            "avail_actions": self.action_assignments,
            "cart": cart_obs,
        }

    def reset(self):
        """Reset the wrapped CartPole and return the first dict observation."""
        self.update_avail_actions()
        return self._make_obs(self.wrapped.reset())

    def step(self, action):
        """Translate the chosen slot to LEFT/RIGHT and step the wrapped env.

        Raises:
            ValueError: If `action` is not one of the two currently valid
                slots.
        """
        if action == self.left_idx:
            actual_action = 0
        elif action == self.right_idx:
            actual_action = 1
        else:
            raise ValueError(
                "Chosen action was not one of the non-zero action embeddings",
                action, self.action_assignments, self.action_mask,
                self.left_idx, self.right_idx)
        orig_obs, rew, done, info = self.wrapped.step(actual_action)
        # Shuffle the valid slots again before emitting the next observation.
        self.update_avail_actions()
        return self._make_obs(orig_obs), rew, done, info
|
||||
|
||||
|
||||
class ParametricActionsModel(Model):
    """Custom model computing masked logits over parametric actions.

    The network maps the cart observation to an "intent" vector of the same
    size as one action embedding; each action's logit is the dot product of
    its embedding with that vector, and invalid actions are masked out.

    The outputs are assumed to be logits for a single Categorical action
    distribution. Supporting a more complex output (e.g., an action space
    that is a tuple of several distributions) is possible but left as an
    exercise to the reader.
    """

    def _build_layers_v2(self, input_dict, num_outputs, options):
        obs = input_dict["obs"]

        # Pull the per-step action information out of the observation.
        avail_actions = obs["avail_actions"]
        action_mask = obs["action_mask"]
        embed_dim = avail_actions.shape[2].value
        max_actions = avail_actions.shape[1].value
        if num_outputs != max_actions:
            raise ValueError(
                "This model assumes num outputs is equal to max avail actions",
                num_outputs, avail_actions)

        # Plain fully connected trunk over the cart observation.
        last_layer = obs["cart"]
        for layer_idx, layer_size in enumerate((256, 256)):
            last_layer = slim.fully_connected(
                last_layer,
                layer_size,
                weights_initializer=normc_initializer(1.0),
                activation_fn=tf.nn.tanh,
                scope="fc{}".format(layer_idx))

        # Linear head producing one vector of embedding size per sample.
        intent = slim.fully_connected(
            last_layer,
            embed_dim,
            weights_initializer=normc_initializer(0.01),
            activation_fn=None,
            scope="fc_out")

        # [BATCH, EMBED_SIZE] -> [BATCH, 1, EMBED_SIZE] so that it broadcasts
        # against avail_actions of shape [BATCH, MAX_ACTIONS, EMBED_SIZE].
        intent = tf.expand_dims(intent, 1)

        # Per-action dot product; result has shape [BATCH, MAX_ACTIONS].
        action_logits = tf.reduce_sum(avail_actions * intent, axis=2)

        # log(mask) is 0 for valid and -inf for invalid actions; the -inf is
        # clamped to tf.float32.min for numerical stability.
        inf_mask = tf.maximum(tf.log(action_mask), tf.float32.min)

        return inf_mask + action_logits, last_layer
|
||||
|
||||
|
||||
if __name__ == "__main__":
    args = parser.parse_args()
    ray.init()

    ModelCatalog.register_custom_model("pa_model", ParametricActionsModel)
    register_env("pa_cartpole", lambda _: ParametricActionCartpole(10))

    # Algorithm-specific settings; any other algorithm gets no overrides.
    per_algo_overrides = {
        "PPO": {
            "observation_filter": "NoFilter",  # don't filter the action list
            "vf_share_layers": True,  # don't create duplicate value model
        },
        "DQN": {
            "hiddens": [],  # don't postprocess the action scores
        },
    }

    # Shared settings first, then the overrides for the selected algorithm.
    trial_config = {
        "model": {
            "custom_model": "pa_model",
        },
        "num_workers": 0,
    }
    trial_config.update(per_algo_overrides.get(args.run, {}))

    experiment = {
        "run": args.run,
        "env": "pa_cartpole",
        "stop": {
            "episode_reward_mean": args.stop,
        },
        "config": trial_config,
    }
    run_experiments({"parametric_cartpole": experiment})
|
||||
@@ -217,7 +217,7 @@ class ModelCatalog(object):
|
||||
seq_lens):
|
||||
if options.get("custom_model"):
|
||||
model = options["custom_model"]
|
||||
logger.info("Using custom model {}".format(model))
|
||||
logger.debug("Using custom model {}".format(model))
|
||||
return _global_registry.get(RLLIB_MODEL, model)(
|
||||
input_dict,
|
||||
obs_space,
|
||||
|
||||
@@ -2,6 +2,7 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from collections import OrderedDict
|
||||
import cv2
|
||||
import logging
|
||||
import numpy as np
|
||||
@@ -164,6 +165,8 @@ class DictFlatteningPreprocessor(Preprocessor):
|
||||
return (size, )
|
||||
|
||||
def transform(self, observation):
|
||||
if not isinstance(observation, OrderedDict):
|
||||
observation = OrderedDict(sorted(list(observation.items())))
|
||||
assert len(observation) == len(self.preprocessors), \
|
||||
(len(observation), len(self.preprocessors))
|
||||
return np.concatenate([
|
||||
|
||||
@@ -27,5 +27,5 @@ basic-dqn:
|
||||
prioritized_replay_alpha: 0.5
|
||||
beta_annealing_fraction: 1.0
|
||||
final_prioritized_replay_beta: 1.0
|
||||
num_gpus: 1
|
||||
num_gpus: 0.2
|
||||
timesteps_per_iteration: 10000
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Runs on a single g3.16xl node
|
||||
# Runs on a single g3.4xl node
|
||||
# See https://github.com/ray-project/rl-experiments for results
|
||||
atari-basic-dqn:
|
||||
env:
|
||||
@@ -29,5 +29,5 @@ atari-basic-dqn:
|
||||
prioritized_replay_alpha: 0.5
|
||||
beta_annealing_fraction: 1.0
|
||||
final_prioritized_replay_beta: 1.0
|
||||
num_gpus: 1
|
||||
num_gpus: 0.2
|
||||
timesteps_per_iteration: 10000
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Runs on a single g3.4xl node
|
||||
# See https://github.com/ray-project/rl-experiments for results
|
||||
dueling-ddqn:
|
||||
env:
|
||||
grid_search:
|
||||
@@ -27,5 +29,5 @@ dueling-ddqn:
|
||||
prioritized_replay_alpha: 0.5
|
||||
beta_annealing_fraction: 1.0
|
||||
final_prioritized_replay_beta: 1.0
|
||||
num_gpus: 1
|
||||
num_gpus: 0.2
|
||||
timesteps_per_iteration: 10000
|
||||
|
||||
@@ -9,7 +9,7 @@ pong-impala-fast:
|
||||
config:
|
||||
sample_batch_size: 50
|
||||
train_batch_size: 1000
|
||||
num_workers: 256
|
||||
num_workers: 128
|
||||
num_envs_per_worker: 5
|
||||
broadcast_interval: 5
|
||||
max_sample_requests_in_flight_per_worker: 1
|
||||
|
||||
Reference in New Issue
Block a user