From 7d67af6c2a4d435e6492d359837127980021020f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Thu, 19 Nov 2020 11:04:26 -0800 Subject: [PATCH] [Metrics] Add stats to measure process startup time + scheduling stats. (#12100) * Add new stats. * Fix issues. --- dashboard/dashboard.py | 5 +++- python/ray/tests/test_metrics_agent.py | 16 +++++++++++++ src/ray/raylet/main.cc | 2 ++ src/ray/raylet/node_manager.cc | 33 ++++++++++++++++++++++---- src/ray/raylet/node_manager.h | 18 ++++++++++++++ src/ray/raylet/worker_pool.cc | 7 +++++- src/ray/stats/metric_defs.h | 19 +++++++++++++++ 7 files changed, 94 insertions(+), 6 deletions(-) diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index 7c10d0c88..e5188fb97 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -20,7 +20,7 @@ import ray.new_dashboard.utils as dashboard_utils import ray.ray_constants as ray_constants import ray._private.services import ray.utils - +from ray.metrics_agent import PrometheusServiceDiscoveryWriter # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at # entry/init points. @@ -194,6 +194,9 @@ if __name__ == "__main__": args.redis_address, redis_password=args.redis_password, log_dir=args.log_dir) + service_discovery = PrometheusServiceDiscoveryWriter( + args.redis_address, args.redis_password, args.temp_dir) + service_discovery.start() loop = asyncio.get_event_loop() loop.run_until_complete(dashboard.run()) except Exception as e: diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 2c36a5f7d..408cd69f2 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -1,4 +1,5 @@ import json +import pathlib from pprint import pformat from unittest.mock import MagicMock @@ -7,6 +8,7 @@ import pytest from prometheus_client.parser import text_string_to_metric_families import ray +from ray.ray_constants import PROMETHEUS_SERVICE_DISCOVERY_FILE from ray.metrics_agent import PrometheusServiceDiscoveryWriter from ray.util.metrics import Count, Histogram, Gauge from ray.test_utils import wait_for_condition, SignalActor @@ -44,6 +46,20 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster): loaded_json_data["targets"])) +def test_prome_file_discovery_run_by_dashboard(shutdown_only): + ray.init(num_cpus=0) + global_node = ray.worker._global_node + temp_dir = global_node.get_temp_dir_path() + + def is_service_discovery_exist(): + for path in pathlib.Path(temp_dir).iterdir(): + if PROMETHEUS_SERVICE_DISCOVERY_FILE in str(path): + return True + return False + + wait_for_condition(is_service_discovery_exist) + + @pytest.fixture def _setup_cluster_for_test(ray_start_cluster): NUM_NODES = 2 diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index cb0d38a5d..dfac0ee85 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -219,6 +219,8 @@ int main(int argc, char *argv[]) { RayConfig::instance().raylet_heartbeat_timeout_milliseconds(); node_manager_config.debug_dump_period_ms = RayConfig::instance().debug_dump_period_milliseconds(); + node_manager_config.record_metrics_period_ms = + RayConfig::instance().metrics_report_interval_ms() / 2; node_manager_config.fair_queueing_enabled = RayConfig::instance().fair_queueing_enabled(); node_manager_config.object_pinning_enabled = diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 960691938..014aad929 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -183,7 +183,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self }, on_objects_spilled), new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()), - report_worker_backlog_(RayConfig::instance().report_worker_backlog()) { + report_worker_backlog_(RayConfig::instance().report_worker_backlog()), + record_metrics_period_(config.record_metrics_period_ms) { RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. @@ -516,11 +517,17 @@ void NodeManager::Heartbeat() { if (debug_dump_period_ > 0 && static_cast(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) { DumpDebugState(); - RecordMetrics(); WarnResourceDeadlock(); last_debug_dump_at_ms_ = now_ms; } + if (record_metrics_period_ > 0 && + static_cast(now_ms - metrics_last_recorded_time_ms_) > + record_metrics_period_) { + RecordMetrics(); + metrics_last_recorded_time_ms_ = now_ms; + } + // Evict all copies of freed objects from the cluster. local_object_manager_.FlushFreeObjectsIfNeeded(now_ms); @@ -1684,6 +1691,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest Task task(task_message, backlog_size); bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask(); ActorID actor_id = ActorID::Nil(); + metrics_num_task_scheduled_ += 1; if (is_actor_creation_task) { actor_id = task.GetTaskSpecification().ActorCreationId(); @@ -1738,7 +1746,9 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest "cannot obtain the information of the leased worker, so we need to " "release the leased worker to avoid leakage."; leased_workers_.erase(worker_id); + metrics_num_task_executed_ -= 1; }; + metrics_num_task_executed_ += 1; send_reply_callback(Status::OK(), nullptr, reply_failure_handler); RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) << "Worker is already leased out " << worker_id; @@ -1747,12 +1757,13 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest leased_workers_[worker_id] = worker; }); task.OnSpillbackInstead( - [reply, task_id, send_reply_callback](const NodeID &spillback_to, - const std::string &address, int port) { + [this, reply, task_id, send_reply_callback](const NodeID &spillback_to, + const std::string &address, int port) { RAY_LOG(DEBUG) << "Worker lease request SPILLBACK " << task_id; reply->mutable_retry_at_raylet_address()->set_ip_address(address); reply->mutable_retry_at_raylet_address()->set_port(port); reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); + metrics_num_task_spilled_back_ += 1; send_reply_callback(Status::OK(), nullptr, nullptr); }); task.OnCancellationInstead([reply, task_id, send_reply_callback]() { @@ -3351,6 +3362,9 @@ void NodeManager::RecordMetrics() { if (stats::StatsConfig::instance().IsStatsDisabled()) { return; } + // Last recorded time will be reset in the caller side. + uint64_t current_time = current_time_ms(); + uint64_t duration_ms = current_time - metrics_last_recorded_time_ms_; // Record available resources of this node. const auto &available_resources = @@ -3367,6 +3381,17 @@ void NodeManager::RecordMetrics() { {{stats::ResourceNameKey, pair.first}}); } + // Record average number of tasks information per second. + stats::AvgNumScheduledTasks.Record((double)metrics_num_task_scheduled_ * + (1000.0 / (double)duration_ms)); + metrics_num_task_scheduled_ = 0; + stats::AvgNumExecutedTasks.Record((double)metrics_num_task_executed_ * + (1000.0 / (double)duration_ms)); + metrics_num_task_executed_ = 0; + stats::AvgNumSpilledBackTasks.Record((double)metrics_num_task_spilled_back_ * + (1000.0 / (double)duration_ms)); + metrics_num_task_spilled_back_ = 0; + object_manager_.RecordMetrics(); local_queues_.RecordMetrics(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 02437de9e..9794d5d99 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -99,6 +99,8 @@ struct NodeManagerConfig { std::string session_dir; /// The raylet config list of this node. std::unordered_map raylet_config; + // The time between record metrics in milliseconds, or -1 to disable. + uint64_t record_metrics_period_ms; }; typedef std::pair BundleID; @@ -826,6 +828,22 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. absl::flat_hash_map, pair_hash> bundle_spec_map_; + + /// Fields that are used to report metrics. + /// The period between debug state dumps. + int64_t record_metrics_period_; + + /// Last time metrics are recorded. + uint64_t metrics_last_recorded_time_ms_; + + /// Number of tasks that are received and scheduled. + uint64_t metrics_num_task_scheduled_; + + /// Number of tasks that are executed at this node. + uint64_t metrics_num_task_executed_; + + /// Number of tasks that are spilled back to other nodes. + uint64_t metrics_num_task_spilled_back_; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 02783f08f..8af2a9a8b 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -339,8 +339,13 @@ Process WorkerPool::StartWorkerProcess( for (const auto &pair : override_environment_variables) { env[pair.first] = pair.second; } - + // Start a process and measure the startup time. + auto start = std::chrono::high_resolution_clock::now(); Process proc = StartProcess(worker_command_args, env); + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + stats::ProcessStartupTimeMs.Record(duration.count()); + RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start << " worker(s) with pid " << proc.GetId(); MonitorStartingWorkerProcess(proc, language, worker_type); diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 84e10d080..2c715706c 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -75,6 +75,25 @@ static Histogram HeartbeatReportMs( "heartbeats.", "ms", {100, 200, 400, 800, 1600, 3200, 6400, 15000, 30000}); +static Histogram ProcessStartupTimeMs("process_startup_time_ms", + "Time to start up a worker process.", "ms", + {1, 10, 100, 1000, 10000}); + +static Gauge AvgNumScheduledTasks( + "avg_num_scheduled_tasks", + "Number of tasks that are queued on this node per second. It doesn't guarantee " + "that the task will be executed in this node. If the task is executed, it is " + "recorded as avg_num_executed_tasks. If the task is not executed and needs to be " + "scheduled in other nodes, it will be recorded as avg_num_spilled_back_tasks", + "tasks"); + +static Gauge AvgNumExecutedTasks("avg_num_executed_tasks", + "Number of executed tasks on this node per second.", + "tasks"); + +static Gauge AvgNumSpilledBackTasks("avg_num_spilled_back_tasks", + "Number of spilled back tasks per second.", "tasks"); + /// /// GCS Server Metrics ///