From 4b856fa41614f5e4ff819ec83e251c9aefd5faef Mon Sep 17 00:00:00 2001 From: Lixin Wei Date: Thu, 27 Aug 2020 02:49:26 +0800 Subject: [PATCH] [Core]Async updating issue fixed for actor's num_restart (#10176) * bug fixed for num_restart updating * add log * log updated * lint * fixed * Update src/ray/gcs/gcs_server/gcs_actor_manager.cc Co-authored-by: Stephanie Wang * bug fixed * bug fixed * test passed Co-authored-by: Stephanie Wang --- .../core_worker/test/direct_actor_transport_test.cc | 13 ++++++------- .../core_worker/transport/direct_actor_transport.cc | 6 ++++-- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 4 +++- src/ray/protobuf/gcs.proto | 6 +++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index fb894565f..d92fc95ee 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -363,8 +363,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); // We receive the RESTART message late. Nothing happens. - submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); - + submitter_.DisconnectActor(actor_id, 1, /*dead=*/false); ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 2); @@ -373,7 +372,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); // The actor dies twice. We receive the last RESTART message first. - submitter_.DisconnectActor(actor_id, 2, /*dead=*/false); + submitter_.DisconnectActor(actor_id, 3, /*dead=*/false); ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 3); @@ -383,17 +382,17 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { // We receive the late messages. Nothing happens. submitter_.ConnectActor(actor_id, addr, 2); - submitter_.DisconnectActor(actor_id, 1, /*dead=*/false); + submitter_.DisconnectActor(actor_id, 2, /*dead=*/false); ASSERT_EQ(num_clients_connected_, 2); // The actor dies permanently. All tasks are failed. EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1); - submitter_.DisconnectActor(actor_id, 2, /*dead=*/true); + submitter_.DisconnectActor(actor_id, 3, /*dead=*/true); ASSERT_EQ(num_clients_connected_, 2); // We receive more late messages. Nothing happens because the actor is dead. - submitter_.DisconnectActor(actor_id, 3, /*dead=*/false); - submitter_.ConnectActor(actor_id, addr, 3); + submitter_.DisconnectActor(actor_id, 4, /*dead=*/false); + submitter_.ConnectActor(actor_id, addr, 4); ASSERT_EQ(num_clients_connected_, 2); // Submit a task. task = CreateActorTaskHelper(actor_id, worker_id, 4); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index eb7569b5c..c05d5b71f 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -128,9 +128,10 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); - if (num_restarts <= queue->second.num_restarts) { + if (num_restarts < queue->second.num_restarts) { // This message is about an old version of the actor and the actor has // already restarted since then. Skip the connection. + RAY_LOG(INFO) << "Skip actor that has already been restarted, actor_id=" << actor_id; return; } @@ -168,9 +169,10 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); - if (num_restarts < queue->second.num_restarts && !dead) { + if (num_restarts <= queue->second.num_restarts && !dead) { // This message is about an old version of the actor that has already been // restarted successfully. Skip the message handling. + RAY_LOG(INFO) << "Skip actor that has already been restarted, actor_id=" << actor_id; return; } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 79b205097..e97eb2d1d 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -793,6 +793,9 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche << " at node " << node_id << ", need_reschedule = " << need_reschedule << ", remaining_restarts = " << remaining_restarts; if (remaining_restarts != 0) { + // num_restarts must be set before updating GCS, or num_restarts will be inconsistent + // between memory cache and storage. + mutable_actor_table_data->set_num_restarts(num_restarts + 1); mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING); const auto actor_table_data = actor->GetActorTableData(); // Make sure to reset the address before flushing to GCS. Otherwise, @@ -808,7 +811,6 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche nullptr)); })); gcs_actor_scheduler_->Schedule(actor); - mutable_actor_table_data->set_num_restarts(num_restarts + 1); } else { // Remove actor from `named_actors_` if its name is not empty. if (!actor->GetName().empty()) { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index e8a4e0a0b..c4d621538 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -128,9 +128,9 @@ message ActorTableData { // Max number of times this actor should be restarted, // a value of -1 indicates an infinite number of reconstruction attempts. int64 max_restarts = 7; - // Number of restarts that have been successfully performed on this - // actor. This is the equal to the number of ALIVE messages that have been - // previously published for this actor. + // Number of restarts that has been tried on this actor. + // This will be greater by 1 than what's published before in ALIVE. + // ALIVE:0 RESTARTING:1 ALIVE:1 RESTARTING:2, etc uint64 num_restarts = 8; // The address of the the actor. Address address = 9;