diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index e51a2c21a..ecebd919d 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -167,6 +167,7 @@ def test_actor_restart(ray_init_with_task_retry_delay): results = [actor.increase.remote() for _ in range(100)] # Kill actor process, while the above task is still being executed. os.kill(pid, SIGKILL) + wait_for_pid_to_exit(pid) # Make sure that all tasks were executed in order before the actor's death. res = results.pop(0) i = 1 @@ -208,6 +209,7 @@ def test_actor_restart(ray_init_with_task_retry_delay): results = [actor.increase.remote() for _ in range(100)] pid = ray.get(actor.get_pid.remote()) os.kill(pid, SIGKILL) + wait_for_pid_to_exit(pid) # The actor has exceeded max restarts, and this task should fail. with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.increase.remote()) @@ -244,6 +246,7 @@ def test_actor_restart_with_retry(ray_init_with_task_retry_delay): results = [actor.increase.remote() for _ in range(100)] # Kill actor process, while the above task is still being executed. os.kill(pid, SIGKILL) + wait_for_pid_to_exit(pid) # Check that none of the tasks failed and the actor is restarted. seq = list(range(1, 101)) results = ray.get(results) @@ -264,6 +267,7 @@ def test_actor_restart_with_retry(ray_init_with_task_retry_delay): results = [actor.increase.remote() for _ in range(100)] pid = ray.get(actor.get_pid.remote()) os.kill(pid, SIGKILL) + wait_for_pid_to_exit(pid) # The actor has exceeded max restarts, and this task should fail. with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.increase.remote()) diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 4cf07147a..8cdd8676c 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -153,19 +153,6 @@ void ActorManager::WaitForActorOutOfScope( void ActorManager::HandleActorStateNotification(const ActorID &actor_id, const gcs::ActorTableData &actor_data) { - if (actor_data.state() == gcs::ActorTableData::PENDING) { - // The actor is being created and not yet ready, just ignore! - } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { - direct_actor_submitter_->DisconnectActor(actor_id, false); - } else if (actor_data.state() == gcs::ActorTableData::DEAD) { - direct_actor_submitter_->DisconnectActor(actor_id, true); - // We cannot erase the actor handle here because clients can still - // submit tasks to dead actors. This also means we defer unsubscription, - // otherwise we crash when bulk unsubscribing all actor handles. - } else { - direct_actor_submitter_->ConnectActor(actor_id, actor_data.address()); - } - const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state()); RAY_LOG(INFO) << "received notification on actor, state: " << actor_state << ", actor_id: " << actor_id @@ -173,7 +160,22 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id, << ", port: " << actor_data.address().port() << ", worker_id: " << WorkerID::FromBinary(actor_data.address().worker_id()) << ", raylet_id: " - << ClientID::FromBinary(actor_data.address().raylet_id()); + << ClientID::FromBinary(actor_data.address().raylet_id()) + << ", num_restarts: " << actor_data.num_restarts(); + + if (actor_data.state() == gcs::ActorTableData::PENDING) { + // The actor is being created and not yet ready, just ignore! + } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { + direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false); + } else if (actor_data.state() == gcs::ActorTableData::DEAD) { + direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true); + // We cannot erase the actor handle here because clients can still + // submit tasks to dead actors. This also means we defer unsubscription, + // otherwise we crash when bulk unsubscribing all actor handles. + } else { + direct_actor_submitter_->ConnectActor(actor_id, actor_data.address(), + actor_data.num_restarts()); + } } std::vector ActorManager::GetActorHandleIDsFromHandles() { diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 9320f9925..cafb97ec4 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -77,8 +77,10 @@ class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterf MockDirectActorSubmitter() : CoreWorkerDirectActorTaskSubmitterInterface() {} MOCK_METHOD1(AddActorQueueIfNotExists, void(const ActorID &actor_id)); - MOCK_METHOD2(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address)); - MOCK_METHOD2(DisconnectActor, void(const ActorID &actor_id, bool dead)); + MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address, + int64_t num_restarts)); + MOCK_METHOD3(DisconnectActor, + void(const ActorID &actor_id, int64_t num_restarts, bool dead)); MOCK_METHOD3(KillActor, void(const ActorID &actor_id, bool force_kill, bool no_restart)); @@ -189,7 +191,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id); // Check after the actor is created, if it is connected to an actor. - EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( @@ -197,7 +199,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); // Now actor state is updated to DEAD. Make sure it is diconnected. - EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); @@ -242,8 +244,8 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { TEST_F(ActorManagerTest, TestActorStateNotificationPending) { ActorID actor_id = AddActorHandle(); // Nothing happens if state is pending. - EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); - EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( @@ -255,8 +257,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationPending) { TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) { ActorID actor_id = AddActorHandle(); // Should disconnect to an actor when actor is restarting. - EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); - EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( @@ -268,8 +270,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) { TEST_F(ActorManagerTest, TestActorStateNotificationDead) { ActorID actor_id = AddActorHandle(); // Should disconnect to an actor when actor is dead. - EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); - EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( @@ -281,8 +283,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationDead) { TEST_F(ActorManagerTest, TestActorStateNotificationAlive) { ActorID actor_id = AddActorHandle(); // Should connect to an actor when actor is alive. - EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1); - EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); actor_table_data.set_state( diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index c5421f340..b6320b877 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -103,9 +103,14 @@ class DirectActorSubmitterTest : public ::testing::Test { : worker_client_(std::shared_ptr(new MockWorkerClient())), store_(std::shared_ptr(new CoreWorkerMemoryStore())), task_finisher_(std::make_shared()), - submitter_([&](const rpc::Address &addr) { return worker_client_; }, store_, - task_finisher_) {} + submitter_( + [&](const rpc::Address &addr) { + num_clients_connected_++; + return worker_client_; + }, + store_, task_finisher_) {} + int num_clients_connected_ = 0; std::shared_ptr worker_client_; std::shared_ptr store_; std::shared_ptr task_finisher_; @@ -123,7 +128,7 @@ TEST_F(DirectActorSubmitterTest, TestSubmitTask) { ASSERT_TRUE(submitter_.SubmitTask(task).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 0); - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 1); task = CreateActorTaskHelper(actor_id, worker_id, 1); @@ -145,7 +150,7 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) { addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); submitter_.AddActorQueueIfNotExists(actor_id); - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. @@ -179,7 +184,7 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) { addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); submitter_.AddActorQueueIfNotExists(actor_id); - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. @@ -215,7 +220,7 @@ TEST_F(DirectActorSubmitterTest, TestActorDead) { ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); submitter_.AddActorQueueIfNotExists(actor_id); gcs::ActorTableData actor_data; - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor. One depends on an object that is not yet available. @@ -235,10 +240,10 @@ TEST_F(DirectActorSubmitterTest, TestActorDead) { } EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0); - submitter_.DisconnectActor(actor_id, /*dead=*/false); + submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); // Actor marked as dead. All queued tasks should get failed. EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(1); - submitter_.DisconnectActor(actor_id, /*dead=*/true); + submitter_.DisconnectActor(actor_id, 1, /*dead=*/true); } TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) { @@ -248,7 +253,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) { ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); submitter_.AddActorQueueIfNotExists(actor_id); gcs::ActorTableData actor_data; - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create four tasks for the actor. @@ -268,13 +273,13 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); // Simulate the actor failing. - submitter_.DisconnectActor(actor_id, /*dead=*/false); + submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); // Third task fails after the actor is disconnected. It should not get // retried. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); // Actor gets restarted. - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 1); ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); ASSERT_TRUE(worker_client_->callbacks.empty()); @@ -289,7 +294,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) { ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); submitter_.AddActorQueueIfNotExists(actor_id); gcs::ActorTableData actor_data; - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 0); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create four tasks for the actor. @@ -313,12 +318,12 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); // Simulate the actor failing. - submitter_.DisconnectActor(actor_id, /*dead=*/false); + submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); // Third task fails after the actor is disconnected. ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); // Actor gets restarted. - submitter_.ConnectActor(actor_id, addr); + submitter_.ConnectActor(actor_id, addr, 1); // A new task is submitted. ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); // Tasks 2 and 3 get retried. @@ -332,6 +337,71 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) { ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1)); } +TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { + rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); + ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); + gcs::ActorTableData actor_data; + submitter_.ConnectActor(actor_id, addr, 0); + ASSERT_EQ(worker_client_->callbacks.size(), 0); + ASSERT_EQ(num_clients_connected_, 1); + + // Create four tasks for the actor. + auto task = CreateActorTaskHelper(actor_id, worker_id, 0); + // Submit a task. + ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + + // Actor restarts, but we don't receive the disconnect message until later. + submitter_.ConnectActor(actor_id, addr, 1); + ASSERT_EQ(num_clients_connected_, 2); + // Submit a task. + task = CreateActorTaskHelper(actor_id, worker_id, 1); + ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + + // We receive the RESTART message late. Nothing happens. + submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); + ASSERT_EQ(num_clients_connected_, 2); + // Submit a task. + task = CreateActorTaskHelper(actor_id, worker_id, 2); + ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + + // The actor dies twice. We receive the last RESTART message first. + submitter_.DisconnectActor(actor_id, 2, /*dead=*/false); + ASSERT_EQ(num_clients_connected_, 2); + // Submit a task. + task = CreateActorTaskHelper(actor_id, worker_id, 3); + ASSERT_TRUE(submitter_.SubmitTask(task).ok()); + EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(0); + ASSERT_FALSE(worker_client_->ReplyPushTask(Status::OK())); + + // We receive the late messages. Nothing happens. + submitter_.ConnectActor(actor_id, addr, 2); + submitter_.DisconnectActor(actor_id, 1, /*dead=*/false); + ASSERT_EQ(num_clients_connected_, 2); + + // The actor dies permanently. All tasks are failed. + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1); + submitter_.DisconnectActor(actor_id, 2, /*dead=*/true); + ASSERT_EQ(num_clients_connected_, 2); + + // We receive more late messages. Nothing happens because the actor is dead. + submitter_.DisconnectActor(actor_id, 3, /*dead=*/false); + submitter_.ConnectActor(actor_id, addr, 3); + ASSERT_EQ(num_clients_connected_, 2); + // Submit a task. + task = CreateActorTaskHelper(actor_id, worker_id, 4); + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1); + ASSERT_TRUE(submitter_.SubmitTask(task).ok()); +} + class MockDependencyWaiter : public DependencyWaiter { public: MOCK_METHOD2(Wait, void(const std::vector &dependencies, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 726de288c..d1db242f9 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -112,19 +112,39 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe return Status::OK(); } +void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) { + queue.rpc_client = nullptr; + queue.worker_id.clear(); + queue.pending_force_kill.reset(); +} + void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, - const rpc::Address &address) { + const rpc::Address &address, + int64_t num_restarts) { + RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker " + << WorkerID::FromBinary(address.worker_id()); absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); - if (queue->second.rpc_client) { - // Skip reconnection if we already have a client to this actor. - // NOTE(swang): This seems to only trigger in multithreaded Java tests. - RAY_CHECK(queue->second.worker_id == address.worker_id()); + if (num_restarts <= queue->second.num_restarts) { + // This message is about an old version of the actor and the actor has + // already restarted since then. Skip the connection. return; } + if (queue->second.state == rpc::ActorTableData::DEAD) { + // This message is about an old version of the actor and the actor has + // already died since then. Skip the connection. + return; + } + + queue->second.num_restarts = num_restarts; + if (queue->second.rpc_client) { + // Clear the client to the old version of the actor. + DisconnectRpcClient(queue->second); + } + queue->second.state = rpc::ActorTableData::ALIVE; // Update the mapping so new RPCs go out with the right intended worker id. queue->second.worker_id = address.worker_id(); @@ -142,26 +162,26 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, } void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, + int64_t num_restarts, bool dead) { + RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id; absl::MutexLock lock(&mu_); auto queue = client_queues_.find(actor_id); RAY_CHECK(queue != client_queues_.end()); - - if (dead) { - queue->second.state = rpc::ActorTableData::DEAD; - } else { - queue->second.state = rpc::ActorTableData::RESTARTING; + if (num_restarts < queue->second.num_restarts && !dead) { + // This message is about an old version of the actor that has already been + // restarted successfully. Skip the message handling. + return; } // The actor failed, so erase the client for now. Either the actor is // permanently dead or the new client will be inserted once the actor is // restarted. - queue->second.rpc_client = nullptr; - queue->second.worker_id.clear(); - queue->second.pending_force_kill.reset(); + DisconnectRpcClient(queue->second); - // If there are pending requests, treat the pending tasks as failed. if (dead) { + queue->second.state = rpc::ActorTableData::DEAD; + // If there are pending requests, treat the pending tasks as failed. RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id; auto &requests = queue->second.requests; auto head = requests.begin(); @@ -180,6 +200,11 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id // replies. They will be treated as failed once the connection dies. // We retain the sequencing information so that we can properly fail // any tasks submitted after the actor death. + } else if (queue->second.state != rpc::ActorTableData::DEAD) { + // Only update the actor's state if it is not permanently dead. The actor + // will eventually get restarted or marked as permanently dead. + queue->second.state = rpc::ActorTableData::RESTARTING; + queue->second.num_restarts = num_restarts; } } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 3cddde52c..b71512013 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -50,8 +50,10 @@ const int kMaxReorderWaitSeconds = 30; class CoreWorkerDirectActorTaskSubmitterInterface { public: virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0; - virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address) = 0; - virtual void DisconnectActor(const ActorID &actor_id, bool dead = false) = 0; + virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address, + int64_t num_restarts) = 0; + virtual void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, + bool dead = false) = 0; virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0; virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {} @@ -95,12 +97,21 @@ class CoreWorkerDirectActorTaskSubmitter /// /// \param[in] actor_id Actor ID. /// \param[in] address The new address of the actor. - void ConnectActor(const ActorID &actor_id, const rpc::Address &address); + /// \param[in] num_restarts How many times this actor has been restarted + /// before. If we've already seen a later incarnation of the actor, we will + /// ignore the command to connect. + void ConnectActor(const ActorID &actor_id, const rpc::Address &address, + int64_t num_restarts); /// Disconnect from a failed actor. /// /// \param[in] actor_id Actor ID. - void DisconnectActor(const ActorID &actor_id, bool dead = false); + /// \param[in] num_restarts How many times this actor has been restarted + /// before. If we've already seen a later incarnation of the actor, we will + /// ignore the command to connect. + /// \param[in] dead Whether the actor is permanently dead. In this case, all + /// pending tasks for the actor should be failed. + void DisconnectActor(const ActorID &actor_id, int64_t num_restarts, bool dead = false); /// Set the timerstamp for the caller. void SetCallerCreationTimestamp(int64_t timestamp); @@ -111,6 +122,10 @@ class CoreWorkerDirectActorTaskSubmitter /// an RPC client to the actor. If this is DEAD, then all tasks in the /// queue will be marked failed and all other ClientQueue state is ignored. rpc::ActorTableData::ActorState state = rpc::ActorTableData::PENDING; + /// How many times this actor has been restarted before. Starts at -1 to + /// indicate that the actor is not yet created. This is used to drop stale + /// messages from the GCS. + int64_t num_restarts = -1; /// The RPC client. We use shared_ptr to enable shared_from_this for /// pending client callbacks. std::shared_ptr rpc_client = nullptr; @@ -193,6 +208,9 @@ class CoreWorkerDirectActorTaskSubmitter /// \return Void. void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Disconnect the RPC client for an actor. + void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Whether the specified actor is alive. /// /// \param[in] actor_id The actor ID. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 6a6f905fc..8954dbc7e 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -652,7 +652,6 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche << " at node " << node_id << ", need_reschedule = " << need_reschedule << ", remaining_restarts = " << remaining_restarts; if (remaining_restarts != 0) { - mutable_actor_table_data->set_num_restarts(++num_restarts); mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING); const auto actor_table_data = actor->GetActorTableData(); // Make sure to reset the address before flushing to GCS. Otherwise, @@ -668,6 +667,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche nullptr)); })); gcs_actor_scheduler_->Schedule(actor); + mutable_actor_table_data->set_num_restarts(num_restarts + 1); } else { // Remove actor from `named_actors_` if its name is not empty. if (!actor->GetName().empty()) { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 5c4474da4..e765cce28 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -127,7 +127,9 @@ message ActorTableData { // Max number of times this actor should be restarted, // a value of -1 indicates an infinite number of reconstruction attempts. int64 max_restarts = 7; - // Number of restarts that have already been performed on this actor. + // Number of restarts that have been successfully performed on this + // actor. This is the equal to the number of ALIVE messages that have been + // previously published for this actor. uint64 num_restarts = 8; // The address of the the actor. Address address = 9;