diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7105f51da..f8c109de7 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -243,6 +243,8 @@ RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200) RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000) /// Maximum number of dead nodes in GCS server memory cache. RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000) +/// The interval, in minutes, at which the gcs server will print debug info. +RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1) /// Maximum number of times to retry putting an object when the plasma store is full. /// Can be set to -1 to enable unlimited retries. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 5ec42193e..3ba2ca0ac 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -120,6 +120,7 @@ void GcsActorManager::HandleRegisterActor(const rpc::RegisterActorRequest &reque << ", job id = " << actor_id.JobId() << ", actor id = " << actor_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } + ++counts_[CountType::REGISTER_ACTOR_REQUEST]; } void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request, @@ -142,6 +143,7 @@ void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request, << ", actor id = " << actor_id << ", status: " << status.ToString(); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } + ++counts_[CountType::CREATE_ACTOR_REQUEST]; } void GcsActorManager::HandleGetActorInfo(const rpc::GetActorInfoRequest &request, @@ -166,6 +168,7 @@ void GcsActorManager::HandleGetActorInfo(const rpc::GetActorInfoRequest &request RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId() << ", actor id = " << actor_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + 
++counts_[CountType::GET_ACTOR_INFO_REQUEST]; } void GcsActorManager::HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &request, @@ -181,6 +184,7 @@ void GcsActorManager::HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &r } RAY_LOG(DEBUG) << "Finished getting all actor info."; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_ACTOR_INFO_REQUEST]; } void GcsActorManager::HandleGetNamedActorInfo( @@ -207,6 +211,7 @@ void GcsActorManager::HandleGetNamedActorInfo( << ", actor id = " << actor_id; } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + ++counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST]; } void GcsActorManager::HandleRegisterActorInfo( const rpc::RegisterActorInfoRequest &request, rpc::RegisterActorInfoReply *reply, @@ -234,6 +239,7 @@ void GcsActorManager::HandleRegisterActorInfo( if (!status.ok()) { on_done(status); } + ++counts_[CountType::REGISTER_ACTOR_INFO_REQUEST]; } void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &request, @@ -262,6 +268,7 @@ void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &r if (!status.ok()) { on_done(status); } + ++counts_[CountType::UPDATE_ACTOR_INFO_REQUEST]; } void GcsActorManager::HandleAddActorCheckpoint( @@ -310,6 +317,7 @@ void GcsActorManager::HandleAddActorCheckpoint( if (!status.ok()) { on_done(status); } + ++counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST]; } void GcsActorManager::HandleGetActorCheckpoint( @@ -341,6 +349,7 @@ void GcsActorManager::HandleGetActorCheckpoint( if (!status.ok()) { on_done(status, boost::none); } + ++counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST]; } void GcsActorManager::HandleGetActorCheckpointID( @@ -369,6 +378,7 @@ void GcsActorManager::HandleGetActorCheckpointID( if (!status.ok()) { on_done(status, boost::none); } + ++counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST]; } Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, @@ 
-1137,5 +1147,35 @@ void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr & actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp()); } +/// Summarizes the RPC request counters and actor container sizes for debug logging. +std::string GcsActorManager::DebugString() const { + std::ostringstream stream; + stream << "GcsActorManager: {RegisterActor request count: " + << counts_[CountType::REGISTER_ACTOR_REQUEST] + << ", CreateActor request count: " << counts_[CountType::CREATE_ACTOR_REQUEST] + << ", GetActorInfo request count: " << counts_[CountType::GET_ACTOR_INFO_REQUEST] + << ", GetNamedActorInfo request count: " + << counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST] + << ", GetAllActorInfo request count: " + << counts_[CountType::GET_ALL_ACTOR_INFO_REQUEST] + << ", RegisterActorInfo request count: " + << counts_[CountType::REGISTER_ACTOR_INFO_REQUEST] + << ", UpdateActorInfo request count: " + << counts_[CountType::UPDATE_ACTOR_INFO_REQUEST] + << ", AddActorCheckpoint request count: " + << counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST] + << ", GetActorCheckpoint request count: " + << counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST] + << ", GetActorCheckpointID request count: " + << counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST] + << ", Registered actors count: " << registered_actors_.size() + << ", Destroyed actors count: " << destroyed_actors_.size() + << ", Named actors count: " << named_actors_.size() + << ", Unresolved actors count: " << unresolved_actors_.size() + << ", Pending actors count: " << pending_actors_.size() + << ", Created actors count: " << created_actors_.size() << "}"; + return stream.str(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index e64fe431f..7731f4eec 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -302,6 +302,8 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// Collect stats from gcs actor manager in-memory data structures. 
void CollectStats() const; + std::string DebugString() const; + private: /// A data structure representing an actor's owner. struct Owner { @@ -413,6 +415,22 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// This method MUST BE IDEMPOTENT because it can be called multiple times during /// actor destroy process. std::function destroy_owned_placement_group_if_needed_; + + // Debug info: CountType indexes counts_, the per-request-type counters. + enum CountType { + REGISTER_ACTOR_REQUEST = 0, + CREATE_ACTOR_REQUEST = 1, + GET_ACTOR_INFO_REQUEST = 2, + GET_NAMED_ACTOR_INFO_REQUEST = 3, + GET_ALL_ACTOR_INFO_REQUEST = 4, + REGISTER_ACTOR_INFO_REQUEST = 5, + UPDATE_ACTOR_INFO_REQUEST = 6, + ADD_ACTOR_CHECKPOINT_REQUEST = 7, + GET_ACTOR_CHECKPOINT_REQUEST = 8, + GET_ACTOR_CHECKPOINT_ID_REQUEST = 9, + CountType_MAX = 10, + }; + uint64_t counts_[CountType::CountType_MAX] = {0}; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 6f35ec2b4..0d937e074 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -146,6 +146,7 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, }; RAY_CHECK_OK( gcs_table_storage_->NodeTable().Put(node_id, request.node_info(), on_done)); + ++counts_[CountType::REGISTER_NODE_REQUEST]; } void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &request, @@ -172,6 +173,7 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ // Update node state to DEAD instead of deleting it. 
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); } + ++counts_[CountType::UNREGISTER_NODE_REQUEST]; } void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &request, @@ -184,6 +186,7 @@ void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ reply->add_node_info_list()->CopyFrom(*entry.second); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; } void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request, @@ -210,6 +213,7 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re node_failure_detector_service_.post( [this, node_id] { node_failure_detector_->HandleHeartbeat(node_id); }); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::REPORT_HEARTBEAT_REQUEST]; } void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request, @@ -223,6 +227,7 @@ void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request, } } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_RESOURCES_REQUEST]; } void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &request, @@ -260,6 +265,7 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re RAY_LOG(ERROR) << "Failed to update resources as node " << node_id << " is not registered."; } + ++counts_[CountType::UPDATE_RESOURCES_REQUEST]; } void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &request, @@ -293,6 +299,7 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); RAY_LOG(DEBUG) << "Finished deleting node resources, node id = " << node_id; } + ++counts_[CountType::DELETE_RESOURCES_REQUEST]; } void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest 
&request, @@ -304,6 +311,7 @@ void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest }; RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Put(UniqueID::Nil(), request.config(), on_done)); + ++counts_[CountType::SET_INTERNAL_CONFIG_REQUEST]; } void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request, @@ -319,6 +327,7 @@ void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest }; RAY_CHECK_OK( gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config)); + ++counts_[CountType::GET_INTERNAL_CONFIG_REQUEST]; } void GcsNodeManager::HandleGetAllAvailableResources( @@ -334,6 +343,7 @@ void GcsNodeManager::HandleGetAllAvailableResources( reply->add_resources_list()->CopyFrom(resource); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST]; } void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request, @@ -383,6 +393,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_HEARTBEAT_REQUEST]; } void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id, @@ -580,5 +591,33 @@ void GcsNodeManager::SendBatchedHeartbeat() { }); } +/// Summarizes the per-RPC request counters of the node manager for debug logging. +std::string GcsNodeManager::DebugString() const { + std::ostringstream stream; + stream << "GcsNodeManager: {RegisterNode request count: " + << counts_[CountType::REGISTER_NODE_REQUEST] + << ", UnregisterNode request count: " + << counts_[CountType::UNREGISTER_NODE_REQUEST] + << ", GetAllNodeInfo request count: " + << counts_[CountType::GET_ALL_NODE_INFO_REQUEST] + << ", ReportHeartbeat request count: " + << counts_[CountType::REPORT_HEARTBEAT_REQUEST] + << ", GetHeartbeat request count: " << counts_[CountType::GET_HEARTBEAT_REQUEST] + << ", GetAllHeartbeat request count: " + << counts_[CountType::GET_ALL_HEARTBEAT_REQUEST] + 
<< ", GetResources request count: " << counts_[CountType::GET_RESOURCES_REQUEST] + << ", UpdateResources request count: " + << counts_[CountType::UPDATE_RESOURCES_REQUEST] + << ", DeleteResources request count: " + << counts_[CountType::DELETE_RESOURCES_REQUEST] + << ", SetInternalConfig request count: " + << counts_[CountType::SET_INTERNAL_CONFIG_REQUEST] + << ", GetInternalConfig request count: " + << counts_[CountType::GET_INTERNAL_CONFIG_REQUEST] + << ", GetAllAvailableResources request count: " + << counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST] << "}"; + return stream.str(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 6a5a2da89..00f0d11bc 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -176,6 +176,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { void UpdatePlacementGroupLoad( const std::shared_ptr placement_group_load); + std::string DebugString() const; + protected: class NodeFailureDetector { public: @@ -287,6 +289,24 @@ class GcsNodeManager : public rpc::NodeInfoHandler { absl::flat_hash_map> cluster_realtime_resources_; /// Placement group load information that is used for autoscaler. absl::optional> placement_group_load_; + + // Debug info. 
+ enum CountType { + REGISTER_NODE_REQUEST = 0, + UNREGISTER_NODE_REQUEST = 1, + GET_ALL_NODE_INFO_REQUEST = 2, + REPORT_HEARTBEAT_REQUEST = 3, + GET_HEARTBEAT_REQUEST = 4, // NOTE(review): no increment of this counter is visible in this patch - confirm. + GET_ALL_HEARTBEAT_REQUEST = 5, + GET_RESOURCES_REQUEST = 6, + UPDATE_RESOURCES_REQUEST = 7, + DELETE_RESOURCES_REQUEST = 8, + SET_INTERNAL_CONFIG_REQUEST = 9, + GET_INTERNAL_CONFIG_REQUEST = 10, + GET_ALL_AVAILABLE_RESOURCES_REQUEST = 11, + CountType_MAX = 12, + }; + uint64_t counts_[CountType::CountType_MAX] = {0}; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 2528f6006..1e15987bd 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -35,6 +35,7 @@ void GcsObjectManager::HandleGetObjectLocations( RAY_LOG(DEBUG) << "Finished getting object locations, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST]; } void GcsObjectManager::HandleGetAllObjectLocations( @@ -54,6 +55,7 @@ void GcsObjectManager::HandleGetAllObjectLocations( } RAY_LOG(DEBUG) << "Finished getting all object locations."; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST]; } void GcsObjectManager::HandleAddObjectLocation( @@ -107,6 +109,7 @@ void GcsObjectManager::HandleAddObjectLocation( if (!status.ok()) { on_done(status); } + ++counts_[CountType::ADD_OBJECT_LOCATION_REQUEST]; } void GcsObjectManager::HandleRemoveObjectLocation( @@ -152,6 +155,7 @@ void GcsObjectManager::HandleRemoveObjectLocation( if (!status.ok()) { on_done(status); } + ++counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST]; } void GcsObjectManager::AddObjectsLocation( @@ -304,6 +308,21 @@ void GcsObjectManager::LoadInitialData(const EmptyCallback &done) { 
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback)); } +std::string GcsObjectManager::DebugString() const { + absl::MutexLock lock(&mutex_); // NOTE(review): presumably guards object_to_locations_ - confirm. + std::ostringstream stream; + stream << "GcsObjectManager: {GetObjectLocations request count: " + << counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST] + << ", GetAllObjectLocations request count: " + << counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST] + << ", AddObjectLocation request count: " + << counts_[CountType::ADD_OBJECT_LOCATION_REQUEST] + << ", RemoveObjectLocation request count: " + << counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST] + << ", Object count: " << object_to_locations_.size() << "}"; + return stream.str(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index dc5249c1c..b1602d824 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -59,6 +59,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { /// \param done Callback that will be called when load is complete. void LoadInitialData(const EmptyCallback &done); + std::string DebugString() const; + protected: struct LocationSet { absl::flat_hash_set locations; @@ -138,6 +140,16 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { std::shared_ptr gcs_table_storage_; std::shared_ptr gcs_pub_sub_; + + // Debug info: CountType indexes counts_, the per-request-type counters. 
+ enum CountType { + GET_OBJECT_LOCATIONS_REQUEST = 0, + GET_ALL_OBJECT_LOCATIONS_REQUEST = 1, + ADD_OBJECT_LOCATION_REQUEST = 2, + REMOVE_OBJECT_LOCATION_REQUEST = 3, + CountType_MAX = 4, + }; + uint64_t counts_[CountType::CountType_MAX] = {0}; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index ffba3eb0a..8a50a1269 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -126,6 +126,9 @@ void GcsServer::Start() { }; gcs_object_manager_->LoadInitialData(on_done); gcs_node_manager_->LoadInitialData(on_done); + + // Start the periodic debug dump; PrintDebugInfo() reschedules itself. + PrintDebugInfo(); } void GcsServer::Stop() { @@ -296,5 +299,21 @@ void GcsServer::CollectStats() { (RayConfig::instance().metrics_report_interval_ms() / 2) /* milliseconds */); } +/// Logs every manager's DebugString() and reschedules itself on main_service_. +void GcsServer::PrintDebugInfo() { + std::ostringstream stream; + stream << gcs_node_manager_->DebugString() << "\n" + << gcs_actor_manager_->DebugString() << "\n" + << gcs_object_manager_->DebugString() << "\n" + << static_cast<rpc::DefaultTaskInfoHandler *>(task_info_handler_.get()) + ->DebugString(); + // TODO(ffbin): We will get the session_dir in the next PR, and write the log to + // gcs_debug_state.txt. + RAY_LOG(INFO) << stream.str(); + execute_after(main_service_, [this] { PrintDebugInfo(); }, + (RayConfig::instance().gcs_dump_debug_log_interval_minutes() * + 60000) /* milliseconds */); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index f1e799139..7b4e2e3bc 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -113,6 +113,9 @@ class GcsServer { /// Collect stats from each module for every (metrics_report_interval_ms / 2) ms. void CollectStats(); + /// Print debug info, then reschedule every gcs_dump_debug_log_interval_minutes. + void PrintDebugInfo(); + /// Gcs server configuration GcsServerConfig config_; /// The main io service to drive event posted from grpc threads. 
diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 794b6e653..7034c87a5 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -42,6 +42,7 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request, if (!status.ok()) { on_done(status); } + ++counts_[CountType::ADD_TASK_REQUEST]; } void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, @@ -64,6 +65,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, if (!status.ok()) { on_done(status, boost::none); } + ++counts_[CountType::GET_TASK_REQUEST]; } void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request, @@ -87,6 +89,7 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request } RAY_LOG(DEBUG) << "Finished deleting tasks, job id = " << job_id << ", task id list size = " << task_ids.size(); + ++counts_[CountType::DELETE_TASKS_REQUEST]; } void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &request, @@ -116,6 +119,7 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque if (!status.ok()) { on_done(status); } + ++counts_[CountType::ADD_TASK_LEASE_REQUEST]; } void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &request, @@ -138,6 +142,7 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque if (!status.ok()) { on_done(status, boost::none); } + ++counts_[CountType::GET_TASK_LEASE_REQUEST]; } void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( @@ -170,6 +175,20 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( if (!status.ok()) { on_done(status); } + ++counts_[CountType::ATTEMPT_TASK_RECONSTRUCTION_REQUEST]; +} + +std::string DefaultTaskInfoHandler::DebugString() const { + std::ostringstream stream; + stream << "DefaultTaskInfoHandler: {AddTask request count: " 
+ << counts_[CountType::ADD_TASK_REQUEST] + << ", GetTask request count: " << counts_[CountType::GET_TASK_REQUEST] + << ", DeleteTasks request count: " << counts_[CountType::DELETE_TASKS_REQUEST] + << ", AddTaskLease request count: " << counts_[CountType::ADD_TASK_LEASE_REQUEST] + << ", GetTaskLease request count: " << counts_[CountType::GET_TASK_LEASE_REQUEST] + << ", AttemptTaskReconstruction request count: " + << counts_[CountType::ATTEMPT_TASK_RECONSTRUCTION_REQUEST] << "}"; + return stream.str(); } } // namespace rpc diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h index 0b6538a59..98cd64bda 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h @@ -48,9 +48,23 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) override; + std::string DebugString() const; + private: std::shared_ptr gcs_table_storage_; std::shared_ptr &gcs_pub_sub_; + + // Debug info: CountType indexes counts_, the per-request-type counters. + enum CountType { + ADD_TASK_REQUEST = 0, + GET_TASK_REQUEST = 1, + DELETE_TASKS_REQUEST = 2, + ADD_TASK_LEASE_REQUEST = 3, + GET_TASK_LEASE_REQUEST = 4, + ATTEMPT_TASK_RECONSTRUCTION_REQUEST = 5, + CountType_MAX = 6, + }; + uint64_t counts_[CountType::CountType_MAX] = {0}; }; } // namespace rpc