mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:06:25 +08:00
[GCS]Add gcs dump log(Part1) (#11727)
* add part code * fix compile bug * Fix bug * Add part code * fix review comment * fix review comment * fix lint error * fix review comment Co-authored-by: 灵洵 <fengbin.ffb@antfin.com> Co-authored-by: 灵洵 <fengbin.ffb@antgroup.com>
This commit is contained in:
@@ -243,6 +243,8 @@ RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200)
|
||||
RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
|
||||
/// Maximum number of dead nodes in GCS server memory cache.
|
||||
RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
|
||||
/// The interval at which the gcs server will print debug info.
|
||||
RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
|
||||
|
||||
/// Maximum number of times to retry putting an object when the plasma store is full.
|
||||
/// Can be set to -1 to enable unlimited retries.
|
||||
|
||||
@@ -120,6 +120,7 @@ void GcsActorManager::HandleRegisterActor(const rpc::RegisterActorRequest &reque
|
||||
<< ", job id = " << actor_id.JobId() << ", actor id = " << actor_id;
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
|
||||
}
|
||||
++counts_[CountType::REGISTER_ACTOR_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request,
|
||||
@@ -142,6 +143,7 @@ void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request,
|
||||
<< ", actor id = " << actor_id << ", status: " << status.ToString();
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
|
||||
}
|
||||
++counts_[CountType::CREATE_ACTOR_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleGetActorInfo(const rpc::GetActorInfoRequest &request,
|
||||
@@ -166,6 +168,7 @@ void GcsActorManager::HandleGetActorInfo(const rpc::GetActorInfoRequest &request
|
||||
RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
|
||||
<< ", actor id = " << actor_id;
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ACTOR_INFO_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &request,
|
||||
@@ -181,6 +184,7 @@ void GcsActorManager::HandleGetAllActorInfo(const rpc::GetAllActorInfoRequest &r
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Finished getting all actor info.";
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ALL_ACTOR_INFO_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleGetNamedActorInfo(
|
||||
@@ -207,6 +211,7 @@ void GcsActorManager::HandleGetNamedActorInfo(
|
||||
<< ", actor id = " << actor_id;
|
||||
}
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
|
||||
++counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST];
|
||||
}
|
||||
void GcsActorManager::HandleRegisterActorInfo(
|
||||
const rpc::RegisterActorInfoRequest &request, rpc::RegisterActorInfoReply *reply,
|
||||
@@ -234,6 +239,7 @@ void GcsActorManager::HandleRegisterActorInfo(
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::REGISTER_ACTOR_INFO_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &request,
|
||||
@@ -262,6 +268,7 @@ void GcsActorManager::HandleUpdateActorInfo(const rpc::UpdateActorInfoRequest &r
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::UPDATE_ACTOR_INFO_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleAddActorCheckpoint(
|
||||
@@ -310,6 +317,7 @@ void GcsActorManager::HandleAddActorCheckpoint(
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleGetActorCheckpoint(
|
||||
@@ -341,6 +349,7 @@ void GcsActorManager::HandleGetActorCheckpoint(
|
||||
if (!status.ok()) {
|
||||
on_done(status, boost::none);
|
||||
}
|
||||
++counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST];
|
||||
}
|
||||
|
||||
void GcsActorManager::HandleGetActorCheckpointID(
|
||||
@@ -369,6 +378,7 @@ void GcsActorManager::HandleGetActorCheckpointID(
|
||||
if (!status.ok()) {
|
||||
on_done(status, boost::none);
|
||||
}
|
||||
++counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST];
|
||||
}
|
||||
|
||||
Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request,
|
||||
@@ -1137,5 +1147,32 @@ void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &
|
||||
actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
|
||||
}
|
||||
|
||||
std::string GcsActorManager::DebugString() const {
|
||||
std::ostringstream stream;
|
||||
stream << "GcsActorManager: {RegisterActor request count: "
|
||||
<< counts_[CountType::REGISTER_ACTOR_REQUEST]
|
||||
<< ", CreateActor request count: " << counts_[CountType::CREATE_ACTOR_REQUEST]
|
||||
<< ", GetActorInfo request count: " << counts_[CountType::GET_ACTOR_INFO_REQUEST]
|
||||
<< ", GetNamedActorInfo request count: "
|
||||
<< counts_[CountType::GET_NAMED_ACTOR_INFO_REQUEST]
|
||||
<< ", RegisterActorInfo request count: "
|
||||
<< counts_[CountType::REGISTER_ACTOR_INFO_REQUEST]
|
||||
<< ", UpdateActorInfo request count: "
|
||||
<< counts_[CountType::UPDATE_ACTOR_INFO_REQUEST]
|
||||
<< ", AddActorCheckpoint request count: "
|
||||
<< counts_[CountType::ADD_ACTOR_CHECKPOINT_REQUEST]
|
||||
<< ", GetActorCheckpoint request count: "
|
||||
<< counts_[CountType::GET_ACTOR_CHECKPOINT_REQUEST]
|
||||
<< ", GetActorCheckpointID request count: "
|
||||
<< counts_[CountType::GET_ACTOR_CHECKPOINT_ID_REQUEST]
|
||||
<< ", Registered actors count: " << registered_actors_.size()
|
||||
<< ", Destroyed actors count: " << destroyed_actors_.size()
|
||||
<< ", Named actors count: " << named_actors_.size()
|
||||
<< ", Unresolved actors count: " << unresolved_actors_.size()
|
||||
<< ", Pending actors count: " << pending_actors_.size()
|
||||
<< ", Created actors count: " << created_actors_.size() << "}";
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
} // namespace gcs
|
||||
} // namespace ray
|
||||
|
||||
@@ -302,6 +302,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
|
||||
/// Collect stats from gcs actor manager in-memory data structures.
|
||||
void CollectStats() const;
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
/// A data structure representing an actor's owner.
|
||||
struct Owner {
|
||||
@@ -413,6 +415,22 @@ class GcsActorManager : public rpc::ActorInfoHandler {
|
||||
/// This method MUST BE IDEMPOTENT because it can be called multiple times during
|
||||
/// actor destroy process.
|
||||
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed_;
|
||||
|
||||
// Debug info.
|
||||
enum CountType {
|
||||
REGISTER_ACTOR_REQUEST = 0,
|
||||
CREATE_ACTOR_REQUEST = 1,
|
||||
GET_ACTOR_INFO_REQUEST = 2,
|
||||
GET_NAMED_ACTOR_INFO_REQUEST = 3,
|
||||
GET_ALL_ACTOR_INFO_REQUEST = 4,
|
||||
REGISTER_ACTOR_INFO_REQUEST = 5,
|
||||
UPDATE_ACTOR_INFO_REQUEST = 6,
|
||||
ADD_ACTOR_CHECKPOINT_REQUEST = 7,
|
||||
GET_ACTOR_CHECKPOINT_REQUEST = 8,
|
||||
GET_ACTOR_CHECKPOINT_ID_REQUEST = 9,
|
||||
CountType_MAX = 10,
|
||||
};
|
||||
uint64_t counts_[CountType::CountType_MAX] = {0};
|
||||
};
|
||||
|
||||
} // namespace gcs
|
||||
|
||||
@@ -146,6 +146,7 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
|
||||
};
|
||||
RAY_CHECK_OK(
|
||||
gcs_table_storage_->NodeTable().Put(node_id, request.node_info(), on_done));
|
||||
++counts_[CountType::REGISTER_NODE_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &request,
|
||||
@@ -172,6 +173,7 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
|
||||
// Update node state to DEAD instead of deleting it.
|
||||
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
|
||||
}
|
||||
++counts_[CountType::UNREGISTER_NODE_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &request,
|
||||
@@ -184,6 +186,7 @@ void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ
|
||||
reply->add_node_info_list()->CopyFrom(*entry.second);
|
||||
}
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request,
|
||||
@@ -210,6 +213,7 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re
|
||||
node_failure_detector_service_.post(
|
||||
[this, node_id] { node_failure_detector_->HandleHeartbeat(node_id); });
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::REPORT_HEARTBEAT_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request,
|
||||
@@ -223,6 +227,7 @@ void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request,
|
||||
}
|
||||
}
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_RESOURCES_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &request,
|
||||
@@ -260,6 +265,7 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re
|
||||
RAY_LOG(ERROR) << "Failed to update resources as node " << node_id
|
||||
<< " is not registered.";
|
||||
}
|
||||
++counts_[CountType::UPDATE_RESOURCES_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &request,
|
||||
@@ -293,6 +299,7 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
RAY_LOG(DEBUG) << "Finished deleting node resources, node id = " << node_id;
|
||||
}
|
||||
++counts_[CountType::DELETE_RESOURCES_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest &request,
|
||||
@@ -304,6 +311,7 @@ void GcsNodeManager::HandleSetInternalConfig(const rpc::SetInternalConfigRequest
|
||||
};
|
||||
RAY_CHECK_OK(gcs_table_storage_->InternalConfigTable().Put(UniqueID::Nil(),
|
||||
request.config(), on_done));
|
||||
++counts_[CountType::SET_INTERNAL_CONFIG_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest &request,
|
||||
@@ -319,6 +327,7 @@ void GcsNodeManager::HandleGetInternalConfig(const rpc::GetInternalConfigRequest
|
||||
};
|
||||
RAY_CHECK_OK(
|
||||
gcs_table_storage_->InternalConfigTable().Get(UniqueID::Nil(), get_system_config));
|
||||
++counts_[CountType::GET_INTERNAL_CONFIG_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleGetAllAvailableResources(
|
||||
@@ -334,6 +343,7 @@ void GcsNodeManager::HandleGetAllAvailableResources(
|
||||
reply->add_resources_list()->CopyFrom(resource);
|
||||
}
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request,
|
||||
@@ -383,6 +393,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
|
||||
}
|
||||
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ALL_HEARTBEAT_REQUEST];
|
||||
}
|
||||
|
||||
void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id,
|
||||
@@ -580,5 +591,30 @@ void GcsNodeManager::SendBatchedHeartbeat() {
|
||||
});
|
||||
}
|
||||
|
||||
std::string GcsNodeManager::DebugString() const {
|
||||
std::ostringstream stream;
|
||||
stream << "GcsNodeManager: {RegisterNode request count: "
|
||||
<< counts_[CountType::REGISTER_NODE_REQUEST]
|
||||
<< ", UnregisterNode request count: "
|
||||
<< counts_[CountType::UNREGISTER_NODE_REQUEST]
|
||||
<< ", GetAllNodeInfo request count: "
|
||||
<< counts_[CountType::GET_ALL_NODE_INFO_REQUEST]
|
||||
<< ", ReportHeartbeat request count: "
|
||||
<< counts_[CountType::REPORT_HEARTBEAT_REQUEST]
|
||||
<< ", GetHeartbeat request count: " << counts_[CountType::GET_HEARTBEAT_REQUEST]
|
||||
<< ", GetAllHeartbeat request count: "
|
||||
<< counts_[CountType::GET_ALL_HEARTBEAT_REQUEST]
|
||||
<< ", GetResources request count: " << counts_[CountType::GET_RESOURCES_REQUEST]
|
||||
<< ", UpdateResources request count: "
|
||||
<< counts_[CountType::UPDATE_RESOURCES_REQUEST]
|
||||
<< ", DeleteResources request count: "
|
||||
<< counts_[CountType::DELETE_RESOURCES_REQUEST]
|
||||
<< ", SetInternalConfig request count: "
|
||||
<< counts_[CountType::SET_INTERNAL_CONFIG_REQUEST]
|
||||
<< ", GetInternalConfig request count: "
|
||||
<< counts_[CountType::GET_INTERNAL_CONFIG_REQUEST] << "}";
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
} // namespace gcs
|
||||
} // namespace ray
|
||||
|
||||
@@ -176,6 +176,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
|
||||
void UpdatePlacementGroupLoad(
|
||||
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load);
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
protected:
|
||||
class NodeFailureDetector {
|
||||
public:
|
||||
@@ -287,6 +289,24 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
|
||||
absl::flat_hash_map<NodeID, std::shared_ptr<ResourceSet>> cluster_realtime_resources_;
|
||||
/// Placement group load information that is used for autoscaler.
|
||||
absl::optional<std::shared_ptr<rpc::PlacementGroupLoad>> placement_group_load_;
|
||||
|
||||
// Debug info.
|
||||
enum CountType {
|
||||
REGISTER_NODE_REQUEST = 0,
|
||||
UNREGISTER_NODE_REQUEST = 1,
|
||||
GET_ALL_NODE_INFO_REQUEST = 2,
|
||||
REPORT_HEARTBEAT_REQUEST = 3,
|
||||
GET_HEARTBEAT_REQUEST = 4,
|
||||
GET_ALL_HEARTBEAT_REQUEST = 5,
|
||||
GET_RESOURCES_REQUEST = 6,
|
||||
UPDATE_RESOURCES_REQUEST = 7,
|
||||
DELETE_RESOURCES_REQUEST = 8,
|
||||
SET_INTERNAL_CONFIG_REQUEST = 9,
|
||||
GET_INTERNAL_CONFIG_REQUEST = 10,
|
||||
GET_ALL_AVAILABLE_RESOURCES_REQUEST = 11,
|
||||
CountType_MAX = 12,
|
||||
};
|
||||
uint64_t counts_[CountType::CountType_MAX] = {0};
|
||||
};
|
||||
|
||||
} // namespace gcs
|
||||
|
||||
@@ -35,6 +35,7 @@ void GcsObjectManager::HandleGetObjectLocations(
|
||||
RAY_LOG(DEBUG) << "Finished getting object locations, job id = "
|
||||
<< object_id.TaskId().JobId() << ", object id = " << object_id;
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST];
|
||||
}
|
||||
|
||||
void GcsObjectManager::HandleGetAllObjectLocations(
|
||||
@@ -54,6 +55,7 @@ void GcsObjectManager::HandleGetAllObjectLocations(
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Finished getting all object locations.";
|
||||
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
|
||||
++counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST];
|
||||
}
|
||||
|
||||
void GcsObjectManager::HandleAddObjectLocation(
|
||||
@@ -107,6 +109,7 @@ void GcsObjectManager::HandleAddObjectLocation(
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::ADD_OBJECT_LOCATION_REQUEST];
|
||||
}
|
||||
|
||||
void GcsObjectManager::HandleRemoveObjectLocation(
|
||||
@@ -152,6 +155,7 @@ void GcsObjectManager::HandleRemoveObjectLocation(
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST];
|
||||
}
|
||||
|
||||
void GcsObjectManager::AddObjectsLocation(
|
||||
@@ -304,6 +308,21 @@ void GcsObjectManager::LoadInitialData(const EmptyCallback &done) {
|
||||
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback));
|
||||
}
|
||||
|
||||
std::string GcsObjectManager::DebugString() const {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
std::ostringstream stream;
|
||||
stream << "GcsObjectManager: {GetObjectLocations request count: "
|
||||
<< counts_[CountType::GET_OBJECT_LOCATIONS_REQUEST]
|
||||
<< ", GetAllObjectLocations request count: "
|
||||
<< counts_[CountType::GET_ALL_OBJECT_LOCATIONS_REQUEST]
|
||||
<< ", AddObjectLocation request count: "
|
||||
<< counts_[CountType::ADD_OBJECT_LOCATION_REQUEST]
|
||||
<< ", RemoveObjectLocation request count: "
|
||||
<< counts_[CountType::REMOVE_OBJECT_LOCATION_REQUEST]
|
||||
<< ", Object count: " << object_to_locations_.size() << "}";
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
} // namespace gcs
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -59,6 +59,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
|
||||
/// \param done Callback that will be called when load is complete.
|
||||
void LoadInitialData(const EmptyCallback &done);
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
protected:
|
||||
struct LocationSet {
|
||||
absl::flat_hash_set<NodeID> locations;
|
||||
@@ -138,6 +140,16 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
|
||||
|
||||
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
|
||||
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
|
||||
|
||||
// Debug info.
|
||||
enum CountType {
|
||||
GET_OBJECT_LOCATIONS_REQUEST = 0,
|
||||
GET_ALL_OBJECT_LOCATIONS_REQUEST = 1,
|
||||
ADD_OBJECT_LOCATION_REQUEST = 2,
|
||||
REMOVE_OBJECT_LOCATION_REQUEST = 3,
|
||||
CountType_MAX = 4,
|
||||
};
|
||||
uint64_t counts_[CountType::CountType_MAX] = {0};
|
||||
};
|
||||
|
||||
} // namespace gcs
|
||||
|
||||
@@ -126,6 +126,9 @@ void GcsServer::Start() {
|
||||
};
|
||||
gcs_object_manager_->LoadInitialData(on_done);
|
||||
gcs_node_manager_->LoadInitialData(on_done);
|
||||
|
||||
// Print debug info periodically.
|
||||
PrintDebugInfo();
|
||||
}
|
||||
|
||||
void GcsServer::Stop() {
|
||||
@@ -296,5 +299,19 @@ void GcsServer::CollectStats() {
|
||||
(RayConfig::instance().metrics_report_interval_ms() / 2) /* milliseconds */);
|
||||
}
|
||||
|
||||
void GcsServer::PrintDebugInfo() {
|
||||
std::ostringstream stream;
|
||||
stream << gcs_node_manager_->DebugString() << "\n"
|
||||
<< gcs_actor_manager_->DebugString() << "\n"
|
||||
<< gcs_object_manager_->DebugString() << "\n"
|
||||
<< ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString();
|
||||
// TODO(ffbin): We will get the session_dir in the next PR, and write the log to
|
||||
// gcs_debug_state.txt.
|
||||
RAY_LOG(INFO) << stream.str();
|
||||
execute_after(main_service_, [this] { PrintDebugInfo(); },
|
||||
(RayConfig::instance().gcs_dump_debug_log_interval_minutes() *
|
||||
60000) /* milliseconds */);
|
||||
}
|
||||
|
||||
} // namespace gcs
|
||||
} // namespace ray
|
||||
|
||||
@@ -113,6 +113,9 @@ class GcsServer {
|
||||
/// Collect stats from each module for every (metrics_report_interval_ms / 2) ms.
|
||||
void CollectStats();
|
||||
|
||||
/// Print debug info periodically.
|
||||
void PrintDebugInfo();
|
||||
|
||||
/// Gcs server configuration
|
||||
GcsServerConfig config_;
|
||||
/// The main io service to drive event posted from grpc threads.
|
||||
|
||||
@@ -42,6 +42,7 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request,
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::ADD_TASK_REQUEST];
|
||||
}
|
||||
|
||||
void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
|
||||
@@ -64,6 +65,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
|
||||
if (!status.ok()) {
|
||||
on_done(status, boost::none);
|
||||
}
|
||||
++counts_[CountType::GET_TASK_REQUEST];
|
||||
}
|
||||
|
||||
void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request,
|
||||
@@ -87,6 +89,7 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Finished deleting tasks, job id = " << job_id
|
||||
<< ", task id list size = " << task_ids.size();
|
||||
++counts_[CountType::DELETE_TASKS_REQUEST];
|
||||
}
|
||||
|
||||
void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &request,
|
||||
@@ -116,6 +119,7 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::ADD_TASK_LEASE_REQUEST];
|
||||
}
|
||||
|
||||
void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &request,
|
||||
@@ -138,6 +142,7 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque
|
||||
if (!status.ok()) {
|
||||
on_done(status, boost::none);
|
||||
}
|
||||
++counts_[CountType::GET_TASK_LEASE_REQUEST];
|
||||
}
|
||||
|
||||
void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
|
||||
@@ -170,6 +175,20 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
|
||||
if (!status.ok()) {
|
||||
on_done(status);
|
||||
}
|
||||
++counts_[CountType::ATTEMPT_TASK_RECONSTRUCTION_REQUEST];
|
||||
}
|
||||
|
||||
std::string DefaultTaskInfoHandler::DebugString() const {
|
||||
std::ostringstream stream;
|
||||
stream << "DefaultTaskInfoHandler: {AddTask request count: "
|
||||
<< counts_[CountType::ADD_TASK_REQUEST]
|
||||
<< ", GetTask request count: " << counts_[CountType::GET_TASK_REQUEST]
|
||||
<< ", DeleteTasks request count: " << counts_[CountType::DELETE_TASKS_REQUEST]
|
||||
<< ", AddTaskLease request count: " << counts_[CountType::ADD_TASK_LEASE_REQUEST]
|
||||
<< ", GetTaskLease request count: " << counts_[CountType::GET_TASK_LEASE_REQUEST]
|
||||
<< ", AttemptTaskReconstruction request count: "
|
||||
<< counts_[CountType::ATTEMPT_TASK_RECONSTRUCTION_REQUEST] << "}";
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
} // namespace rpc
|
||||
|
||||
@@ -48,9 +48,23 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
|
||||
AttemptTaskReconstructionReply *reply,
|
||||
SendReplyCallback send_reply_callback) override;
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
|
||||
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_;
|
||||
|
||||
// Debug info.
|
||||
enum CountType {
|
||||
ADD_TASK_REQUEST = 0,
|
||||
GET_TASK_REQUEST = 1,
|
||||
DELETE_TASKS_REQUEST = 2,
|
||||
ADD_TASK_LEASE_REQUEST = 3,
|
||||
GET_TASK_LEASE_REQUEST = 4,
|
||||
ATTEMPT_TASK_RECONSTRUCTION_REQUEST = 5,
|
||||
CountType_MAX = 6,
|
||||
};
|
||||
uint64_t counts_[CountType::CountType_MAX] = {0};
|
||||
};
|
||||
|
||||
} // namespace rpc
|
||||
|
||||
Reference in New Issue
Block a user