[Core] Ownership-based Object Directory: Consolidate location table and reference table. (#13220)

* Added owned object reference before Plasma put on Create() + Seal() path.

* Consolidated location table and reference table in reference counter.

* Restore type in definition.

* Clean up owned reference on failed Seal().

* Added RemoveOwnedObject test for reference counter.

* Guard against ref going out of scope before location RPCs.

* Add 'owner must have ref in scope' precondition to documentation for object location methods.

* Move to separate Create() + Seal() methods for existing objects.

* Clearer distinction between Create() and Seal() methods.

* Make it clear that references will normally be cleaned up by reference counting.
This commit is contained in:
Clark Zinzow
2021-01-14 14:48:10 -07:00
committed by GitHub
parent d1e9887be2
commit 9a658b568f
12 changed files with 219 additions and 115 deletions
+18 -11
View File
@@ -915,13 +915,13 @@ cdef class CoreWorker:
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
if object_ref is None:
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Create(
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
metadata, data_size, contained_ids,
c_object_id, data))
else:
c_object_id[0] = object_ref.native()
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Create(
check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting(
metadata, data_size, c_object_id[0],
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
data))
@@ -933,7 +933,7 @@ cdef class CoreWorker:
return data.get() == NULL
def put_file_like_object(
self, metadata, data_size, file_like, ObjectRef object_ref=None):
self, metadata, data_size, file_like, ObjectRef object_ref):
"""Directly create a new Plasma Store object from a file like
object. This avoids extra memory copy.
@@ -971,8 +971,9 @@ cdef class CoreWorker:
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the object
# in this case.
check_status(CCoreWorkerProcess.GetCoreWorker().Seal(
c_object_id, pin_object=False))
check_status(
CCoreWorkerProcess.GetCoreWorker().SealExisting(
c_object_id, pin_object=False))
def put_serialized_object(self, serialized_object,
ObjectRef object_ref=None,
@@ -1007,12 +1008,18 @@ cdef class CoreWorker:
c_object_id_vector, c_object_id))
else:
with nogil:
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the object
# in this case.
check_status(CCoreWorkerProcess.GetCoreWorker().Seal(
c_object_id,
pin_object and object_ref is None))
if object_ref is None:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealOwned(
c_object_id,
pin_object))
else:
# Using custom object refs is not supported because we
# can't track their lifecycle, so we don't pin the
# object in this case.
check_status(
CCoreWorkerProcess.GetCoreWorker().SealExisting(
c_object_id, pin_object=False))
return c_object_id.Binary()
+11 -10
View File
@@ -169,16 +169,17 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Put(const CRayObject &object,
const c_vector[CObjectID] &contained_object_ids,
const CObjectID &object_id)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const c_vector[CObjectID] &contained_object_ids,
CObjectID *object_id, shared_ptr[CBuffer] *data)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const CObjectID &object_id,
const CAddress &owner_address,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id, c_bool pin_object)
CRayStatus CreateOwned(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const c_vector[CObjectID] &contained_object_ids,
CObjectID *object_id, shared_ptr[CBuffer] *data)
CRayStatus CreateExisting(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const CObjectID &object_id,
const CAddress &owner_address,
shared_ptr[CBuffer] *data)
CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object)
CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results,
c_bool plasma_objects_only)
+7
View File
@@ -370,6 +370,13 @@ def put_object(obj, use_ray_put):
return _put.remote(obj)
def put_unpinned_object(obj):
value = ray.worker.global_worker.get_serialization_context().serialize(obj)
return ray.ObjectRef(
ray.worker.global_worker.core_worker.put_serialized_object(
value, pin_object=False))
def wait_until_server_available(address,
timeout_ms=5000,
retry_interval_ms=100):
+3 -2
View File
@@ -2,6 +2,7 @@ import numpy as np
import unittest
import ray
from ray.test_utils import put_unpinned_object
MB = 1024 * 1024
@@ -49,7 +50,7 @@ class TestMemoryLimits(unittest.TestCase):
try:
ray.init(num_cpus=1, _driver_object_store_memory=100 * MB)
ray.worker.global_worker.put_object(
np.zeros(50 * MB, dtype=np.uint8), pin_object=False)
np.zeros(50 * MB, dtype=np.uint8))
self.assertRaises(
OBJECT_TOO_LARGE,
lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8)))
@@ -64,7 +65,7 @@ class TestMemoryLimits(unittest.TestCase):
object_store_memory=300 * MB,
_driver_object_store_memory=driver_quota)
obj = np.ones(200 * 1024, dtype=np.uint8)
z = ray.worker.global_worker.put_object(obj, pin_object=False)
z = put_unpinned_object(obj)
a = LightActor._remote(object_store_memory=a_quota)
b = GreedyActor._remote(object_store_memory=b_quota)
for _ in range(5):
@@ -2,6 +2,7 @@ import numpy as np
import unittest
import ray
from ray.test_utils import put_unpinned_object
class TestObjectLostErrors(unittest.TestCase):
@@ -15,8 +16,7 @@ class TestObjectLostErrors(unittest.TestCase):
ray.shutdown()
def testDriverPutEvictedCannotReconstruct(self):
x_id = ray.worker.global_worker.put_object(
np.zeros(1 * 1024 * 1024), pin_object=False)
x_id = put_unpinned_object(np.zeros(1 * 1024 * 1024))
ray.get(x_id)
for _ in range(20):
ray.put(np.zeros(10 * 1024 * 1024))
+3 -5
View File
@@ -228,7 +228,7 @@ class Worker:
def set_load_code_from_local(self, load_code_from_local):
self._load_code_from_local = load_code_from_local
def put_object(self, value, object_ref=None, pin_object=True):
def put_object(self, value, object_ref=None):
"""Put value in the local object store with object reference `object_ref`.
This assumes that the value for `object_ref` has not yet been placed in
@@ -242,7 +242,6 @@ class Worker:
value: The value to put in the object store.
object_ref (ObjectRef): The object ref of the value to be
put. If None, one will be generated.
pin_object: If set, the object will be pinned at the raylet.
Returns:
ObjectRef: The object ref the object was put under.
@@ -274,8 +273,7 @@ class Worker:
# reference counter.
return ray.ObjectRef(
self.core_worker.put_serialized_object(
serialized_value, object_ref=object_ref,
pin_object=pin_object))
serialized_value, object_ref=object_ref))
def deserialize_objects(self, data_metadata_pairs, object_refs):
context = self.get_serialization_context()
@@ -1418,7 +1416,7 @@ def put(value):
worker.check_connected()
with profiling.profile("ray.put"):
try:
object_ref = worker.put_object(value, pin_object=True)
object_ref = worker.put_object(value)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
+65 -32
View File
@@ -867,7 +867,11 @@ Status CoreWorker::Put(const RayObject &object,
reference_counter_->AddOwnedObject(
*object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false, NodeID::FromBinary(rpc_address_.raylet_id()));
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
auto status = Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
if (!status.ok()) {
reference_counter_->RemoveOwnedObject(*object_id);
}
return status;
}
Status CoreWorker::Put(const RayObject &object,
@@ -906,33 +910,37 @@ Status CoreWorker::Put(const RayObject &object,
return Status::OK();
}
Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data) {
Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data) {
*object_id = ObjectID::FromIndex(worker_context_.GetCurrentTaskID(),
worker_context_.GetNextPutIndex());
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
CurrentCallSite(), data_size + metadata->Size(),
/*is_reconstructable=*/false,
NodeID::FromBinary(rpc_address_.raylet_id()));
if (options_.is_local_mode ||
(RayConfig::instance().put_small_object_in_memory_store() &&
static_cast<int64_t>(data_size) <
RayConfig::instance().max_direct_call_object_size())) {
*data = std::make_shared<LocalMemoryBuffer>(data_size);
} else {
RAY_RETURN_NOT_OK(plasma_store_provider_->Create(
metadata, data_size, *object_id, /* owner_address = */ rpc_address_, data));
}
// Only add the object to the reference counter if it didn't already exist.
if (data) {
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_,
CurrentCallSite(), data_size + metadata->Size(),
/*is_reconstructable=*/false,
NodeID::FromBinary(rpc_address_.raylet_id()));
auto status =
plasma_store_provider_->Create(metadata, data_size, *object_id,
/* owner_address = */ rpc_address_, data);
if (!status.ok() || !data) {
reference_counter_->RemoveOwnedObject(*object_id);
return status;
}
}
return Status::OK();
}
Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const ObjectID &object_id, const rpc::Address &owner_address,
std::shared_ptr<Buffer> *data) {
Status CoreWorker::CreateExisting(const std::shared_ptr<Buffer> &metadata,
const size_t data_size, const ObjectID &object_id,
const rpc::Address &owner_address,
std::shared_ptr<Buffer> *data) {
if (options_.is_local_mode) {
return Status::NotImplemented(
"Creating an object with a pre-existing ObjectID is not supported in local mode");
@@ -942,8 +950,16 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t
}
}
Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address) {
Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object) {
auto status = SealExisting(object_id, pin_object);
if (!status.ok()) {
reference_counter_->RemoveOwnedObject(object_id);
}
return status;
}
Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address) {
RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id));
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
@@ -1748,8 +1764,8 @@ Status CoreWorker::AllocateReturnObjects(
RayConfig::instance().max_direct_call_object_size()) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_sizes[i]);
} else {
RAY_RETURN_NOT_OK(Create(metadatas[i], data_sizes[i], object_ids[i],
owner_address, &data_buffer));
RAY_RETURN_NOT_OK(CreateExisting(metadatas[i], data_sizes[i], object_ids[i],
owner_address, &data_buffer));
object_already_exists = !data_buffer;
}
}
@@ -1833,7 +1849,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
}
if (return_objects->at(i)->GetData() != nullptr &&
return_objects->at(i)->GetData()->IsPlasmaBuffer()) {
if (!Seal(return_ids[i], /*pin_object=*/true, caller_address).ok()) {
if (!SealExisting(return_ids[i], /*pin_object=*/true, caller_address).ok()) {
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object "
<< return_ids[i] << " in store: " << status.message();
}
@@ -2149,9 +2165,14 @@ void CoreWorker::HandleAddObjectLocationOwner(
send_reply_callback)) {
return;
}
reference_counter_->AddObjectLocation(ObjectID::FromBinary(request.object_id()),
NodeID::FromBinary(request.node_id()));
send_reply_callback(Status::OK(), nullptr, nullptr);
auto object_id = ObjectID::FromBinary(request.object_id());
auto reference_exists = reference_counter_->AddObjectLocation(
object_id, NodeID::FromBinary(request.node_id()));
Status status =
reference_exists
? Status::OK()
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
send_reply_callback(status, nullptr, nullptr);
}
void CoreWorker::HandleRemoveObjectLocationOwner(
@@ -2162,9 +2183,14 @@ void CoreWorker::HandleRemoveObjectLocationOwner(
send_reply_callback)) {
return;
}
reference_counter_->RemoveObjectLocation(ObjectID::FromBinary(request.object_id()),
NodeID::FromBinary(request.node_id()));
send_reply_callback(Status::OK(), nullptr, nullptr);
auto object_id = ObjectID::FromBinary(request.object_id());
auto reference_exists = reference_counter_->RemoveObjectLocation(
object_id, NodeID::FromBinary(request.node_id()));
Status status =
reference_exists
? Status::OK()
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
send_reply_callback(status, nullptr, nullptr);
}
void CoreWorker::HandleGetObjectLocationsOwner(
@@ -2175,12 +2201,19 @@ void CoreWorker::HandleGetObjectLocationsOwner(
send_reply_callback)) {
return;
}
std::unordered_set<NodeID> node_ids =
reference_counter_->GetObjectLocations(ObjectID::FromBinary(request.object_id()));
for (const auto &node_id : node_ids) {
reply->add_node_ids(node_id.Binary());
auto object_id = ObjectID::FromBinary(request.object_id());
absl::optional<absl::flat_hash_set<NodeID>> node_ids =
reference_counter_->GetObjectLocations(object_id);
Status status;
if (node_ids.has_value()) {
for (const auto &node_id : node_ids.value()) {
reply->add_node_ids(node_id.Binary());
}
status = Status::OK();
} else {
status = Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
}
send_reply_callback(Status::OK(), nullptr, nullptr);
send_reply_callback(status, nullptr, nullptr);
}
void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &request,
+25 -15
View File
@@ -493,9 +493,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
const ObjectID &object_id, bool pin_object = false);
/// Create and return a buffer in the object store that can be directly written
/// into. After writing to the buffer, the caller must call `Seal()` to finalize
/// the object. The `Create()` and `Seal()` combination is an alternative interface
/// to `Put()` that allows frontends to avoid an extra copy when possible.
/// into. After writing to the buffer, the caller must call `SealOwned()` to
/// finalize the object. The `CreateOwned()` and `SealOwned()` combination is
/// an alternative interface to `Put()` that allows frontends to avoid an extra
/// copy when possible.
///
/// \param[in] metadata Metadata of the object to be written.
/// \param[in] data_size Size of the object to be written.
@@ -503,14 +504,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[out] object_id Object ID generated for the put.
/// \param[out] data Buffer for the user to write the object into.
/// \return Status.
Status Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids, ObjectID *object_id,
std::shared_ptr<Buffer> *data);
Status CreateOwned(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const std::vector<ObjectID> &contained_object_ids,
ObjectID *object_id, std::shared_ptr<Buffer> *data);
/// Create and return a buffer in the object store that can be directly written
/// into. After writing to the buffer, the caller must call `Seal()` to finalize
/// the object. The `Create()` and `Seal()` combination is an alternative interface
/// to `Put()` that allows frontends to avoid an extra copy when possible.
/// into, for an object ID that already exists. After writing to the buffer, the
/// caller must call `SealExisting()` to finalize the object. The `CreateExisting()`
/// and `SealExisting()` combination is an alternative interface to `Put()` that
/// allows frontends to avoid an extra copy when possible.
///
/// \param[in] metadata Metadata of the object to be written.
/// \param[in] data_size Size of the object to be written.
@@ -518,20 +520,28 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] owner_address The address of the object's owner.
/// \param[out] data Buffer for the user to write the object into.
/// \return Status.
Status Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const ObjectID &object_id, const rpc::Address &owner_address,
std::shared_ptr<Buffer> *data);
Status CreateExisting(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const ObjectID &object_id, const rpc::Address &owner_address,
std::shared_ptr<Buffer> *data);
/// Finalize placing an object into the object store. This should be called after
/// a corresponding `Create()` call and then writing into the returned buffer.
/// a corresponding `CreateOwned()` call and then writing into the returned buffer.
///
/// \param[in] object_id Object ID corresponding to the object.
/// \param[in] pin_object Whether or not to pin the object at the local raylet.
/// \return Status.
Status SealOwned(const ObjectID &object_id, bool pin_object);
/// Finalize placing an object into the object store. This should be called after
/// a corresponding `CreateExisting()` call and then writing into the returned buffer.
///
/// \param[in] object_id Object ID corresponding to the object.
/// \param[in] pin_object Whether or not to pin the object at the local raylet.
/// \param[in] owner_address Address of the owner of the object who will be contacted by
/// the raylet if the object is pinned. If not provided, defaults to this worker.
/// \return Status.
Status Seal(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address = absl::nullopt);
Status SealExisting(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address = absl::nullopt);
/// Get a list of objects from the object store. Objects that failed to be retrieved
/// will be returned as nullptrs.
@@ -33,11 +33,11 @@ ray::Status PutSerializedObject(JNIEnv *env, jobject obj, ray::ObjectID object_i
std::shared_ptr<ray::Buffer> data;
ray::Status status;
if (object_id.IsNil()) {
status = ray::CoreWorkerProcess::GetCoreWorker().Create(
status = ray::CoreWorkerProcess::GetCoreWorker().CreateOwned(
native_ray_object->GetMetadata(), data_size, native_ray_object->GetNestedIds(),
out_object_id, &data);
} else {
status = ray::CoreWorkerProcess::GetCoreWorker().Create(
status = ray::CoreWorkerProcess::GetCoreWorker().CreateExisting(
native_ray_object->GetMetadata(), data_size, object_id,
ray::CoreWorkerProcess::GetCoreWorker().GetRpcAddress(), &data);
*out_object_id = object_id;
@@ -53,8 +53,13 @@ ray::Status PutSerializedObject(JNIEnv *env, jobject obj, ray::ObjectID object_i
if (data->Size() > 0) {
memcpy(data->Data(), native_ray_object->GetData()->Data(), data->Size());
}
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().Seal(
*out_object_id, pin_object && object_id.IsNil()));
if (object_id.IsNil()) {
RAY_CHECK_OK(
ray::CoreWorkerProcess::GetCoreWorker().SealOwned(*out_object_id, pin_object));
} else {
RAY_CHECK_OK(ray::CoreWorkerProcess::GetCoreWorker().SealExisting(
*out_object_id, /* pin_object = */ false));
}
}
return ray::Status::OK();
}
+38 -16
View File
@@ -166,6 +166,20 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
}
}
void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
RAY_CHECK(it != object_id_refs_.end())
<< "Tried to remove reference for nonexistent owned object " << object_id
<< ", object must be added with ReferenceCounter::AddOwnedObject() before it "
<< "can be removed";
RAY_CHECK(it->second.RefCount() == 0)
<< "Tried to remove reference for owned object " << object_id << " that has "
<< it->second.RefCount() << " references, must have 0 references to be removed";
RAY_LOG(DEBUG) << "Removing owned object " << object_id;
DeleteReferenceInternal(it, nullptr);
}
void ReferenceCounter::UpdateObjectSize(const ObjectID &object_id, int64_t object_size) {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
@@ -896,34 +910,42 @@ void ReferenceCounter::SetReleaseLineageCallback(
on_lineage_released_ = callback;
}
void ReferenceCounter::AddObjectLocation(const ObjectID &object_id,
bool ReferenceCounter::AddObjectLocation(const ObjectID &object_id,
const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_locations_.find(object_id);
if (it == object_id_locations_.end()) {
it = object_id_locations_.emplace(object_id, absl::flat_hash_set<NodeID>()).first;
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to add an object location for an object " << object_id
<< " that doesn't exist in the reference table";
return false;
}
it->second.insert(node_id);
it->second.locations.insert(node_id);
return true;
}
void ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id,
bool ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id,
const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_locations_.find(object_id);
RAY_CHECK(it != object_id_locations_.end());
it->second.erase(node_id);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to remove an object location for an object " << object_id
<< " that doesn't exist in the reference table";
return false;
}
it->second.locations.erase(node_id);
return true;
}
std::unordered_set<NodeID> ReferenceCounter::GetObjectLocations(
absl::optional<absl::flat_hash_set<NodeID>> ReferenceCounter::GetObjectLocations(
const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
auto it = object_id_locations_.find(object_id);
RAY_CHECK(it != object_id_locations_.end());
std::unordered_set<NodeID> locations;
for (const auto &location : it->second) {
locations.insert(location);
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to get the object locations for an object " << object_id
<< " that doesn't exist in the reference table";
return absl::nullopt;
}
return locations;
return it->second.locations;
}
void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) {
+28 -18
View File
@@ -172,6 +172,15 @@ class ReferenceCounter : public ReferenceCounterInterface,
const absl::optional<NodeID> &pinned_at_raylet_id = absl::optional<NodeID>())
LOCKS_EXCLUDED(mutex_);
/// Remove reference for an object that we own. The reference will only be
/// removed if the object's ref count is 0. This should only be used when
/// speculatively adding an owned reference that may need to be rolled back, e.g. if
/// the creation of the corresponding Plasma object fails. All other references will
/// be cleaned up via the reference counting protocol.
///
/// \param[in] object_id The ID of the object that we own and wish to remove.
void RemoveOwnedObject(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);
/// Update the size of the object.
///
/// \param[in] object_id The ID of the object.
@@ -361,26 +370,32 @@ class ReferenceCounter : public ReferenceCounterInterface,
const absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>> pinned_objects,
rpc::CoreWorkerStats *stats) const LOCKS_EXCLUDED(mutex_);
/// Add location to the location table of the given object.
/// Add a new location for the given object. The owner must have the object ref in
/// scope.
///
/// \param[in] object_id The object to update.
/// \param[in] node_id The node to be added to the location table.
void AddObjectLocation(const ObjectID &object_id, const NodeID &node_id)
/// \param[in] node_id The new object location to be added.
/// \return True if the reference exists, false otherwise.
bool AddObjectLocation(const ObjectID &object_id, const NodeID &node_id)
LOCKS_EXCLUDED(mutex_);
/// Remove location from the location table of the given object.
/// Remove a location for the given object. The owner must have the object ref in
/// scope.
///
/// \param[in] object_id The object to update.
/// \param[in] node_id The node to be removed from the location table.
void RemoveObjectLocation(const ObjectID &object_id, const NodeID &node_id)
/// \param[in] node_id The object location to be removed.
/// \return True if the reference exists, false otherwise.
bool RemoveObjectLocation(const ObjectID &object_id, const NodeID &node_id)
LOCKS_EXCLUDED(mutex_);
/// Get the locations from the location table of the given object.
/// Get the locations of the given object. The owner must have the object ref in
/// scope.
///
/// \param[in] object_id The object to get locations for.
/// \return The nodes that have the object.
std::unordered_set<NodeID> GetObjectLocations(const ObjectID &object_id)
LOCKS_EXCLUDED(mutex_);
/// \return The nodes that have the object if the reference exists, empty optional
/// otherwise.
absl::optional<absl::flat_hash_set<NodeID>> GetObjectLocations(
const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);
/// Handle an object has been spilled to external storage.
///
@@ -475,6 +490,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
// counting is enabled, then some raylet must be pinning the object value.
// This is the address of that raylet.
absl::optional<NodeID> pinned_at_raylet_id;
// If this object is owned by us and stored in plasma, this contains all
// object locations.
absl::flat_hash_set<NodeID> locations;
// Whether this object can be reconstructed via lineage. If false, then the
// object's value will be pinned as long as it is referenced by any other
// object's lineage.
@@ -695,14 +713,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Holds all reference counts and dependency information for tracked ObjectIDs.
ReferenceTable object_id_refs_ GUARDED_BY(mutex_);
using LocationTable = absl::flat_hash_map<ObjectID, absl::flat_hash_set<NodeID>>;
/// Holds the client information for the owned objects. This table is seperate from
/// the reference table because we add object reference after putting object into the
/// plasma store and add the location to the object directory. Therefore we will receive
/// object location information before the reference is created.
LocationTable object_id_locations_ GUARDED_BY(mutex_);
/// Objects whose values have been freed by the language frontend.
/// The values in plasma will not be pinned. An object ID is
/// removed from this set once its Reference has been deleted
@@ -2108,6 +2108,16 @@ TEST_F(ReferenceCountTest, TestFree) {
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));
}
TEST_F(ReferenceCountTest, TestRemoveOwnedObject) {
ObjectID id = ObjectID::FromRandom();
// Test remove owned object.
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false);
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveOwnedObject(id);
ASSERT_FALSE(rc->HasReference(id));
}
} // namespace ray
int main(int argc, char **argv) {