From 9a658b568fb22444d2dddd8a4e1e9c4f1a45d90a Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 14 Jan 2021 14:48:10 -0700 Subject: [PATCH] [Core] Ownership-based Object Directory: Consolidate location table and reference table. (#13220) * Added owned object reference before Plasma put on Create() + Seal() path. * Consolidated location table and reference table in reference counter. * Restore type in definition. * Clean up owned reference on failed Seal(). * Added RemoveOwnedObject test for reference counter. * Guard against ref going out of scope before location RPCs. * Add 'owner must have ref in scope' precondition to documentation for object location methods. * Move to separate Create() + Seal() methods for existing objects. * Clearer distinction between Create() and Seal() methods. * Make it clear that references will normally be cleaned up by reference counting. --- python/ray/_raylet.pyx | 29 +++--- python/ray/includes/libcoreworker.pxd | 21 ++-- python/ray/test_utils.py | 7 ++ python/ray/tests/test_memory_limits.py | 5 +- .../tests/test_unreconstructable_errors.py | 4 +- python/ray/worker.py | 8 +- src/ray/core_worker/core_worker.cc | 97 +++++++++++++------ src/ray/core_worker/core_worker.h | 40 +++++--- ...io_ray_runtime_object_NativeObjectStore.cc | 13 ++- src/ray/core_worker/reference_count.cc | 54 ++++++++--- src/ray/core_worker/reference_count.h | 46 +++++---- src/ray/core_worker/reference_count_test.cc | 10 ++ 12 files changed, 219 insertions(+), 115 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0d4f5b97b..8ba80852f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -915,13 +915,13 @@ cdef class CoreWorker: CObjectID *c_object_id, shared_ptr[CBuffer] *data): if object_ref is None: with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().Create( + check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned( metadata, data_size, contained_ids, c_object_id, data)) else: c_object_id[0] = object_ref.native() with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().Create( + check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting( metadata, data_size, c_object_id[0], CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), data)) @@ -933,7 +933,7 @@ cdef class CoreWorker: return data.get() == NULL def put_file_like_object( - self, metadata, data_size, file_like, ObjectRef object_ref=None): + self, metadata, data_size, file_like, ObjectRef object_ref): """Directly create a new Plasma Store object from a file like object. This avoids extra memory copy. @@ -971,8 +971,9 @@ cdef class CoreWorker: # Using custom object refs is not supported because we # can't track their lifecycle, so we don't pin the object # in this case. - check_status(CCoreWorkerProcess.GetCoreWorker().Seal( - c_object_id, pin_object=False)) + check_status( + CCoreWorkerProcess.GetCoreWorker().SealExisting( + c_object_id, pin_object=False)) def put_serialized_object(self, serialized_object, ObjectRef object_ref=None, @@ -1007,12 +1008,18 @@ cdef class CoreWorker: c_object_id_vector, c_object_id)) else: with nogil: - # Using custom object refs is not supported because we - # can't track their lifecycle, so we don't pin the object - # in this case. - check_status(CCoreWorkerProcess.GetCoreWorker().Seal( - c_object_id, - pin_object and object_ref is None)) + if object_ref is None: + check_status( + CCoreWorkerProcess.GetCoreWorker().SealOwned( + c_object_id, + pin_object)) + else: + # Using custom object refs is not supported because we + # can't track their lifecycle, so we don't pin the + # object in this case. + check_status( + CCoreWorkerProcess.GetCoreWorker().SealExisting( + c_object_id, pin_object=False)) return c_object_id.Binary() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 849cc70b2..f1acad1fa 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -169,16 +169,17 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Put(const CRayObject &object, const c_vector[CObjectID] &contained_object_ids, const CObjectID &object_id) - CRayStatus Create(const shared_ptr[CBuffer] &metadata, - const size_t data_size, - const c_vector[CObjectID] &contained_object_ids, - CObjectID *object_id, shared_ptr[CBuffer] *data) - CRayStatus Create(const shared_ptr[CBuffer] &metadata, - const size_t data_size, - const CObjectID &object_id, - const CAddress &owner_address, - shared_ptr[CBuffer] *data) - CRayStatus Seal(const CObjectID &object_id, c_bool pin_object) + CRayStatus CreateOwned(const shared_ptr[CBuffer] &metadata, + const size_t data_size, + const c_vector[CObjectID] &contained_object_ids, + CObjectID *object_id, shared_ptr[CBuffer] *data) + CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata, + const size_t data_size, + const CObjectID &object_id, + const CAddress &owner_address, + shared_ptr[CBuffer] *data) + CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object) + CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results, c_bool plasma_objects_only) diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 3751a89b6..eb93595e7 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -370,6 +370,13 @@ def put_object(obj, use_ray_put): return _put.remote(obj) +def put_unpinned_object(obj): + value = ray.worker.global_worker.get_serialization_context().serialize(obj) + return ray.ObjectRef( + ray.worker.global_worker.core_worker.put_serialized_object( + value, pin_object=False)) + + def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100): diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index fd7f487fb..5d798ac24 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -2,6 +2,7 @@ import numpy as np import unittest import ray +from ray.test_utils import put_unpinned_object MB = 1024 * 1024 @@ -49,7 +50,7 @@ class TestMemoryLimits(unittest.TestCase): try: ray.init(num_cpus=1, _driver_object_store_memory=100 * MB) ray.worker.global_worker.put_object( - np.zeros(50 * MB, dtype=np.uint8), pin_object=False) + np.zeros(50 * MB, dtype=np.uint8)) self.assertRaises( OBJECT_TOO_LARGE, lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8))) @@ -64,7 +65,7 @@ class TestMemoryLimits(unittest.TestCase): object_store_memory=300 * MB, _driver_object_store_memory=driver_quota) obj = np.ones(200 * 1024, dtype=np.uint8) - z = ray.worker.global_worker.put_object(obj, pin_object=False) + z = put_unpinned_object(obj) a = LightActor._remote(object_store_memory=a_quota) b = GreedyActor._remote(object_store_memory=b_quota) for _ in range(5): diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index a86d50da4..501dce905 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -2,6 +2,7 @@ import numpy as np import unittest import ray +from ray.test_utils import put_unpinned_object class TestObjectLostErrors(unittest.TestCase): @@ -15,8 +16,7 @@ class TestObjectLostErrors(unittest.TestCase): ray.shutdown() def testDriverPutEvictedCannotReconstruct(self): - x_id = ray.worker.global_worker.put_object( - np.zeros(1 * 1024 * 1024), pin_object=False) + x_id = put_unpinned_object(np.zeros(1 * 1024 * 1024)) ray.get(x_id) for _ in range(20): ray.put(np.zeros(10 * 1024 * 1024)) diff --git a/python/ray/worker.py b/python/ray/worker.py index 5212d522c..a5a28024a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -228,7 +228,7 @@ class Worker: def set_load_code_from_local(self, load_code_from_local): self._load_code_from_local = load_code_from_local - def put_object(self, value, object_ref=None, pin_object=True): + def put_object(self, value, object_ref=None): """Put value in the local object store with object reference `object_ref`. This assumes that the value for `object_ref` has not yet been placed in @@ -242,7 +242,6 @@ class Worker: value: The value to put in the object store. object_ref (ObjectRef): The object ref of the value to be put. If None, one will be generated. - pin_object: If set, the object will be pinned at the raylet. Returns: ObjectRef: The object ref the object was put under. @@ -274,8 +273,7 @@ class Worker: # reference counter. return ray.ObjectRef( self.core_worker.put_serialized_object( - serialized_value, object_ref=object_ref, - pin_object=pin_object)) + serialized_value, object_ref=object_ref)) def deserialize_objects(self, data_metadata_pairs, object_refs): context = self.get_serialization_context() @@ -1418,7 +1416,7 @@ def put(value): worker.check_connected() with profiling.profile("ray.put"): try: - object_ref = worker.put_object(value, pin_object=True) + object_ref = worker.put_object(value) except ObjectStoreFullError: logger.info( "Put failed since the value was either too large or the " diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 3cd8e9825..16edda83b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -867,7 +867,11 @@ Status CoreWorker::Put(const RayObject &object, reference_counter_->AddOwnedObject( *object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(), /*is_reconstructable=*/false, NodeID::FromBinary(rpc_address_.raylet_id())); - return Put(object, contained_object_ids, *object_id, /*pin_object=*/true); + auto status = Put(object, contained_object_ids, *object_id, /*pin_object=*/true); + if (!status.ok()) { + reference_counter_->RemoveOwnedObject(*object_id); + } + return status; } Status CoreWorker::Put(const RayObject &object, @@ -906,33 +910,37 @@ Status CoreWorker::Put(const RayObject &object, return Status::OK(); } -Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, - const std::vector &contained_object_ids, - ObjectID *object_id, std::shared_ptr *data) { +Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, + const size_t data_size, + const std::vector &contained_object_ids, + ObjectID *object_id, std::shared_ptr *data) { *object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex()); + reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, + CurrentCallSite(), data_size + metadata->Size(), + /*is_reconstructable=*/false, + NodeID::FromBinary(rpc_address_.raylet_id())); if (options_.is_local_mode || (RayConfig::instance().put_small_object_in_memory_store() && static_cast(data_size) < RayConfig::instance().max_direct_call_object_size())) { *data = std::make_shared(data_size); } else { - RAY_RETURN_NOT_OK(plasma_store_provider_->Create( - metadata, data_size, *object_id, /* owner_address = */ rpc_address_, data)); - } - // Only add the object to the reference counter if it didn't already exist. - if (data) { - reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, - CurrentCallSite(), data_size + metadata->Size(), - /*is_reconstructable=*/false, - NodeID::FromBinary(rpc_address_.raylet_id())); + auto status = + plasma_store_provider_->Create(metadata, data_size, *object_id, + /* owner_address = */ rpc_address_, data); + if (!status.ok() || !data) { + reference_counter_->RemoveOwnedObject(*object_id); + return status; + } } return Status::OK(); } -Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, const rpc::Address &owner_address, - std::shared_ptr *data) { +Status CoreWorker::CreateExisting(const std::shared_ptr &metadata, + const size_t data_size, const ObjectID &object_id, + const rpc::Address &owner_address, + std::shared_ptr *data) { if (options_.is_local_mode) { return Status::NotImplemented( "Creating an object with a pre-existing ObjectID is not supported in local mode"); @@ -942,8 +950,16 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t } } -Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, - const absl::optional &owner_address) { +Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object) { + auto status = SealExisting(object_id, pin_object); + if (!status.ok()) { + reference_counter_->RemoveOwnedObject(object_id); + } + return status; +} + +Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object, + const absl::optional &owner_address) { RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id)); if (pin_object) { // Tell the raylet to pin the object **after** it is created. @@ -1748,8 +1764,8 @@ Status CoreWorker::AllocateReturnObjects( RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); } else { - RAY_RETURN_NOT_OK(Create(metadatas[i], data_sizes[i], object_ids[i], - owner_address, &data_buffer)); + RAY_RETURN_NOT_OK(CreateExisting(metadatas[i], data_sizes[i], object_ids[i], + owner_address, &data_buffer)); object_already_exists = !data_buffer; } } @@ -1833,7 +1849,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } if (return_objects->at(i)->GetData() != nullptr && return_objects->at(i)->GetData()->IsPlasmaBuffer()) { - if (!Seal(return_ids[i], /*pin_object=*/true, caller_address).ok()) { + if (!SealExisting(return_ids[i], /*pin_object=*/true, caller_address).ok()) { RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } @@ -2149,9 +2165,14 @@ void CoreWorker::HandleAddObjectLocationOwner( send_reply_callback)) { return; } - reference_counter_->AddObjectLocation(ObjectID::FromBinary(request.object_id()), - NodeID::FromBinary(request.node_id())); - send_reply_callback(Status::OK(), nullptr, nullptr); + auto object_id = ObjectID::FromBinary(request.object_id()); + auto reference_exists = reference_counter_->AddObjectLocation( + object_id, NodeID::FromBinary(request.node_id())); + Status status = + reference_exists + ? Status::OK() + : Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); + send_reply_callback(status, nullptr, nullptr); } void CoreWorker::HandleRemoveObjectLocationOwner( @@ -2162,9 +2183,14 @@ void CoreWorker::HandleRemoveObjectLocationOwner( send_reply_callback)) { return; } - reference_counter_->RemoveObjectLocation(ObjectID::FromBinary(request.object_id()), - NodeID::FromBinary(request.node_id())); - send_reply_callback(Status::OK(), nullptr, nullptr); + auto object_id = ObjectID::FromBinary(request.object_id()); + auto reference_exists = reference_counter_->RemoveObjectLocation( + object_id, NodeID::FromBinary(request.node_id())); + Status status = + reference_exists + ? Status::OK() + : Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); + send_reply_callback(status, nullptr, nullptr); } void CoreWorker::HandleGetObjectLocationsOwner( @@ -2175,12 +2201,19 @@ void CoreWorker::HandleGetObjectLocationsOwner( send_reply_callback)) { return; } - std::unordered_set node_ids = - reference_counter_->GetObjectLocations(ObjectID::FromBinary(request.object_id())); - for (const auto &node_id : node_ids) { - reply->add_node_ids(node_id.Binary()); + auto object_id = ObjectID::FromBinary(request.object_id()); + absl::optional> node_ids = + reference_counter_->GetObjectLocations(object_id); + Status status; + if (node_ids.has_value()) { + for (const auto &node_id : node_ids.value()) { + reply->add_node_ids(node_id.Binary()); + } + status = Status::OK(); + } else { + status = Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); } - send_reply_callback(Status::OK(), nullptr, nullptr); + send_reply_callback(status, nullptr, nullptr); } void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index baa8561f7..597493c38 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -493,9 +493,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const ObjectID &object_id, bool pin_object = false); /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// into. After writing to the buffer, the caller must call `SealOwned()` to + /// finalize the object. The `CreateOwned()` and `SealOwned()` combination is + /// an alternative interface to `Put()` that allows frontends to avoid an extra + /// copy when possible. /// /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. @@ -503,14 +504,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[out] object_id Object ID generated for the put. /// \param[out] data Buffer for the user to write the object into. /// \return Status. - Status Create(const std::shared_ptr &metadata, const size_t data_size, - const std::vector &contained_object_ids, ObjectID *object_id, - std::shared_ptr *data); + Status CreateOwned(const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, + ObjectID *object_id, std::shared_ptr *data); /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// into, for an object ID that already exists. After writing to the buffer, the + /// caller must call `SealExisting()` to finalize the object. The `CreateExisting()` + /// and `SealExisting()` combination is an alternative interface to `Put()` that + /// allows frontends to avoid an extra copy when possible. /// /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. @@ -518,20 +520,28 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] owner_address The address of the object's owner. /// \param[out] data Buffer for the user to write the object into. /// \return Status. - Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, const rpc::Address &owner_address, - std::shared_ptr *data); + Status CreateExisting(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, const rpc::Address &owner_address, + std::shared_ptr *data); /// Finalize placing an object into the object store. This should be called after - /// a corresponding `Create()` call and then writing into the returned buffer. + /// a corresponding `CreateOwned()` call and then writing into the returned buffer. + /// + /// \param[in] object_id Object ID corresponding to the object. + /// \param[in] pin_object Whether or not to pin the object at the local raylet. + /// \return Status. + Status SealOwned(const ObjectID &object_id, bool pin_object); + + /// Finalize placing an object into the object store. This should be called after + /// a corresponding `CreateExisting()` call and then writing into the returned buffer. /// /// \param[in] object_id Object ID corresponding to the object. /// \param[in] pin_object Whether or not to pin the object at the local raylet. /// \param[in] owner_address Address of the owner of the object who will be contacted by /// the raylet if the object is pinned. If not provided, defaults to this worker. /// \return Status. - Status Seal(const ObjectID &object_id, bool pin_object, - const absl::optional &owner_address = absl::nullopt); + Status SealExisting(const ObjectID &object_id, bool pin_object, + const absl::optional &owner_address = absl::nullopt); /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index d66088de1..96b97c490 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -33,11 +33,11 @@ ray::Status PutSerializedObject(JNIEnv *env, jobject obj, ray::ObjectID object_i std::shared_ptr data; ray::Status status; if (object_id.IsNil()) { - status = ray::CoreWorkerProcess::GetCoreWorker().Create( + status = ray::CoreWorkerProcess::GetCoreWorker().CreateOwned( native_ray_object->GetMetadata(), data_size, native_ray_object->GetNestedIds(), out_object_id, &data); } else { - status = ray::CoreWorkerProcess::GetCoreWorker().Create( + status = ray::CoreWorkerProcess::GetCoreWorker().CreateExisting( native_ray_object->GetMetadata(), data_size, object_id, ray::CoreWorkerProcess::GetCoreWorker().GetRpcAddress(), &data); *out_object_id = object_id; @@ -53,8 +53,13 @@ ray::Status PutSerializedObject(JNIEnv *env, jobject obj, ray::ObjectID object_i if (data->Size() > 0) { memcpy(data->Data(), native_ray_object->GetData()->Data(), data->Size()); } - RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().Seal( - *out_object_id, pin_object && object_id.IsNil())); + if (object_id.IsNil()) { + RAY_CHECK_OK( + ray::CoreWorkerProcess::GetCoreWorker().SealOwned(*out_object_id, pin_object)); + } else { + RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().SealExisting( + *out_object_id, /* pin_object = */ false)); + } } return ray::Status::OK(); } diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index be89794ed..c638f831d 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -166,6 +166,20 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, } } +void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + RAY_CHECK(it != object_id_refs_.end()) + << "Tried to remove reference for nonexistent owned object " << object_id + << ", object must be added with ReferenceCounter::AddOwnedObject() before it " + << "can be removed"; + RAY_CHECK(it->second.RefCount() == 0) + << "Tried to remove reference for owned object " << object_id << " that has " + << it->second.RefCount() << " references, must have 0 references to be removed"; + RAY_LOG(DEBUG) << "Removing owned object " << object_id; + DeleteReferenceInternal(it, nullptr); +} + void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); @@ -896,34 +910,42 @@ void ReferenceCounter::SetReleaseLineageCallback( on_lineage_released_ = callback; } -void ReferenceCounter::AddObjectLocation(const ObjectID &object_id, +bool ReferenceCounter::AddObjectLocation(const ObjectID &object_id, const NodeID &node_id) { absl::MutexLock lock(&mutex_); - auto it = object_id_locations_.find(object_id); - if (it == object_id_locations_.end()) { - it = object_id_locations_.emplace(object_id, absl::flat_hash_set()).first; + auto it = object_id_refs_.find(object_id); + if (it == object_id_refs_.end()) { + RAY_LOG(WARNING) << "Tried to add an object location for an object " << object_id + << " that doesn't exist in the reference table"; + return false; } - it->second.insert(node_id); + it->second.locations.insert(node_id); + return true; } -void ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id, +bool ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id, const NodeID &node_id) { absl::MutexLock lock(&mutex_); - auto it = object_id_locations_.find(object_id); - RAY_CHECK(it != object_id_locations_.end()); - it->second.erase(node_id); + auto it = object_id_refs_.find(object_id); + if (it == object_id_refs_.end()) { + RAY_LOG(WARNING) << "Tried to remove an object location for an object " << object_id + << " that doesn't exist in the reference table"; + return false; + } + it->second.locations.erase(node_id); + return true; } -std::unordered_set ReferenceCounter::GetObjectLocations( +absl::optional> ReferenceCounter::GetObjectLocations( const ObjectID &object_id) { absl::MutexLock lock(&mutex_); - auto it = object_id_locations_.find(object_id); - RAY_CHECK(it != object_id_locations_.end()); - std::unordered_set locations; - for (const auto &location : it->second) { - locations.insert(location); + auto it = object_id_refs_.find(object_id); + if (it == object_id_refs_.end()) { + RAY_LOG(WARNING) << "Tried to get the object locations for an object " << object_id + << " that doesn't exist in the reference table"; + return absl::nullopt; } - return locations; + return it->second.locations; } void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) { diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index d18684e32..caceabc53 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -172,6 +172,15 @@ class ReferenceCounter : public ReferenceCounterInterface, const absl::optional &pinned_at_raylet_id = absl::optional()) LOCKS_EXCLUDED(mutex_); + /// Remove reference for an object that we own. The reference will only be + /// removed if the object's ref count is 0. This should only be used when + /// speculatively adding an owned reference that may need to be rolled back, e.g. if + /// the creation of the corresponding Plasma object fails. All other references will + /// be cleaned up via the reference counting protocol. + /// + /// \param[in] object_id The ID of the object that we own and wish to remove. + void RemoveOwnedObject(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); + /// Update the size of the object. /// /// \param[in] object_id The ID of the object. @@ -361,26 +370,32 @@ class ReferenceCounter : public ReferenceCounterInterface, const absl::flat_hash_map> pinned_objects, rpc::CoreWorkerStats *stats) const LOCKS_EXCLUDED(mutex_); - /// Add location to the location table of the given object. + /// Add a new location for the given object. The owner must have the object ref in + /// scope. /// /// \param[in] object_id The object to update. - /// \param[in] node_id The node to be added to the location table. - void AddObjectLocation(const ObjectID &object_id, const NodeID &node_id) + /// \param[in] node_id The new object location to be added. + /// \return True if the reference exists, false otherwise. + bool AddObjectLocation(const ObjectID &object_id, const NodeID &node_id) LOCKS_EXCLUDED(mutex_); - /// Remove location from the location table of the given object. + /// Remove a location for the given object. The owner must have the object ref in + /// scope. /// /// \param[in] object_id The object to update. - /// \param[in] node_id The node to be removed from the location table. - void RemoveObjectLocation(const ObjectID &object_id, const NodeID &node_id) + /// \param[in] node_id The object location to be removed. + /// \return True if the reference exists, false otherwise. + bool RemoveObjectLocation(const ObjectID &object_id, const NodeID &node_id) LOCKS_EXCLUDED(mutex_); - /// Get the locations from the location table of the given object. + /// Get the locations of the given object. The owner must have the object ref in + /// scope. /// /// \param[in] object_id The object to get locations for. - /// \return The nodes that have the object. - std::unordered_set GetObjectLocations(const ObjectID &object_id) - LOCKS_EXCLUDED(mutex_); + /// \return The nodes that have the object if the reference exists, empty optional + /// otherwise. + absl::optional> GetObjectLocations( + const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); /// Handle an object has been spilled to external storage. /// @@ -475,6 +490,9 @@ class ReferenceCounter : public ReferenceCounterInterface, // counting is enabled, then some raylet must be pinning the object value. // This is the address of that raylet. absl::optional pinned_at_raylet_id; + // If this object is owned by us and stored in plasma, this contains all + // object locations. + absl::flat_hash_set locations; // Whether this object can be reconstructed via lineage. If false, then the // object's value will be pinned as long as it is referenced by any other // object's lineage. @@ -695,14 +713,6 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Holds all reference counts and dependency information for tracked ObjectIDs. ReferenceTable object_id_refs_ GUARDED_BY(mutex_); - using LocationTable = absl::flat_hash_map>; - - /// Holds the client information for the owned objects. This table is seperate from - /// the reference table because we add object reference after putting object into the - /// plasma store and add the location to the object directory. Therefore we will receive - /// object location information before the reference is created. - LocationTable object_id_locations_ GUARDED_BY(mutex_); - /// Objects whose values have been freed by the language frontend. /// The values in plasma will not be pinned. An object ID is /// removed from this set once its Reference has been deleted diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 4d36851f6..a6be5bf07 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -2108,6 +2108,16 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); } +TEST_F(ReferenceCountTest, TestRemoveOwnedObject) { + ObjectID id = ObjectID::FromRandom(); + + // Test remove owned object. + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false); + ASSERT_TRUE(rc->HasReference(id)); + rc->RemoveOwnedObject(id); + ASSERT_FALSE(rc->HasReference(id)); +} + } // namespace ray int main(int argc, char **argv) {