diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index aa9902eec..9258bbc30 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -53,14 +53,16 @@ def test_gcs_server_restart(ray_start_regular): indirect=True) def test_gcs_server_restart_during_actor_creation(ray_start_regular): ids = [] - for i in range(0, 100): + # We reduce the number of actors because there are too many actors created + # and `Too many open files` error will be thrown. + for i in range(0, 20): actor = Increase.remote() ids.append(actor.method.remote(1)) ray.worker._global_node.kill_gcs_server() ray.worker._global_node.start_gcs_server() - ready, unready = ray.wait(ids, num_returns=100, timeout=240) + ready, unready = ray.wait(ids, num_returns=20, timeout=240) print("Ready objects is {}.".format(ready)) print("Unready objects is {}.".format(unready)) assert len(unready) == 0 diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index aef130e68..dc553995b 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -8,15 +8,21 @@ except ImportError: pytest_timeout = None import ray -from ray.test_utils import (get_other_nodes, wait_for_condition, +from ray.test_utils import (generate_system_config_map, get_other_nodes, + run_string_as_driver, wait_for_condition, get_error_message) import ray.cluster_utils from ray._raylet import PlacementGroupID -from ray.test_utils import run_string_as_driver from ray.util.placement_group import (PlacementGroup, get_current_placement_group) +@ray.remote +class Increase: + def method(self, x): + return x + 2 + + def test_placement_group_pack(ray_start_cluster): @ray.remote(num_cpus=2) class Actor(object): @@ -1156,5 +1162,96 @@ ray.shutdown() wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node)) +@pytest.mark.parametrize( + "ray_start_cluster_head", [ + generate_system_config_map( + num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + ], + indirect=True) +def test_create_placement_group_after_gcs_server_restarts( + ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node(num_cpus=2) + cluster.add_node(num_cpus=2) + cluster.wait_for_nodes() + + # Create placement group 1 successfully. + placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + ray.get(placement_group1.ready(), timeout=2) + table = ray.util.placement_group_table(placement_group1) + assert table["state"] == "CREATED" + + # Restart gcs server. + cluster.head_node.kill_gcs_server() + cluster.head_node.start_gcs_server() + + # Create placement group 2 successfully. + placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + ray.get(placement_group2.ready(), timeout=2) + table = ray.util.placement_group_table(placement_group2) + assert table["state"] == "CREATED" + + # Create placement group 3. + # Status is `PENDING` because the cluster resource is insufficient. + placement_group3 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + with pytest.raises(ray.exceptions.GetTimeoutError): + ray.get(placement_group3.ready(), timeout=2) + table = ray.util.placement_group_table(placement_group3) + assert table["state"] == "PENDING" + + +@pytest.mark.parametrize( + "ray_start_cluster_head", [ + generate_system_config_map( + num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + ], + indirect=True) +def test_create_actor_with_placement_group_after_gcs_server_restart( + ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node(num_cpus=2) + cluster.wait_for_nodes() + + # Create a placement group. + placement_group = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) + + # Create an actor that occupies resources after gcs server restart. + cluster.head_node.kill_gcs_server() + cluster.head_node.start_gcs_server() + actor_2 = Increase.options( + placement_group=placement_group, + placement_group_bundle_index=1).remote() + assert ray.get(actor_2.method.remote(1)) == 3 + + +@pytest.mark.parametrize( + "ray_start_cluster_head", [ + generate_system_config_map( + num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + ], + indirect=True) +def test_create_placement_group_during_gcs_server_restart( + ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node(num_cpus=20) + cluster.wait_for_nodes() + + # Create placement groups during gcs server restart. + placement_groups = [] + for i in range(0, 100): + placement_group = ray.util.placement_group([{ + "CPU": 0.1 + }, { + "CPU": 0.1 + }]) + placement_groups.append(placement_group) + + cluster.head_node.kill_gcs_server() + cluster.head_node.start_gcs_server() + + for i in range(0, 10): + ray.get(placement_groups[i].ready(), timeout=2) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index c48b6adfe..74d95e82a 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -117,19 +117,58 @@ GcsPlacementGroupManager::GcsPlacementGroupManager( void GcsPlacementGroupManager::RegisterPlacementGroup( const std::shared_ptr &placement_group, StatusCallback callback) { + // NOTE: After the abnormal recovery of the network between GCS client and GCS server or + // the GCS server is restarted, it is required to continue to register placement group + // successfully. RAY_CHECK(callback); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + + auto iter = registered_placement_groups_.find(placement_group_id); + if (iter != registered_placement_groups_.end()) { + auto pending_register_iter = + placement_group_to_register_callback_.find(placement_group_id); + if (pending_register_iter != placement_group_to_register_callback_.end()) { + // 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server. + // 2. The GCS client receives some network errors. + // 3. The GCS client resends the `RegisterPlacementGroup` request to the GCS server. + pending_register_iter->second = std::move(callback); + } else { + // 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server. + // 2. The GCS server flushes the placement group to the storage and restarts before + // replying to the GCS client. + // 3. The GCS client resends the `RegisterPlacementGroup` request to the GCS server. + RAY_LOG(INFO) << "Placement group " << placement_group_id + << " is already registered."; + callback(Status::OK()); + } + return; + } - // TODO(ffbin): If GCS is restarted, GCS client will repeatedly send - // `CreatePlacementGroup` requests, - // which will lead to resource leakage, we will solve it in next pr. // Mark the callback as pending and invoke it after the placement_group has been // successfully created. placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] = std::move(callback); registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(), placement_group); - pending_placement_groups_.emplace_back(std::move(placement_group)); - SchedulePendingPlacementGroups(); + pending_placement_groups_.emplace_back(placement_group); + + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( + placement_group_id, placement_group->GetPlacementGroupTableData(), + [this, placement_group_id, placement_group](Status status) { + RAY_CHECK_OK(status); + if (!registered_placement_groups_.contains(placement_group_id)) { + auto iter = placement_group_to_register_callback_.find(placement_group_id); + if (iter != placement_group_to_register_callback_.end()) { + std::stringstream stream; + stream << "Placement group of id " << placement_group_id + << " has been removed before registration."; + iter->second(Status::NotFound(stream.str())); + placement_group_to_register_callback_.erase(iter); + } + } else { + SchedulePendingPlacementGroups(); + } + })); } PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName( @@ -217,41 +256,19 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup( const ray::rpc::CreatePlacementGroupRequest &request, ray::rpc::CreatePlacementGroupReply *reply, ray::rpc::SendReplyCallback send_reply_callback) { - auto placement_group_id = - PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id()); auto placement_group = std::make_shared(request); - RAY_LOG(INFO) << "Registering placement group, " << placement_group->DebugString(); - // We need this call here because otherwise, if placement group is removed right after - // here, it can cause inconsistent states. - registered_placement_groups_.emplace(placement_group_id, placement_group); - - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( - placement_group_id, placement_group->GetPlacementGroupTableData(), - [this, request, reply, send_reply_callback, placement_group_id, - placement_group](Status status) { - RAY_CHECK_OK(status); - if (!registered_placement_groups_.contains(placement_group_id)) { - std::stringstream stream; - stream << "Placement group of id " << placement_group_id - << " has been removed before registration."; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::NotFound(stream.str())); - return; - } - - RegisterPlacementGroup(placement_group, [reply, send_reply_callback, - placement_group](Status status) { - if (status.ok()) { - RAY_LOG(INFO) << "Finished registering placement group, " - << placement_group->DebugString(); - } else { - RAY_LOG(INFO) << "Failed to register placement group, " - << placement_group->DebugString() - << ", cause: " << status.message(); - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }); - })); + RegisterPlacementGroup(placement_group, [reply, send_reply_callback, + placement_group](Status status) { + if (status.ok()) { + RAY_LOG(INFO) << "Finished registering placement group, " + << placement_group->DebugString(); + } else { + RAY_LOG(INFO) << "Failed to register placement group, " + << placement_group->DebugString() << ", cause: " << status.message(); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }); } void GcsPlacementGroupManager::HandleRemovePlacementGroup( @@ -448,5 +465,29 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() { gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load)); } +void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) { + RAY_LOG(INFO) << "GcsPlacementGroupManager loading initial data."; + auto callback = [this, + done](const std::unordered_map &result) { + for (auto &item : result) { + auto placement_group = std::make_shared(item.second); + if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) { + registered_placement_groups_.emplace(item.first, placement_group); + + if (item.second.state() == rpc::PlacementGroupTableData::PENDING || + item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { + pending_placement_groups_.emplace_back(std::move(placement_group)); + } + } + } + + SchedulePendingPlacementGroups(); + RAY_LOG(INFO) << "Finished loading initial data."; + done(); + }; + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(callback)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 68e1c6670..96a392684 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -231,6 +231,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// Collect stats from gcs placement group manager in-memory data structures. void CollectStats() const; + /// Load initial data from gcs storage to memory cache asynchronously. + /// This should be called when GCS server restarts after a failure. + /// + /// \param done Callback that will be called when load is complete. + void LoadInitialData(const EmptyCallback &done); + private: /// Try to create placement group after a short time. void RetryCreatingPlacementGroup(); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 8a50a1269..099624d2f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -101,7 +101,7 @@ void GcsServer::Start() { rpc_server_.RegisterService(*worker_info_service_); auto load_completed_count = std::make_shared(0); - int load_count = 2; + int load_count = 3; auto on_done = [this, load_count, load_completed_count]() { ++(*load_completed_count); @@ -126,6 +126,7 @@ void GcsServer::Start() { }; gcs_object_manager_->LoadInitialData(on_done); gcs_node_manager_->LoadInitialData(on_done); + gcs_placement_group_manager_->LoadInitialData(on_done); // Print debug info periodically. PrintDebugInfo(); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index f7cf0927b..af86eab3d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -31,6 +31,7 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf std::function)> failure_handler, std::function)> success_handler) override { + absl::MutexLock lock(&mutex_); placement_groups_.push_back(placement_group); } @@ -46,9 +47,15 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf return bundles; } + int GetPlacementGroupCount() { + absl::MutexLock lock(&mutex_); + return placement_groups_.size(); + } + PlacementGroupID group_on_dead_node_; std::vector bundles_on_dead_node_; std::vector> placement_groups_; + absl::Mutex mutex_; }; class GcsPlacementGroupManagerTest : public ::testing::Test { @@ -78,6 +85,13 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { thread_io_service_->join(); } + void WaitForExpectedPgCount(int expected_count) { + auto condition = [this, expected_count]() { + return mock_placement_group_scheduler_->GetPlacementGroupCount() == expected_count; + }; + EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); + } + std::shared_ptr mock_placement_group_scheduler_; std::unique_ptr gcs_placement_group_manager_; @@ -95,11 +109,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); @@ -113,12 +127,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); @@ -139,12 +153,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); @@ -161,20 +175,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); // If the creation of placement group fails, it will be rescheduled after a short time. gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); - auto condition = [this]() { - return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1; - }; - EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); + WaitForExpectedPgCount(1); } TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { @@ -183,7 +194,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { std::atomic failed_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count, &failed_placement_group_count](Status status) { + [&finished_placement_group_count, + &failed_placement_group_count](const Status &status) { if (status.ok()) { ++finished_placement_group_count; } else { @@ -191,9 +203,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { } }); + WaitForExpectedPgCount(1); ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(failed_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); @@ -201,7 +213,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); const auto &placement_group_id = placement_group->GetPlacementGroupID(); gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, - [](Status status) {}); + [](const Status &status) {}); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); // Make sure it is not rescheduled @@ -213,7 +225,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { // Make sure we can re-remove again. gcs_placement_group_manager_->RemovePlacementGroup( - placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); + placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); }); } TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { @@ -222,7 +234,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { std::atomic failed_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count, &failed_placement_group_count](Status status) { + [&finished_placement_group_count, + &failed_placement_group_count](const Status &status) { if (status.ok()) { ++finished_placement_group_count; } else { @@ -230,9 +243,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { } }); + WaitForExpectedPgCount(1); ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(failed_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); @@ -242,7 +255,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id)) .Times(1); gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, - [](Status status) {}); + [](const Status &status) {}); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); @@ -255,7 +268,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { // Make sure we can re-remove again. gcs_placement_group_manager_->RemovePlacementGroup( - placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); + placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); }); } TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { @@ -263,11 +276,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { if (status.ok()) { ++finished_placement_group_count; } }); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); @@ -282,7 +296,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id)) .Times(0); gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, - [](Status status) {}); + [](const Status &status) {}); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); // Make sure it is not rescheduled @@ -293,7 +307,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { // Make sure we can re-remove again. gcs_placement_group_manager_->RemovePlacementGroup( - placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); + placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); }); } TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { @@ -301,17 +315,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request1), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); auto request2 = Mocker::GenCreatePlacementGroupRequest(); gcs_placement_group_manager_->RegisterPlacementGroup( std::make_shared(request2), - [&finished_placement_group_count](Status status) { + [&finished_placement_group_count](const Status &status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary()); placement_group->GetMutableBundle(1)->set_node_id(NodeID::FromRandom().Binary()); @@ -343,10 +357,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { mock_placement_group_scheduler_->placement_groups_.pop_back(); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); - auto condition = [this]() { - return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1; - }; - EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); + WaitForExpectedPgCount(1); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(), placement_group->GetPlacementGroupID()); } @@ -367,6 +378,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); @@ -400,6 +412,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) { [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); @@ -433,6 +446,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) { [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); @@ -463,6 +477,7 @@ TEST_F(GcsPlacementGroupManagerTest, [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); + WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);