From 2de9bfc7e36d465dfd1476eff41caa691369061f Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 9 Aug 2018 14:39:23 -0700 Subject: [PATCH] [xray] Log warnings for asio handlers that take too long (#2601) * Add fatal check for heartbeat drift * Log warning messages for handlers that take too long * Add debug labels to all ClientConnections --- src/common/state/ray_config.h | 16 ++++++++++++++++ src/ray/common/client_connection.cc | 18 ++++++++++++++---- src/ray/common/client_connection.h | 7 +++++-- .../test/object_manager_stress_test.cc | 5 +++-- .../object_manager/test/object_manager_test.cc | 5 +++-- src/ray/raylet/node_manager.cc | 16 ++++++++++++---- src/ray/raylet/node_manager.h | 11 +++++++++-- src/ray/raylet/raylet.cc | 15 ++++++++------- src/ray/raylet/worker_pool_test.cc | 4 ++-- 9 files changed, 72 insertions(+), 25 deletions(-) diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 4096bdae5..155f65c84 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -12,12 +12,18 @@ class RayConfig { int64_t ray_protocol_version() const { return ray_protocol_version_; } + uint64_t handler_warning_timeout_ms() const { + return handler_warning_timeout_ms_; + } + int64_t heartbeat_timeout_milliseconds() const { return heartbeat_timeout_milliseconds_; } int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; } + uint64_t num_heartbeats_warning() const { return num_heartbeats_warning_; } + int64_t initial_reconstruction_timeout_milliseconds() const { return initial_reconstruction_timeout_milliseconds_; } @@ -113,8 +119,10 @@ class RayConfig { private: RayConfig() : ray_protocol_version_(0x0000000000000000), + handler_warning_timeout_ms_(100), heartbeat_timeout_milliseconds_(100), num_heartbeats_timeout_(100), + num_heartbeats_warning_(5), initial_reconstruction_timeout_milliseconds_(200), get_timeout_milliseconds_(1000), worker_get_request_size_(10000), @@ -154,6 +162,10 @@ class RayConfig { /// In theory, this is used to detect Ray version mismatches. int64_t ray_protocol_version_; + /// The duration that a single handler on the event loop can take before a + /// warning is logged that the handler is taking too long. + uint64_t handler_warning_timeout_ms_; + /// The duration between heartbeats. These are sent by the plasma manager and /// local scheduler. int64_t heartbeat_timeout_milliseconds_; @@ -161,6 +173,10 @@ class RayConfig { /// heartbeat intervals, the global scheduler or monitor process will report /// it as dead to the db_client table. int64_t num_heartbeats_timeout_; + /// For a raylet, if the last heartbeat was sent more than this many + /// heartbeat periods ago, then a warning will be logged that the heartbeat + /// handler is drifting. + uint64_t num_heartbeats_warning_; /// The initial period for a task execution lease. The lease will expire this /// many milliseconds after the first acquisition of the lease. Nodes that diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index 04daa2067..2c272daab 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -86,9 +86,9 @@ ray::Status ServerConnection::WriteMessage(int64_t type, int64_t length, template std::shared_ptr> ClientConnection::Create( ClientHandler &client_handler, MessageHandler &message_handler, - boost::asio::basic_stream_socket &&socket) { + boost::asio::basic_stream_socket &&socket, const std::string &debug_label) { std::shared_ptr> self( - new ClientConnection(message_handler, std::move(socket))); + new ClientConnection(message_handler, std::move(socket), debug_label)); // Let our manager process our new connection. client_handler(*self); return self; @@ -96,8 +96,11 @@ std::shared_ptr> ClientConnection::Create( template ClientConnection::ClientConnection(MessageHandler &message_handler, - boost::asio::basic_stream_socket &&socket) - : ServerConnection(std::move(socket)), message_handler_(message_handler) {} + boost::asio::basic_stream_socket &&socket, + const std::string &debug_label) + : ServerConnection(std::move(socket)), + message_handler_(message_handler), + debug_label_(debug_label) {} template const ClientID &ClientConnection::GetClientID() { @@ -149,7 +152,14 @@ void ClientConnection::ProcessMessage(const boost::system::error_code &error) if (error) { read_type_ = static_cast(protocol::MessageType::DisconnectClient); } + + uint64_t start_ms = current_time_ms(); message_handler_(this->shared_from_this(), read_type_, read_message_.data()); + uint64_t interval = current_time_ms() - start_ms; + if (interval > RayConfig::instance().handler_warning_timeout_ms()) { + RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_ + << " took " << interval << " ms "; + } } template class ServerConnection; diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index a0c8b659c..016a420cf 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -84,7 +84,7 @@ class ClientConnection : public ServerConnection, /// \return std::shared_ptr. static std::shared_ptr> Create( ClientHandler &new_client_handler, MessageHandler &message_handler, - boost::asio::basic_stream_socket &&socket); + boost::asio::basic_stream_socket &&socket, const std::string &debug_label); /// \return The ClientID of the remote client. const ClientID &GetClientID(); @@ -100,7 +100,8 @@ class ClientConnection : public ServerConnection, private: /// A private constructor for a node client connection. ClientConnection(MessageHandler &message_handler, - boost::asio::basic_stream_socket &&socket); + boost::asio::basic_stream_socket &&socket, + const std::string &debug_label); /// Process an error from the last operation, then process the message /// header from the client. void ProcessMessageHeader(const boost::system::error_code &error); @@ -112,6 +113,8 @@ class ClientConnection : public ServerConnection, ClientID client_id_; /// The handler for a message from the client. MessageHandler message_handler_; + /// A label used for debug messages. + const std::string debug_label_; /// Buffers for the current message being read rom the client. int64_t read_version_; int64_t read_type_; diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 76d9f2ae4..88cde3986 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -74,8 +74,9 @@ class MockServer { object_manager_.ProcessClientMessage(client, message_type, message); }; // Accept a new local client and dispatch it to the node manager. - auto new_connection = TcpClientConnection::Create(client_handler, message_handler, - std::move(object_manager_socket_)); + auto new_connection = + TcpClientConnection::Create(client_handler, message_handler, + std::move(object_manager_socket_), "object manager"); DoAcceptObjectManager(); } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 09e0263c0..d3e165cad 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -65,8 +65,9 @@ class MockServer { object_manager_.ProcessClientMessage(client, message_type, message); }; // Accept a new local client and dispatch it to the node manager. - auto new_connection = TcpClientConnection::Create(client_handler, message_handler, - std::move(object_manager_socket_)); + auto new_connection = + TcpClientConnection::Create(client_handler, message_handler, + std::move(object_manager_socket_), "object manager"); DoAcceptObjectManager(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f2238b2fc..441a58ed6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -82,7 +82,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, object_manager_(object_manager), gcs_client_(gcs_client), heartbeat_timer_(io_service), - heartbeat_period_ms_(config.heartbeat_period_ms), + heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), local_resources_(config.resource_config), local_available_resources_(config.resource_config), worker_pool_(config.num_initial_workers, config.num_workers_per_process, @@ -108,7 +108,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, remote_clients_(), remote_server_connections_(), actor_registry_() { - RAY_CHECK(heartbeat_period_ms_ > 0); + RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. ClientID local_client_id = gcs_client_->client_table().GetLocalClientId(); cluster_resource_map_.emplace(local_client_id, @@ -205,6 +205,7 @@ ray::Status NodeManager::RegisterGcs() { driver_table_handler, nullptr)); // Start sending heartbeats to the GCS. + last_heartbeat_at_ms_ = current_time_ms(); Heartbeat(); return ray::Status::OK(); @@ -223,6 +224,14 @@ void NodeManager::HandleDriverTableUpdate( } void NodeManager::Heartbeat() { + uint64_t now_ms = current_time_ms(); + uint64_t interval = now_ms - last_heartbeat_at_ms_; + if (interval > RayConfig::instance().num_heartbeats_warning() * + RayConfig::instance().heartbeat_timeout_milliseconds()) { + RAY_LOG(WARNING) << "Last heartbeat was sent " << interval << " ms ago "; + } + last_heartbeat_at_ms_ = now_ms; + RAY_LOG(DEBUG) << "[Heartbeat] sending heartbeat."; auto &heartbeat_table = gcs_client_->heartbeat_table(); auto heartbeat_data = std::make_shared(); @@ -255,8 +264,7 @@ void NodeManager::Heartbeat() { RAY_CHECK_OK(status); // Reset the timer. - auto heartbeat_period = boost::posix_time::milliseconds(heartbeat_period_ms_); - heartbeat_timer_.expires_from_now(heartbeat_period); + heartbeat_timer_.expires_from_now(heartbeat_period_); heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { RAY_CHECK(!error); Heartbeat(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 49cadbc81..85d4f14fd 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -1,6 +1,8 @@ #ifndef RAY_RAYLET_NODE_MANAGER_H #define RAY_RAYLET_NODE_MANAGER_H +#include + // clang-format off #include "ray/raylet/task.h" #include "ray/object_manager/object_manager.h" @@ -159,8 +161,13 @@ class NodeManager { plasma::PlasmaClient store_client_; /// A client connection to the GCS. std::shared_ptr gcs_client_; - boost::asio::deadline_timer heartbeat_timer_; - uint64_t heartbeat_period_ms_; + /// The timer used to send heartbeats. + boost::asio::steady_timer heartbeat_timer_; + /// The period used for the heartbeat timer. + std::chrono::milliseconds heartbeat_period_; + /// The time that the last heartbeat was sent at. Used to make sure we are + /// keeping up with heartbeats. + uint64_t last_heartbeat_at_ms_; /// The resources local to this node. const SchedulingResources local_resources_; /// The resources (and specific resource IDs) that are currently available. diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index ffafa960f..e6c13c056 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -93,9 +93,9 @@ void Raylet::HandleAcceptNodeManager(const boost::system::error_code &error) { const uint8_t *message) { node_manager_.ProcessNodeManagerMessage(*client, message_type, message); }; - // Accept a new local client and dispatch it to the node manager. - auto new_connection = TcpClientConnection::Create(client_handler, message_handler, - std::move(node_manager_socket_)); + // Accept a new TCP client and dispatch it to the node manager. + auto new_connection = TcpClientConnection::Create( + client_handler, message_handler, std::move(node_manager_socket_), "node manager"); } // We're ready to accept another client. DoAcceptNodeManager(); @@ -115,9 +115,10 @@ void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) { const uint8_t *message) { object_manager_.ProcessClientMessage(client, message_type, message); }; - // Accept a new local client and dispatch it to the node manager. - auto new_connection = TcpClientConnection::Create(client_handler, message_handler, - std::move(object_manager_socket_)); + // Accept a new TCP client and dispatch it to the node manager. + auto new_connection = + TcpClientConnection::Create(client_handler, message_handler, + std::move(object_manager_socket_), "object manager"); DoAcceptObjectManager(); } @@ -138,7 +139,7 @@ void Raylet::HandleAccept(const boost::system::error_code &error) { }; // Accept a new local client and dispatch it to the node manager. auto new_connection = LocalClientConnection::Create(client_handler, message_handler, - std::move(socket_)); + std::move(socket_), "worker"); } // We're ready to accept another client. DoAccept(); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 5c05913e0..b73f9ae3c 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -42,8 +42,8 @@ class WorkerPoolTest : public ::testing::Test { HandleMessage(client, message_type, message); }; boost::asio::local::stream_protocol::socket socket(io_service_); - auto client = - LocalClientConnection::Create(client_handler, message_handler, std::move(socket)); + auto client = LocalClientConnection::Create(client_handler, message_handler, + std::move(socket), "worker"); return std::shared_ptr(new Worker(pid, client)); }