diff --git a/.gitignore b/.gitignore index 21d8edfc4..69d4533cb 100644 --- a/.gitignore +++ b/.gitignore @@ -156,3 +156,6 @@ venv # Vim .*.swp *.swp + +# tools +tools/prometheus* diff --git a/.travis.yml b/.travis.yml index 92737ecca..13b4f45f6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -161,6 +161,10 @@ install: - ./ci/suppress_output ./ci/travis/install-cython-examples.sh - ./ci/suppress_output bash src/ray/test/run_gcs_tests.sh + # stats test. + - ./ci/suppress_output bazel build //:stats_test -c opt + - ./bazel-bin/stats_test + # Raylet tests. - ./ci/suppress_output bash src/ray/test/run_object_manager_tests.sh - ./ci/suppress_output bazel test --build_tests_only --test_lang_filters=cc //:all diff --git a/BUILD.bazel b/BUILD.bazel index 41366578d..6dceb78a8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -32,6 +32,40 @@ cc_binary( ], ) +cc_library( + name = "stats_lib", + srcs = glob( + [ + "src/ray/stats/*.cc", + ], + exclude = [ + "src/ray/stats/*_test.cc", + ], + ), + hdrs = glob( + [ + "src/ray/stats/*.h", + ], + ), + copts = COPTS, + includes = [ + "src", + ], + linkopts = ["-pthread"], + deps = [ + ":ray_util", + "@com_github_jupp0r_prometheus_cpp//pull", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/memory", + "@com_google_absl//absl/strings", + "@com_google_googletest//:gtest", + "@io_opencensus_cpp//opencensus/exporters/stats/prometheus:prometheus_exporter", + "@io_opencensus_cpp//opencensus/exporters/stats/stdout:stdout_exporter", + "@io_opencensus_cpp//opencensus/stats", + "@io_opencensus_cpp//opencensus/tags", + ], +) + cc_library( name = "raylet_lib", srcs = glob( @@ -48,6 +82,7 @@ cc_library( "src/ray/raylet/*.h", ]), copts = COPTS, + linkopts = ["-pthread"], deps = [ ":gcs", ":gcs_fbs", @@ -55,8 +90,16 @@ cc_library( ":object_manager", ":ray_common", ":ray_util", + ":stats_lib", "@boost//:asio", + "@com_github_jupp0r_prometheus_cpp//pull", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/memory", + "@com_google_absl//absl/strings", "@com_google_googletest//:gtest", + "@io_opencensus_cpp//opencensus/exporters/stats/prometheus:prometheus_exporter", + "@io_opencensus_cpp//opencensus/stats", + "@io_opencensus_cpp//opencensus/tags", "@plasma//:plasma_client", ], ) @@ -146,6 +189,15 @@ cc_test( ], ) +cc_test( + name = "stats_test", + srcs = ["src/ray/stats/stats_test.cc"], + deps = [ + ":stats_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_library( name = "object_manager", srcs = glob([ @@ -299,6 +351,7 @@ cc_library( ":node_manager_fbs", ":ray_common", ":ray_util", + ":stats_lib", "@boost//:asio", ], ) diff --git a/WORKSPACE b/WORKSPACE index a2a5c191e..86751bcfb 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -11,6 +11,8 @@ git_repository( remote = "https://github.com/ruifangChen/checkstyle_java", ) +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + git_repository( name = "com_github_nelhage_rules_boost", commit = "6d6fd834281cb8f8e758dd9ad76df86304bf1869", @@ -63,3 +65,31 @@ new_git_repository( load("@//bazel:python_configure.bzl", "python_configure") python_configure(name = "local_config_python") + +http_archive( + name = "io_opencensus_cpp", + strip_prefix = "opencensus-cpp-0.3.0", + urls = ["https://github.com/census-instrumentation/opencensus-cpp/archive/v0.3.0.zip"], +) + +# OpenCensus depends on Abseil so we have to explicitly pull it in. +# This is how diamond dependencies are prevented. +git_repository( + name = "com_google_absl", + commit = "88a152ae747c3c42dc9167d46c590929b048d436", + remote = "https://github.com/abseil/abseil-cpp.git", +) + +# OpenCensus depends on jupp0r/prometheus-cpp +http_archive( + name = "com_github_jupp0r_prometheus_cpp", + strip_prefix = "prometheus-cpp-master", + + # TODO(qwang): We should use the repository of `jupp0r` here when this PR + # `https://github.com/jupp0r/prometheus-cpp/pull/225` getting merged. + urls = ["https://github.com/jovany-wang/prometheus-cpp/archive/master.zip"], +) + +load("@com_github_jupp0r_prometheus_cpp//:repositories.bzl", "prometheus_cpp_repositories") + +prometheus_cpp_repositories() diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 8942e68ac..12e08a649 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -2,6 +2,7 @@ #include "ray/ray_config.h" #include "ray/raylet/raylet.h" +#include "ray/stats/stats.h" #include "ray/status.h" #ifndef RAYLET_TEST @@ -20,7 +21,7 @@ int main(int argc, char *argv[]) { ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc >= 14 && argc <= 16); + RAY_CHECK(argc >= 14 && argc <= 18); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -37,6 +38,17 @@ int main(int argc, char *argv[]) { const std::string java_worker_command = std::string(argv[13]); const std::string redis_password = (argc >= 15 ? std::string(argv[14]) : ""); const std::string temp_dir = (argc >= 16 ? std::string(argv[15]) : "/tmp/ray"); + const std::string disable_stats_str(argc >= 17 ? std::string(argv[16]) : "false"); + const bool disable_stats = ("true" == disable_stats_str); + const std::string stat_address = + (argc >= 18 ? std::string(argv[17]) : "127.0.0.1:8888"); + + // Initialize stats. + const ray::stats::TagsType global_tags = { + {ray::stats::JobNameKey, "raylet"}, + {ray::stats::VersionKey, "0.7.0"}, + {ray::stats::NodeAddressKey, node_ip_address}}; + ray::stats::Init(stat_address, global_tags, disable_stats); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3566df39d..efbba1a53 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -7,6 +7,7 @@ #include "ray/common/common_protocol.h" #include "ray/id.h" #include "ray/raylet/format/node_manager_generated.h" +#include "ray/stats/stats.h" namespace { @@ -1318,6 +1319,7 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded) { + stats::TaskCountReceived().Record(1); const TaskSpecification &spec = task.GetTaskSpecification(); const TaskID &task_id = spec.TaskId(); RAY_LOG(DEBUG) << "Submitting task: task_id=" << task_id diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 12c753bdc..44f8e39f7 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -5,6 +5,8 @@ #include #include +#include "ray/ray_config.h" +#include "ray/stats/stats.h" #include "ray/status.h" #include "ray/util/logging.h" @@ -130,6 +132,8 @@ void WorkerPool::StartWorkerProcess(const Language &language) { RAY_LOG(DEBUG) << "Started worker process with pid " << pid; state.starting_worker_processes.emplace( std::make_pair(pid, num_workers_per_process_)); + stats::CurrentWorker().Record(pid, {{stats::LanguageKey, EnumNameLanguage(language)}, + {stats::WorkerPidKey, std::to_string(pid)}}); return; } } @@ -232,6 +236,11 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { auto &state = GetStateForLanguage(worker->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_workers, worker)); + + stats::CurrentWorker().Record( + 0, {{stats::LanguageKey, EnumNameLanguage(worker->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(worker->Pid())}}); + return RemoveWorker(state.idle, worker); } diff --git a/src/ray/stats/metric.cc b/src/ray/stats/metric.cc new file mode 100644 index 000000000..46ec61dd4 --- /dev/null +++ b/src/ray/stats/metric.cc @@ -0,0 +1,105 @@ +#include "ray/stats/metric.h" + +namespace ray { + +namespace stats { + +static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor, + const std::vector &keys) { + // Register global keys. + for (const auto &tag : ray::stats::StatsConfig::instance().GetGlobalTags()) { + view_descriptor = view_descriptor.add_column(tag.first); + } + + // Register custom keys. + for (const auto &key : keys) { + view_descriptor = view_descriptor.add_column(key); + } + + opencensus::stats::View view(view_descriptor); + view_descriptor.RegisterForExport(); +} + +StatsConfig &StatsConfig::instance() { + static StatsConfig instance; + return instance; +} + +void StatsConfig::SetGlobalTags(const TagsType &global_tags) { + global_tags_ = global_tags; +} + +const TagsType &StatsConfig::GetGlobalTags() const { return global_tags_; } + +void StatsConfig::SetIsDisableStats(bool disable_stats) { + is_stats_disabled_ = disable_stats; +} + +bool StatsConfig::IsStatsDisabled() const { return is_stats_disabled_; } + +void Metric::Record(double value, const TagsType &tags) { + if (StatsConfig::instance().IsStatsDisabled()) { + return; + } + + if (measure_ == nullptr) { + measure_.reset(new opencensus::stats::Measure( + opencensus::stats::Measure::Register(name_, description_, unit_))); + RegisterView(); + } + + // Do record. + TagsType combined_tags(tags); + combined_tags.insert(std::end(combined_tags), + std::begin(StatsConfig::instance().GetGlobalTags()), + std::end(StatsConfig::instance().GetGlobalTags())); + opencensus::stats::Record({{*measure_, value}}, combined_tags); +} + +void Gauge::RegisterView() { + opencensus::stats::ViewDescriptor view_descriptor = + opencensus::stats::ViewDescriptor() + .set_name(name_) + .set_description(description_) + .set_measure(name_) + .set_aggregation(opencensus::stats::Aggregation::LastValue()); + RegisterAsView(view_descriptor, tag_keys_); +} + +void Histogram::RegisterView() { + opencensus::stats::ViewDescriptor view_descriptor = + opencensus::stats::ViewDescriptor() + .set_name(name_) + .set_description(description_) + .set_measure(name_) + .set_aggregation(opencensus::stats::Aggregation::Distribution( + opencensus::stats::BucketBoundaries::Explicit(boundaries_))); + + RegisterAsView(view_descriptor, tag_keys_); +} + +void Count::RegisterView() { + opencensus::stats::ViewDescriptor view_descriptor = + opencensus::stats::ViewDescriptor() + .set_name(name_) + .set_description(description_) + .set_measure(name_) + .set_aggregation(opencensus::stats::Aggregation::Count()); + + RegisterAsView(view_descriptor, tag_keys_); +} + +void Sum::RegisterView() { + opencensus::stats::ViewDescriptor view_descriptor = + opencensus::stats::ViewDescriptor() + .set_name(name_) + .set_description(description_) + .set_measure(name_) + .set_aggregation(opencensus::stats::Aggregation::Count()); + + RegisterAsView(view_descriptor, tag_keys_); +} + +} // namespace stats + +} // namespace ray diff --git a/src/ray/stats/metric.h b/src/ray/stats/metric.h new file mode 100644 index 000000000..96f67bbd5 --- /dev/null +++ b/src/ray/stats/metric.h @@ -0,0 +1,139 @@ +#ifndef RAY_STATS_METRIC_H +#define RAY_STATS_METRIC_H + +#include +#include + +#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h" +#include "opencensus/stats/stats.h" +#include "opencensus/tags/tag_key.h" +#include "prometheus/exposer.h" + +#include "ray/util/logging.h" + +namespace ray { + +namespace stats { + +/// Include tag_defs.h to define tag items +#include "tag_defs.h" + +class StatsConfig final { + public: + static StatsConfig &instance(); + + void SetGlobalTags(const TagsType &global_tags); + + const TagsType &GetGlobalTags() const; + + void SetIsDisableStats(bool disable_stats); + + bool IsStatsDisabled() const; + + private: + StatsConfig() = default; + ~StatsConfig() = default; + StatsConfig(const StatsConfig &) = delete; + StatsConfig &operator=(const StatsConfig &) = delete; + + private: + TagsType global_tags_; + bool is_stats_disabled_ = true; +}; + +/// The helper function for registering a view. +static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor, + const std::vector &keys); + +/// A thin wrapper that wraps the `opencensus::tag::measure` for using it simply. +class Metric { + public: + Metric(const std::string &name, const std::string &description, const std::string &unit, + const std::vector &tag_keys = {}) + : measure_(nullptr), + name_(name), + description_(description), + unit_(unit), + tag_keys_(tag_keys){}; + + virtual ~Metric() = default; + + Metric &operator()() { return *this; } + + /// Get the name of this metric. + std::string GetName() const { return name_; } + + /// Record the value for this metric. + void Record(double value) { Record(value, {}); } + + /// Record the value for this metric. + /// + /// \param value The value that we record. + /// \param tags The tag values that we want to record for this metric record. + void Record(double value, const TagsType &tags); + + protected: + virtual void RegisterView() = 0; + + protected: + std::string name_; + std::string description_; + std::string unit_; + std::vector tag_keys_; + std::unique_ptr> measure_; + +}; // class Metric + +class Gauge : public Metric { + public: + Gauge(const std::string &name, const std::string &description, const std::string &unit, + const std::vector &tag_keys = {}) + : Metric(name, description, unit, tag_keys) {} + + private: + void RegisterView() override; + +}; // class Gauge + +class Histogram : public Metric { + public: + Histogram(const std::string &name, const std::string &description, + const std::string &unit, const std::vector boundaries, + const std::vector &tag_keys = {}) + : Metric(name, description, unit, tag_keys), boundaries_(boundaries) {} + + private: + void RegisterView() override; + + private: + std::vector boundaries_; + +}; // class Histogram + +class Count : public Metric { + public: + Count(const std::string &name, const std::string &description, const std::string &unit, + const std::vector &tag_keys = {}) + : Metric(name, description, unit, tag_keys) {} + + private: + void RegisterView() override; + +}; // class Count + +class Sum : public Metric { + public: + Sum(const std::string &name, const std::string &description, const std::string &unit, + const std::vector &tag_keys = {}) + : Metric(name, description, unit, tag_keys) {} + + private: + void RegisterView() override; + +}; // class Sum + +} // namespace stats + +} // namespace ray + +#endif // RAY_STATS_METRIC_H diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h new file mode 100644 index 000000000..bd4aeb00c --- /dev/null +++ b/src/ray/stats/metric_defs.h @@ -0,0 +1,27 @@ +#ifndef RAY_STATS_METRIC_DEFS_H +#define RAY_STATS_METRIC_DEFS_H + +/// The definitions of metrics that you can use everywhere. +/// +/// There are 4 types of metric: +/// Histogram: Histogram distribution of metric points. +/// Gauge: Keeps the last recorded value, drops everything before. +/// Count: The count of the number of metric points. +/// Sum: A sum up of the metric points. +/// +/// You can follow these examples to define your metrics. + +static Gauge CurrentWorker("current_worker", + "This metric is used for report states of workers. " + "Through this, we can see the worker's state on dashboard.", + "1 pcs", {NodeAddressKey, LanguageKey, WorkerPidKey}); + +static Count TaskCountReceived("task_count_received", + "The count that the raylet received.", "pcs", + {NodeAddressKey}); + +static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us", + {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}, + {NodeAddressKey, CustomKey}); + +#endif // RAY_STATS_METRIC_DEFS_H diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h new file mode 100644 index 000000000..93eafb90d --- /dev/null +++ b/src/ray/stats/stats.h @@ -0,0 +1,60 @@ +#ifndef RAY_STATS_STATS_H +#define RAY_STATS_STATS_H + +#include +#include +#include + +#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h" +#include "opencensus/exporters/stats/stdout/stdout_exporter.h" +#include "opencensus/stats/stats.h" +#include "opencensus/tags/tag_key.h" +#include "prometheus/exposer.h" + +#include "ray/stats/metric.h" +#include "ray/util/logging.h" + +namespace ray { + +namespace stats { + +/// Include metric_defs.h to define measure items. +#include "metric_defs.h" + +/// Initialize stats. +static void Init(const std::string &address, const TagsType &global_tags, + bool disable_stats = false) { + StatsConfig::instance().SetIsDisableStats(disable_stats); + if (disable_stats) { + RAY_LOG(INFO) << "Disabled stats."; + return; + } + + // Enable the Prometheus exporter. + // Note that the reason for we using local static variables + // here is to make sure they are single-instances. + static auto exporter = + std::make_shared(); + + // Enable stdout exporter by default. + opencensus::exporters::stats::StdoutExporter::Register(); + + // Enable prometheus exporter. + try { + static prometheus::Exposer exposer(address); + exposer.RegisterCollectable(exporter); + RAY_LOG(INFO) << "Succeeded to initialize stats: exporter address is " << address; + } catch (std::exception &e) { + RAY_LOG(WARNING) << "Failed to create the Prometheus exporter. This doesn't " + << "affect anything except stats. Caused by: " << e.what(); + return; + } + + StatsConfig::instance().SetGlobalTags(global_tags); +} + +} // namespace stats + +} // namespace ray + +#endif // RAY_STATS_STATS_H diff --git a/src/ray/stats/stats_test.cc b/src/ray/stats/stats_test.cc new file mode 100644 index 000000000..706478a73 --- /dev/null +++ b/src/ray/stats/stats_test.cc @@ -0,0 +1,66 @@ +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "ray/stats/stats.h" + +namespace ray { + +class MockExporter : public opencensus::stats::StatsExporter::Handler { + public: + static void Register() { + opencensus::stats::StatsExporter::RegisterPushHandler( + absl::make_unique()); + } + + void ExportViewData( + const std::vector> &data) override { + for (const auto &datum : data) { + auto &descriptor = datum.first; + auto &view_data = datum.second; + + ASSERT_EQ("current_worker", descriptor.name()); + ASSERT_EQ(opencensus::stats::ViewData::Type::kDouble, view_data.type()); + + for (const auto row : view_data.double_data()) { + for (int i = 0; i < descriptor.columns().size(); ++i) { + if (descriptor.columns()[i].name() == "NodeAddress") { + ASSERT_EQ("Localhost", row.first[i]); + } + } + // row.second store the data of this metric. + ASSERT_EQ(2345, row.second); + } + } + } +}; + +class StatsTest : public ::testing::Test { + public: + void SetUp() { + ray::stats::Init("127.0.0.1:8888", {}, false); + MockExporter::Register(); + } + + void Shutdown() {} +}; + +TEST_F(StatsTest, F) { + for (size_t i = 0; i < 500; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + stats::CurrentWorker().Record(2345, {{stats::NodeAddressKey, "Localhost"}}); + } +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/stats/tag_defs.h b/src/ray/stats/tag_defs.h new file mode 100644 index 000000000..60bfcafae --- /dev/null +++ b/src/ray/stats/tag_defs.h @@ -0,0 +1,22 @@ +#ifndef RAY_STATS_TAG_DEFS_H +#define RAY_STATS_TAG_DEFS_H + +/// The definitions of tag keys that you can use every where. +/// You can follow these examples to define and register your tag keys. + +using TagKeyType = opencensus::tags::TagKey; +using TagsType = std::vector>; + +static const TagKeyType JobNameKey = TagKeyType::Register("JobName"); + +static const TagKeyType CustomKey = TagKeyType::Register("CustomKey"); + +static const TagKeyType NodeAddressKey = TagKeyType::Register("NodeAddress"); + +static const TagKeyType VersionKey = TagKeyType::Register("Version"); + +static const TagKeyType LanguageKey = TagKeyType::Register("Language"); + +static const TagKeyType WorkerPidKey = TagKeyType::Register("WorkerPid"); + +#endif // RAY_STATS_TAG_DEFS_H diff --git a/tools/install-prometheus-server.sh b/tools/install-prometheus-server.sh new file mode 100755 index 000000000..71db6fd2f --- /dev/null +++ b/tools/install-prometheus-server.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash + +set -x +set -e + +TOOLS_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) + +pushd $TOOLS_DIR + +# Download Prometheus server. +unamestr="$(uname)" +if [[ "$unamestr" == "Linux" ]]; then + echo "Downloading Prometheus server for linux system." + PACKAGE_NAME=prometheus-2.8.0.linux-amd64 +elif [[ "$unamestr" == "Darwin" ]]; then + echo "Downloading Prometheus server for MacOS system." + PACKAGE_NAME=prometheus-2.8.0.darwin-amd64 +else + echo "Downloading abort: Unrecognized platform." + exit -1 +fi + +URL=https://github.com/prometheus/prometheus/releases/download/v2.8.0/$PACKAGE_NAME.tar.gz +wget $URL +tar xvfz $PACKAGE_NAME.tar.gz + +popd