Add gRPC endpoint to raylet to expose metrics (#6005)

This commit is contained in:
Stephanie Wang
2019-10-26 16:37:39 -07:00
committed by Philipp Moritz
parent 010270b3dc
commit eb41c945a1
11 changed files with 174 additions and 12 deletions
+12 -2
View File
@@ -3,6 +3,7 @@
load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library")
load("@build_stack_rules_proto//python:python_proto_compile.bzl", "python_proto_compile")
load("@build_stack_rules_proto//python:python_grpc_compile.bzl", "python_grpc_compile")
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
load("@//bazel:ray.bzl", "flatbuffer_py_library")
load("@//bazel:cython_library.bzl", "pyx_library")
@@ -22,7 +23,7 @@ cc_proto_library(
deps = [":common_proto"],
)
python_proto_compile(
python_grpc_compile(
name = "common_py_proto",
deps = [":common_proto"],
)
@@ -39,7 +40,7 @@ cc_proto_library(
deps = [":gcs_proto"],
)
python_proto_compile(
python_grpc_compile(
name = "gcs_py_proto",
deps = [":gcs_proto"],
)
@@ -55,6 +56,11 @@ cc_proto_library(
deps = [":node_manager_proto"],
)
python_grpc_compile(
name = "node_manager_py_proto",
deps = [":node_manager_proto"],
)
proto_library(
name = "object_manager_proto",
srcs = ["src/ray/protobuf/object_manager.proto"],
@@ -859,6 +865,7 @@ filegroup(
srcs = [
"common_py_proto",
"gcs_py_proto",
"node_manager_py_proto",
],
)
@@ -895,6 +902,9 @@ genrule(
# NOTE(hchen): Protobuf doesn't allow specifying Python package name. So we use this `sed`
# command to change the import path in the generated file.
sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/gcs_pb2.py &&
sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/common_pb2.py &&
sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/node_manager_pb2.py &&
sed -i -E 's/from src.ray.protobuf/from ./' $$WORK_DIR/python/ray/core/generated/node_manager_pb2_grpc.py &&
echo $$WORK_DIR > $@
""",
local = 1,
+3 -4
View File
@@ -122,9 +122,8 @@ def ray_deps_setup():
sha256 = "b5efbe086b9a00826a3f830094312e6d1647157b5a5e7954a8ac4179bce3de8b",
)
http_archive(
git_repository(
name = "build_stack_rules_proto",
urls = ["https://github.com/stackb/rules_proto/archive/b93b544f851fdcd3fc5c3d47aee3b7ca158a8841.tar.gz"],
sha256 = "c62f0b442e82a6152fcd5b1c0b7c4028233a9e314078952b6b04253421d56d61",
strip_prefix = "rules_proto-b93b544f851fdcd3fc5c3d47aee3b7ca158a8841",
remote = "https://github.com/stackb/rules_proto.git",
commit = "d9a123032f8436dbc34069cfc3207f2810a494ee",
)
+6 -4
View File
@@ -25,7 +25,8 @@ if [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "linux" ]]; then
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx \
tabulate psutil kubernetes grpcio
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
sudo apt-get update
sudo apt-get install -y python-dev python-numpy build-essential curl unzip tmux gdb
@@ -35,14 +36,15 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "linux" ]]; then
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q scipy tensorflow cython==0.29.0 gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \
uvicorn dataclasses pygments werkzeug kubernetes flask
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio
elif [[ "$PYTHON" == "2.7" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda2-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
bash miniconda.sh -b -p $HOME/miniconda
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx tabulate psutil kubernetes
feather-format lxml openpyxl xlrd py-spy setproctitle faulthandler pytest-timeout mock flaky networkx \
tabulate psutil kubernetes grpcio
elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
# Install miniconda.
wget https://repo.continuum.io/miniconda/Miniconda3-4.5.4-MacOSX-x86_64.sh -O miniconda.sh -nv
@@ -50,7 +52,7 @@ elif [[ "$PYTHON" == "3.5" ]] && [[ "$platform" == "macosx" ]]; then
export PATH="$HOME/miniconda/bin:$PATH"
pip install -q cython==0.29.0 tensorflow gym opencv-python-headless pyyaml pandas==0.24.2 requests \
feather-format lxml openpyxl xlrd py-spy setproctitle pytest-timeout flaky networkx tabulate psutil aiohttp \
uvicorn dataclasses pygments werkzeug kubernetes flask
uvicorn dataclasses pygments werkzeug kubernetes flask grpcio
elif [[ "$LINT" == "1" ]]; then
sudo apt-get update
sudo apt-get install -y build-essential curl unzip
+53
View File
@@ -0,0 +1,53 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import grpc
import psutil
import time
import ray
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.tests.utils import RayTestTimeoutException
def test_worker_stats(ray_start_regular):
raylet = ray.nodes()[0]
num_cpus = raylet["Resources"]["CPU"]
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
ray.nodes()[0]["NodeManagerPort"])
channel = grpc.insecure_channel(raylet_address)
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest())
# Check that there is one connected driver.
drivers = [worker for worker in reply.workers_stats if worker.is_driver]
assert len(drivers) == 1
assert os.getpid() == drivers[0].pid
timeout_seconds = 20
start_time = time.time()
while True:
if time.time() - start_time > timeout_seconds:
raise RayTestTimeoutException(
"Timed out while waiting for worker processes")
# Wait for the workers to start.
if len(reply.workers_stats) < num_cpus + 1:
time.sleep(1)
reply = stub.GetNodeStats(node_manager_pb2.NodeStatsRequest())
continue
# Check that the rest of the processes are workers, 1 for each CPU.
assert len(reply.workers_stats) == num_cpus + 1
# Check that all processes are Python.
pids = [worker.pid for worker in reply.workers_stats]
processes = [
p.info["name"] for p in psutil.process_iter(attrs=["pid", "name"])
if p.info["pid"] in pids
]
for process in processes:
assert "python" in process or "ray" in process
break
+16
View File
@@ -15,8 +15,24 @@ message ForwardTaskRequest {
message ForwardTaskReply {
}
message NodeStatsRequest {
}
message WorkerStats {
// PID of the worker process.
uint32 pid = 1;
// Whether this is a driver.
bool is_driver = 2;
}
message NodeStatsReply {
repeated WorkerStats workers_stats = 1;
}
// Service for inter-node-manager communication.
service NodeManagerService {
// Forward a task and its uncommitted lineage to the remote node manager.
rpc ForwardTask(ForwardTaskRequest) returns (ForwardTaskReply);
// Get the current node stats.
rpc GetNodeStats(NodeStatsRequest) returns (NodeStatsReply);
}
+16
View File
@@ -2537,6 +2537,22 @@ std::string NodeManager::DebugString() const {
return result.str();
}
void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
rpc::NodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
for (const auto &worker : worker_pool_.GetAllWorkers()) {
auto worker_stats = reply->add_workers_stats();
worker_stats->set_pid(worker->Pid());
worker_stats->set_is_driver(false);
}
for (const auto &driver : worker_pool_.GetAllDrivers()) {
auto worker_stats = reply->add_workers_stats();
worker_stats->set_pid(driver->Pid());
worker_stats->set_is_driver(true);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::RecordMetrics() const {
if (stats::StatsConfig::instance().IsStatsDisabled()) {
return;
+5
View File
@@ -492,6 +492,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::ForwardTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `NodeStats` request.
void HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
rpc::NodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Push an error to the driver if this node is full of actors and so we are
/// unable to schedule new tasks or actors at all.
void WarnResourceDeadlock();
+24
View File
@@ -389,6 +389,30 @@ std::vector<std::shared_ptr<Worker>> WorkerPool::GetWorkersRunningTasksForJob(
return workers;
}
const std::vector<std::shared_ptr<Worker>> WorkerPool::GetAllWorkers() const {
std::vector<std::shared_ptr<Worker>> workers;
for (const auto &entry : states_by_lang_) {
for (const auto &worker : entry.second.registered_workers) {
workers.push_back(worker);
}
}
return workers;
}
const std::vector<std::shared_ptr<Worker>> WorkerPool::GetAllDrivers() const {
std::vector<std::shared_ptr<Worker>> drivers;
for (const auto &entry : states_by_lang_) {
for (const auto &driver : entry.second.registered_drivers) {
drivers.push_back(driver);
}
}
return drivers;
}
void WorkerPool::WarnAboutSize() {
for (const auto &entry : states_by_lang_) {
auto state = entry.second;
+10
View File
@@ -112,6 +112,16 @@ class WorkerPool {
std::vector<std::shared_ptr<Worker>> GetWorkersRunningTasksForJob(
const JobID &job_id) const;
/// Get all the workers.
///
/// \return A list containing all the workers.
const std::vector<std::shared_ptr<Worker>> GetAllWorkers() const;
/// Get all the drivers.
///
/// \return A list containing all the drivers.
const std::vector<std::shared_ptr<Worker>> GetAllDrivers() const;
/// Whether there is a pending worker for the given task.
/// Note that, this is only used for actor creation task with dynamic options.
/// And if the worker registered but isn't assigned a task,
@@ -42,6 +42,13 @@ class NodeManagerClient {
callback);
}
/// Get current node stats.
void GetNodeStats(const ClientCallback<NodeStatsReply> &callback) {
NodeStatsRequest request;
client_call_manager_.CreateCall<NodeManagerService, NodeStatsRequest, NodeStatsReply>(
*stub_, &NodeManagerService::Stub::PrepareAsyncGetNodeStats, request, callback);
}
private:
/// The gRPC-generated stub.
std::unique_ptr<NodeManagerService::Stub> stub_;
+22 -2
View File
@@ -23,6 +23,17 @@ class NodeManagerServiceHandler {
virtual void HandleForwardTask(const ForwardTaskRequest &request,
ForwardTaskReply *reply,
SendReplyCallback send_reply_callback) = 0;
/// Handle a `GetNodeStats` request.
/// The implementation can handle this request asynchronously. When handling is done,
/// the `send_reply_callback` should be called.
///
/// \param[in] request The request message.
/// \param[out] reply The reply message.
/// \param[in] send_reply_callback The callback to be called when the request is done.
virtual void HandleNodeStatsRequest(const NodeStatsRequest &request,
NodeStatsReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `NodeManagerService`.
@@ -43,7 +54,7 @@ class NodeManagerGrpcService : public GrpcService {
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
// Initialize the factory for `ForwardTask` requests.
// Initialize the factory for requests.
std::unique_ptr<ServerCallFactory> forward_task_call_factory(
new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler,
ForwardTaskRequest, ForwardTaskReply>(
@@ -51,9 +62,18 @@ class NodeManagerGrpcService : public GrpcService {
service_handler_, &NodeManagerServiceHandler::HandleForwardTask, cq,
main_service_));
// Set `ForwardTask`'s accept concurrency to 100.
std::unique_ptr<ServerCallFactory> node_stats_call_factory(
new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler,
NodeStatsRequest, NodeStatsReply>(
service_, &NodeManagerService::AsyncService::RequestGetNodeStats,
service_handler_, &NodeManagerServiceHandler::HandleNodeStatsRequest, cq,
main_service_));
// Set accept concurrency.
server_call_factories_and_concurrencies->emplace_back(
std::move(forward_task_call_factory), 100);
server_call_factories_and_concurrencies->emplace_back(
std::move(node_stats_call_factory), 1);
}
private: