From 0bf79cfbdeca54cd46d1e1ad677820788146f419 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Wed, 11 Sep 2019 18:38:14 -0700 Subject: [PATCH] Properly short circuit core worker Get() on exception (#5672) --- BUILD.bazel | 1 + src/ray/common/ray_object.cc | 22 +++++++++++++ src/ray/common/ray_object.h | 6 +++- src/ray/core_worker/object_interface.cc | 6 +++- .../store_provider/memory_store_provider.cc | 6 +++- .../store_provider/memory_store_provider.h | 3 +- .../store_provider/plasma_store_provider.cc | 31 +++++-------------- .../store_provider/plasma_store_provider.h | 11 ++----- .../store_provider/store_provider.h | 11 ++++--- src/ray/core_worker/test/core_worker_test.cc | 23 ++++++++++++-- 10 files changed, 77 insertions(+), 43 deletions(-) create mode 100644 src/ray/common/ray_object.cc diff --git a/BUILD.bazel b/BUILD.bazel index 1dd14b209..275a74540 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -221,6 +221,7 @@ cc_library( copts = COPTS, deps = [ ":common_cc_proto", + ":gcs_cc_proto", ":node_manager_fbs", ":ray_util", "@boost//:asio", diff --git a/src/ray/common/ray_object.cc b/src/ray/common/ray_object.cc new file mode 100644 index 000000000..eb3057fd9 --- /dev/null +++ b/src/ray/common/ray_object.cc @@ -0,0 +1,22 @@ +#include "ray/common/ray_object.h" + +namespace ray { + +bool RayObject::IsException() { + if (metadata_ == nullptr) { + return false; + } + // TODO (kfstorm): metadata should be structured. + const std::string metadata(reinterpret_cast(metadata_->Data()), + metadata_->Size()); + const auto error_type_descriptor = ray::rpc::ErrorType_descriptor(); + for (int i = 0; i < error_type_descriptor->value_count(); i++) { + const auto error_type_number = error_type_descriptor->value(i)->number(); + if (metadata == std::to_string(error_type_number)) { + return true; + } + } + return false; +} + +} // namespace ray diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h index a7b3c9b21..163a338d1 100644 --- a/src/ray/common/ray_object.h +++ b/src/ray/common/ray_object.h @@ -2,6 +2,7 @@ #define RAY_COMMON_RAY_OBJECT_H #include "ray/common/buffer.h" +#include "ray/protobuf/gcs.pb.h" #include "ray/util/logging.h" namespace ray { @@ -65,6 +66,9 @@ class RayObject { /// Whether this object has metadata. bool HasMetadata() const { return metadata_ != nullptr; } + /// Whether the object represents an exception. + bool IsException(); + private: std::shared_ptr data_; std::shared_ptr metadata_; @@ -74,4 +78,4 @@ class RayObject { } // namespace ray -#endif // RAY_COMMON_BUFFER_H \ No newline at end of file +#endif // RAY_COMMON_RAY_OBJECT_H diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc index 79a1d6e77..9b2e3ea45 100644 --- a/src/ray/core_worker/object_interface.cc +++ b/src/ray/core_worker/object_interface.cc @@ -76,6 +76,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, std::unordered_map> result_map; auto remaining_timeout_ms = timeout_ms; + bool got_exception = false; // Re-order the list so that we always get from plasma store provider first, // since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not @@ -99,7 +100,10 @@ Status CoreWorkerObjectInterface::Get(const std::vector &ids, auto start_time = current_time_ms(); RAY_RETURN_NOT_OK(store_providers_[entry.first]->Get( entry.second, remaining_timeout_ms, worker_context_.GetCurrentTaskID(), - &result_map)); + &result_map, &got_exception)); + if (got_exception) { + break; + } if (remaining_timeout_ms > 0) { int64_t duration = current_time_ms() - start_time; remaining_timeout_ms = diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index ebbb8ac93..676456613 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -30,7 +30,8 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object, Status CoreWorkerMemoryStoreProvider::Get( const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, - std::unordered_map> *results) { + std::unordered_map> *results, + bool *got_exception) { const std::vector id_vector(object_ids.begin(), object_ids.end()); std::vector> result_objects; RAY_RETURN_NOT_OK( @@ -39,6 +40,9 @@ Status CoreWorkerMemoryStoreProvider::Get( for (size_t i = 0; i < id_vector.size(); i++) { if (result_objects[i] != nullptr) { (*results)[id_vector[i]] = result_objects[i]; + if (result_objects[i]->IsException()) { + *got_exception = true; + } } } return Status::OK(); diff --git a/src/ray/core_worker/store_provider/memory_store_provider.h b/src/ray/core_worker/store_provider/memory_store_provider.h index c60d7f7ca..cad734b49 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.h +++ b/src/ray/core_worker/store_provider/memory_store_provider.h @@ -25,7 +25,8 @@ class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider { /// See `CoreWorkerStoreProvider::Get` for semantics. Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, - std::unordered_map> *results) override; + std::unordered_map> *results, + bool *got_exception) override; /// See `CoreWorkerStoreProvider::Wait` for semantics. /// Note that `num_objects` must equal to number of items in `object_ids`. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 807c9755d..618748a96 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -80,7 +80,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( const auto result_object = std::make_shared(data, metadata); (*results)[object_id] = result_object; remaining.erase(object_id); - if (IsException(*result_object)) { + if (result_object->IsException()) { *got_exception = true; } } @@ -92,9 +92,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( Status CoreWorkerPlasmaStoreProvider::Get( const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, - std::unordered_map> *results) { + std::unordered_map> *results, + bool *got_exception) { int64_t batch_size = RayConfig::instance().worker_fetch_request_size(); - bool got_exception = false; std::vector batch_ids; std::unordered_set remaining(object_ids.begin(), object_ids.end()); @@ -108,11 +108,11 @@ Status CoreWorkerPlasmaStoreProvider::Get( } RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, /*timeout_ms=*/0, /*fetch_only=*/true, task_id, results, - &got_exception)); + got_exception)); } // If all objects were fetched already, return. - if (remaining.empty() || got_exception) { + if (remaining.empty() || *got_exception) { return Status::OK(); } @@ -142,8 +142,8 @@ Status CoreWorkerPlasmaStoreProvider::Get( size_t previous_size = remaining.size(); RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout, /*fetch_only=*/false, task_id, results, - &got_exception)); - should_break = should_break || got_exception; + got_exception)); + should_break = should_break || *got_exception; if ((previous_size - remaining.size()) < batch_ids.size()) { unsuccessful_attempts++; @@ -178,23 +178,6 @@ Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object return raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks); } -bool CoreWorkerPlasmaStoreProvider::IsException(const RayObject &object) { - // TODO (kfstorm): metadata should be structured. - if (!object.HasMetadata()) { - return false; - } - const std::string metadata(reinterpret_cast(object.GetMetadata()->Data()), - object.GetMetadata()->Size()); - const auto error_type_descriptor = ray::rpc::ErrorType_descriptor(); - for (int i = 0; i < error_type_descriptor->value_count(); i++) { - const auto error_type_number = error_type_descriptor->value(i)->number(); - if (metadata == std::to_string(error_type_number)) { - return true; - } - } - return false; -} - void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( int num_attempts, const std::unordered_set &remaining) { if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() == diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 6436c5fcf..1ca5249bb 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -26,7 +26,8 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { /// See `CoreWorkerStoreProvider::Get` for semantics. Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, - std::unordered_map> *results) override; + std::unordered_map> *results, + bool *got_exception) override; /// See `CoreWorkerStoreProvider::Wait` for semantics. Status Wait(const std::unordered_set &object_ids, int num_objects, @@ -51,7 +52,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { /// \param[out] results Map of objects to write results into. This method will only /// add to this map, not clear or remove from it, so the caller can pass in a non-empty /// map. - /// \param[out] got_exception Whether any of the fetched objects contained an + /// \param[out] got_exception Set to true if any of the fetched objects contained an /// exception. /// \return Status. Status FetchAndGetFromPlasmaStore( @@ -60,12 +61,6 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { std::unordered_map> *results, bool *got_exception); - /// Whether the buffer represents an exception object. - /// - /// \param[in] object Object data. - /// \return Whether it represents an exception object. - static bool IsException(const RayObject &object); - /// Print a warning if we've attempted too many times, but some objects are still /// unavailable. /// diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h index 0e3fffc23..394fc2196 100644 --- a/src/ray/core_worker/store_provider/store_provider.h +++ b/src/ray/core_worker/store_provider/store_provider.h @@ -32,11 +32,12 @@ class CoreWorkerStoreProvider { /// \param[in] task_id ID for the current task. /// \param[out] results Map of objects to write results into. Get will only add to this /// map, not clear or remove from it, so the caller can pass in a non-empty map. - /// \return Status. - virtual Status Get( - const std::unordered_set &object_ids, int64_t timeout_ms, - const TaskID &task_id, - std::unordered_map> *results) = 0; + /// \param[out] got_exception Set to true if any of the fetched results were an + /// exception. \return Status. + virtual Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, + const TaskID &task_id, + std::unordered_map> *results, + bool *got_exception) = 0; /// Wait for a list of objects to appear in the object store. Objects that appear will /// be added to the ready set. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 649a61642..a1068d867 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -13,6 +13,7 @@ #include "ray/raylet/raylet_client.h" #include "src/ray/protobuf/direct_actor.grpc.pb.h" #include "src/ray/protobuf/direct_actor.pb.h" +#include "src/ray/protobuf/gcs.pb.h" #include "src/ray/util/test_util.h" #include @@ -518,10 +519,12 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); // Test Get(). + bool got_exception = false; std::unordered_map> results; std::unordered_set ids_set(ids.begin(), ids.end()); - RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results)); + RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results, &got_exception)); + ASSERT_TRUE(!got_exception); ASSERT_EQ(results.size(), ids.size()); for (size_t i = 0; i < ids.size(); i++) { const auto &expected = buffers[i]; @@ -542,7 +545,8 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { RAY_CHECK_OK(provider.Delete(ids, true, false)); usleep(200 * 1000); - RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results)); + RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results, &got_exception)); + ASSERT_TRUE(!got_exception); ASSERT_EQ(results.size(), 0); // Test Wait() with objects which will become ready later. @@ -823,6 +827,21 @@ TEST_F(SingleNodeTest, TestObjectInterface) { ASSERT_EQ(*results[i]->GetMetadata(), *buffers[i].GetMetadata()); } + // Test Get() returns early when it encounters an error. + std::vector ids_with_exception(ids.begin(), ids.end()); + ids_with_exception.push_back(ObjectID::FromRandom()); + std::vector buffers_with_exception(buffers.begin(), buffers.end()); + std::string error_string = std::to_string(ray::rpc::TASK_EXECUTION_EXCEPTION); + char error_buffer[error_string.size()]; + size_t len = error_string.copy(error_buffer, error_string.size(), 0); + buffers_with_exception.emplace_back( + nullptr, std::make_shared( + reinterpret_cast(error_buffer), len)); + + RAY_CHECK_OK(core_worker.Objects().Put(buffers_with_exception.back(), + ids_with_exception.back())); + RAY_CHECK_OK(core_worker.Objects().Get(ids_with_exception, -1, &results)); + // Test Wait(). ObjectID non_existent_id = ObjectID::FromRandom(); std::vector all_ids(ids);