diff --git a/.travis.yml b/.travis.yml index 205246168..b8fdedb40 100644 --- a/.travis.yml +++ b/.travis.yml @@ -148,7 +148,7 @@ matrix: before_script: - . ./ci/travis/ci.sh build script: - - travis_wait 90 bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_tf rllib/... + - ./ci/keep_alive bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_tf rllib/... # RLlib: Learning tests with tf=1.x (from rllib/tuned_examples/regression_tests/*.yaml). # Requested by Edi (MS): Test all learning capabilities with tf1.x @@ -165,7 +165,7 @@ matrix: before_script: - . ./ci/travis/ci.sh build script: - - travis_wait 90 bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_tf rllib/... + - ./ci/keep_alive bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_tf rllib/... # RLlib: Learning tests with torch (from rllib/tuned_examples/regression_tests/*.yaml). - os: linux @@ -181,7 +181,7 @@ matrix: before_script: - . ./ci/travis/ci.sh build script: - - travis_wait 90 bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_torch rllib/... + - ./ci/keep_alive bazel test --config=ci --test_output=errors --build_tests_only --test_tag_filters=learning_tests_torch rllib/... # RLlib: Quick Agent train.py runs (compilation & running, no(!) learning). # Agent single tests (compilation, loss-funcs, etc..). @@ -198,7 +198,7 @@ matrix: before_script: - . ./ci/travis/ci.sh build script: - - travis_wait 60 bazel test --config=ci --build_tests_only --test_tag_filters=quick_train rllib/... + - ./ci/keep_alive bazel test --config=ci --build_tests_only --test_tag_filters=quick_train rllib/... # Test everything that does not have any of the "main" labels: # "learning_tests|quick_train|examples|tests_dir". #- ./ci/keep_alive bazel test --config=ci --build_tests_only --test_tag_filters=-learning_tests_tf,-learning_tests_torch,-quick_train,-examples,-tests_dir rllib/... diff --git a/ci/keep_alive b/ci/keep_alive index 5d86e4779..313507fdc 100755 --- a/ci/keep_alive +++ b/ci/keep_alive @@ -9,7 +9,6 @@ watchdog() { sleep 300 echo "(running, ${i}m total)" done - echo "Command timed out after 2.5h, dumping logs:" echo "TIMED OUT" kill -SIGKILL $PID } diff --git a/python/ray/tests/test_iter.py b/python/ray/tests/test_iter.py index 931ec2283..17f450725 100644 --- a/python/ray/tests/test_iter.py +++ b/python/ray/tests/test_iter.py @@ -9,6 +9,14 @@ from ray.util.iter import from_items, from_iterators, from_range, \ from ray.test_utils import Semaphore +def test_select_shards(ray_start_regular_shared): + it = from_items([1, 2, 3, 4], num_shards=4) + it1 = it.select_shards([0, 2]) + it2 = it.select_shards([1, 3]) + assert it1.take(4) == [1, 3] + assert it2.take(4) == [2, 4] + + def test_metrics(ray_start_regular_shared): it = from_items([1, 2, 3, 4], num_shards=1) it2 = from_items([1, 2, 3, 4], num_shards=1) diff --git a/python/ray/util/iter.py b/python/ray/util/iter.py index fe00034b1..bf4b2ed91 100644 --- a/python/ray/util/iter.py +++ b/python/ray/util/iter.py @@ -535,6 +535,29 @@ class ParallelIterator(Generic[T]): "ParallelUnion[{}, {}]".format(self, other), parent_iterators=self.parent_iterators + other.parent_iterators) + def select_shards(self, + shards_to_keep: List[int]) -> "ParallelIterator[T]": + """Return a child iterator that only iterates over given shards. + + It is the user's responsibility to ensure child iterators are operating + over disjoint sub-sets of this iterator's shards. + """ + if len(self.actor_sets) > 1: + raise ValueError("select_shards() is not allowed after union()") + if len(shards_to_keep) == 0: + raise ValueError("at least one shard must be selected") + old_actor_set = self.actor_sets[0] + new_actors = [ + a for (i, a) in enumerate(old_actor_set.actors) + if i in shards_to_keep + ] + assert len(new_actors) == len(shards_to_keep), "Invalid actor index" + new_actor_set = _ActorSet(new_actors, old_actor_set.transforms) + return ParallelIterator( + [new_actor_set], + "{}.select_shards({} total)".format(self, len(shards_to_keep)), + parent_iterators=self.parent_iterators) + def num_shards(self) -> int: """Return the number of worker actors backing this iterator.""" return sum(len(a.actors) for a in self.actor_sets) diff --git a/rllib/BUILD b/rllib/BUILD index 86f575318..a01ff851f 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -41,24 +41,233 @@ # -------------------------------------------------------------------- py_test( - name = "run_regression_tests_cartpole_tf", + name = "run_regression_tests_cartpole_pg_a3c_tf", main = "tests/run_regression_tests.py", tags = ["learning_tests_tf", "learning_tests_cartpole"], - size = "enormous", # = 60min timeout + size = "large", srcs = ["tests/run_regression_tests.py"], - data = glob(["tuned_examples/regression_tests/cartpole-*-tf.yaml"]), - # Pass `BAZEL` option and the path to look for yaml regression files. + data = [ + "tuned_examples/regression_tests/cartpole-pg-tf.yaml", + "tuned_examples/regression_tests/cartpole-a3c-tf.yaml", + ], args = ["BAZEL", "tuned_examples/regression_tests"] ) py_test( - name = "run_regression_tests_cartpole_torch", + name = "run_regression_tests_cartpole_appo_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-appo-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_appo_vtrace_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-appo-vtrace-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_es_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-es-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_ars_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-ars-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_dqn_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-simpleq-tf.yaml", + "tuned_examples/regression_tests/cartpole-dqn-tf.yaml", + "tuned_examples/regression_tests/cartpole-dqn-param-noise-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_impala_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-impala-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_sac_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-sac-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_ppo_tf", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_tf", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-ppo-tf.yaml", + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_a2c_torch", main = "tests/run_regression_tests.py", tags = ["learning_tests_torch", "learning_tests_cartpole"], - size = "enormous", # = 60min timeout + size = "large", srcs = ["tests/run_regression_tests.py"], - data = glob(["tuned_examples/regression_tests/cartpole-*-torch.yaml"]), - # Pass `BAZEL` option and the path to look for yaml regression files. + data = [ + "tuned_examples/regression_tests/cartpole-a2c-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_appo_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-appo-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_appo_vtrace_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-appo-vtrace-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_ars_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-ars-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_dqn_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-dqn-param-noise-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_es_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-es-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_impala_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-impala-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_pg_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-pg-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_ppo_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-ppo-torch.yaml" + ], + args = ["BAZEL", "tuned_examples/regression_tests"] +) + +py_test( + name = "run_regression_tests_cartpole_sac_torch", + main = "tests/run_regression_tests.py", + tags = ["learning_tests_torch", "learning_tests_cartpole"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = [ + "tuned_examples/regression_tests/cartpole-sac-torch.yaml" + ], args = ["BAZEL", "tuned_examples/regression_tests"] ) diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py index f5b70c874..ff4d728e4 100644 --- a/rllib/agents/dqn/apex.py +++ b/rllib/agents/dqn/apex.py @@ -4,7 +4,7 @@ import copy import ray from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG as DQN_CONFIG from ray.rllib.execution.common import STEPS_TRAINED_COUNTER, \ - SampleBatchType, _get_shared_metrics + SampleBatchType, _get_shared_metrics, _get_global_vars from ray.rllib.evaluation.worker_set import WorkerSet from ray.rllib.execution.rollout_ops import ParallelRollouts from ray.rllib.execution.concurrency_ops import Concurrently, Enqueue, Dequeue @@ -105,7 +105,7 @@ class UpdateWorkerWeights: self.learner_thread.weights_updated = False self.weights = ray.put( self.workers.local_worker().get_weights()) - actor.set_weights.remote(self.weights) + actor.set_weights.remote(self.weights, _get_global_vars()) self.steps_since_update[actor] = 0 # Update metrics. metrics = LocalIterator.get_metrics() @@ -148,12 +148,15 @@ def execution_plan(workers: WorkerSet, config: dict): # the weights of the worker that generated the batch. rollouts = ParallelRollouts(workers, mode="async", num_async=2) store_op = rollouts \ - .for_each(StoreToReplayBuffer(actors=replay_actors)) \ - .zip_with_source_actor() \ - .for_each(UpdateWorkerWeights( - learner_thread, workers, - max_weight_sync_delay=config["optimizer"]["max_weight_sync_delay"]) - ) + .for_each(StoreToReplayBuffer(actors=replay_actors)) + # Only need to update workers if there are remote workers. + if workers.remote_workers(): + store_op = store_op.zip_with_source_actor() \ + .for_each(UpdateWorkerWeights( + learner_thread, workers, + max_weight_sync_delay=( + config["optimizer"]["max_weight_sync_delay"]) + )) # (2) Read experiences from the replay buffer actors and send to the # learner thread via its in-queue. diff --git a/rllib/agents/dqn/tests/test_apex_dqn.py b/rllib/agents/dqn/tests/test_apex_dqn.py index 0eca51dbb..e683e1645 100644 --- a/rllib/agents/dqn/tests/test_apex_dqn.py +++ b/rllib/agents/dqn/tests/test_apex_dqn.py @@ -14,6 +14,17 @@ class TestApexDQN(unittest.TestCase): def tearDown(self): ray.shutdown() + def test_apex_zero_workers(self): + config = apex.APEX_DEFAULT_CONFIG.copy() + config["num_workers"] = 0 + config["prioritized_replay"] = True + config["timesteps_per_iteration"] = 100 + config["min_iter_time_s"] = 1 + config["optimizer"]["num_replay_buffer_shards"] = 1 + trainer = apex.ApexTrainer(config=config, env="CartPole-v0") + trainer.train() + trainer.stop() + def test_apex_dqn_compilation_and_per_worker_epsilon_values(self): """Test whether an APEX-DQNTrainer can be built on all frameworks.""" config = apex.APEX_DEFAULT_CONFIG.copy() diff --git a/rllib/agents/impala/impala.py b/rllib/agents/impala/impala.py index 8b32a9e08..9c9c91007 100644 --- a/rllib/agents/impala/impala.py +++ b/rllib/agents/impala/impala.py @@ -1,12 +1,28 @@ +import copy +import logging + +import ray from ray.rllib.agents.a3c.a3c_tf_policy import A3CTFPolicy from ray.rllib.agents.impala.vtrace_tf_policy import VTraceTFPolicy +from ray.rllib.agents.impala.tree_agg import \ + gather_experiences_tree_aggregation from ray.rllib.agents.trainer import Trainer, with_common_config from ray.rllib.agents.trainer_template import build_trainer +from ray.rllib.execution.common import STEPS_TRAINED_COUNTER, _get_global_vars +from ray.rllib.execution.replay_ops import MixInReplay +from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches +from ray.rllib.execution.concurrency_ops import Concurrently, Enqueue, Dequeue +from ray.rllib.execution.metric_ops import StandardMetricsReporting from ray.rllib.optimizers import AsyncSamplesOptimizer from ray.rllib.optimizers.aso_tree_aggregator import TreeAggregator +from ray.rllib.optimizers.aso_learner import LearnerThread +from ray.rllib.optimizers.aso_multi_gpu_learner import TFMultiGPULearner from ray.rllib.utils.annotations import override from ray.tune.trainable import Trainable from ray.tune.resources import Resources +from ray.util.iter import LocalIterator + +logger = logging.getLogger(__name__) # yapf: disable # __sphinx_doc_begin__ @@ -75,9 +91,6 @@ DEFAULT_CONFIG = with_common_config({ "vf_loss_coeff": 0.5, "entropy_coeff": 0.01, "entropy_coeff_schedule": None, - - # use fake (infinite speed) sampler for testing - "_fake_sampler": False, }) # __sphinx_doc_end__ # yapf: enable @@ -141,6 +154,37 @@ class OverrideDefaultResourceRequest: cf["num_workers"]) +def make_learner_thread(local_worker, config): + if config["num_gpus"] > 1 or config["num_data_loader_buffers"] > 1: + logger.info( + "Enabling multi-GPU mode, {} GPUs, {} parallel loaders".format( + config["num_gpus"], config["num_data_loader_buffers"])) + if config["num_data_loader_buffers"] < config["minibatch_buffer_size"]: + raise ValueError( + "In multi-gpu mode you must have at least as many " + "parallel data loader buffers as minibatch buffers: " + "{} vs {}".format(config["num_data_loader_buffers"], + config["minibatch_buffer_size"])) + learner_thread = TFMultiGPULearner( + local_worker, + num_gpus=config["num_gpus"], + lr=config["lr"], + train_batch_size=config["train_batch_size"], + num_data_loader_buffers=config["num_data_loader_buffers"], + minibatch_buffer_size=config["minibatch_buffer_size"], + num_sgd_iter=config["num_sgd_iter"], + learner_queue_size=config["learner_queue_size"], + learner_queue_timeout=config["learner_queue_timeout"]) + else: + learner_thread = LearnerThread( + local_worker, + minibatch_buffer_size=config["minibatch_buffer_size"], + num_sgd_iter=config["num_sgd_iter"], + learner_queue_size=config["learner_queue_size"], + learner_queue_timeout=config["learner_queue_timeout"]) + return learner_thread + + def get_policy_class(config): if config["use_pytorch"]: if config["vtrace"]: @@ -168,6 +212,106 @@ def validate_config(config): "Must use `batch_mode`=truncate_episodes if `vtrace` is True.") +# Update worker weights as they finish generating experiences. +class BroadcastUpdateLearnerWeights: + def __init__(self, learner_thread, workers, broadcast_interval): + self.learner_thread = learner_thread + self.steps_since_broadcast = 0 + self.broadcast_interval = broadcast_interval + self.workers = workers + self.weights = workers.local_worker().get_weights() + + def __call__(self, item): + actor, batch = item + self.steps_since_broadcast += 1 + if (self.steps_since_broadcast >= self.broadcast_interval + and self.learner_thread.weights_updated): + self.weights = ray.put(self.workers.local_worker().get_weights()) + self.steps_since_broadcast = 0 + self.learner_thread.weights_updated = False + # Update metrics. + metrics = LocalIterator.get_metrics() + metrics.counters["num_weight_broadcasts"] += 1 + actor.set_weights.remote(self.weights, _get_global_vars()) + + +def record_steps_trained(count): + metrics = LocalIterator.get_metrics() + # Manually update the steps trained counter since the learner thread + # is executing outside the pipeline. + metrics.counters[STEPS_TRAINED_COUNTER] += count + + +def gather_experiences_directly(workers, config): + rollouts = ParallelRollouts( + workers, + mode="async", + num_async=config["max_sample_requests_in_flight_per_worker"]) + + # Augment with replay and concat to desired train batch size. + train_batches = rollouts \ + .for_each(lambda batch: batch.decompress_if_needed()) \ + .for_each(MixInReplay( + num_slots=config["replay_buffer_num_slots"], + replay_proportion=config["replay_proportion"])) \ + .flatten() \ + .combine( + ConcatBatches(min_batch_size=config["train_batch_size"])) + + return train_batches + + +# Experimental distributed execution impl; enable with "use_exec_api": True. +def execution_plan(workers, config): + if config["num_aggregation_workers"] > 0: + train_batches = gather_experiences_tree_aggregation(workers, config) + else: + train_batches = gather_experiences_directly(workers, config) + + # Start the learner thread. + learner_thread = make_learner_thread(workers.local_worker(), config) + learner_thread.start() + + # This sub-flow sends experiences to the learner. + enqueue_op = train_batches \ + .for_each(Enqueue(learner_thread.inqueue)) + # Only need to update workers if there are remote workers. + if workers.remote_workers(): + enqueue_op = enqueue_op.zip_with_source_actor() \ + .for_each(BroadcastUpdateLearnerWeights( + learner_thread, workers, + broadcast_interval=config["broadcast_interval"])) + + # This sub-flow updates the steps trained counter based on learner output. + dequeue_op = Dequeue( + learner_thread.outqueue, check=learner_thread.is_alive) \ + .for_each(record_steps_trained) + + merged_op = Concurrently( + [enqueue_op, dequeue_op], mode="async", output_indexes=[1]) + + def add_learner_metrics(result): + def timer_to_ms(timer): + return round(1000 * timer.mean, 3) + + result["info"].update({ + "learner_queue": learner_thread.learner_queue_size.stats(), + "learner": copy.deepcopy(learner_thread.stats), + "timing_breakdown": { + "learner_grad_time_ms": timer_to_ms(learner_thread.grad_timer), + "learner_load_time_ms": timer_to_ms(learner_thread.load_timer), + "learner_load_wait_time_ms": timer_to_ms( + learner_thread.load_wait_timer), + "learner_dequeue_time_ms": timer_to_ms( + learner_thread.queue_timer), + } + }) + return result + + return StandardMetricsReporting(merged_op, workers, config) \ + .for_each(add_learner_metrics) + + ImpalaTrainer = build_trainer( name="IMPALA", default_config=DEFAULT_CONFIG, @@ -176,4 +320,5 @@ ImpalaTrainer = build_trainer( get_policy_class=get_policy_class, make_workers=defer_make_workers, make_policy_optimizer=make_aggregators_and_optimizer, + execution_plan=execution_plan, mixins=[OverrideDefaultResourceRequest]) diff --git a/rllib/agents/impala/tests/test_impala.py b/rllib/agents/impala/tests/test_impala.py index 9c4838515..4a7481b43 100644 --- a/rllib/agents/impala/tests/test_impala.py +++ b/rllib/agents/impala/tests/test_impala.py @@ -32,6 +32,7 @@ class TestIMPALA(unittest.TestCase): for i in range(num_iterations): print(trainer.train()) check_compute_action(trainer) + trainer.stop() # Test w/ LSTM. print("w/o LSTM") @@ -40,6 +41,7 @@ class TestIMPALA(unittest.TestCase): for i in range(num_iterations): print(trainer.train()) check_compute_action(trainer) + trainer.stop() if __name__ == "__main__": diff --git a/rllib/agents/impala/tree_agg.py b/rllib/agents/impala/tree_agg.py new file mode 100644 index 000000000..830f7f732 --- /dev/null +++ b/rllib/agents/impala/tree_agg.py @@ -0,0 +1,100 @@ +import logging +import os +from typing import List + +import ray +from ray.rllib.execution.common import STEPS_SAMPLED_COUNTER, \ + SampleBatchType +from ray.rllib.execution.replay_ops import MixInReplay +from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches +from ray.rllib.utils.actors import create_colocated +from ray.util.iter import LocalIterator, ParallelIterator, \ + ParallelIteratorWorker, from_actors + +logger = logging.getLogger(__name__) + + +@ray.remote(num_cpus=0) +class Aggregator(ParallelIteratorWorker): + """An aggregation worker used by gather_experiences_tree_aggregation(). + + Each of these actors is a shard of a parallel iterator that consumes + batches from RolloutWorker actors, and emits batches of size + train_batch_size. This allows expensive decompression / concatenation + work to be offloaded to these actors instead of run in the learner. + """ + + def __init__(self, config: dict, + rollout_group: "ParallelIterator[SampleBatchType]"): + self.weights = None + self.global_vars = None + + def generator(): + it = rollout_group.gather_async( + num_async=config["max_sample_requests_in_flight_per_worker"]) + + # Update the rollout worker with our latest policy weights. + def update_worker(item): + worker, batch = item + if self.weights: + worker.set_weights.remote(self.weights, self.global_vars) + return batch + + # Augment with replay and concat to desired train batch size. + it = it.zip_with_source_actor() \ + .for_each(update_worker) \ + .for_each(lambda batch: batch.decompress_if_needed()) \ + .for_each(MixInReplay( + num_slots=config["replay_buffer_num_slots"], + replay_proportion=config["replay_proportion"])) \ + .flatten() \ + .combine( + ConcatBatches( + min_batch_size=config["train_batch_size"])) + + for train_batch in it: + yield train_batch + + super().__init__(generator, repeat=False) + + def get_host(self): + return os.uname()[1] + + def set_weights(self, weights, global_vars): + self.weights = weights + self.global_vars = global_vars + + +def gather_experiences_tree_aggregation(workers, config): + """Tree aggregation version of gather_experiences_directly().""" + + rollouts = ParallelRollouts(workers, mode="raw") + + # Divide up the workers between aggregators. + worker_assignments = [[] for _ in range(config["num_aggregation_workers"])] + i = 0 + for w in range(len(workers.remote_workers())): + worker_assignments[i].append(w) + i += 1 + i %= len(worker_assignments) + logger.info("Worker assignments: {}".format(worker_assignments)) + + # Create parallel iterators that represent each aggregation group. + rollout_groups: List["ParallelIterator[SampleBatchType]"] = [ + rollouts.select_shards(assigned) for assigned in worker_assignments + ] + + # This spawns |num_aggregation_workers| intermediate actors that aggregate + # experiences in parallel. We force colocation on the same node to maximize + # data bandwidth between them and the driver. + train_batches = from_actors([ + create_colocated(Aggregator, [config, g], 1)[0] for g in rollout_groups + ]) + + # TODO(ekl) properly account for replay. + def record_steps_sampled(batch): + metrics = LocalIterator.get_metrics() + metrics.counters[STEPS_SAMPLED_COUNTER] += batch.count + return batch + + return train_batches.gather_async().for_each(record_steps_sampled) diff --git a/rllib/agents/ppo/appo.py b/rllib/agents/ppo/appo.py index 01d0b0ade..9ef38795f 100644 --- a/rllib/agents/ppo/appo.py +++ b/rllib/agents/ppo/appo.py @@ -54,6 +54,9 @@ DEFAULT_CONFIG = with_base_config(impala.DEFAULT_CONFIG, { "vf_loss_coeff": 0.5, "entropy_coeff": 0.01, "entropy_coeff_schedule": None, + + # TODO: impl update target. + "use_exec_api": False, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/agents/ppo/ddppo.py b/rllib/agents/ppo/ddppo.py index 330941856..bf1d938ba 100644 --- a/rllib/agents/ppo/ddppo.py +++ b/rllib/agents/ppo/ddppo.py @@ -101,14 +101,6 @@ def make_distributed_allreduce_optimizer(workers, config): def execution_plan(workers, config): rollouts = ParallelRollouts(workers, mode="raw") - # Sync up the weights. In principle we don't need this, but it doesn't - # add too much overhead and handles the case where the user manually - # updates the local weights. - if config["keep_local_weights_in_sync"]: - weights = ray.put(workers.local_worker().get_weights()) - for e in workers.remote_workers(): - e.set_weights.remote(weights) - # Setup the distributed processes. if not workers.remote_workers(): raise ValueError("This optimizer requires >0 remote workers.") diff --git a/rllib/agents/ppo/tests/test_ddppo.py b/rllib/agents/ppo/tests/test_ddppo.py index 8524658b9..ee6726cfa 100644 --- a/rllib/agents/ppo/tests/test_ddppo.py +++ b/rllib/agents/ppo/tests/test_ddppo.py @@ -28,6 +28,7 @@ class TestDDPPO(unittest.TestCase): for i in range(num_iterations): trainer.train() check_compute_action(trainer) + trainer.stop() if __name__ == "__main__": diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index cb8e0d454..d4e9cf68e 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -138,6 +138,8 @@ COMMON_CONFIG = { # Log system resource metrics to results. This requires `psutil` to be # installed for sys stats, and `gputil` for GPU metrics. "log_sys_usage": True, + # Use fake (infinite speed) sampler. For testing only. + "fake_sampler": False, # === Framework Settings === # Use PyTorch (instead of tf). If using `rllib train`, this can also be diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index fe82f43f1..1b8506d25 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -149,7 +149,7 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): no_done_at_end=False, seed=None, extra_python_environs=None, - _fake_sampler=False): + fake_sampler=False): """Initialize a rollout worker. Arguments: @@ -245,7 +245,7 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): to ensure each remote worker has unique exploration behavior. extra_python_environs (dict): Extra python environments need to be set. - _fake_sampler (bool): Use a fake (inf speed) sampler for testing. + fake_sampler (bool): Use a fake (inf speed) sampler for testing. """ self._original_kwargs = locals().copy() del self._original_kwargs["self"] @@ -301,7 +301,7 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): self.preprocessing_enabled = True self.last_batch = None self.global_vars = None - self._fake_sampler = _fake_sampler + self.fake_sampler = fake_sampler self.env = _validate_env(env_creator(env_context)) if isinstance(self.env, MultiAgentEnv) or \ @@ -505,7 +505,7 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): SampleBatch|MultiAgentBatch from evaluating the current policies. """ - if self._fake_sampler and self.last_batch is not None: + if self.fake_sampler and self.last_batch is not None: return self.last_batch if log_once("sample_start"): @@ -550,7 +550,7 @@ class RolloutWorker(EvaluatorInterface, ParallelIteratorWorker): elif self.compress_observations: batch.compress() - if self._fake_sampler: + if self.fake_sampler: self.last_batch = batch return batch diff --git a/rllib/evaluation/worker_set.py b/rllib/evaluation/worker_set.py index 167a0c8af..0c5677d13 100644 --- a/rllib/evaluation/worker_set.py +++ b/rllib/evaluation/worker_set.py @@ -279,7 +279,7 @@ class WorkerSet: no_done_at_end=config["no_done_at_end"], seed=(config["seed"] + worker_index) if config["seed"] is not None else None, - _fake_sampler=config.get("_fake_sampler", False), + fake_sampler=config["fake_sampler"], extra_python_environs=extra_python_environs) return worker diff --git a/rllib/examples/custom_fast_model.py b/rllib/examples/custom_fast_model.py index e6ab6c1a4..6c628488c 100644 --- a/rllib/examples/custom_fast_model.py +++ b/rllib/examples/custom_fast_model.py @@ -45,7 +45,7 @@ if __name__ == "__main__": "rollout_fragment_length": 100, "train_batch_size": sample_from( lambda spec: 1000 * max(1, spec.config.num_gpus)), - "_fake_sampler": True, + "fake_sampler": True, }, }, }) diff --git a/rllib/execution/concurrency_ops.py b/rllib/execution/concurrency_ops.py index 2620448f4..2650181b2 100644 --- a/rllib/execution/concurrency_ops.py +++ b/rllib/execution/concurrency_ops.py @@ -56,6 +56,8 @@ def Concurrently(ops: List[LocalIterator], class Enqueue: """Enqueue data items into a queue.Queue instance. + Returns the input item as output. + The enqueue is non-blocking, so Enqueue operations can executed with Dequeue via the Concurrently() operator. @@ -79,6 +81,7 @@ class Enqueue: self.queue.put_nowait(x) except queue.Full: return _NextValueNotReady() + return x def Dequeue(input_queue: queue.Queue, check=lambda: True): diff --git a/rllib/policy/sample_batch.py b/rllib/policy/sample_batch.py index c299b4b51..1fb8c2857 100644 --- a/rllib/policy/sample_batch.py +++ b/rllib/policy/sample_batch.py @@ -216,6 +216,7 @@ class SampleBatch: elif len(arr) > 0 and is_compressed(arr[0]): self.data[key] = np.array( [unpack(o) for o in self.data[key]]) + return self def __str__(self): return "SampleBatch({})".format(str(self.data)) @@ -291,6 +292,7 @@ class MultiAgentBatch: def decompress_if_needed(self, columns=frozenset(["obs", "new_obs"])): for batch in self.policy_batches.values(): batch.decompress_if_needed(columns) + return self def __str__(self): return "MultiAgentBatch({}, count={})".format( diff --git a/rllib/tests/run_regression_tests.py b/rllib/tests/run_regression_tests.py index 149ce5a85..6399282df 100644 --- a/rllib/tests/run_regression_tests.py +++ b/rllib/tests/run_regression_tests.py @@ -22,11 +22,11 @@ import yaml import ray from ray.tune import run_experiments +from ray.rllib import _register_all if __name__ == "__main__": # Bazel regression test mode: Get path to look for yaml files from argv[2]. if sys.argv[1] == "BAZEL": - ray.init(num_cpus=5) # Get the path to use. rllib_dir = Path(__file__).parent.parent print("rllib dir={}".format(rllib_dir)) @@ -35,7 +35,6 @@ if __name__ == "__main__": map(lambda path: str(path.absolute()), yaml_files), reverse=True) # Normal mode: Get yaml files to run from command line. else: - ray.init() yaml_files = sys.argv[1:] print("Will run the following regression files:") @@ -51,7 +50,12 @@ if __name__ == "__main__": passed = False for i in range(3): - trials = run_experiments(experiments, resume=False, verbose=0) + try: + ray.init(num_cpus=5) + trials = run_experiments(experiments, resume=False, verbose=1) + finally: + ray.shutdown() + _register_all() for t in trials: if (t.last_result["episode_reward_mean"] >= diff --git a/rllib/tuned_examples/regression_tests/pendulum-td3.yaml b/rllib/tuned_examples/regression_tests/pendulum-td3.yaml index ef4d0fb9b..20b84b99e 100644 --- a/rllib/tuned_examples/regression_tests/pendulum-td3.yaml +++ b/rllib/tuned_examples/regression_tests/pendulum-td3.yaml @@ -1,7 +1,8 @@ pendulum-td3-tf: env: Pendulum-v0 run: TD3 - stop: + config: use_pytorch: false + stop: episode_reward_mean: -900 timesteps_total: 100000