From 55a090fb1685df7ad789c6b87f35cdd404d715f2 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Wed, 28 Oct 2020 12:13:19 +0800 Subject: [PATCH] [GCS]Optimize gcs client nodes get function (#11424) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [GCS]Optimize gcs client nodes get function * fix review comment Co-authored-by: 灵洵 --- src/ray/core_worker/core_worker.cc | 17 ++++++----------- src/ray/gcs/accessor.h | 8 +++++--- .../gcs/gcs_client/service_based_accessor.cc | 5 ++++- src/ray/gcs/gcs_client/service_based_accessor.h | 3 ++- .../gcs/gcs_server/test/gcs_server_test_util.h | 3 ++- src/ray/gcs/redis_accessor.cc | 3 ++- src/ray/gcs/redis_accessor.h | 3 ++- src/ray/object_manager/object_directory.cc | 6 ++---- 8 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5421ff8e1..6ba2cc23b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -399,10 +399,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ auto check_node_alive_fn = [this](const NodeID &node_id) { auto node = gcs_client_->Nodes().Get(node_id); - if (!node) { - return false; - } - return node->state() == rpc::GcsNodeInfo::ALIVE; + return node.has_value(); }; auto reconstruct_object_callback = [this](const ObjectID &object_id) { io_service_.post([this, object_id]() { @@ -505,13 +502,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ const auto &node_id = NodeID::FromBinary(loc.manager()); auto node = gcs_client_->Nodes().Get(node_id); RAY_CHECK(node.has_value()); - if (node->state() == rpc::GcsNodeInfo::ALIVE) { - rpc::Address address; - address.set_raylet_id(node->node_id()); - address.set_ip_address(node->node_manager_address()); - address.set_port(node->node_manager_port()); - locations.push_back(address); - } + rpc::Address address; + address.set_raylet_id(node->node_id()); + address.set_ip_address(node->node_manager_address()); + address.set_port(node->node_manager_port()); + locations.push_back(address); } callback(object_id, locations); }); diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 96436e6bd..e8b55ac20 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -495,9 +495,11 @@ class NodeInfoAccessor { /// is called before. /// /// \param node_id The ID of node to look up in local cache. - /// \return The item returned by GCS. If the item to read doesn't exist, - /// this optional object is empty. - virtual boost::optional Get(const NodeID &node_id) const = 0; + /// \param filter_dead_nodes Whether or not if this method will filter dead nodes. + /// \return The item returned by GCS. If the item to read doesn't exist or the node is + /// dead, this optional object is empty. + virtual boost::optional Get(const NodeID &node_id, + bool filter_dead_nodes = true) const = 0; /// Get information of all nodes from local cache. /// Non-thread safe. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 649642864..dbd958936 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -558,10 +558,13 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( } boost::optional ServiceBasedNodeInfoAccessor::Get( - const NodeID &node_id) const { + const NodeID &node_id, bool filter_dead_nodes) const { RAY_CHECK(!node_id.IsNil()); auto entry = node_cache_.find(node_id); if (entry != node_cache_.end()) { + if (filter_dead_nodes && entry->second.state() == rpc::GcsNodeInfo::DEAD) { + return boost::none; + } return entry->second; } return boost::none; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index c1ce2a453..2618e74dc 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -166,7 +166,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { const SubscribeCallback &subscribe, const StatusCallback &done) override; - boost::optional Get(const NodeID &node_id) const override; + boost::optional Get(const NodeID &node_id, + bool filter_dead_nodes = false) const override; const std::unordered_map &GetAll() const override; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index e004a17ca..c6134e08e 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -341,7 +341,8 @@ struct GcsServerMocker { return Status::NotImplemented(""); } - boost::optional Get(const NodeID &node_id) const override { + boost::optional Get(const NodeID &node_id, + bool filter_dead_nodes = true) const override { return boost::none; } diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 9231ee508..06f001342 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -564,7 +564,8 @@ Status RedisNodeInfoAccessor::AsyncGetAll( return client_table.Lookup(on_done); } -boost::optional RedisNodeInfoAccessor::Get(const NodeID &node_id) const { +boost::optional RedisNodeInfoAccessor::Get(const NodeID &node_id, + bool filter_dead_nodes) const { GcsNodeInfo node_info; ClientTable &client_table = client_impl_->client_table(); bool found = client_table.GetClient(node_id, &node_info); diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index d7b3d49e5..17f3d0a00 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -340,7 +340,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { const SubscribeCallback &subscribe, const StatusCallback &done) override; - boost::optional Get(const NodeID &node_id) const override; + boost::optional Get(const NodeID &node_id, + bool filter_dead_nodes = true) const override; const std::unordered_map &GetAll() const override; diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 6ef217a1f..869347530 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -94,10 +94,8 @@ void ObjectDirectory::LookupRemoteConnectionInfo( if (node_info) { NodeID result_node_id = NodeID::FromBinary(node_info->node_id()); RAY_CHECK(result_node_id == connection_info.client_id); - if (node_info->state() == GcsNodeInfo::ALIVE) { - connection_info.ip = node_info->node_manager_address(); - connection_info.port = static_cast(node_info->object_manager_port()); - } + connection_info.ip = node_info->node_manager_address(); + connection_info.port = static_cast(node_info->object_manager_port()); } }