diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 406c5edfc..79a1d6e77 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -125,90 +125,85 @@ Status CoreWorkerObjectInterface::Wait(const std::vector &ids, int num (*results).resize(ids.size(), false); if (num_objects <= 0 || num_objects > static_cast(ids.size())) { - return Status::Invalid("num_objects value is not valid"); + return Status::Invalid( + "Number of objects to wait for must be between 1 and the number of ids."); } EnumUnorderedMap> object_ids_per_store_provider; GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider); + size_t total_count = 0; + for (const auto &entry : object_ids_per_store_provider) { + total_count += entry.second.size(); + } + if (total_count != ids.size()) { + return Status::Invalid("Duplicate object IDs not supported in wait."); + } + + // TODO(edoakes): this logic is not ideal, and will have to be addressed + // before we enable direct actor calls in the Python code. If we are waiting + // on a list of objects mixed between multiple store providers, we could + // easily end up in the situation where we're blocked waiting on one store + // provider while another actually has enough objects ready to fulfill + // 'num_objects'. This is partially addressed by trying them all once with + // a timeout of 0, but that does not address the situation where objects + // become available on the second store provider while waiting on the first. + + std::unordered_set ready; // Wait from all the store providers with timeout set to 0. This is to avoid the case // where we might use up the entire timeout on trying to get objects from one store // provider before even trying another (which might have all of the objects available). - RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider, - /* timeout_ms= */ 0, &num_objects, - results)); + RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider, + /*timeout_ms=*/0, &num_objects, + &ready)); if (num_objects > 0) { // Wait from all the store providers with the specified timeout // if the required number of objects haven't been ready yet. - RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(ids, object_ids_per_store_provider, - /* timeout_ms= */ timeout_ms, - &num_objects, results)); + RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider, + /*timeout_ms=*/timeout_ms, + &num_objects, &ready)); + } + + for (size_t i = 0; i < ids.size(); i++) { + if (ready.find(ids[i]) != ready.end()) { + (*results)[i] = true; + } } return Status::OK(); } Status CoreWorkerObjectInterface::WaitFromMultipleStoreProviders( - const std::vector &ids, - const EnumUnorderedMap> - &ids_per_provider, - int64_t timeout_ms, int *num_objects, std::vector *results) { - std::unordered_map object_counts; - for (const auto &entry : ids) { - auto iter = object_counts.find(entry); - if (iter == object_counts.end()) { - object_counts.emplace(entry, 1); - } else { - iter->second++; + EnumUnorderedMap> &ids_per_provider, + int64_t timeout_ms, int *num_objects, std::unordered_set *ready) { + int64_t remaining_timeout_ms = timeout_ms; + for (auto &provider_entry : ids_per_provider) { + if (*num_objects <= 0) { + break; + } + int64_t start_time = current_time_ms(); + int required_objects = + std::min(static_cast(provider_entry.second.size()), *num_objects); + std::unordered_set provider_ready; + RAY_RETURN_NOT_OK(store_providers_[provider_entry.first]->Wait( + provider_entry.second, required_objects, remaining_timeout_ms, + worker_context_.GetCurrentTaskID(), &provider_ready)); + + // Update num_objects and remove the ready objects from the list so they don't get + // double-counted. + *num_objects -= provider_ready.size(); + for (const ObjectID &ready_id : provider_ready) { + ready->insert(ready_id); + provider_entry.second.erase(ready_id); } - } - auto remaining_timeout_ms = timeout_ms; - for (const auto &entry : ids_per_provider) { - std::unordered_set objects; - auto start_time = current_time_ms(); - int required_objects = std::min(static_cast(entry.second.size()), *num_objects); - RAY_RETURN_NOT_OK(WaitFromStoreProvider(entry.first, entry.second, required_objects, - remaining_timeout_ms, &objects)); if (remaining_timeout_ms > 0) { int64_t duration = current_time_ms() - start_time; remaining_timeout_ms = std::max(static_cast(0), remaining_timeout_ms - duration); } - for (const auto &entry : objects) { - *num_objects -= object_counts[entry]; - } - - for (size_t i = 0; i < ids.size(); i++) { - if (objects.count(ids[i]) > 0) { - (*results)[i] = true; - } - } - - if (*num_objects <= 0) { - break; - } - } - - return Status::OK(); -}; - -Status CoreWorkerObjectInterface::WaitFromStoreProvider( - StoreProviderType type, const std::unordered_set &object_ids, - int num_objects, int64_t timeout_ms, std::unordered_set *results) { - std::vector ids(object_ids.begin(), object_ids.end()); - if (!ids.empty()) { - std::vector objects; - RAY_RETURN_NOT_OK(store_providers_[type]->Wait( - ids, num_objects, timeout_ms, worker_context_.GetCurrentTaskID(), &objects)); - RAY_CHECK(ids.size() == objects.size()); - for (size_t i = 0; i < objects.size(); i++) { - if (objects[i]) { - (*results).insert(ids[i]); - } - } } return Status::OK(); diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index bcf23b30b..227f6cdae 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -71,33 +71,18 @@ class CoreWorkerObjectInterface { bool delete_creating_tasks); private: - /// Helper function to get a list of objects from different store providers. + /// Helper function to get a set of objects from different store providers. /// - /// \param[in] object_ids IDs of the objects to get. /// \param[in] ids_per_provider A map from store provider type to the set of // object ids for that store provider. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1. /// \param[in/out] num_objects Number of objects that should appear before returning. - /// \param[out] results A bitset that indicates each object has appeared or not. + /// Should be updated as objects are added to the ready set. + /// \param[in/out] results A set that holds objects that are ready. /// \return Status. Status WaitFromMultipleStoreProviders( - const std::vector &object_ids, - const EnumUnorderedMap> - &ids_per_provider, - int64_t timeout_ms, int *num_objects, std::vector *results); - - /// Helper function to wait a list of objects from a specific store provider. - /// - /// \param[in] type The type of store provider to use. - /// \param[in] object_ids IDs of the objects to wait for. - /// \param[in] num_objects Number of objects that should appear before returning. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. - /// \param[out] results A bitset that indicates each object has appeared or not. - /// \return Status. - Status WaitFromStoreProvider(StoreProviderType type, - const std::unordered_set &object_ids, - int num_objects, int64_t timeout_ms, - std::unordered_set *results); + EnumUnorderedMap> &ids_per_provider, + int64_t timeout_ms, int *num_objects, std::unordered_set *results); /// Create a new store provider for the specified type on demand. std::unique_ptr CreateStoreProvider( diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index 1fedf1f12..ebbb8ac93 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -44,18 +44,20 @@ Status CoreWorkerMemoryStoreProvider::Get( return Status::OK(); } -Status CoreWorkerMemoryStoreProvider::Wait(const std::vector &object_ids, +Status CoreWorkerMemoryStoreProvider::Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::vector *results) { - (*results).resize(object_ids.size(), false); - + std::unordered_set *ready) { + std::vector id_vector(object_ids.begin(), object_ids.end()); std::vector> result_objects; + RAY_CHECK(object_ids.size() == id_vector.size()); RAY_RETURN_NOT_OK( - store_->Get(object_ids, num_objects, timeout_ms, false, &result_objects)); - RAY_CHECK(result_objects.size() == object_ids.size()); - for (size_t i = 0; i < object_ids.size(); i++) { - (*results)[i] = (result_objects[i] != nullptr); + store_->Get(id_vector, num_objects, timeout_ms, false, &result_objects)); + + for (size_t i = 0; i < id_vector.size(); i++) { + if (result_objects[i] != nullptr) { + ready->insert(id_vector[i]); + } } return Status::OK(); diff --git a/src/ray/core_worker/store_provider/memory_store_provider.h b/src/ray/core_worker/store_provider/memory_store_provider.h index 066d7cf41..c60d7f7ca 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.h +++ b/src/ray/core_worker/store_provider/memory_store_provider.h @@ -29,9 +29,9 @@ class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider { /// See `CoreWorkerStoreProvider::Wait` for semantics. /// Note that `num_objects` must equal to number of items in `object_ids`. - Status Wait(const std::vector &object_ids, int num_objects, + Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::vector *results) override; + std::unordered_set *ready) override; /// See `CoreWorkerStoreProvider::Delete` for semantics. /// Note that `local_only` must be true, and `delete_creating_tasks` must be false here. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 54b850405..ab728cde4 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -150,27 +150,20 @@ Status CoreWorkerPlasmaStoreProvider::Get( return raylet_client_->NotifyUnblocked(task_id); } -Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_ids, +Status CoreWorkerPlasmaStoreProvider::Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::vector *results) { + std::unordered_set *ready) { WaitResultPair result_pair; - auto status = raylet_client_->Wait(object_ids, num_objects, timeout_ms, false, task_id, - &result_pair); - std::unordered_set ready_ids; + std::vector id_vector(object_ids.begin(), object_ids.end()); + RAY_RETURN_NOT_OK(raylet_client_->Wait(id_vector, num_objects, timeout_ms, false, + task_id, &result_pair)); + for (const auto &entry : result_pair.first) { - ready_ids.insert(entry); + ready->insert(entry); } - // TODO(zhijunfu): change RayletClient::Wait() to return a bit set, so that we don't - // need - // to do this translation. - (*results).resize(object_ids.size()); - for (size_t i = 0; i < object_ids.size(); i++) { - (*results)[i] = ready_ids.count(object_ids[i]) > 0; - } - - return status; + return Status::OK(); } Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 04d666084..6436c5fcf 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -29,9 +29,9 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { std::unordered_map> *results) override; /// See `CoreWorkerStoreProvider::Wait` for semantics. - Status Wait(const std::vector &object_ids, int num_objects, + Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::vector *results) override; + std::unordered_set *ready) override; /// See `CoreWorkerStoreProvider::Delete` for semantics. Status Delete(const std::vector &object_ids, bool local_only = true, diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index 1c3474fc4..dc7757427 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -88,17 +88,19 @@ class CoreWorkerStoreProvider { const TaskID &task_id, std::unordered_map> *results) = 0; - /// Wait for a list of objects to appear in the object store. + /// Wait for a list of objects to appear in the object store. Objects that appear will + /// be added to the ready set. /// /// \param[in] object_ids IDs of the objects to wait for. /// \param[in] num_objects Number of objects that should appear before returning. /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[in] task_id ID for the current task. - /// \param[out] results A bitset that indicates each object has appeared or not. + /// \param[out] ready IDs of objects that have appeared. Wait will only add to this + /// set, not clear or remove from it, so the caller can pass in a non-empty set. /// \return Status. - virtual Status Wait(const std::vector &object_ids, int num_objects, + virtual Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::vector *results) = 0; + std::unordered_set *ready) = 0; /// Delete a list of objects from the object store. /// diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index bc48162f1..b973e7c77 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -494,27 +494,21 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); } - // Test Wait() with duplicate object ids. - std::vector ids_with_duplicate; - ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end()); - // add the same ids again to test `Get` with duplicate object ids. - ids_with_duplicate.insert(ids_with_duplicate.end(), ids.begin(), ids.end()); + std::unordered_set wait_ids(ids.begin(), ids.end()); + std::unordered_set wait_results; - std::vector wait_ids(ids_with_duplicate); - ObjectID non_existent_id = ObjectID::FromRandom(); - wait_ids.push_back(non_existent_id); + ObjectID nonexistent_id = ObjectID::FromRandom(); + wait_ids.insert(nonexistent_id); + RAY_CHECK_OK( + provider.Wait(wait_ids, ids.size() + 1, 100, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ids.size()); + ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); - std::vector wait_results; - RAY_CHECK_OK(provider.Wait(wait_ids, 5, 100, RandomTaskId(), &wait_results)); - ASSERT_EQ(wait_results.size(), 5); - ASSERT_EQ(wait_results, std::vector({true, true, true, true, false})); - - // Test Wait() with duplicate object ids, and the required `num_objects` - // is less than size of `wait_ids`. + // Test Wait() where the required `num_objects` is less than size of `wait_ids`. wait_results.clear(); - RAY_CHECK_OK(provider.Wait(wait_ids, 4, -1, RandomTaskId(), &wait_results)); - ASSERT_EQ(wait_results.size(), 5); - ASSERT_EQ(wait_results, std::vector({true, true, true, true, false})); + RAY_CHECK_OK(provider.Wait(wait_ids, ids.size(), -1, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ids.size()); + ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); // Test Get(). std::unordered_map> results; @@ -545,8 +539,11 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { ASSERT_EQ(results.size(), 0); // Test Wait() with objects which will become ready later. + std::vector ready_ids(buffers.size()); std::vector unready_ids(buffers.size()); for (size_t i = 0; i < unready_ids.size(); i++) { + ready_ids[i] = ObjectID::FromRandom(); + RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i])); unready_ids[i] = ObjectID::FromRandom(); } @@ -560,12 +557,43 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { std::thread async_thread(thread_func); - // wait for the objects to appear. + wait_ids.clear(); + wait_ids.insert(ready_ids.begin(), ready_ids.end()); + wait_ids.insert(unready_ids.begin(), unready_ids.end()); wait_results.clear(); + + // Check that only the ready ids are returned when timeout ends before thread runs. RAY_CHECK_OK( - provider.Wait(unready_ids, unready_ids.size(), -1, RandomTaskId(), &wait_results)); - // wait for the thread to finish. + provider.Wait(wait_ids, ready_ids.size() + 1, 100, RandomTaskId(), &wait_results)); + ASSERT_EQ(ready_ids.size(), wait_results.size()); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + for (const auto &unready_id : unready_ids) { + ASSERT_TRUE(wait_results.find(unready_id) == wait_results.end()); + } + + wait_results.clear(); + // Check that enough objects are returned after the thread inserts at least one object. + RAY_CHECK_OK( + provider.Wait(wait_ids, ready_ids.size() + 1, 5000, RandomTaskId(), &wait_results)); + ASSERT_TRUE(wait_results.size() >= ready_ids.size() + 1); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + + wait_results.clear(); + // Check that all objects are returned after the thread completes. async_thread.join(); + RAY_CHECK_OK( + provider.Wait(wait_ids, wait_ids.size(), -1, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ready_ids.size() + unready_ids.size()); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + for (const auto &unready_id : unready_ids) { + ASSERT_TRUE(wait_results.find(unready_id) != wait_results.end()); + } } class ZeroNodeTest : public CoreWorkerTest {