From b4d87b8fc5ff03b868adba35b37da32b7be9f71f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 27 Jan 2021 14:02:22 -0800 Subject: [PATCH] Fix high CPU usage in object manager due to O(n^2) iteration over active pulls list (#13724) --- src/ray/object_manager/pull_manager.cc | 29 ++++++++++++++------------ src/ray/object_manager/pull_manager.h | 6 ++++-- src/ray/raylet/node_manager.cc | 10 +++++---- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index 302f2f435..f4920a8de 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -51,7 +51,8 @@ uint64_t PullManager::Pull(const std::vector &object_ref_b bool PullManager::ActivateNextPullBundleRequest( const std::map>::iterator - &next_request_it) { + &next_request_it, + std::vector *objects_to_pull) { // Check that we have sizes for all of the objects in the bundle. If not, we // should not activate the bundle, since it may put us over the available // capacity. @@ -81,6 +82,7 @@ bool PullManager::ActivateNextPullBundleRequest( auto it = object_pull_requests_.find(obj_id); RAY_CHECK(it != object_pull_requests_.end()); num_bytes_being_pulled_ += it->second.object_size; + objects_to_pull->push_back(obj_id); } } @@ -91,7 +93,8 @@ bool PullManager::ActivateNextPullBundleRequest( } void PullManager::DeactivatePullBundleRequest( - const std::map>::iterator &request_it) { + const std::map>::iterator &request_it, + std::unordered_set *objects_to_cancel) { for (const auto &ref : request_it->second) { auto obj_id = ObjectRefToId(ref); RAY_CHECK(active_object_pull_requests_[obj_id].erase(request_it->first)); @@ -101,6 +104,10 @@ void PullManager::DeactivatePullBundleRequest( RAY_CHECK(it != object_pull_requests_.end()); num_bytes_being_pulled_ -= it->second.object_size; active_object_pull_requests_.erase(obj_id); + + if (objects_to_cancel) { + objects_to_cancel->insert(obj_id); + } } } @@ -120,10 +127,9 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available) RAY_LOG(DEBUG) << "Updating pulls based on available memory: " << num_bytes_available; } num_bytes_available_ = num_bytes_available; - uint64_t prev_highest_req_id_being_pulled = highest_req_id_being_pulled_; - std::unordered_set object_ids_to_pull; // While there is available capacity, activate the next pull request. + std::vector objects_to_pull; while (num_bytes_being_pulled_ < num_bytes_available_) { // Get the next pull request in the queue. const auto last_request_it = pull_request_bundles_.find(highest_req_id_being_pulled_); @@ -145,7 +151,7 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available) << " num bytes available: " << num_bytes_available_; // There is another pull bundle request that we could try, and there is // enough space. Activate the next pull bundle request in the queue. - if (!ActivateNextPullBundleRequest(next_request_it)) { + if (!ActivateNextPullBundleRequest(next_request_it, &objects_to_pull)) { // This pull bundle request could not be activated, due to lack of object // size information. Wait until we have object size information before // activating this pull bundle. @@ -162,18 +168,15 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available) << " num bytes available: " << num_bytes_available_; const auto last_request_it = pull_request_bundles_.find(highest_req_id_being_pulled_); RAY_CHECK(last_request_it != pull_request_bundles_.end()); - DeactivatePullBundleRequest(last_request_it); + DeactivatePullBundleRequest(last_request_it, &object_ids_to_cancel); } TriggerOutOfMemoryHandlingIfNeeded(); - if (highest_req_id_being_pulled_ > prev_highest_req_id_being_pulled) { - // There are newly activated requests. Start pulling objects for the newly - // activated requests. - // NOTE(swang): We could also just wait for the next timer tick to pull the - // objects, but this would add a delay of up to one tick for any bundles of - // multiple objects, even when we are not under memory pressure. - Tick(); + for (const auto &obj_id : objects_to_pull) { + if (object_ids_to_cancel.count(obj_id) == 0) { + TryToMakeObjectLocal(obj_id); + } } } diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index 26eba1a35..3a542fef7 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -146,12 +146,14 @@ class PullManager { /// any objects in the request that are not already being pulled. bool ActivateNextPullBundleRequest( const std::map>::iterator - &next_request_it); + &next_request_it, + std::vector *objects_to_pull); /// Deactivate a pull request in the queue. This cancels any pull or restore /// operations for the object. void DeactivatePullBundleRequest( - const std::map>::iterator &request_it); + const std::map>::iterator &request_it, + std::unordered_set *objects_to_cancel = nullptr); /// Trigger out-of-memory handling if the first request in the queue needs /// more space than the bytes available. This is needed to make room for the diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 072064f46..e1ac5eb67 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2509,14 +2509,16 @@ rpc::ObjectStoreStats AccumulateStoreStats( rpc::ObjectStoreStats store_stats; for (const auto &reply : node_stats) { auto cur_store = reply.store_stats(); - store_stats.set_spill_time_total_s(store_stats.spill_time_total_s() + - cur_store.spill_time_total_s()); + // Use max aggregation for time, since the nodes are spilling concurrently. + store_stats.set_spill_time_total_s( + std::max(store_stats.spill_time_total_s(), cur_store.spill_time_total_s())); + store_stats.set_restore_time_total_s( + std::max(store_stats.restore_time_total_s(), cur_store.restore_time_total_s())); + // Use sum aggregation for the rest of the metrics. store_stats.set_spilled_bytes_total(store_stats.spilled_bytes_total() + cur_store.spilled_bytes_total()); store_stats.set_spilled_objects_total(store_stats.spilled_objects_total() + cur_store.spilled_objects_total()); - store_stats.set_restore_time_total_s(store_stats.restore_time_total_s() + - cur_store.restore_time_total_s()); store_stats.set_restored_bytes_total(store_stats.restored_bytes_total() + cur_store.restored_bytes_total()); store_stats.set_restored_objects_total(store_stats.restored_objects_total() +