From e0bf9d73053b73a34f001799ae9276736ca8c7fd Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 15 Nov 2018 21:47:50 -0800 Subject: [PATCH] Add debug string to raylet (#3317) * initial debug string * format * wip debug string * fix compile * fix * update * finished * to file * logs dir * use temp root * fix * override --- python/ray/services.py | 1 + python/ray/worker.py | 2 +- src/ray/common/client_connection.cc | 23 ++++++++++ src/ray/common/client_connection.h | 14 ++++++ src/ray/gcs/client.cc | 15 ++++++ src/ray/gcs/client.h | 5 ++ src/ray/gcs/tables.cc | 35 ++++++++++++++ src/ray/gcs/tables.h | 33 +++++++++++++ src/ray/object_manager/connection_pool.cc | 16 +++++++ src/ray/object_manager/connection_pool.h | 5 ++ src/ray/object_manager/object_buffer_pool.cc | 8 ++++ src/ray/object_manager/object_buffer_pool.h | 5 ++ src/ray/object_manager/object_directory.cc | 8 ++++ src/ray/object_manager/object_directory.h | 7 +++ src/ray/object_manager/object_manager.cc | 15 ++++++ src/ray/object_manager/object_manager.h | 5 ++ .../object_store_notification_manager.cc | 10 ++++ .../object_store_notification_manager.h | 7 +++ src/ray/ray_config.h | 10 ++++ src/ray/raylet/actor_registration.cc | 13 ++++++ src/ray/raylet/actor_registration.h | 5 ++ src/ray/raylet/lineage_cache.cc | 12 +++++ src/ray/raylet/lineage_cache.h | 5 ++ src/ray/raylet/main.cc | 8 +++- src/ray/raylet/node_manager.cc | 46 +++++++++++++++++++ src/ray/raylet/node_manager.h | 18 ++++++++ src/ray/raylet/reconstruction_policy.cc | 7 +++ src/ray/raylet/reconstruction_policy.h | 5 ++ src/ray/raylet/reconstruction_policy_test.cc | 2 + src/ray/raylet/scheduling_queue.cc | 29 ++++++------ src/ray/raylet/scheduling_queue.h | 12 ++--- src/ray/raylet/scheduling_resources.cc | 8 ++++ src/ray/raylet/scheduling_resources.h | 5 ++ src/ray/raylet/task_dependency_manager.cc | 11 +++++ src/ray/raylet/task_dependency_manager.h | 5 ++ src/ray/raylet/worker_pool.cc | 10 ++++ src/ray/raylet/worker_pool.h | 5 ++ 37 files changed, 404 insertions(+), 26 deletions(-) diff --git a/python/ray/services.py b/python/ray/services.py index 75dffa45b..981d06160 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -955,6 +955,7 @@ def start_raylet(redis_address, start_worker_command, "", # Worker command for Java, not needed for Python. redis_password or "", + get_temp_root(), ] if use_valgrind: diff --git a/python/ray/worker.py b/python/ray/worker.py index 9c2358fce..5d5bf6e4a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1655,7 +1655,7 @@ def init(redis_address=None, driver_id: The ID of driver. configure_logging: True if allow the logging cofiguration here. Otherwise, the users may want to configure it by their own. - logging_level: Logging level, default will be loging.INFO. + logging_level: Logging level, default will be logging.INFO. logging_format: Logging format, default will be "%(message)s" which means only contains the message. plasma_store_socket_name (str): If provided, it will specify the socket diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index 2098b8c8d..0018b4afa 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -83,6 +83,9 @@ void ServerConnection::ReadBuffer( template ray::Status ServerConnection::WriteMessage(int64_t type, int64_t length, const uint8_t *message) { + sync_writes_ += 1; + bytes_written_ += length; + std::vector message_buffers; auto write_version = RayConfig::instance().ray_protocol_version(); message_buffers.push_back(boost::asio::buffer(&write_version, sizeof(write_version))); @@ -96,6 +99,9 @@ template void ServerConnection::WriteMessageAsync( int64_t type, int64_t length, const uint8_t *message, const std::function &handler) { + async_writes_ += 1; + bytes_written_ += length; + auto write_buffer = std::unique_ptr(new AsyncWriteBuffer()); write_buffer->write_version = RayConfig::instance().ray_protocol_version(); write_buffer->write_type = type; @@ -220,6 +226,7 @@ void ClientConnection::ProcessMessageHeader(const boost::system::error_code & RAY_CHECK(read_version_ == RayConfig::instance().ray_protocol_version()); // Resize the message buffer to match the received length. read_message_.resize(read_length_); + ServerConnection::bytes_read_ += read_length_; // Wait for the message to be read. boost::asio::async_read( ServerConnection::socket_, boost::asio::buffer(read_message_), @@ -242,6 +249,22 @@ void ClientConnection::ProcessMessage(const boost::system::error_code &error) } } +template +std::string ServerConnection::DebugString() const { + std::stringstream result; + result << "\n- bytes read: " << bytes_read_; + result << "\n- bytes written: " << bytes_written_; + result << "\n- num async writes: " << async_writes_; + result << "\n- num sync writes: " << sync_writes_; + result << "\n- writing: " << async_write_in_flight_; + int64_t num_bytes = 0; + for (auto &buffer : async_write_queue_) { + num_bytes += buffer->write_length; + } + result << "\n- pending async bytes: " << num_bytes; + return result.str(); +} + template class ServerConnection; template class ServerConnection; template class ClientConnection; diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 83c9849d9..ae33d3aa5 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -72,6 +72,8 @@ class ServerConnection : public std::enable_shared_from_this socket_.close(ec); } + std::string DebugString() const; + protected: /// A private constructor for a server connection. ServerConnection(boost::asio::basic_stream_socket &&socket); @@ -97,6 +99,18 @@ class ServerConnection : public std::enable_shared_from_this /// Whether we are in the middle of an async write. bool async_write_in_flight_; + /// Count of async messages sent total. + int64_t async_writes_ = 0; + + /// Count of sync messages sent total. + int64_t sync_writes_ = 0; + + /// Count of bytes sent total. + int64_t bytes_written_ = 0; + + /// Count of bytes read total. + int64_t bytes_read_ = 0; + private: /// Asynchronously flushes the write queue. While async writes are running, the flag /// async_write_in_flight_ will be set. This should only be called when no async writes diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc index f323073bb..1f7bb8460 100644 --- a/src/ray/gcs/client.cc +++ b/src/ray/gcs/client.cc @@ -179,6 +179,21 @@ Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) { return Status::OK(); } +std::string AsyncGcsClient::DebugString() const { + std::stringstream result; + result << "AsyncGcsClient:"; + result << "\n- TaskTable: " << raylet_task_table_->DebugString(); + result << "\n- ActorTable: " << actor_table_->DebugString(); + result << "\n- TaskReconstructionLog: " << task_reconstruction_log_->DebugString(); + result << "\n- TaskLeaseTable: " << task_lease_table_->DebugString(); + result << "\n- HeartbeatTable: " << heartbeat_table_->DebugString(); + result << "\n- ErrorTable: " << error_table_->DebugString(); + result << "\n- ProfileTable: " << profile_table_->DebugString(); + result << "\n- ClientTable: " << client_table_->DebugString(); + result << "\n- DriverTable: " << driver_table_->DebugString(); + return result.str(); +} + ObjectTable &AsyncGcsClient::object_table() { return *object_table_; } raylet::TaskTable &AsyncGcsClient::raylet_task_table() { return *raylet_task_table_; } diff --git a/src/ray/gcs/client.h b/src/ray/gcs/client.h index d08f7b2c7..62813b553 100644 --- a/src/ray/gcs/client.h +++ b/src/ray/gcs/client.h @@ -75,6 +75,11 @@ class RAY_EXPORT AsyncGcsClient { std::vector> shard_contexts() { return shard_contexts_; } std::shared_ptr primary_context() { return primary_context_; } + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: std::unique_ptr function_table_; std::unique_ptr class_table_; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index f1a9a1975..24c053b21 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -38,6 +38,7 @@ namespace gcs { template Status Log::Append(const JobID &job_id, const ID &id, std::shared_ptr &dataT, const WriteCallback &done) { + num_appends_++; auto callback = [this, id, dataT, done](const std::string &data) { if (done != nullptr) { (done)(client_, id, *dataT); @@ -56,6 +57,7 @@ template Status Log::AppendAt(const JobID &job_id, const ID &id, std::shared_ptr &dataT, const WriteCallback &done, const WriteCallback &failure, int log_length) { + num_appends_++; auto callback = [this, id, dataT, done, failure](const std::string &data) { if (data.empty()) { if (done != nullptr) { @@ -78,6 +80,7 @@ Status Log::AppendAt(const JobID &job_id, const ID &id, template Status Log::Lookup(const JobID &job_id, const ID &id, const Callback &lookup) { + num_lookups_++; auto callback = [this, id, lookup](const std::string &data) { if (lookup != nullptr) { std::vector results; @@ -164,9 +167,17 @@ Status Log::CancelNotifications(const JobID &job_id, const ID &id, pubsub_channel_, nullptr); } +template +std::string Log::DebugString() const { + std::stringstream result; + result << "num lookups: " << num_lookups_ << ", num appends: " << num_appends_; + return result.str(); +} + template Status Table::Add(const JobID &job_id, const ID &id, std::shared_ptr &dataT, const WriteCallback &done) { + num_adds_++; auto callback = [this, id, dataT, done](const std::string &data) { if (done != nullptr) { (done)(client_, id, *dataT); @@ -184,6 +195,7 @@ Status Table::Add(const JobID &job_id, const ID &id, template Status Table::Lookup(const JobID &job_id, const ID &id, const Callback &lookup, const FailureCallback &failure) { + num_lookups_++; return Log::Lookup(job_id, id, [lookup, failure](AsyncGcsClient *client, const ID &id, const std::vector &data) { @@ -221,6 +233,13 @@ Status Table::Subscribe(const JobID &job_id, const ClientID &client_id done); } +template +std::string Table::DebugString() const { + std::stringstream result; + result << "num lookups: " << num_lookups_ << ", num adds: " << num_adds_; + return result.str(); +} + Status ErrorTable::PushErrorToDriver(const JobID &job_id, const std::string &type, const std::string &error_message, double timestamp) { auto data = std::make_shared(); @@ -234,6 +253,10 @@ Status ErrorTable::PushErrorToDriver(const JobID &job_id, const std::string &typ }); } +std::string ErrorTable::DebugString() const { + return Log::DebugString(); +} + Status ProfileTable::AddProfileEvent(const std::string &event_type, const std::string &component_type, const UniqueID &component_id, @@ -273,6 +296,10 @@ Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events }); } +std::string ProfileTable::DebugString() const { + return Log::DebugString(); +} + Status DriverTable::AppendDriverData(const JobID &driver_id, bool is_dead) { auto data = std::make_shared(); data->driver_id = driver_id.binary(); @@ -436,6 +463,14 @@ const std::unordered_map &ClientTable::GetAllClients return client_cache_; } +std::string ClientTable::DebugString() const { + std::stringstream result; + result << Log::DebugString(); + result << ", cache size: " << client_cache_.size() + << ", num removed: " << removed_clients_.size(); + return result.str(); +} + template class Log; template class Log; template class Table; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 6bc3759e9..2847376fd 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -184,6 +184,11 @@ class Log : public LogInterface, virtual public PubsubInterface { Status CancelNotifications(const JobID &job_id, const ID &id, const ClientID &client_id); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + protected: std::shared_ptr GetRedisContext(const ID &id) { static std::hash index; @@ -208,6 +213,10 @@ class Log : public LogInterface, virtual public PubsubInterface { /// Commands to a GCS table can either be regular (default) or chain-replicated. CommandType command_type_ = CommandType::kRegular; + + private: + int64_t num_appends_ = 0; + int64_t num_lookups_ = 0; }; template @@ -295,6 +304,11 @@ class Table : private Log, const Callback &subscribe, const FailureCallback &failure, const SubscriptionCallback &done); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + protected: using Log::shard_contexts_; using Log::client_; @@ -302,6 +316,10 @@ class Table : private Log, using Log::prefix_; using Log::command_type_; using Log::GetRedisContext; + + private: + int64_t num_adds_ = 0; + int64_t num_lookups_ = 0; }; class ObjectTable : public Log { @@ -452,6 +470,11 @@ class ErrorTable : private Log { /// \return Status. Status PushErrorToDriver(const JobID &job_id, const std::string &type, const std::string &error_message, double timestamp); + + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; }; class ProfileTable : private Log { @@ -484,6 +507,11 @@ class ProfileTable : private Log { /// \param profile_events The profile events to record. /// \return Status. Status AddProfileEventBatch(const ProfileTableData &profile_events); + + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; }; using CustomSerializerTable = Table; @@ -583,6 +611,11 @@ class ClientTable : private Log { /// \return The client ID to client information map. const std::unordered_map &GetAllClients() const; + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Handle a client table notification. void HandleNotification(AsyncGcsClient *client, const ClientTableDataT ¬ifications); diff --git a/src/ray/object_manager/connection_pool.cc b/src/ray/object_manager/connection_pool.cc index 10ed2b8f3..c4c7c8df8 100644 --- a/src/ray/object_manager/connection_pool.cc +++ b/src/ray/object_manager/connection_pool.cc @@ -133,4 +133,20 @@ void ConnectionPool::Return(SenderMapType &conn_map, const ClientID &client_id, RAY_LOG(DEBUG) << "Return " << client_id << " " << conn_map[client_id].size(); } +std::string ConnectionPool::DebugString() const { + std::stringstream result; + result << "ConnectionPool:"; + result << "\n- num message send connections: " << message_send_connections_.size(); + result << "\n- num transfer send connections: " << transfer_send_connections_.size(); + result << "\n- num avail message send connections: " + << available_transfer_send_connections_.size(); + result << "\n- num avail transfer send connections: " + << available_transfer_send_connections_.size(); + result << "\n- num message receive connections: " + << message_receive_connections_.size(); + result << "\n- num transfer receive connections: " + << transfer_receive_connections_.size(); + return result.str(); +} + } // namespace ray diff --git a/src/ray/object_manager/connection_pool.h b/src/ray/object_manager/connection_pool.h index d2e92ee2a..69fbd5261 100644 --- a/src/ray/object_manager/connection_pool.h +++ b/src/ray/object_manager/connection_pool.h @@ -90,6 +90,11 @@ class ConnectionPool { /// \return Status of invoking this method. ray::Status RemoveSender(ConnectionType type, std::shared_ptr conn); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + /// This object cannot be copied for thread-safety. RAY_DISALLOW_COPY_AND_ASSIGN(ConnectionPool); diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 6651f2042..5f101e795 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -195,4 +195,12 @@ void ObjectBufferPool::FreeObjects(const std::vector &object_ids) { ARROW_CHECK_OK(store_client_.Delete(plasma_ids)); } +std::string ObjectBufferPool::DebugString() const { + std::stringstream result; + result << "BufferPool:"; + result << "\n- get buffer state map size: " << get_buffer_state_.size(); + result << "\n- create buffer state map size: " << create_buffer_state_.size(); + return result.str(); +} + } // namespace ray diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 9fe347451..ed6594ed4 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -129,6 +129,11 @@ class ObjectBufferPool { /// \return Void. void FreeObjects(const std::vector &object_ids); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Abort the create operation associated with an object. This destroys the buffer /// state, including create operations in progress for all chunks of the object. diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 64f6033c5..fce8b1134 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -192,4 +192,12 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, return status; } +std::string ObjectDirectory::DebugString() const { + std::stringstream result; + result << "ObjectDirectory:"; + result << "\n- num listeners: " << listeners_.size(); + result << "\n- num eviction entries: " << object_evictions_.size(); + return result.str(); +} + } // namespace ray diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index e705e1b56..7d6f11710 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -103,6 +103,11 @@ class ObjectDirectoryInterface { /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) = 0; + + /// Returns debug string for class. + /// + /// \return string. + virtual std::string DebugString() const = 0; }; /// Ray ObjectDirectory declaration. @@ -140,6 +145,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; + std::string DebugString() const override; + /// ObjectDirectory should not be copied. RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 1a6d48a68..cf68dd114 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -900,4 +900,19 @@ ProfileTableDataT ObjectManager::GetAndResetProfilingInfo() { return profile_info; } +std::string ObjectManager::DebugString() const { + std::stringstream result; + result << "ObjectManager:"; + result << "\n- num local objects: " << local_objects_.size(); + result << "\n- num active wait requests: " << active_wait_requests_.size(); + result << "\n- num unfulfilled push requests: " << unfulfilled_push_requests_.size(); + result << "\n- num pull requests: " << pull_requests_.size(); + result << "\n- num buffered profile events: " << profile_events_.size(); + result << "\n" << object_directory_->DebugString(); + result << "\n" << store_notification_.DebugString(); + result << "\n" << buffer_pool_.DebugString(); + result << "\n" << connection_pool_.DebugString(); + return result.str(); +} + } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 59b6bda63..bbbe0603f 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -181,6 +181,11 @@ class ObjectManager : public ObjectManagerInterface { /// to this method. ProfileTableDataT GetAndResetProfilingInfo(); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: friend class TestObjectManager; diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 361dba49a..e590e8efa 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -70,12 +70,14 @@ void ObjectStoreNotificationManager::ProcessStoreAdd( for (auto &handler : add_handlers_) { handler(object_info); } + num_adds_processed_++; } void ObjectStoreNotificationManager::ProcessStoreRemove(const ObjectID &object_id) { for (auto &handler : rem_handlers_) { handler(object_id); } + num_removes_processed_++; } void ObjectStoreNotificationManager::SubscribeObjAdded( @@ -88,4 +90,12 @@ void ObjectStoreNotificationManager::SubscribeObjDeleted( rem_handlers_.push_back(std::move(callback)); } +std::string ObjectStoreNotificationManager::DebugString() const { + std::stringstream result; + result << "ObjectStoreNotificationManager:"; + result << "\n- num adds processed: " << num_adds_processed_; + result << "\n- num removes processed: " << num_removes_processed_; + return result.str(); +} + } // namespace ray diff --git a/src/ray/object_manager/object_store_notification_manager.h b/src/ray/object_manager/object_store_notification_manager.h index 0e9c90130..45d131602 100644 --- a/src/ray/object_manager/object_store_notification_manager.h +++ b/src/ray/object_manager/object_store_notification_manager.h @@ -46,6 +46,11 @@ class ObjectStoreNotificationManager { /// \param callback A callback expecting an ObjectID. void SubscribeObjDeleted(std::function callback); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Async loop for handling object store notifications. void NotificationWait(); @@ -63,6 +68,8 @@ class ObjectStoreNotificationManager { plasma::PlasmaClient store_client_; int c_socket_; int64_t length_; + int64_t num_adds_processed_; + int64_t num_removes_processed_; std::vector notification_; boost::asio::local::stream_protocol::socket socket_; }; diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index 1b6dee349..da30fdebd 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -20,6 +20,10 @@ class RayConfig { return heartbeat_timeout_milliseconds_; } + int64_t debug_dump_period_milliseconds() const { + return debug_dump_period_milliseconds_; + } + int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; } uint64_t num_heartbeats_warning() const { return num_heartbeats_warning_; } @@ -115,6 +119,8 @@ class RayConfig { handler_warning_timeout_ms_ = pair.second; } else if (pair.first == "heartbeat_timeout_milliseconds") { heartbeat_timeout_milliseconds_ = pair.second; + } else if (pair.first == "debug_dump_period_milliseconds") { + debug_dump_period_milliseconds_ = pair.second; } else if (pair.first == "num_heartbeats_timeout") { num_heartbeats_timeout_ = pair.second; } else if (pair.first == "num_heartbeats_warning") { @@ -191,6 +197,7 @@ class RayConfig { heartbeat_timeout_milliseconds_(100), num_heartbeats_timeout_(100), num_heartbeats_warning_(5), + debug_dump_period_milliseconds_(10000), initial_reconstruction_timeout_milliseconds_(10000), get_timeout_milliseconds_(1000), worker_get_request_size_(10000), @@ -244,6 +251,9 @@ class RayConfig { /// handler is drifting. uint64_t num_heartbeats_warning_; + /// The duration between dumping debug info to logs, or -1 to disable. + int64_t debug_dump_period_milliseconds_; + /// The initial period for a task execution lease. The lease will expire this /// many milliseconds after the first acquisition of the lease. Nodes that /// require an object will not try to reconstruct the task until at least diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index ced923609..45215e004 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -1,5 +1,7 @@ #include "ray/raylet/actor_registration.h" +#include + #include "ray/util/logging.h" namespace ray { @@ -41,6 +43,17 @@ bool ActorRegistration::IsAlive() const { return alive_; } void ActorRegistration::MarkDead() { alive_ = false; } +std::string ActorRegistration::DebugString() const { + std::stringstream result; + if (alive_) { + result << "alive"; + } else { + result << "dead"; + } + result << ", num handles: " << frontier_.size(); + return result.str(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index bd16e4f46..d5d3f8c39 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -85,6 +85,11 @@ class ActorRegistration { /// \return Void. void MarkDead(); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Information from the global actor table about this actor, including the /// node manager location. diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 24b7ab034..85f87b871 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -1,5 +1,7 @@ #include "lineage_cache.h" +#include + namespace ray { namespace raylet { @@ -421,6 +423,16 @@ bool LineageCache::ContainsTask(const TaskID &task_id) const { size_t LineageCache::NumEntries() const { return lineage_.GetEntries().size(); } +std::string LineageCache::DebugString() const { + std::stringstream result; + result << "LineageCache:"; + result << "\n- committed tasks: " << committed_tasks_.size(); + result << "\n- child map size: " << children_.size(); + result << "\n- num subscribed tasks: " << subscribed_tasks_.size(); + result << "\n- lineage size: " << lineage_.GetEntries().size(); + return result.str(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index ad9510dcb..700d18a3b 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -272,6 +272,11 @@ class LineageCache { /// \return The number of entries in the lineage cache. size_t NumEntries() const; + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Flush a task that is in UNCOMMITTED_READY state. void FlushTask(const TaskID &task_id); diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 93ccc28fd..85fa4d0e9 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -20,7 +20,7 @@ int main(int argc, char *argv[]) { ray::RayLogLevel::INFO, /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); - RAY_CHECK(argc == 14 || argc == 15); + RAY_CHECK(argc >= 14 && argc <= 16); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -35,7 +35,8 @@ int main(int argc, char *argv[]) { const std::string config_list = std::string(argv[11]); const std::string python_worker_command = std::string(argv[12]); const std::string java_worker_command = std::string(argv[13]); - const std::string redis_password = (argc == 15 ? std::string(argv[14]) : ""); + const std::string redis_password = (argc >= 15 ? std::string(argv[14]) : ""); + const std::string temp_dir = (argc >= 16 ? std::string(argv[15]) : "/tmp/ray"); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -91,8 +92,11 @@ int main(int argc, char *argv[]) { node_manager_config.heartbeat_period_ms = RayConfig::instance().heartbeat_timeout_milliseconds(); + node_manager_config.debug_dump_period_ms = + RayConfig::instance().debug_dump_period_milliseconds(); node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); node_manager_config.store_socket_name = store_socket_name; + node_manager_config.temp_dir = temp_dir; // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c0dd866ad..1f5c0b1f0 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1,5 +1,7 @@ #include "ray/raylet/node_manager.h" +#include + #include "ray/common/common_protocol.h" #include "ray/id.h" #include "ray/raylet/format/node_manager_generated.h" @@ -50,6 +52,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, gcs_client_(gcs_client), heartbeat_timer_(io_service), heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), + debug_dump_period_(config.debug_dump_period_ms), + temp_dir_(config.temp_dir), object_manager_profile_timer_(io_service), local_resources_(config.resource_config), local_available_resources_(config.resource_config), @@ -174,6 +178,7 @@ ray::Status NodeManager::RegisterGcs() { // Start sending heartbeats to the GCS. last_heartbeat_at_ms_ = current_time_ms(); + last_debug_dump_at_ms_ = current_time_ms(); Heartbeat(); // Start the timer that gets object manager profiling information and sends it // to the GCS. @@ -276,6 +281,12 @@ void NodeManager::Heartbeat() { } RAY_CHECK_OK(status); + if (debug_dump_period_ > 0 && + static_cast(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) { + DumpDebugState(); + last_debug_dump_at_ms_ = now_ms; + } + // Reset the timer. heartbeat_timer_.expires_from_now(heartbeat_period_); heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { @@ -1655,6 +1666,41 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, }); } +void NodeManager::DumpDebugState() { + std::fstream fs; + fs.open(temp_dir_ + "/debug_state.txt", std::fstream::out | std::fstream::trunc); + fs << DebugString(); + fs.close(); +} + +std::string NodeManager::DebugString() const { + std::stringstream result; + uint64_t now_ms = current_time_ms(); + result << "NodeManager:"; + result << "\nLocalResources: " << local_resources_.DebugString(); + result << "\nClusterResources:"; + for (auto &pair : cluster_resource_map_) { + result << "\n" << pair.first.hex() << ": " << pair.second.DebugString(); + } + result << "\n" << object_manager_.DebugString(); + result << "\n" << gcs_client_->DebugString(); + result << "\n" << worker_pool_.DebugString(); + result << "\n" << local_queues_.DebugString(); + result << "\n" << reconstruction_policy_.DebugString(); + result << "\n" << task_dependency_manager_.DebugString(); + result << "\n" << lineage_cache_.DebugString(); + result << "\nRemoteConnections:"; + for (auto &pair : remote_server_connections_) { + result << "\n" << pair.first.hex() << ": " << pair.second->DebugString(); + } + result << "\nActorRegistry:"; + for (auto &pair : actor_registry_) { + result << "\n" << pair.first.hex() << ": " << pair.second.DebugString(); + } + result << "\nDebugString() time ms: " << (current_time_ms() - now_ms); + return result.str(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 932a754d1..a920bac0f 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -39,10 +39,14 @@ struct NodeManagerConfig { std::unordered_map> worker_commands; /// The time between heartbeats in milliseconds. uint64_t heartbeat_period_ms; + /// The time between debug dumps in milliseconds, or -1 to disable. + uint64_t debug_dump_period_ms; /// the maximum lineage size. uint64_t max_lineage_size; /// The store socket name. std::string store_socket_name; + /// The path to the ray temp dir. + std::string temp_dir; }; class NodeManager { @@ -92,6 +96,11 @@ class NodeManager { /// \return Status indicating whether this was done successfully or not. ray::Status RegisterGcs(); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Methods for handling clients. @@ -108,6 +117,9 @@ class NodeManager { /// Send heartbeats to the GCS. void Heartbeat(); + /// Write out debug state to a file. + void DumpDebugState(); + /// Get profiling information from the object manager and push it to the GCS. /// /// \return Void. @@ -357,12 +369,18 @@ class NodeManager { boost::asio::steady_timer heartbeat_timer_; /// The period used for the heartbeat timer. std::chrono::milliseconds heartbeat_period_; + /// The period between debug state dumps. + int64_t debug_dump_period_; + /// The path to the ray temp dir. + std::string temp_dir_; /// The timer used to get profiling information from the object manager and /// push it to the GCS. boost::asio::steady_timer object_manager_profile_timer_; /// The time that the last heartbeat was sent at. Used to make sure we are /// keeping up with heartbeats. uint64_t last_heartbeat_at_ms_; + /// The time that the last debug string was logged to the console. + uint64_t last_debug_dump_at_ms_; /// The resources local to this node. const SchedulingResources local_resources_; /// The resources (and specific resource IDs) that are currently available. diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index c14dd34a0..6abf5b53d 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -203,6 +203,13 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) { } } +std::string ReconstructionPolicy::DebugString() const { + std::stringstream result; + result << "ReconstructionPolicy:"; + result << "\n- num reconstructing: " << listening_tasks_.size(); + return result.str(); +} + } // namespace raylet } // end namespace ray diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index dfa69ebf5..f18290aa3 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -71,6 +71,11 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// this timeout, then objects that the task creates may be reconstructed. void HandleTaskLeaseNotification(const TaskID &task_id, int64_t lease_timeout_ms); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: struct ReconstructionTask { ReconstructionTask(boost::asio::io_service &io_service) diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index ec117cd1c..4062511ae 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -36,6 +36,8 @@ class MockObjectDirectory : public ObjectDirectoryInterface { locations_[object_id] = locations; } + std::string DebugString() const { return ""; } + MOCK_METHOD0(RegisterBackend, void(void)); MOCK_CONST_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); MOCK_CONST_METHOD0(LookupAllRemoteConnections, std::vector()); diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 7f3a412e9..35d6bcfd3 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -1,5 +1,7 @@ #include "scheduling_queue.h" +#include + #include "ray/status.h" namespace { @@ -346,22 +348,17 @@ const std::unordered_set &SchedulingQueue::GetDriverTaskIds() const { return driver_task_ids_; } -const std::string SchedulingQueue::ToString() const { - std::string result; - - result += "placeable_tasks_ size is " + - std::to_string(placeable_tasks_.GetTasks().size()) + "\n"; - result += - "waiting_tasks_ size is " + std::to_string(waiting_tasks_.GetTasks().size()) + "\n"; - result += - "ready_tasks_ size is " + std::to_string(ready_tasks_.GetTasks().size()) + "\n"; - result += - "running_tasks_ size is " + std::to_string(running_tasks_.GetTasks().size()) + "\n"; - result += "infeasible_tasks_ size is " + - std::to_string(infeasible_tasks_.GetTasks().size()) + "\n"; - result += "methods_waiting_for_actor_creation_ size is " + - std::to_string(methods_waiting_for_actor_creation_.GetTasks().size()) + "\n"; - return result; +std::string SchedulingQueue::DebugString() const { + std::stringstream result; + result << "SchedulingQueue:"; + result << "\n- num placeable tasks: " << placeable_tasks_.GetTasks().size(); + result << "\n- num waiting tasks: " << waiting_tasks_.GetTasks().size(); + result << "\n- num ready tasks: " << ready_tasks_.GetTasks().size(); + result << "\n- num running tasks: " << running_tasks_.GetTasks().size(); + result << "\n- num infeasible tasks: " << infeasible_tasks_.GetTasks().size(); + result << "\n- num methods waiting for actor creation: " + << methods_waiting_for_actor_creation_.GetTasks().size(); + return result.str(); } } // namespace raylet diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index a360edf64..f3251ebca 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -214,13 +214,6 @@ class SchedulingQueue { /// \return Aggregate resource demand from ready tasks. ResourceSet GetReadyQueueResources() const; - /// Return a human-readable string indicating the number of tasks in each - /// queue. - /// - /// \return A string that can be used to display the contents of the queues - /// for debugging purposes. - const std::string ToString() const; - class TaskQueue { public: /// Creating a task queue. @@ -274,6 +267,11 @@ class SchedulingQueue { ResourceSet current_resource_load_; }; + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Tasks that are destined for actors that have not yet been created. TaskQueue methods_waiting_for_actor_creation_; diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index e9638162d..ddd75167c 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -1,6 +1,7 @@ #include "scheduling_resources.h" #include +#include #include "ray/util/logging.h" @@ -512,6 +513,13 @@ bool SchedulingResources::Acquire(const ResourceSet &resources) { return resources_available_.SubtractResourcesStrict(resources); } +std::string SchedulingResources::DebugString() const { + std::stringstream result; + result << "\n- total: " << resources_total_.ToString(); + result << "\n- avail: " << resources_available_.ToString(); + return result.str(); +}; + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index bac293366..ba6fce2da 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -374,6 +374,11 @@ class SchedulingResources { /// negative resources. bool Acquire(const ResourceSet &resources); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: /// Static resource configuration (e.g., static_resources). ResourceSet resources_total_; diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index df4546efa..339f70da1 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -325,6 +325,17 @@ void TaskDependencyManager::RemoveTasksAndRelatedObjects( } } +std::string TaskDependencyManager::DebugString() const { + std::stringstream result; + result << "TaskDependencyManager:"; + result << "\n- task dep map size: " << task_dependencies_.size(); + result << "\n- task req map size: " << required_tasks_.size(); + result << "\n- req objects map size: " << required_objects_.size(); + result << "\n- local objects map size: " << local_objects_.size(); + result << "\n- pending tasks map size: " << pending_tasks_.size(); + return result.str(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 84c47bd16..5caae6663 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -111,6 +111,11 @@ class TaskDependencyManager { /// \param task_ids The collection of task IDs. void RemoveTasksAndRelatedObjects(const std::unordered_set &task_ids); + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + private: using ObjectDependencyMap = std::unordered_map>; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 7ed4a1408..4f4807275 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -256,6 +256,16 @@ std::vector> WorkerPool::GetWorkersRunningTasksForDriver return workers; } +std::string WorkerPool::DebugString() const { + std::stringstream result; + result << "WorkerPool:"; + for (const auto &entry : states_by_lang_) { + result << "\n- num workers: " << entry.second.registered_workers.size(); + result << "\n- num drivers: " << entry.second.registered_drivers.size(); + } + return result.str(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 528cc917d..78e0642ab 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -118,6 +118,11 @@ class WorkerPool { std::vector> GetWorkersRunningTasksForDriver( const DriverID &driver_id) const; + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const; + protected: /// A map from the pids of starting worker processes /// to the number of their unregistered workers.