mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 07:22:05 +08:00
[Core] Ownership-based Object Directory - Added support for object spilling in the ownership-based object directory. (#13948)
* Add support for object spilling in the ownership-based object directory.
* Move owner address hashmap into pinned_objects_ and objects_pending_spill_.
* Update local object manager tests.
* Feedback and misc. fixes.
* Move spilled unpin callback lambda to a private method bound with std::bind.
* Skip test_delete_objects_multi_node test on macOS for now.
This commit is contained in:
@@ -101,7 +101,8 @@ cdef class CoreWorker:
|
||||
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
|
||||
size_t data_size, ObjectRef object_ref,
|
||||
c_vector[CObjectID] contained_ids,
|
||||
CObjectID *c_object_id, shared_ptr[CBuffer] *data)
|
||||
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
|
||||
owner_address=*)
|
||||
cdef store_task_outputs(
|
||||
self, worker, outputs, const c_vector[CObjectID] return_ids,
|
||||
c_vector[shared_ptr[CRayObject]] *returns)
|
||||
|
||||
+20
-7
@@ -628,7 +628,8 @@ cdef void gc_collect() nogil:
|
||||
|
||||
|
||||
cdef c_vector[c_string] spill_objects_handler(
|
||||
const c_vector[CObjectID]& object_ids_to_spill) nogil:
|
||||
const c_vector[CObjectID]& object_ids_to_spill,
|
||||
const c_vector[c_string]& owner_addresses) nogil:
|
||||
cdef c_vector[c_string] return_urls
|
||||
with gil:
|
||||
object_refs = VectorToObjectRefs(object_ids_to_spill)
|
||||
@@ -636,7 +637,8 @@ cdef c_vector[c_string] spill_objects_handler(
|
||||
with ray.worker._changeproctitle(
|
||||
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER,
|
||||
ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE):
|
||||
urls = external_storage.spill_objects(object_refs)
|
||||
urls = external_storage.spill_objects(
|
||||
object_refs, owner_addresses)
|
||||
for url in urls:
|
||||
return_urls.push_back(url)
|
||||
except Exception:
|
||||
@@ -930,7 +932,11 @@ cdef class CoreWorker:
|
||||
cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
|
||||
size_t data_size, ObjectRef object_ref,
|
||||
c_vector[CObjectID] contained_ids,
|
||||
CObjectID *c_object_id, shared_ptr[CBuffer] *data):
|
||||
CObjectID *c_object_id, shared_ptr[CBuffer] *data,
|
||||
owner_address=None):
|
||||
cdef:
|
||||
CAddress c_owner_address
|
||||
|
||||
if object_ref is None:
|
||||
with nogil:
|
||||
check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned(
|
||||
@@ -938,11 +944,16 @@ cdef class CoreWorker:
|
||||
c_object_id, data))
|
||||
else:
|
||||
c_object_id[0] = object_ref.native()
|
||||
if owner_address is None:
|
||||
c_owner_address = CCoreWorkerProcess.GetCoreWorker(
|
||||
).GetRpcAddress()
|
||||
else:
|
||||
c_owner_address = CAddress()
|
||||
c_owner_address.ParseFromString(owner_address)
|
||||
with nogil:
|
||||
check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting(
|
||||
metadata, data_size, c_object_id[0],
|
||||
CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(),
|
||||
data))
|
||||
c_owner_address, data))
|
||||
|
||||
# If data is nullptr, that means the ObjectRef already existed,
|
||||
# which we ignore.
|
||||
@@ -951,7 +962,8 @@ cdef class CoreWorker:
|
||||
return data.get() == NULL
|
||||
|
||||
def put_file_like_object(
|
||||
self, metadata, data_size, file_like, ObjectRef object_ref):
|
||||
self, metadata, data_size, file_like, ObjectRef object_ref,
|
||||
owner_address):
|
||||
"""Directly create a new Plasma Store object from a file like
|
||||
object. This avoids extra memory copy.
|
||||
|
||||
@@ -961,6 +973,7 @@ cdef class CoreWorker:
|
||||
file_like: A python file object that provides the `readinto`
|
||||
interface.
|
||||
object_ref: The new ObjectRef.
|
||||
owner_address: Owner address for this object ref.
|
||||
"""
|
||||
cdef:
|
||||
CObjectID c_object_id
|
||||
@@ -975,7 +988,7 @@ cdef class CoreWorker:
|
||||
object_already_exists = self._create_put_buffer(
|
||||
metadata_buf, data_size, object_ref,
|
||||
ObjectRefsToVector([]),
|
||||
&c_object_id, &data_buf)
|
||||
&c_object_id, &data_buf, owner_address)
|
||||
if object_already_exists:
|
||||
logger.debug("Object already exists in 'put_file_like_object'.")
|
||||
return
|
||||
|
||||
@@ -80,6 +80,8 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
the external storage is invalid.
|
||||
"""
|
||||
|
||||
HEADER_LENGTH = 24
|
||||
|
||||
def _get_objects_from_store(self, object_refs):
|
||||
worker = ray.worker.global_worker
|
||||
# Since the object should always exist in the plasma store before
|
||||
@@ -89,18 +91,21 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
ray_object_pairs = worker.core_worker.get_if_local(object_refs)
|
||||
return ray_object_pairs
|
||||
|
||||
def _put_object_to_store(self, metadata, data_size, file_like, object_ref):
|
||||
def _put_object_to_store(self, metadata, data_size, file_like, object_ref,
|
||||
owner_address):
|
||||
worker = ray.worker.global_worker
|
||||
worker.core_worker.put_file_like_object(metadata, data_size, file_like,
|
||||
object_ref)
|
||||
object_ref, owner_address)
|
||||
|
||||
def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef],
|
||||
owner_addresses: List[str],
|
||||
url: str) -> List[str]:
|
||||
"""Fuse all given objects into a given file handle.
|
||||
|
||||
Args:
|
||||
f(IO): File handle into which all given object refs are fused.
|
||||
object_refs(list): Object references to fuse into a single file.
|
||||
owner_addresses(list): Owner addresses for the provided objects.
|
||||
url(str): url where the object ref is stored
|
||||
in the external storage.
|
||||
|
||||
@@ -112,13 +117,18 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
keys = []
|
||||
offset = 0
|
||||
ray_object_pairs = self._get_objects_from_store(object_refs)
|
||||
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
|
||||
for ref, (buf, metadata), owner_address in zip(
|
||||
object_refs, ray_object_pairs, owner_addresses):
|
||||
address_len = len(owner_address)
|
||||
metadata_len = len(metadata)
|
||||
buf_len = len(buf)
|
||||
# 16 bytes to store metadata and buffer length.
|
||||
data_size_in_bytes = metadata_len + buf_len + 16
|
||||
# 24 bytes to store owner address, metadata, and buffer lengths.
|
||||
data_size_in_bytes = (
|
||||
address_len + metadata_len + buf_len + self.HEADER_LENGTH)
|
||||
f.write(address_len.to_bytes(8, byteorder="little"))
|
||||
f.write(metadata_len.to_bytes(8, byteorder="little"))
|
||||
f.write(buf_len.to_bytes(8, byteorder="little"))
|
||||
f.write(owner_address)
|
||||
f.write(metadata)
|
||||
f.write(memoryview(buf))
|
||||
url_with_offset = create_url_with_offset(
|
||||
@@ -127,7 +137,8 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
offset += data_size_in_bytes
|
||||
return keys
|
||||
|
||||
def _size_check(self, metadata_len, buffer_len, obtained_data_size):
|
||||
def _size_check(self, address_len, metadata_len, buffer_len,
|
||||
obtained_data_size):
|
||||
"""Check whether or not the obtained_data_size is as expected.
|
||||
|
||||
Args:
|
||||
@@ -138,9 +149,11 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
|
||||
Raises:
|
||||
ValueError if obtained_data_size is different from
|
||||
metadata_len + buffer_len + 16(first 8 bytes to store length).
|
||||
address_len + metadata_len + buffer_len +
|
||||
24 (the first 24 bytes store the three 8-byte lengths).
|
||||
"""
|
||||
data_size_in_bytes = metadata_len + buffer_len + 16
|
||||
data_size_in_bytes = (
|
||||
address_len + metadata_len + buffer_len + self.HEADER_LENGTH)
|
||||
if data_size_in_bytes != obtained_data_size:
|
||||
raise ValueError(
|
||||
f"Obtained data has a size of {data_size_in_bytes}, "
|
||||
@@ -148,7 +161,7 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
f"size of {obtained_data_size}.")
|
||||
|
||||
@abc.abstractmethod
|
||||
def spill_objects(self, object_refs) -> List[str]:
|
||||
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
|
||||
"""Spill objects to the external storage. Objects are specified
|
||||
by their object refs.
|
||||
|
||||
@@ -191,7 +204,7 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
class NullStorage(ExternalStorage):
|
||||
"""The class that represents an uninitialized external storage."""
|
||||
|
||||
def spill_objects(self, object_refs) -> List[str]:
|
||||
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
|
||||
raise NotImplementedError("External storage is not initialized")
|
||||
|
||||
def restore_spilled_objects(self, object_refs, url_with_offset_list):
|
||||
@@ -220,7 +233,7 @@ class FileSystemStorage(ExternalStorage):
|
||||
raise ValueError("The given directory path to store objects, "
|
||||
f"{self.directory_path}, could not be created.")
|
||||
|
||||
def spill_objects(self, object_refs) -> List[str]:
|
||||
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
|
||||
if len(object_refs) == 0:
|
||||
return []
|
||||
# Always use the first object ref as a key when fusing objects.
|
||||
@@ -228,7 +241,8 @@ class FileSystemStorage(ExternalStorage):
|
||||
filename = f"{first_ref.hex()}-multi-{len(object_refs)}"
|
||||
url = f"{os.path.join(self.directory_path, filename)}"
|
||||
with open(url, "wb") as f:
|
||||
return self._write_multiple_objects(f, object_refs, url)
|
||||
return self._write_multiple_objects(f, object_refs,
|
||||
owner_addresses, url)
|
||||
|
||||
def restore_spilled_objects(self, object_refs: List[ObjectRef],
|
||||
url_with_offset_list: List[str]):
|
||||
@@ -243,13 +257,17 @@ class FileSystemStorage(ExternalStorage):
|
||||
# Read a part of the file and recover the object.
|
||||
with open(base_url, "rb") as f:
|
||||
f.seek(offset)
|
||||
address_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
metadata_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
buf_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
self._size_check(metadata_len, buf_len, parsed_result.size)
|
||||
self._size_check(address_len, metadata_len, buf_len,
|
||||
parsed_result.size)
|
||||
total += buf_len
|
||||
owner_address = f.read(address_len)
|
||||
metadata = f.read(metadata_len)
|
||||
# read remaining data to our buffer
|
||||
self._put_object_to_store(metadata, buf_len, f, object_ref)
|
||||
self._put_object_to_store(metadata, buf_len, f, object_ref,
|
||||
owner_address)
|
||||
return total
|
||||
|
||||
def delete_spilled_objects(self, urls: List[str]):
|
||||
@@ -320,7 +338,7 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
|
||||
self.transport_params = {"defer_seek": True}
|
||||
self.transport_params.update(self.override_transport_params)
|
||||
|
||||
def spill_objects(self, object_refs) -> List[str]:
|
||||
def spill_objects(self, object_refs, owner_addresses) -> List[str]:
|
||||
if len(object_refs) == 0:
|
||||
return []
|
||||
from smart_open import open
|
||||
@@ -331,7 +349,8 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
|
||||
with open(
|
||||
url, "wb",
|
||||
transport_params=self.transport_params) as file_like:
|
||||
return self._write_multiple_objects(file_like, object_refs, url)
|
||||
return self._write_multiple_objects(file_like, object_refs,
|
||||
owner_addresses, url)
|
||||
|
||||
def restore_spilled_objects(self, object_refs: List[ObjectRef],
|
||||
url_with_offset_list: List[str]):
|
||||
@@ -352,13 +371,16 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
|
||||
# smart open seek reads the file from offset-end_of_the_file
|
||||
# when the seek is called.
|
||||
f.seek(offset)
|
||||
address_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
metadata_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
buf_len = int.from_bytes(f.read(8), byteorder="little")
|
||||
self._size_check(metadata_len, buf_len, parsed_result.size)
|
||||
owner_address = f.read(address_len)
|
||||
total += buf_len
|
||||
metadata = f.read(metadata_len)
|
||||
# read remaining data to our buffer
|
||||
self._put_object_to_store(metadata, buf_len, f, object_ref)
|
||||
self._put_object_to_store(metadata, buf_len, f, object_ref,
|
||||
owner_address)
|
||||
return total
|
||||
|
||||
def delete_spilled_objects(self, urls: List[str]):
|
||||
@@ -397,16 +419,17 @@ def reset_external_storage():
|
||||
_external_storage = NullStorage()
|
||||
|
||||
|
||||
def spill_objects(object_refs):
|
||||
def spill_objects(object_refs, owner_addresses):
|
||||
"""Spill objects to the external storage. Objects are specified
|
||||
by their object refs.
|
||||
|
||||
Args:
|
||||
object_refs: The list of the refs of the objects to be spilled.
|
||||
owner_addresses: The owner addresses of the provided object refs.
|
||||
Returns:
|
||||
A list of keys corresponding to the input object refs.
|
||||
"""
|
||||
return _external_storage.spill_objects(object_refs)
|
||||
return _external_storage.spill_objects(object_refs, owner_addresses)
|
||||
|
||||
|
||||
def restore_spilled_objects(object_refs: List[ObjectRef],
|
||||
|
||||
@@ -241,7 +241,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
(void(const CWorkerID &) nogil) on_worker_shutdown
|
||||
(CRayStatus() nogil) check_signals
|
||||
(void() nogil) gc_collect
|
||||
(c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects
|
||||
(c_vector[c_string](
|
||||
const c_vector[CObjectID] &,
|
||||
const c_vector[c_string] &) nogil) spill_objects
|
||||
(int64_t(
|
||||
const c_vector[CObjectID] &,
|
||||
const c_vector[c_string] &) nogil) restore_spilled_objects
|
||||
|
||||
@@ -564,7 +564,8 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.system() == "Windows", reason="Failing on Windows.")
|
||||
platform.system() in ["Windows", "Darwin"],
|
||||
reason="Failing on Windows and MacOS.")
|
||||
def test_delete_objects_multi_node(multi_node_object_spilling_config,
|
||||
ray_start_cluster):
|
||||
# Limit our object store to 75 MiB of memory.
|
||||
|
||||
@@ -1271,6 +1271,8 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
|
||||
RAY_LOG(ERROR) << "Failed to spill object " << object_id
|
||||
<< ", raylet unreachable or object could not be spilled.";
|
||||
}
|
||||
// TODO(Clark): Provide spilled URL and spilled node ID to callback so it can
|
||||
// add them to the reference.
|
||||
callback();
|
||||
});
|
||||
}
|
||||
@@ -1281,6 +1283,7 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
|
||||
auto ready_promise = std::make_shared<std::promise<void>>(std::promise<void>());
|
||||
Status final_status;
|
||||
|
||||
// TODO(Clark): Add spilled URL and spilled node ID to reference in this callback.
|
||||
auto callback = [mutex, num_remaining, ready_promise]() {
|
||||
absl::MutexLock lock(mutex.get());
|
||||
(*num_remaining)--;
|
||||
@@ -1320,7 +1323,10 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
|
||||
ready_promise->get_future().wait();
|
||||
|
||||
for (const auto &object_id : object_ids) {
|
||||
reference_counter_->HandleObjectSpilled(object_id);
|
||||
// TODO(Clark): Move this to the callback (unless we really wanted to batch it) and
|
||||
// also include the spilled URL, spilled node ID, and updated object size.
|
||||
reference_counter_->HandleObjectSpilled(object_id, "", NodeID::Nil(), -1,
|
||||
/*release*/ true);
|
||||
}
|
||||
return final_status;
|
||||
}
|
||||
@@ -2231,15 +2237,19 @@ void CoreWorker::HandleGetObjectLocationsOwner(
|
||||
auto object_id = ObjectID::FromBinary(request.object_id());
|
||||
const auto &callback = [object_id, reply, send_reply_callback](
|
||||
const absl::flat_hash_set<NodeID> &locations,
|
||||
int64_t object_size, int64_t current_version) {
|
||||
int64_t object_size, const std::string &spilled_url,
|
||||
const NodeID &spilled_node_id, int64_t current_version) {
|
||||
RAY_LOG(DEBUG) << "Replying to HandleGetObjectLocationsOwner for " << object_id
|
||||
<< " with location update version " << current_version << ", "
|
||||
<< locations.size() << " locations, and " << object_size
|
||||
<< " object size.";
|
||||
<< locations.size() << " locations, " << spilled_url
|
||||
<< " spilled url, " << spilled_node_id << " spilled node ID, and "
|
||||
<< object_size << " object size.";
|
||||
for (const auto &node_id : locations) {
|
||||
reply->add_node_ids(node_id.Binary());
|
||||
}
|
||||
reply->set_object_size(object_size);
|
||||
reply->set_spilled_url(spilled_url);
|
||||
reply->set_spilled_node_id(spilled_node_id.Binary());
|
||||
reply->set_current_version(current_version);
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
};
|
||||
@@ -2432,7 +2442,13 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
|
||||
for (const auto &id_binary : request.object_ids_to_spill()) {
|
||||
object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary));
|
||||
}
|
||||
std::vector<std::string> object_urls = options_.spill_objects(object_ids_to_spill);
|
||||
std::vector<std::string> owner_addresses;
|
||||
owner_addresses.reserve(request.owner_addresses_size());
|
||||
for (const auto &owner_address : request.owner_addresses()) {
|
||||
owner_addresses.push_back(owner_address.SerializeAsString());
|
||||
}
|
||||
std::vector<std::string> object_urls =
|
||||
options_.spill_objects(object_ids_to_spill, owner_addresses);
|
||||
for (size_t i = 0; i < object_urls.size(); i++) {
|
||||
reply->add_spilled_objects_url(std::move(object_urls[i]));
|
||||
}
|
||||
@@ -2443,6 +2459,24 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request,
|
||||
}
|
||||
}
|
||||
|
||||
void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
|
||||
rpc::AddSpilledUrlReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) {
|
||||
const ObjectID object_id = ObjectID::FromBinary(request.object_id());
|
||||
const std::string &spilled_url = request.spilled_url();
|
||||
const NodeID node_id = NodeID::FromBinary(request.spilled_node_id());
|
||||
RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id
|
||||
<< ", which has been spilled to " << spilled_url << " on node "
|
||||
<< node_id;
|
||||
auto reference_exists = reference_counter_->HandleObjectSpilled(
|
||||
object_id, spilled_url, node_id, request.size(), /*release*/ false);
|
||||
Status status =
|
||||
reference_exists
|
||||
? Status::OK()
|
||||
: Status::ObjectNotFound("Object " + object_id.Hex() + " not found");
|
||||
send_reply_callback(status, nullptr, nullptr);
|
||||
}
|
||||
|
||||
void CoreWorker::HandleRestoreSpilledObjects(
|
||||
const rpc::RestoreSpilledObjectsRequest &request,
|
||||
rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) {
|
||||
|
||||
@@ -137,7 +137,9 @@ struct CoreWorkerOptions {
|
||||
/// be held up in garbage objects.
|
||||
std::function<void()> gc_collect;
|
||||
/// Application-language callback to spill objects to external storage.
|
||||
std::function<std::vector<std::string>(const std::vector<ObjectID> &)> spill_objects;
|
||||
std::function<std::vector<std::string>(const std::vector<ObjectID> &,
|
||||
const std::vector<std::string> &)>
|
||||
spill_objects;
|
||||
/// Application-language callback to restore objects from external storage.
|
||||
std::function<int64_t(const std::vector<ObjectID> &, const std::vector<std::string> &)>
|
||||
restore_spilled_objects;
|
||||
@@ -911,6 +913,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
|
||||
rpc::SpillObjectsReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) override;
|
||||
|
||||
// Add spilled URL to owned reference.
|
||||
void HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request,
|
||||
rpc::AddSpilledUrlReply *reply,
|
||||
rpc::SendReplyCallback send_reply_callback) override;
|
||||
|
||||
// Restore objects from external storage.
|
||||
void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request,
|
||||
rpc::RestoreSpilledObjectsReply *reply,
|
||||
|
||||
@@ -960,17 +960,33 @@ size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const {
|
||||
return it->second.object_size;
|
||||
}
|
||||
|
||||
void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) {
|
||||
bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id,
|
||||
const std::string spilled_url,
|
||||
const NodeID &spilled_node_id, int64_t size,
|
||||
bool release) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
auto it = object_id_refs_.find(object_id);
|
||||
if (it == object_id_refs_.end()) {
|
||||
RAY_LOG(WARNING) << "Spilled object " << object_id << " already out of scope";
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
it->second.spilled = true;
|
||||
// Release the primary plasma copy, if any.
|
||||
ReleasePlasmaObject(it);
|
||||
if (spilled_url != "") {
|
||||
it->second.spilled_url = spilled_url;
|
||||
}
|
||||
if (!spilled_node_id.IsNil()) {
|
||||
it->second.spilled_node_id = spilled_node_id;
|
||||
}
|
||||
if (size > 0) {
|
||||
it->second.object_size = size;
|
||||
}
|
||||
PushToLocationSubscribers(it);
|
||||
if (release) {
|
||||
// Release the primary plasma copy, if any.
|
||||
ReleasePlasmaObject(it);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
absl::optional<LocalityData> ReferenceCounter::GetLocalityData(
|
||||
@@ -1010,8 +1026,9 @@ void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) {
|
||||
const auto callbacks = it->second.location_subscription_callbacks;
|
||||
it->second.location_subscription_callbacks.clear();
|
||||
it->second.location_version++;
|
||||
for (const auto &callback : callbacks) {
|
||||
callback(it->second.locations, it->second.object_size, it->second.location_version);
|
||||
for (const auto callback : callbacks) {
|
||||
callback(it->second.locations, it->second.object_size, it->second.spilled_url,
|
||||
it->second.spilled_node_id, it->second.location_version);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1031,7 +1048,8 @@ Status ReferenceCounter::SubscribeObjectLocations(
|
||||
// If the last location version is less than the current location version, we
|
||||
// already have location data that the subscriber hasn't seen yet, so we immediately
|
||||
// invoke the callback.
|
||||
callback(it->second.locations, it->second.object_size, it->second.location_version);
|
||||
callback(it->second.locations, it->second.object_size, it->second.spilled_url,
|
||||
it->second.spilled_node_id, it->second.location_version);
|
||||
} else {
|
||||
// Otherwise, save the callback for later invocation.
|
||||
it->second.location_subscription_callbacks.push_back(callback);
|
||||
|
||||
@@ -51,7 +51,8 @@ class ReferenceCounterInterface {
|
||||
|
||||
// Callback for location subscriptions.
|
||||
using LocationSubscriptionCallback =
|
||||
std::function<void(const absl::flat_hash_set<NodeID> &, int64_t, int64_t)>;
|
||||
std::function<void(const absl::flat_hash_set<NodeID> &, int64_t, const std::string &,
|
||||
const NodeID &, int64_t)>;
|
||||
|
||||
/// Class used by the core worker to keep track of ObjectID reference counts for garbage
|
||||
/// collection. This class is thread safe.
|
||||
@@ -423,8 +424,15 @@ class ReferenceCounter : public ReferenceCounterInterface,
|
||||
/// Handle an object has been spilled to external storage.
|
||||
///
|
||||
/// This notifies the primary raylet that the object is safe to release and
|
||||
/// records that the object has been spilled to suppress reconstruction.
|
||||
void HandleObjectSpilled(const ObjectID &object_id);
|
||||
/// records the spill URL, spill node ID, and updated object size.
|
||||
/// \param[in] object_id The object that has been spilled.
|
||||
/// \param[in] spilled_url The URL to which the object has been spilled.
|
||||
/// \param[in] spilled_node_id The ID of the node on which the object was spilled.
|
||||
/// \param[in] size The size of the object.
|
||||
/// \param[in] release Whether to release the primary plasma copy of the object.
|
||||
/// \return True if the reference exists, false otherwise.
|
||||
bool HandleObjectSpilled(const ObjectID &object_id, const std::string spilled_url,
|
||||
const NodeID &spilled_node_id, int64_t size, bool release);
|
||||
|
||||
/// Get locality data for object.
|
||||
absl::optional<LocalityData> GetLocalityData(const ObjectID &object_id);
|
||||
@@ -586,6 +594,13 @@ class ReferenceCounter : public ReferenceCounterInterface,
|
||||
size_t lineage_ref_count = 0;
|
||||
/// Whether this object has been spilled to external storage.
|
||||
bool spilled = false;
|
||||
/// For objects that have been spilled to external storage, the URL from which
|
||||
/// they can be retrieved.
|
||||
std::string spilled_url = "";
|
||||
/// The ID of the node that spilled the object.
|
||||
/// This will be Nil if the object has not been spilled or if it is spilled
|
||||
/// to distributed external storage.
|
||||
NodeID spilled_node_id = NodeID::Nil();
|
||||
/// Location subscription callbacks registered by async location get requests.
|
||||
/// These will be invoked whenever locations or object_size are changed.
|
||||
std::vector<LocationSubscriptionCallback> location_subscription_callbacks;
|
||||
|
||||
@@ -34,6 +34,56 @@ void FilterRemovedNodes(std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update object location data based on response from the owning core worker.
|
||||
bool UpdateObjectLocations(const rpc::GetObjectLocationsOwnerReply &location_reply,
|
||||
const Status &status, const ObjectID &object_id,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
std::unordered_set<NodeID> *node_ids, std::string *spilled_url,
|
||||
NodeID *spilled_node_id, size_t *object_size) {
|
||||
bool is_updated = false;
|
||||
|
||||
std::unordered_set<NodeID> new_node_ids;
|
||||
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(INFO) << "Failed to return location updates to subscribers for " << object_id
|
||||
<< ": " << status.ToString()
|
||||
<< ", assuming that the object was freed or evicted.";
|
||||
// When we can't get location updates from the owner, we assume that the object was
|
||||
// freed or evicted, so we send an empty location update to all subscribers.
|
||||
*node_ids = new_node_ids;
|
||||
is_updated = true;
|
||||
} else {
|
||||
// The size can be 0 if the update was a deletion. This assumes that an
|
||||
// object's size is always greater than 0.
|
||||
// TODO(swang): If that's not the case, we should use a flag to check
|
||||
// whether the size is set instead.
|
||||
if (location_reply.object_size() > 0) {
|
||||
*object_size = location_reply.object_size();
|
||||
is_updated = true;
|
||||
}
|
||||
for (auto const &node_id : location_reply.node_ids()) {
|
||||
new_node_ids.emplace(NodeID::FromBinary(node_id));
|
||||
}
|
||||
// Filter out the removed nodes from the object locations.
|
||||
FilterRemovedNodes(gcs_client, &new_node_ids);
|
||||
if (new_node_ids != *node_ids) {
|
||||
*node_ids = new_node_ids;
|
||||
is_updated = true;
|
||||
}
|
||||
const std::string &new_spilled_url = location_reply.spilled_url();
|
||||
if (new_spilled_url != *spilled_url) {
|
||||
const auto new_spilled_node_id =
|
||||
NodeID::FromBinary(location_reply.spilled_node_id());
|
||||
RAY_LOG(DEBUG) << "Received object spilled to " << new_spilled_url << " spilled on "
|
||||
<< new_spilled_node_id;
|
||||
*spilled_url = new_spilled_url;
|
||||
*spilled_node_id = new_spilled_node_id;
|
||||
is_updated = true;
|
||||
}
|
||||
}
|
||||
return is_updated;
|
||||
}
|
||||
|
||||
rpc::Address GetOwnerAddressFromObjectInfo(
|
||||
const object_manager::protocol::ObjectInfoT &object_info) {
|
||||
rpc::Address owner_address;
|
||||
@@ -141,28 +191,13 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
|
||||
if (it == listeners_.end()) {
|
||||
return;
|
||||
}
|
||||
std::unordered_set<NodeID> node_ids;
|
||||
|
||||
// Once this flag is set to true, it should never go back to false.
|
||||
it->second.subscribed = true;
|
||||
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(INFO) << "Worker " << worker_id << " failed to return location updates to "
|
||||
<< "subscribers for " << object_id << ": " << status.ToString()
|
||||
<< ", assuming that the object was freed or evicted.";
|
||||
it->second.object_size = 0;
|
||||
} else {
|
||||
if (reply.object_size() > 0) {
|
||||
it->second.object_size = reply.object_size();
|
||||
}
|
||||
|
||||
for (auto const &node_id : reply.node_ids()) {
|
||||
node_ids.emplace(NodeID::FromBinary(node_id));
|
||||
}
|
||||
FilterRemovedNodes(gcs_client_, &node_ids);
|
||||
}
|
||||
if (node_ids != it->second.current_object_locations || !status.ok()) {
|
||||
it->second.current_object_locations = std::move(node_ids);
|
||||
// Update entries for this object.
|
||||
if (UpdateObjectLocations(reply, status, object_id, gcs_client_,
|
||||
&it->second.current_object_locations, &it->second.spilled_url,
|
||||
&it->second.spilled_node_id, &it->second.object_size)) {
|
||||
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
|
||||
// looping over the callbacks.
|
||||
auto callbacks = it->second.callbacks;
|
||||
@@ -171,10 +206,12 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
|
||||
// empty, since this may indicate that the objects have been evicted from
|
||||
// all nodes.
|
||||
for (const auto &callback_pair : callbacks) {
|
||||
// It is safe to call the callback directly since this is already running
|
||||
// in the subscription callback stack.
|
||||
callback_pair.second(object_id, it->second.current_object_locations, "",
|
||||
NodeID::Nil(), it->second.object_size);
|
||||
// We can call the callback directly without worrying about invalidating caller
|
||||
// iterators since this is already running in the subscription callback stack.
|
||||
// See https://github.com/ray-project/ray/issues/2959.
|
||||
callback_pair.second(object_id, it->second.current_object_locations,
|
||||
it->second.spilled_url, it->second.spilled_node_id,
|
||||
it->second.object_size);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,10 +259,16 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(
|
||||
// immediately notify the caller of the current known locations.
|
||||
if (listener_state.subscribed) {
|
||||
auto &locations = listener_state.current_object_locations;
|
||||
auto object_size = it->second.object_size;
|
||||
io_service_.post([callback, locations, object_size, object_id]() {
|
||||
callback(object_id, locations, "", NodeID::Nil(), object_size);
|
||||
});
|
||||
auto &spilled_url = listener_state.spilled_url;
|
||||
auto &spilled_node_id = listener_state.spilled_node_id;
|
||||
auto object_size = listener_state.object_size;
|
||||
// We post the callback to the event loop in order to avoid mutating data structures
|
||||
// shared with the caller and potentially invalidating caller iterators.
|
||||
// See https://github.com/ray-project/ray/issues/2959.
|
||||
io_service_.post(
|
||||
[callback, locations, spilled_url, spilled_node_id, object_size, object_id]() {
|
||||
callback(object_id, locations, spilled_url, spilled_node_id, object_size);
|
||||
});
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@@ -246,36 +289,63 @@ ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations(
|
||||
ray::Status OwnershipBasedObjectDirectory::LookupLocations(
|
||||
const ObjectID &object_id, const rpc::Address &owner_address,
|
||||
const OnLocationsFound &callback) {
|
||||
WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id());
|
||||
std::shared_ptr<rpc::CoreWorkerClient> rpc_client = GetClient(owner_address);
|
||||
if (rpc_client == nullptr) {
|
||||
RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. "
|
||||
<< "LookupLocations returns an empty list of locations.";
|
||||
io_service_.post([callback, object_id]() {
|
||||
callback(object_id, std::unordered_set<NodeID>(), "", NodeID::Nil(), 0);
|
||||
});
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
rpc::GetObjectLocationsOwnerRequest request;
|
||||
request.set_intended_worker_id(owner_address.worker_id());
|
||||
request.set_object_id(object_id.Binary());
|
||||
request.set_last_version(-1);
|
||||
|
||||
rpc_client->GetObjectLocationsOwner(
|
||||
request, [this, worker_id, object_id, callback](
|
||||
Status status, const rpc::GetObjectLocationsOwnerReply &reply) {
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for "
|
||||
<< object_id;
|
||||
}
|
||||
std::unordered_set<NodeID> node_ids;
|
||||
for (auto const &node_id : reply.node_ids()) {
|
||||
node_ids.emplace(NodeID::FromBinary(node_id));
|
||||
}
|
||||
FilterRemovedNodes(gcs_client_, &node_ids);
|
||||
callback(object_id, node_ids, "", NodeID::Nil(), reply.object_size());
|
||||
auto it = listeners_.find(object_id);
|
||||
if (it != listeners_.end() && it->second.subscribed) {
|
||||
// If we have locations cached due to a concurrent SubscribeObjectLocations
|
||||
// call, and we have received at least one update from the owner about
|
||||
// the object's creation, then call the callback immediately with the
|
||||
// cached locations.
|
||||
auto &locations = it->second.current_object_locations;
|
||||
auto &spilled_url = it->second.spilled_url;
|
||||
auto &spilled_node_id = it->second.spilled_node_id;
|
||||
auto object_size = it->second.object_size;
|
||||
// We post the callback to the event loop in order to avoid mutating data structures
|
||||
// shared with the caller and potentially invalidating caller iterators.
|
||||
// See https://github.com/ray-project/ray/issues/2959.
|
||||
io_service_.post(
|
||||
[callback, object_id, locations, spilled_url, spilled_node_id, object_size]() {
|
||||
callback(object_id, locations, spilled_url, spilled_node_id, object_size);
|
||||
});
|
||||
} else {
|
||||
WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id());
|
||||
std::shared_ptr<rpc::CoreWorkerClient> rpc_client = GetClient(owner_address);
|
||||
if (rpc_client == nullptr) {
|
||||
RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. "
|
||||
<< "LookupLocations returns an empty list of locations.";
|
||||
// We post the callback to the event loop in order to avoid mutating data structures
|
||||
// shared with the caller and potentially invalidating caller iterators.
|
||||
// See https://github.com/ray-project/ray/issues/2959.
|
||||
io_service_.post([callback, object_id]() {
|
||||
callback(object_id, std::unordered_set<NodeID>(), "", NodeID::Nil(), 0);
|
||||
});
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
rpc::GetObjectLocationsOwnerRequest request;
|
||||
request.set_intended_worker_id(owner_address.worker_id());
|
||||
request.set_object_id(object_id.Binary());
|
||||
request.set_last_version(-1);
|
||||
|
||||
rpc_client->GetObjectLocationsOwner(
|
||||
request, [this, worker_id, object_id, callback](
|
||||
Status status, const rpc::GetObjectLocationsOwnerReply &reply) {
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for "
|
||||
<< object_id;
|
||||
}
|
||||
std::unordered_set<NodeID> node_ids;
|
||||
std::string spilled_url;
|
||||
NodeID spilled_node_id;
|
||||
size_t object_size = 0;
|
||||
UpdateObjectLocations(reply, status, object_id, gcs_client_, &node_ids,
|
||||
&spilled_url, &spilled_node_id, &object_size);
|
||||
// We can call the callback directly without worrying about invalidating
|
||||
// caller iterators since this is already running in the core worker
|
||||
// client's lookup callback stack.
|
||||
// See https://github.com/ray-project/ray/issues/2959.
|
||||
callback(object_id, node_ids, spilled_url, spilled_node_id, object_size);
|
||||
});
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -189,10 +189,18 @@ message GetObjectLocationsOwnerRequest {
|
||||
}
|
||||
|
||||
message GetObjectLocationsOwnerReply {
|
||||
// The IDs of the nodes that this object appeared on or was evicted by.
|
||||
repeated bytes node_ids = 1;
|
||||
// The size of the object in bytes.
|
||||
uint64 object_size = 2;
|
||||
// The object has been spilled to this URL. This should be set xor the above
|
||||
// fields are set.
|
||||
string spilled_url = 3;
|
||||
// The ID of the node that spilled the object.
|
||||
// This will be Nil if the object was spilled to distributed external storage.
|
||||
bytes spilled_node_id = 4;
|
||||
// The version of the returned location updates.
|
||||
int64 current_version = 3;
|
||||
int64 current_version = 5;
|
||||
}
|
||||
|
||||
message KillActorRequest {
|
||||
@@ -306,6 +314,9 @@ message PlasmaObjectReadyReply {
|
||||
message SpillObjectsRequest {
|
||||
// The IDs of objects to be spilled.
|
||||
repeated bytes object_ids_to_spill = 1;
|
||||
// The owner addresses of the objects to be spilled. Must be in the same order as
|
||||
// object_ids_to_spill.
|
||||
repeated Address owner_addresses = 2;
|
||||
}
|
||||
|
||||
message SpillObjectsReply {
|
||||
@@ -333,6 +344,22 @@ message DeleteSpilledObjectsRequest {
|
||||
message DeleteSpilledObjectsReply {
|
||||
}
|
||||
|
||||
message AddSpilledUrlRequest {
|
||||
// Object that was spilled.
|
||||
bytes object_id = 1;
|
||||
// For objects that have been spilled to external storage, the URL from which
|
||||
// they can be retrieved.
|
||||
string spilled_url = 2;
|
||||
// The ID of the node that spilled the object.
|
||||
// This will be Nil if the object was spilled to distributed external storage.
|
||||
bytes spilled_node_id = 3;
|
||||
// The size of the object in bytes.
|
||||
int64 size = 4;
|
||||
}
|
||||
|
||||
message AddSpilledUrlReply {
|
||||
}
|
||||
|
||||
message ExitRequest {
|
||||
}
|
||||
|
||||
@@ -385,6 +412,9 @@ service CoreWorkerService {
|
||||
// Delete spilled objects from external storage. Caller: raylet; callee: I/O worker.
|
||||
rpc DeleteSpilledObjects(DeleteSpilledObjectsRequest)
|
||||
returns (DeleteSpilledObjectsReply);
|
||||
// Add spilled URL, spilled node ID, and update object size for owned object.
|
||||
// Caller: raylet; callee: owner worker.
|
||||
rpc AddSpilledUrl(AddSpilledUrlRequest) returns (AddSpilledUrlReply);
|
||||
// Notification from raylet that an object ID is available in local plasma.
|
||||
rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply);
|
||||
// Request for a worker to exit.
|
||||
|
||||
@@ -179,6 +179,10 @@ message RequestObjectSpillageRequest {
|
||||
message RequestObjectSpillageReply {
|
||||
// Whether the object spilling was successful or not.
|
||||
bool success = 1;
|
||||
// Object URL where the object is spilled.
|
||||
string object_url = 2;
|
||||
// The node id of a node where the object is spilled.
|
||||
bytes spilled_node_id = 3;
|
||||
}
|
||||
|
||||
message RestoreSpilledObjectRequest {
|
||||
|
||||
@@ -21,7 +21,8 @@ namespace ray {
|
||||
namespace raylet {
|
||||
|
||||
void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
|
||||
std::vector<std::unique_ptr<RayObject>> &&objects) {
|
||||
std::vector<std::unique_ptr<RayObject>> &&objects,
|
||||
const rpc::Address &owner_address) {
|
||||
RAY_CHECK(object_pinning_enabled_);
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
const auto &object_id = object_ids[i];
|
||||
@@ -33,7 +34,7 @@ void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Pinning object " << object_id;
|
||||
pinned_objects_size_ += object->GetSize();
|
||||
pinned_objects_.emplace(object_id, std::move(object));
|
||||
pinned_objects_.emplace(object_id, std::make_pair(std::move(object), owner_address));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +72,7 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
|
||||
spilled_object_pending_delete_.push(object_id);
|
||||
}
|
||||
if (pinned_objects_.count(object_id)) {
|
||||
pinned_objects_size_ -= pinned_objects_[object_id]->GetSize();
|
||||
pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize();
|
||||
pinned_objects_.erase(object_id);
|
||||
}
|
||||
}
|
||||
@@ -143,7 +144,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
|
||||
std::vector<ObjectID> objects_to_spill;
|
||||
while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end()) {
|
||||
if (is_plasma_object_spillable_(it->first)) {
|
||||
bytes_to_spill += it->second->GetSize();
|
||||
bytes_to_spill += it->second.first->GetSize();
|
||||
objects_to_spill.push_back(it->first);
|
||||
}
|
||||
it++;
|
||||
@@ -155,7 +156,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
|
||||
SpillObjectsInternal(objects_to_spill, [this, bytes_to_spill, objects_to_spill,
|
||||
start_time](const Status &status) {
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(ERROR) << "Error spilling objects " << status.ToString();
|
||||
RAY_LOG(INFO) << "Failed to spill objects: " << status.ToString();
|
||||
} else {
|
||||
auto now = absl::GetCurrentTimeNanos();
|
||||
RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in "
|
||||
@@ -210,7 +211,7 @@ void LocalObjectManager::SpillObjectsInternal(
|
||||
if (it != pinned_objects_.end()) {
|
||||
RAY_LOG(DEBUG) << "Spilling object " << id;
|
||||
objects_to_spill.push_back(id);
|
||||
num_bytes_pending_spill_ += it->second->GetSize();
|
||||
num_bytes_pending_spill_ += it->second.first->GetSize();
|
||||
objects_pending_spill_[id] = std::move(it->second);
|
||||
pinned_objects_.erase(it);
|
||||
}
|
||||
@@ -228,6 +229,9 @@ void LocalObjectManager::SpillObjectsInternal(
|
||||
for (const auto &object_id : objects_to_spill) {
|
||||
RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
|
||||
request.add_object_ids_to_spill(object_id.Binary());
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
request.add_owner_addresses()->MergeFrom(it->second.second);
|
||||
}
|
||||
io_worker->rpc_client()->SpillObjects(
|
||||
request, [this, objects_to_spill, callback, io_worker](
|
||||
@@ -241,7 +245,7 @@ void LocalObjectManager::SpillObjectsInternal(
|
||||
for (const auto &object_id : objects_to_spill) {
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
pinned_objects_size_ += it->second->GetSize();
|
||||
pinned_objects_size_ += it->second.first->GetSize();
|
||||
pinned_objects_.emplace(object_id, std::move(it->second));
|
||||
objects_pending_spill_.erase(it);
|
||||
}
|
||||
@@ -258,6 +262,46 @@ void LocalObjectManager::SpillObjectsInternal(
|
||||
});
|
||||
}
|
||||
|
||||
void LocalObjectManager::UnpinSpilledObjectCallback(
|
||||
const ObjectID &object_id, const std::string &object_url,
|
||||
std::shared_ptr<size_t> num_remaining,
|
||||
std::function<void(const ray::Status &)> callback, ray::Status status) {
|
||||
if (!status.ok()) {
|
||||
RAY_LOG(INFO) << "Failed to send spilled url for object " << object_id
|
||||
<< " to object directory, considering the object to have been freed: "
|
||||
<< status.ToString();
|
||||
} else {
|
||||
RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url
|
||||
<< " and object directory has been informed";
|
||||
}
|
||||
RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id;
|
||||
// Unpin the object.
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
num_bytes_pending_spill_ -= it->second.first->GetSize();
|
||||
objects_pending_spill_.erase(it);
|
||||
|
||||
// Update the object_id -> url_ref_count to use it for deletion later.
|
||||
// We need to track the references here because a single file can contain
|
||||
// multiple objects, and we shouldn't delete the file until
|
||||
// all the objects are gone out of scope.
|
||||
// object_url is equivalent to url_with_offset.
|
||||
auto parsed_url = ParseURL(object_url);
|
||||
const auto base_url_it = parsed_url->find("url");
|
||||
RAY_CHECK(base_url_it != parsed_url->end());
|
||||
if (!url_ref_count_.contains(base_url_it->second)) {
|
||||
url_ref_count_[base_url_it->second] = 1;
|
||||
} else {
|
||||
url_ref_count_[base_url_it->second] += 1;
|
||||
}
|
||||
spilled_objects_url_.emplace(object_id, object_url);
|
||||
|
||||
(*num_remaining)--;
|
||||
if (*num_remaining == 0 && callback) {
|
||||
callback(status);
|
||||
}
|
||||
}
|
||||
|
||||
void LocalObjectManager::AddSpilledUrls(
|
||||
const std::vector<ObjectID> &object_ids, const rpc::SpillObjectsReply &worker_reply,
|
||||
std::function<void(const ray::Status &)> callback) {
|
||||
@@ -274,39 +318,36 @@ void LocalObjectManager::AddSpilledUrls(
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
|
||||
// Write to object directory. Wait for the write to finish before
|
||||
// releasing the object to make sure that the spilled object can
|
||||
// be retrieved by other raylets.
|
||||
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
|
||||
object_id, object_url, node_id_object_spilled, it->second->GetSize(),
|
||||
[this, object_id, object_url, callback, num_remaining](Status status) {
|
||||
RAY_CHECK_OK(status);
|
||||
// Unpin the object.
|
||||
auto it = objects_pending_spill_.find(object_id);
|
||||
RAY_CHECK(it != objects_pending_spill_.end());
|
||||
num_bytes_pending_spill_ -= it->second->GetSize();
|
||||
objects_pending_spill_.erase(it);
|
||||
auto unpin_callback =
|
||||
std::bind(&LocalObjectManager::UnpinSpilledObjectCallback, this, object_id,
|
||||
object_url, num_remaining, callback, std::placeholders::_1);
|
||||
|
||||
// Update the object_id -> url_ref_count to use it for deletion later.
|
||||
// We need to track the references here because a single file can contain
|
||||
// multiple objects, and we shouldn't delete the file until
|
||||
// all the objects are gone out of scope.
|
||||
// object_url is equivalent to url_with_offset.
|
||||
auto parsed_url = ParseURL(object_url);
|
||||
const auto base_url_it = parsed_url->find("url");
|
||||
RAY_CHECK(base_url_it != parsed_url->end());
|
||||
if (!url_ref_count_.contains(base_url_it->second)) {
|
||||
url_ref_count_[base_url_it->second] = 1;
|
||||
} else {
|
||||
url_ref_count_[base_url_it->second] += 1;
|
||||
}
|
||||
spilled_objects_url_.emplace(object_id, object_url);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
// TODO(Clark): Don't send RPC to owner if we're fulfilling an owner-initiated
|
||||
// spill RPC.
|
||||
rpc::AddSpilledUrlRequest request;
|
||||
request.set_object_id(object_id.Binary());
|
||||
request.set_spilled_url(object_url);
|
||||
request.set_spilled_node_id(node_id_object_spilled.Binary());
|
||||
request.set_size(it->second.first->GetSize());
|
||||
|
||||
(*num_remaining)--;
|
||||
if (*num_remaining == 0 && callback) {
|
||||
callback(status);
|
||||
}
|
||||
}));
|
||||
auto owner_client = owner_client_pool_.GetOrConnect(it->second.second);
|
||||
RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object "
|
||||
<< object_id << " to owner "
|
||||
<< WorkerID::FromBinary(it->second.second.worker_id());
|
||||
// Send spilled URL, spilled node ID, and object size to owner.
|
||||
owner_client->AddSpilledUrl(
|
||||
request, [unpin_callback](Status status, const rpc::AddSpilledUrlReply &reply) {
|
||||
unpin_callback(status);
|
||||
});
|
||||
} else {
|
||||
// Write to object directory. Wait for the write to finish before
|
||||
// releasing the object to make sure that the spilled object can
|
||||
// be retrieved by other raylets.
|
||||
RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl(
|
||||
object_id, object_url, node_id_object_spilled, it->second.first->GetSize(),
|
||||
unpin_callback));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,11 +362,11 @@ void LocalObjectManager::AsyncRestoreSpilledObject(
|
||||
if (!node_id.IsNil() && node_id != self_node_id_) {
|
||||
// If we know where this object was spilled, and the current node is not that one,
|
||||
// send a RPC to a remote node that spilled the object to restore it.
|
||||
RAY_LOG(DEBUG) << "Send a object restoration request of id: " << object_id
|
||||
RAY_LOG(DEBUG) << "Send an object restoration request of id: " << object_id
|
||||
<< " to a remote node: " << node_id;
|
||||
// TODO(sang): We need to deduplicate this remote RPC. Since restore request
|
||||
// is retried every 10ms without exponential backoff, this can add huge overhead to a
|
||||
// remote node that spilled the object.
|
||||
// is retried every 10ms without exponential backoff, this can add huge overhead to
|
||||
// a remote node that spilled the object.
|
||||
restore_object_from_remote_node_(object_id, object_url, node_id);
|
||||
if (callback) {
|
||||
callback(Status::OK());
|
||||
@@ -395,9 +436,9 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
|
||||
object_urls_to_delete.size() < max_batch_size) {
|
||||
auto &object_id = spilled_object_pending_delete_.front();
|
||||
// If the object is still spilling, do nothing. This will block other entries to be
|
||||
// processed, but it should be fine because the spilling will be eventually done, and
|
||||
// deleting objects is the low priority tasks.
|
||||
// This will instead enable simpler logic after this block.
|
||||
// processed, but it should be fine because the spilling will be eventually done,
|
||||
// and deleting objects is the low priority tasks. This will instead enable simpler
|
||||
// logic after this block.
|
||||
if (objects_pending_spill_.contains(object_id)) {
|
||||
break;
|
||||
}
|
||||
@@ -405,8 +446,8 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
|
||||
// Object id is either spilled or not spilled at this point.
|
||||
const auto spilled_objects_url_it = spilled_objects_url_.find(object_id);
|
||||
if (spilled_objects_url_it != spilled_objects_url_.end()) {
|
||||
// If the object was spilled, see if we can delete it. We should first check the ref
|
||||
// count.
|
||||
// If the object was spilled, see if we can delete it. We should first check the
|
||||
// ref count.
|
||||
std::string &object_url = spilled_objects_url_it->second;
|
||||
// Note that here, we need to parse the object url to obtain the base_url.
|
||||
auto parsed_url = ParseURL(object_url);
|
||||
@@ -475,5 +516,4 @@ std::string LocalObjectManager::DebugString() const {
|
||||
}
|
||||
|
||||
}; // namespace raylet
|
||||
|
||||
}; // namespace ray
|
||||
|
||||
@@ -70,8 +70,10 @@ class LocalObjectManager {
|
||||
/// \param object_ids The objects to be pinned.
|
||||
/// \param objects Pointers to the objects to be pinned. The pointer should
|
||||
/// be kept in scope until the object can be released.
|
||||
/// \param owner_address The owner of the objects to be pinned.
|
||||
void PinObjects(const std::vector<ObjectID> &object_ids,
|
||||
std::vector<std::unique_ptr<RayObject>> &&objects);
|
||||
std::vector<std::unique_ptr<RayObject>> &&objects,
|
||||
const rpc::Address &owner_address);
|
||||
|
||||
/// Wait for the objects' owner to free the object. The objects will be
|
||||
/// released when the owner at the given address fails or replies that the
|
||||
@@ -164,6 +166,14 @@ class LocalObjectManager {
|
||||
/// objects.
|
||||
void FlushFreeObjects();
|
||||
|
||||
// A callback for unpinning spilled objects. This should be invoked after the object
|
||||
// has been spilled and after the object directory has been sent the spilled URL.
|
||||
void UnpinSpilledObjectCallback(const ObjectID &object_id,
|
||||
const std::string &object_url,
|
||||
std::shared_ptr<size_t> num_remaining,
|
||||
std::function<void(const ray::Status &)> callback,
|
||||
ray::Status status);
|
||||
|
||||
/// Add objects' spilled URLs to the global object directory. Call the
|
||||
/// callback once all URLs have been added.
|
||||
void AddSpilledUrls(const std::vector<ObjectID> &object_ids,
|
||||
@@ -203,7 +213,8 @@ class LocalObjectManager {
|
||||
std::function<void(const std::vector<ObjectID> &)> on_objects_freed_;
|
||||
|
||||
// Objects that are pinned on this node.
|
||||
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
|
||||
absl::flat_hash_map<ObjectID, std::pair<std::unique_ptr<RayObject>, rpc::Address>>
|
||||
pinned_objects_;
|
||||
|
||||
// Total size of objects pinned on this node.
|
||||
size_t pinned_objects_size_ = 0;
|
||||
@@ -211,7 +222,8 @@ class LocalObjectManager {
|
||||
// Objects that were pinned on this node but that are being spilled.
|
||||
// These objects will be released once spilling is complete and the URL is
|
||||
// written to the object directory.
|
||||
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_;
|
||||
absl::flat_hash_map<ObjectID, std::pair<std::unique_ptr<RayObject>, rpc::Address>>
|
||||
objects_pending_spill_;
|
||||
|
||||
/// Objects that were spilled on this node but that are being restored.
|
||||
/// The field is used to dedup the same restore request while restoration is in
|
||||
|
||||
@@ -516,11 +516,17 @@ void NodeManager::DoLocalGC() {
|
||||
void NodeManager::HandleRequestObjectSpillage(
|
||||
const rpc::RequestObjectSpillageRequest &request,
|
||||
rpc::RequestObjectSpillageReply *reply, rpc::SendReplyCallback send_reply_callback) {
|
||||
const auto &object_id = ObjectID::FromBinary(request.object_id());
|
||||
RAY_LOG(DEBUG) << "Received RequestObjectSpillage for object " << object_id;
|
||||
local_object_manager_.SpillObjects(
|
||||
{ObjectID::FromBinary(request.object_id())},
|
||||
[reply, send_reply_callback](const ray::Status &status) {
|
||||
{object_id}, [object_id, reply, send_reply_callback](const ray::Status &status) {
|
||||
if (status.ok()) {
|
||||
RAY_LOG(DEBUG) << "Object " << object_id
|
||||
<< " has been spilled, replying to owner";
|
||||
reply->set_success(true);
|
||||
// TODO(Clark): Add spilled URLs and spilled node ID to owner RPC reply here
|
||||
// if OBOD is enabled, instead of relying on automatic raylet spilling path to
|
||||
// send an extra RPC to the owner.
|
||||
}
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
});
|
||||
@@ -2406,6 +2412,7 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
|
||||
rpc::SendReplyCallback send_reply_callback) {
|
||||
std::vector<ObjectID> object_ids;
|
||||
object_ids.reserve(request.object_ids_size());
|
||||
const auto &owner_address = request.owner_address();
|
||||
for (const auto &object_id_binary : request.object_ids()) {
|
||||
object_ids.push_back(ObjectID::FromBinary(object_id_binary));
|
||||
}
|
||||
@@ -2419,10 +2426,10 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
|
||||
send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);
|
||||
return;
|
||||
}
|
||||
local_object_manager_.PinObjects(object_ids, std::move(results));
|
||||
local_object_manager_.PinObjects(object_ids, std::move(results), owner_address);
|
||||
}
|
||||
// Wait for the object to be freed by the owner, which keeps the ref count.
|
||||
local_object_manager_.WaitForObjectFree(request.owner_address(), object_ids);
|
||||
local_object_manager_.WaitForObjectFree(owner_address, object_ids);
|
||||
send_reply_callback(Status::OK(), nullptr, nullptr);
|
||||
}
|
||||
|
||||
|
||||
@@ -37,21 +37,41 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
|
||||
void WaitForObjectEviction(
|
||||
const rpc::WaitForObjectEvictionRequest &request,
|
||||
const rpc::ClientCallback<rpc::WaitForObjectEvictionReply> &callback) override {
|
||||
callbacks.push_back(callback);
|
||||
eviction_callbacks.push_back(callback);
|
||||
}
|
||||
|
||||
bool ReplyObjectEviction(Status status = Status::OK()) {
|
||||
if (callbacks.size() == 0) {
|
||||
if (eviction_callbacks.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto callback = callbacks.front();
|
||||
auto callback = eviction_callbacks.front();
|
||||
auto reply = rpc::WaitForObjectEvictionReply();
|
||||
callback(status, reply);
|
||||
callbacks.pop_front();
|
||||
eviction_callbacks.pop_front();
|
||||
return true;
|
||||
}
|
||||
|
||||
std::list<rpc::ClientCallback<rpc::WaitForObjectEvictionReply>> callbacks;
|
||||
void AddSpilledUrl(
|
||||
const rpc::AddSpilledUrlRequest &request,
|
||||
const rpc::ClientCallback<rpc::AddSpilledUrlReply> &callback) override {
|
||||
object_urls.emplace(ObjectID::FromBinary(request.object_id()), request.spilled_url());
|
||||
spilled_url_callbacks.push_back(callback);
|
||||
}
|
||||
|
||||
bool ReplyAddSpilledUrl(Status status = Status::OK()) {
|
||||
if (spilled_url_callbacks.empty()) {
|
||||
return false;
|
||||
}
|
||||
auto callback = spilled_url_callbacks.front();
|
||||
auto reply = rpc::AddSpilledUrlReply();
|
||||
callback(status, reply);
|
||||
spilled_url_callbacks.pop_front();
|
||||
return true;
|
||||
}
|
||||
|
||||
std::deque<rpc::ClientCallback<rpc::WaitForObjectEvictionReply>> eviction_callbacks;
|
||||
std::unordered_map<ObjectID, std::string> object_urls;
|
||||
std::deque<rpc::ClientCallback<rpc::AddSpilledUrlReply>> spilled_url_callbacks;
|
||||
};
|
||||
|
||||
class MockIOWorkerClient : public rpc::CoreWorkerClientInterface {
|
||||
@@ -334,7 +354,7 @@ TEST_F(LocalObjectManagerTest, TestPin) {
|
||||
new RayObject(nullptr, meta_buffer, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
for (size_t i = 0; i < free_objects_batch_size; i++) {
|
||||
@@ -349,6 +369,8 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||
// First, spill objects.
|
||||
std::vector<ObjectID> object_ids;
|
||||
std::vector<std::unique_ptr<RayObject>> objects;
|
||||
rpc::Address owner_address;
|
||||
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
|
||||
|
||||
for (size_t i = 0; i < free_objects_batch_size; i++) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
@@ -358,7 +380,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
|
||||
manager.SpillObjects(object_ids,
|
||||
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
|
||||
@@ -368,7 +390,11 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
|
||||
// Then try restoring objects from local.
|
||||
@@ -416,6 +442,8 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
|
||||
TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
|
||||
std::vector<ObjectID> object_ids;
|
||||
std::vector<std::unique_ptr<RayObject>> objects;
|
||||
rpc::Address owner_address;
|
||||
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
|
||||
|
||||
for (size_t i = 0; i < free_objects_batch_size; i++) {
|
||||
ObjectID object_id = ObjectID::FromRandom();
|
||||
@@ -425,7 +453,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
|
||||
int num_times_fired = 0;
|
||||
manager.SpillObjects(object_ids, [&](const Status &status) mutable {
|
||||
@@ -444,11 +472,19 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(num_times_fired, 1);
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_EQ(owner_client->object_urls[object_ids[i]], urls[i]);
|
||||
} else {
|
||||
ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]);
|
||||
}
|
||||
}
|
||||
for (const auto &id : object_ids) {
|
||||
ASSERT_EQ((*unpins)[id], 1);
|
||||
@@ -470,7 +506,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
int num_times_fired = 0;
|
||||
@@ -494,11 +530,19 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
|
||||
EXPECT_CALL(worker_pool, PushSpillWorker(_));
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(num_times_fired, 1);
|
||||
for (size_t i = 0; i < object_ids.size(); i++) {
|
||||
ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_EQ(owner_client->object_urls[object_ids[i]], urls[i]);
|
||||
} else {
|
||||
ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]);
|
||||
}
|
||||
}
|
||||
ASSERT_FALSE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (const auto &id : object_ids) {
|
||||
@@ -524,7 +568,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2));
|
||||
for (const auto &id : object_ids) {
|
||||
ASSERT_EQ((*unpins)[id], 0);
|
||||
@@ -541,13 +585,26 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
|
||||
// to evict.
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < urls.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(object_table.object_urls.size(), object_ids.size() / 2 + 1);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_EQ(owner_client->object_urls.size(), object_ids.size() / 2 + 1);
|
||||
for (auto &object_url : owner_client->object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
} else {
|
||||
ASSERT_EQ(object_table.object_urls.size(), object_ids.size() / 2 + 1);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure providing 0 bytes to SpillObjectsOfSize will spill one object.
|
||||
@@ -556,13 +613,23 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
|
||||
EXPECT_CALL(worker_pool, PushSpillWorker(_));
|
||||
const std::string url = BuildURL("url" + std::to_string(object_ids.size()));
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
ASSERT_EQ(object_table.object_urls.size(), 3);
|
||||
urls.push_back(url);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
ASSERT_EQ(owner_client->object_urls.size(), 3);
|
||||
for (auto &object_url : owner_client->object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
ASSERT_EQ(object_table.object_urls.size(), 3);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Since there's no more object to spill, this should fail.
|
||||
@@ -587,7 +654,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
ASSERT_FALSE(manager.SpillObjectsOfSize(1000));
|
||||
for (const auto &id : object_ids) {
|
||||
ASSERT_EQ((*unpins)[id], 0);
|
||||
@@ -616,7 +683,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
|
||||
// This will spill until 2 workers are occupied.
|
||||
manager.SpillObjectUptoMaxThroughput();
|
||||
@@ -633,12 +700,23 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
|
||||
std::vector<std::string> urls;
|
||||
urls.push_back(BuildURL("url" + std::to_string(0)));
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[0]}));
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
// Make sure object is spilled.
|
||||
ASSERT_EQ(object_table.object_urls.size(), 1);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
if (urls[0] == object_url.second) {
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
// Make sure object is spilled.
|
||||
ASSERT_EQ(owner_client->object_urls.size(), 1);
|
||||
for (auto &object_url : owner_client->object_urls) {
|
||||
if (urls[0] == object_url.second) {
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
// Make sure object is spilled.
|
||||
ASSERT_EQ(object_table.object_urls.size(), 1);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
if (urls[0] == object_url.second) {
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -656,13 +734,26 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
|
||||
}
|
||||
for (size_t i = 1; i < urls.size(); i++) {
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[i]}));
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(object_table.object_urls.size(), 3);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_EQ(owner_client->object_urls.size(), 3);
|
||||
for (auto &object_url : owner_client->object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
} else {
|
||||
ASSERT_EQ(object_table.object_urls.size(), 3);
|
||||
for (auto &object_url : object_table.object_urls) {
|
||||
auto it = std::find(urls.begin(), urls.end(), object_url.second);
|
||||
ASSERT_TRUE(it != urls.end());
|
||||
ASSERT_EQ((*unpins)[object_url.first], 1);
|
||||
}
|
||||
}
|
||||
|
||||
// We cannot spill anymore as there is no more pinned object.
|
||||
@@ -683,7 +774,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
|
||||
|
||||
std::vector<std::unique_ptr<RayObject>> objects;
|
||||
objects.push_back(std::move(object));
|
||||
manager.PinObjects({object_id}, std::move(objects));
|
||||
manager.PinObjects({object_id}, std::move(objects), owner_address);
|
||||
|
||||
int num_times_fired = 0;
|
||||
manager.SpillObjects({object_id}, [&](const Status &status) mutable {
|
||||
@@ -695,7 +786,11 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
|
||||
EXPECT_CALL(worker_pool, PushSpillWorker(_));
|
||||
ASSERT_TRUE(
|
||||
worker_pool.io_worker_client->ReplySpillObjects({}, Status::IOError("error")));
|
||||
ASSERT_FALSE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_FALSE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_FALSE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
ASSERT_EQ(num_times_fired, 1);
|
||||
ASSERT_EQ((*unpins)[object_id], 0);
|
||||
|
||||
@@ -707,9 +802,14 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
|
||||
std::string url = BuildURL("url");
|
||||
EXPECT_CALL(worker_pool, PushSpillWorker(_));
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
ASSERT_EQ(owner_client->object_urls[object_id], url);
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
ASSERT_EQ(object_table.object_urls[object_id], url);
|
||||
}
|
||||
ASSERT_EQ(num_times_fired, 2);
|
||||
ASSERT_EQ(object_table.object_urls[object_id], url);
|
||||
ASSERT_EQ((*unpins)[object_id], 1);
|
||||
}
|
||||
|
||||
@@ -729,7 +829,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
for (size_t i = 0; i < free_objects_batch_size; i++) {
|
||||
@@ -757,7 +857,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
// 2 Objects are spilled out of 3.
|
||||
@@ -774,7 +874,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
|
||||
// All objects are out of scope now.
|
||||
@@ -805,7 +909,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
// Every object is spilled.
|
||||
@@ -826,7 +930,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
|
||||
// Everything is evicted except the last object. In this case, ref count is still > 0.
|
||||
@@ -862,7 +970,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
// Objects are spilled.
|
||||
@@ -881,7 +989,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < 1; i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
// Every object has gone out of scope.
|
||||
for (size_t i = 0; i < free_objects_batch_size; i++) {
|
||||
@@ -900,7 +1012,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
|
||||
new_urls.push_back(BuildURL("url" + std::to_string(i)));
|
||||
}
|
||||
for (size_t i = 1; i < object_ids_to_spill.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
|
||||
// Every object is now deleted.
|
||||
@@ -925,7 +1041,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
|
||||
new RayObject(data_buffer, nullptr, std::vector<ObjectID>()));
|
||||
objects.push_back(std::move(object));
|
||||
}
|
||||
manager.PinObjects(object_ids, std::move(objects));
|
||||
manager.PinObjects(object_ids, std::move(objects), owner_address);
|
||||
manager.WaitForObjectFree(owner_address, object_ids);
|
||||
|
||||
std::vector<ObjectID> object_ids_to_spill;
|
||||
@@ -943,7 +1059,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
|
||||
}
|
||||
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls));
|
||||
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
if (RayConfig::instance().ownership_based_object_directory_enabled()) {
|
||||
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
|
||||
} else {
|
||||
ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl());
|
||||
}
|
||||
}
|
||||
|
||||
// Every reference has gone out of scope.
|
||||
|
||||
@@ -186,6 +186,9 @@ class CoreWorkerClientInterface {
|
||||
const DeleteSpilledObjectsRequest &request,
|
||||
const ClientCallback<DeleteSpilledObjectsReply> &callback) {}
|
||||
|
||||
virtual void AddSpilledUrl(const AddSpilledUrlRequest &request,
|
||||
const ClientCallback<AddSpilledUrlReply> &callback) {}
|
||||
|
||||
virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request,
|
||||
const ClientCallback<PlasmaObjectReadyReply> &callback) {
|
||||
}
|
||||
@@ -251,6 +254,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
|
||||
|
||||
VOID_RPC_CLIENT_METHOD(CoreWorkerService, DeleteSpilledObjects, grpc_client_, override)
|
||||
|
||||
VOID_RPC_CLIENT_METHOD(CoreWorkerService, AddSpilledUrl, grpc_client_, override)
|
||||
|
||||
VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override)
|
||||
|
||||
VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override)
|
||||
|
||||
@@ -44,6 +44,7 @@ namespace rpc {
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects) \
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, AddSpilledUrl) \
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \
|
||||
RPC_SERVICE_HANDLER(CoreWorkerService, Exit)
|
||||
|
||||
@@ -65,6 +66,7 @@ namespace rpc {
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddSpilledUrl) \
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \
|
||||
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user