From cbd9632f3a08a48cae72e903196409b89376129f Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 25 Aug 2020 22:41:39 -0500 Subject: [PATCH] Fix wait timeout logic (#10199) --- python/ray/tests/test_basic.py | 16 ++++++ src/ray/core_worker/core_worker.cc | 53 +++++-------------- .../memory_store/memory_store.cc | 6 +-- 3 files changed, 31 insertions(+), 44 deletions(-) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c0ee85dc0..89e54de52 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -258,6 +258,22 @@ def test_put_get(shutdown_only): assert value_before == value_after +def test_wait_timing(shutdown_only): + ray.init(num_cpus=2) + + @ray.remote + def f(): + time.sleep(1) + + future = f.remote() + + start = time.time() + ready, not_ready = ray.wait([future], timeout=0.2) + assert 0.2 < time.time() - start < 0.3 + assert len(ready) == 0 + assert len(not_ready) == 1 + + def test_function_descriptor(): python_descriptor = ray._raylet.PythonFunctionDescriptor( "module_name", "function_name", "class_name", "function_hash") diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 007822da5..8a4c68881 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1044,56 +1044,27 @@ Status CoreWorker::Wait(const std::vector &ids, int num_objects, } absl::flat_hash_set ready; - // Wait from both store providers with timeout set to 0. This is to avoid the case - // where we might use up the entire timeout on trying to get objects from one store - // provider before even trying another (which might have all of the objects available). - if (memory_object_ids.size() > 0) { - RAY_RETURN_NOT_OK(memory_store_->Wait( - memory_object_ids, - std::min(static_cast(memory_object_ids.size()), num_objects), - /*timeout_ms=*/0, worker_context_, &ready)); - RetryObjectInPlasmaErrors(memory_store_, worker_context_, memory_object_ids, - plasma_object_ids, ready); - } + int64_t start_time = current_time_ms(); + RAY_RETURN_NOT_OK(memory_store_->Wait( + memory_object_ids, + std::min(static_cast(memory_object_ids.size()), num_objects), timeout_ms, + worker_context_, &ready)); + RetryObjectInPlasmaErrors(memory_store_, worker_context_, memory_object_ids, + plasma_object_ids, ready); RAY_CHECK(static_cast(ready.size()) <= num_objects); + if (timeout_ms > 0) { + timeout_ms = + std::max(0, static_cast(timeout_ms - (current_time_ms() - start_time))); + } if (static_cast(ready.size()) < num_objects && plasma_object_ids.size() > 0) { RAY_RETURN_NOT_OK(plasma_store_provider_->Wait( plasma_object_ids, std::min(static_cast(plasma_object_ids.size()), num_objects - static_cast(ready.size())), - /*timeout_ms=*/0, worker_context_, &ready)); + timeout_ms, worker_context_, &ready)); } RAY_CHECK(static_cast(ready.size()) <= num_objects); - if (timeout_ms != 0 && static_cast(ready.size()) < num_objects) { - // Clear the ready set and retry. We clear it so that we can compute the number of - // objects to fetch from the memory store easily below. - ready.clear(); - - int64_t start_time = current_time_ms(); - if (memory_object_ids.size() > 0) { - RAY_RETURN_NOT_OK(memory_store_->Wait( - memory_object_ids, - std::min(static_cast(memory_object_ids.size()), num_objects), timeout_ms, - worker_context_, &ready)); - RetryObjectInPlasmaErrors(memory_store_, worker_context_, memory_object_ids, - plasma_object_ids, ready); - } - RAY_CHECK(static_cast(ready.size()) <= num_objects); - if (timeout_ms > 0) { - timeout_ms = - std::max(0, static_cast(timeout_ms - (current_time_ms() - start_time))); - } - if (static_cast(ready.size()) < num_objects && plasma_object_ids.size() > 0) { - RAY_RETURN_NOT_OK(plasma_store_provider_->Wait( - plasma_object_ids, - std::min(static_cast(plasma_object_ids.size()), - num_objects - static_cast(ready.size())), - timeout_ms, worker_context_, &ready)); - } - RAY_CHECK(static_cast(ready.size()) <= num_objects); - } - for (size_t i = 0; i < ids.size(); i++) { if (ready.find(ids[i]) != ready.end()) { results->at(i) = true; diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 55cdcd075..45eb13952 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -316,15 +316,15 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, // calls. If timeout_ms == -1, this should run forever until all objects are // ready or a signal is received. Else it should run repeatedly until that timeout // is reached. - while (!(done = get_request->Wait(iteration_timeout)) && !timed_out && - signal_status.ok()) { + while (!timed_out && signal_status.ok() && + !(done = get_request->Wait(iteration_timeout))) { if (check_signals_) { signal_status = check_signals_(); } if (remaining_timeout >= 0) { - iteration_timeout = std::min(remaining_timeout, iteration_timeout); remaining_timeout -= iteration_timeout; + iteration_timeout = std::min(remaining_timeout, iteration_timeout); timed_out = remaining_timeout <= 0; } }