mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:16:19 +08:00
[GCS]Optimize gcs client nodes get function (#11424)
* [GCS]Optimize gcs client nodes get function * fix review comment Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
@@ -399,10 +399,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
|
||||
|
||||
auto check_node_alive_fn = [this](const NodeID &node_id) {
|
||||
auto node = gcs_client_->Nodes().Get(node_id);
|
||||
if (!node) {
|
||||
return false;
|
||||
}
|
||||
return node->state() == rpc::GcsNodeInfo::ALIVE;
|
||||
return node.has_value();
|
||||
};
|
||||
auto reconstruct_object_callback = [this](const ObjectID &object_id) {
|
||||
io_service_.post([this, object_id]() {
|
||||
@@ -505,13 +502,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
|
||||
const auto &node_id = NodeID::FromBinary(loc.manager());
|
||||
auto node = gcs_client_->Nodes().Get(node_id);
|
||||
RAY_CHECK(node.has_value());
|
||||
if (node->state() == rpc::GcsNodeInfo::ALIVE) {
|
||||
rpc::Address address;
|
||||
address.set_raylet_id(node->node_id());
|
||||
address.set_ip_address(node->node_manager_address());
|
||||
address.set_port(node->node_manager_port());
|
||||
locations.push_back(address);
|
||||
}
|
||||
rpc::Address address;
|
||||
address.set_raylet_id(node->node_id());
|
||||
address.set_ip_address(node->node_manager_address());
|
||||
address.set_port(node->node_manager_port());
|
||||
locations.push_back(address);
|
||||
}
|
||||
callback(object_id, locations);
|
||||
});
|
||||
|
||||
@@ -495,9 +495,11 @@ class NodeInfoAccessor {
|
||||
/// is called before.
|
||||
///
|
||||
/// \param node_id The ID of node to look up in local cache.
|
||||
/// \return The item returned by GCS. If the item to read doesn't exist,
|
||||
/// this optional object is empty.
|
||||
virtual boost::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id) const = 0;
|
||||
/// \param filter_dead_nodes Whether or not if this method will filter dead nodes.
|
||||
/// \return The item returned by GCS. If the item to read doesn't exist or the node is
|
||||
/// dead, this optional object is empty.
|
||||
virtual boost::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id,
|
||||
bool filter_dead_nodes = true) const = 0;
|
||||
|
||||
/// Get information of all nodes from local cache.
|
||||
/// Non-thread safe.
|
||||
|
||||
@@ -558,10 +558,13 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
|
||||
}
|
||||
|
||||
boost::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get(
|
||||
const NodeID &node_id) const {
|
||||
const NodeID &node_id, bool filter_dead_nodes) const {
|
||||
RAY_CHECK(!node_id.IsNil());
|
||||
auto entry = node_cache_.find(node_id);
|
||||
if (entry != node_cache_.end()) {
|
||||
if (filter_dead_nodes && entry->second.state() == rpc::GcsNodeInfo::DEAD) {
|
||||
return boost::none;
|
||||
}
|
||||
return entry->second;
|
||||
}
|
||||
return boost::none;
|
||||
|
||||
@@ -166,7 +166,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
|
||||
const SubscribeCallback<NodeID, GcsNodeInfo> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
boost::optional<GcsNodeInfo> Get(const NodeID &node_id) const override;
|
||||
boost::optional<GcsNodeInfo> Get(const NodeID &node_id,
|
||||
bool filter_dead_nodes = false) const override;
|
||||
|
||||
const std::unordered_map<NodeID, GcsNodeInfo> &GetAll() const override;
|
||||
|
||||
|
||||
@@ -341,7 +341,8 @@ struct GcsServerMocker {
|
||||
return Status::NotImplemented("");
|
||||
}
|
||||
|
||||
boost::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id) const override {
|
||||
boost::optional<rpc::GcsNodeInfo> Get(const NodeID &node_id,
|
||||
bool filter_dead_nodes = true) const override {
|
||||
return boost::none;
|
||||
}
|
||||
|
||||
|
||||
@@ -564,7 +564,8 @@ Status RedisNodeInfoAccessor::AsyncGetAll(
|
||||
return client_table.Lookup(on_done);
|
||||
}
|
||||
|
||||
boost::optional<GcsNodeInfo> RedisNodeInfoAccessor::Get(const NodeID &node_id) const {
|
||||
boost::optional<GcsNodeInfo> RedisNodeInfoAccessor::Get(const NodeID &node_id,
|
||||
bool filter_dead_nodes) const {
|
||||
GcsNodeInfo node_info;
|
||||
ClientTable &client_table = client_impl_->client_table();
|
||||
bool found = client_table.GetClient(node_id, &node_info);
|
||||
|
||||
@@ -340,7 +340,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
|
||||
const SubscribeCallback<NodeID, GcsNodeInfo> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
boost::optional<GcsNodeInfo> Get(const NodeID &node_id) const override;
|
||||
boost::optional<GcsNodeInfo> Get(const NodeID &node_id,
|
||||
bool filter_dead_nodes = true) const override;
|
||||
|
||||
const std::unordered_map<NodeID, GcsNodeInfo> &GetAll() const override;
|
||||
|
||||
|
||||
@@ -94,10 +94,8 @@ void ObjectDirectory::LookupRemoteConnectionInfo(
|
||||
if (node_info) {
|
||||
NodeID result_node_id = NodeID::FromBinary(node_info->node_id());
|
||||
RAY_CHECK(result_node_id == connection_info.client_id);
|
||||
if (node_info->state() == GcsNodeInfo::ALIVE) {
|
||||
connection_info.ip = node_info->node_manager_address();
|
||||
connection_info.port = static_cast<uint16_t>(node_info->object_manager_port());
|
||||
}
|
||||
connection_info.ip = node_info->node_manager_address();
|
||||
connection_info.port = static_cast<uint16_t>(node_info->object_manager_port());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user