diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index b91c4faef..c1c26334d 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -41,7 +41,11 @@ def test_worker_stats(ray_start_regular): continue # Check that the rest of the processes are workers, 1 for each CPU. + print(reply) assert len(reply.workers_stats) == num_cpus + 1 + views = [view.view_name for view in reply.view_data] + assert "redis_latency" in views + assert "local_available_resource" in views # Check that all processes are Python. pids = [worker.pid for worker in reply.workers_stats] processes = [ diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index bace666e6..6344f8010 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -25,8 +25,33 @@ message WorkerStats { bool is_driver = 2; } +message ViewData { + message Measure { + // A short string that describes the tags for this measure, e.g., + // "Tag1:Value1,Tag2:Value2,Tag3:Value3" + string tags = 1; + // Int64 type value (if present). + int64 int_value = 2; + // Double type value (if present). + double double_value = 3; + // Distribution type value (if present). + double distribution_min = 4; + double distribution_mean = 5; + double distribution_max = 6; + double distribution_count = 7; + repeated double distribution_bucket_boundaries = 8; + repeated double distribution_bucket_counts = 9; + } + + // The name of this Census view. + string view_name = 1; + // The list of measures recorded under this view. + repeated Measure measures = 2; +} + message NodeStatsReply { repeated WorkerStats workers_stats = 1; + repeated ViewData view_data = 2; } // Service for inter-node-manager communication. 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cd15034f7..347c006ea 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2579,6 +2579,21 @@ std::string NodeManager::DebugString() const { return result.str(); } +// Summarizes a Census view and tag values into a compact string, e.g., +// "Tag1:Value1,Tag2:Value2,Tag3:Value3". +std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view, + const std::vector<std::string> &values) { + std::stringstream result; + const auto &keys = view.columns(); + for (size_t i = 0; i < values.size(); i++) { + result << keys[i].name() << ":" << values[i]; + if (i < values.size() - 1) { + result << ","; + } + } + return result.str(); +} + void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, rpc::NodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -2592,10 +2607,49 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, worker_stats->set_pid(driver->Pid()); worker_stats->set_is_driver(true); } + // Ensure we never report an empty set of metrics. 
+ if (!recorded_metrics_) { + RecordMetrics(); + RAY_CHECK(recorded_metrics_); + } + for (const auto &view : opencensus::stats::StatsExporter::GetViewData()) { + auto view_data = reply->add_view_data(); + view_data->set_view_name(view.first.name()); + if (view.second.type() == opencensus::stats::ViewData::Type::kInt64) { + for (const auto &measure : view.second.int_data()) { + auto measure_data = view_data->add_measures(); + measure_data->set_tags(compact_tag_string(view.first, measure.first)); + measure_data->set_int_value(measure.second); + } + } else if (view.second.type() == opencensus::stats::ViewData::Type::kDouble) { + for (const auto &measure : view.second.double_data()) { + auto measure_data = view_data->add_measures(); + measure_data->set_tags(compact_tag_string(view.first, measure.first)); + measure_data->set_double_value(measure.second); + } + } else { + RAY_CHECK(view.second.type() == opencensus::stats::ViewData::Type::kDistribution); + for (const auto &measure : view.second.distribution_data()) { + auto measure_data = view_data->add_measures(); + measure_data->set_tags(compact_tag_string(view.first, measure.first)); + measure_data->set_distribution_min(measure.second.min()); + measure_data->set_distribution_mean(measure.second.mean()); + measure_data->set_distribution_max(measure.second.max()); + measure_data->set_distribution_count(measure.second.count()); + for (const auto &bound : measure.second.bucket_boundaries().lower_boundaries()) { + measure_data->add_distribution_bucket_boundaries(bound); + } + for (const auto &count : measure.second.bucket_counts()) { + measure_data->add_distribution_bucket_counts(count); + } + } + } + } send_reply_callback(Status::OK(), nullptr, nullptr); } -void NodeManager::RecordMetrics() const { +void NodeManager::RecordMetrics() { + recorded_metrics_ = true; if (stats::StatsConfig::instance().IsStatsDisabled()) { return; } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 
fda201d26..c07340317 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -103,7 +103,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { std::string DebugString() const; /// Record metrics. - void RecordMetrics() const; + void RecordMetrics(); /// Get the port of the node manager rpc server. int GetServerPort() const { return node_manager_server_.GetPort(); } @@ -531,6 +531,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { bool fair_queueing_enabled_; /// Whether we have printed out a resource deadlock warning. bool resource_deadlock_warned_ = false; + /// Whether we have recorded any metrics yet. + bool recorded_metrics_ = false; /// The path to the ray temp dir. std::string temp_dir_; /// The timer used to get profiling information from the object manager and