diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 6fea9e273..1b6280556 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -8,7 +8,7 @@ except ImportError: pytest_timeout = None import ray -from ray.test_utils import wait_for_condition +from ray.test_utils import get_other_nodes, wait_for_condition import ray.cluster_utils from ray._raylet import PlacementGroupID @@ -350,5 +350,73 @@ def test_cuda_visible_devices(ray_start_cluster): assert devices == "0", devices +def test_placement_group_reschedule_when_node_dead(ray_start_cluster): + @ray.remote(num_cpus=1) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + cluster.add_node(num_cpus=4) + cluster.add_node(num_cpus=4) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + # Make sure both head and worker node are alive. + nodes = ray.nodes() + assert len(nodes) == 3 + assert nodes[0]["alive"] and nodes[1]["alive"] and nodes[2]["alive"] + + placement_group_id = ray.experimental.placement_group( + name="name", + strategy="SPREAD", + bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=0, + detached=True).remote() + actor_2 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=1, + detached=True).remote() + actor_3 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=2, + detached=True).remote() + print(ray.get(actor_1.value.remote())) + print(ray.get(actor_2.value.remote())) + print(ray.get(actor_3.value.remote())) + + cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1]) + cluster.wait_for_nodes() + + actor_4 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=0, + detached=True).remote() + actor_5 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=1, + detached=True).remote() + actor_6 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=2, + detached=True).remote() + print(ray.get(actor_4.value.remote())) + print(ray.get(actor_5.value.remote())) + print(ray.get(actor_6.value.remote())) + ray.shutdown() + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 679e01ff4..cebd8a93a 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -395,6 +395,8 @@ std::shared_ptr GcsNodeManager::RemoveNode( alive_nodes_.erase(iter); // Remove from cluster resources. cluster_resources_.erase(node_id); + // Remove from cluster realtime resources. + cluster_realtime_resources_.erase(node_id); if (!is_intended) { // Broadcast a warning to all of the drivers indicating that the node // has been marked as dead. diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 75ae6b146..a4710a77e 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -40,7 +40,7 @@ std::string GcsPlacementGroup::GetName() const { } std::vector> GcsPlacementGroup::GetBundles() const { - auto bundles = placement_group_table_data_.bundles(); + const auto &bundles = placement_group_table_data_.bundles(); std::vector> ret_bundles; for (auto &bundle : bundles) { ret_bundles.push_back(std::make_shared(bundle)); @@ -48,6 +48,18 @@ std::vector> GcsPlacementGroup::GetBundles( return ret_bundles; } +std::vector> GcsPlacementGroup::GetUnplacedBundles() + const { + const auto &bundles = placement_group_table_data_.bundles(); + std::vector> unplaced_bundles; + for (auto &bundle : bundles) { + if (ClientID::FromBinary(bundle.node_id()).IsNil()) { + unplaced_bundles.push_back(std::make_shared(bundle)); + } + } + return unplaced_bundles; +} + rpc::PlacementStrategy GcsPlacementGroup::GetStrategy() const { return placement_group_table_data_.strategy(); } @@ -56,13 +68,17 @@ const rpc::PlacementGroupTableData &GcsPlacementGroup::GetPlacementGroupTableDat return placement_group_table_data_; } -const std::string GcsPlacementGroup::DebugString() const { +std::string GcsPlacementGroup::DebugString() const { std::stringstream stream; stream << "placement group id = " << GetPlacementGroupID() << ", name = " << GetName() << ", strategy = " << GetStrategy(); return stream.str(); } +rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) { + return placement_group_table_data_.mutable_bundles(bundle_index); +} + ///////////////////////////////////////////////////////////////////////////////////////// GcsPlacementGroupManager::GcsPlacementGroupManager( @@ -74,20 +90,18 @@ GcsPlacementGroupManager::GcsPlacementGroupManager( gcs_table_storage_(std::move(gcs_table_storage)) {} void GcsPlacementGroupManager::RegisterPlacementGroup( - const ray::rpc::CreatePlacementGroupRequest &request, StatusCallback callback) { + const std::shared_ptr &placement_group, StatusCallback callback) { RAY_CHECK(callback); - const auto &placement_group_spec = request.placement_group_spec(); - auto placement_group_id = - PlacementGroupID::FromBinary(placement_group_spec.placement_group_id()); - auto placement_group = std::make_shared(request); // TODO(ffbin): If GCS is restarted, GCS client will repeatedly send // `CreatePlacementGroup` requests, // which will lead to resource leakage, we will solve it in next pr. // Mark the callback as pending and invoke it after the placement_group has been // successfully created. - placement_group_to_register_callback_[placement_group_id] = std::move(callback); - registered_placement_groups_.emplace(placement_group_id, placement_group); + placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] = + std::move(callback); + registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(), + placement_group); pending_placement_groups_.emplace_back(std::move(placement_group)); SchedulePendingPlacementGroups(); } @@ -108,9 +122,20 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed( std::shared_ptr placement_group) { RAY_LOG(WARNING) << "Failed to create placement group " << placement_group->GetName() << ", try again."; - // We will attempt to schedule this placement_group once - // an eligible node is registered. - pending_placement_groups_.emplace_back(std::move(placement_group)); + // We will attempt to schedule this placement_group once an eligible node is + // registered. + auto state = placement_group->GetState(); + RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING || + state == rpc::PlacementGroupTableData::PENDING); + if (state == rpc::PlacementGroupTableData::RESCHEDULING) { + // NOTE: If a node is dead, the placement group scheduler should try to recover the + // group by rescheduling the bundles of the dead node. This should have higher + // priority than trying to place other placement groups. + pending_placement_groups_.emplace_front(std::move(placement_group)); + } else { + pending_placement_groups_.emplace_back(std::move(placement_group)); + } + MarkSchedulingDone(); RetryCreatingPlacementGroup(); } @@ -147,7 +172,7 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() { if (registered_placement_groups_.find(placement_group_id) != registered_placement_groups_.end()) { MarkSchedulingStarted(placement_group_id); - gcs_placement_group_scheduler_->Schedule( + gcs_placement_group_scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { OnPlacementGroupCreationFailed(std::move(placement_group)); @@ -186,18 +211,18 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup( return; } - RegisterPlacementGroup( - request, [reply, send_reply_callback, placement_group](Status status) { - if (status.ok()) { - RAY_LOG(INFO) << "Finished registering placement group, " - << placement_group->DebugString(); - } else { - RAY_LOG(WARNING) - << "Failed to register placement group, " - << placement_group->DebugString() << ", cause: " << status.message(); - } - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }); + RegisterPlacementGroup(placement_group, [reply, send_reply_callback, + placement_group](Status status) { + if (status.ok()) { + RAY_LOG(INFO) << "Finished registering placement group, " + << placement_group->DebugString(); + } else { + RAY_LOG(WARNING) << "Failed to register placement group, " + << placement_group->DebugString() + << ", cause: " << status.message(); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }); })); } @@ -299,5 +324,26 @@ void GcsPlacementGroupManager::RetryCreatingPlacementGroup() { RayConfig::instance().gcs_create_placement_group_retry_interval_ms()); } +void GcsPlacementGroupManager::OnNodeDead(const ClientID &node_id) { + RAY_LOG(WARNING) << "Node " << node_id + << " failed, rescheduling the placement groups on the dead node."; + auto bundles = gcs_placement_group_scheduler_->GetBundlesOnNode(node_id); + for (const auto &bundle : bundles) { + auto iter = registered_placement_groups_.find(bundle.first); + if (iter != registered_placement_groups_.end()) { + for (const auto &bundle_index : bundle.second) { + iter->second->GetMutableBundle(bundle_index)->clear_node_id(); + } + // TODO(ffbin): If we have a placement group bundle that requires a unique resource + // (for example gpu resource when there’s only one gpu node), this can postpone + // creating until a node with the resources is added. we will solve it in next pr. + iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING); + pending_placement_groups_.emplace_front(iter->second); + } + } + + SchedulePendingPlacementGroups(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index a13231ef9..ce21b8d01 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -57,24 +57,32 @@ class GcsPlacementGroup { /// Get the immutable PlacementGroupTableData of this placement group. const rpc::PlacementGroupTableData &GetPlacementGroupTableData(); + /// Get the mutable bundle of this placement group. + rpc::Bundle *GetMutableBundle(int bundle_index); + /// Update the state of this placement_group. void UpdateState(rpc::PlacementGroupTableData::PlacementGroupState state); + /// Get the state of this gcs placement_group. rpc::PlacementGroupTableData::PlacementGroupState GetState() const; /// Get the id of this placement_group. PlacementGroupID GetPlacementGroupID() const; + /// Get the name of this placement_group. std::string GetName() const; - /// Get the bundles of this placement_group + /// Get the bundles of this placement_group (including unplaced). std::vector> GetBundles() const; + /// Get the unplaced bundles of this placement group. + std::vector> GetUnplacedBundles() const; + /// Get the Strategy rpc::PlacementStrategy GetStrategy() const; // Get debug string for the placement group. - const std::string DebugString() const; + std::string DebugString() const; private: /// The placement_group meta data which contains the task specification as well as the @@ -120,16 +128,15 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// Register placement_group asynchronously. /// - /// \param request Contains the meta info to create the placement_group. + /// \param placement_group The placement group to be created. /// \param callback Will be invoked after the placement_group is created successfully or /// be invoked immediately if the placement_group is already registered to /// `registered_placement_groups_` and its state is `CREATED`. The callback will not be /// called in this case. - void RegisterPlacementGroup(const rpc::CreatePlacementGroupRequest &request, + void RegisterPlacementGroup(const std::shared_ptr &placement_group, StatusCallback callback); /// Schedule placement_groups in the `pending_placement_groups_` queue. - /// This function is exposed for testing only. void SchedulePendingPlacementGroups(); /// Get the placement_group ID for the named placement_group. Returns nil if the @@ -156,6 +163,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { void RemovePlacementGroup(const PlacementGroupID &placement_group_id, StatusCallback on_placement_group_removed); + /// Handle a node death. This will reschedule all bundles associated with the + /// specified node id. + /// + /// \param node_id The specified node id. + void OnNodeDead(const ClientID &node_id); + private: /// Try to create placement group after a short time. void RetryCreatingPlacementGroup(); @@ -192,6 +205,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// The pending placement_groups which will not be scheduled until there's a resource /// change. + /// NOTE: When we remove placement group, we need to look for + /// `pending_placement_groups_` and delete the specific placement group, so we can't use + /// `std::priority_queue`. std::deque> pending_placement_groups_; /// The scheduler to schedule all registered placement_groups. diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index dca55e0e8..0e628adee 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -114,15 +114,17 @@ ScheduleMap GcsSpreadStrategy::Schedule( return schedule_map; } -void GcsPlacementGroupScheduler::Schedule( +void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( std::shared_ptr placement_group, std::function)> failure_callback, std::function)> success_callback) { - RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName(); - auto bundles = placement_group->GetBundles(); + auto bundles = placement_group->GetUnplacedBundles(); auto strategy = placement_group->GetStrategy(); - auto selected_nodes = - scheduler_strategies_[strategy]->Schedule(bundles, GetScheduleContext()); + + RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName() + << ", bundles size = " << bundles.size(); + auto selected_nodes = scheduler_strategies_[strategy]->Schedule( + bundles, GetScheduleContext(placement_group->GetPlacementGroupID())); // If no nodes are available, scheduling fails. if (selected_nodes.empty()) { @@ -177,10 +179,10 @@ void GcsPlacementGroupScheduler::Schedule( void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( const PlacementGroupID &placement_group_id) { - auto it = placement_group_to_bundle_location_.find(placement_group_id); + auto it = placement_group_to_bundle_locations_.find(placement_group_id); // If bundle location has been already removed, it means bundles // are already destroyed. Do nothing. - if (it == placement_group_to_bundle_location_.end()) { + if (it == placement_group_to_bundle_locations_.end()) { return; } @@ -190,7 +192,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( auto &node_id = iter.second.first; CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); } - placement_group_to_bundle_location_.erase(it); + placement_group_to_bundle_locations_.erase(it); // Remove bundles from node_to_leased_bundles_ because bundels are removed now. for (const auto &bundle_location : *bundle_locations) { @@ -243,7 +245,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( const std::shared_ptr &bundle_spec, const std::shared_ptr &node) { if (node == nullptr) { - RAY_LOG(WARNING) << "Node id " << node->node_id() << " for a placement group id " + RAY_LOG(WARNING) << "Node for a placement group id " << bundle_spec->PlacementGroupId() << " and a bundle index, " << bundle_spec->Index() << " has already removed. Cancellation request will be ignored."; @@ -285,9 +287,7 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( const std::function)> &schedule_success_handler) { const auto &placement_group_id = placement_group->GetPlacementGroupID(); - RAY_CHECK( - placement_group_to_bundle_location_.emplace(placement_group_id, bundle_locations) - .second); + placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations); if (placement_group_leasing_in_progress_.find(placement_group_id) == placement_group_leasing_in_progress_.end() || @@ -310,12 +310,14 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( [schedule_success_handler, placement_group](Status status) { schedule_success_handler(placement_group); })); - // Update `node_to_leased_bundles_`. + for (const auto &iter : *bundle_locations) { const auto &location = iter.second; const auto &bundle_sepc = location.second; node_to_leased_bundles_[location.first].emplace(bundle_sepc->BundleId(), bundle_sepc); + placement_group->GetMutableBundle(location.second->Index()) + ->set_node_id(location.first.Binary()); } } // Erase leasing in progress placement group. @@ -326,7 +328,8 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( } } -std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext() { +std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( + const PlacementGroupID &placement_group_id) { // TODO(ffbin): We will add listener to the GCS node manager to handle node deletion. auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); for (const auto &iter : alive_nodes) { @@ -341,8 +344,28 @@ std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( for (const auto &iter : node_to_leased_bundles_) { node_to_bundles->emplace(iter.first, iter.second.size()); } - return std::unique_ptr( - new ScheduleContext(node_to_bundles, gcs_node_manager_)); + + std::shared_ptr bundle_locations = nullptr; + auto iter = placement_group_to_bundle_locations_.find(placement_group_id); + if (iter != placement_group_to_bundle_locations_.end()) { + bundle_locations = iter->second; + } + return std::unique_ptr(new ScheduleContext( + std::move(node_to_bundles), bundle_locations, gcs_node_manager_)); +} + +absl::flat_hash_map> +GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) { + absl::flat_hash_map> bundles_on_node; + const auto node_iter = node_to_leased_bundles_.find(node_id); + if (node_iter != node_to_leased_bundles_.end()) { + const auto &bundles = node_iter->second; + for (auto &bundle : bundles) { + bundles_on_node[bundle.first.first].push_back(bundle.second->BundleId().second); + } + node_to_leased_bundles_.erase(node_iter); + } + return bundles_on_node; } } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index f33c64d40..f56e337c4 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -40,18 +40,27 @@ struct pair_hash { using ScheduleMap = std::unordered_map; using BundleLocations = std::unordered_map< BundleID, std::pair>, pair_hash>; + class GcsPlacementGroup; class GcsPlacementGroupSchedulerInterface { public: - /// Schedule the specified placement_group. + /// Schedule unplaced bundles of the specified placement group. /// - /// \param placement_group to be scheduled. - virtual void Schedule( + /// \param placement_group The placement group to be scheduled. + /// \param failure_callback This function is called if the schedule is failed. + /// \param success_callback This function is called if the schedule is successful. + virtual void ScheduleUnplacedBundles( std::shared_ptr placement_group, - std::function)> schedule_failure_handler, - std::function)> - schedule_success_handler) = 0; + std::function)> failure_callback, + std::function)> success_callback) = 0; + + /// Get bundles belong to the specified node. + /// + /// \param node_id ID of the dead node. + /// \return The bundles belong to the dead node. + virtual absl::flat_hash_map> GetBundlesOnNode( + const ClientID &node_id) = 0; /// Destroy bundle resources from all nodes in the placement group. virtual void DestroyPlacementGroupBundleResourcesIfExists( @@ -66,11 +75,16 @@ class GcsPlacementGroupSchedulerInterface { class ScheduleContext { public: ScheduleContext(std::shared_ptr> node_to_bundles, + const std::shared_ptr &bundle_locations, const GcsNodeManager &node_manager) - : node_to_bundles_(std::move(node_to_bundles)), node_manager_(node_manager) {} + : node_to_bundles_(std::move(node_to_bundles)), + bundle_locations_(bundle_locations), + node_manager_(node_manager) {} // Key is node id, value is the number of bundles on the node. - std::shared_ptr> node_to_bundles_; + const std::shared_ptr> node_to_bundles_; + // The locations of existing bundles for this placement group. + const std::shared_ptr &bundle_locations_; const GcsNodeManager &node_manager_; }; @@ -124,7 +138,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { virtual ~GcsPlacementGroupScheduler() = default; - /// Schedule the specified placement_group. + /// Schedule unplaced bundles of the specified placement group. /// If there is no available nodes then the `schedule_failed_handler` will be /// triggered, otherwise the bundle in placement_group will be add into a queue and /// schedule all bundle by calling ReserveResourceFromNode(). @@ -132,7 +146,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param placement_group to be scheduled. /// \param failure_callback This function is called if the schedule is failed. /// \param success_callback This function is called if the schedule is successful. - void Schedule( + void ScheduleUnplacedBundles( std::shared_ptr placement_group, std::function)> failure_handler, std::function)> success_handler) override; @@ -151,6 +165,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// cancelled. void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) override; + /// Get bundles belong to the specified node. + /// + /// \param node_id ID of the dead node. + /// \return The bundles belong to the dead node. + absl::flat_hash_map> GetBundlesOnNode( + const ClientID &node_id) override; + protected: /// Lease resource from the specified node for the specified bundle. void ReserveResourceFromNode(const std::shared_ptr &bundle, @@ -177,8 +198,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { const std::function)> &schedule_success_handler); - /// Generate schedule conetext. - std::unique_ptr GetScheduleContext(); + /// Generate schedule context. + std::unique_ptr GetScheduleContext( + const PlacementGroupID &placement_group_id); /// A timer that ticks every cancel resource failure milliseconds. boost::asio::deadline_timer return_timer_; @@ -217,10 +239,11 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { absl::flat_hash_set placement_group_leasing_in_progress_; /// A map from placement group id to bundle locations. - /// It is used to destroy bundles for the placement group. + /// It is used to destroy bundles for the placement group. When we reschedule bundles, + /// we can get the location of other bundles from here. /// NOTE: It is a reverse index of `node_to_leased_bundles`. absl::flat_hash_map> - placement_group_to_bundle_location_; + placement_group_to_bundle_locations_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index b9d7c27b1..f16f1bfe1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -204,8 +204,9 @@ void GcsServer::InitGcsActorManager() { gcs_node_manager_->AddNodeRemovedListener( [this](std::shared_ptr node) { - // All of the related actors should be reconstructed when a node is removed from - // the GCS. + // All of the related placement groups and actors should be reconstructed when a + // node is removed from the GCS. + gcs_placement_group_manager_->OnNodeDead(ClientID::FromBinary(node->node_id())); gcs_actor_manager_->OnNodeDead(ClientID::FromBinary(node->node_id())); }); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index a5f1afc1d..65c8c4206 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -24,14 +24,14 @@ using ::testing::_; class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface { public: - MockPlacementGroupScheduler() {} + MockPlacementGroupScheduler() = default; - void Schedule(std::shared_ptr placement_group, - std::function)> - schedule_failure_handler = nullptr, - std::function)> - schedule_success_handler = nullptr) { - placement_groups.push_back(placement_group); + void ScheduleUnplacedBundles( + std::shared_ptr placement_group, + std::function)> failure_handler, + std::function)> success_handler) + override { + placement_groups_.push_back(placement_group); } MOCK_METHOD1(DestroyPlacementGroupBundleResourcesIfExists, @@ -39,7 +39,16 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf MOCK_METHOD1(MarkScheduleCancelled, void(const PlacementGroupID &placement_group_id)); - std::vector> placement_groups; + absl::flat_hash_map> GetBundlesOnNode( + const ClientID &node_id) override { + absl::flat_hash_map> bundles; + bundles[group_on_dead_node_] = bundles_on_dead_node_; + return bundles; + } + + PlacementGroupID group_on_dead_node_; + std::vector bundles_on_dead_node_; + std::vector> placement_groups_; }; class GcsPlacementGroupManagerTest : public ::testing::Test { @@ -75,16 +84,17 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { }; TEST_F(GcsPlacementGroupManagerTest, TestBasic) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, [&finished_placement_group_count](Status status) { + std::make_shared(request), + [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.pop_back(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.pop_back(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); @@ -92,22 +102,23 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) { } TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, [&finished_placement_group_count](Status status) { + std::make_shared(request), + [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.clear(); gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); gcs_placement_group_manager_->SchedulePendingPlacementGroups(); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + mock_placement_group_scheduler_->placement_groups_.clear(); ASSERT_EQ(finished_placement_group_count, 0); // Check that the placement_group is in state `CREATED`. @@ -117,54 +128,54 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { } TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { - auto create_placement_group_request = - Mocker::GenCreatePlacementGroupRequest("test_name"); + auto request = Mocker::GenCreatePlacementGroupRequest("test_name"); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, [&finished_placement_group_count](Status status) { + std::make_shared(request), + [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.pop_back(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.pop_back(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); ASSERT_EQ( gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"), - PlacementGroupID::FromBinary( - create_placement_group_request.placement_group_spec().placement_group_id())); + PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id())); } TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, [&finished_placement_group_count](Status status) { + std::make_shared(request), + [&finished_placement_group_count](Status status) { ++finished_placement_group_count; }); ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.pop_back(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.pop_back(); // If the creation of placement group fails, it will be rescheduled after a short time. gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); auto condition = [this]() { - return (int)mock_placement_group_scheduler_->placement_groups.size() == 1; + return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1; }; EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); } TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); std::atomic failed_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, + std::make_shared(request), [&finished_placement_group_count, &failed_placement_group_count](Status status) { if (status.ok()) { ++finished_placement_group_count; @@ -175,9 +186,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(failed_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.clear(); gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); @@ -188,8 +199,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { // Make sure it is not rescheduled gcs_placement_group_manager_->SchedulePendingPlacementGroups(); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); + mock_placement_group_scheduler_->placement_groups_.clear(); WaitForExpectedCount(finished_placement_group_count, 0); WaitForExpectedCount(failed_placement_group_count, 1); @@ -199,11 +210,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { } TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); std::atomic failed_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, + std::make_shared(request), [&finished_placement_group_count, &failed_placement_group_count](Status status) { if (status.ok()) { ++finished_placement_group_count; @@ -214,9 +225,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(failed_placement_group_count, 0); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.clear(); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); // Placement group is in leasing state. @@ -229,8 +240,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { // Make sure it is not rescheduled gcs_placement_group_manager_->SchedulePendingPlacementGroups(); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); + mock_placement_group_scheduler_->placement_groups_.clear(); WaitForExpectedCount(finished_placement_group_count, 0); WaitForExpectedCount(failed_placement_group_count, 1); @@ -240,16 +251,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { } TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, [&finished_placement_group_count](Status status) { + std::make_shared(request), + [&finished_placement_group_count](Status status) { if (status.ok()) { ++finished_placement_group_count; } }); - auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); - mock_placement_group_scheduler_->placement_groups.pop_back(); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.pop_back(); gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); @@ -267,8 +279,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { // Make sure it is not rescheduled gcs_placement_group_manager_->SchedulePendingPlacementGroups(); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); - mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); + mock_placement_group_scheduler_->placement_groups_.clear(); ASSERT_EQ(finished_placement_group_count, 1); // Make sure we can re-remove again. @@ -276,6 +288,61 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); } +TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { + auto request1 = Mocker::GenCreatePlacementGroupRequest(); + std::atomic finished_placement_group_count(0); + gcs_placement_group_manager_->RegisterPlacementGroup( + std::make_shared(request1), + [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); + auto request2 = Mocker::GenCreatePlacementGroupRequest(); + gcs_placement_group_manager_->RegisterPlacementGroup( + std::make_shared(request2), + [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); + ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + placement_group->GetMutableBundle(0)->set_node_id(ClientID::FromRandom().Binary()); + placement_group->GetMutableBundle(1)->set_node_id(ClientID::FromRandom().Binary()); + mock_placement_group_scheduler_->placement_groups_.pop_back(); + + // If a node dies, we will set the bundles above it to be unplaced and reschedule the + // placement group. The placement group state is set to `RESCHEDULING` and will be + // scheduled first. + mock_placement_group_scheduler_->group_on_dead_node_ = + placement_group->GetPlacementGroupID(); + mock_placement_group_scheduler_->bundles_on_dead_node_.push_back(0); + gcs_placement_group_manager_->OnNodeDead(ClientID::FromRandom()); + + // Trigger scheduling `RESCHEDULING` placement group. + auto finished_group = std::make_shared( + placement_group->GetPlacementGroupTableData()); + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(finished_group); + WaitForExpectedCount(finished_placement_group_count, 1); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(), + placement_group->GetPlacementGroupID()); + const auto &bundles = + mock_placement_group_scheduler_->placement_groups_[0]->GetBundles(); + EXPECT_TRUE(ClientID::FromBinary(bundles[0]->GetMutableMessage().node_id()).IsNil()); + EXPECT_FALSE(ClientID::FromBinary(bundles[1]->GetMutableMessage().node_id()).IsNil()); + + // If `RESCHEDULING` placement group fails to create, we will schedule it again first. + placement_group = mock_placement_group_scheduler_->placement_groups_.back(); + mock_placement_group_scheduler_->placement_groups_.pop_back(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); + gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); + auto condition = [this]() { + return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1; + }; + EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(), + placement_group->GetPlacementGroupID()); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 4eca5d985..c4455e389 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -37,17 +37,16 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); - gcs_placement_group_scheduler_ = - std::make_shared( - io_service_, gcs_table_storage_, *gcs_node_manager_, - /*lease_client_fplacement_groupy=*/ - [this](const rpc::Address &address) { - if (0 == address.port()) { - return raylet_client_; - } else { - return raylet_client1_; - } - }); + scheduler_ = std::make_shared( + io_service_, gcs_table_storage_, *gcs_node_manager_, + /*lease_client_fplacement_groupy=*/ + [this](const rpc::Address &address) { + if (0 == address.port()) { + return raylet_client_; + } else { + return raylet_client1_; + } + }); } void TearDown() override { @@ -88,15 +87,15 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { // Failed to schedule the placement group, because the node resources is not enough. auto request = Mocker::GenCreatePlacementGroupRequest("", strategy); auto placement_group = std::make_shared(request); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, + success_handler); WaitPendingDone(failure_placement_groups_, 1); ASSERT_EQ(0, success_placement_groups_.size()); // A new node is added, and the rescheduling is successful. AddNode(Mocker::GenNodeInfo(0), 2); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, + success_handler); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); WaitPendingDone(success_placement_groups_, 1); @@ -112,8 +111,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::shared_ptr raylet_client_; std::shared_ptr raylet_client1_; std::shared_ptr gcs_node_manager_; - std::shared_ptr - gcs_placement_group_scheduler_; + std::shared_ptr scheduler_; std::vector> success_placement_groups_; std::vector> failure_placement_groups_; std::shared_ptr gcs_pub_sub_; @@ -127,7 +125,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) { auto placement_group = std::make_shared(request); // Schedule the placement_group with zero node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { failure_placement_groups_.emplace_back(std::move(placement_group)); @@ -154,7 +152,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) { // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { absl::MutexLock lock(&vector_mutex_); @@ -184,7 +182,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { absl::MutexLock lock(&vector_mutex_); @@ -216,7 +214,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { absl::MutexLock lock(&vector_mutex_); @@ -259,8 +257,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) auto request = Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); auto placement_group = std::make_shared(request); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, + success_handler); if (!raylet_client_->lease_callbacks.empty()) { ASSERT_TRUE(raylet_client_->GrantResourceReserve()); @@ -295,8 +293,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { auto request = Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); auto placement_group = std::make_shared(request); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); WaitPendingDone(success_placement_groups_, 1); @@ -309,8 +306,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); auto placement_group2 = std::make_shared(create_placement_group_request2); - gcs_placement_group_scheduler_->Schedule(placement_group2, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); WaitPendingDone(success_placement_groups_, 2); @@ -327,7 +323,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { absl::MutexLock lock(&vector_mutex_); @@ -342,14 +338,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { WaitPendingDone(failure_placement_groups_, 0); WaitPendingDone(success_placement_groups_, 1); const auto &placement_group_id = placement_group->GetPlacementGroupID(); - gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists( - placement_group_id); + scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); // Subsequent destroy request should not do anything. - gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists( - placement_group_id); + scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve()); ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve()); } @@ -366,7 +360,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. - gcs_placement_group_scheduler_->Schedule( + scheduler_->ScheduleUnplacedBundles( placement_group, [this](std::shared_ptr placement_group) { absl::MutexLock lock(&vector_mutex_); @@ -379,7 +373,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { // Now, cancel the schedule request. ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - gcs_placement_group_scheduler_->MarkScheduleCancelled(placement_group_id); + scheduler_->MarkScheduleCancelled(placement_group_id); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); @@ -407,8 +401,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) { auto request = Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15); auto placement_group = std::make_shared(request); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); RAY_CHECK(raylet_client_->num_lease_requested > 0); RAY_CHECK(raylet_client1_->num_lease_requested > 0); for (int index = 0; index < raylet_client_->num_lease_requested; ++index) { @@ -420,6 +413,51 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) { WaitPendingDone(success_placement_groups_, 1); } +TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) { + auto node0 = Mocker::GenNodeInfo(0); + auto node1 = Mocker::GenNodeInfo(1); + AddNode(node0); + AddNode(node1); + ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size()); + + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = + std::make_shared(create_placement_group_request); + + // Schedule the placement group successfully. + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 1); + + auto bundles_on_node0 = + scheduler_->GetBundlesOnNode(ClientID::FromBinary(node0->node_id())); + ASSERT_EQ(1, bundles_on_node0.size()); + auto bundles_on_node1 = + scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id())); + ASSERT_EQ(1, bundles_on_node1.size()); + + // Node1 is dead, reschedule the placement group. + auto bundle_on_dead_node = placement_group->GetMutableBundle(0); + bundle_on_dead_node->clear_node_id(); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + if (0 == bundles_on_node0[placement_group->GetPlacementGroupID()][0]) { + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + } else { + ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + } + WaitPendingDone(success_placement_groups_, 2); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 178e8dfc7..b776ec5ef 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -146,6 +146,8 @@ message Bundle { } BundleIdentifier bundle_id = 1; map unit_resources = 2; + // The location of this bundle. + bytes node_id = 3; } message PlacementGroupSpec { @@ -192,11 +194,11 @@ message ActorCreationTaskSpec { repeated string dynamic_worker_options = 4; // The max number of concurrent calls for direct call actors. int32 max_concurrency = 5; - // Whether the actor is persistent + // Whether the actor is persistent. bool is_detached = 6; // Globally-unique name of the actor. Should only be populated when is_detached is true. string name = 7; - // Whether the actor use async actor calls + // Whether the actor use async actor calls. bool is_asyncio = 8; // Field used for storing application-level extensions to the actor definition. string extension_data = 9; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 7b003ed35..8a4fe4a9d 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -169,6 +169,8 @@ message PlacementGroupTableData { CREATED = 1; // Placement Group is already removed and won't be reschedule. REMOVED = 2; + // Placement Group is rescheduling because the node it placed is dead. + RESCHEDULING = 3; } // ID of the PlacementGroup.