From 65297e65f02e52472c114f52797c2ea18cc3fc3e Mon Sep 17 00:00:00 2001 From: zhu-eric Date: Thu, 26 Dec 2019 14:35:10 -0800 Subject: [PATCH] Experimental Actor Pool (#6055) * mod_table * Example fix for gallery * lint * nit * nit * fix * gallery * remove table for now * training, object store, tune, actors, advanced * start tf code * first cut tf * yapf * pytorch * add torch example * torch * parallel * tune * tuning * reviewsready * finetune * fix * move_code * update conf * compile * init hyperparameter * Start images * overview * extra * fix * works * update-ps-example * param_actor * fix * examples * simple * simplify_pong * flake8 and run hyperopt * add comments * add comments * add suggestion * add suggestion * suggestions * add suggestion * add suggestions * fixed in wrong area * last edit * finish changes * add line * format * reset * tests and docs * fix tests * bazelify Co-authored-by: Richard Liaw --- doc/source/actors.rst | 18 ++ doc/source/package-ref.rst | 5 + python/ray/experimental/__init__.py | 3 +- python/ray/experimental/actor_pool.py | 216 ++++++++++++++++++++ python/ray/experimental/gcs_flush_policy.py | 16 +- python/ray/tests/BUILD | 8 + python/ray/tests/test_actor_pool.py | 148 ++++++++++++++ 7 files changed, 406 insertions(+), 8 deletions(-) create mode 100644 python/ray/experimental/actor_pool.py create mode 100644 python/ray/tests/test_actor_pool.py diff --git a/doc/source/actors.rst b/doc/source/actors.rst index 3215dee9e..219f3ad5e 100644 --- a/doc/source/actors.rst +++ b/doc/source/actors.rst @@ -195,3 +195,21 @@ If we instantiate an actor, we can pass the handle around to various tasks. for _ in range(10): time.sleep(1) print(ray.get(counter.get_counter.remote())) + + +Actor Pool (Experimental) +------------------------- + +The ``ray.experimental`` module contains a utility class, ``ActorPool``. +This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors. + +.. 
class ActorPool(object):
    """Utility class to operate on a fixed pool of actors.

    A submitted task is handed to an idle actor immediately. When no actor
    is idle the task is queued, and it is dispatched (first in, first out)
    once get_next() / get_next_unordered() returns an actor to the pool.

    Arguments:
        actors (list): List of Ray actor handles to use in this pool.

    Examples:
        >>> a1, a2 = Actor.remote(), Actor.remote()
        >>> pool = ActorPool([a1, a2])
        >>> results = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
        >>> print(list(results))
        [2, 4, 6, 8]
    """

    def __init__(self, actors):
        # Imported locally so the class does not depend on a module-level
        # import being present.
        from collections import deque

        # Actors that currently have no task assigned to them.
        self._idle_actors = list(actors)

        # future -> (task index, actor that is running the task)
        self._future_to_actor = {}

        # task index -> future, used for in-order retrieval
        self._index_to_future = {}

        # Index that will be given to the next dispatched task.
        self._next_task_index = 0

        # Index of the next result that get_next() will return.
        self._next_return_index = 0

        # (fn, value) pairs waiting for an actor to become idle. A deque
        # makes taking the oldest entry O(1) instead of list.pop(0)'s O(n).
        self._pending_submits = deque()

    def map(self, fn, values):
        """Apply the given function in parallel over the actors and values.

        This is a generator: nothing is submitted until iteration starts,
        and results are yielded in submission order. You must iterate over
        the returned iterator to force the computation to finish.

        Arguments:
            fn (func): Function that takes (actor, value) as argument and
                returns an ObjectID computing the result over the value. The
                actor will be considered busy until the ObjectID completes.
            values (list): List of values that fn(actor, value) should be
                applied to.

        Returns:
            Iterator over results from applying fn to the actors and values.

        Examples:
            >>> pool = ActorPool(...)
            >>> results = pool.map(lambda a, v: a.double.remote(v), [1, 2])
            >>> print(list(results))
            [2, 4]
        """
        for v in values:
            self.submit(fn, v)
        while self.has_next():
            yield self.get_next()

    def map_unordered(self, fn, values):
        """Similar to map(), but returning an unordered iterator.

        This is a generator that yields results of the map as they finish,
        in whatever order they complete. This can be more efficient than
        map() if some results take longer to compute than others.

        Arguments:
            fn (func): Function that takes (actor, value) as argument and
                returns an ObjectID computing the result over the value. The
                actor will be considered busy until the ObjectID completes.
            values (list): List of values that fn(actor, value) should be
                applied to.

        Returns:
            Iterator over results from applying fn to the actors and values.

        Examples:
            >>> pool = ActorPool(...)
            >>> fn = lambda a, v: a.double.remote(v)
            >>> print(list(pool.map_unordered(fn, [1, 2, 3, 4])))
            [6, 2, 4, 8]
        """
        for v in values:
            self.submit(fn, v)
        while self.has_next():
            yield self.get_next_unordered()

    def submit(self, fn, value):
        """Schedule a single task to run in the pool.

        This has the same argument semantics as map(), but takes on a single
        value instead of a list of values. The result can be retrieved using
        get_next() / get_next_unordered().

        If no actor is idle the (fn, value) pair is queued and fn is only
        called later, when an actor is returned to the pool.

        Arguments:
            fn (func): Function that takes (actor, value) as argument and
                returns an ObjectID computing the result over the value. The
                actor will be considered busy until the ObjectID completes.
            value (object): Value to compute a result for.

        Raises:
            Whatever fn raises. In that case the actor is put back in the
            idle set, so a failing fn does not shrink the pool.

        Examples:
            >>> pool = ActorPool(...)
            >>> pool.submit(lambda a, v: a.double.remote(v), 1)
            >>> pool.submit(lambda a, v: a.double.remote(v), 2)
            >>> print(pool.get_next(), pool.get_next())
            2 4
        """
        if not self._idle_actors:
            self._pending_submits.append((fn, value))
            return

        actor = self._idle_actors.pop()
        try:
            future = fn(actor, value)
        except BaseException:
            # No future was created, so nothing would ever hand this actor
            # back to the pool; return it before propagating the error.
            self._idle_actors.append(actor)
            raise
        self._future_to_actor[future] = (self._next_task_index, actor)
        self._index_to_future[self._next_task_index] = future
        self._next_task_index += 1

    def has_next(self):
        """Returns whether there are any pending results to return.

        Returns:
            True if at least one dispatched task has a result that has not
            been returned yet.

        Examples:
            >>> pool = ActorPool(...)
            >>> pool.submit(lambda a, v: a.double.remote(v), 1)
            >>> print(pool.has_next())
            True
            >>> print(pool.get_next())
            2
            >>> print(pool.has_next())
            False
        """
        return bool(self._future_to_actor)

    def get_next(self, timeout=None):
        """Returns the next pending result in order.

        This returns the next result produced by submit(), blocking for up to
        the specified timeout until it is available.

        Arguments:
            timeout: Maximum time to wait, forwarded to ray.wait(). With the
                default of None the call blocks until the result is ready.

        Returns:
            The next result.

        Raises:
            StopIteration if there are no pending results.
            ValueError if get_next_unordered() has already consumed past
                the next in-order result.
            TimeoutError if the timeout is reached. The pool state is left
                unchanged, so the call can be retried.

        Examples:
            >>> pool = ActorPool(...)
            >>> pool.submit(lambda a, v: a.double.remote(v), 1)
            >>> print(pool.get_next())
            2
        """
        if not self.has_next():
            raise StopIteration("No more results to get")
        if self._next_return_index >= self._next_task_index:
            raise ValueError("It is not allowed to call get_next() after "
                             "get_next_unordered().")

        # Deferred import: Ray is only needed once there is a future to
        # wait on.
        import ray

        future = self._index_to_future[self._next_return_index]
        if timeout is not None:
            ready, _ = ray.wait([future], timeout=timeout)
            if not ready:
                raise TimeoutError("Timed out waiting for result")
        del self._index_to_future[self._next_return_index]
        self._next_return_index += 1
        _, actor = self._future_to_actor.pop(future)
        # Returning the actor may immediately dispatch a queued task to it.
        self._return_actor(actor)
        return ray.get(future)

    def get_next_unordered(self, timeout=None):
        """Returns any of the next pending results.

        This returns some result produced by submit(), blocking for up to
        the specified timeout until it is available. Unlike get_next(), the
        results are not always returned in same order as submitted, which can
        improve performance.

        Arguments:
            timeout: Maximum time to wait, forwarded to ray.wait(). None
                means wait until some result is ready.

        Returns:
            The next result.

        Raises:
            StopIteration if there are no pending results.
            TimeoutError if the timeout is reached.

        Examples:
            >>> pool = ActorPool(...)
            >>> pool.submit(lambda a, v: a.double.remote(v), 1)
            >>> pool.submit(lambda a, v: a.double.remote(v), 2)
            >>> print(pool.get_next_unordered())
            4
            >>> print(pool.get_next_unordered())
            2
        """
        if not self.has_next():
            raise StopIteration("No more results to get")

        # Deferred import: Ray is only needed once there is a future to
        # wait on.
        import ray

        # TODO(ekl) bulk wait for performance
        ready, _ = ray.wait(
            list(self._future_to_actor), num_returns=1, timeout=timeout)
        if not ready:
            raise TimeoutError("Timed out waiting for result")
        [future] = ready
        index, actor = self._future_to_actor.pop(future)
        self._return_actor(actor)
        del self._index_to_future[index]
        # Move the in-order cursor past this task so that get_next() never
        # looks up an index that has already been consumed here.
        self._next_return_index = max(self._next_return_index, index + 1)
        return ray.get(future)

    def _return_actor(self, actor):
        """Mark an actor idle and dispatch the oldest queued task, if any."""
        self._idle_actors.append(actor)
        if self._pending_submits:
            self.submit(*self._pending_submits.popleft())
In particular this means it's + okay for the Monitor to issue requests more frequently than this param. (3) When processing a flush, the shard will flush at most 10k entries. - This is to control the latency of each request. + This is to control the latency of each request. Note, flush rate == (flush period) * (num entries each time). So applications that have a heavier GCS load can tune these params. diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 80bbb2ba5..2767361c6 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -6,6 +6,14 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_actor_pool", + size = "small", + srcs = ["test_actor_pool.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_actor_resources", size = "medium", diff --git a/python/ray/tests/test_actor_pool.py b/python/ray/tests/test_actor_pool.py new file mode 100644 index 000000000..a45e15a10 --- /dev/null +++ b/python/ray/tests/test_actor_pool.py @@ -0,0 +1,148 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time +import pytest + +import ray +from ray.experimental import ActorPool + + +@pytest.fixture +def init(): + ray.init(num_cpus=4) + yield + ray.shutdown() + + +def test_get_next(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + return x + 1 + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + for i in range(5): + pool.submit(lambda a, v: a.f.remote(v), i) + assert pool.get_next() == i + 1 + + +def test_get_next_unordered(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + return x + 1 + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + + total = [] + + for i in range(5): + pool.submit(lambda a, v: a.f.remote(v), i) + while 
pool.has_next(): + total += [pool.get_next_unordered()] + + assert all(elem in [1, 2, 3, 4, 5] for elem in total) + + +def test_map(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + return x + 1 + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + + index = 0 + for v in pool.map(lambda a, v: a.double.remote(v), range(5)): + assert v == 2 * index + index += 1 + + +def test_map_unordered(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + return x + 1 + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + + total = [] + for v in pool.map(lambda a, v: a.double.remote(v), range(5)): + total += [v] + + assert all(elem in [0, 2, 4, 6, 8] for elem in total) + + +def test_get_next_timeout(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + while (True): + x = x + 1 + time.sleep(1) + return None + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + pool.submit(lambda a, v: a.f.remote(v), 0) + with pytest.raises(TimeoutError): + pool.get_next_unordered(5) + + +def test_get_next_unordered_timeout(init): + @ray.remote + class MyActor(object): + def __init__(self): + pass + + def f(self, x): + while (True): + x + 1 + time.sleep(1) + return + + def double(self, x): + return 2 * x + + actors = [MyActor.remote() for _ in range(4)] + pool = ActorPool(actors) + + pool.submit(lambda a, v: a.f.remote(v), 0) + with pytest.raises(TimeoutError): + pool.get_next_unordered(5)