diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 2ab288355..f53248114 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -188,10 +188,18 @@ class ServiceBasedGcsClientTest : public ::testing::Test { .AsyncRegisterActor(task_spec, [](Status status) {}) .ok(); } - std::promise promise; - RAY_CHECK_OK(gcs_client_->Actors().AsyncRegisterActor( - task_spec, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); + + // NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs + // client will register the actor again and promise may be set twice. + auto promise = std::make_shared>(); + RAY_CHECK_OK( + gcs_client_->Actors().AsyncRegisterActor(task_spec, [promise](Status status) { + try { + promise->set_value(status.ok()); + } catch (...) 
{ + } + })); + return WaitReady(promise->get_future(), timeout_ms_); } rpc::ActorTableData GetActor(const ActorID &actor_id) { @@ -208,14 +216,22 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return actor_table_data; } - std::vector GetAllActors() { + std::vector GetAllActors(bool filter_non_dead_actor = false) { std::promise promise; std::vector actors; RAY_CHECK_OK(gcs_client_->Actors().AsyncGetAll( - [&actors, &promise](Status status, - const std::vector &result) { + [filter_non_dead_actor, &actors, &promise]( + Status status, const std::vector &result) { if (!result.empty()) { - actors.assign(result.begin(), result.end()); + if (filter_non_dead_actor) { + for (auto &iter : result) { + if (iter.state() == gcs::ActorTableData::DEAD) { + actors.emplace_back(iter); + } + } + } else { + actors.assign(result.begin(), result.end()); + } } promise.set_value(true); })); @@ -1041,15 +1057,22 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) { // didn't restart, it will fetch data again from the GCS server. The GCS will destroy // the actor because it finds that the actor is out of scope, so we'll receive another // notification of DEAD state. - WaitForExpectedCount(num_subscribe_all_notifications, 3); WaitForExpectedCount(num_subscribe_one_notifications, 3); - /// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs - /// client will register the actor again. When an actor is registered, the status in GCS - /// is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will - /// destroy the actor and the status of the actor will change to `DEAD`. The GCS client - /// fetch actor info from the GCS server, and the status of the actor may be - /// `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any - /// more. + + // NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs + // client will register the actor again. 
When an actor is registered, the status in GCS + // is `DEPENDENCIES_UNREADY`. When GCS finds that the owner of an actor is nil, it will + // destroy the actor and the status of the actor will change to `DEAD`. The GCS client + // fetches actor info from the GCS server, and the status of the actor may be + // `DEPENDENCIES_UNREADY` or `DEAD`, so we do not assert the actor status here any + // more. + // If the status of the actor is `DEPENDENCIES_UNREADY`, we will fetch two records, so + // `num_subscribe_all_notifications` will be 4. If the status of the actor is `DEAD`, we + // will fetch one record, so `num_subscribe_all_notifications` will be 3. + auto condition = [&num_subscribe_all_notifications]() { + return num_subscribe_all_notifications == 3 || num_subscribe_all_notifications == 4; + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); } TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { @@ -1346,14 +1369,16 @@ TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) { actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id())); } - // Get all actors. + // NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs + // client will register the actor again and the status of the actor may be + // `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors. auto condition = [this]() { - return GetAllActors().size() == + return GetAllActors(true).size() == RayConfig::instance().maximum_gcs_destroyed_actor_cached_count(); }; EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); - auto actors = GetAllActors(); + auto actors = GetAllActors(true); for (const auto &actor : actors) { EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id()))); }