diff --git a/BUILD.bazel b/BUILD.bazel index 0373f61cc..8ffc27954 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -77,6 +77,11 @@ proto_library( deps = [":common_proto"], ) +cc_proto_library( + name = "reporter_cc_proto", + deps = [":reporter_proto"], +) + python_grpc_compile( name = "reporter_py_proto", deps = [":reporter_proto"], @@ -252,6 +257,31 @@ cc_library( ], ) +# Metrics Agent gRPC lib. +cc_grpc_library( + name = "reporter_cc_grpc", + srcs = [":reporter_proto"], + grpc_only = True, + deps = [":reporter_cc_proto"], +) + +# Metrics Agent client. +cc_library( + name = "reporter_rpc", + hdrs = glob([ + "src/ray/rpc/metrics_agent_client.h", + ]), + copts = COPTS, + deps = [ + ":grpc_common_lib", + ":ray_common", + ":reporter_cc_grpc", + "@boost//:asio", + "@boost//:thread", + "@com_github_grpc_grpc//:grpc++", + ], +) + # === End of rpc definitions === # === Begin of plasma definitions === @@ -576,6 +606,7 @@ cc_library( }), deps = [ ":ray_util", + ":reporter_rpc", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/memory", diff --git a/python/ray/node.py b/python/ray/node.py index 19c3ad22f..9b304a0bf 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -108,6 +108,7 @@ class Node: include_log_monitor=True, resources={}, temp_dir=ray.utils.get_ray_temp_dir(), + metrics_agent_port=self._get_unused_port()[0], worker_path=os.path.join( os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) @@ -554,6 +555,7 @@ class Node: open_log(reporter_err_name)) process_info = ray.services.start_reporter( self.redis_address, + self._ray_params.metrics_agent_port, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password, @@ -661,6 +663,7 @@ class Node: self._ray_params.max_worker_port, self._ray_params.object_manager_port, self._ray_params.redis_password, + self._ray_params.metrics_agent_port, use_valgrind=use_valgrind, use_profiler=use_profiler, 
stdout_file=stdout_file, diff --git a/python/ray/parameter.py b/python/ray/parameter.py index cefe3b17f..113621547 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -87,6 +87,7 @@ class RayParams: Java worker. java_worker_options (list): The command options for Java worker. load_code_from_local: Whether load code from local file or from GCS. + metrics_agent_port(int): The port to bind metrics agent. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. lru_evict (bool): Enable LRU eviction if space is needed. @@ -132,6 +133,7 @@ class RayParams: java_worker_options=None, load_code_from_local=False, _internal_config=None, + metrics_agent_port=None, lru_evict=False): self.object_ref_seed = object_ref_seed self.redis_address = redis_address @@ -169,6 +171,7 @@ class RayParams: self.include_java = include_java self.java_worker_options = java_worker_options self.load_code_from_local = load_code_from_local + self.metrics_agent_port = metrics_agent_port self._internal_config = _internal_config self._lru_evict = lru_evict self._check_usage() diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 437bae723..5ccc0b7f5 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -55,6 +55,10 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): return reporter_pb2.GetProfilingStatsReply( profiling_stats=profiling_stats, stdout=stdout, stderr=stderr) + def ReportMetrics(self, request, context): + # TODO(sang): Process metrics here. + return reporter_pb2.ReportMetricsReply() + def recursive_asdict(o): if isinstance(o, tuple) and hasattr(o, "_asdict"): @@ -94,11 +98,12 @@ class Reporter: redis_client: A client used to communicate with the Redis server. 
""" - def __init__(self, redis_address, redis_password=None): + def __init__(self, redis_address, port, redis_password=None): """Initialize the reporter object.""" self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False)) self.ip = ray.services.get_node_ip_address() self.hostname = platform.node() + self.port = port _ = psutil.cpu_percent() # For initialization @@ -225,7 +230,7 @@ class Reporter: server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), )) reporter_pb2_grpc.add_ReporterServiceServicer_to_server( ReporterServer(), server) - port = server.add_insecure_port("[::]:0") + port = server.add_insecure_port("[::]:{}".format(self.port)) server.start() self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port) """Run the reporter.""" @@ -248,6 +253,11 @@ if __name__ == "__main__": required=True, type=str, help="The address to use for Redis.") + parser.add_argument( + "--port", + required=True, + type=int, + help="The port to bind the reporter process.") parser.add_argument( "--redis-password", required=False, @@ -270,7 +280,8 @@ if __name__ == "__main__": args = parser.parse_args() ray.utils.setup_logger(args.logging_level, args.logging_format) - reporter = Reporter(args.redis_address, redis_password=args.redis_password) + reporter = Reporter( + args.redis_address, args.port, redis_password=args.redis_password) try: reporter.run() diff --git a/python/ray/services.py b/python/ray/services.py index 59cf5d5ec..51022ec79 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1065,6 +1065,7 @@ def start_log_monitor(redis_address, def start_reporter(redis_address, + port, stdout_file=None, stderr_file=None, redis_password=None, @@ -1073,6 +1074,7 @@ def start_reporter(redis_address, Args: redis_address (str): The address of the Redis instance. + port(int): The port to bind the reporter process. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. 
stderr_file: A file handle opened for writing to redirect stderr to. If @@ -1085,10 +1087,8 @@ def start_reporter(redis_address, reporter_filepath = os.path.join( os.path.dirname(os.path.abspath(__file__)), "reporter.py") command = [ - sys.executable, - "-u", - reporter_filepath, - "--redis-address={}".format(redis_address), + sys.executable, "-u", reporter_filepath, + "--redis-address={}".format(redis_address), "--port={}".format(port) ] if redis_password: command += ["--redis-password", redis_password] @@ -1249,6 +1249,7 @@ def start_raylet(redis_address, max_worker_port=None, object_manager_port=None, redis_password=None, + metrics_agent_port=None, use_valgrind=False, use_profiler=False, stdout_file=None, @@ -1284,6 +1285,7 @@ def start_raylet(redis_address, max_worker_port (int): The highest port number that workers will bind on. If set, min_worker_port must also be set. redis_password: The password to use when connecting to Redis. + metrics_agent_port(int): The port where metrics agent is bound to. use_valgrind (bool): True if the raylet should be started inside of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the raylet should be started inside @@ -1390,6 +1392,7 @@ def start_raylet(redis_address, "--redis_password={}".format(redis_password or ""), "--temp_dir={}".format(temp_dir), "--session_dir={}".format(session_dir), + "--metrics-agent-port={}".format(metrics_agent_port), ] if config.get("plasma_store_as_thread"): # command related to the plasma store diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 222a85bec..db985196e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -316,5 +316,11 @@ RAY_CONFIG(bool, gcs_actor_service_enabled, getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr && getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true")) +/// The batch size for metrics export. 
+RAY_CONFIG(int64_t, metrics_report_batch_size, 100) + +/// Whether or not we enable metrics collection. +RAY_CONFIG(bool, enable_metrics_collection, true) + /// Whether start the Plasma Store as a Raylet thread. RAY_CONFIG(bool, put_small_object_in_memory_store, false) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 97ed09fdb..abb9f2c14 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -294,7 +294,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // NOTE(edoakes): any initialization depending on RayConfig must happen after this line. RayConfig::instance().initialize(internal_config); - // Start RPC server after all the task receivers are properly initialized and we have // our assigned port from the raylet. core_worker_server_ = std::unique_ptr( diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 0f4d426bf..5c9d66ab3 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -332,3 +332,10 @@ message CoreWorkerStats { // Local reference table. repeated ObjectRefInfo object_refs = 17; } + +message MetricPoint { + string metric_name = 1; + int64 timestamp = 2; + double value = 3; + map tags = 4; +} diff --git a/src/ray/protobuf/reporter.proto b/src/ray/protobuf/reporter.proto index 4ddb6562d..d45c0f4db 100644 --- a/src/ray/protobuf/reporter.proto +++ b/src/ray/protobuf/reporter.proto @@ -16,6 +16,8 @@ syntax = "proto3"; package ray.rpc; +import "src/ray/protobuf/common.proto"; + message GetProfilingStatsRequest { // PID of the worker process. uint32 pid = 1; @@ -32,8 +34,17 @@ message GetProfilingStatsReply { string stderr = 3; } +message ReportMetricsRequest { + repeated MetricPoint metrics_point = 1; +} + +message ReportMetricsReply { +} + // Service for communicating with the reporter.py process on a remote node. service ReporterService { // Get the profiling stats.
rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply); + // Report metrics to the local metrics agents. + rpc ReportMetrics(ReportMetricsRequest) returns (ReportMetricsReply); } diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index ab851224c..0db0e25be 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -27,6 +27,7 @@ DEFINE_string(raylet_socket_name, "", "The socket name of raylet."); DEFINE_string(store_socket_name, "", "The socket name of object store."); DEFINE_int32(object_manager_port, -1, "The port of object manager."); DEFINE_int32(node_manager_port, -1, "The port of node manager."); +DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent."); DEFINE_string(node_ip_address, "", "The ip address of this node."); DEFINE_string(redis_address, "", "The ip address of redis server."); DEFINE_int32(redis_port, -1, "The port of redis server."); @@ -43,10 +44,6 @@ DEFINE_string(java_worker_command, "", "Java worker command."); DEFINE_string(redis_password, "", "The password of redis."); DEFINE_string(temp_dir, "", "Temporary directory."); DEFINE_string(session_dir, "", "The path of this ray session directory."); -DEFINE_bool(disable_stats, false, "Whether disable the stats."); -DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to."); -DEFINE_bool(enable_stdout_exporter, false, - "Whether enable the stdout exporter for stats."); DEFINE_bool(head_node, false, "Whether this is the head node of the cluster."); // store options DEFINE_int64(object_store_memory, -1, "The initial memory of the object store."); @@ -67,6 +64,7 @@ int main(int argc, char *argv[]) { const std::string store_socket_name = FLAGS_store_socket_name; const int object_manager_port = static_cast(FLAGS_object_manager_port); const int node_manager_port = static_cast(FLAGS_node_manager_port); + const int metrics_agent_port = static_cast(FLAGS_metrics_agent_port); const std::string node_ip_address = 
FLAGS_node_ip_address; const std::string redis_address = FLAGS_redis_address; const int redis_port = static_cast(FLAGS_redis_port); @@ -82,22 +80,12 @@ int main(int argc, char *argv[]) { const std::string redis_password = FLAGS_redis_password; const std::string temp_dir = FLAGS_temp_dir; const std::string session_dir = FLAGS_session_dir; - const bool disable_stats = FLAGS_disable_stats; - const std::string stat_address = FLAGS_stat_address; - const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter; const bool head_node = FLAGS_head_node; const int64_t object_store_memory = FLAGS_object_store_memory; const std::string plasma_directory = FLAGS_plasma_directory; const bool huge_pages = FLAGS_huge_pages; gflags::ShutDownCommandLineFlags(); - // Initialize stats. - const ray::stats::TagsType global_tags = { - {ray::stats::JobNameKey, "raylet"}, - {ray::stats::VersionKey, "0.9.0.dev0"}, - {ray::stats::NodeAddressKey, node_ip_address}}; - ray::stats::Init(stat_address, global_tags, disable_stats, enable_stdout_exporter); - // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; std::unordered_map static_resource_conf; @@ -236,6 +224,13 @@ int main(int argc, char *argv[]) { server->Start(); })); + // Initialize stats. + const ray::stats::TagsType global_tags = { + {ray::stats::JobNameKey, "raylet"}, + {ray::stats::VersionKey, "0.9.0.dev0"}, + {ray::stats::NodeAddressKey, node_ip_address}}; + ray::stats::Init(global_tags, metrics_agent_port, main_service); + // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop // instead of returning immediately. diff --git a/src/ray/rpc/metrics_agent_client.h b/src/ray/rpc/metrics_agent_client.h new file mode 100644 index 000000000..35eab574f --- /dev/null +++ b/src/ray/rpc/metrics_agent_client.h @@ -0,0 +1,60 @@ +// Copyright 2017 The Ray Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include + +#include "ray/common/status.h" +#include "ray/protobuf/reporter.grpc.pb.h" +#include "ray/protobuf/reporter.pb.h" +#include "ray/rpc/grpc_client.h" +#include "ray/util/logging.h" + +namespace ray { +namespace rpc { + +/// Client used for communicating with a metrics agent server. +class MetricsAgentClient { + public: + /// Constructor. + /// + /// \param[in] address Address of the metrics agent server. + /// \param[in] port Port of the metrics agent server. + /// \param[in] client_call_manager The `ClientCallManager` used for managing requests. + MetricsAgentClient(const std::string &address, const int port, + ClientCallManager &client_call_manager) + : client_call_manager_(client_call_manager) { + grpc_client_ = std::unique_ptr>( + new GrpcClient(address, port, client_call_manager)); + }; + + /// Report metrics to metrics agent. + /// + /// \param[in] request The request message. + /// \param[in] callback The callback function that handles reply. + VOID_RPC_CLIENT_METHOD(ReporterService, ReportMetrics, grpc_client_, ) + + private: + /// The RPC client. + std::unique_ptr> grpc_client_; + + /// The `ClientCallManager` used for managing requests.
+ ClientCallManager &client_call_manager_; +}; + +} // namespace rpc +} // namespace ray diff --git a/src/ray/stats/metric.h b/src/ray/stats/metric.h index d35daa83a..b1acc43a8 100644 --- a/src/ray/stats/metric.h +++ b/src/ray/stats/metric.h @@ -17,10 +17,8 @@ #include #include -#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h" #include "opencensus/stats/stats.h" #include "opencensus/tags/tag_key.h" -#include "prometheus/exposer.h" #include "ray/util/logging.h" @@ -35,12 +33,16 @@ class StatsConfig final { public: static StatsConfig &instance(); + /// Set the global tags that will be appended to all metrics in this process. void SetGlobalTags(const TagsType &global_tags); + /// Get the current global tags. const TagsType &GetGlobalTags() const; + /// Set if the stats are enabled in this process. void SetIsDisableStats(bool disable_stats); + /// Get whether or not stats are enabled. bool IsStatsDisabled() const; void SetReportInterval(const absl::Duration interval); @@ -59,6 +61,7 @@ class StatsConfig final { private: TagsType global_tags_; + /// If true, don't collect metrics in this process. bool is_stats_disabled_ = true; // Regular reporting interval for all reporters. 
absl::Duration report_interval_ = absl::Seconds(10); diff --git a/src/ray/stats/metric_exporter.h b/src/ray/stats/metric_exporter.h index c3fbbeacd..f08937439 100644 --- a/src/ray/stats/metric_exporter.h +++ b/src/ray/stats/metric_exporter.h @@ -35,7 +35,9 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler { size_t report_batch_size = kDefaultBatchSize) : metric_exporter_client_(metric_exporter_client), report_batch_size_(report_batch_size) {} + ~MetricExporter() = default; + static void Register(std::shared_ptr metric_exporter_client, size_t report_batch_size) { opencensus::stats::StatsExporter::RegisterPushHandler( @@ -63,10 +65,8 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler { tags[keys[i]] = row.first[i]; } // Current timestamp is used for point not view data time. - MetricPoint point{metric_name, - current_sys_time_ms(), - static_cast(row.second), - tags}; + MetricPoint point{metric_name, current_sys_time_ms(), + static_cast(row.second), tags}; RAY_LOG(DEBUG) << "Metric name " << metric_name << ", value " << point.value; points.push_back(std::move(point)); if (points.size() >= report_batch_size_) { diff --git a/src/ray/stats/metric_exporter_client.cc b/src/ray/stats/metric_exporter_client.cc index f5df17738..9a774c517 100644 --- a/src/ray/stats/metric_exporter_client.cc +++ b/src/ray/stats/metric_exporter_client.cc @@ -19,10 +19,16 @@ namespace ray { namespace stats { +/// +/// Stdout Exporter +/// void StdoutExporterClient::ReportMetrics(const std::vector &points) { RAY_LOG(DEBUG) << "Metric point size : " << points.size(); } +/// +/// Metrics Exporter Decorator +/// MetricExporterDecorator::MetricExporterDecorator( std::shared_ptr exporter) : exporter_(exporter) {} @@ -32,5 +38,35 @@ void MetricExporterDecorator::ReportMetrics(const std::vector &poin exporter_->ReportMetrics(points); } } + +/// +/// Metrics Agent Exporter +/// +MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr 
exporter, + const int port, + boost::asio::io_service &io_service, + const std::string address) + : MetricExporterDecorator(exporter), client_call_manager_(io_service) { + client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_)); +} + +void MetricsAgentExporter::ReportMetrics(const std::vector &points) { + MetricExporterDecorator::ReportMetrics(points); + rpc::ReportMetricsRequest request; + for (const auto &point : points) { + auto metric_point = request.add_metrics_point(); + metric_point->set_metric_name(point.metric_name); + metric_point->set_timestamp(point.timestamp); + metric_point->set_value(point.value); + auto mutable_tags = metric_point->mutable_tags(); + for (const auto &tag : point.tags) { + (*mutable_tags)[tag.first] = tag.second; + } + } + + // TODO(sang): Should retry metrics report if it fails. + client_->ReportMetrics(request, nullptr); +} + } // namespace stats } // namespace ray diff --git a/src/ray/stats/metric_exporter_client.h b/src/ray/stats/metric_exporter_client.h index 507f4051c..bce4d9b62 100644 --- a/src/ray/stats/metric_exporter_client.h +++ b/src/ray/stats/metric_exporter_client.h @@ -14,6 +14,10 @@ #pragma once +#include + +#include "ray/rpc/client_call.h" +#include "ray/rpc/metrics_agent_client.h" #include "ray/stats/metric.h" namespace ray { @@ -51,15 +55,20 @@ class MetricExporterDecorator : public MetricExporterClient { std::shared_ptr exporter_; }; -class OpentsdbExporterClient : public MetricExporterDecorator { +class MetricsAgentExporter : public MetricExporterDecorator { public: - OpentsdbExporterClient(std::shared_ptr exporter) - : MetricExporterDecorator(exporter) {} - void ReportMetrics(const std::vector &points) override { - MetricExporterDecorator::ReportMetrics(points); - // TODO(lingxuan.zlx): opentsdb client is used for report to backend - // storage.
- } + MetricsAgentExporter(std::shared_ptr exporter, const int port, + boost::asio::io_service &io_service, const std::string address); + + ~MetricsAgentExporter() {} + + void ReportMetrics(const std::vector &points) override; + + private: + /// Call Manager for gRPC client; declared first so it outlives `client_`. + rpc::ClientCallManager client_call_manager_; + /// Client to call a metrics agent gRPC server; refers to `client_call_manager_`. + std::unique_ptr client_; }; } // namespace stats diff --git a/src/ray/stats/metric_exporter_client_test.cc b/src/ray/stats/metric_exporter_client_test.cc index 6b1e32b3b..30929d675 100644 --- a/src/ray/stats/metric_exporter_client_test.cc +++ b/src/ray/stats/metric_exporter_client_test.cc @@ -122,17 +122,20 @@ class MetricExporterClientTest : public ::testing::Test { absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2); ray::stats::StatsConfig::instance().SetReportInterval(report_interval); ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval); - ray::stats::Init("127.0.0.1:8888", global_tags, false); + std::shared_ptr exporter(new stats::StdoutExporterClient()); std::shared_ptr mock1(new MockExporterClient1(exporter)); std::shared_ptr mock2(new MockExporterClient2(mock1)); - MetricExporter::Register(mock2, kMockReportBatchSize); + ray::stats::Init(global_tags, 10054, io_service_, mock2, kMockReportBatchSize); } void Shutdown() { MockExporterClient1::ResetCount(); MockExporterClient2::ResetCount(); } + + private: + boost::asio::io_service io_service_; }; int MockExporterClient1::client1_count; diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h index d02258164..c0340d596 100644 --- a/src/ray/stats/stats.h +++ b/src/ray/stats/stats.h @@ -23,48 +23,49 @@ #include "opencensus/stats/internal/delta_producer.h" #include "opencensus/stats/stats.h" #include "opencensus/tags/tag_key.h" -#include "prometheus/exposer.h" +#include "ray/common/ray_config.h" #include "ray/stats/metric.h" +#include "ray/stats/metric_exporter.h" +#include
"ray/stats/metric_exporter_client.h" #include "ray/util/logging.h" namespace ray { namespace stats { +#include + /// Include metric_defs.h to define measure items. #include "metric_defs.h" /// Initialize stats. -static void Init(const std::string &address, const TagsType &global_tags, - bool disable_stats = false, bool enable_stdout_exporter = false) { +static void Init( + const TagsType &global_tags, const int metrics_agent_port, + boost::asio::io_service &io_service, + std::shared_ptr exporter_to_use = nullptr, + int64_t metrics_report_batch_size = RayConfig::instance().metrics_report_batch_size(), + bool disable_stats = !RayConfig::instance().enable_metrics_collection()) { StatsConfig::instance().SetIsDisableStats(disable_stats); if (disable_stats) { RAY_LOG(INFO) << "Disabled stats."; return; } - // Enable the Prometheus exporter. - // Note that the reason for we using local static variables - // here is to make sure they are single-instances. - static auto exporter = - std::make_shared(); - - if (enable_stdout_exporter) { - // Enable stdout exporter by default. - opencensus::exporters::stats::StdoutExporter::Register(); + // Force to have a singleton exporter. + static std::shared_ptr exporter; + // Default exporter is metrics agent exporter. + if (exporter_to_use == nullptr) { + std::shared_ptr stdout_exporter(new StdoutExporterClient()); + exporter.reset(new MetricsAgentExporter(stdout_exporter, metrics_agent_port, + io_service, "127.0.0.1")); + } else { + exporter = exporter_to_use; } - // Enable prometheus exporter. - try { - static prometheus::Exposer exposer(address); - exposer.RegisterCollectable(exporter); - RAY_LOG(INFO) << "Succeeded to initialize stats: exporter address is " << address; - } catch (std::exception &e) { - RAY_LOG(WARNING) << "Failed to create the Prometheus exporter. This doesn't " - << "affect anything except stats. Caused by: " << e.what(); - return; - } + // TODO(sang): Currently, we don't do any cleanup. 
This can lead us to lose last 10 + // seconds data before we exit the main script. + MetricExporter::Register(exporter, metrics_report_batch_size); opencensus::stats::StatsExporter::SetInterval( StatsConfig::instance().GetReportInterval()); opencensus::stats::DeltaProducer::Get()->SetHarvestInterval( diff --git a/src/ray/stats/stats_test.cc b/src/ray/stats/stats_test.cc index 66c09f643..d787d6937 100644 --- a/src/ray/stats/stats_test.cc +++ b/src/ray/stats/stats_test.cc @@ -61,11 +61,16 @@ class StatsTest : public ::testing::Test { void SetUp() { const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"}, {stats::WorkerPidKey, "1000"}}; - ray::stats::Init("127.0.0.1:8888", global_tags, false); + std::shared_ptr exporter( + new stats::StdoutExporterClient()); + ray::stats::Init(global_tags, 10054, io_service_, exporter); MockExporter::Register(); } void Shutdown() {} + + private: + boost::asio::io_service io_service_; }; TEST_F(StatsTest, F) {