diff --git a/python/benchmarks/benchmark_wait.py b/python/benchmarks/benchmark_wait.py index 614d76a38..1003713e3 100644 --- a/python/benchmarks/benchmark_wait.py +++ b/python/benchmarks/benchmark_wait.py @@ -19,7 +19,7 @@ def sleep(x): class WaitSuite(object): - timeout = 10 + timeout = 0.01 timer = time.time def time_wait_task(self): @@ -35,5 +35,5 @@ class WaitSuite(object): def time_wait_timeout(self, timeout): ray.wait([sleep.remote(0.5)], timeout=timeout) - time_wait_timeout.params = [200, 800] - time_wait_timeout.param_names = ["timeout_ms"] + time_wait_timeout.params = [0.2, 0.8] + time_wait_timeout.param_names = ["timeout"] diff --git a/python/ray/experimental/api.py b/python/ray/experimental/api.py index 9891ecff7..32ce48c73 100644 --- a/python/ray/experimental/api.py +++ b/python/ray/experimental/api.py @@ -52,8 +52,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): List like of object IDs for objects that may or may not be ready. Note that these IDs must be unique. num_returns (int): The number of object IDs that should be returned. - timeout (int): The maximum amount of time in milliseconds to wait - before returning. + timeout (float): The maximum amount of time in seconds to wait before + returning. Returns: A list of object IDs that are ready and a list of the remaining object @@ -61,6 +61,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): """ worker = ray.worker.global_worker if worker is None else worker if isinstance(object_ids, (tuple, np.ndarray)): - return ray.wait(list(object_ids), num_returns, timeout, worker) + return ray.wait( + list(object_ids), + num_returns=num_returns, + timeout=timeout, + worker=worker) - return ray.wait(object_ids, num_returns, timeout, worker) + return ray.wait( + object_ids, num_returns=num_returns, timeout=timeout, worker=worker) diff --git a/python/ray/rllib/evaluation/metrics.py b/python/ray/rllib/evaluation/metrics.py index 1b270be37..9e1ca7a38 100644 --- a/python/ray/rllib/evaluation/metrics.py +++ b/python/ray/rllib/evaluation/metrics.py @@ -32,7 +32,7 @@ def collect_episodes(local_evaluator, for a in remote_evaluators ] collected, _ = ray.wait( - pending, num_returns=len(pending), timeout=timeout_seconds * 1000) + pending, num_returns=len(pending), timeout=timeout_seconds * 1.0) num_metric_batches_dropped = len(pending) - len(collected) metric_lists = ray.get(collected) diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index 689aa945c..d7affb5f7 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -28,7 +28,8 @@ class TaskPool(object): def completed(self): pending = list(self._tasks) if pending: - ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10) + ready, _ = ray.wait( + pending, num_returns=len(pending), timeout=0.01) for obj_id in ready: yield (self._tasks.pop(obj_id), self._objects.pop(obj_id)) diff --git a/python/ray/rllib/utils/filter.py b/python/ray/rllib/utils/filter.py index 9a1f37dbd..6fd677131 100644 --- a/python/ray/rllib/utils/filter.py +++ b/python/ray/rllib/utils/filter.py @@ -20,7 +20,8 @@ class Filter(object): """Creates a new object with same state as self. Returns: - copy (Filter): Copy of self""" + A copy of self. + """ raise NotImplementedError def sync(self, other): diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index a2e76f60e..cfaf0fbe7 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -111,7 +111,7 @@ class RayTrialExecutor(TrialExecutor): stop_tasks.append(trial.runner.__ray_terminate__.remote()) # TODO(ekl) seems like wait hangs when killing actors _, unfinished = ray.wait( - stop_tasks, num_returns=2, timeout=250) + stop_tasks, num_returns=2, timeout=0.25) except Exception: logger.exception("Error stopping runner.") self.set_status(trial, Trial.ERROR) diff --git a/python/ray/worker.py b/python/ray/worker.py index 6085e0510..97ad4e04a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -2259,6 +2259,11 @@ def put(value, worker=global_worker): def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): """Return a list of IDs that are ready and a list of IDs that are not. + .. warning:: + + The **timeout** argument used to be in **milliseconds** (up through + ``ray==0.6.1``) and now it is in **seconds**. + If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready @@ -2278,8 +2283,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): object_ids (List[ObjectID]): List of object IDs for objects that may or may not be ready. Note that these IDs must be unique. num_returns (int): The number of object IDs that should be returned. - timeout (int): The maximum amount of time in milliseconds to wait - before returning. + timeout (float): The maximum amount of time in seconds to wait before + returning. Returns: A list of object IDs that are ready and a list of the remaining object @@ -2294,6 +2299,15 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): raise TypeError("wait() expected a list of ObjectID, got {}".format( type(object_ids))) + if isinstance(timeout, int) and timeout != 0: + logger.warning("The 'timeout' argument now requires seconds instead " + "of milliseconds. This message can be suppressed by " + "passing in a float.") + + if timeout is not None and timeout < 0: + raise ValueError("The 'timeout' argument must be nonnegative. " + "Received {}".format(timeout)) + if worker.mode != LOCAL_MODE: for object_id in object_ids: if not isinstance(object_id, ray.ObjectID): @@ -2328,9 +2342,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): with worker.state_lock: current_task_id = worker.get_current_thread_task_id() - timeout = timeout if timeout is not None else 2**30 + timeout = timeout if timeout is not None else 10**6 + timeout_milliseconds = int(timeout * 1000) ready_ids, remaining_ids = worker.raylet_client.wait( - object_ids, num_returns, timeout, False, current_task_id) + object_ids, num_returns, timeout_milliseconds, False, + current_task_id) return ready_ids, remaining_ids diff --git a/test/actor_test.py b/test/actor_test.py index 82ce7e950..d4b386ed0 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -759,7 +759,7 @@ def test_actors_on_nodes_with_no_cpus(ray_start_regular): pass f = Foo.remote() - ready_ids, _ = ray.wait([f.method.remote()], timeout=100) + ready_ids, _ = ray.wait([f.method.remote()], timeout=0.1) assert ready_ids == [] @@ -843,7 +843,7 @@ def test_actor_gpus(ray_start_cluster): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01) assert ready_ids == [] @@ -884,7 +884,7 @@ def test_actor_multiple_gpus(ray_start_cluster): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01) assert ready_ids == [] # We should be able to create more actors that use only a single GPU. @@ -913,7 +913,7 @@ def test_actor_multiple_gpus(ray_start_cluster): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor2.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01) assert ready_ids == [] @@ -953,7 +953,7 @@ def test_actor_different_numbers_of_gpus(ray_start_cluster): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01) assert ready_ids == [] @@ -1030,7 +1030,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): # All the GPUs should be used up now. a = Actor.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=0.01) assert ready_ids == [] @@ -1165,7 +1165,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): # Now if we run some GPU tasks, they should not be scheduled. results = [f1.remote() for _ in range(30)] - ready_ids, remaining_ids = ray.wait(results, timeout=1000) + ready_ids, remaining_ids = ray.wait(results, timeout=1.0) assert len(ready_ids) == 0 @@ -1274,7 +1274,7 @@ def test_blocking_actor_task(shutdown_only): # block. actor = CPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout=1.0) assert ready_ids == [] assert remaining_ids == [x_id] @@ -1289,7 +1289,7 @@ def test_blocking_actor_task(shutdown_only): # Make sure that GPU resources are not released when actors block. actor = GPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout=1.0) assert ready_ids == [] assert remaining_ids == [x_id] @@ -2010,7 +2010,7 @@ def test_lifetime_and_transient_resources(ray_start_regular): actor2s = [Actor2.remote() for _ in range(2)] results = [a.method.remote() for a in actor2s] ready_ids, remaining_ids = ray.wait( - results, num_returns=len(results), timeout=1000) + results, num_returns=len(results), timeout=1.0) assert len(ready_ids) == 1 @@ -2066,7 +2066,7 @@ def test_creating_more_actors_than_resources(shutdown_only): ray.wait([result2]) actor3 = ResourceActor1.remote() result3 = actor3.method.remote() - ready_ids, _ = ray.wait([result3], timeout=200) + ready_ids, _ = ray.wait([result3], timeout=0.2) assert len(ready_ids) == 0 # By deleting actor1, we free up resources to create actor3. diff --git a/test/component_failures_test.py b/test/component_failures_test.py index c0548c28e..36ee0a498 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -389,9 +389,7 @@ def test_actor_creation_node_failure(ray_start_cluster): # reconstruction for any actor creation tasks that were forwarded # to nodes that then failed. ready, _ = ray.wait( - children_out, - num_returns=len(children_out), - timeout=5 * 60 * 1000) + children_out, num_returns=len(children_out), timeout=5 * 60.0) assert len(ready) == len(children_out) # Replace any actors that died. diff --git a/test/failure_test.py b/test/failure_test.py index 488bc5153..a38055786 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -337,7 +337,7 @@ def test_actor_worker_dying(ray_start_regular): pass a = Actor.remote() - [obj], _ = ray.wait([a.kill.remote()], timeout=5000) + [obj], _ = ray.wait([a.kill.remote()], timeout=5.0) with pytest.raises(Exception): ray.get(obj) with pytest.raises(Exception): diff --git a/test/runtest.py b/test/runtest.py index 852c840a4..9ec7c1854 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -753,7 +753,7 @@ def test_defining_remote_functions(shutdown_only): args=[], num_cpus=1, num_gpus=1, resources={"Custom": 1})) == [0] infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1}) - ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=50) + ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=0.05) assert len(ready_ids) == 0 assert len(remaining_ids) == 1 @@ -828,14 +828,14 @@ def test_wait(shutdown_only): objectids = [f.remote(0.5), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=1750, num_returns=4) + ready_ids, remaining_ids = ray.wait(objectids, timeout=1.75, num_returns=4) assert time.time() - start_time < 2 assert len(ready_ids) == 3 assert len(remaining_ids) == 1 ray.wait(objectids) objectids = [f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=5000) + ready_ids, remaining_ids = ray.wait(objectids, timeout=5.0) assert time.time() - start_time < 5 assert len(ready_ids) == 1 assert len(remaining_ids) == 3 @@ -1302,13 +1302,13 @@ def test_free_objects_multi_node(ray_start_cluster): ] # Case 1: run this local_only=False. All 3 objects will be deleted. (a, b, c) = run_one_test(actors, False) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=1) + (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=1) # All the objects are deleted. assert len(l1) == 0 assert len(l2) == 3 # Case 2: run this local_only=True. Only 1 object will be deleted. (a, b, c) = run_one_test(actors, True) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=3) + (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=3) # One object is deleted and 2 objects are not. assert len(l1) == 2 assert len(l2) == 1 @@ -1740,7 +1740,7 @@ def test_fractional_resources(shutdown_only): # custom resource. TODO(rkn): Re-enable this once ray.wait is # implemented. f2 = Foo2._remote([], {}, resources={"Custom": 0.7}) - ready, _ = ray.wait([f2.method.remote()], timeout=500) + ready, _ = ray.wait([f2.method.remote()], timeout=0.5) assert len(ready) == 0 # Make sure we can start an actor that requries only 0.3 of the custom # resource. @@ -1977,7 +1977,7 @@ def test_two_custom_resources(ray_start_cluster): # Make sure that tasks with unsatisfied custom resource requirements do # not get scheduled. - ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=500) + ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=0.5) assert ready_ids == []