mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 18:48:25 +08:00
Fix high CPU usage in object manager due to O(n^2) iteration over active pulls list (#13724)
This commit is contained in:
@@ -51,7 +51,8 @@ uint64_t PullManager::Pull(const std::vector<rpc::ObjectReference> &object_ref_b
|
||||
|
||||
bool PullManager::ActivateNextPullBundleRequest(
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator
|
||||
&next_request_it) {
|
||||
&next_request_it,
|
||||
std::vector<ObjectID> *objects_to_pull) {
|
||||
// Check that we have sizes for all of the objects in the bundle. If not, we
|
||||
// should not activate the bundle, since it may put us over the available
|
||||
// capacity.
|
||||
@@ -81,6 +82,7 @@ bool PullManager::ActivateNextPullBundleRequest(
|
||||
auto it = object_pull_requests_.find(obj_id);
|
||||
RAY_CHECK(it != object_pull_requests_.end());
|
||||
num_bytes_being_pulled_ += it->second.object_size;
|
||||
objects_to_pull->push_back(obj_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +93,8 @@ bool PullManager::ActivateNextPullBundleRequest(
|
||||
}
|
||||
|
||||
void PullManager::DeactivatePullBundleRequest(
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it) {
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it,
|
||||
std::unordered_set<ObjectID> *objects_to_cancel) {
|
||||
for (const auto &ref : request_it->second) {
|
||||
auto obj_id = ObjectRefToId(ref);
|
||||
RAY_CHECK(active_object_pull_requests_[obj_id].erase(request_it->first));
|
||||
@@ -101,6 +104,10 @@ void PullManager::DeactivatePullBundleRequest(
|
||||
RAY_CHECK(it != object_pull_requests_.end());
|
||||
num_bytes_being_pulled_ -= it->second.object_size;
|
||||
active_object_pull_requests_.erase(obj_id);
|
||||
|
||||
if (objects_to_cancel) {
|
||||
objects_to_cancel->insert(obj_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,10 +127,9 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available)
|
||||
RAY_LOG(DEBUG) << "Updating pulls based on available memory: " << num_bytes_available;
|
||||
}
|
||||
num_bytes_available_ = num_bytes_available;
|
||||
uint64_t prev_highest_req_id_being_pulled = highest_req_id_being_pulled_;
|
||||
|
||||
std::unordered_set<ObjectID> object_ids_to_pull;
|
||||
// While there is available capacity, activate the next pull request.
|
||||
std::vector<ObjectID> objects_to_pull;
|
||||
while (num_bytes_being_pulled_ < num_bytes_available_) {
|
||||
// Get the next pull request in the queue.
|
||||
const auto last_request_it = pull_request_bundles_.find(highest_req_id_being_pulled_);
|
||||
@@ -145,7 +151,7 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available)
|
||||
<< " num bytes available: " << num_bytes_available_;
|
||||
// There is another pull bundle request that we could try, and there is
|
||||
// enough space. Activate the next pull bundle request in the queue.
|
||||
if (!ActivateNextPullBundleRequest(next_request_it)) {
|
||||
if (!ActivateNextPullBundleRequest(next_request_it, &objects_to_pull)) {
|
||||
// This pull bundle request could not be activated, due to lack of object
|
||||
// size information. Wait until we have object size information before
|
||||
// activating this pull bundle.
|
||||
@@ -162,18 +168,15 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(size_t num_bytes_available)
|
||||
<< " num bytes available: " << num_bytes_available_;
|
||||
const auto last_request_it = pull_request_bundles_.find(highest_req_id_being_pulled_);
|
||||
RAY_CHECK(last_request_it != pull_request_bundles_.end());
|
||||
DeactivatePullBundleRequest(last_request_it);
|
||||
DeactivatePullBundleRequest(last_request_it, &object_ids_to_cancel);
|
||||
}
|
||||
|
||||
TriggerOutOfMemoryHandlingIfNeeded();
|
||||
|
||||
if (highest_req_id_being_pulled_ > prev_highest_req_id_being_pulled) {
|
||||
// There are newly activated requests. Start pulling objects for the newly
|
||||
// activated requests.
|
||||
// NOTE(swang): We could also just wait for the next timer tick to pull the
|
||||
// objects, but this would add a delay of up to one tick for any bundles of
|
||||
// multiple objects, even when we are not under memory pressure.
|
||||
Tick();
|
||||
for (const auto &obj_id : objects_to_pull) {
|
||||
if (object_ids_to_cancel.count(obj_id) == 0) {
|
||||
TryToMakeObjectLocal(obj_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -146,12 +146,14 @@ class PullManager {
|
||||
/// any objects in the request that are not already being pulled.
|
||||
bool ActivateNextPullBundleRequest(
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator
|
||||
&next_request_it);
|
||||
&next_request_it,
|
||||
std::vector<ObjectID> *objects_to_pull);
|
||||
|
||||
/// Deactivate a pull request in the queue. This cancels any pull or restore
|
||||
/// operations for the object.
|
||||
void DeactivatePullBundleRequest(
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it);
|
||||
const std::map<uint64_t, std::vector<rpc::ObjectReference>>::iterator &request_it,
|
||||
std::unordered_set<ObjectID> *objects_to_cancel = nullptr);
|
||||
|
||||
/// Trigger out-of-memory handling if the first request in the queue needs
|
||||
/// more space than the bytes available. This is needed to make room for the
|
||||
|
||||
@@ -2509,14 +2509,16 @@ rpc::ObjectStoreStats AccumulateStoreStats(
|
||||
rpc::ObjectStoreStats store_stats;
|
||||
for (const auto &reply : node_stats) {
|
||||
auto cur_store = reply.store_stats();
|
||||
store_stats.set_spill_time_total_s(store_stats.spill_time_total_s() +
|
||||
cur_store.spill_time_total_s());
|
||||
// Use max aggregation for time, since the nodes are spilling concurrently.
|
||||
store_stats.set_spill_time_total_s(
|
||||
std::max(store_stats.spill_time_total_s(), cur_store.spill_time_total_s()));
|
||||
store_stats.set_restore_time_total_s(
|
||||
std::max(store_stats.restore_time_total_s(), cur_store.restore_time_total_s()));
|
||||
// Use sum aggregation for the rest of the metrics.
|
||||
store_stats.set_spilled_bytes_total(store_stats.spilled_bytes_total() +
|
||||
cur_store.spilled_bytes_total());
|
||||
store_stats.set_spilled_objects_total(store_stats.spilled_objects_total() +
|
||||
cur_store.spilled_objects_total());
|
||||
store_stats.set_restore_time_total_s(store_stats.restore_time_total_s() +
|
||||
cur_store.restore_time_total_s());
|
||||
store_stats.set_restored_bytes_total(store_stats.restored_bytes_total() +
|
||||
cur_store.restored_bytes_total());
|
||||
store_stats.set_restored_objects_total(store_stats.restored_objects_total() +
|
||||
|
||||
Reference in New Issue
Block a user