From 4c834b9d68de4b809835f8a77cdc9e49286e4847 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Thu, 12 Mar 2020 15:08:29 +0800 Subject: [PATCH] Fix the issue that gcs service client ignores error status code (#7539) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add gcs reply status * rebase master * use macro to simplify * convert status in gcs rpc client * define a Status message in probobuf Co-authored-by: 灵洵 --- src/ray/common/status.cc | 85 +++++++++---------- .../gcs/gcs_server/actor_info_handler_impl.cc | 18 ++-- .../gcs/gcs_server/error_info_handler_impl.cc | 4 +- .../gcs/gcs_server/job_info_handler_impl.cc | 6 +- .../gcs/gcs_server/node_info_handler_impl.cc | 28 +++--- .../gcs_server/object_info_handler_impl.cc | 10 +-- src/ray/gcs/gcs_server/stats_handler_impl.cc | 4 +- .../gcs/gcs_server/task_info_handler_impl.cc | 18 ++-- .../gcs_server/test/gcs_server_rpc_test.cc | 6 +- .../gcs_server/worker_info_handler_impl.cc | 4 +- src/ray/protobuf/gcs_service.proto | 48 +++++++++-- src/ray/rpc/gcs_server/gcs_rpc_client.h | 43 +++++----- src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 ++ 13 files changed, 156 insertions(+), 123 deletions(-) diff --git a/src/ray/common/status.cc b/src/ray/common/status.cc index 59fd9f6b4..ac8263ae8 100644 --- a/src/ray/common/status.cc +++ b/src/ray/common/status.cc @@ -27,11 +27,29 @@ // Adapted from Apache Arrow, Apache Kudu, TensorFlow #include "ray/common/status.h" +#include #include namespace ray { +#define STATUS_CODE_OK "OK" +#define STATUS_CODE_OUT_OF_MEMORY "Out of memory" +#define STATUS_CODE_KEY_ERROR "Key error" +#define STATUS_CODE_TYPE_ERROR "Type error" +#define STATUS_CODE_INVALID "Invalid" +#define STATUS_CODE_IO_ERROR "IOError" +#define STATUS_CODE_OBJECT_EXISTS "ObjectExists" +#define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull" +#define STATUS_CODE_UNKNOWN_ERROR "Unknown error" +#define STATUS_CODE_NOT_IMPLEMENTED "NotImplemented" +#define STATUS_CODE_REDIS_ERROR "RedisError" +#define STATUS_CODE_TIMED_OUT "TimedOut" +#define STATUS_CODE_INTERRUPTED "Interrupted" +#define STATUS_CODE_INTENTIONAL_SYSTEM_EXIT "IntentionalSystemExit" +#define STATUS_CODE_UNEXPECTED_SYSTEM_EXIT "UnexpectedSystemExit" +#define STATUS_CODE_UNKNOWN "Unknown" + Status::Status(StatusCode code, const std::string &msg) { assert(code != StatusCode::OK); state_ = new State; @@ -50,55 +68,30 @@ void Status::CopyFrom(const State *state) { std::string Status::CodeAsString() const { if (state_ == NULL) { - return "OK"; + return STATUS_CODE_OK; } - const char *type; - switch (code()) { - case StatusCode::OK: - type = "OK"; - break; - case StatusCode::OutOfMemory: - type = "Out of memory"; - break; - case StatusCode::KeyError: - type = "Key error"; - break; - case StatusCode::TypeError: - type = "Type error"; - break; - case StatusCode::Invalid: - type = "Invalid"; - break; - case StatusCode::IOError: - type = "IOError"; - break; - case StatusCode::ObjectExists: - type = "ObjectExists"; - break; - case StatusCode::ObjectStoreFull: - type = "ObjectStoreFull"; - break; - case StatusCode::UnknownError: - type = "Unknown error"; - break; - case StatusCode::NotImplemented: - type = "NotImplemented"; - break; - case StatusCode::RedisError: - type = "RedisError"; - break; - case StatusCode::TimedOut: - type = "TimedOut"; - break; - case StatusCode::Interrupted: - type = "Interrupted"; - break; - default: - type = "Unknown"; - break; + static std::map code_to_str = { + {StatusCode::OK, STATUS_CODE_OK}, + {StatusCode::OutOfMemory, STATUS_CODE_OUT_OF_MEMORY}, + {StatusCode::KeyError, STATUS_CODE_KEY_ERROR}, + {StatusCode::TypeError, STATUS_CODE_TYPE_ERROR}, + {StatusCode::Invalid, STATUS_CODE_INVALID}, + {StatusCode::IOError, STATUS_CODE_IO_ERROR}, + {StatusCode::ObjectExists, STATUS_CODE_OBJECT_EXISTS}, + {StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL}, + {StatusCode::UnknownError, STATUS_CODE_UNKNOWN_ERROR}, + {StatusCode::NotImplemented, STATUS_CODE_NOT_IMPLEMENTED}, + {StatusCode::RedisError, STATUS_CODE_REDIS_ERROR}, + {StatusCode::TimedOut, STATUS_CODE_TIMED_OUT}, + {StatusCode::Interrupted, STATUS_CODE_INTERRUPTED}, + {StatusCode::IntentionalSystemExit, STATUS_CODE_INTENTIONAL_SYSTEM_EXIT}, + {StatusCode::UnexpectedSystemExit, STATUS_CODE_UNEXPECTED_SYSTEM_EXIT}}; + + if (!code_to_str.count(code())) { + return STATUS_CODE_UNKNOWN; } - return std::string(type); + return code_to_str[code()]; } std::string Status::ToString() const { diff --git a/src/ray/gcs/gcs_server/actor_info_handler_impl.cc b/src/ray/gcs/gcs_server/actor_info_handler_impl.cc index 64bbb17c6..14d3ecd86 100644 --- a/src/ray/gcs/gcs_server/actor_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/actor_info_handler_impl.cc @@ -34,7 +34,7 @@ void DefaultActorInfoHandler::HandleGetActorInfo( RAY_LOG(ERROR) << "Failed to get actor info: " << status.ToString() << ", actor id = " << actor_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncGet(actor_id, on_done); @@ -51,12 +51,12 @@ void DefaultActorInfoHandler::HandleRegisterActorInfo( RAY_LOG(DEBUG) << "Registering actor info, actor id = " << actor_id; auto actor_table_data = std::make_shared(); actor_table_data->CopyFrom(request.actor_table_data()); - auto on_done = [actor_id, send_reply_callback](Status status) { + auto on_done = [actor_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to register actor info: " << status.ToString() << ", actor id = " << actor_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncRegister(actor_table_data, on_done); @@ -73,12 +73,12 @@ void DefaultActorInfoHandler::HandleUpdateActorInfo( RAY_LOG(DEBUG) << "Updating actor info, actor id = " << actor_id; auto actor_table_data = std::make_shared(); actor_table_data->CopyFrom(request.actor_table_data()); - auto on_done = [actor_id, send_reply_callback](Status status) { + auto on_done = [actor_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to update actor info: " << status.ToString() << ", actor id = " << actor_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncUpdate(actor_id, actor_table_data, on_done); @@ -98,13 +98,13 @@ void DefaultActorInfoHandler::HandleAddActorCheckpoint( << ", checkpoint id = " << checkpoint_id; auto actor_checkpoint_data = std::make_shared(); actor_checkpoint_data->CopyFrom(request.checkpoint_data()); - auto on_done = [actor_id, checkpoint_id, send_reply_callback](Status status) { + auto on_done = [actor_id, checkpoint_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString() << ", actor id = " << actor_id << ", checkpoint id = " << checkpoint_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncAddCheckpoint(actor_checkpoint_data, on_done); @@ -130,7 +130,7 @@ void DefaultActorInfoHandler::HandleGetActorCheckpoint( RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << status.ToString() << ", checkpoint id = " << checkpoint_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncGetCheckpoint(checkpoint_id, on_done); @@ -156,7 +156,7 @@ void DefaultActorInfoHandler::HandleGetActorCheckpointID( RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << status.ToString() << ", actor id = " << actor_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Actors().AsyncGetCheckpointID(actor_id, on_done); diff --git a/src/ray/gcs/gcs_server/error_info_handler_impl.cc b/src/ray/gcs/gcs_server/error_info_handler_impl.cc index d7c5b6aa7..c14881606 100644 --- a/src/ray/gcs/gcs_server/error_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/error_info_handler_impl.cc @@ -25,12 +25,12 @@ void DefaultErrorInfoHandler::HandleReportJobError( RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type; auto error_table_data = std::make_shared(); error_table_data->CopyFrom(request.error_data()); - auto on_done = [job_id, type, send_reply_callback](Status status) { + auto on_done = [job_id, type, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to report job error, job id = " << job_id << ", type = " << type; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Errors().AsyncReportJobError(error_table_data, on_done); diff --git a/src/ray/gcs/gcs_server/job_info_handler_impl.cc b/src/ray/gcs/gcs_server/job_info_handler_impl.cc index ce6004c5a..fa467437e 100644 --- a/src/ray/gcs/gcs_server/job_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/job_info_handler_impl.cc @@ -29,8 +29,7 @@ void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request, RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id << ", driver pid = " << request.data().driver_pid(); } - reply->set_success(status.ok()); - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Jobs().AsyncAdd(job_table_data, on_done); @@ -50,8 +49,7 @@ void DefaultJobInfoHandler::HandleMarkJobFinished( if (!status.ok()) { RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; } - reply->set_success(status.ok()); - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Jobs().AsyncMarkFinished(job_id, on_done); diff --git a/src/ray/gcs/gcs_server/node_info_handler_impl.cc b/src/ray/gcs/gcs_server/node_info_handler_impl.cc index 9637aa2bc..11a88c3ff 100644 --- a/src/ray/gcs/gcs_server/node_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/node_info_handler_impl.cc @@ -24,12 +24,12 @@ void DefaultNodeInfoHandler::HandleRegisterNode( ClientID node_id = ClientID::FromBinary(request.node_info().node_id()); RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id; - auto on_done = [node_id, send_reply_callback](Status status) { + auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to register node info: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Nodes().AsyncRegister(request.node_info(), on_done); @@ -45,12 +45,12 @@ void DefaultNodeInfoHandler::HandleUnregisterNode( ClientID node_id = ClientID::FromBinary(request.node_id()); RAY_LOG(DEBUG) << "Unregistering node info, node id = " << node_id; - auto on_done = [node_id, send_reply_callback](Status status) { + auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to unregister node info: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Nodes().AsyncUnregister(node_id, on_done); @@ -73,7 +73,7 @@ void DefaultNodeInfoHandler::HandleGetAllNodeInfo( } else { RAY_LOG(ERROR) << "Failed to get all nodes info: " << status.ToString(); } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Nodes().AsyncGetAll(on_done); @@ -89,12 +89,12 @@ void DefaultNodeInfoHandler::HandleReportHeartbeat( ClientID node_id = ClientID::FromBinary(request.heartbeat().client_id()); RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id; - auto on_done = [node_id, send_reply_callback](Status status) { + auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to report heartbeat: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; auto heartbeat_data = std::make_shared(); @@ -112,12 +112,12 @@ void DefaultNodeInfoHandler::HandleReportBatchHeartbeat( RAY_LOG(DEBUG) << "Reporting batch heartbeat, batch size = " << request.heartbeat_batch().batch_size(); - auto on_done = [&request, send_reply_callback](Status status) { + auto on_done = [&request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to report batch heartbeat: " << status.ToString() << ", batch size = " << request.heartbeat_batch().batch_size(); } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; auto heartbeat_batch_data = std::make_shared(); @@ -151,7 +151,7 @@ void DefaultNodeInfoHandler::HandleGetResources(const GetResourcesRequest &reque RAY_LOG(ERROR) << "Failed to get node resources: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Nodes().AsyncGetResources(node_id, on_done); @@ -173,12 +173,12 @@ void DefaultNodeInfoHandler::HandleUpdateResources( resources[resource.first] = std::make_shared(resource.second); } - auto on_done = [node_id, send_reply_callback](Status status) { + auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to update node resources: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Nodes().AsyncUpdateResources(node_id, resources, on_done); @@ -196,12 +196,12 @@ void DefaultNodeInfoHandler::HandleDeleteResources( auto resource_names = VectorFromProtobuf(request.resource_name_list()); RAY_LOG(DEBUG) << "Deleting node resources, node id = " << node_id; - auto on_done = [node_id, send_reply_callback](Status status) { + auto on_done = [node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to delete node resources: " << status.ToString() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = diff --git a/src/ray/gcs/gcs_server/object_info_handler_impl.cc b/src/ray/gcs/gcs_server/object_info_handler_impl.cc index d2e99a639..56b79d0ce 100644 --- a/src/ray/gcs/gcs_server/object_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/object_info_handler_impl.cc @@ -34,7 +34,7 @@ void DefaultObjectInfoHandler::HandleGetObjectLocations( RAY_LOG(ERROR) << "Failed to get object locations: " << status.ToString() << ", object id = " << object_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Objects().AsyncGetLocations(object_id, on_done); @@ -53,12 +53,12 @@ void DefaultObjectInfoHandler::HandleAddObjectLocation( RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id << ", node id = " << node_id; - auto on_done = [object_id, node_id, send_reply_callback](Status status) { + auto on_done = [object_id, node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString() << ", object id = " << object_id << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Objects().AsyncAddLocation(object_id, node_id, on_done); @@ -78,12 +78,12 @@ void DefaultObjectInfoHandler::HandleRemoveObjectLocation( RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id << ", node id = " << node_id; - auto on_done = [object_id, node_id, send_reply_callback](Status status) { + auto on_done = [object_id, node_id, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to remove object location: " << status.ToString() << ", object id = " << object_id << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Objects().AsyncRemoveLocation(object_id, node_id, on_done); diff --git a/src/ray/gcs/gcs_server/stats_handler_impl.cc b/src/ray/gcs/gcs_server/stats_handler_impl.cc index 448bb76f1..e04560142 100644 --- a/src/ray/gcs/gcs_server/stats_handler_impl.cc +++ b/src/ray/gcs/gcs_server/stats_handler_impl.cc @@ -25,13 +25,13 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ << request.profile_data().component_type() << ", node id = " << node_id; auto profile_table_data = std::make_shared(); profile_table_data->CopyFrom(request.profile_data()); - auto on_done = [node_id, request, send_reply_callback](Status status) { + auto on_done = [node_id, request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add profile data, component type = " << request.profile_data().component_type() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Stats().AsyncAddProfileData(profile_table_data, on_done); diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 88940a417..691285fda 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -25,12 +25,12 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request, RAY_LOG(DEBUG) << "Adding task, task id = " << task_id << ", job id = " << job_id; auto task_table_data = std::make_shared(); task_table_data->CopyFrom(request.task_data()); - auto on_done = [job_id, task_id, request, send_reply_callback](Status status) { + auto on_done = [job_id, task_id, request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task, task id = " << task_id << ", job id = " << job_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done); @@ -54,7 +54,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, } else { RAY_LOG(ERROR) << "Failed to get task, task id = " << task_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done); @@ -69,11 +69,11 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request SendReplyCallback send_reply_callback) { std::vector task_ids = IdVectorFromProtobuf(request.task_id_list()); RAY_LOG(DEBUG) << "Deleting tasks, task id list size = " << task_ids.size(); - auto on_done = [task_ids, request, send_reply_callback](Status status) { + auto on_done = [task_ids, request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to delete tasks, task id list size = " << task_ids.size(); } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Tasks().AsyncDelete(task_ids, on_done); @@ -92,12 +92,12 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque << ", node id = " << node_id; auto task_lease_data = std::make_shared(); task_lease_data->CopyFrom(request.task_lease_data()); - auto on_done = [task_id, node_id, request, send_reply_callback](Status status) { + auto on_done = [task_id, node_id, request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task lease, task id = " << task_id << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = gcs_client_.Tasks().AsyncAddTaskLease(task_lease_data, on_done); @@ -118,13 +118,13 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( << ", node id = " << node_id; auto task_reconstruction_data = std::make_shared(); task_reconstruction_data->CopyFrom(request.task_reconstruction()); - auto on_done = [node_id, request, send_reply_callback](Status status) { + auto on_done = [node_id, request, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to reconstruct task, reconstructions num = " << request.task_reconstruction().num_reconstructions() << ", node id = " << node_id; } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 118a72783..21b595ca6 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -455,8 +455,8 @@ class GcsServerTest : public RedisServiceManagerForTest { std::unique_ptr client_; std::unique_ptr client_call_manager_; - // Timeout waiting for gcs server reply, default is 2s - const uint64_t timeout_ms_ = 2000; + // Timeout waiting for gcs server reply, default is 5s + const uint64_t timeout_ms_ = 5000; }; TEST_F(GcsServerTest, TestActorInfo) { @@ -618,6 +618,8 @@ TEST_F(GcsServerTest, TestTaskInfo) { rpc::DeleteTasksRequest delete_tasks_request; delete_tasks_request.add_task_id_list(task_id.Binary()); ASSERT_TRUE(DeleteTasks(delete_tasks_request)); + result = GetTask(task_id.Binary()); + ASSERT_TRUE(!result.has_task()); // Add task lease ClientID node_id = ClientID::FromRandom(); diff --git a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc index 601f1c40f..05bcb84c9 100644 --- a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc @@ -24,12 +24,12 @@ void DefaultWorkerInfoHandler::HandleReportWorkerFailure( RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString(); auto worker_failure_data = std::make_shared(); worker_failure_data->CopyFrom(request.worker_failure()); - auto on_done = [worker_address, send_reply_callback](Status status) { + auto on_done = [worker_address, reply, send_reply_callback](Status status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to report worker failure, " << worker_address.DebugString(); } - send_reply_callback(status, nullptr, nullptr); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; Status status = diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 77061613a..b7d2aa994 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -18,12 +18,17 @@ package ray.rpc; import "src/ray/protobuf/gcs.proto"; +message GcsStatus { + int32 code = 1; + string message = 2; +} + message AddJobRequest { JobTableData data = 1; } message AddJobReply { - bool success = 1; + GcsStatus status = 1; } message MarkJobFinishedRequest { @@ -31,7 +36,7 @@ message MarkJobFinishedRequest { } message MarkJobFinishedReply { - bool success = 1; + GcsStatus status = 1; } // Service for job info access. @@ -48,8 +53,9 @@ message GetActorInfoRequest { } message GetActorInfoReply { + GcsStatus status = 1; // Data of actor. - ActorTableData actor_table_data = 1; + ActorTableData actor_table_data = 2; } message RegisterActorInfoRequest { @@ -58,6 +64,7 @@ message RegisterActorInfoRequest { } message RegisterActorInfoReply { + GcsStatus status = 1; } message UpdateActorInfoRequest { @@ -68,6 +75,7 @@ message UpdateActorInfoRequest { } message UpdateActorInfoReply { + GcsStatus status = 1; } message AddActorCheckpointRequest { @@ -75,6 +83,7 @@ message AddActorCheckpointRequest { } message AddActorCheckpointReply { + GcsStatus status = 1; } message GetActorCheckpointRequest { @@ -82,7 +91,8 @@ message GetActorCheckpointRequest { } message GetActorCheckpointReply { - ActorCheckpointData checkpoint_data = 1; + GcsStatus status = 1; + ActorCheckpointData checkpoint_data = 2; } message GetActorCheckpointIDRequest { @@ -90,7 +100,8 @@ message GetActorCheckpointIDRequest { } message GetActorCheckpointIDReply { - ActorCheckpointIdData checkpoint_id_data = 1; + GcsStatus status = 1; + ActorCheckpointIdData checkpoint_id_data = 2; } // Service for actor info access. @@ -116,6 +127,7 @@ message RegisterNodeRequest { } message RegisterNodeReply { + GcsStatus status = 1; } message UnregisterNodeRequest { @@ -124,13 +136,15 @@ message UnregisterNodeRequest { } message UnregisterNodeReply { + GcsStatus status = 1; } message GetAllNodeInfoRequest { } message GetAllNodeInfoReply { - repeated GcsNodeInfo node_info_list = 1; + GcsStatus status = 1; + repeated GcsNodeInfo node_info_list = 2; } message ReportHeartbeatRequest { @@ -138,6 +152,7 @@ message ReportHeartbeatRequest { } message ReportHeartbeatReply { + GcsStatus status = 1; } message ReportBatchHeartbeatRequest { @@ -145,6 +160,7 @@ message ReportBatchHeartbeatRequest { } message ReportBatchHeartbeatReply { + GcsStatus status = 1; } message GetResourcesRequest { @@ -152,7 +168,8 @@ message GetResourcesRequest { } message GetResourcesReply { - map resources = 1; + GcsStatus status = 1; + map resources = 2; } message UpdateResourcesRequest { @@ -161,6 +178,7 @@ message UpdateResourcesRequest { } message UpdateResourcesReply { + GcsStatus status = 1; } message DeleteResourcesRequest { @@ -169,6 +187,7 @@ message DeleteResourcesRequest { } message DeleteResourcesReply { + GcsStatus status = 1; } // Service for node info access. @@ -198,8 +217,9 @@ message GetObjectLocationsRequest { } message GetObjectLocationsReply { + GcsStatus status = 1; // Data of object - repeated ObjectTableData object_table_data_list = 1; + repeated ObjectTableData object_table_data_list = 2; } message AddObjectLocationRequest { @@ -210,6 +230,7 @@ message AddObjectLocationRequest { } message AddObjectLocationReply { + GcsStatus status = 1; } message RemoveObjectLocationRequest { @@ -220,6 +241,7 @@ message RemoveObjectLocationRequest { } message RemoveObjectLocationReply { + GcsStatus status = 1; } // Service for object info access. @@ -238,6 +260,7 @@ message AddTaskRequest { } message AddTaskReply { + GcsStatus status = 1; } message GetTaskRequest { @@ -245,7 +268,8 @@ message GetTaskRequest { } message GetTaskReply { - TaskTableData task_data = 1; + GcsStatus status = 1; + TaskTableData task_data = 2; } message DeleteTasksRequest { @@ -253,6 +277,7 @@ message DeleteTasksRequest { } message DeleteTasksReply { + GcsStatus status = 1; } message AddTaskLeaseRequest { @@ -260,6 +285,7 @@ message AddTaskLeaseRequest { } message AddTaskLeaseReply { + GcsStatus status = 1; } message AttemptTaskReconstructionRequest { @@ -267,6 +293,7 @@ message AttemptTaskReconstructionRequest { } message AttemptTaskReconstructionReply { + GcsStatus status = 1; } // Service for task info access. @@ -289,6 +316,7 @@ message AddProfileDataRequest { } message AddProfileDataReply { + GcsStatus status = 1; } // Service for stats access. @@ -302,6 +330,7 @@ message ReportJobErrorRequest { } message ReportJobErrorReply { + GcsStatus status = 1; } // Service for error info access. @@ -315,6 +344,7 @@ message ReportWorkerFailureRequest { } message ReportWorkerFailureReply { + GcsStatus status = 1; } // Service for worker info access. diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index da3a6a1cb..98b85db90 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -46,25 +46,30 @@ class Executor { }; // Define a void GCS RPC client method. -#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \ - void METHOD(const METHOD##Request &request, \ - const ClientCallback &callback) SPECS { \ - auto executor = new Executor(this); \ - auto operation_callback = [this, request, callback, executor]( \ - const Status &status, const METHOD##Reply &reply) { \ - if (!status.IsIOError()) { \ - callback(status, reply); \ - delete executor; \ - } else { \ - Reconnect(); \ - executor->Retry(); \ - } \ - }; \ - auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \ - RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \ - gcs_rpc_client->grpc_client)); \ - }; \ - executor->Execute(operation); \ +#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \ + void METHOD(const METHOD##Request &request, \ + const ClientCallback &callback) SPECS { \ + auto executor = new Executor(this); \ + auto operation_callback = [this, request, callback, executor]( \ + const ray::Status &status, \ + const METHOD##Reply &reply) { \ + if (!status.IsIOError()) { \ + auto status = \ + reply.status().code() == (int)StatusCode::OK \ + ? Status() \ + : Status(StatusCode(reply.status().code()), reply.status().message()); \ + callback(status, reply); \ + delete executor; \ + } else { \ + Reconnect(); \ + executor->Retry(); \ + } \ + }; \ + auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \ + RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \ + gcs_rpc_client->grpc_client)); \ + }; \ + executor->Execute(operation); \ } /// Client used for communicating with gcs server. diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 9a131fd22..e621249b1 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -46,6 +46,11 @@ namespace rpc { #define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \ RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER) +#define GCS_RPC_SEND_REPLY(send_reply_callback, reply, status) \ + reply->mutable_status()->set_code((int)status.code()); \ + reply->mutable_status()->set_message(status.message()); \ + send_reply_callback(ray::Status::OK(), nullptr, nullptr) + class JobInfoGcsServiceHandler { public: virtual ~JobInfoGcsServiceHandler() = default;