From 6974cea0cd502ca8f5eca9618a2a1c72f882eedf Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 2 Oct 2020 20:54:26 -0700 Subject: [PATCH] [Core] Use optional return instead of nullptr for the GetNode method. --- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 4 ++- src/ray/gcs/gcs_server/gcs_node_manager.cc | 4 +-- src/ray/gcs/gcs_server/gcs_node_manager.h | 4 +-- .../gcs_placement_group_scheduler.cc | 32 +++++++++++-------- .../gcs_placement_group_scheduler.h | 16 ++++++---- .../gcs_server/test/gcs_node_manager_test.cc | 4 +-- 6 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 140c77662..bcf8b96da 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -277,7 +277,9 @@ void GcsActorScheduler::HandleWorkerLeasedReply( // node, and then try again on the new node. RAY_CHECK(!retry_at_raylet_address.raylet_id().empty()); auto spill_back_node_id = NodeID::FromBinary(retry_at_raylet_address.raylet_id()); - if (auto spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id)) { + auto maybe_spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id); + if (maybe_spill_back_node.has_value()) { + auto spill_back_node = maybe_spill_back_node.value(); actor->UpdateAddress(retry_at_raylet_address); RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()] .emplace(actor->GetActorID()) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 97b706ce1..910f17fd5 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -367,11 +367,11 @@ void GcsNodeManager::HandleGetAllAvailableResources( GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } -std::shared_ptr GcsNodeManager::GetNode( +absl::optional> GcsNodeManager::GetNode( const ray::NodeID &node_id) const { auto iter = alive_nodes_.find(node_id); if (iter == alive_nodes_.end()) { - return nullptr; + return {}; } return iter->second; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index c94a26ebb..db4278a35 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -111,8 +111,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Get alive node by ID. /// /// \param node_id The id of the node. - /// \return the node if it is alive else return nullptr. - std::shared_ptr GetNode(const NodeID &node_id) const; + /// \return the node if it is alive. Optional empty value if it is not alive. + absl::optional> GetNode(const NodeID &node_id) const; /// Get all alive nodes. /// diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index eb45b64b7..f5332d0d6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -285,9 +285,14 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled( void GcsPlacementGroupScheduler::PrepareResources( const std::shared_ptr &bundle, - const std::shared_ptr &node, const StatusCallback &callback) { - const auto lease_client = GetLeaseClientFromNode(node); - const auto node_id = NodeID::FromBinary(node->node_id()); + const absl::optional> &node, + const StatusCallback &callback) { + if (!node.has_value()) { + callback(Status::NotFound("Node is already dead.")); + return; + } + const auto lease_client = GetLeaseClientFromNode(node.value()); + const auto node_id = NodeID::FromBinary(node.value()->node_id()); RAY_LOG(INFO) << "Preparing resource from node " << node_id << " for a bundle: " << bundle->DebugString(); lease_client->PrepareBundleResources( @@ -308,10 +313,11 @@ void GcsPlacementGroupScheduler::PrepareResources( void GcsPlacementGroupScheduler::CommitResources( const std::shared_ptr &bundle, - const std::shared_ptr &node, const StatusCallback callback) { - RAY_CHECK(node != nullptr); - const auto lease_client = GetLeaseClientFromNode(node); - const auto node_id = NodeID::FromBinary(node->node_id()); + const absl::optional> &node, + const StatusCallback callback) { + RAY_CHECK(node.has_value()); + const auto lease_client = GetLeaseClientFromNode(node.value()); + const auto node_id = NodeID::FromBinary(node.value()->node_id()); RAY_LOG(INFO) << "Committing resource to a node " << node_id << " for a bundle: " << bundle->DebugString(); lease_client->CommitBundleResources( @@ -331,17 +337,17 @@ void GcsPlacementGroupScheduler::CommitResources( void GcsPlacementGroupScheduler::CancelResourceReserve( const std::shared_ptr &bundle_spec, - const std::shared_ptr &node) { - if (node == nullptr) { + const absl::optional> &node) { + if (!node.has_value()) { RAY_LOG(INFO) << "Node for a placement group id " << bundle_spec->PlacementGroupId() << " and a bundle index, " << bundle_spec->Index() << " has already removed. Cancellation request will be ignored."; return; } - auto node_id = NodeID::FromBinary(node->node_id()); + auto node_id = NodeID::FromBinary(node.value()->node_id()); RAY_LOG(INFO) << "Cancelling the resource reserved for bundle: " << bundle_spec->DebugString() << " at node " << node_id; - const auto return_client = GetLeaseClientFromNode(node); + const auto return_client = GetLeaseClientFromNode(node.value()); return_client->CancelResourceReserve( *bundle_spec, [bundle_spec, node_id](const Status &status, const rpc::CancelResourceReserveReply &reply) { @@ -395,7 +401,7 @@ void GcsPlacementGroupScheduler::CommitAllBundles( } }; - if (node != nullptr) { + if (node.has_value()) { CommitResources(bundle, node, commit_resources_callback); } else { RAY_LOG(INFO) << "Failed to commit resources because the node is dead, node id = " @@ -612,7 +618,7 @@ BundleLocationIndex::GetBundleLocationsOnNode(const NodeID &node_id) { } void BundleLocationIndex::AddNodes( - const absl::flat_hash_map> &nodes) { + const absl::flat_hash_map> &nodes) { for (const auto &iter : nodes) { if (!node_to_leased_bundles_.contains(iter.first)) { node_to_leased_bundles_[iter.first] = std::make_shared(); diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 2e8598ee5..0eb4d7fc7 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -317,7 +317,7 @@ class BundleLocationIndex { /// /// \param alive_nodes map of alive nodes. void AddNodes( - const absl::flat_hash_map> &nodes); + const absl::flat_hash_map> &nodes); private: /// Map from node ID to the set of bundles. This is used to lookup bundles at each node @@ -392,9 +392,10 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param bundle A bundle to schedule on a node. /// \param node A node to prepare resources for a given bundle. /// \param callback - void PrepareResources(const std::shared_ptr &bundle, - const std::shared_ptr &node, - const StatusCallback &callback); + void PrepareResources( + const std::shared_ptr &bundle, + const absl::optional> &node, + const StatusCallback &callback); /// Send a bundle COMMIT request to a node. This means the placement group creation /// is ready and GCS will commit resources on a given node. @@ -403,7 +404,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param node A node to commit resources for a given bundle. /// \param callback void CommitResources(const std::shared_ptr &bundle, - const std::shared_ptr &node, + const absl::optional> &node, const StatusCallback callback); /// Cacnel prepared or committed resources from a node. @@ -412,8 +413,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// /// \param bundle A description of the bundle to return. /// \param node The node that the worker will be returned for. - void CancelResourceReserve(const std::shared_ptr &bundle_spec, - const std::shared_ptr &node); + void CancelResourceReserve( + const std::shared_ptr &bundle_spec, + const absl::optional> &node); /// Get an existing lease client or connect a new one or connect a new one. std::shared_ptr GetOrConnectLeaseClient( diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index a01f83b5e..7b54b410e 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -40,10 +40,10 @@ TEST_F(GcsNodeManagerTest, TestManagement) { auto node_id = NodeID::FromBinary(node->node_id()); node_manager.AddNode(node); - ASSERT_EQ(node, node_manager.GetNode(node_id)); + ASSERT_EQ(node, node_manager.GetNode(node_id).value()); node_manager.RemoveNode(node_id); - ASSERT_EQ(nullptr, node_manager.GetNode(node_id)); + ASSERT_TRUE(!node_manager.GetNode(node_id).has_value()); } TEST_F(GcsNodeManagerTest, TestListener) {