From 9f8ff2e3b1e6f8bc748a28c2e91ef21168115bef Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 8 Jul 2020 10:56:52 -0700 Subject: [PATCH] [Core] GCS Actor Management Race Condition (#9215) * GCS Actor management on by default. * Fix travis config. * Change condition. * Finish the initial race condition fix. * Lint. * Refine the codebase. * Finish the initial version * Improve logic. * Remove unnecessary log messages. * Address code review. * Add tests * Revert the second race condition that doesn't happen anymore, handle some edge cases. add tests. * Address the second race condition found. * Addressed code review. * Addressed code review. * Run a new unit test only when gcs actor management is on. --- python/ray/tests/test_actor_advanced.py | 44 +++++++++++++++++++ src/ray/gcs/gcs_server/gcs_actor_manager.cc | 30 ++++++++----- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 27 ++++++++++-- src/ray/gcs/gcs_server/gcs_actor_scheduler.h | 16 +++++++ .../gcs_server/test/gcs_actor_manager_test.cc | 32 ++++++++++++++ .../test/gcs_actor_scheduler_test.cc | 33 ++++++++++++++ 6 files changed, 168 insertions(+), 14 deletions(-) diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 0bdd22f74..320b8f947 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -908,6 +908,50 @@ def test_actor_creation_task_crash(ray_start_regular): ray.get(ra.f.remote()) +@pytest.mark.skipif( + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + reason=("This edge case is not handled when GCS actor management is off. " + "We won't fix this because GCS actor management " + "will be on by default anyway.")) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "num_cpus": 2, + "num_gpus": 1 + }], indirect=True) +def test_pending_actor_removed_by_owner(ray_start_regular): + # Verify when an owner of pending actors is killed, the actor resources + # are correctly returned. + + @ray.remote(num_cpus=1, num_gpus=1) + class A: + def __init__(self): + self.actors = [] + + def create_actors(self): + self.actors = [B.remote() for _ in range(2)] + + @ray.remote(num_gpus=1) + class B: + def ping(self): + return True + + @ray.remote(num_gpus=1) + def f(): + return True + + a = A.remote() + # Create pending actors + ray.get(a.create_actors.remote()) + + # Owner is dead. pending actors should be killed + # and raylet should return workers correctly. + del a + a = B.remote() + assert ray.get(a.ping.remote()) + ray.kill(a) + assert ray.get(f.remote()) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 37224c65e..6a6f905fc 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -516,15 +516,21 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { // The actor was being scheduled and has now been canceled. RAY_CHECK(canceled_actor_id == actor_id); } else { - // The actor was pending scheduling. Remove it from the queue. auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(), [actor_id](const std::shared_ptr &actor) { return actor->GetActorID() == actor_id; }); - // TODO(rkooo567): The actor may be in the state of leasing worker. We need to add - // the processing logic of this in https://github.com/ray-project/ray/pull/9215. + + // The actor was pending scheduling. Remove it from the queue. if (pending_it != pending_actors_.end()) { pending_actors_.erase(pending_it); + } else { + // When actor creation request of this actor id is pending in raylet, + // it doesn't responds, and the actor should be still in leasing state. + // NOTE: Raylet will cancel the lease request once it receives the + // actor state notification. So this method doesn't have to cancel + // outstanding lease request by calling raylet_client->CancelWorkerLease + gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id); } } } @@ -562,8 +568,9 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, } } + // Find if actor is already created or in the creation process (lease request is + // granted) ActorID actor_id; - // Find from worker_to_created_actor_. auto iter = created_actors_.find(node_id); if (iter != created_actors_.end() && iter->second.count(worker_id)) { actor_id = iter->second[worker_id]; @@ -573,14 +580,17 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, } } else { actor_id = gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id); + if (actor_id.IsNil()) { + return; + } } - if (!actor_id.IsNil()) { - RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id - << " failed, restarting actor " << actor_id; - // Reconstruct the actor. - ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit); - } + // Otherwise, try to reconstruct the actor that was already created or in the creation + // process. + RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id + << " failed, restarting actor " << actor_id + << ", intentional exit: " << intentional_exit; + ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit); } void GcsActorManager::OnNodeDead(const ClientID &node_id) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 7ac91029c..303a976fe 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -123,6 +123,16 @@ std::vector GcsActorScheduler::CancelOnNode(const ClientID &node_id) { return actor_ids; } +void GcsActorScheduler::CancelOnLeasing(const ClientID &node_id, + const ActorID &actor_id) { + // NOTE: This method does not currently cancel the outstanding lease request. + // It only removes leasing information from the internal state so that + // RequestWorkerLease ignores the response from raylet. + auto node_it = node_to_actors_when_leasing_.find(node_id); + RAY_CHECK(node_it != node_to_actors_when_leasing_.end()); + node_it->second.erase(actor_id); +} + ActorID GcsActorScheduler::CancelOnWorker(const ClientID &node_id, const WorkerID &worker_id) { // Remove the worker from creating map and return ID of the actor associated with the @@ -169,11 +179,16 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, // gcs_actor_manager will reconstruct it again. auto iter = node_to_actors_when_leasing_.find(node_id); if (iter != node_to_actors_when_leasing_.end()) { - // If the node is still available, the actor must be still in the leasing map as - // it is erased from leasing map only when `CancelOnNode` or the - // `RequestWorkerLeaseReply` is received from the node, so try lease again. auto actor_iter = iter->second.find(actor->GetActorID()); - RAY_CHECK(actor_iter != iter->second.end()); + if (actor_iter == iter->second.end()) { + // if actor is not in leasing state, it means it is cancelled. + RAY_LOG(INFO) << "Raylet granted a lease request, but the outstanding lease " + "request for " + << actor->GetActorID() + << " has been already cancelled. The response will be ignored."; + return; + } + if (status.ok()) { // Remove the actor from the leasing map as the reply is returned from the // remote node. @@ -252,6 +267,10 @@ void GcsActorScheduler::HandleWorkerLeasedReply( .emplace(leased_worker->GetWorkerID(), leased_worker) .second); actor->UpdateAddress(leased_worker->GetAddress()); + // Make sure to connect to the client before persisting actor info to GCS. + // Without this, there could be a possible race condition. Related issues: + // https://github.com/ray-project/ray/pull/9215/files#r449469320 + GetOrConnectCoreWorkerClient(leased_worker->GetAddress()); RAY_CHECK_OK(gcs_actor_table_.Put(actor->GetActorID(), actor->GetActorTableData(), [this, actor, leased_worker](Status status) { RAY_CHECK_OK(status); diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index fc404f97e..f3371ceb6 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -55,6 +55,12 @@ class GcsActorSchedulerInterface { /// \return ID list of actors associated with the specified node id. virtual std::vector CancelOnNode(const ClientID &node_id) = 0; + /// Cancel a outstanding leasing request to raylets. + /// + /// \param node_id ID of the node where the actor leasing request has been sent. + /// \param actor_id ID of an actor. + virtual void CancelOnLeasing(const ClientID &node_id, const ActorID &actor_id) = 0; + /// Cancel the actor that is being scheduled to the specified worker. /// /// \param node_id ID of the node where the worker is located. @@ -109,6 +115,16 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// \return ID list of actors associated with the specified node id. std::vector CancelOnNode(const ClientID &node_id) override; + /// Cancel a outstanding leasing request to raylets. + /// + /// NOTE: The current implementation does not actually send lease cancel request to + /// raylet. This method must be only used to ignore incoming raylet lease request + /// responses. + /// + /// \param node_id ID of the node where the actor leasing request has been sent. + /// \param actor_id ID of an actor. + void CancelOnLeasing(const ClientID &node_id, const ActorID &actor_id) override; + /// Cancel the actor that is being scheduled to the specified worker. /// /// \param node_id ID of the node where the worker is located. diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 50277d50f..202353752 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -23,6 +23,7 @@ namespace ray { using ::testing::_; +using ::testing::Return; class MockActorScheduler : public gcs::GcsActorSchedulerInterface { public: @@ -34,6 +35,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface { MOCK_METHOD1(CancelOnNode, std::vector(const ClientID &node_id)); MOCK_METHOD2(CancelOnWorker, ActorID(const ClientID &node_id, const WorkerID &worker_id)); + MOCK_METHOD2(CancelOnLeasing, void(const ClientID &node_id, const ActorID &actor_id)); std::vector> actors; }; @@ -599,6 +601,36 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); } +TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { + // Covers a scenario 1 in this PR https://github.com/ray-project/ray/pull/9215. + auto job_id = JobID::FromInt(1); + auto create_actor_request = + Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false); + std::vector> finished_actors; + RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); + })); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + const auto owner_node_id = actor->GetOwnerNodeID(); + const auto owner_worker_id = actor->GetOwnerID(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + const auto actor_id = actor->GetActorID(); + EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id)); + gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index c58f9037b..a8dfcc805 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -244,6 +244,39 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) { ASSERT_EQ(0, failure_actors_.size()); } +TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) { + auto node = Mocker::GenNodeInfo(); + auto node_id = ClientID::FromBinary(node->node_id()); + gcs_node_manager_->AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + + // Schedule the actor with 1 available node, and the lease request should be send to the + // node. + gcs_actor_scheduler_->Schedule(actor); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + + // Cancel the lease request. + gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID()); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + + // Grant a worker, which will influence nothing. + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), + node_id, ClientID::Nil())); + ASSERT_EQ(1, raylet_client_->num_workers_requested); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_leasing_count_); + + ASSERT_EQ(0, success_actors_.size()); + ASSERT_EQ(0, failure_actors_.size()); +} + TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) { auto node = Mocker::GenNodeInfo(); auto node_id = ClientID::FromBinary(node->node_id());