mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 04:08:08 +08:00
Report census view data as part of raylet node stats (#6060)
This commit is contained in:
@@ -41,7 +41,11 @@ def test_worker_stats(ray_start_regular):
|
||||
continue
|
||||
|
||||
# Check that the rest of the processes are workers, 1 for each CPU.
|
||||
print(reply)
|
||||
assert len(reply.workers_stats) == num_cpus + 1
|
||||
views = [view.view_name for view in reply.view_data]
|
||||
assert "redis_latency" in views
|
||||
assert "local_available_resource" in views
|
||||
# Check that all processes are Python.
|
||||
pids = [worker.pid for worker in reply.workers_stats]
|
||||
processes = [
|
||||
|
||||
@@ -25,8 +25,33 @@ message WorkerStats {
|
||||
bool is_driver = 2;
|
||||
}
|
||||
|
||||
message ViewData {
|
||||
message Measure {
|
||||
// A short string that describes the tags for this mesaure, e.g.,
|
||||
// "Tag1:Value1,Tag2:Value2,Tag3:Value3"
|
||||
string tags = 1;
|
||||
// Int64 type value (if present).
|
||||
int64 int_value = 2;
|
||||
// Double type value (if present).
|
||||
double double_value = 3;
|
||||
// Distribution type value (if present).
|
||||
double distribution_min = 4;
|
||||
double distribution_mean = 5;
|
||||
double distribution_max = 6;
|
||||
double distribution_count = 7;
|
||||
repeated double distribution_bucket_boundaries = 8;
|
||||
repeated double distribution_bucket_counts = 9;
|
||||
}
|
||||
|
||||
// The name of this Census view.
|
||||
string view_name = 1;
|
||||
// The list of measures recorded under this view.
|
||||
repeated Measure measures = 2;
|
||||
}
|
||||
|
||||
message NodeStatsReply {
|
||||
repeated WorkerStats workers_stats = 1;
|
||||
repeated ViewData view_data = 2;
|
||||
}
|
||||
|
||||
// Service for inter-node-manager communication.
|
||||
|
||||
@@ -2579,6 +2579,21 @@ std::string NodeManager::DebugString() const {
|
||||
return result.str();
|
||||
}
|
||||
|
||||
// Summarizes a Census view and tag values into a compact string, e.g.,
|
||||
// "Tag1:Value1,Tag2:Value2,Tag3:Value3".
|
||||
std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view,
|
||||
const std::vector<std::string> &values) {
|
||||
std::stringstream result;
|
||||
const auto &keys = view.columns();
|
||||
for (size_t i = 0; i < values.size(); i++) {
|
||||
result << keys[i].name() << ":" << values[i];
|
||||
if (i < values.size() - 1) {
|
||||
result << ",";
|
||||
}
|
||||
}
|
||||
return result.str();
|
||||
}
|
||||
|
||||
void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
|
||||
rpc::NodeStatsReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) {
|
||||
@@ -2592,10 +2607,49 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
|
||||
worker_stats->set_pid(driver->Pid());
|
||||
worker_stats->set_is_driver(true);
|
||||
}
|
||||
// Ensure we never report an empty set of metrics.
|
||||
if (!recorded_metrics_) {
|
||||
RecordMetrics();
|
||||
RAY_CHECK(recorded_metrics_);
|
||||
}
|
||||
for (const auto &view : opencensus::stats::StatsExporter::GetViewData()) {
|
||||
auto view_data = reply->add_view_data();
|
||||
view_data->set_view_name(view.first.name());
|
||||
if (view.second.type() == opencensus::stats::ViewData::Type::kInt64) {
|
||||
for (const auto &measure : view.second.int_data()) {
|
||||
auto measure_data = view_data->add_measures();
|
||||
measure_data->set_tags(compact_tag_string(view.first, measure.first));
|
||||
measure_data->set_int_value(measure.second);
|
||||
}
|
||||
} else if (view.second.type() == opencensus::stats::ViewData::Type::kDouble) {
|
||||
for (const auto &measure : view.second.double_data()) {
|
||||
auto measure_data = view_data->add_measures();
|
||||
measure_data->set_tags(compact_tag_string(view.first, measure.first));
|
||||
measure_data->set_double_value(measure.second);
|
||||
}
|
||||
} else {
|
||||
RAY_CHECK(view.second.type() == opencensus::stats::ViewData::Type::kDistribution);
|
||||
for (const auto &measure : view.second.distribution_data()) {
|
||||
auto measure_data = view_data->add_measures();
|
||||
measure_data->set_tags(compact_tag_string(view.first, measure.first));
|
||||
measure_data->set_distribution_min(measure.second.min());
|
||||
measure_data->set_distribution_mean(measure.second.mean());
|
||||
measure_data->set_distribution_max(measure.second.max());
|
||||
measure_data->set_distribution_count(measure.second.count());
|
||||
for (const auto &bound : measure.second.bucket_boundaries().lower_boundaries()) {
|
||||
measure_data->add_distribution_bucket_boundaries(bound);
|
||||
}
|
||||
for (const auto &count : measure.second.bucket_counts()) {
|
||||
measure_data->add_distribution_bucket_counts(count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
}
|
||||
|
||||
void NodeManager::RecordMetrics() const {
|
||||
void NodeManager::RecordMetrics() {
|
||||
recorded_metrics_ = true;
|
||||
if (stats::StatsConfig::instance().IsStatsDisabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
||||
std::string DebugString() const;
|
||||
|
||||
/// Record metrics.
|
||||
void RecordMetrics() const;
|
||||
void RecordMetrics();
|
||||
|
||||
/// Get the port of the node manager rpc server.
|
||||
int GetServerPort() const { return node_manager_server_.GetPort(); }
|
||||
@@ -531,6 +531,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
|
||||
bool fair_queueing_enabled_;
|
||||
/// Whether we have printed out a resource deadlock warning.
|
||||
bool resource_deadlock_warned_ = false;
|
||||
/// Whether we have recorded any metrics yet.
|
||||
bool recorded_metrics_ = false;
|
||||
/// The path to the ray temp dir.
|
||||
std::string temp_dir_;
|
||||
/// The timer used to get profiling information from the object manager and
|
||||
|
||||
Reference in New Issue
Block a user