Fix the issue that gcs service client ignores error status code (#7539)

* add gcs reply status

* rebase master

* use macro to simplify

* convert status in gcs rpc client

* define a Status message in probobuf

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-03-12 15:08:29 +08:00
committed by GitHub
parent 20ef4a8603
commit 4c834b9d68
13 changed files with 156 additions and 123 deletions
+39 -46
View File
@@ -27,11 +27,29 @@
// Adapted from Apache Arrow, Apache Kudu, TensorFlow
#include "ray/common/status.h"
#include <map>
#include <assert.h>
namespace ray {
#define STATUS_CODE_OK "OK"
#define STATUS_CODE_OUT_OF_MEMORY "Out of memory"
#define STATUS_CODE_KEY_ERROR "Key error"
#define STATUS_CODE_TYPE_ERROR "Type error"
#define STATUS_CODE_INVALID "Invalid"
#define STATUS_CODE_IO_ERROR "IOError"
#define STATUS_CODE_OBJECT_EXISTS "ObjectExists"
#define STATUS_CODE_OBJECT_STORE_FULL "ObjectStoreFull"
#define STATUS_CODE_UNKNOWN_ERROR "Unknown error"
#define STATUS_CODE_NOT_IMPLEMENTED "NotImplemented"
#define STATUS_CODE_REDIS_ERROR "RedisError"
#define STATUS_CODE_TIMED_OUT "TimedOut"
#define STATUS_CODE_INTERRUPTED "Interrupted"
#define STATUS_CODE_INTENTIONAL_SYSTEM_EXIT "IntentionalSystemExit"
#define STATUS_CODE_UNEXPECTED_SYSTEM_EXIT "UnexpectedSystemExit"
#define STATUS_CODE_UNKNOWN "Unknown"
Status::Status(StatusCode code, const std::string &msg) {
assert(code != StatusCode::OK);
state_ = new State;
@@ -50,55 +68,30 @@ void Status::CopyFrom(const State *state) {
std::string Status::CodeAsString() const {
if (state_ == NULL) {
return "OK";
return STATUS_CODE_OK;
}
const char *type;
switch (code()) {
case StatusCode::OK:
type = "OK";
break;
case StatusCode::OutOfMemory:
type = "Out of memory";
break;
case StatusCode::KeyError:
type = "Key error";
break;
case StatusCode::TypeError:
type = "Type error";
break;
case StatusCode::Invalid:
type = "Invalid";
break;
case StatusCode::IOError:
type = "IOError";
break;
case StatusCode::ObjectExists:
type = "ObjectExists";
break;
case StatusCode::ObjectStoreFull:
type = "ObjectStoreFull";
break;
case StatusCode::UnknownError:
type = "Unknown error";
break;
case StatusCode::NotImplemented:
type = "NotImplemented";
break;
case StatusCode::RedisError:
type = "RedisError";
break;
case StatusCode::TimedOut:
type = "TimedOut";
break;
case StatusCode::Interrupted:
type = "Interrupted";
break;
default:
type = "Unknown";
break;
static std::map<StatusCode, std::string> code_to_str = {
{StatusCode::OK, STATUS_CODE_OK},
{StatusCode::OutOfMemory, STATUS_CODE_OUT_OF_MEMORY},
{StatusCode::KeyError, STATUS_CODE_KEY_ERROR},
{StatusCode::TypeError, STATUS_CODE_TYPE_ERROR},
{StatusCode::Invalid, STATUS_CODE_INVALID},
{StatusCode::IOError, STATUS_CODE_IO_ERROR},
{StatusCode::ObjectExists, STATUS_CODE_OBJECT_EXISTS},
{StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL},
{StatusCode::UnknownError, STATUS_CODE_UNKNOWN_ERROR},
{StatusCode::NotImplemented, STATUS_CODE_NOT_IMPLEMENTED},
{StatusCode::RedisError, STATUS_CODE_REDIS_ERROR},
{StatusCode::TimedOut, STATUS_CODE_TIMED_OUT},
{StatusCode::Interrupted, STATUS_CODE_INTERRUPTED},
{StatusCode::IntentionalSystemExit, STATUS_CODE_INTENTIONAL_SYSTEM_EXIT},
{StatusCode::UnexpectedSystemExit, STATUS_CODE_UNEXPECTED_SYSTEM_EXIT}};
if (!code_to_str.count(code())) {
return STATUS_CODE_UNKNOWN;
}
return std::string(type);
return code_to_str[code()];
}
std::string Status::ToString() const {
@@ -34,7 +34,7 @@ void DefaultActorInfoHandler::HandleGetActorInfo(
RAY_LOG(ERROR) << "Failed to get actor info: " << status.ToString()
<< ", actor id = " << actor_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncGet(actor_id, on_done);
@@ -51,12 +51,12 @@ void DefaultActorInfoHandler::HandleRegisterActorInfo(
RAY_LOG(DEBUG) << "Registering actor info, actor id = " << actor_id;
auto actor_table_data = std::make_shared<ActorTableData>();
actor_table_data->CopyFrom(request.actor_table_data());
auto on_done = [actor_id, send_reply_callback](Status status) {
auto on_done = [actor_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to register actor info: " << status.ToString()
<< ", actor id = " << actor_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncRegister(actor_table_data, on_done);
@@ -73,12 +73,12 @@ void DefaultActorInfoHandler::HandleUpdateActorInfo(
RAY_LOG(DEBUG) << "Updating actor info, actor id = " << actor_id;
auto actor_table_data = std::make_shared<ActorTableData>();
actor_table_data->CopyFrom(request.actor_table_data());
auto on_done = [actor_id, send_reply_callback](Status status) {
auto on_done = [actor_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to update actor info: " << status.ToString()
<< ", actor id = " << actor_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncUpdate(actor_id, actor_table_data, on_done);
@@ -98,13 +98,13 @@ void DefaultActorInfoHandler::HandleAddActorCheckpoint(
<< ", checkpoint id = " << checkpoint_id;
auto actor_checkpoint_data = std::make_shared<ActorCheckpointData>();
actor_checkpoint_data->CopyFrom(request.checkpoint_data());
auto on_done = [actor_id, checkpoint_id, send_reply_callback](Status status) {
auto on_done = [actor_id, checkpoint_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add actor checkpoint: " << status.ToString()
<< ", actor id = " << actor_id
<< ", checkpoint id = " << checkpoint_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncAddCheckpoint(actor_checkpoint_data, on_done);
@@ -130,7 +130,7 @@ void DefaultActorInfoHandler::HandleGetActorCheckpoint(
RAY_LOG(ERROR) << "Failed to get actor checkpoint: " << status.ToString()
<< ", checkpoint id = " << checkpoint_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncGetCheckpoint(checkpoint_id, on_done);
@@ -156,7 +156,7 @@ void DefaultActorInfoHandler::HandleGetActorCheckpointID(
RAY_LOG(ERROR) << "Failed to get actor checkpoint id: " << status.ToString()
<< ", actor id = " << actor_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Actors().AsyncGetCheckpointID(actor_id, on_done);
@@ -25,12 +25,12 @@ void DefaultErrorInfoHandler::HandleReportJobError(
RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type;
auto error_table_data = std::make_shared<ErrorTableData>();
error_table_data->CopyFrom(request.error_data());
auto on_done = [job_id, type, send_reply_callback](Status status) {
auto on_done = [job_id, type, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report job error, job id = " << job_id
<< ", type = " << type;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Errors().AsyncReportJobError(error_table_data, on_done);
@@ -29,8 +29,7 @@ void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}
reply->set_success(status.ok());
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Jobs().AsyncAdd(job_table_data, on_done);
@@ -50,8 +49,7 @@ void DefaultJobInfoHandler::HandleMarkJobFinished(
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
}
reply->set_success(status.ok());
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Jobs().AsyncMarkFinished(job_id, on_done);
@@ -24,12 +24,12 @@ void DefaultNodeInfoHandler::HandleRegisterNode(
ClientID node_id = ClientID::FromBinary(request.node_info().node_id());
RAY_LOG(DEBUG) << "Registering node info, node id = " << node_id;
auto on_done = [node_id, send_reply_callback](Status status) {
auto on_done = [node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to register node info: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Nodes().AsyncRegister(request.node_info(), on_done);
@@ -45,12 +45,12 @@ void DefaultNodeInfoHandler::HandleUnregisterNode(
ClientID node_id = ClientID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Unregistering node info, node id = " << node_id;
auto on_done = [node_id, send_reply_callback](Status status) {
auto on_done = [node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to unregister node info: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Nodes().AsyncUnregister(node_id, on_done);
@@ -73,7 +73,7 @@ void DefaultNodeInfoHandler::HandleGetAllNodeInfo(
} else {
RAY_LOG(ERROR) << "Failed to get all nodes info: " << status.ToString();
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Nodes().AsyncGetAll(on_done);
@@ -89,12 +89,12 @@ void DefaultNodeInfoHandler::HandleReportHeartbeat(
ClientID node_id = ClientID::FromBinary(request.heartbeat().client_id());
RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id;
auto on_done = [node_id, send_reply_callback](Status status) {
auto on_done = [node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report heartbeat: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
auto heartbeat_data = std::make_shared<rpc::HeartbeatTableData>();
@@ -112,12 +112,12 @@ void DefaultNodeInfoHandler::HandleReportBatchHeartbeat(
RAY_LOG(DEBUG) << "Reporting batch heartbeat, batch size = "
<< request.heartbeat_batch().batch_size();
auto on_done = [&request, send_reply_callback](Status status) {
auto on_done = [&request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report batch heartbeat: " << status.ToString()
<< ", batch size = " << request.heartbeat_batch().batch_size();
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
auto heartbeat_batch_data = std::make_shared<rpc::HeartbeatBatchTableData>();
@@ -151,7 +151,7 @@ void DefaultNodeInfoHandler::HandleGetResources(const GetResourcesRequest &reque
RAY_LOG(ERROR) << "Failed to get node resources: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Nodes().AsyncGetResources(node_id, on_done);
@@ -173,12 +173,12 @@ void DefaultNodeInfoHandler::HandleUpdateResources(
resources[resource.first] = std::make_shared<rpc::ResourceTableData>(resource.second);
}
auto on_done = [node_id, send_reply_callback](Status status) {
auto on_done = [node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to update node resources: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Nodes().AsyncUpdateResources(node_id, resources, on_done);
@@ -196,12 +196,12 @@ void DefaultNodeInfoHandler::HandleDeleteResources(
auto resource_names = VectorFromProtobuf(request.resource_name_list());
RAY_LOG(DEBUG) << "Deleting node resources, node id = " << node_id;
auto on_done = [node_id, send_reply_callback](Status status) {
auto on_done = [node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to delete node resources: " << status.ToString()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status =
@@ -34,7 +34,7 @@ void DefaultObjectInfoHandler::HandleGetObjectLocations(
RAY_LOG(ERROR) << "Failed to get object locations: " << status.ToString()
<< ", object id = " << object_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Objects().AsyncGetLocations(object_id, on_done);
@@ -53,12 +53,12 @@ void DefaultObjectInfoHandler::HandleAddObjectLocation(
RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id
<< ", node id = " << node_id;
auto on_done = [object_id, node_id, send_reply_callback](Status status) {
auto on_done = [object_id, node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add object location: " << status.ToString()
<< ", object id = " << object_id << ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Objects().AsyncAddLocation(object_id, node_id, on_done);
@@ -78,12 +78,12 @@ void DefaultObjectInfoHandler::HandleRemoveObjectLocation(
RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id
<< ", node id = " << node_id;
auto on_done = [object_id, node_id, send_reply_callback](Status status) {
auto on_done = [object_id, node_id, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to remove object location: " << status.ToString()
<< ", object id = " << object_id << ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Objects().AsyncRemoveLocation(object_id, node_id, on_done);
+2 -2
View File
@@ -25,13 +25,13 @@ void DefaultStatsHandler::HandleAddProfileData(const AddProfileDataRequest &requ
<< request.profile_data().component_type() << ", node id = " << node_id;
auto profile_table_data = std::make_shared<ProfileTableData>();
profile_table_data->CopyFrom(request.profile_data());
auto on_done = [node_id, request, send_reply_callback](Status status) {
auto on_done = [node_id, request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add profile data, component type = "
<< request.profile_data().component_type()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Stats().AsyncAddProfileData(profile_table_data, on_done);
@@ -25,12 +25,12 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request,
RAY_LOG(DEBUG) << "Adding task, task id = " << task_id << ", job id = " << job_id;
auto task_table_data = std::make_shared<TaskTableData>();
task_table_data->CopyFrom(request.task_data());
auto on_done = [job_id, task_id, request, send_reply_callback](Status status) {
auto on_done = [job_id, task_id, request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task, task id = " << task_id
<< ", job id = " << job_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done);
@@ -54,7 +54,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
} else {
RAY_LOG(ERROR) << "Failed to get task, task id = " << task_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done);
@@ -69,11 +69,11 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request
SendReplyCallback send_reply_callback) {
std::vector<TaskID> task_ids = IdVectorFromProtobuf<TaskID>(request.task_id_list());
RAY_LOG(DEBUG) << "Deleting tasks, task id list size = " << task_ids.size();
auto on_done = [task_ids, request, send_reply_callback](Status status) {
auto on_done = [task_ids, request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to delete tasks, task id list size = " << task_ids.size();
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Tasks().AsyncDelete(task_ids, on_done);
@@ -92,12 +92,12 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
<< ", node id = " << node_id;
auto task_lease_data = std::make_shared<TaskLeaseData>();
task_lease_data->CopyFrom(request.task_lease_data());
auto on_done = [task_id, node_id, request, send_reply_callback](Status status) {
auto on_done = [task_id, node_id, request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task lease, task id = " << task_id
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Tasks().AsyncAddTaskLease(task_lease_data, on_done);
@@ -118,13 +118,13 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
<< ", node id = " << node_id;
auto task_reconstruction_data = std::make_shared<TaskReconstructionData>();
task_reconstruction_data->CopyFrom(request.task_reconstruction());
auto on_done = [node_id, request, send_reply_callback](Status status) {
auto on_done = [node_id, request, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconstruct task, reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status =
@@ -455,8 +455,8 @@ class GcsServerTest : public RedisServiceManagerForTest {
std::unique_ptr<rpc::GcsRpcClient> client_;
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
// Timeout waiting for gcs server reply, default is 2s
const uint64_t timeout_ms_ = 2000;
// Timeout waiting for gcs server reply, default is 5s
const uint64_t timeout_ms_ = 5000;
};
TEST_F(GcsServerTest, TestActorInfo) {
@@ -618,6 +618,8 @@ TEST_F(GcsServerTest, TestTaskInfo) {
rpc::DeleteTasksRequest delete_tasks_request;
delete_tasks_request.add_task_id_list(task_id.Binary());
ASSERT_TRUE(DeleteTasks(delete_tasks_request));
result = GetTask(task_id.Binary());
ASSERT_TRUE(!result.has_task());
// Add task lease
ClientID node_id = ClientID::FromRandom();
@@ -24,12 +24,12 @@ void DefaultWorkerInfoHandler::HandleReportWorkerFailure(
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
auto worker_failure_data = std::make_shared<WorkerFailureData>();
worker_failure_data->CopyFrom(request.worker_failure());
auto on_done = [worker_address, send_reply_callback](Status status) {
auto on_done = [worker_address, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report worker failure, "
<< worker_address.DebugString();
}
send_reply_callback(status, nullptr, nullptr);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status =
+39 -9
View File
@@ -18,12 +18,17 @@ package ray.rpc;
import "src/ray/protobuf/gcs.proto";
message GcsStatus {
int32 code = 1;
string message = 2;
}
message AddJobRequest {
JobTableData data = 1;
}
message AddJobReply {
bool success = 1;
GcsStatus status = 1;
}
message MarkJobFinishedRequest {
@@ -31,7 +36,7 @@ message MarkJobFinishedRequest {
}
message MarkJobFinishedReply {
bool success = 1;
GcsStatus status = 1;
}
// Service for job info access.
@@ -48,8 +53,9 @@ message GetActorInfoRequest {
}
message GetActorInfoReply {
GcsStatus status = 1;
// Data of actor.
ActorTableData actor_table_data = 1;
ActorTableData actor_table_data = 2;
}
message RegisterActorInfoRequest {
@@ -58,6 +64,7 @@ message RegisterActorInfoRequest {
}
message RegisterActorInfoReply {
GcsStatus status = 1;
}
message UpdateActorInfoRequest {
@@ -68,6 +75,7 @@ message UpdateActorInfoRequest {
}
message UpdateActorInfoReply {
GcsStatus status = 1;
}
message AddActorCheckpointRequest {
@@ -75,6 +83,7 @@ message AddActorCheckpointRequest {
}
message AddActorCheckpointReply {
GcsStatus status = 1;
}
message GetActorCheckpointRequest {
@@ -82,7 +91,8 @@ message GetActorCheckpointRequest {
}
message GetActorCheckpointReply {
ActorCheckpointData checkpoint_data = 1;
GcsStatus status = 1;
ActorCheckpointData checkpoint_data = 2;
}
message GetActorCheckpointIDRequest {
@@ -90,7 +100,8 @@ message GetActorCheckpointIDRequest {
}
message GetActorCheckpointIDReply {
ActorCheckpointIdData checkpoint_id_data = 1;
GcsStatus status = 1;
ActorCheckpointIdData checkpoint_id_data = 2;
}
// Service for actor info access.
@@ -116,6 +127,7 @@ message RegisterNodeRequest {
}
message RegisterNodeReply {
GcsStatus status = 1;
}
message UnregisterNodeRequest {
@@ -124,13 +136,15 @@ message UnregisterNodeRequest {
}
message UnregisterNodeReply {
GcsStatus status = 1;
}
message GetAllNodeInfoRequest {
}
message GetAllNodeInfoReply {
repeated GcsNodeInfo node_info_list = 1;
GcsStatus status = 1;
repeated GcsNodeInfo node_info_list = 2;
}
message ReportHeartbeatRequest {
@@ -138,6 +152,7 @@ message ReportHeartbeatRequest {
}
message ReportHeartbeatReply {
GcsStatus status = 1;
}
message ReportBatchHeartbeatRequest {
@@ -145,6 +160,7 @@ message ReportBatchHeartbeatRequest {
}
message ReportBatchHeartbeatReply {
GcsStatus status = 1;
}
message GetResourcesRequest {
@@ -152,7 +168,8 @@ message GetResourcesRequest {
}
message GetResourcesReply {
map<string, ResourceTableData> resources = 1;
GcsStatus status = 1;
map<string, ResourceTableData> resources = 2;
}
message UpdateResourcesRequest {
@@ -161,6 +178,7 @@ message UpdateResourcesRequest {
}
message UpdateResourcesReply {
GcsStatus status = 1;
}
message DeleteResourcesRequest {
@@ -169,6 +187,7 @@ message DeleteResourcesRequest {
}
message DeleteResourcesReply {
GcsStatus status = 1;
}
// Service for node info access.
@@ -198,8 +217,9 @@ message GetObjectLocationsRequest {
}
message GetObjectLocationsReply {
GcsStatus status = 1;
// Data of object
repeated ObjectTableData object_table_data_list = 1;
repeated ObjectTableData object_table_data_list = 2;
}
message AddObjectLocationRequest {
@@ -210,6 +230,7 @@ message AddObjectLocationRequest {
}
message AddObjectLocationReply {
GcsStatus status = 1;
}
message RemoveObjectLocationRequest {
@@ -220,6 +241,7 @@ message RemoveObjectLocationRequest {
}
message RemoveObjectLocationReply {
GcsStatus status = 1;
}
// Service for object info access.
@@ -238,6 +260,7 @@ message AddTaskRequest {
}
message AddTaskReply {
GcsStatus status = 1;
}
message GetTaskRequest {
@@ -245,7 +268,8 @@ message GetTaskRequest {
}
message GetTaskReply {
TaskTableData task_data = 1;
GcsStatus status = 1;
TaskTableData task_data = 2;
}
message DeleteTasksRequest {
@@ -253,6 +277,7 @@ message DeleteTasksRequest {
}
message DeleteTasksReply {
GcsStatus status = 1;
}
message AddTaskLeaseRequest {
@@ -260,6 +285,7 @@ message AddTaskLeaseRequest {
}
message AddTaskLeaseReply {
GcsStatus status = 1;
}
message AttemptTaskReconstructionRequest {
@@ -267,6 +293,7 @@ message AttemptTaskReconstructionRequest {
}
message AttemptTaskReconstructionReply {
GcsStatus status = 1;
}
// Service for task info access.
@@ -289,6 +316,7 @@ message AddProfileDataRequest {
}
message AddProfileDataReply {
GcsStatus status = 1;
}
// Service for stats access.
@@ -302,6 +330,7 @@ message ReportJobErrorRequest {
}
message ReportJobErrorReply {
GcsStatus status = 1;
}
// Service for error info access.
@@ -315,6 +344,7 @@ message ReportWorkerFailureRequest {
}
message ReportWorkerFailureReply {
GcsStatus status = 1;
}
// Service for worker info access.
+24 -19
View File
@@ -46,25 +46,30 @@ class Executor {
};
// Define a void GCS RPC client method.
#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback) SPECS { \
auto executor = new Executor(this); \
auto operation_callback = [this, request, callback, executor]( \
const Status &status, const METHOD##Reply &reply) { \
if (!status.IsIOError()) { \
callback(status, reply); \
delete executor; \
} else { \
Reconnect(); \
executor->Retry(); \
} \
}; \
auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \
RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \
gcs_rpc_client->grpc_client)); \
}; \
executor->Execute(operation); \
#define VOID_GCS_RPC_CLIENT_METHOD(SERVICE, METHOD, grpc_client, SPECS) \
void METHOD(const METHOD##Request &request, \
const ClientCallback<METHOD##Reply> &callback) SPECS { \
auto executor = new Executor(this); \
auto operation_callback = [this, request, callback, executor]( \
const ray::Status &status, \
const METHOD##Reply &reply) { \
if (!status.IsIOError()) { \
auto status = \
reply.status().code() == (int)StatusCode::OK \
? Status() \
: Status(StatusCode(reply.status().code()), reply.status().message()); \
callback(status, reply); \
delete executor; \
} else { \
Reconnect(); \
executor->Retry(); \
} \
}; \
auto operation = [request, operation_callback](GcsRpcClient *gcs_rpc_client) { \
RAY_UNUSED(INVOKE_RPC_CALL(SERVICE, METHOD, request, operation_callback, \
gcs_rpc_client->grpc_client)); \
}; \
executor->Execute(operation); \
}
/// Client used for communicating with gcs server.
+5
View File
@@ -46,6 +46,11 @@ namespace rpc {
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER)
#define GCS_RPC_SEND_REPLY(send_reply_callback, reply, status) \
reply->mutable_status()->set_code((int)status.code()); \
reply->mutable_status()->set_message(status.message()); \
send_reply_callback(ray::Status::OK(), nullptr, nullptr)
class JobInfoGcsServiceHandler {
public:
virtual ~JobInfoGcsServiceHandler() = default;