mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:48:31 +08:00
Properly short circuit core worker Get() on exception (#5672)
This commit is contained in:
committed by
Philipp Moritz
parent
946ebfaa3c
commit
0bf79cfbde
@@ -221,6 +221,7 @@ cc_library(
|
||||
copts = COPTS,
|
||||
deps = [
|
||||
":common_cc_proto",
|
||||
":gcs_cc_proto",
|
||||
":node_manager_fbs",
|
||||
":ray_util",
|
||||
"@boost//:asio",
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
#include "ray/common/ray_object.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
bool RayObject::IsException() {
|
||||
if (metadata_ == nullptr) {
|
||||
return false;
|
||||
}
|
||||
// TODO (kfstorm): metadata should be structured.
|
||||
const std::string metadata(reinterpret_cast<const char *>(metadata_->Data()),
|
||||
metadata_->Size());
|
||||
const auto error_type_descriptor = ray::rpc::ErrorType_descriptor();
|
||||
for (int i = 0; i < error_type_descriptor->value_count(); i++) {
|
||||
const auto error_type_number = error_type_descriptor->value(i)->number();
|
||||
if (metadata == std::to_string(error_type_number)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
@@ -2,6 +2,7 @@
|
||||
#define RAY_COMMON_RAY_OBJECT_H
|
||||
|
||||
#include "ray/common/buffer.h"
|
||||
#include "ray/protobuf/gcs.pb.h"
|
||||
#include "ray/util/logging.h"
|
||||
|
||||
namespace ray {
|
||||
@@ -65,6 +66,9 @@ class RayObject {
|
||||
/// Whether this object has metadata.
|
||||
bool HasMetadata() const { return metadata_ != nullptr; }
|
||||
|
||||
/// Whether the object represents an exception.
|
||||
bool IsException();
|
||||
|
||||
private:
|
||||
std::shared_ptr<Buffer> data_;
|
||||
std::shared_ptr<Buffer> metadata_;
|
||||
@@ -74,4 +78,4 @@ class RayObject {
|
||||
|
||||
} // namespace ray
|
||||
|
||||
#endif // RAY_COMMON_BUFFER_H
|
||||
#endif // RAY_COMMON_RAY_OBJECT_H
|
||||
|
||||
@@ -76,6 +76,7 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
||||
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> result_map;
|
||||
auto remaining_timeout_ms = timeout_ms;
|
||||
bool got_exception = false;
|
||||
|
||||
// Re-order the list so that we always get from plasma store provider first,
|
||||
// since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not
|
||||
@@ -99,7 +100,10 @@ Status CoreWorkerObjectInterface::Get(const std::vector<ObjectID> &ids,
|
||||
auto start_time = current_time_ms();
|
||||
RAY_RETURN_NOT_OK(store_providers_[entry.first]->Get(
|
||||
entry.second, remaining_timeout_ms, worker_context_.GetCurrentTaskID(),
|
||||
&result_map));
|
||||
&result_map, &got_exception));
|
||||
if (got_exception) {
|
||||
break;
|
||||
}
|
||||
if (remaining_timeout_ms > 0) {
|
||||
int64_t duration = current_time_ms() - start_time;
|
||||
remaining_timeout_ms =
|
||||
|
||||
@@ -30,7 +30,8 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object,
|
||||
Status CoreWorkerMemoryStoreProvider::Get(
|
||||
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) {
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception) {
|
||||
const std::vector<ObjectID> id_vector(object_ids.begin(), object_ids.end());
|
||||
std::vector<std::shared_ptr<RayObject>> result_objects;
|
||||
RAY_RETURN_NOT_OK(
|
||||
@@ -39,6 +40,9 @@ Status CoreWorkerMemoryStoreProvider::Get(
|
||||
for (size_t i = 0; i < id_vector.size(); i++) {
|
||||
if (result_objects[i] != nullptr) {
|
||||
(*results)[id_vector[i]] = result_objects[i];
|
||||
if (result_objects[i]->IsException()) {
|
||||
*got_exception = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@@ -25,7 +25,8 @@ class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider {
|
||||
/// See `CoreWorkerStoreProvider::Get` for semantics.
|
||||
Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) override;
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception) override;
|
||||
|
||||
/// See `CoreWorkerStoreProvider::Wait` for semantics.
|
||||
/// Note that `num_objects` must equal to number of items in `object_ids`.
|
||||
|
||||
@@ -80,7 +80,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
|
||||
const auto result_object = std::make_shared<RayObject>(data, metadata);
|
||||
(*results)[object_id] = result_object;
|
||||
remaining.erase(object_id);
|
||||
if (IsException(*result_object)) {
|
||||
if (result_object->IsException()) {
|
||||
*got_exception = true;
|
||||
}
|
||||
}
|
||||
@@ -92,9 +92,9 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
|
||||
Status CoreWorkerPlasmaStoreProvider::Get(
|
||||
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) {
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception) {
|
||||
int64_t batch_size = RayConfig::instance().worker_fetch_request_size();
|
||||
bool got_exception = false;
|
||||
std::vector<ObjectID> batch_ids;
|
||||
std::unordered_set<ObjectID> remaining(object_ids.begin(), object_ids.end());
|
||||
|
||||
@@ -108,11 +108,11 @@ Status CoreWorkerPlasmaStoreProvider::Get(
|
||||
}
|
||||
RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, /*timeout_ms=*/0,
|
||||
/*fetch_only=*/true, task_id, results,
|
||||
&got_exception));
|
||||
got_exception));
|
||||
}
|
||||
|
||||
// If all objects were fetched already, return.
|
||||
if (remaining.empty() || got_exception) {
|
||||
if (remaining.empty() || *got_exception) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -142,8 +142,8 @@ Status CoreWorkerPlasmaStoreProvider::Get(
|
||||
size_t previous_size = remaining.size();
|
||||
RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout,
|
||||
/*fetch_only=*/false, task_id, results,
|
||||
&got_exception));
|
||||
should_break = should_break || got_exception;
|
||||
got_exception));
|
||||
should_break = should_break || *got_exception;
|
||||
|
||||
if ((previous_size - remaining.size()) < batch_ids.size()) {
|
||||
unsuccessful_attempts++;
|
||||
@@ -178,23 +178,6 @@ Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector<ObjectID> &object
|
||||
return raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks);
|
||||
}
|
||||
|
||||
bool CoreWorkerPlasmaStoreProvider::IsException(const RayObject &object) {
|
||||
// TODO (kfstorm): metadata should be structured.
|
||||
if (!object.HasMetadata()) {
|
||||
return false;
|
||||
}
|
||||
const std::string metadata(reinterpret_cast<const char *>(object.GetMetadata()->Data()),
|
||||
object.GetMetadata()->Size());
|
||||
const auto error_type_descriptor = ray::rpc::ErrorType_descriptor();
|
||||
for (int i = 0; i < error_type_descriptor->value_count(); i++) {
|
||||
const auto error_type_number = error_type_descriptor->value(i)->number();
|
||||
if (metadata == std::to_string(error_type_number)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
|
||||
int num_attempts, const std::unordered_set<ObjectID> &remaining) {
|
||||
if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() ==
|
||||
|
||||
@@ -26,7 +26,8 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
|
||||
/// See `CoreWorkerStoreProvider::Get` for semantics.
|
||||
Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) override;
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception) override;
|
||||
|
||||
/// See `CoreWorkerStoreProvider::Wait` for semantics.
|
||||
Status Wait(const std::unordered_set<ObjectID> &object_ids, int num_objects,
|
||||
@@ -51,7 +52,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
|
||||
/// \param[out] results Map of objects to write results into. This method will only
|
||||
/// add to this map, not clear or remove from it, so the caller can pass in a non-empty
|
||||
/// map.
|
||||
/// \param[out] got_exception Whether any of the fetched objects contained an
|
||||
/// \param[out] got_exception Set to true if any of the fetched objects contained an
|
||||
/// exception.
|
||||
/// \return Status.
|
||||
Status FetchAndGetFromPlasmaStore(
|
||||
@@ -60,12 +61,6 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception);
|
||||
|
||||
/// Whether the buffer represents an exception object.
|
||||
///
|
||||
/// \param[in] object Object data.
|
||||
/// \return Whether it represents an exception object.
|
||||
static bool IsException(const RayObject &object);
|
||||
|
||||
/// Print a warning if we've attempted too many times, but some objects are still
|
||||
/// unavailable.
|
||||
///
|
||||
|
||||
@@ -32,11 +32,12 @@ class CoreWorkerStoreProvider {
|
||||
/// \param[in] task_id ID for the current task.
|
||||
/// \param[out] results Map of objects to write results into. Get will only add to this
|
||||
/// map, not clear or remove from it, so the caller can pass in a non-empty map.
|
||||
/// \return Status.
|
||||
virtual Status Get(
|
||||
const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results) = 0;
|
||||
/// \param[out] got_exception Set to true if any of the fetched results were an
|
||||
/// exception. \return Status.
|
||||
virtual Status Get(const std::unordered_set<ObjectID> &object_ids, int64_t timeout_ms,
|
||||
const TaskID &task_id,
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> *results,
|
||||
bool *got_exception) = 0;
|
||||
|
||||
/// Wait for a list of objects to appear in the object store. Objects that appear will
|
||||
/// be added to the ready set.
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "ray/raylet/raylet_client.h"
|
||||
#include "src/ray/protobuf/direct_actor.grpc.pb.h"
|
||||
#include "src/ray/protobuf/direct_actor.pb.h"
|
||||
#include "src/ray/protobuf/gcs.pb.h"
|
||||
#include "src/ray/util/test_util.h"
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
@@ -518,10 +519,12 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
|
||||
ASSERT_TRUE(wait_results.count(nonexistent_id) == 0);
|
||||
|
||||
// Test Get().
|
||||
bool got_exception = false;
|
||||
std::unordered_map<ObjectID, std::shared_ptr<RayObject>> results;
|
||||
std::unordered_set<ObjectID> ids_set(ids.begin(), ids.end());
|
||||
RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results));
|
||||
RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results, &got_exception));
|
||||
|
||||
ASSERT_TRUE(!got_exception);
|
||||
ASSERT_EQ(results.size(), ids.size());
|
||||
for (size_t i = 0; i < ids.size(); i++) {
|
||||
const auto &expected = buffers[i];
|
||||
@@ -542,7 +545,8 @@ void CoreWorkerTest::TestStoreProvider(StoreProviderType type) {
|
||||
RAY_CHECK_OK(provider.Delete(ids, true, false));
|
||||
|
||||
usleep(200 * 1000);
|
||||
RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results));
|
||||
RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results, &got_exception));
|
||||
ASSERT_TRUE(!got_exception);
|
||||
ASSERT_EQ(results.size(), 0);
|
||||
|
||||
// Test Wait() with objects which will become ready later.
|
||||
@@ -823,6 +827,21 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
|
||||
ASSERT_EQ(*results[i]->GetMetadata(), *buffers[i].GetMetadata());
|
||||
}
|
||||
|
||||
// Test Get() returns early when it encounters an error.
|
||||
std::vector<ObjectID> ids_with_exception(ids.begin(), ids.end());
|
||||
ids_with_exception.push_back(ObjectID::FromRandom());
|
||||
std::vector<RayObject> buffers_with_exception(buffers.begin(), buffers.end());
|
||||
std::string error_string = std::to_string(ray::rpc::TASK_EXECUTION_EXCEPTION);
|
||||
char error_buffer[error_string.size()];
|
||||
size_t len = error_string.copy(error_buffer, error_string.size(), 0);
|
||||
buffers_with_exception.emplace_back(
|
||||
nullptr, std::make_shared<LocalMemoryBuffer>(
|
||||
reinterpret_cast<uint8_t *>(error_buffer), len));
|
||||
|
||||
RAY_CHECK_OK(core_worker.Objects().Put(buffers_with_exception.back(),
|
||||
ids_with_exception.back()));
|
||||
RAY_CHECK_OK(core_worker.Objects().Get(ids_with_exception, -1, &results));
|
||||
|
||||
// Test Wait().
|
||||
ObjectID non_existent_id = ObjectID::FromRandom();
|
||||
std::vector<ObjectID> all_ids(ids);
|
||||
|
||||
Reference in New Issue
Block a user