From eb41c945a1f394a4edfee916a5db5409d55ea120 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 26 Oct 2019 16:37:39 -0700 Subject: [PATCH] Add gRPC endpoint to raylet to expose metrics (#6005) --- BUILD.bazel | 14 ++++- bazel/ray_deps_setup.bzl | 7 ++- ci/travis/install-dependencies.sh | 10 ++-- python/ray/tests/test_metrics.py | 53 +++++++++++++++++++ src/ray/protobuf/node_manager.proto | 16 ++++++ src/ray/raylet/node_manager.cc | 16 ++++++ src/ray/raylet/node_manager.h | 5 ++ src/ray/raylet/worker_pool.cc | 24 +++++++++ src/ray/raylet/worker_pool.h | 10 ++++ .../rpc/node_manager/node_manager_client.h | 7 +++ .../rpc/node_manager/node_manager_server.h | 24 ++++++++- 11 files changed, 174 insertions(+), 12 deletions(-) create mode 100644 python/ray/tests/test_metrics.py diff --git a/BUILD.bazel b/BUILD.bazel index 58760d239..d724e4dc8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -3,6 +3,7 @@ load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") load("@build_stack_rules_proto//python:python_proto_compile.bzl", "python_proto_compile") +load("@build_stack_rules_proto//python:python_grpc_compile.bzl", "python_grpc_compile") load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") load("@//bazel:ray.bzl", "flatbuffer_py_library") load("@//bazel:cython_library.bzl", "pyx_library") @@ -22,7 +23,7 @@ cc_proto_library( deps = [":common_proto"], ) -python_proto_compile( +python_grpc_compile( name = "common_py_proto", deps = [":common_proto"], ) @@ -39,7 +40,7 @@ cc_proto_library( deps = [":gcs_proto"], ) -python_proto_compile( +python_grpc_compile( name = "gcs_py_proto", deps = [":gcs_proto"], ) @@ -55,6 +56,11 @@ cc_proto_library( deps = [":node_manager_proto"], ) +python_grpc_compile( + name = "node_manager_py_proto", + deps = [":node_manager_proto"], +) + proto_library( name = "object_manager_proto", srcs = ["src/ray/protobuf/object_manager.proto"], @@ -859,6 +865,7 @@ filegroup( srcs = [ "common_py_proto", 
"gcs_py_proto", + "node_manager_py_proto", ], ) @@ -895,6 +902,9 @@ genrule( # NOTE(hchen): Protobuf doesn't allow specifying Python package name. So we use this `sed` # command to change the import path in the generated file. sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/gcs_pb2.py && + sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/common_pb2.py && + sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/node_manager_pb2.py && + sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/node_manager_pb2_grpc.py && echo $$WORK_DIR > $@ """, local = 1, diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index d0740b0e7..4b4ad2696 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -122,9 +122,8 @@ def ray_deps_setup(): sha256 = "b5efbe086b9a00826a3f830094312e6d1647157b5a5e7954a8ac4179bce3de8b", ) - http_archive( + git_repository( name = "build_stack_rules_proto", - urls = ["https://github.com/stackb/rules_proto/archive/b93b544f851fdcd3fc5c3d47aee3b7ca158a8841.tar.gz"], - sha256 = "c62f0b442e82a6152fcd5b1c0b7c4028233a9e314078952b6b04253421d56d61", - strip_prefix = "rules_proto-b93b544f851fdcd3fc5c3d47aee3b7ca158a8841", + remote = "https://github.com/stackb/rules_proto.git", + commit = "d9a123032f8436dbc34069cfc3207f2810a494ee", ) diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index ed29836d1..3b30bf5b7 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -25,7 +25,8 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes + 
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx \ + tabulate psutil kubernetes grpcio elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then sudo apt-get update sudo apt-get install -y python-dev python-numpy build-essential curl unzip tmux gdb @@ -35,14 +36,15 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then export PATH="$HOME/miniconda/bin:$PATH" pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \ feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ - uvicorn dataclasses pygments werkzeug kubernetes flask + uvicorn dataclasses pygments werkzeug kubernetes flask grpcio elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then # Install miniconda. wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv bash miniconda.sh -b -p $HOME/miniconda export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ - feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes + feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx \ + tabulate psutil kubernetes grpcio elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then # Install miniconda. 
wget https://repo.continuum.io/miniconda/Miniconda3-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv @@ -50,7 +52,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then export PATH="$HOME/miniconda/bin:$PATH" pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \ feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \ - uvicorn dataclasses pygments werkzeug kubernetes flask + uvicorn dataclasses pygments werkzeug kubernetes flask grpcio elif [[ "$LINT" == "1" ]]; then sudo apt-get update sudo apt-get install -y build-essential curl unzip diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py new file mode 100644 index 000000000..b91c4faef --- /dev/null +++ b/python/ray/tests/test_metrics.py @@ -0,0 +1,53 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import grpc +import psutil +import time + +import ray +from ray.core.generated import node_manager_pb2 +from ray.core.generated import node_manager_pb2_grpc +from ray.tests.utils import RayTestTimeoutException + + +def test_worker_stats(ray_start_regular): + raylet = ray.nodes()[0] + num_cpus = raylet["Resources"]["CPU"] + raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], + ray.nodes()[0]["NodeManagerPort"]) + + channel = grpc.insecure_channel(raylet_address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest()) + # Check that there is one connected driver. 
+ drivers = [worker for worker in reply.workers_stats if worker.is_driver] + assert len(drivers) == 1 + assert os.getpid() == drivers[0].pid + + timeout_seconds = 20 + start_time = time.time() + while True: + if time.time() - start_time > timeout_seconds: + raise RayTestTimeoutException( + "Timed out while waiting for worker processes") + + # Wait for the workers to start. + if len(reply.workers_stats) < num_cpus + 1: + time.sleep(1) + reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest()) + continue + + # Check that the rest of the processes are workers, 1 for each CPU. + assert len(reply.workers_stats) == num_cpus + 1 + # Check that all processes are Python. + pids = [worker.pid for worker in reply.workers_stats] + processes = [ + p.info["name"] for p in psutil.process_iter(attrs=["pid", "name"]) + if p.info["pid"] in pids + ] + for process in processes: + assert "python" in process or "ray" in process + break diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 59d2ce18f..bace666e6 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -15,8 +15,24 @@ message ForwardTaskRequest { message ForwardTaskReply { } +message NodeStatsRequest { +} + +message WorkerStats { + // PID of the worker process. + uint32 pid = 1; + // Whether this is a driver. + bool is_driver = 2; +} + +message NodeStatsReply { + repeated WorkerStats workers_stats = 1; +} + // Service for inter-node-manager communication. service NodeManagerService { // Forward a task and its uncommitted lineage to the remote node manager. rpc ForwardTask(ForwardTaskRequest) returns (ForwardTaskReply); + // Get the current node stats. 
+ rpc GetNodeStats(NodeStatsRequest) returns (NodeStatsReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9ccfd28e7..3942d3387 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2537,6 +2537,22 @@ std::string NodeManager::DebugString() const { return result.str(); } +void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, + rpc::NodeStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + for (const auto &worker : worker_pool_.GetAllWorkers()) { + auto worker_stats = reply->add_workers_stats(); + worker_stats->set_pid(worker->Pid()); + worker_stats->set_is_driver(false); + } + for (const auto &driver : worker_pool_.GetAllDrivers()) { + auto worker_stats = reply->add_workers_stats(); + worker_stats->set_pid(driver->Pid()); + worker_stats->set_is_driver(true); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::RecordMetrics() const { if (stats::StatsConfig::instance().IsStatsDisabled()) { return; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 2d9779d22..4185d240b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -492,6 +492,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { rpc::ForwardTaskReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `NodeStats` request. + void HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, + rpc::NodeStatsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Push an error to the driver if this node is full of actors and so we are /// unable to schedule new tasks or actors at all. 
void WarnResourceDeadlock(); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 984c95736..606b46edb 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -389,6 +389,30 @@ std::vector<std::shared_ptr<Worker>> WorkerPool::GetWorkersRunningTasksForJob( return workers; } +const std::vector<std::shared_ptr<Worker>> WorkerPool::GetAllWorkers() const { + std::vector<std::shared_ptr<Worker>> workers; + + for (const auto &entry : states_by_lang_) { + for (const auto &worker : entry.second.registered_workers) { + workers.push_back(worker); + } + } + + return workers; +} + +const std::vector<std::shared_ptr<Worker>> WorkerPool::GetAllDrivers() const { + std::vector<std::shared_ptr<Worker>> drivers; + + for (const auto &entry : states_by_lang_) { + for (const auto &driver : entry.second.registered_drivers) { + drivers.push_back(driver); + } + } + + return drivers; +} + void WorkerPool::WarnAboutSize() { for (const auto &entry : states_by_lang_) { auto state = entry.second; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index ff1703be7..ec3cc80ac 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -112,6 +112,16 @@ class WorkerPool { std::vector<std::shared_ptr<Worker>> GetWorkersRunningTasksForJob( const JobID &job_id) const; + /// Get all the workers. + /// + /// \return A list containing all the workers. + const std::vector<std::shared_ptr<Worker>> GetAllWorkers() const; + + /// Get all the drivers. + /// + /// \return A list containing all the drivers. + const std::vector<std::shared_ptr<Worker>> GetAllDrivers() const; + /// Whether there is a pending worker for the given task. /// Note that, this is only used for actor creation task with dynamic options. /// And if the worker registered but isn't assigned a task, diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 005c75db4..8a26907ec 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -42,6 +42,13 @@ class NodeManagerClient { callback); } + /// Get current node stats.
+ void GetNodeStats(const ClientCallback<NodeStatsReply> &callback) { + NodeStatsRequest request; + client_call_manager_.CreateCall<NodeManagerService, NodeStatsRequest, NodeStatsReply>( + *stub_, &NodeManagerService::Stub::PrepareAsyncGetNodeStats, request, callback); + } + private: /// The gRPC-generated stub. std::unique_ptr<NodeManagerService::Stub> stub_; diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 76531f10e..08ebe9563 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -23,6 +23,17 @@ class NodeManagerServiceHandler { virtual void HandleForwardTask(const ForwardTaskRequest &request, ForwardTaskReply *reply, SendReplyCallback send_reply_callback) = 0; + + /// Handle a `GetNodeStats` request. + /// The implementation can handle this request asynchronously. When handling is done, + /// the `send_reply_callback` should be called. + /// + /// \param[in] request The request message. + /// \param[out] reply The reply message. + /// \param[in] send_reply_callback The callback to be called when the request is done. + virtual void HandleNodeStatsRequest(const NodeStatsRequest &request, + NodeStatsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`. @@ -43,7 +54,7 @@ class NodeManagerGrpcService : public GrpcService { const std::unique_ptr<grpc::ServerCompletionQueue> &cq, std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>> *server_call_factories_and_concurrencies) override { - // Initialize the factory for `ForwardTask` requests. + // Initialize the factory for requests. std::unique_ptr<ServerCallFactory> forward_task_call_factory( new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler, ForwardTaskRequest, ForwardTaskReply>( @@ -51,9 +62,18 @@ class NodeManagerGrpcService : public GrpcService { service_handler_, &NodeManagerServiceHandler::HandleForwardTask, cq, main_service_)); - // Set `ForwardTask`'s accept concurrency to 100.
+ std::unique_ptr<ServerCallFactory> node_stats_call_factory( + new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler, + NodeStatsRequest, NodeStatsReply>( + service_, &NodeManagerService::AsyncService::RequestGetNodeStats, + service_handler_, &NodeManagerServiceHandler::HandleNodeStatsRequest, cq, + main_service_)); + + // Set accept concurrency. server_call_factories_and_concurrencies->emplace_back( std::move(forward_task_call_factory), 100); + server_call_factories_and_concurrencies->emplace_back( + std::move(node_stats_call_factory), 1); } private: