mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:16:19 +08:00
[core] Add debug information for the PullManager and LocalObjectManager (#13782)
* Add debug info * Formatting. Co-authored-by: SangBin Cho <rkooo567@gmail.com>
This commit is contained in:
@@ -818,6 +818,7 @@ std::string ObjectManager::DebugString() const {
|
||||
result << "\n" << object_directory_->DebugString();
|
||||
result << "\n" << store_notification_->DebugString();
|
||||
result << "\n" << buffer_pool_.DebugString();
|
||||
result << "\n" << pull_manager_->DebugString();
|
||||
return result.str();
|
||||
}
|
||||
|
||||
|
||||
@@ -424,4 +424,16 @@ void PullManager::Tick() {
|
||||
|
||||
int PullManager::NumActiveRequests() const { return object_pull_requests_.size(); }
|
||||
|
||||
std::string PullManager::DebugString() const {
|
||||
std::stringstream result;
|
||||
result << "PullManager:";
|
||||
result << "\n- num bytes available for pulled objects: " << num_bytes_available_;
|
||||
result << "\n- num bytes being pulled: " << num_bytes_being_pulled_;
|
||||
result << "\n- num pull request bundles: " << pull_request_bundles_.size();
|
||||
result << "\n- num objects requested pull: " << object_pull_requests_.size();
|
||||
result << "\n- num objects actively being pulled: "
|
||||
<< active_object_pull_requests_.size();
|
||||
return result.str();
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
|
||||
@@ -100,6 +100,8 @@ class PullManager {
|
||||
/// The number of ongoing object pulls.
|
||||
int NumActiveRequests() const;
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
/// A helper structure for tracking information about each ongoing object pull.
|
||||
struct ObjectPullRequest {
|
||||
|
||||
@@ -32,6 +32,7 @@ void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
|
||||
continue;
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Pinning object " << object_id;
|
||||
pinned_objects_size_ += object->GetSize();
|
||||
pinned_objects_.emplace(object_id, std::move(object));
|
||||
}
|
||||
}
|
||||
@@ -69,7 +70,10 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
|
||||
if (automatic_object_deletion_enabled_) {
|
||||
spilled_object_pending_delete_.push(object_id);
|
||||
}
|
||||
pinned_objects_.erase(object_id);
|
||||
if (pinned_objects_.count(object_id)) {
|
||||
pinned_objects_size_ -= pinned_objects_[object_id]->GetSize();
|
||||
pinned_objects_.erase(object_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Try to evict all copies of the object from the cluster.
|
||||
@@ -237,6 +241,7 @@ void LocalObjectManager::SpillObjectsInternal(
|
||||
for (const auto &object_id : objects_to_spill) {
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
pinned_objects_size_ += it->second->GetSize();
|
||||
pinned_objects_.emplace(object_id, std::move(it->second));
|
||||
objects_pending_spill_.erase(it);
|
||||
}
|
||||
@@ -454,6 +459,17 @@ void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply)
|
||||
stats->set_restored_objects_total(restored_objects_total_);
|
||||
}
|
||||
|
||||
std::string LocalObjectManager::DebugString() const {
|
||||
std::stringstream result;
|
||||
result << "LocalObjectManager:\n";
|
||||
result << "- num pinned objects: " << pinned_objects_.size() << "\n";
|
||||
result << "- pinned objects size: " << pinned_objects_size_ << "\n";
|
||||
result << "- num objects pending restore: " << objects_pending_restore_.size() << "\n";
|
||||
result << "- num objects pending spill: " << objects_pending_spill_.size() << "\n";
|
||||
result << "- num bytes pending spill: " << num_bytes_pending_spill_ << "\n";
|
||||
return result.str();
|
||||
}
|
||||
|
||||
}; // namespace raylet
|
||||
|
||||
}; // namespace ray
|
||||
|
||||
@@ -136,6 +136,8 @@ class LocalObjectManager {
|
||||
/// \param Output parameter.
|
||||
void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;
|
||||
|
||||
std::string DebugString() const;
|
||||
|
||||
private:
|
||||
FRIEND_TEST(LocalObjectManagerTest, TestSpillObjectsOfSize);
|
||||
FRIEND_TEST(LocalObjectManagerTest,
|
||||
@@ -203,6 +205,9 @@ class LocalObjectManager {
|
||||
// Objects that are pinned on this node.
|
||||
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
|
||||
|
||||
// Total size of objects pinned on this node.
|
||||
size_t pinned_objects_size_ = 0;
|
||||
|
||||
// Objects that were pinned on this node but that are being spilled.
|
||||
// These objects will be released once spilling is complete and the URL is
|
||||
// written to the object directory.
|
||||
|
||||
@@ -2334,6 +2334,7 @@ std::string NodeManager::DebugString() const {
|
||||
for (auto &pair : cluster_resource_map_) {
|
||||
result << "\n" << pair.first.Hex() << ": " << pair.second.DebugString();
|
||||
}
|
||||
result << "\n" << local_object_manager_.DebugString();
|
||||
result << "\n" << object_manager_.DebugString();
|
||||
result << "\n" << gcs_client_->DebugString();
|
||||
result << "\n" << worker_pool_.DebugString();
|
||||
|
||||
@@ -1037,6 +1037,10 @@ std::string WorkerPool::DebugString() const {
|
||||
<< " workers: " << entry.second.registered_workers.size();
|
||||
result << "\n- num " << Language_Name(entry.first)
|
||||
<< " drivers: " << entry.second.registered_drivers.size();
|
||||
result << "\n- num object spill callbacks queued: "
|
||||
<< entry.second.spill_io_worker_state.pending_io_tasks.size();
|
||||
result << "\n- num object restore queued: "
|
||||
<< entry.second.restore_io_worker_state.pending_io_tasks.size();
|
||||
}
|
||||
result << "\n- num idle workers: " << idle_of_all_languages_.size();
|
||||
return result.str();
|
||||
|
||||
Reference in New Issue
Block a user