From 311cf74f45e69ad921894ccf3bb467faa054b4c1 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 1 Feb 2021 10:48:21 -0800 Subject: [PATCH] [core][object spilling] Fix bugs in admission control (#13781) --- src/ray/gcs/accessor.h | 2 +- src/ray/gcs/gcs_client/service_based_accessor.cc | 3 ++- src/ray/gcs/gcs_client/service_based_accessor.h | 2 +- src/ray/object_manager/plasma/store.cc | 2 ++ src/ray/object_manager/plasma/store.h | 7 +++++-- src/ray/object_manager/pull_manager.cc | 10 ++++++++-- src/ray/raylet/local_object_manager.cc | 6 +++++- src/ray/raylet/test/local_object_manager_test.cc | 2 +- 8 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 3bc700202..e7ddb765b 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -308,7 +308,7 @@ class ObjectInfoAccessor { /// \return Status virtual Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, - const NodeID &spilled_node_id, + const NodeID &spilled_node_id, size_t object_size, const StatusCallback &callback) = 0; /// Remove location of object from GCS asynchronously. 
diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 821e0f7d9..654a53826 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1102,7 +1102,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( const ObjectID &object_id, const std::string &spilled_url, - const NodeID &spilled_node_id, const StatusCallback &callback) { + const NodeID &spilled_node_id, size_t object_size, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id << ", spilled_url = " << spilled_url << ", job id = " << object_id.TaskId().JobId(); @@ -1110,6 +1110,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( request.set_object_id(object_id.Binary()); request.set_spilled_url(spilled_url); request.set_spilled_node_id(spilled_node_id.Binary()); + request.set_size(object_size); auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) { client_impl_->GetGcsRpcClient().AddObjectLocation( diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 149fa6d2e..79deb2a6c 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -326,7 +326,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor { size_t object_size, const StatusCallback &callback) override; Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, - const NodeID &node_id, + const NodeID &node_id, size_t object_size, const StatusCallback &callback) override; Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id, diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index d12a8096a..7af47777a 100644 --- 
a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -159,6 +159,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt if (entry->ref_count == 0) { // Tell the eviction policy that this object is being used. eviction_policy_.BeginObjectAccess(object_id); + num_bytes_in_use_ += entry->data_size + entry->metadata_size; } // Increase reference count. entry->ref_count++; @@ -537,6 +538,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id, // If no more clients are using this object, notify the eviction policy // that the object is no longer being used. if (entry->ref_count == 0) { + num_bytes_in_use_ -= entry->data_size + entry->metadata_size; RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id; if (deletion_cache_.count(object_id) == 0) { // Tell the eviction policy that this object is no longer being used. diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 2ad3aad26..214cf9763 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -211,8 +211,9 @@ class PlasmaStore { void ProcessCreateRequests(); void GetAvailableMemory(std::function<void(size_t)> callback) const { - size_t available = - PlasmaAllocator::GetFootprintLimit() - eviction_policy_.GetPinnedMemoryBytes(); + int64_t num_bytes_in_use = static_cast<int64_t>(num_bytes_in_use_); + RAY_CHECK(PlasmaAllocator::GetFootprintLimit() >= num_bytes_in_use); + size_t available = PlasmaAllocator::GetFootprintLimit() - num_bytes_in_use; callback(available); } @@ -313,6 +314,8 @@ class PlasmaStore { /// interface that node manager or object manager can access the plasma store with this /// mutex if it is not absolutely necessary. 
std::recursive_mutex mutex_; + + size_t num_bytes_in_use_ = 0; }; } // namespace plasma diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index f4920a8de..63d2bf181 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -277,11 +277,17 @@ void PullManager::OnLocationChange(const ObjectID &object_id, it->second.spilled_url = spilled_url; it->second.spilled_node_id = spilled_node_id; if (!it->second.object_size_set) { - RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size - << ", num bytes being pulled is now " << num_bytes_being_pulled_; it->second.object_size = object_size; it->second.object_size_set = true; UpdatePullsBasedOnAvailableMemory(num_bytes_available_); + RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size + << ", num bytes being pulled is now " << num_bytes_being_pulled_; + if (it->second.object_size == 0) { + RAY_LOG(WARNING) << "Size of object " << object_id + << " stored in object store is zero. This may be a bug since " + "objects in the object store should be large, and can result " + "in too many objects being fetched to this node"; + } } RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients " << client_ids.size(); diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index 9909beb76..075293232 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -265,11 +265,15 @@ void LocalObjectManager::AddSpilledUrls( // don't need to report where this object is spilled. const auto node_id_object_spilled = is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil(); + + auto it = objects_pending_spill_.find(object_id); + RAY_CHECK(it != objects_pending_spill_.end()); + // Write to object directory. 
Wait for the write to finish before // releasing the object to make sure that the spilled object can // be retrieved by other raylets. RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl( - object_id, object_url, node_id_object_spilled, + object_id, object_url, node_id_object_spilled, it->second->GetSize(), [this, object_id, object_url, callback, num_remaining](Status status) { RAY_CHECK_OK(status); // Unpin the object. diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 8ff77250f..f68707ce7 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -194,7 +194,7 @@ class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor { size_t object_size, const gcs::StatusCallback &callback)); Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, - const NodeID &spilled_node_id, + const NodeID &spilled_node_id, size_t object_size, const gcs::StatusCallback &callback) { object_urls[object_id] = spilled_url; callbacks.push_back(callback);