From 23b6fdcda110de4b1dcae1a254b8d9d2c613f21f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 25 Mar 2020 16:31:31 -0700 Subject: [PATCH] `ray memory` should collect statistics from all nodes (#7721) --- python/ray/tests/BUILD | 2 +- python/ray/tests/test_memstat.py | 31 ++++++++++++++++++- src/ray/raylet/node_manager.cc | 51 ++++++++++++++++++++++---------- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index f94f6304d..3cfa7a18b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -221,7 +221,7 @@ py_test( py_test( name = "test_global_gc", - size = "small", + size = "medium", srcs = ["test_global_gc.py"], tags = ["exclusive"], deps = ["//:ray_lib"], diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index 3a69329cf..2c8db67fa 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -1,6 +1,8 @@ -import ray import numpy as np import time + +import ray +from ray.cluster_utils import Cluster from ray.internal.internal_api import memory_summary # Unique strings. @@ -189,6 +191,33 @@ def test_pinned_object_call_site(ray_start_regular): assert num_objects(info) == 0, info +def test_multi_node_stats(shutdown_only): + cluster = Cluster() + for _ in range(2): + cluster.add_node(num_cpus=1) + + ray.init(address=cluster.address) + + @ray.remote(num_cpus=1) + class Actor: + def __init__(self): + self.ref = ray.put(np.zeros(100000)) + + def ping(self): + pass + + # Each actor will be on a different node. + a = Actor.remote() + b = Actor.remote() + ray.get(a.ping.remote()) + ray.get(b.ping.remote()) + + # Verify we have collected stats across the nodes. + info = memory_summary() + print(info) + assert count(info, PUT_OBJ) == 2, info + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 35b901b2d..85320376d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3380,12 +3380,11 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ } } -std::string FormatMemoryInfo( - std::vector> node_stats) { +std::string FormatMemoryInfo(std::vector node_stats) { // First pass to compute object sizes. absl::flat_hash_map object_sizes; for (const auto &reply : node_stats) { - for (const auto &worker_stats : reply->workers_stats()) { + for (const auto &worker_stats : reply.workers_stats()) { for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { auto obj_id = ObjectID::FromBinary(object_ref.object_id()); if (object_ref.object_size() > 0) { @@ -3408,7 +3407,7 @@ std::string FormatMemoryInfo( // Second pass builds the summary string for each node. for (const auto &reply : node_stats) { - for (const auto &worker_stats : reply->workers_stats()) { + for (const auto &worker_stats : reply.workers_stats()) { bool pid_printed = false; for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { if (!object_ref.pinned_in_memory() && object_ref.local_ref_count() == 0 && @@ -3459,23 +3458,43 @@ std::string FormatMemoryInfo( void NodeManager::HandleFormatGlobalMemoryInfo( const rpc::FormatGlobalMemoryInfoRequest &request, rpc::FormatGlobalMemoryInfoReply *reply, rpc::SendReplyCallback send_reply_callback) { - std::vector> replies; - + auto replies = std::make_shared>(); auto local_request = std::make_shared(); auto local_reply = std::make_shared(); local_request->set_include_memory_info(true); - // TODO(ekl): for (const auto &entry : remote_node_manager_clients_) {} - // to handle remote nodes + unsigned int num_nodes = remote_node_manager_clients_.size() + 1; + rpc::GetNodeStatsRequest stats_req; + stats_req.set_include_memory_info(true); - HandleGetNodeStats(*local_request, local_reply.get(), - [local_request, local_reply, replies, reply, send_reply_callback]( - Status status, std::function success, - std::function failure) mutable { - replies.push_back(local_reply); - reply->set_memory_summary(FormatMemoryInfo(replies)); - send_reply_callback(Status::OK(), nullptr, nullptr); - }); + auto store_reply = [replies, reply, num_nodes, + send_reply_callback](const rpc::GetNodeStatsReply &local_reply) { + replies->push_back(local_reply); + if (replies->size() >= num_nodes) { + reply->set_memory_summary(FormatMemoryInfo(*replies)); + send_reply_callback(Status::OK(), nullptr, nullptr); + } + }; + + // Fetch from remote nodes. + for (const auto &entry : remote_node_manager_clients_) { + entry.second->GetNodeStats( + stats_req, [replies, store_reply](const ray::Status &status, + const rpc::GetNodeStatsReply &r) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to get remote node stats: " << status.ToString(); + } + store_reply(r); + }); + } + + // Fetch from the local node. + HandleGetNodeStats( + stats_req, local_reply.get(), + [local_reply, store_reply](Status status, std::function success, + std::function failure) mutable { + store_reply(*local_reply); + }); } void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,