[Stats] metrics agent exporter (#9361)

This commit is contained in:
SangBin Cho
2020-07-14 09:49:16 -07:00
committed by GitHub
parent 5b192842b5
commit f6eb47fc1f
18 changed files with 247 additions and 61 deletions
+31
View File
@@ -77,6 +77,11 @@ proto_library(
deps = [":common_proto"],
)
cc_proto_library(
name = "reporter_cc_proto",
deps = [":reporter_proto"],
)
python_grpc_compile(
name = "reporter_py_proto",
deps = [":reporter_proto"],
@@ -252,6 +257,31 @@ cc_library(
],
)
# Metrics Agent gRPC lib.
cc_grpc_library(
name = "reporter_cc_grpc",
srcs = [":reporter_proto"],
grpc_only = True,
deps = [":reporter_cc_proto"],
)
# Metrics Agent client.
cc_library(
name = "reporter_rpc",
hdrs = glob([
"src/ray/rpc/metrics_agent_client.h",
]),
copts = COPTS,
deps = [
":grpc_common_lib",
":ray_common",
":reporter_cc_grpc",
"@boost//:asio",
"@boost//:thread",
"@com_github_grpc_grpc//:grpc++",
],
)
# === End of rpc definitions ===
# === Begin of plasma definitions ===
@@ -576,6 +606,7 @@ cc_library(
}),
deps = [
":ray_util",
":reporter_rpc",
"@com_github_jupp0r_prometheus_cpp//pull",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/memory",
+3
View File
@@ -108,6 +108,7 @@ class Node:
include_log_monitor=True,
resources={},
temp_dir=ray.utils.get_ray_temp_dir(),
metrics_agent_port=self._get_unused_port()[0],
worker_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"workers/default_worker.py"))
@@ -554,6 +555,7 @@ class Node:
open_log(reporter_err_name))
process_info = ray.services.start_reporter(
self.redis_address,
self._ray_params.metrics_agent_port,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
@@ -661,6 +663,7 @@ class Node:
self._ray_params.max_worker_port,
self._ray_params.object_manager_port,
self._ray_params.redis_password,
self._ray_params.metrics_agent_port,
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
+3
View File
@@ -87,6 +87,7 @@ class RayParams:
Java worker.
java_worker_options (list): The command options for Java worker.
load_code_from_local: Whether load code from local file or from GCS.
metrics_agent_port(int): The port to bind metrics agent.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
lru_evict (bool): Enable LRU eviction if space is needed.
@@ -132,6 +133,7 @@ class RayParams:
java_worker_options=None,
load_code_from_local=False,
_internal_config=None,
metrics_agent_port=None,
lru_evict=False):
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
@@ -169,6 +171,7 @@ class RayParams:
self.include_java = include_java
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self.metrics_agent_port = metrics_agent_port
self._internal_config = _internal_config
self._lru_evict = lru_evict
self._check_usage()
+14 -3
View File
@@ -55,6 +55,10 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
return reporter_pb2.GetProfilingStatsReply(
profiling_stats=profiling_stats, stdout=stdout, stderr=stderr)
def ReportMetrics(self, request, context):
# TODO(sang): Process metrics here.
return reporter_pb2.ReportMetricsReply()
def recursive_asdict(o):
if isinstance(o, tuple) and hasattr(o, "_asdict"):
@@ -94,11 +98,12 @@ class Reporter:
redis_client: A client used to communicate with the Redis server.
"""
def __init__(self, redis_address, redis_password=None):
def __init__(self, redis_address, port, redis_password=None):
"""Initialize the reporter object."""
self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
self.ip = ray.services.get_node_ip_address()
self.hostname = platform.node()
self.port = port
_ = psutil.cpu_percent() # For initialization
@@ -225,7 +230,7 @@ class Reporter:
server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), ))
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(
ReporterServer(), server)
port = server.add_insecure_port("[::]:0")
port = server.add_insecure_port("[::]:{}".format(self.port))
server.start()
self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port)
"""Run the reporter."""
@@ -248,6 +253,11 @@ if __name__ == "__main__":
required=True,
type=str,
help="The address to use for Redis.")
parser.add_argument(
"--port",
required=True,
type=int,
help="The port to bind the reporter process.")
parser.add_argument(
"--redis-password",
required=False,
@@ -270,7 +280,8 @@ if __name__ == "__main__":
args = parser.parse_args()
ray.utils.setup_logger(args.logging_level, args.logging_format)
reporter = Reporter(args.redis_address, redis_password=args.redis_password)
reporter = Reporter(
args.redis_address, args.port, redis_password=args.redis_password)
try:
reporter.run()
+7 -4
View File
@@ -1065,6 +1065,7 @@ def start_log_monitor(redis_address,
def start_reporter(redis_address,
port,
stdout_file=None,
stderr_file=None,
redis_password=None,
@@ -1073,6 +1074,7 @@ def start_reporter(redis_address,
Args:
redis_address (str): The address of the Redis instance.
port(int): The port to bind the reporter process.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
@@ -1085,10 +1087,8 @@ def start_reporter(redis_address,
reporter_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "reporter.py")
command = [
sys.executable,
"-u",
reporter_filepath,
"--redis-address={}".format(redis_address),
sys.executable, "-u", reporter_filepath,
"--redis-address={}".format(redis_address), "--port={}".format(port)
]
if redis_password:
command += ["--redis-password", redis_password]
@@ -1249,6 +1249,7 @@ def start_raylet(redis_address,
max_worker_port=None,
object_manager_port=None,
redis_password=None,
metrics_agent_port=None,
use_valgrind=False,
use_profiler=False,
stdout_file=None,
@@ -1284,6 +1285,7 @@ def start_raylet(redis_address,
max_worker_port (int): The highest port number that workers will bind
on. If set, min_worker_port must also be set.
redis_password: The password to use when connecting to Redis.
metrics_agent_port(int): The port where metrics agent is bound to.
use_valgrind (bool): True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the raylet should be started inside
@@ -1390,6 +1392,7 @@ def start_raylet(redis_address,
"--redis_password={}".format(redis_password or ""),
"--temp_dir={}".format(temp_dir),
"--session_dir={}".format(session_dir),
"--metrics-agent-port={}".format(metrics_agent_port),
]
if config.get("plasma_store_as_thread"):
# command related to the plasma store
+6
View File
@@ -316,5 +316,11 @@ RAY_CONFIG(bool, gcs_actor_service_enabled,
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr &&
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true"))
/// The batch size for metrics export.
RAY_CONFIG(int64_t, metrics_report_batch_size, 100)
/// Whether or not we enable metrics collection.
RAY_CONFIG(int64_t, enable_metrics_collection, true)
/// Whether start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, put_small_object_in_memory_store, false)
-1
View File
@@ -294,7 +294,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// NOTE(edoakes): any initialization depending on RayConfig must happen after this line.
RayConfig::instance().initialize(internal_config);
// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
core_worker_server_ = std::unique_ptr<rpc::GrpcServer>(
+7
View File
@@ -332,3 +332,10 @@ message CoreWorkerStats {
// Local reference table.
repeated ObjectRefInfo object_refs = 17;
}
message MetricPoint {
string metric_name = 1;
int64 timestamp = 2;
double value = 3;
map<string, string> tags = 4;
}
+11
View File
@@ -16,6 +16,8 @@ syntax = "proto3";
package ray.rpc;
import "src/ray/protobuf/common.proto";
message GetProfilingStatsRequest {
// PID of the worker process.
uint32 pid = 1;
@@ -32,8 +34,17 @@ message GetProfilingStatsReply {
string stderr = 3;
}
message ReportMetricsRequest {
repeated MetricPoint metrics_point = 1;
}
message ReportMetricsReply {
}
// Service for communicating with the reporter.py process on a remote node.
service ReporterService {
// Get the profiling stats.
rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply);
// Report metrics to the local metrics agents.
rpc ReportMetrics(ReportMetricsRequest) returns (ReportMetricsReply);
}
+9 -14
View File
@@ -27,6 +27,7 @@ DEFINE_string(raylet_socket_name, "", "The socket name of raylet.");
DEFINE_string(store_socket_name, "", "The socket name of object store.");
DEFINE_int32(object_manager_port, -1, "The port of object manager.");
DEFINE_int32(node_manager_port, -1, "The port of node manager.");
DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent.");
DEFINE_string(node_ip_address, "", "The ip address of this node.");
DEFINE_string(redis_address, "", "The ip address of redis server.");
DEFINE_int32(redis_port, -1, "The port of redis server.");
@@ -43,10 +44,6 @@ DEFINE_string(java_worker_command, "", "Java worker command.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_string(session_dir, "", "The path of this ray session directory.");
DEFINE_bool(disable_stats, false, "Whether disable the stats.");
DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to.");
DEFINE_bool(enable_stdout_exporter, false,
"Whether enable the stdout exporter for stats.");
DEFINE_bool(head_node, false, "Whether this is the head node of the cluster.");
// store options
DEFINE_int64(object_store_memory, -1, "The initial memory of the object store.");
@@ -67,6 +64,7 @@ int main(int argc, char *argv[]) {
const std::string store_socket_name = FLAGS_store_socket_name;
const int object_manager_port = static_cast<int>(FLAGS_object_manager_port);
const int node_manager_port = static_cast<int>(FLAGS_node_manager_port);
const int metrics_agent_port = static_cast<int>(FLAGS_metrics_agent_port);
const std::string node_ip_address = FLAGS_node_ip_address;
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
@@ -82,22 +80,12 @@ int main(int argc, char *argv[]) {
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const std::string session_dir = FLAGS_session_dir;
const bool disable_stats = FLAGS_disable_stats;
const std::string stat_address = FLAGS_stat_address;
const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter;
const bool head_node = FLAGS_head_node;
const int64_t object_store_memory = FLAGS_object_store_memory;
const std::string plasma_directory = FLAGS_plasma_directory;
const bool huge_pages = FLAGS_huge_pages;
gflags::ShutDownCommandLineFlags();
// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.9.0.dev0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(stat_address, global_tags, disable_stats, enable_stdout_exporter);
// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
std::unordered_map<std::string, double> static_resource_conf;
@@ -236,6 +224,13 @@ int main(int argc, char *argv[]) {
server->Start();
}));
// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.9.0.dev0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(global_tags, metrics_agent_port, main_service);
// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
+60
View File
@@ -0,0 +1,60 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <grpcpp/grpcpp.h>
#include <thread>
#include "ray/common/status.h"
#include "ray/protobuf/reporter.grpc.pb.h"
#include "ray/protobuf/reporter.pb.h"
#include "ray/rpc/grpc_client.h"
#include "ray/util/logging.h"
namespace ray {
namespace rpc {
/// Client used for communicating with a remote node manager server.
class MetricsAgentClient {
public:
/// Constructor.
///
/// \param[in] address Address of the metrics agent server.
/// \param[in] port Port of the metrics agent server.
/// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
MetricsAgentClient(const std::string &address, const int port,
ClientCallManager &client_call_manager)
: client_call_manager_(client_call_manager) {
grpc_client_ = std::unique_ptr<GrpcClient<ReporterService>>(
new GrpcClient<ReporterService>(address, port, client_call_manager));
};
/// Report metrics to metrics agent.
///
/// \param[in] request The request message.
/// \param[in] callback The callback function that handles reply.
VOID_RPC_CLIENT_METHOD(ReporterService, ReportMetrics, grpc_client_, )
private:
/// The RPC client.
std::unique_ptr<GrpcClient<ReporterService>> grpc_client_;
/// The `ClientCallManager` used for managing requests.
ClientCallManager &client_call_manager_;
};
} // namespace rpc
} // namespace ray
+5 -2
View File
@@ -17,10 +17,8 @@
#include <memory>
#include <unordered_map>
#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "prometheus/exposer.h"
#include "ray/util/logging.h"
@@ -35,12 +33,16 @@ class StatsConfig final {
public:
static StatsConfig &instance();
/// Set the global tags that will be appended to all metrics in this process.
void SetGlobalTags(const TagsType &global_tags);
/// Get the current global tags.
const TagsType &GetGlobalTags() const;
/// Set if the stats are enabled in this process.
void SetIsDisableStats(bool disable_stats);
/// Get whether or not stats are enabled.
bool IsStatsDisabled() const;
void SetReportInterval(const absl::Duration interval);
@@ -59,6 +61,7 @@ class StatsConfig final {
private:
TagsType global_tags_;
/// If true, don't collect metrics in this process.
bool is_stats_disabled_ = true;
// Regular reporting interval for all reporters.
absl::Duration report_interval_ = absl::Seconds(10);
+4 -4
View File
@@ -35,7 +35,9 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
size_t report_batch_size = kDefaultBatchSize)
: metric_exporter_client_(metric_exporter_client),
report_batch_size_(report_batch_size) {}
~MetricExporter() = default;
static void Register(std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size) {
opencensus::stats::StatsExporter::RegisterPushHandler(
@@ -63,10 +65,8 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
tags[keys[i]] = row.first[i];
}
// Current timestamp is used for point not view data time.
MetricPoint point{metric_name,
current_sys_time_ms(),
static_cast<double>(row.second),
tags};
MetricPoint point{metric_name, current_sys_time_ms(),
static_cast<double>(row.second), tags};
RAY_LOG(DEBUG) << "Metric name " << metric_name << ", value " << point.value;
points.push_back(std::move(point));
if (points.size() >= report_batch_size_) {
+36
View File
@@ -19,10 +19,16 @@
namespace ray {
namespace stats {
///
/// Stdout Exporter
///
void StdoutExporterClient::ReportMetrics(const std::vector<MetricPoint> &points) {
RAY_LOG(DEBUG) << "Metric point size : " << points.size();
}
///
/// Metrics Exporter Decorator
///
MetricExporterDecorator::MetricExporterDecorator(
std::shared_ptr<MetricExporterClient> exporter)
: exporter_(exporter) {}
@@ -32,5 +38,35 @@ void MetricExporterDecorator::ReportMetrics(const std::vector<MetricPoint> &poin
exporter_->ReportMetrics(points);
}
}
///
/// Metrics Agent Exporter
///
MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter,
const int port,
boost::asio::io_service &io_service,
const std::string address)
: MetricExporterDecorator(exporter), client_call_manager_(io_service) {
client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_));
}
void MetricsAgentExporter::ReportMetrics(const std::vector<MetricPoint> &points) {
MetricExporterDecorator::ReportMetrics(points);
rpc::ReportMetricsRequest request;
for (auto point : points) {
auto metric_point = request.add_metrics_point();
metric_point->set_metric_name(point.metric_name);
metric_point->set_timestamp(point.timestamp);
metric_point->set_value(point.value);
auto mutable_tags = metric_point->mutable_tags();
for (auto &tag : point.tags) {
(*mutable_tags)[tag.first] = tag.second;
}
}
// TODO(sang): Should retry metrics report if it fails.
client_->ReportMetrics(request, nullptr);
}
} // namespace stats
} // namespace ray
+17 -8
View File
@@ -14,6 +14,10 @@
#pragma once
#include <boost/asio.hpp>
#include "ray/rpc/client_call.h"
#include "ray/rpc/metrics_agent_client.h"
#include "ray/stats/metric.h"
namespace ray {
@@ -51,15 +55,20 @@ class MetricExporterDecorator : public MetricExporterClient {
std::shared_ptr<MetricExporterClient> exporter_;
};
class OpentsdbExporterClient : public MetricExporterDecorator {
class MetricsAgentExporter : public MetricExporterDecorator {
public:
OpentsdbExporterClient(std::shared_ptr<MetricExporterClient> exporter)
: MetricExporterDecorator(exporter) {}
void ReportMetrics(const std::vector<MetricPoint> &points) override {
MetricExporterDecorator::ReportMetrics(points);
// TODO(lingxuan.zlx): opentsdb client is used for report to backend
// storage.
}
MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter, const int port,
boost::asio::io_service &io_service, const std::string address);
~MetricsAgentExporter() {}
void ReportMetrics(const std::vector<MetricPoint> &points) override;
private:
/// Client to call a metrics agent gRPC server.
std::unique_ptr<rpc::MetricsAgentClient> client_;
/// Call Manager for gRPC client.
rpc::ClientCallManager client_call_manager_;
};
} // namespace stats
+5 -2
View File
@@ -122,17 +122,20 @@ class MetricExporterClientTest : public ::testing::Test {
absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2);
ray::stats::StatsConfig::instance().SetReportInterval(report_interval);
ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval);
ray::stats::Init("127.0.0.1:8888", global_tags, false);
std::shared_ptr<MetricExporterClient> exporter(new stats::StdoutExporterClient());
std::shared_ptr<MetricExporterClient> mock1(new MockExporterClient1(exporter));
std::shared_ptr<MetricExporterClient> mock2(new MockExporterClient2(mock1));
MetricExporter::Register(mock2, kMockReportBatchSize);
ray::stats::Init(global_tags, 10054, io_service_, mock2, kMockReportBatchSize);
}
void Shutdown() {
MockExporterClient1::ResetCount();
MockExporterClient2::ResetCount();
}
private:
boost::asio::io_service io_service_;
};
int MockExporterClient1::client1_count;
+23 -22
View File
@@ -23,48 +23,49 @@
#include "opencensus/stats/internal/delta_producer.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "prometheus/exposer.h"
#include "ray/common/ray_config.h"
#include "ray/stats/metric.h"
#include "ray/stats/metric_exporter.h"
#include "ray/stats/metric_exporter_client.h"
#include "ray/util/logging.h"
namespace ray {
namespace stats {
#include <boost/asio.hpp>
/// Include metric_defs.h to define measure items.
#include "metric_defs.h"
/// Initialize stats.
static void Init(const std::string &address, const TagsType &global_tags,
bool disable_stats = false, bool enable_stdout_exporter = false) {
static void Init(
const TagsType &global_tags, const int metrics_agent_port,
boost::asio::io_service &io_service,
std::shared_ptr<MetricExporterClient> exporter_to_use = nullptr,
int64_t metrics_report_batch_size = RayConfig::instance().metrics_report_batch_size(),
bool disable_stats = !RayConfig::instance().enable_metrics_collection()) {
StatsConfig::instance().SetIsDisableStats(disable_stats);
if (disable_stats) {
RAY_LOG(INFO) << "Disabled stats.";
return;
}
// Enable the Prometheus exporter.
// Note that the reason for we using local static variables
// here is to make sure they are single-instances.
static auto exporter =
std::make_shared<opencensus::exporters::stats::PrometheusExporter>();
if (enable_stdout_exporter) {
// Enable stdout exporter by default.
opencensus::exporters::stats::StdoutExporter::Register();
// Force to have a singleton exporter.
static std::shared_ptr<MetricExporterClient> exporter;
// Default exporter is metrics agent exporter.
if (exporter_to_use == nullptr) {
std::shared_ptr<MetricExporterClient> stdout_exporter(new StdoutExporterClient());
exporter.reset(new MetricsAgentExporter(stdout_exporter, metrics_agent_port,
io_service, "127.0.0.1"));
} else {
exporter = exporter_to_use;
}
// Enable prometheus exporter.
try {
static prometheus::Exposer exposer(address);
exposer.RegisterCollectable(exporter);
RAY_LOG(INFO) << "Succeeded to initialize stats: exporter address is " << address;
} catch (std::exception &e) {
RAY_LOG(WARNING) << "Failed to create the Prometheus exporter. This doesn't "
<< "affect anything except stats. Caused by: " << e.what();
return;
}
// TODO(sang): Currently, we don't do any cleanup. This can lead us to lose last 10
// seconds data before we exit the main script.
MetricExporter::Register(exporter, metrics_report_batch_size);
opencensus::stats::StatsExporter::SetInterval(
StatsConfig::instance().GetReportInterval());
opencensus::stats::DeltaProducer::Get()->SetHarvestInterval(
+6 -1
View File
@@ -61,11 +61,16 @@ class StatsTest : public ::testing::Test {
void SetUp() {
const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"},
{stats::WorkerPidKey, "1000"}};
ray::stats::Init("127.0.0.1:8888", global_tags, false);
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, 10054, io_service_, exporter);
MockExporter::Register();
}
void Shutdown() {}
private:
boost::asio::io_service io_service_;
};
TEST_F(StatsTest, F) {