[Core]Async updating issue fixed for actor's num_restart (#10176)

* bug fixed for num_restart updating

* add log

* log updated

* lint

* fixed

* Update src/ray/gcs/gcs_server/gcs_actor_manager.cc

Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>

* bug fixed

* bug fixed

* test passed

Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
This commit is contained in:
Lixin Wei
2020-08-27 02:49:26 +08:00
committed by GitHub
parent c35ad8237d
commit 4b856fa416
4 changed files with 16 additions and 13 deletions
@@ -363,8 +363,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// We receive the RESTART message late. Nothing happens.
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 2);
@@ -373,7 +372,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
// The actor dies twice. We receive the last RESTART message first.
submitter_.DisconnectActor(actor_id, 2, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 3);
@@ -383,17 +382,17 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
// We receive the late messages. Nothing happens.
submitter_.ConnectActor(actor_id, addr, 2);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);
// The actor dies permanently. All tasks are failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/true);
submitter_.DisconnectActor(actor_id, 3, /*dead=*/true);
ASSERT_EQ(num_clients_connected_, 2);
// We receive more late messages. Nothing happens because the actor is dead.
submitter_.DisconnectActor(actor_id, 3, /*dead=*/false);
submitter_.ConnectActor(actor_id, addr, 3);
submitter_.DisconnectActor(actor_id, 4, /*dead=*/false);
submitter_.ConnectActor(actor_id, addr, 4);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 4);
@@ -128,9 +128,10 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (num_restarts <= queue->second.num_restarts) {
if (num_restarts < queue->second.num_restarts) {
// This message is about an old version of the actor and the actor has
// already restarted since then. Skip the connection.
RAY_LOG(INFO) << "Skip actor that has already been restarted, actor_id=" << actor_id;
return;
}
@@ -168,9 +169,10 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (num_restarts < queue->second.num_restarts && !dead) {
if (num_restarts <= queue->second.num_restarts && !dead) {
// This message is about an old version of the actor that has already been
// restarted successfully. Skip the message handling.
RAY_LOG(INFO) << "Skip actor that has already been restarted, actor_id=" << actor_id;
return;
}
+3 -1
View File
@@ -793,6 +793,9 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", remaining_restarts = " << remaining_restarts;
if (remaining_restarts != 0) {
// num_restarts must be set before updating GCS, or num_restarts will be inconsistent
// between memory cache and storage.
mutable_actor_table_data->set_num_restarts(num_restarts + 1);
mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING);
const auto actor_table_data = actor->GetActorTableData();
// Make sure to reset the address before flushing to GCS. Otherwise,
@@ -808,7 +811,6 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
nullptr));
}));
gcs_actor_scheduler_->Schedule(actor);
mutable_actor_table_data->set_num_restarts(num_restarts + 1);
} else {
// Remove actor from `named_actors_` if its name is not empty.
if (!actor->GetName().empty()) {
+3 -3
View File
@@ -128,9 +128,9 @@ message ActorTableData {
// Max number of times this actor should be restarted,
// a value of -1 indicates an infinite number of reconstruction attempts.
int64 max_restarts = 7;
// Number of restarts that have been successfully performed on this
// actor. This is the equal to the number of ALIVE messages that have been
// previously published for this actor.
// Number of restarts that has been tried on this actor.
// This will be greater by 1 than what's published before in ALIVE.
// ALIVE:0 RESTARTING:1 ALIVE:1 RESTARTING:2, etc
uint64 num_restarts = 8;
// The address of the the actor.
Address address = 9;