[Core] Use optional return instead of nullptr for the GetNode method.

This commit is contained in:
SangBin Cho
2020-10-02 20:54:26 -07:00
committed by GitHub
parent 6288d954b5
commit 6974cea0cd
6 changed files with 37 additions and 27 deletions
@@ -277,7 +277,9 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
// node, and then try again on the new node.
RAY_CHECK(!retry_at_raylet_address.raylet_id().empty());
auto spill_back_node_id = NodeID::FromBinary(retry_at_raylet_address.raylet_id());
if (auto spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id)) {
auto maybe_spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id);
if (maybe_spill_back_node.has_value()) {
auto spill_back_node = maybe_spill_back_node.value();
actor->UpdateAddress(retry_at_raylet_address);
RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()]
.emplace(actor->GetActorID())
+2 -2
View File
@@ -367,11 +367,11 @@ void GcsNodeManager::HandleGetAllAvailableResources(
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::GetNode(
absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GcsNodeManager::GetNode(
const ray::NodeID &node_id) const {
auto iter = alive_nodes_.find(node_id);
if (iter == alive_nodes_.end()) {
return nullptr;
return {};
}
return iter->second;
+2 -2
View File
@@ -111,8 +111,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Get alive node by ID.
///
/// \param node_id The id of the node.
/// \return the node if it is alive else return nullptr.
std::shared_ptr<rpc::GcsNodeInfo> GetNode(const NodeID &node_id) const;
/// \return the node if it is alive. Optional empty value if it is not alive.
absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GetNode(const NodeID &node_id) const;
/// Get all alive nodes.
///
@@ -285,9 +285,14 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
void GcsPlacementGroupScheduler::PrepareResources(
const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node, const StatusCallback &callback) {
const auto lease_client = GetLeaseClientFromNode(node);
const auto node_id = NodeID::FromBinary(node->node_id());
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback) {
if (!node.has_value()) {
callback(Status::NotFound("Node is already dead."));
return;
}
const auto lease_client = GetLeaseClientFromNode(node.value());
const auto node_id = NodeID::FromBinary(node.value()->node_id());
RAY_LOG(INFO) << "Preparing resource from node " << node_id
<< " for a bundle: " << bundle->DebugString();
lease_client->PrepareBundleResources(
@@ -308,10 +313,11 @@ void GcsPlacementGroupScheduler::PrepareResources(
void GcsPlacementGroupScheduler::CommitResources(
const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node, const StatusCallback callback) {
RAY_CHECK(node != nullptr);
const auto lease_client = GetLeaseClientFromNode(node);
const auto node_id = NodeID::FromBinary(node->node_id());
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback callback) {
RAY_CHECK(node.has_value());
const auto lease_client = GetLeaseClientFromNode(node.value());
const auto node_id = NodeID::FromBinary(node.value()->node_id());
RAY_LOG(INFO) << "Committing resource to a node " << node_id
<< " for a bundle: " << bundle->DebugString();
lease_client->CommitBundleResources(
@@ -331,17 +337,17 @@ void GcsPlacementGroupScheduler::CommitResources(
void GcsPlacementGroupScheduler::CancelResourceReserve(
const std::shared_ptr<BundleSpecification> &bundle_spec,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node) {
if (node == nullptr) {
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node) {
if (!node.has_value()) {
RAY_LOG(INFO) << "Node for a placement group id " << bundle_spec->PlacementGroupId()
<< " and a bundle index, " << bundle_spec->Index()
<< " has already removed. Cancellation request will be ignored.";
return;
}
auto node_id = NodeID::FromBinary(node->node_id());
auto node_id = NodeID::FromBinary(node.value()->node_id());
RAY_LOG(INFO) << "Cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
const auto return_client = GetLeaseClientFromNode(node);
const auto return_client = GetLeaseClientFromNode(node.value());
return_client->CancelResourceReserve(
*bundle_spec, [bundle_spec, node_id](const Status &status,
const rpc::CancelResourceReserveReply &reply) {
@@ -395,7 +401,7 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
}
};
if (node != nullptr) {
if (node.has_value()) {
CommitResources(bundle, node, commit_resources_callback);
} else {
RAY_LOG(INFO) << "Failed to commit resources because the node is dead, node id = "
@@ -612,7 +618,7 @@ BundleLocationIndex::GetBundleLocationsOnNode(const NodeID &node_id) {
}
void BundleLocationIndex::AddNodes(
const absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &nodes) {
const absl::flat_hash_map<NodeID, std::shared_ptr<ray::rpc::GcsNodeInfo>> &nodes) {
for (const auto &iter : nodes) {
if (!node_to_leased_bundles_.contains(iter.first)) {
node_to_leased_bundles_[iter.first] = std::make_shared<BundleLocations>();
@@ -317,7 +317,7 @@ class BundleLocationIndex {
///
/// \param alive_nodes map of alive nodes.
void AddNodes(
const absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &nodes);
const absl::flat_hash_map<NodeID, std::shared_ptr<ray::rpc::GcsNodeInfo>> &nodes);
private:
/// Map from node ID to the set of bundles. This is used to lookup bundles at each node
@@ -392,9 +392,10 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param bundle A bundle to schedule on a node.
/// \param node A node to prepare resources for a given bundle.
/// \param callback
void PrepareResources(const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node,
const StatusCallback &callback);
void PrepareResources(
const std::shared_ptr<BundleSpecification> &bundle,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback);
/// Send a bundle COMMIT request to a node. This means the placement group creation
/// is ready and GCS will commit resources on a given node.
@@ -403,7 +404,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param node A node to commit resources for a given bundle.
/// \param callback
void CommitResources(const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback callback);
/// Cacnel prepared or committed resources from a node.
@@ -412,8 +413,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
///
/// \param bundle A description of the bundle to return.
/// \param node The node that the worker will be returned for.
void CancelResourceReserve(const std::shared_ptr<BundleSpecification> &bundle_spec,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node);
void CancelResourceReserve(
const std::shared_ptr<BundleSpecification> &bundle_spec,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node);
/// Get an existing lease client or connect a new one or connect a new one.
std::shared_ptr<ResourceReserveInterface> GetOrConnectLeaseClient(
@@ -40,10 +40,10 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
auto node_id = NodeID::FromBinary(node->node_id());
node_manager.AddNode(node);
ASSERT_EQ(node, node_manager.GetNode(node_id));
ASSERT_EQ(node, node_manager.GetNode(node_id).value());
node_manager.RemoveNode(node_id);
ASSERT_EQ(nullptr, node_manager.GetNode(node_id));
ASSERT_TRUE(!node_manager.GetNode(node_id).has_value());
}
TEST_F(GcsNodeManagerTest, TestListener) {