From ca454c5c1b19bd76c2922714a579d70cd2011f2c Mon Sep 17 00:00:00 2001
From: fangfengbin <869218239a@zju.edu.cn>
Date: Thu, 9 Jan 2020 15:37:42 +0800
Subject: [PATCH] Add task reconstruction function to task info handler (#6711)

---
 .../gcs/gcs_server/task_info_handler_impl.cc | 29 +++++++++++++++++++
 .../gcs/gcs_server/task_info_handler_impl.h  |  4 +++
 .../gcs_server/test/gcs_server_rpc_test.cc   | 21 ++++++++++++++
 src/ray/protobuf/gcs_service.proto           | 10 +++++++
 src/ray/rpc/gcs_server/gcs_rpc_client.h      |  4 +++
 src/ray/rpc/gcs_server/gcs_rpc_server.h      |  5 ++++
 6 files changed, 73 insertions(+)

diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc
index 3b3a7de43..524f142da 100644
--- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc
+++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc
@@ -94,5 +94,34 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
                  << ", node id = " << node_id;
 }
 
+void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
+    const AttemptTaskReconstructionRequest &request,
+    AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) {
+  ClientID node_id =
+      ClientID::FromBinary(request.task_reconstruction().node_manager_id());
+  RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = "
+                 << request.task_reconstruction().num_reconstructions()
+                 << ", node id = " << node_id;
+  auto task_reconstruction_data = std::make_shared<TaskReconstructionData>();
+  task_reconstruction_data->CopyFrom(request.task_reconstruction());
+  auto on_done = [node_id, request, send_reply_callback](Status status) {
+    if (!status.ok()) {
+      RAY_LOG(ERROR) << "Failed to reconstruct task, reconstructions num = "
+                     << request.task_reconstruction().num_reconstructions()
+                     << ", node id = " << node_id;
+    }
+    send_reply_callback(status, nullptr, nullptr);
+  };
+
+  Status status =
+      gcs_client_.Tasks().AttemptTaskReconstruction(task_reconstruction_data, on_done);
+  if (!status.ok()) {
+    on_done(status);
+  }
+  RAY_LOG(DEBUG) << "Finished reconstructing task, reconstructions num = "
+                 << request.task_reconstruction().num_reconstructions()
+                 << ", node id = " << node_id;
+}
+
 }  // namespace rpc
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h
index 7f66f257a..6ace507b5 100644
--- a/src/ray/gcs/gcs_server/task_info_handler_impl.h
+++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h
@@ -25,6 +25,10 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
   void HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply,
                           SendReplyCallback send_reply_callback) override;
 
+  void HandleAttemptTaskReconstruction(const AttemptTaskReconstructionRequest &request,
+                                       AttemptTaskReconstructionReply *reply,
+                                       SendReplyCallback send_reply_callback) override;
+
  private:
   gcs::RedisGcsClient &gcs_client_;
 };
diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
index 78a86d2af..51f35b2a1 100644
--- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
@@ -334,6 +334,17 @@ class GcsServerTest : public RedisServiceManagerForTest {
     return WaitReady(promise.get_future(), timeout_ms_);
   }
 
+  bool AttemptTaskReconstruction(const rpc::AttemptTaskReconstructionRequest &request) {
+    std::promise<bool> promise;
+    client_->AttemptTaskReconstruction(
+        request, [&promise](const Status &status,
+                            const rpc::AttemptTaskReconstructionReply &reply) {
+          RAY_CHECK_OK(status);
+          promise.set_value(true);
+        });
+    return WaitReady(promise.get_future(), timeout_ms_);
+  }
+
   bool WaitReady(const std::future<bool> &future, uint64_t timeout_ms) {
     auto status = future.wait_for(std::chrono::milliseconds(timeout_ms));
     return status == std::future_status::ready;
@@ -572,6 +583,16 @@ TEST_F(GcsServerTest, TestTaskInfo) {
   rpc::AddTaskLeaseRequest add_task_lease_request;
   add_task_lease_request.mutable_task_lease_data()->CopyFrom(task_lease_data);
   ASSERT_TRUE(AddTaskLease(add_task_lease_request));
+
+  // Attempt task reconstruction
+  rpc::AttemptTaskReconstructionRequest attempt_task_reconstruction_request;
+  rpc::TaskReconstructionData task_reconstruction_data;
+  task_reconstruction_data.set_task_id(task_id.Binary());
+  task_reconstruction_data.set_node_manager_id(node_id.Binary());
+  task_reconstruction_data.set_num_reconstructions(0);
+  attempt_task_reconstruction_request.mutable_task_reconstruction()->CopyFrom(
+      task_reconstruction_data);
+  ASSERT_TRUE(AttemptTaskReconstruction(attempt_task_reconstruction_request));
 }
 
 }  // namespace ray
diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto
index 505512e4f..a435da8ff 100644
--- a/src/ray/protobuf/gcs_service.proto
+++ b/src/ray/protobuf/gcs_service.proto
@@ -248,6 +248,13 @@ message AddTaskLeaseRequest {
 message AddTaskLeaseReply {
 }
 
+message AttemptTaskReconstructionRequest {
+  TaskReconstructionData task_reconstruction = 1;
+}
+
+message AttemptTaskReconstructionReply {
+}
+
 // Service for task info access.
 service TaskInfoGcsService {
   // Add a task to GCS Service.
@@ -258,4 +265,7 @@ service TaskInfoGcsService {
   rpc DeleteTasks(DeleteTasksRequest) returns (DeleteTasksReply);
   // Add a task lease to GCS Service.
   rpc AddTaskLease(AddTaskLeaseRequest) returns (AddTaskLeaseReply);
+  // Attempt task reconstruction to GCS Service.
+  rpc AttemptTaskReconstruction(AttemptTaskReconstructionRequest)
+      returns (AttemptTaskReconstructionReply);
 }
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h
index d1051bfd5..a9d996f41 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_client.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h
@@ -105,6 +105,10 @@ class GcsRpcClient {
   /// Add a task lease to GCS Service.
   VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, )
 
+  /// Attempt task reconstruction to GCS Service.
+  VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction,
+                         task_info_grpc_client_, )
+
  private:
   /// The gRPC-generated stub.
   std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h
index a5b8e4b86..aa9a1c98e 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_server.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h
@@ -259,6 +259,10 @@ class TaskInfoGcsServiceHandler {
   virtual void HandleAddTaskLease(const AddTaskLeaseRequest &request,
                                   AddTaskLeaseReply *reply,
                                   SendReplyCallback send_reply_callback) = 0;
+
+  virtual void HandleAttemptTaskReconstruction(
+      const AttemptTaskReconstructionRequest &request,
+      AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) = 0;
 };
 
 /// The `GrpcService` for `TaskInfoGcsService`.
@@ -282,6 +286,7 @@ class TaskInfoGrpcService : public GrpcService {
     TASK_INFO_SERVICE_RPC_HANDLER(GetTask, 1);
     TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks, 1);
     TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease, 1);
+    TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction, 1);
   }
 
  private: