From 872219940bafe7f5860be576e778665dab2b4c58 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Tue, 29 Sep 2020 01:06:40 +0800 Subject: [PATCH] [GCS]Fix miss `PollOwnerForActorOutOfScope` after gcs restarts bug (#11054) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix_RemoveActorFromOwner_crash_bug * fix review comment * fix review comment * rm unused ut * add testcase * fix review comment * rm unused import * fix code style * fix ut bug Co-authored-by: 灵洵 --- python/ray/tests/test_gcs_fault_tolerance.py | 33 +++++++++++++++++++ .../test/service_based_gcs_client_test.cc | 19 ++++++----- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 11 ++++--- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index b6585cabd..aa9902eec 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -126,6 +126,39 @@ def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head): wait_for_condition(condition, timeout=10) +@pytest.mark.parametrize( + "ray_start_regular", [ + generate_system_config_map( + num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + ], + indirect=True) +def test_del_actor_after_gcs_server_restart(ray_start_regular): + actor = Increase.options(name="abc").remote() + result = ray.get(actor.method.remote(1)) + assert result == 3 + + ray.worker._global_node.kill_gcs_server() + ray.worker._global_node.start_gcs_server() + + actor_id = actor._actor_id.hex() + del actor + + def condition(): + actor_status = ray.actors(actor_id=actor_id) + if actor_status["State"] == ray.gcs_utils.ActorTableData.DEAD: + return True + else: + return False + + # Wait for the actor to be dead. + wait_for_condition(condition, timeout=10) + + # If `PollOwnerForActorOutOfScope` was successfully called, + # the name should be properly deleted. 
+ with pytest.raises(ValueError): + ray.get_actor("abc") + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 6456212bb..3d43ddc4f 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -946,22 +946,23 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) { RestartGcsServer(); // When GCS client detects that GCS server has restarted, but the pub-sub server - // didn't restart, it will fetch data again from the GCS server. So we'll receive - // another notification of ALIVE state. + // didn't restart, it will fetch data again from the GCS server. The GCS will destroy + // the actor because it finds that the actor is out of scope, so we'll receive another + // notification of DEAD state. WaitForExpectedCount(num_subscribe_all_notifications, 2); WaitForExpectedCount(num_subscribe_one_notifications, 2); - CheckActorData(subscribe_all_notifications[1], rpc::ActorTableData::ALIVE); - CheckActorData(subscribe_one_notifications[1], rpc::ActorTableData::ALIVE); + CheckActorData(subscribe_all_notifications[1], rpc::ActorTableData::DEAD); + CheckActorData(subscribe_one_notifications[1], rpc::ActorTableData::DEAD); - // Update the actor state to DEAD. - actor_table_data->set_state(rpc::ActorTableData::DEAD); + // Update the actor state to ALIVE. + actor_table_data->set_state(rpc::ActorTableData::ALIVE); ASSERT_TRUE(UpdateActor(actor_id, actor_table_data)); - // We should receive a new DEAD notification from the subscribe channel. + // We should receive a new ALIVE notification from the subscribe channel. 
WaitForExpectedCount(num_subscribe_all_notifications, 3); WaitForExpectedCount(num_subscribe_one_notifications, 3); - CheckActorData(subscribe_all_notifications[2], rpc::ActorTableData::DEAD); - CheckActorData(subscribe_one_notifications[2], rpc::ActorTableData::DEAD); + CheckActorData(subscribe_all_notifications[2], rpc::ActorTableData::ALIVE); + CheckActorData(subscribe_one_notifications[2], rpc::ActorTableData::ALIVE); } TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 2e919c5ab..b63c145a3 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -949,16 +949,17 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { RAY_CHECK(unresolved_actors_[owner_node][owner_worker] .emplace(actor->GetActorID()) .second); - if (!actor->IsDetached() && worker_client_factory_) { - // This actor is owned. Send a long polling request to the actor's - // owner to determine when the actor should be removed. - PollOwnerForActorOutOfScope(actor); - } } else if (item.second.state() == ray::rpc::ActorTableData::ALIVE) { created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), actor->GetActorID()); } + if (!actor->IsDetached()) { + // This actor is owned. Send a long polling request to the actor's + // owner to determine when the actor should be removed. + PollOwnerForActorOutOfScope(actor); + } + auto &workers = owners_[actor->GetNodeID()]; auto it = workers.find(actor->GetWorkerID()); if (it == workers.end()) {