Integrate metrics (#4246)

This commit is contained in:
Wang Qing
2019-04-03 12:01:02 +08:00
committed by Philipp Moritz
parent 8e19d3721f
commit 7d776f35e1
14 changed files with 560 additions and 1 deletions
+3
View File
@@ -156,3 +156,6 @@ venv
# Vim
.*.swp
*.swp
# tools
tools/prometheus*
+4
View File
@@ -161,6 +161,10 @@ install:
- ./ci/suppress_output ./ci/travis/install-cython-examples.sh
- ./ci/suppress_output bash src/ray/test/run_gcs_tests.sh
# stats test.
- ./ci/suppress_output bazel build //:stats_test -c opt
- ./bazel-bin/stats_test
# Raylet tests.
- ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh
- ./ci/suppress_output bazel test --build_tests_only --test_lang_filters=cc //:all
+53
View File
@@ -32,6 +32,40 @@ cc_binary(
],
)
cc_library(
name = "stats_lib",
srcs = glob(
[
"src/ray/stats/*.cc",
],
exclude = [
"src/ray/stats/*_test.cc",
],
),
hdrs = glob(
[
"src/ray/stats/*.h",
],
),
copts = COPTS,
includes = [
"src",
],
linkopts = ["-pthread"],
deps = [
":ray_util",
"@com_github_jupp0r_prometheus_cpp//pull",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/strings",
"@com_google_googletest//:gtest",
"@io_opencensus_cpp//opencensus/exporters/stats/prometheus:prometheus_exporter",
"@io_opencensus_cpp//opencensus/exporters/stats/stdout:stdout_exporter",
"@io_opencensus_cpp//opencensus/stats",
"@io_opencensus_cpp//opencensus/tags",
],
)
cc_library(
name = "raylet_lib",
srcs = glob(
@@ -48,6 +82,7 @@ cc_library(
"src/ray/raylet/*.h",
]),
copts = COPTS,
linkopts = ["-pthread"],
deps = [
":gcs",
":gcs_fbs",
@@ -55,8 +90,16 @@ cc_library(
":object_manager",
":ray_common",
":ray_util",
":stats_lib",
"@boost//:asio",
"@com_github_jupp0r_prometheus_cpp//pull",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/strings",
"@com_google_googletest//:gtest",
"@io_opencensus_cpp//opencensus/exporters/stats/prometheus:prometheus_exporter",
"@io_opencensus_cpp//opencensus/stats",
"@io_opencensus_cpp//opencensus/tags",
"@plasma//:plasma_client",
],
)
@@ -146,6 +189,15 @@ cc_test(
],
)
cc_test(
name = "stats_test",
srcs = ["src/ray/stats/stats_test.cc"],
deps = [
":stats_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "object_manager",
srcs = glob([
@@ -299,6 +351,7 @@ cc_library(
":node_manager_fbs",
":ray_common",
":ray_util",
":stats_lib",
"@boost//:asio",
],
)
+30
View File
@@ -11,6 +11,8 @@ git_repository(
remote = "https://github.com/ruifangChen/checkstyle_java",
)
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
git_repository(
name = "com_github_nelhage_rules_boost",
commit = "6d6fd834281cb8f8e758dd9ad76df86304bf1869",
@@ -63,3 +65,31 @@ new_git_repository(
load("@//bazel:python_configure.bzl", "python_configure")
python_configure(name = "local_config_python")
http_archive(
name = "io_opencensus_cpp",
strip_prefix = "opencensus-cpp-0.3.0",
urls = ["https://github.com/census-instrumentation/opencensus-cpp/archive/v0.3.0.zip"],
)
# OpenCensus depends on Abseil so we have to explicitly pull it in.
# This is how diamond dependencies are prevented.
git_repository(
name = "com_google_absl",
commit = "88a152ae747c3c42dc9167d46c590929b048d436",
remote = "https://github.com/abseil/abseil-cpp.git",
)
# OpenCensus depends on jupp0r/prometheus-cpp
http_archive(
name = "com_github_jupp0r_prometheus_cpp",
strip_prefix = "prometheus-cpp-master",
# TODO(qwang): We should use the repository of `jupp0r` here when this PR
# `https://github.com/jupp0r/prometheus-cpp/pull/225` getting merged.
urls = ["https://github.com/jovany-wang/prometheus-cpp/archive/master.zip"],
)
load("@com_github_jupp0r_prometheus_cpp//:repositories.bzl", "prometheus_cpp_repositories")
prometheus_cpp_repositories()
+13 -1
View File
@@ -2,6 +2,7 @@
#include "ray/ray_config.h"
#include "ray/raylet/raylet.h"
#include "ray/stats/stats.h"
#include "ray/status.h"
#ifndef RAYLET_TEST
@@ -20,7 +21,7 @@ int main(int argc, char *argv[]) {
ray::RayLogLevel::INFO,
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc >= 14 && argc <= 16);
RAY_CHECK(argc >= 14 && argc <= 18);
const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
@@ -37,6 +38,17 @@ int main(int argc, char *argv[]) {
const std::string java_worker_command = std::string(argv[13]);
const std::string redis_password = (argc >= 15 ? std::string(argv[14]) : "");
const std::string temp_dir = (argc >= 16 ? std::string(argv[15]) : "/tmp/ray");
const std::string disable_stats_str(argc >= 17 ? std::string(argv[16]) : "false");
const bool disable_stats = ("true" == disable_stats_str);
const std::string stat_address =
(argc >= 18 ? std::string(argv[17]) : "127.0.0.1:8888");
// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.7.0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(stat_address, global_tags, disable_stats);
// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
+2
View File
@@ -7,6 +7,7 @@
#include "ray/common/common_protocol.h"
#include "ray/id.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/stats/stats.h"
namespace {
@@ -1318,6 +1319,7 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
bool forwarded) {
stats::TaskCountReceived().Record(1);
const TaskSpecification &spec = task.GetTaskSpecification();
const TaskID &task_id = spec.TaskId();
RAY_LOG(DEBUG) << "Submitting task: task_id=" << task_id
+9
View File
@@ -5,6 +5,8 @@
#include <algorithm>
#include <thread>
#include "ray/ray_config.h"
#include "ray/stats/stats.h"
#include "ray/status.h"
#include "ray/util/logging.h"
@@ -130,6 +132,8 @@ void WorkerPool::StartWorkerProcess(const Language &language) {
RAY_LOG(DEBUG) << "Started worker process with pid " << pid;
state.starting_worker_processes.emplace(
std::make_pair(pid, num_workers_per_process_));
stats::CurrentWorker().Record(pid, {{stats::LanguageKey, EnumNameLanguage(language)},
{stats::WorkerPidKey, std::to_string(pid)}});
return;
}
}
@@ -232,6 +236,11 @@ std::shared_ptr<Worker> WorkerPool::PopWorker(const TaskSpecification &task_spec
bool WorkerPool::DisconnectWorker(const std::shared_ptr<Worker> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
RAY_CHECK(RemoveWorker(state.registered_workers, worker));
stats::CurrentWorker().Record(
0, {{stats::LanguageKey, EnumNameLanguage(worker->GetLanguage())},
{stats::WorkerPidKey, std::to_string(worker->Pid())}});
return RemoveWorker(state.idle, worker);
}
+105
View File
@@ -0,0 +1,105 @@
#include "ray/stats/metric.h"
namespace ray {
namespace stats {
static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor,
const std::vector<opencensus::tags::TagKey> &keys) {
// Register global keys.
for (const auto &tag : ray::stats::StatsConfig::instance().GetGlobalTags()) {
view_descriptor = view_descriptor.add_column(tag.first);
}
// Register custom keys.
for (const auto &key : keys) {
view_descriptor = view_descriptor.add_column(key);
}
opencensus::stats::View view(view_descriptor);
view_descriptor.RegisterForExport();
}
StatsConfig &StatsConfig::instance() {
static StatsConfig instance;
return instance;
}
void StatsConfig::SetGlobalTags(const TagsType &global_tags) {
global_tags_ = global_tags;
}
const TagsType &StatsConfig::GetGlobalTags() const { return global_tags_; }
void StatsConfig::SetIsDisableStats(bool disable_stats) {
is_stats_disabled_ = disable_stats;
}
bool StatsConfig::IsStatsDisabled() const { return is_stats_disabled_; }
void Metric::Record(double value, const TagsType &tags) {
if (StatsConfig::instance().IsStatsDisabled()) {
return;
}
if (measure_ == nullptr) {
measure_.reset(new opencensus::stats::Measure<double>(
opencensus::stats::Measure<double>::Register(name_, description_, unit_)));
RegisterView();
}
// Do record.
TagsType combined_tags(tags);
combined_tags.insert(std::end(combined_tags),
std::begin(StatsConfig::instance().GetGlobalTags()),
std::end(StatsConfig::instance().GetGlobalTags()));
opencensus::stats::Record({{*measure_, value}}, combined_tags);
}
void Gauge::RegisterView() {
opencensus::stats::ViewDescriptor view_descriptor =
opencensus::stats::ViewDescriptor()
.set_name(name_)
.set_description(description_)
.set_measure(name_)
.set_aggregation(opencensus::stats::Aggregation::LastValue());
RegisterAsView(view_descriptor, tag_keys_);
}
void Histogram::RegisterView() {
opencensus::stats::ViewDescriptor view_descriptor =
opencensus::stats::ViewDescriptor()
.set_name(name_)
.set_description(description_)
.set_measure(name_)
.set_aggregation(opencensus::stats::Aggregation::Distribution(
opencensus::stats::BucketBoundaries::Explicit(boundaries_)));
RegisterAsView(view_descriptor, tag_keys_);
}
void Count::RegisterView() {
opencensus::stats::ViewDescriptor view_descriptor =
opencensus::stats::ViewDescriptor()
.set_name(name_)
.set_description(description_)
.set_measure(name_)
.set_aggregation(opencensus::stats::Aggregation::Count());
RegisterAsView(view_descriptor, tag_keys_);
}
void Sum::RegisterView() {
opencensus::stats::ViewDescriptor view_descriptor =
opencensus::stats::ViewDescriptor()
.set_name(name_)
.set_description(description_)
.set_measure(name_)
.set_aggregation(opencensus::stats::Aggregation::Count());
RegisterAsView(view_descriptor, tag_keys_);
}
} // namespace stats
} // namespace ray
+139
View File
@@ -0,0 +1,139 @@
#ifndef RAY_STATS_METRIC_H
#define RAY_STATS_METRIC_H
#include <memory>
#include <unordered_map>
#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "prometheus/exposer.h"
#include "ray/util/logging.h"
namespace ray {
namespace stats {
/// Include tag_defs.h to define tag items
#include "tag_defs.h"
class StatsConfig final {
public:
static StatsConfig &instance();
void SetGlobalTags(const TagsType &global_tags);
const TagsType &GetGlobalTags() const;
void SetIsDisableStats(bool disable_stats);
bool IsStatsDisabled() const;
private:
StatsConfig() = default;
~StatsConfig() = default;
StatsConfig(const StatsConfig &) = delete;
StatsConfig &operator=(const StatsConfig &) = delete;
private:
TagsType global_tags_;
bool is_stats_disabled_ = true;
};
/// The helper function for registering a view.
static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor,
const std::vector<opencensus::tags::TagKey> &keys);
/// A thin wrapper that wraps the `opencensus::tag::measure` for using it simply.
class Metric {
public:
Metric(const std::string &name, const std::string &description, const std::string &unit,
const std::vector<opencensus::tags::TagKey> &tag_keys = {})
: measure_(nullptr),
name_(name),
description_(description),
unit_(unit),
tag_keys_(tag_keys){};
virtual ~Metric() = default;
Metric &operator()() { return *this; }
/// Get the name of this metric.
std::string GetName() const { return name_; }
/// Record the value for this metric.
void Record(double value) { Record(value, {}); }
/// Record the value for this metric.
///
/// \param value The value that we record.
/// \param tags The tag values that we want to record for this metric record.
void Record(double value, const TagsType &tags);
protected:
virtual void RegisterView() = 0;
protected:
std::string name_;
std::string description_;
std::string unit_;
std::vector<opencensus::tags::TagKey> tag_keys_;
std::unique_ptr<opencensus::stats::Measure<double>> measure_;
}; // class Metric
class Gauge : public Metric {
public:
Gauge(const std::string &name, const std::string &description, const std::string &unit,
const std::vector<opencensus::tags::TagKey> &tag_keys = {})
: Metric(name, description, unit, tag_keys) {}
private:
void RegisterView() override;
}; // class Gauge
class Histogram : public Metric {
public:
Histogram(const std::string &name, const std::string &description,
const std::string &unit, const std::vector<double> boundaries,
const std::vector<opencensus::tags::TagKey> &tag_keys = {})
: Metric(name, description, unit, tag_keys), boundaries_(boundaries) {}
private:
void RegisterView() override;
private:
std::vector<double> boundaries_;
}; // class Histogram
class Count : public Metric {
public:
Count(const std::string &name, const std::string &description, const std::string &unit,
const std::vector<opencensus::tags::TagKey> &tag_keys = {})
: Metric(name, description, unit, tag_keys) {}
private:
void RegisterView() override;
}; // class Count
class Sum : public Metric {
public:
Sum(const std::string &name, const std::string &description, const std::string &unit,
const std::vector<opencensus::tags::TagKey> &tag_keys = {})
: Metric(name, description, unit, tag_keys) {}
private:
void RegisterView() override;
}; // class Sum
} // namespace stats
} // namespace ray
#endif // RAY_STATS_METRIC_H
+27
View File
@@ -0,0 +1,27 @@
#ifndef RAY_STATS_METRIC_DEFS_H
#define RAY_STATS_METRIC_DEFS_H
/// The definitions of metrics that you can use everywhere.
///
/// There are 4 types of metric:
/// Histogram: Histogram distribution of metric points.
/// Gauge: Keeps the last recorded value, drops everything before.
/// Count: The count of the number of metric points.
/// Sum: A sum up of the metric points.
///
/// You can follow these examples to define your metrics.
static Gauge CurrentWorker("current_worker",
"This metric is used for report states of workers. "
"Through this, we can see the worker's state on dashboard.",
"1 pcs", {NodeAddressKey, LanguageKey, WorkerPidKey});
static Count TaskCountReceived("task_count_received",
"The count that the raylet received.", "pcs",
{NodeAddressKey});
static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us",
{100, 200, 300, 400, 500, 600, 700, 800, 900, 1000},
{NodeAddressKey, CustomKey});
#endif // RAY_STATS_METRIC_DEFS_H
+60
View File
@@ -0,0 +1,60 @@
#ifndef RAY_STATS_STATS_H
#define RAY_STATS_STATS_H
#include <exception>
#include <string>
#include <unordered_map>
#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h"
#include "opencensus/exporters/stats/stdout/stdout_exporter.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "prometheus/exposer.h"
#include "ray/stats/metric.h"
#include "ray/util/logging.h"
namespace ray {
namespace stats {
/// Include metric_defs.h to define measure items.
#include "metric_defs.h"
/// Initialize stats.
static void Init(const std::string &address, const TagsType &global_tags,
bool disable_stats = false) {
StatsConfig::instance().SetIsDisableStats(disable_stats);
if (disable_stats) {
RAY_LOG(INFO) << "Disabled stats.";
return;
}
// Enable the Prometheus exporter.
// Note that the reason for we using local static variables
// here is to make sure they are single-instances.
static auto exporter =
std::make_shared<opencensus::exporters::stats::PrometheusExporter>();
// Enable stdout exporter by default.
opencensus::exporters::stats::StdoutExporter::Register();
// Enable prometheus exporter.
try {
static prometheus::Exposer exposer(address);
exposer.RegisterCollectable(exporter);
RAY_LOG(INFO) << "Succeeded to initialize stats: exporter address is " << address;
} catch (std::exception &e) {
RAY_LOG(WARNING) << "Failed to create the Prometheus exporter. This doesn't "
<< "affect anything except stats. Caused by: " << e.what();
return;
}
StatsConfig::instance().SetGlobalTags(global_tags);
}
} // namespace stats
} // namespace ray
#endif // RAY_STATS_STATS_H
+66
View File
@@ -0,0 +1,66 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>
#include "absl/memory/memory.h"
#include "ray/stats/stats.h"
namespace ray {
class MockExporter : public opencensus::stats::StatsExporter::Handler {
public:
static void Register() {
opencensus::stats::StatsExporter::RegisterPushHandler(
absl::make_unique<MockExporter>());
}
void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) override {
for (const auto &datum : data) {
auto &descriptor = datum.first;
auto &view_data = datum.second;
ASSERT_EQ("current_worker", descriptor.name());
ASSERT_EQ(opencensus::stats::ViewData::Type::kDouble, view_data.type());
for (const auto row : view_data.double_data()) {
for (int i = 0; i < descriptor.columns().size(); ++i) {
if (descriptor.columns()[i].name() == "NodeAddress") {
ASSERT_EQ("Localhost", row.first[i]);
}
}
// row.second store the data of this metric.
ASSERT_EQ(2345, row.second);
}
}
}
};
class StatsTest : public ::testing::Test {
public:
void SetUp() {
ray::stats::Init("127.0.0.1:8888", {}, false);
MockExporter::Register();
}
void Shutdown() {}
};
TEST_F(StatsTest, F) {
for (size_t i = 0; i < 500; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
stats::CurrentWorker().Record(2345, {{stats::NodeAddressKey, "Localhost"}});
}
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
+22
View File
@@ -0,0 +1,22 @@
#ifndef RAY_STATS_TAG_DEFS_H
#define RAY_STATS_TAG_DEFS_H
/// The definitions of tag keys that you can use every where.
/// You can follow these examples to define and register your tag keys.
using TagKeyType = opencensus::tags::TagKey;
using TagsType = std::vector<std::pair<opencensus::tags::TagKey, std::string>>;
static const TagKeyType JobNameKey = TagKeyType::Register("JobName");
static const TagKeyType CustomKey = TagKeyType::Register("CustomKey");
static const TagKeyType NodeAddressKey = TagKeyType::Register("NodeAddress");
static const TagKeyType VersionKey = TagKeyType::Register("Version");
static const TagKeyType LanguageKey = TagKeyType::Register("Language");
static const TagKeyType WorkerPidKey = TagKeyType::Register("WorkerPid");
#endif // RAY_STATS_TAG_DEFS_H
+27
View File
@@ -0,0 +1,27 @@
#!/usr/bin/env bash
set -x
set -e
TOOLS_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
pushd $TOOLS_DIR
# Download Prometheus server.
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
echo "Downloading Prometheus server for linux system."
PACKAGE_NAME=prometheus-2.8.0.linux-amd64
elif [[ "$unamestr" == "Darwin" ]]; then
echo "Downloading Prometheus server for MacOS system."
PACKAGE_NAME=prometheus-2.8.0.darwin-amd64
else
echo "Downloading abort: Unrecognized platform."
exit -1
fi
URL=https://github.com/prometheus/prometheus/releases/download/v2.8.0/$PACKAGE_NAME.tar.gz
wget $URL
tar xvfz $PACKAGE_NAME.tar.gz
popd