mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:00:36 +08:00
[core][object spillin] Fix bugs in admission control (#13781)
This commit is contained in:
@@ -308,7 +308,7 @@ class ObjectInfoAccessor {
|
||||
/// \return Status
|
||||
virtual Status AsyncAddSpilledUrl(const ObjectID &object_id,
|
||||
const std::string &spilled_url,
|
||||
const NodeID &spilled_node_id,
|
||||
const NodeID &spilled_node_id, size_t object_size,
|
||||
const StatusCallback &callback) = 0;
|
||||
|
||||
/// Remove location of object from GCS asynchronously.
|
||||
|
||||
@@ -1102,7 +1102,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i
|
||||
|
||||
Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
|
||||
const ObjectID &object_id, const std::string &spilled_url,
|
||||
const NodeID &spilled_node_id, const StatusCallback &callback) {
|
||||
const NodeID &spilled_node_id, size_t object_size, const StatusCallback &callback) {
|
||||
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
|
||||
<< ", spilled_url = " << spilled_url
|
||||
<< ", job id = " << object_id.TaskId().JobId();
|
||||
@@ -1110,6 +1110,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
|
||||
request.set_object_id(object_id.Binary());
|
||||
request.set_spilled_url(spilled_url);
|
||||
request.set_spilled_node_id(spilled_node_id.Binary());
|
||||
request.set_size(object_size);
|
||||
|
||||
auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) {
|
||||
client_impl_->GetGcsRpcClient().AddObjectLocation(
|
||||
|
||||
@@ -326,7 +326,7 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
|
||||
size_t object_size, const StatusCallback &callback) override;
|
||||
|
||||
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
|
||||
const NodeID &node_id,
|
||||
const NodeID &node_id, size_t object_size,
|
||||
const StatusCallback &callback) override;
|
||||
|
||||
Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
|
||||
|
||||
@@ -159,6 +159,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, ObjectTableEnt
|
||||
if (entry->ref_count == 0) {
|
||||
// Tell the eviction policy that this object is being used.
|
||||
eviction_policy_.BeginObjectAccess(object_id);
|
||||
num_bytes_in_use_ += entry->data_size + entry->metadata_size;
|
||||
}
|
||||
// Increase reference count.
|
||||
entry->ref_count++;
|
||||
@@ -537,6 +538,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,
|
||||
// If no more clients are using this object, notify the eviction policy
|
||||
// that the object is no longer being used.
|
||||
if (entry->ref_count == 0) {
|
||||
num_bytes_in_use_ -= entry->data_size + entry->metadata_size;
|
||||
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id;
|
||||
if (deletion_cache_.count(object_id) == 0) {
|
||||
// Tell the eviction policy that this object is no longer being used.
|
||||
|
||||
@@ -211,8 +211,9 @@ class PlasmaStore {
|
||||
void ProcessCreateRequests();
|
||||
|
||||
void GetAvailableMemory(std::function<void(size_t)> callback) const {
|
||||
size_t available =
|
||||
PlasmaAllocator::GetFootprintLimit() - eviction_policy_.GetPinnedMemoryBytes();
|
||||
int64_t num_bytes_in_use = static_cast<int64_t>(num_bytes_in_use_);
|
||||
RAY_CHECK(PlasmaAllocator::GetFootprintLimit() >= num_bytes_in_use);
|
||||
size_t available = PlasmaAllocator::GetFootprintLimit() - num_bytes_in_use;
|
||||
callback(available);
|
||||
}
|
||||
|
||||
@@ -313,6 +314,8 @@ class PlasmaStore {
|
||||
/// interface that node manager or object manager can access the plasma store with this
|
||||
/// mutex if it is not absolutely necessary.
|
||||
std::recursive_mutex mutex_;
|
||||
|
||||
size_t num_bytes_in_use_ = 0;
|
||||
};
|
||||
|
||||
} // namespace plasma
|
||||
|
||||
@@ -277,11 +277,17 @@ void PullManager::OnLocationChange(const ObjectID &object_id,
|
||||
it->second.spilled_url = spilled_url;
|
||||
it->second.spilled_node_id = spilled_node_id;
|
||||
if (!it->second.object_size_set) {
|
||||
RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size
|
||||
<< ", num bytes being pulled is now " << num_bytes_being_pulled_;
|
||||
it->second.object_size = object_size;
|
||||
it->second.object_size_set = true;
|
||||
UpdatePullsBasedOnAvailableMemory(num_bytes_available_);
|
||||
RAY_LOG(DEBUG) << "Updated size of object " << object_id << " to " << object_size
|
||||
<< ", num bytes being pulled is now " << num_bytes_being_pulled_;
|
||||
if (it->second.object_size == 0) {
|
||||
RAY_LOG(WARNING) << "Size of object " << object_id
|
||||
<< " stored in object store is zero. This may be a bug since "
|
||||
"objects in the object store should be large, and can result "
|
||||
"in too many objects being fetched to this node";
|
||||
}
|
||||
}
|
||||
RAY_LOG(DEBUG) << "OnLocationChange " << spilled_url << " num clients "
|
||||
<< client_ids.size();
|
||||
|
||||
@@ -265,11 +265,15 @@ void LocalObjectManager::AddSpilledUrls(
|
||||
// don't need to report where this object is spilled.
|
||||
const auto node_id_object_spilled =
|
||||
is_external_storage_type_fs_ ? self_node_id_ : NodeID::Nil();
|
||||
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
|
||||
// Write to object directory. Wait for the write to finish before
|
||||
// releasing the object to make sure that the spilled object can
|
||||
// be retrieved by other raylets.
|
||||
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
|
||||
object_id, object_url, node_id_object_spilled,
|
||||
object_id, object_url, node_id_object_spilled, it->second->GetSize(),
|
||||
[this, object_id, object_url, callback, num_remaining](Status status) {
|
||||
RAY_CHECK_OK(status);
|
||||
// Unpin the object.
|
||||
|
||||
@@ -194,7 +194,7 @@ class MockObjectInfoAccessor : public gcs::ObjectInfoAccessor {
|
||||
size_t object_size, const gcs::StatusCallback &callback));
|
||||
|
||||
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
|
||||
const NodeID &spilled_node_id,
|
||||
const NodeID &spilled_node_id, size_t object_size,
|
||||
const gcs::StatusCallback &callback) {
|
||||
object_urls[object_id] = spilled_url;
|
||||
callbacks.push_back(callback);
|
||||
|
||||
Reference in New Issue
Block a user