[Metrics] Implement basic metrics changes (#11769)

* Implement basic metrics changes

* Addressed code review.

* Fix build issue.

* Fix build issue.
This commit is contained in:
SangBin Cho
2020-11-05 11:07:05 -08:00
committed by Alex Wu
parent 1c0a52d0df
commit 8f3c315a99
19 changed files with 80 additions and 125 deletions
@@ -17,6 +17,7 @@
#include <utility>
#include "ray/common/ray_config.h"
#include "ray/stats/stats.h"
namespace ray {
namespace gcs {
@@ -689,6 +690,10 @@ absl::flat_hash_set<ActorID> GcsActorManager::GetUnresolvedActorsByOwnerWorker(
return actor_ids;
}
void GcsActorManager::CollectStats() const {
stats::PendingActors.Record(pending_actors_.size());
}
void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
const ray::WorkerID &worker_id,
bool intentional_exit) {
@@ -299,6 +299,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
const absl::flat_hash_map<ActorID, std::vector<RegisterActorCallback>>
&GetActorRegisterCallbacks() const;
/// Collect stats from gcs actor manager in-memory data structures.
void CollectStats() const;
private:
/// A data structure representing an actor's owner.
struct Owner {
@@ -16,6 +16,7 @@
#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "ray/stats/stats.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
@@ -372,6 +373,8 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
auto iter = alive_nodes_.find(node_id);
if (iter != alive_nodes_.end()) {
removed_node = std::move(iter->second);
// Record stats that there's a new removed node.
stats::NodeFailureTotal.Record(1);
// Remove from alive nodes.
alive_nodes_.erase(iter);
// Remove from cluster resources.
@@ -502,6 +505,7 @@ void GcsNodeManager::SendBatchedHeartbeat() {
batch->add_batch()->Swap(&heartbeat.second);
}
stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0));
for (auto &demand : aggregate_load) {
auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
@@ -16,6 +16,7 @@
#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "ray/stats/stats.h"
#include "ray/util/asio_util.h"
#include "src/ray/protobuf/gcs.pb.h"
@@ -403,6 +404,10 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
}
}
void GcsPlacementGroupManager::CollectStats() const {
stats::PendingPlacementGroups.Record(pending_placement_groups_.size());
}
void GcsPlacementGroupManager::Tick() {
UpdatePlacementGroupLoad();
execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
@@ -224,6 +224,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// to.
void CleanPlacementGroupIfNeededWhenActorDead(const ActorID &actor_id);
/// Collect stats from gcs placement group manager in-memory data structures.
void CollectStats() const;
private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
+9
View File
@@ -24,6 +24,7 @@
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
#include "ray/gcs/gcs_server/task_info_handler_impl.h"
#include "ray/util/asio_util.h"
namespace ray {
namespace gcs {
@@ -287,5 +288,13 @@ std::unique_ptr<GcsWorkerManager> GcsServer::InitGcsWorkerManager() {
new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_));
}
void GcsServer::CollectStats() {
gcs_actor_manager_->CollectStats();
gcs_placement_group_manager_->CollectStats();
execute_after(
main_service_, [this] { CollectStats(); },
(RayConfig::instance().metrics_report_interval_ms() / 2) /* milliseconds */);
}
} // namespace gcs
} // namespace ray
+3
View File
@@ -110,6 +110,9 @@ class GcsServer {
/// server address directly to raylets and get rid of this lookup.
void StoreGcsServerAddressInRedis();
/// Collect stats from each module for every (metrics_report_interval_ms / 2) ms.
void CollectStats();
/// Gcs server configuration
GcsServerConfig config_;
/// The main io service to drive event posted from grpc threads.
+4 -1
View File
@@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/stats/stats.h"
namespace ray {
namespace gcs {
@@ -31,7 +32,8 @@ void GcsWorkerManager::HandleReportWorkerFailure(
RAY_LOG(INFO) << log_stream.str();
} else {
RAY_LOG(WARNING) << log_stream.str()
<< ". If there are lots of this logs, that might indicate there are "
<< ". Unintentional worker failures have been reported. If there "
"are lots of this logs, that might indicate there are "
"unexpected failures in the cluster.";
}
auto worker_failure_data = std::make_shared<WorkerTableData>();
@@ -51,6 +53,7 @@ void GcsWorkerManager::HandleReportWorkerFailure(
<< ", node id = " << node_id
<< ", address = " << worker_address.ip_address();
} else {
stats::UnintentionalWorkerFailures.Record(1);
RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Binary(),
worker_failure_data->SerializeAsString(),
nullptr));
+1 -1
View File
@@ -40,7 +40,7 @@ void ProcessCallback(int64_t callback_index,
if (!callback_item->is_subscription_) {
// Record the redis latency for non-subscription redis operations.
auto end_time = absl::GetCurrentTimeNanos() / 1000;
ray::stats::RedisLatency().Record(end_time - callback_item->start_time_);
ray::stats::GcsLatency().Record(end_time - callback_item->start_time_);
}
// Dispatch the callback.
-3
View File
@@ -927,10 +927,7 @@ void ObjectManager::RecordMetrics() const {
stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_);
stats::ObjectStoreUsedMemory().Record(used_memory_);
stats::ObjectStoreLocalObjects().Record(local_objects_.size());
stats::ObjectManagerWaitRequests().Record(active_wait_requests_.size());
stats::ObjectManagerPullRequests().Record(pull_requests_.size());
stats::ObjectManagerUnfulfilledPushRequests().Record(unfulfilled_push_requests_.size());
stats::ObjectManagerProfileEvents().Record(profile_events_.size());
}
} // namespace ray
+1 -4
View File
@@ -404,6 +404,7 @@ void NodeManager::Heartbeat() {
"lagging, this node can be marked as dead mistakenly.";
}
last_heartbeat_at_ms_ = now_ms;
stats::HeartbeatReportMs.Record(interval);
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
@@ -2259,7 +2260,6 @@ void NodeManager::MarkObjectsAsFailed(
}
void NodeManager::SubmitTask(const Task &task) {
stats::TaskCountReceived().Record(1);
const TaskSpecification &spec = task.GetTaskSpecification();
// Actor tasks should be no longer submitted to raylet.
RAY_CHECK(!spec.IsActorTask());
@@ -3466,14 +3466,11 @@ void NodeManager::RecordMetrics() {
}
object_manager_.RecordMetrics();
worker_pool_.RecordMetrics();
local_queues_.RecordMetrics();
task_dependency_manager_.RecordMetrics();
auto statistical_data = GetActorStatisticalData(actor_registry_);
stats::LiveActors().Record(statistical_data.live_actors);
stats::RestartingActors().Record(statistical_data.restarting_actors);
stats::DeadActors().Record(statistical_data.dead_actors);
}
bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) {
+1 -6
View File
@@ -537,12 +537,7 @@ std::string SchedulingQueue::DebugString() const {
}
void SchedulingQueue::RecordMetrics() const {
stats::NumPlaceableTasks().Record(
GetTaskQueue(TaskState::PLACEABLE)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::WAITING)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::READY)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::RUNNING)->GetTasks().size());
stats::NumPlaceableTasks().Record(
stats::NumInfeasibleTasks().Record(
GetTaskQueue(TaskState::INFEASIBLE)->GetTasks().size());
}
@@ -447,13 +447,6 @@ std::string TaskDependencyManager::DebugString() const {
return result.str();
}
void TaskDependencyManager::RecordMetrics() const {
stats::NumSubscribedTasks().Record(task_dependencies_.size());
stats::NumRequiredTasks().Record(required_tasks_.size());
stats::NumRequiredObjects().Record(required_objects_.size());
stats::NumPendingTasks().Record(pending_tasks_.size());
}
bool TaskDependencyManager::GetOwnerAddress(const ObjectID &object_id,
rpc::Address *owner_address) const {
const auto creating_task_entry = required_tasks_.find(object_id.TaskId());
-3
View File
@@ -152,9 +152,6 @@ class TaskDependencyManager {
/// \return string.
std::string DebugString() const;
/// Record metrics.
void RecordMetrics() const;
/// Get the address of the owner of this object. An address will only be
/// returned if the caller previously specified that this object is required
/// on this node, through a call to SubscribeGetDependencies or
-27
View File
@@ -873,10 +873,6 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
}
}
stats::CurrentWorker().Record(
0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())},
{stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}});
MarkPortAsFree(worker->AssignedPort());
return RemoveWorker(state.idle, worker);
}
@@ -884,9 +880,6 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
void WorkerPool::DisconnectDriver(const std::shared_ptr<WorkerInterface> &driver) {
auto &state = GetStateForLanguage(driver->GetLanguage());
RAY_CHECK(RemoveWorker(state.registered_drivers, driver));
stats::CurrentDriver().Record(
0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())},
{stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}});
MarkPortAsFree(driver->AssignedPort());
}
@@ -1040,26 +1033,6 @@ std::string WorkerPool::DebugString() const {
return result.str();
}
void WorkerPool::RecordMetrics() const {
for (const auto &entry : states_by_lang_) {
// Record worker.
for (auto worker : entry.second.registered_workers) {
stats::CurrentWorker().Record(
worker->GetProcess().GetId(),
{{stats::LanguageKey, Language_Name(worker->GetLanguage())},
{stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}});
}
// Record driver.
for (auto driver : entry.second.registered_drivers) {
stats::CurrentDriver().Record(
driver->GetProcess().GetId(),
{{stats::LanguageKey, Language_Name(driver->GetLanguage())},
{stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}});
}
}
}
} // namespace raylet
} // namespace ray
-3
View File
@@ -244,9 +244,6 @@ class WorkerPool : public WorkerPoolInterface {
/// \return string.
std::string DebugString() const;
/// Record metrics.
void RecordMetrics() const;
protected:
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
+35 -61
View File
@@ -27,25 +27,14 @@
///
/// Common
///
static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us",
{100, 200, 300, 400, 500, 600, 700, 800, 900, 1000},
{CustomKey});
static Histogram GcsLatency("gcs_latency",
"The latency of a GCS (by default Redis) operation.", "us",
{100, 200, 300, 400, 500, 600, 700, 800, 900, 1000},
{CustomKey});
///
/// Raylet Metrics
///
static Gauge CurrentWorker("current_worker",
"This metric is used for reporting states of workers."
"Through this, we can see the worker's state on dashboard.",
"1 pcs", {LanguageKey, WorkerPidKey});
static Gauge CurrentDriver("current_driver",
"This metric is used for reporting states of drivers.",
"1 pcs", {LanguageKey, DriverPidKey});
static Count TaskCountReceived("task_count_received",
"Number of tasks received by raylet.", "pcs", {});
static Gauge LocalAvailableResource("local_available_resource",
"The available resources on this node.", "pcs",
{ResourceNameKey});
@@ -59,8 +48,6 @@ static Gauge LiveActors("live_actors", "Number of live actors.", "actors");
static Gauge RestartingActors("restarting_actors", "Number of restarting actors.",
"actors");
static Gauge DeadActors("dead_actors", "Number of dead actors.", "actors");
static Gauge ObjectStoreAvailableMemory(
"object_store_available_memory",
"Amount of memory currently available in the object store.", "bytes");
@@ -73,54 +60,41 @@ static Gauge ObjectStoreLocalObjects("object_store_num_local_objects",
"Number of objects currently in the object store.",
"objects");
static Gauge ObjectManagerWaitRequests("object_manager_num_wait_requests",
"Number of pending wait requests for objects.",
"requests");
static Gauge ObjectManagerPullRequests("object_manager_num_pull_requests",
"Number of active pull requests for objects.",
"requests");
static Gauge ObjectManagerUnfulfilledPushRequests(
"object_manager_unfulfilled_push_requests",
"Number of unfulfilled push requests for objects.", "requests");
static Gauge ObjectManagerProfileEvents("object_manager_num_buffered_profile_events",
"Number of locally-buffered profile events.",
"events");
static Gauge NumSubscribedTasks(
"num_subscribed_tasks",
"The number of tasks that are subscribed to object dependencies.", "tasks");
static Gauge NumRequiredTasks("num_required_tasks",
"The number of tasks whose output object(s) are "
"required by another subscribed task.",
"tasks");
static Gauge NumRequiredObjects(
"num_required_objects",
"The number of objects that are required by a subscribed task.", "objects");
static Gauge NumPendingTasks("num_pending_tasks",
"The number of tasks that are pending execution.", "tasks");
static Gauge NumPlaceableTasks(
"num_placeable_tasks",
"The number of tasks in the scheduler that are in the 'placeable' state.", "tasks");
static Gauge NumWaitingTasks(
"num_waiting_tasks",
"The number of tasks in the scheduler that are in the 'waiting' state.", "tasks");
static Gauge NumReadyTasks(
"num_ready_tasks",
"The number of tasks in the scheduler that are in the 'ready' state.", "tasks");
static Gauge NumRunningTasks(
"num_running_tasks",
"The number of tasks in the scheduler that are in the 'running' state.", "tasks");
static Gauge NumInfeasibleTasks(
"num_infeasible_tasks",
"The number of tasks in the scheduler that are in the 'infeasible' state.", "tasks");
static Histogram HeartbeatReportMs(
"heartbeat_report_ms",
"Heartbeat report time in raylet. If this value is high, that means there's a high "
"system load. It is possible that this node will be killed because of missing "
"heartbeats.",
"ms", {100, 200, 400, 800, 1600, 3200, 6400, 15000, 30000});
///
/// GCS Server Metrics
///
static Count UnintentionalWorkerFailures(
"unintentional_worker_failures_total",
"Number of worker failures that are not intentional. For example, worker failures "
"due to system related errors.",
"worker_failures");
static Count NodeFailureTotal(
"node_failure_total", "Number of node failures that have happened in the cluster.",
"node_failures.");
static Gauge PendingActors("pending_actors", "Number of pending actors in GCS server.",
"actors");
static Gauge PendingPlacementGroups(
"pending_placement_groups", "Number of pending placement groups in the GCS server.",
"placement_groups");
static Histogram OutboundHeartbeatSizeKB("outbound_heartbeat_size_kb",
"Outbound heartbeat payload size", "kb",
{10, 50, 100, 1000, 10000, 100000});
+1 -1
View File
@@ -144,7 +144,7 @@ bool DoubleEqualTo(double value, double compared_value) {
TEST_F(MetricExporterClientTest, decorator_test) {
// Export client should emit at least once in report flush interval.
for (size_t i = 0; i < 100; ++i) {
stats::CurrentWorker().Record(i + 1);
stats::LiveActors().Record(i + 1);
}
opencensus::stats::DeltaProducer::Get()->Flush();
opencensus::stats::StatsExporterImpl::Get()->Export();
+5 -8
View File
@@ -41,14 +41,12 @@ class MockExporter : public opencensus::stats::StatsExporter::Handler {
auto &descriptor = datum.first;
auto &view_data = datum.second;
ASSERT_EQ("current_worker", descriptor.name());
ASSERT_EQ("local_available_resource", descriptor.name());
ASSERT_EQ(opencensus::stats::ViewData::Type::kDouble, view_data.type());
for (const auto &row : view_data.double_data()) {
for (size_t i = 0; i < descriptor.columns().size(); ++i) {
if (descriptor.columns()[i].name() == "WorkerPidKey") {
ASSERT_EQ("1000", row.first[i]);
} else if (descriptor.columns()[i].name() == "LanguageKey") {
ASSERT_EQ("CPP", row.first[i]);
if (descriptor.columns()[i].name() == "ResourceName") {
ASSERT_EQ("CPU", row.first[i]);
}
}
// row.second store the data of this metric.
@@ -69,8 +67,7 @@ class StatsTest : public ::testing::Test {
absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2);
ray::stats::StatsConfig::instance().SetReportInterval(report_interval);
ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval);
const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"},
{stats::WorkerPidKey, "1000"}};
const stats::TagsType global_tags = {{stats::ResourceNameKey, "CPU"}};
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
@@ -85,7 +82,7 @@ class StatsTest : public ::testing::Test {
TEST_F(StatsTest, F) {
for (size_t i = 0; i < 20; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
stats::CurrentWorker().Record(2345);
stats::LocalAvailableResource().Record(2345);
}
}