From cd7e567a5772befc0bfcc4b994d26a8e56334155 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 11 Feb 2021 11:36:22 -0700 Subject: [PATCH] [Core] Ownership-based Object Directory - Added support for object spilling in the ownership-based object directory. (#13948) * Add support for object spilling in the ownership-based object directory. * Move owner address hashmap into pinned_objects_ and objects_pending_spill_. * Update local object manager tests. * Feedback and misc. fixes. * Move spilled unpin callback lambda to std::binded private method. * Skip test_delete_objects_multi_node test on MacOS for now. --- python/ray/_raylet.pxd | 3 +- python/ray/_raylet.pyx | 27 +- python/ray/external_storage.py | 61 +++-- python/ray/includes/libcoreworker.pxd | 4 +- python/ray/tests/test_object_spilling.py | 3 +- src/ray/core_worker/core_worker.cc | 44 +++- src/ray/core_worker/core_worker.h | 9 +- src/ray/core_worker/reference_count.cc | 32 ++- src/ray/core_worker/reference_count.h | 21 +- .../ownership_based_object_directory.cc | 182 +++++++++----- src/ray/protobuf/core_worker.proto | 32 ++- src/ray/protobuf/node_manager.proto | 4 + src/ray/raylet/local_object_manager.cc | 134 ++++++---- src/ray/raylet/local_object_manager.h | 18 +- src/ray/raylet/node_manager.cc | 15 +- .../raylet/test/local_object_manager_test.cc | 230 +++++++++++++----- src/ray/rpc/worker/core_worker_client.h | 5 + src/ray/rpc/worker/core_worker_server.h | 2 + 18 files changed, 615 insertions(+), 211 deletions(-) diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index e8edc78a7..4a0f7b923 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -101,7 +101,8 @@ cdef class CoreWorker: cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectRef object_ref, c_vector[CObjectID] contained_ids, - CObjectID *c_object_id, shared_ptr[CBuffer] *data) + CObjectID *c_object_id, shared_ptr[CBuffer] *data, + owner_address=*) cdef store_task_outputs( self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 47b6aa4f8..da00f6273 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -628,7 +628,8 @@ cdef void gc_collect() nogil: cdef c_vector[c_string] spill_objects_handler( - const c_vector[CObjectID]& object_ids_to_spill) nogil: + const c_vector[CObjectID]& object_ids_to_spill, + const c_vector[c_string]& owner_addresses) nogil: cdef c_vector[c_string] return_urls with gil: object_refs = VectorToObjectRefs(object_ids_to_spill) @@ -636,7 +637,8 @@ cdef c_vector[c_string] spill_objects_handler( with ray.worker._changeproctitle( ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER, ray_constants.WORKER_PROCESS_TYPE_SPILL_WORKER_IDLE): - urls = external_storage.spill_objects(object_refs) + urls = external_storage.spill_objects( + object_refs, owner_addresses) for url in urls: return_urls.push_back(url) except Exception: @@ -930,7 +932,11 @@ cdef class CoreWorker: cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectRef object_ref, c_vector[CObjectID] contained_ids, - CObjectID *c_object_id, shared_ptr[CBuffer] *data): + CObjectID *c_object_id, shared_ptr[CBuffer] *data, + owner_address=None): + cdef: + CAddress c_owner_address + if object_ref is None: with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().CreateOwned( @@ -938,11 +944,16 @@ cdef class CoreWorker: c_object_id, data)) else: c_object_id[0] = object_ref.native() + if owner_address is None: + c_owner_address = CCoreWorkerProcess.GetCoreWorker( + ).GetRpcAddress() + else: + c_owner_address = CAddress() + c_owner_address.ParseFromString(owner_address) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().CreateExisting( metadata, data_size, c_object_id[0], - CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), - data)) + c_owner_address, data)) # If data is nullptr, that means the ObjectRef already existed, # which we ignore. @@ -951,7 +962,8 @@ cdef class CoreWorker: return data.get() == NULL def put_file_like_object( - self, metadata, data_size, file_like, ObjectRef object_ref): + self, metadata, data_size, file_like, ObjectRef object_ref, + owner_address): """Directly create a new Plasma Store object from a file like object. This avoids extra memory copy. @@ -961,6 +973,7 @@ cdef class CoreWorker: file_like: A python file object that provides the `readinto` interface. object_ref: The new ObjectRef. + owner_address: Owner address for this object ref. """ cdef: CObjectID c_object_id @@ -975,7 +988,7 @@ cdef class CoreWorker: object_already_exists = self._create_put_buffer( metadata_buf, data_size, object_ref, ObjectRefsToVector([]), - &c_object_id, &data_buf) + &c_object_id, &data_buf, owner_address) if object_already_exists: logger.debug("Object already exists in 'put_file_like_object'.") return diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 26d5c4a4d..138561f43 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -80,6 +80,8 @@ class ExternalStorage(metaclass=abc.ABCMeta): the external storage is invalid. """ + HEADER_LENGTH = 24 + def _get_objects_from_store(self, object_refs): worker = ray.worker.global_worker # Since the object should always exist in the plasma store before @@ -89,18 +91,21 @@ class ExternalStorage(metaclass=abc.ABCMeta): ray_object_pairs = worker.core_worker.get_if_local(object_refs) return ray_object_pairs - def _put_object_to_store(self, metadata, data_size, file_like, object_ref): + def _put_object_to_store(self, metadata, data_size, file_like, object_ref, + owner_address): worker = ray.worker.global_worker worker.core_worker.put_file_like_object(metadata, data_size, file_like, - object_ref) + object_ref, owner_address) def _write_multiple_objects(self, f: IO, object_refs: List[ObjectRef], + owner_addresses: List[str], url: str) -> List[str]: """Fuse all given objects into a given file handle. Args: f(IO): File handle to fusion all given object refs. object_refs(list): Object references to fusion to a single file. + owner_addresses(list): Owner addresses for the provided objects. url(str): url where the object ref is stored in the external storage. @@ -112,13 +117,18 @@ class ExternalStorage(metaclass=abc.ABCMeta): keys = [] offset = 0 ray_object_pairs = self._get_objects_from_store(object_refs) - for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): + for ref, (buf, metadata), owner_address in zip( + object_refs, ray_object_pairs, owner_addresses): + address_len = len(owner_address) metadata_len = len(metadata) buf_len = len(buf) - # 16 bytes to store metadata and buffer length. - data_size_in_bytes = metadata_len + buf_len + 16 + # 24 bytes to store owner address, metadata, and buffer lengths. + data_size_in_bytes = ( + address_len + metadata_len + buf_len + self.HEADER_LENGTH) + f.write(address_len.to_bytes(8, byteorder="little")) f.write(metadata_len.to_bytes(8, byteorder="little")) f.write(buf_len.to_bytes(8, byteorder="little")) + f.write(owner_address) f.write(metadata) f.write(memoryview(buf)) url_with_offset = create_url_with_offset( @@ -127,7 +137,8 @@ class ExternalStorage(metaclass=abc.ABCMeta): offset += data_size_in_bytes return keys - def _size_check(self, metadata_len, buffer_len, obtained_data_size): + def _size_check(self, address_len, metadata_len, buffer_len, + obtained_data_size): """Check whether or not the obtained_data_size is as expected. Args: @@ -138,9 +149,11 @@ class ExternalStorage(metaclass=abc.ABCMeta): Raises: ValueError if obtained_data_size is different from - metadata_len + buffer_len + 16(first 8 bytes to store length). + address_len + metadata_len + buffer_len + + 24 (first 8 bytes to store length). """ - data_size_in_bytes = metadata_len + buffer_len + 16 + data_size_in_bytes = ( + address_len + metadata_len + buffer_len + self.HEADER_LENGTH) if data_size_in_bytes != obtained_data_size: raise ValueError( f"Obtained data has a size of {data_size_in_bytes}, " @@ -148,7 +161,7 @@ class ExternalStorage(metaclass=abc.ABCMeta): f"size of {obtained_data_size}.") @abc.abstractmethod - def spill_objects(self, object_refs) -> List[str]: + def spill_objects(self, object_refs, owner_addresses) -> List[str]: """Spill objects to the external storage. Objects are specified by their object refs. @@ -191,7 +204,7 @@ class ExternalStorage(metaclass=abc.ABCMeta): class NullStorage(ExternalStorage): """The class that represents an uninitialized external storage.""" - def spill_objects(self, object_refs) -> List[str]: + def spill_objects(self, object_refs, owner_addresses) -> List[str]: raise NotImplementedError("External storage is not initialized") def restore_spilled_objects(self, object_refs, url_with_offset_list): @@ -220,7 +233,7 @@ class FileSystemStorage(ExternalStorage): raise ValueError("The given directory path to store objects, " f"{self.directory_path}, could not be created.") - def spill_objects(self, object_refs) -> List[str]: + def spill_objects(self, object_refs, owner_addresses) -> List[str]: if len(object_refs) == 0: return [] # Always use the first object ref as a key when fusioning objects. @@ -228,7 +241,8 @@ class FileSystemStorage(ExternalStorage): filename = f"{first_ref.hex()}-multi-{len(object_refs)}" url = f"{os.path.join(self.directory_path, filename)}" with open(url, "wb") as f: - return self._write_multiple_objects(f, object_refs, url) + return self._write_multiple_objects(f, object_refs, + owner_addresses, url) def restore_spilled_objects(self, object_refs: List[ObjectRef], url_with_offset_list: List[str]): @@ -243,13 +257,17 @@ class FileSystemStorage(ExternalStorage): # Read a part of the file and recover the object. with open(base_url, "rb") as f: f.seek(offset) + address_len = int.from_bytes(f.read(8), byteorder="little") metadata_len = int.from_bytes(f.read(8), byteorder="little") buf_len = int.from_bytes(f.read(8), byteorder="little") - self._size_check(metadata_len, buf_len, parsed_result.size) + self._size_check(address_len, metadata_len, buf_len, + parsed_result.size) total += buf_len + owner_address = f.read(address_len) metadata = f.read(metadata_len) # read remaining data to our buffer - self._put_object_to_store(metadata, buf_len, f, object_ref) + self._put_object_to_store(metadata, buf_len, f, object_ref, + owner_address) return total def delete_spilled_objects(self, urls: List[str]): @@ -320,7 +338,7 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): self.transport_params = {"defer_seek": True} self.transport_params.update(self.override_transport_params) - def spill_objects(self, object_refs) -> List[str]: + def spill_objects(self, object_refs, owner_addresses) -> List[str]: if len(object_refs) == 0: return [] from smart_open import open @@ -331,7 +349,8 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): with open( url, "wb", transport_params=self.transport_params) as file_like: - return self._write_multiple_objects(file_like, object_refs, url) + return self._write_multiple_objects(file_like, object_refs, + owner_addresses, url) def restore_spilled_objects(self, object_refs: List[ObjectRef], url_with_offset_list: List[str]): @@ -352,13 +371,16 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): # smart open seek reads the file from offset-end_of_the_file # when the seek is called. f.seek(offset) + address_len = int.from_bytes(f.read(8), byteorder="little") metadata_len = int.from_bytes(f.read(8), byteorder="little") buf_len = int.from_bytes(f.read(8), byteorder="little") self._size_check(metadata_len, buf_len, parsed_result.size) + owner_address = f.read(address_len) total += buf_len metadata = f.read(metadata_len) # read remaining data to our buffer - self._put_object_to_store(metadata, buf_len, f, object_ref) + self._put_object_to_store(metadata, buf_len, f, object_ref, + owner_address) return total def delete_spilled_objects(self, urls: List[str]): @@ -397,16 +419,17 @@ def reset_external_storage(): _external_storage = NullStorage() -def spill_objects(object_refs): +def spill_objects(object_refs, owner_addresses): """Spill objects to the external storage. Objects are specified by their object refs. Args: object_refs: The list of the refs of the objects to be spilled. + owner_addresses: The owner addresses of the provided object refs. Returns: A list of keys corresponding to the input object refs. """ - return _external_storage.spill_objects(object_refs) + return _external_storage.spill_objects(object_refs, owner_addresses) def restore_spilled_objects(object_refs: List[ObjectRef], diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 0b7c3b0f5..6114b9e7d 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -241,7 +241,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (void(const CWorkerID &) nogil) on_worker_shutdown (CRayStatus() nogil) check_signals (void() nogil) gc_collect - (c_vector[c_string](const c_vector[CObjectID] &) nogil) spill_objects + (c_vector[c_string]( + const c_vector[CObjectID] &, + const c_vector[c_string] &) nogil) spill_objects (int64_t( const c_vector[CObjectID] &, const c_vector[c_string] &) nogil) restore_spilled_objects diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 500c66225..e0e3033d2 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -564,7 +564,8 @@ def test_delete_objects_on_worker_failure(object_spilling_config, @pytest.mark.skipif( - platform.system() == "Windows", reason="Failing on Windows.") + platform.system() in ["Windows", "Darwin"], + reason="Failing on Windows and MacOS.") def test_delete_objects_multi_node(multi_node_object_spilling_config, ray_start_cluster): # Limit our object store to 75 MiB of memory. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 73b8b8981..86f6344b5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1271,6 +1271,8 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id, RAY_LOG(ERROR) << "Failed to spill object " << object_id << ", raylet unreachable or object could not be spilled."; } + // TODO(Clark): Provide spilled URL and spilled node ID to callback so it can + // added them to the reference. callback(); }); } @@ -1281,6 +1283,7 @@ Status CoreWorker::SpillObjects(const std::vector &object_ids) { auto ready_promise = std::make_shared>(std::promise()); Status final_status; + // TODO(Clark): Add spilled URL and spilled node ID to reference in this callback. auto callback = [mutex, num_remaining, ready_promise]() { absl::MutexLock lock(mutex.get()); (*num_remaining)--; @@ -1320,7 +1323,10 @@ Status CoreWorker::SpillObjects(const std::vector &object_ids) { ready_promise->get_future().wait(); for (const auto &object_id : object_ids) { - reference_counter_->HandleObjectSpilled(object_id); + // TODO(Clark): Move this to the callback (unless we really wanted to batch it) and + // also include the spilled URL, spilled node ID, and updated object size. + reference_counter_->HandleObjectSpilled(object_id, "", NodeID::Nil(), -1, + /*release*/ true); } return final_status; } @@ -2231,15 +2237,19 @@ void CoreWorker::HandleGetObjectLocationsOwner( auto object_id = ObjectID::FromBinary(request.object_id()); const auto &callback = [object_id, reply, send_reply_callback]( const absl::flat_hash_set &locations, - int64_t object_size, int64_t current_version) { + int64_t object_size, const std::string &spilled_url, + const NodeID &spilled_node_id, int64_t current_version) { RAY_LOG(DEBUG) << "Replying to HandleGetObjectLocationsOwner for " << object_id << " with location update version " << current_version << ", " - << locations.size() << " locations, and " << object_size - << " object size."; + << locations.size() << " locations, " << spilled_url + << " spilled url, " << spilled_node_id << " spilled node ID, and " + << object_size << " object size."; for (const auto &node_id : locations) { reply->add_node_ids(node_id.Binary()); } reply->set_object_size(object_size); + reply->set_spilled_url(spilled_url); + reply->set_spilled_node_id(spilled_node_id.Binary()); reply->set_current_version(current_version); send_reply_callback(Status::OK(), nullptr, nullptr); }; @@ -2432,7 +2442,13 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request, for (const auto &id_binary : request.object_ids_to_spill()) { object_ids_to_spill.push_back(ObjectID::FromBinary(id_binary)); } - std::vector object_urls = options_.spill_objects(object_ids_to_spill); + std::vector owner_addresses; + owner_addresses.reserve(request.owner_addresses_size()); + for (const auto &owner_address : request.owner_addresses()) { + owner_addresses.push_back(owner_address.SerializeAsString()); + } + std::vector object_urls = + options_.spill_objects(object_ids_to_spill, owner_addresses); for (size_t i = 0; i < object_urls.size(); i++) { reply->add_spilled_objects_url(std::move(object_urls[i])); } @@ -2443,6 +2459,24 @@ void CoreWorker::HandleSpillObjects(const rpc::SpillObjectsRequest &request, } } +void CoreWorker::HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request, + rpc::AddSpilledUrlReply *reply, + rpc::SendReplyCallback send_reply_callback) { + const ObjectID object_id = ObjectID::FromBinary(request.object_id()); + const std::string &spilled_url = request.spilled_url(); + const NodeID node_id = NodeID::FromBinary(request.spilled_node_id()); + RAY_LOG(DEBUG) << "Received AddSpilledUrl request for object " << object_id + << ", which has been spilled to " << spilled_url << " on node " + << node_id; + auto reference_exists = reference_counter_->HandleObjectSpilled( + object_id, spilled_url, node_id, request.size(), /*release*/ false); + Status status = + reference_exists + ? Status::OK() + : Status::ObjectNotFound("Object " + object_id.Hex() + " not found"); + send_reply_callback(status, nullptr, nullptr); +} + void CoreWorker::HandleRestoreSpilledObjects( const rpc::RestoreSpilledObjectsRequest &request, rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e16326441..2ced7a10f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -137,7 +137,9 @@ struct CoreWorkerOptions { /// be held up in garbage objects. std::function gc_collect; /// Application-language callback to spill objects to external storage. - std::function(const std::vector &)> spill_objects; + std::function(const std::vector &, + const std::vector &)> + spill_objects; /// Application-language callback to restore objects from external storage. std::function &, const std::vector &)> restore_spilled_objects; @@ -911,6 +913,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) override; + // Add spilled URL to owned reference. + void HandleAddSpilledUrl(const rpc::AddSpilledUrlRequest &request, + rpc::AddSpilledUrlReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + // Restore objects from external storage. void HandleRestoreSpilledObjects(const rpc::RestoreSpilledObjectsRequest &request, rpc::RestoreSpilledObjectsReply *reply, diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index db05320a9..87400ca21 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -960,17 +960,33 @@ size_t ReferenceCounter::GetObjectSize(const ObjectID &object_id) const { return it->second.object_size; } -void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) { +bool ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id, + const std::string spilled_url, + const NodeID &spilled_node_id, int64_t size, + bool release) { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { RAY_LOG(WARNING) << "Spilled object " << object_id << " already out of scope"; - return; + return false; } it->second.spilled = true; - // Release the primary plasma copy, if any. - ReleasePlasmaObject(it); + if (spilled_url != "") { + it->second.spilled_url = spilled_url; + } + if (!spilled_node_id.IsNil()) { + it->second.spilled_node_id = spilled_node_id; + } + if (size > 0) { + it->second.object_size = size; + } + PushToLocationSubscribers(it); + if (release) { + // Release the primary plasma copy, if any. + ReleasePlasmaObject(it); + } + return true; } absl::optional ReferenceCounter::GetLocalityData( @@ -1010,8 +1026,9 @@ void ReferenceCounter::PushToLocationSubscribers(ReferenceTable::iterator it) { const auto callbacks = it->second.location_subscription_callbacks; it->second.location_subscription_callbacks.clear(); it->second.location_version++; - for (const auto &callback : callbacks) { - callback(it->second.locations, it->second.object_size, it->second.location_version); + for (const auto callback : callbacks) { + callback(it->second.locations, it->second.object_size, it->second.spilled_url, + it->second.spilled_node_id, it->second.location_version); } } @@ -1031,7 +1048,8 @@ Status ReferenceCounter::SubscribeObjectLocations( // If the last location version is less than the current location version, we // already have location data that the subscriber hasn't seen yet, so we immediately // invoke the callback. - callback(it->second.locations, it->second.object_size, it->second.location_version); + callback(it->second.locations, it->second.object_size, it->second.spilled_url, + it->second.spilled_node_id, it->second.location_version); } else { // Otherwise, save the callback for later invocation. it->second.location_subscription_callbacks.push_back(callback); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 014b94714..415044d70 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -51,7 +51,8 @@ class ReferenceCounterInterface { // Callback for location subscriptions. using LocationSubscriptionCallback = - std::function &, int64_t, int64_t)>; + std::function &, int64_t, const std::string &, + const NodeID &, int64_t)>; /// Class used by the core worker to keep track of ObjectID reference counts for garbage /// collection. This class is thread safe. @@ -423,8 +424,15 @@ class ReferenceCounter : public ReferenceCounterInterface, /// Handle an object has been spilled to external storage. /// /// This notifies the primary raylet that the object is safe to release and - /// records that the object has been spilled to suppress reconstruction. - void HandleObjectSpilled(const ObjectID &object_id); + /// records the spill URL, spill node ID, and updated object size. + /// \param[in] object_id The object that has been spilled. + /// \param[in] spilled_url The URL to which the object has been spilled. + /// \param[in] spilled_node_id The ID of the node on which the object was spilled. + /// \param[in] size The size of the object. + /// \param[in] release Whether to release the reference. + /// \return True if the reference exists, false otherwise. + bool HandleObjectSpilled(const ObjectID &object_id, const std::string spilled_url, + const NodeID &spilled_node_id, int64_t size, bool release); /// Get locality data for object. absl::optional GetLocalityData(const ObjectID &object_id); @@ -586,6 +594,13 @@ class ReferenceCounter : public ReferenceCounterInterface, size_t lineage_ref_count = 0; /// Whether this object has been spilled to external storage. bool spilled = false; + /// For objects that have been spilled to external storage, the URL from which + /// they can be retrieved. + std::string spilled_url = ""; + /// The ID of the node that spilled the object. + /// This will be Nil if the object has not been spilled or if it is spilled + /// distributed external storage. + NodeID spilled_node_id = NodeID::Nil(); /// Location subscription callbacks registered by async location get requests. /// These will be invoked whenever locations or object_size are changed. std::vector location_subscription_callbacks; diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index 3f2ccc540..e5477c0c2 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -34,6 +34,56 @@ void FilterRemovedNodes(std::shared_ptr gcs_client, } } +/// Update object location data based on response from the owning core worker. +bool UpdateObjectLocations(const rpc::GetObjectLocationsOwnerReply &location_reply, + const Status &status, const ObjectID &object_id, + std::shared_ptr gcs_client, + std::unordered_set *node_ids, std::string *spilled_url, + NodeID *spilled_node_id, size_t *object_size) { + bool is_updated = false; + + std::unordered_set new_node_ids; + + if (!status.ok()) { + RAY_LOG(INFO) << "Failed to return location updates to subscribers for " << object_id + << ": " << status.ToString() + << ", assuming that the object was freed or evicted."; + // When we can't get location updates from the owner, we assume that the object was + // freed or evicted, so we send an empty location update to all subscribers. + *node_ids = new_node_ids; + is_updated = true; + } else { + // The size can be 0 if the update was a deletion. This assumes that an + // object's size is always greater than 0. + // TODO(swang): If that's not the case, we should use a flag to check + // whether the size is set instead. + if (location_reply.object_size() > 0) { + *object_size = location_reply.object_size(); + is_updated = true; + } + for (auto const &node_id : location_reply.node_ids()) { + new_node_ids.emplace(NodeID::FromBinary(node_id)); + } + // Filter out the removed nodes from the object locations. + FilterRemovedNodes(gcs_client, &new_node_ids); + if (new_node_ids != *node_ids) { + *node_ids = new_node_ids; + is_updated = true; + } + const std::string &new_spilled_url = location_reply.spilled_url(); + if (new_spilled_url != *spilled_url) { + const auto new_spilled_node_id = + NodeID::FromBinary(location_reply.spilled_node_id()); + RAY_LOG(DEBUG) << "Received object spilled to " << new_spilled_url << " spilled on " + << new_spilled_node_id; + *spilled_url = new_spilled_url; + *spilled_node_id = new_spilled_node_id; + is_updated = true; + } + } + return is_updated; +} + rpc::Address GetOwnerAddressFromObjectInfo( const object_manager::protocol::ObjectInfoT &object_info) { rpc::Address owner_address; @@ -141,28 +191,13 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback( if (it == listeners_.end()) { return; } - std::unordered_set node_ids; - // Once this flag is set to true, it should never go back to false. it->second.subscribed = true; - if (!status.ok()) { - RAY_LOG(INFO) << "Worker " << worker_id << " failed to return location updates to " - << "subscribers for " << object_id << ": " << status.ToString() - << ", assuming that the object was freed or evicted."; - it->second.object_size = 0; - } else { - if (reply.object_size() > 0) { - it->second.object_size = reply.object_size(); - } - - for (auto const &node_id : reply.node_ids()) { - node_ids.emplace(NodeID::FromBinary(node_id)); - } - FilterRemovedNodes(gcs_client_, &node_ids); - } - if (node_ids != it->second.current_object_locations || !status.ok()) { - it->second.current_object_locations = std::move(node_ids); + // Update entries for this object. + if (UpdateObjectLocations(reply, status, object_id, gcs_client_, + &it->second.current_object_locations, &it->second.spilled_url, + &it->second.spilled_node_id, &it->second.object_size)) { // Copy the callbacks so that the callbacks can unsubscribe without interrupting // looping over the callbacks. auto callbacks = it->second.callbacks; @@ -171,10 +206,12 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback( // empty, since this may indicate that the objects have been evicted from // all nodes. for (const auto &callback_pair : callbacks) { - // It is safe to call the callback directly since this is already running - // in the subscription callback stack. - callback_pair.second(object_id, it->second.current_object_locations, "", - NodeID::Nil(), it->second.object_size); + // We can call the callback directly without worrying about invalidating caller + // iterators since this is already running in the subscription callback stack. + // See https://github.com/ray-project/ray/issues/2959. + callback_pair.second(object_id, it->second.current_object_locations, + it->second.spilled_url, it->second.spilled_node_id, + it->second.object_size); } } @@ -222,10 +259,16 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations( // immediately notify the caller of the current known locations. if (listener_state.subscribed) { auto &locations = listener_state.current_object_locations; - auto object_size = it->second.object_size; - io_service_.post([callback, locations, object_size, object_id]() { - callback(object_id, locations, "", NodeID::Nil(), object_size); - }); + auto &spilled_url = listener_state.spilled_url; + auto &spilled_node_id = listener_state.spilled_node_id; + auto object_size = listener_state.object_size; + // We post the callback to the event loop in order to avoid mutating data structures + // shared with the caller and potentially invalidating caller iterators. + // See https://github.com/ray-project/ray/issues/2959. + io_service_.post( + [callback, locations, spilled_url, spilled_node_id, object_size, object_id]() { + callback(object_id, locations, spilled_url, spilled_node_id, object_size); + }); } return Status::OK(); } @@ -246,36 +289,63 @@ ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations( ray::Status OwnershipBasedObjectDirectory::LookupLocations( const ObjectID &object_id, const rpc::Address &owner_address, const OnLocationsFound &callback) { - WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); - std::shared_ptr rpc_client = GetClient(owner_address); - if (rpc_client == nullptr) { - RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " - << "LookupLocations returns an empty list of locations."; - io_service_.post([callback, object_id]() { - callback(object_id, std::unordered_set(), "", NodeID::Nil(), 0); - }); - return Status::OK(); - } - - rpc::GetObjectLocationsOwnerRequest request; - request.set_intended_worker_id(owner_address.worker_id()); - request.set_object_id(object_id.Binary()); - request.set_last_version(-1); - - rpc_client->GetObjectLocationsOwner( - request, [this, worker_id, object_id, callback]( - Status status, const rpc::GetObjectLocationsOwnerReply &reply) { - if (!status.ok()) { - RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for " - << object_id; - } - std::unordered_set node_ids; - for (auto const &node_id : reply.node_ids()) { - node_ids.emplace(NodeID::FromBinary(node_id)); - } - FilterRemovedNodes(gcs_client_, &node_ids); - callback(object_id, node_ids, "", NodeID::Nil(), reply.object_size()); + auto it = listeners_.find(object_id); + if (it != listeners_.end() && it->second.subscribed) { + // If we have locations cached due to a concurrent SubscribeObjectLocations + // call, and we have received at least one update from the owner about + // the object's creation, then call the callback immediately with the + // cached locations. + auto &locations = it->second.current_object_locations; + auto &spilled_url = it->second.spilled_url; + auto &spilled_node_id = it->second.spilled_node_id; + auto object_size = it->second.object_size; + // We post the callback to the event loop in order to avoid mutating data structures + // shared with the caller and potentially invalidating caller iterators. + // See https://github.com/ray-project/ray/issues/2959. + io_service_.post( + [callback, object_id, locations, spilled_url, spilled_node_id, object_size]() { + callback(object_id, locations, spilled_url, spilled_node_id, object_size); + }); + } else { + WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); + std::shared_ptr rpc_client = GetClient(owner_address); + if (rpc_client == nullptr) { + RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " + << "LookupLocations returns an empty list of locations."; + // We post the callback to the event loop in order to avoid mutating data structures + // shared with the caller and potentially invalidating caller iterators. + // See https://github.com/ray-project/ray/issues/2959. + io_service_.post([callback, object_id]() { + callback(object_id, std::unordered_set(), "", NodeID::Nil(), 0); }); + return Status::OK(); + } + + rpc::GetObjectLocationsOwnerRequest request; + request.set_intended_worker_id(owner_address.worker_id()); + request.set_object_id(object_id.Binary()); + request.set_last_version(-1); + + rpc_client->GetObjectLocationsOwner( + request, [this, worker_id, object_id, callback]( + Status status, const rpc::GetObjectLocationsOwnerReply &reply) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for " + << object_id; + } + std::unordered_set node_ids; + std::string spilled_url; + NodeID spilled_node_id; + size_t object_size = 0; + UpdateObjectLocations(reply, status, object_id, gcs_client_, &node_ids, + &spilled_url, &spilled_node_id, &object_size); + // We can call the callback directly without worrying about invalidating + // caller iterators since this is already running in the core worker + // client's lookup callback stack. + // See https://github.com/ray-project/ray/issues/2959. + callback(object_id, node_ids, spilled_url, spilled_node_id, object_size); + }); + } return Status::OK(); } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index ef5f97302..66d5eb570 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -189,10 +189,18 @@ message GetObjectLocationsOwnerRequest { } message GetObjectLocationsOwnerReply { + // The IDs of the nodes that this object appeared on or was evicted by. repeated bytes node_ids = 1; + // The size of the object in bytes. uint64 object_size = 2; + // The object has been spilled to this URL. This should be set xor the above + // fields are set. + string spilled_url = 3; + // The ID of the node that spilled the object. + // This will be Nil if the object was spilled to distributed external storage. + bytes spilled_node_id = 4; // The version of the returned location updates. - int64 current_version = 3; + int64 current_version = 5; } message KillActorRequest { @@ -306,6 +314,9 @@ message PlasmaObjectReadyReply { message SpillObjectsRequest { // The IDs of objects to be spilled. repeated bytes object_ids_to_spill = 1; + // The owner addresses of the objects to be spilled. Must be in the same order as + // object_ids_to_spill. + repeated Address owner_addresses = 2; } message SpillObjectsReply { @@ -333,6 +344,22 @@ message DeleteSpilledObjectsRequest { message DeleteSpilledObjectsReply { } +message AddSpilledUrlRequest { + // Object that was spilled. + bytes object_id = 1; + // For objects that have been spilled to external storage, the URL from which + // they can be retrieved. + string spilled_url = 2; + // The ID of the node that spilled the object. + // This will be Nil if the object was spilled to distributed external storage. + bytes spilled_node_id = 3; + // The size of the object in bytes. + int64 size = 4; +} + +message AddSpilledUrlReply { +} + message ExitRequest { } @@ -385,6 +412,9 @@ service CoreWorkerService { // Delete spilled objects from external storage. Caller: raylet; callee: I/O worker. rpc DeleteSpilledObjects(DeleteSpilledObjectsRequest) returns (DeleteSpilledObjectsReply); + // Add spilled URL, spilled node ID, and update object size for owned object. + // Caller: raylet; callee: owner worker. + rpc AddSpilledUrl(AddSpilledUrlRequest) returns (AddSpilledUrlReply); // Notification from raylet that an object ID is available in local plasma. rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); // Request for a worker to exit. diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 8e225293c..9273665f3 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -179,6 +179,10 @@ message RequestObjectSpillageRequest { message RequestObjectSpillageReply { // Whether the object spilling was successful or not. bool success = 1; + // Object URL where the object is spilled. + string object_url = 2; + // The node id of a node where the object is spilled. + bytes spilled_node_id = 3; } message RestoreSpilledObjectRequest { diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index ef9e53e21..3ee7de57c 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -21,7 +21,8 @@ namespace ray { namespace raylet { void LocalObjectManager::PinObjects(const std::vector &object_ids, - std::vector> &&objects) { + std::vector> &&objects, + const rpc::Address &owner_address) { RAY_CHECK(object_pinning_enabled_); for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; @@ -33,7 +34,7 @@ void LocalObjectManager::PinObjects(const std::vector &object_ids, } RAY_LOG(DEBUG) << "Pinning object " << object_id; pinned_objects_size_ += object->GetSize(); - pinned_objects_.emplace(object_id, std::move(object)); + pinned_objects_.emplace(object_id, std::make_pair(std::move(object), owner_address)); } } @@ -71,7 +72,7 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { spilled_object_pending_delete_.push(object_id); } if (pinned_objects_.count(object_id)) { - pinned_objects_size_ -= pinned_objects_[object_id]->GetSize(); + pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize(); pinned_objects_.erase(object_id); } } @@ -143,7 +144,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { std::vector objects_to_spill; while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end()) { if (is_plasma_object_spillable_(it->first)) { - bytes_to_spill += it->second->GetSize(); + bytes_to_spill += it->second.first->GetSize(); objects_to_spill.push_back(it->first); } it++; @@ -155,7 +156,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { SpillObjectsInternal(objects_to_spill, [this, bytes_to_spill, objects_to_spill, start_time](const Status &status) { if (!status.ok()) { - RAY_LOG(ERROR) << "Error spilling objects " << status.ToString(); + RAY_LOG(INFO) << "Failed to spill objects: " << status.ToString(); } else { auto now = absl::GetCurrentTimeNanos(); RAY_LOG(DEBUG) << "Spilled " << bytes_to_spill << " bytes in " @@ -210,7 +211,7 @@ void LocalObjectManager::SpillObjectsInternal( if (it != pinned_objects_.end()) { RAY_LOG(DEBUG) << "Spilling object " << id; objects_to_spill.push_back(id); - num_bytes_pending_spill_ += it->second->GetSize(); + num_bytes_pending_spill_ += it->second.first->GetSize(); objects_pending_spill_[id] = std::move(it->second); pinned_objects_.erase(it); } @@ -228,6 +229,9 @@ void LocalObjectManager::SpillObjectsInternal( for (const auto &object_id : objects_to_spill) { RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; request.add_object_ids_to_spill(object_id.Binary()); + auto it = objects_pending_spill_.find(object_id); + RAY_CHECK(it != objects_pending_spill_.end()); + request.add_owner_addresses()->MergeFrom(it->second.second); } io_worker->rpc_client()->SpillObjects( request, [this, objects_to_spill, callback, io_worker]( @@ -241,7 +245,7 @@ void LocalObjectManager::SpillObjectsInternal( for (const auto &object_id : objects_to_spill) { auto it = objects_pending_spill_.find(object_id); RAY_CHECK(it != objects_pending_spill_.end()); - pinned_objects_size_ += it->second->GetSize(); + pinned_objects_size_ += it->second.first->GetSize(); pinned_objects_.emplace(object_id, std::move(it->second)); objects_pending_spill_.erase(it); } @@ -258,6 +262,46 @@ void LocalObjectManager::SpillObjectsInternal( }); } +void LocalObjectManager::UnpinSpilledObjectCallback( + const ObjectID &object_id, const std::string &object_url, + std::shared_ptr num_remaining, + std::function callback, ray::Status status) { + if (!status.ok()) { + RAY_LOG(INFO) << "Failed to send spilled url for object " << object_id + << " to object directory, considering the object to have been freed: " + << status.ToString(); + } else { + RAY_LOG(DEBUG) << "Object " << object_id << " spilled to " << object_url + << " and object directory has been informed"; + } + RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id; + // Unpin the object. + auto it = objects_pending_spill_.find(object_id); + RAY_CHECK(it != objects_pending_spill_.end()); + num_bytes_pending_spill_ -= it->second.first->GetSize(); + objects_pending_spill_.erase(it); + + // Update the object_id -> url_ref_count to use it for deletion later. + // We need to track the references here because a single file can contain + // multiple objects, and we shouldn't delete the file until + // all the objects are gone out of scope. + // object_url is equivalent to url_with_offset. + auto parsed_url = ParseURL(object_url); + const auto base_url_it = parsed_url->find("url"); + RAY_CHECK(base_url_it != parsed_url->end()); + if (!url_ref_count_.contains(base_url_it->second)) { + url_ref_count_[base_url_it->second] = 1; + } else { + url_ref_count_[base_url_it->second] += 1; + } + spilled_objects_url_.emplace(object_id, object_url); + + (*num_remaining)--; + if (*num_remaining == 0 && callback) { + callback(status); + } +} + void LocalObjectManager::AddSpilledUrls( const std::vector &object_ids, const rpc::SpillObjectsReply &worker_reply, std::function callback) { @@ -274,39 +318,36 @@ void LocalObjectManager::AddSpilledUrls( auto it = objects_pending_spill_.find(object_id); RAY_CHECK(it != objects_pending_spill_.end()); - // Write to object directory. Wait for the write to finish before - // releasing the object to make sure that the spilled object can - // be retrieved by other raylets. - RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl( - object_id, object_url, node_id_object_spilled, it->second->GetSize(), - [this, object_id, object_url, callback, num_remaining](Status status) { - RAY_CHECK_OK(status); - // Unpin the object. - auto it = objects_pending_spill_.find(object_id); - RAY_CHECK(it != objects_pending_spill_.end()); - num_bytes_pending_spill_ -= it->second->GetSize(); - objects_pending_spill_.erase(it); + auto unpin_callback = + std::bind(&LocalObjectManager::UnpinSpilledObjectCallback, this, object_id, + object_url, num_remaining, callback, std::placeholders::_1); - // Update the object_id -> url_ref_count to use it for deletion later. - // We need to track the references here because a single file can contain - // multiple objects, and we shouldn't delete the file until - // all the objects are gone out of scope. - // object_url is equivalent to url_with_offset. - auto parsed_url = ParseURL(object_url); - const auto base_url_it = parsed_url->find("url"); - RAY_CHECK(base_url_it != parsed_url->end()); - if (!url_ref_count_.contains(base_url_it->second)) { - url_ref_count_[base_url_it->second] = 1; - } else { - url_ref_count_[base_url_it->second] += 1; - } - spilled_objects_url_.emplace(object_id, object_url); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + // TODO(Clark): Don't send RPC to owner if we're fulfilling an owner-initiated + // spill RPC. + rpc::AddSpilledUrlRequest request; + request.set_object_id(object_id.Binary()); + request.set_spilled_url(object_url); + request.set_spilled_node_id(node_id_object_spilled.Binary()); + request.set_size(it->second.first->GetSize()); - (*num_remaining)--; - if (*num_remaining == 0 && callback) { - callback(status); - } - })); + auto owner_client = owner_client_pool_.GetOrConnect(it->second.second); + RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " + << object_id << " to owner " + << WorkerID::FromBinary(it->second.second.worker_id()); + // Send spilled URL, spilled node ID, and object size to owner. + owner_client->AddSpilledUrl( + request, [unpin_callback](Status status, const rpc::AddSpilledUrlReply &reply) { + unpin_callback(status); + }); + } else { + // Write to object directory. Wait for the write to finish before + // releasing the object to make sure that the spilled object can + // be retrieved by other raylets. + RAY_CHECK_OK(object_info_accessor_.AsyncAddSpilledUrl( + object_id, object_url, node_id_object_spilled, it->second.first->GetSize(), + unpin_callback)); + } } } @@ -321,11 +362,11 @@ void LocalObjectManager::AsyncRestoreSpilledObject( if (!node_id.IsNil() && node_id != self_node_id_) { // If we know where this object was spilled, and the current node is not that one, // send a RPC to a remote node that spilled the object to restore it. - RAY_LOG(DEBUG) << "Send a object restoration request of id: " << object_id + RAY_LOG(DEBUG) << "Send an object restoration request of id: " << object_id << " to a remote node: " << node_id; // TODO(sang): We need to deduplicate this remote RPC. Since restore request - // is retried every 10ms without exponential backoff, this can add huge overhead to a - // remote node that spilled the object. + // is retried every 10ms without exponential backoff, this can add huge overhead to + // a remote node that spilled the object. restore_object_from_remote_node_(object_id, object_url, node_id); if (callback) { callback(Status::OK()); @@ -395,9 +436,9 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz object_urls_to_delete.size() < max_batch_size) { auto &object_id = spilled_object_pending_delete_.front(); // If the object is still spilling, do nothing. This will block other entries to be - // processed, but it should be fine because the spilling will be eventually done, and - // deleting objects is the low priority tasks. - // This will instead enable simpler logic after this block. + // processed, but it should be fine because the spilling will be eventually done, + // and deleting objects is the low priority tasks. This will instead enable simpler + // logic after this block. if (objects_pending_spill_.contains(object_id)) { break; } @@ -405,8 +446,8 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz // Object id is either spilled or not spilled at this point. const auto spilled_objects_url_it = spilled_objects_url_.find(object_id); if (spilled_objects_url_it != spilled_objects_url_.end()) { - // If the object was spilled, see if we can delete it. We should first check the ref - // count. + // If the object was spilled, see if we can delete it. We should first check the + // ref count. std::string &object_url = spilled_objects_url_it->second; // Note that here, we need to parse the object url to obtain the base_url. auto parsed_url = ParseURL(object_url); @@ -475,5 +516,4 @@ std::string LocalObjectManager::DebugString() const { } }; // namespace raylet - }; // namespace ray diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 57ef8d3a1..267edabd9 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -70,8 +70,10 @@ class LocalObjectManager { /// \param object_ids The objects to be pinned. /// \param objects Pointers to the objects to be pinned. The pointer should /// be kept in scope until the object can be released. + /// \param owner_address The owner of the objects to be pinned. void PinObjects(const std::vector &object_ids, - std::vector> &&objects); + std::vector> &&objects, + const rpc::Address &owner_address); /// Wait for the objects' owner to free the object. The objects will be /// released when the owner at the given address fails or replies that the @@ -164,6 +166,14 @@ class LocalObjectManager { /// objects. void FlushFreeObjects(); + // A callback for unpinning spilled objects. This should be invoked after the object + // has been spilled and after the object directory has been sent the spilled URL. + void UnpinSpilledObjectCallback(const ObjectID &object_id, + const std::string &object_url, + std::shared_ptr num_remaining, + std::function callback, + ray::Status status); + /// Add objects' spilled URLs to the global object directory. Call the /// callback once all URLs have been added. void AddSpilledUrls(const std::vector &object_ids, @@ -203,7 +213,8 @@ class LocalObjectManager { std::function &)> on_objects_freed_; // Objects that are pinned on this node. - absl::flat_hash_map> pinned_objects_; + absl::flat_hash_map, rpc::Address>> + pinned_objects_; // Total size of objects pinned on this node. size_t pinned_objects_size_ = 0; @@ -211,7 +222,8 @@ class LocalObjectManager { // Objects that were pinned on this node but that are being spilled. // These objects will be released once spilling is complete and the URL is // written to the object directory. - absl::flat_hash_map> objects_pending_spill_; + absl::flat_hash_map, rpc::Address>> + objects_pending_spill_; /// Objects that were spilled on this node but that are being restored. /// The field is used to dedup the same restore request while restoration is in diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9b66d0a7c..2287fd3e8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -516,11 +516,17 @@ void NodeManager::DoLocalGC() { void NodeManager::HandleRequestObjectSpillage( const rpc::RequestObjectSpillageRequest &request, rpc::RequestObjectSpillageReply *reply, rpc::SendReplyCallback send_reply_callback) { + const auto &object_id = ObjectID::FromBinary(request.object_id()); + RAY_LOG(DEBUG) << "Received RequestObjectSpillage for object " << object_id; local_object_manager_.SpillObjects( - {ObjectID::FromBinary(request.object_id())}, - [reply, send_reply_callback](const ray::Status &status) { + {object_id}, [object_id, reply, send_reply_callback](const ray::Status &status) { if (status.ok()) { + RAY_LOG(DEBUG) << "Object " << object_id + << " has been spilled, replying to owner"; reply->set_success(true); + // TODO(Clark): Add spilled URLs and spilled node ID to owner RPC reply here + // if OBOD is enabled, instead of relying on automatic raylet spilling path to + // send an extra RPC to the owner. } send_reply_callback(Status::OK(), nullptr, nullptr); }); @@ -2406,6 +2412,7 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, rpc::SendReplyCallback send_reply_callback) { std::vector object_ids; object_ids.reserve(request.object_ids_size()); + const auto &owner_address = request.owner_address(); for (const auto &object_id_binary : request.object_ids()) { object_ids.push_back(ObjectID::FromBinary(object_id_binary)); } @@ -2419,10 +2426,10 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); return; } - local_object_manager_.PinObjects(object_ids, std::move(results)); + local_object_manager_.PinObjects(object_ids, std::move(results), owner_address); } // Wait for the object to be freed by the owner, which keeps the ref count. - local_object_manager_.WaitForObjectFree(request.owner_address(), object_ids); + local_object_manager_.WaitForObjectFree(owner_address, object_ids); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index f68707ce7..d056928c0 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -37,21 +37,41 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { void WaitForObjectEviction( const rpc::WaitForObjectEvictionRequest &request, const rpc::ClientCallback &callback) override { - callbacks.push_back(callback); + eviction_callbacks.push_back(callback); } bool ReplyObjectEviction(Status status = Status::OK()) { - if (callbacks.size() == 0) { + if (eviction_callbacks.empty()) { return false; } - auto callback = callbacks.front(); + auto callback = eviction_callbacks.front(); auto reply = rpc::WaitForObjectEvictionReply(); callback(status, reply); - callbacks.pop_front(); + eviction_callbacks.pop_front(); return true; } - std::list> callbacks; + void AddSpilledUrl( + const rpc::AddSpilledUrlRequest &request, + const rpc::ClientCallback &callback) override { + object_urls.emplace(ObjectID::FromBinary(request.object_id()), request.spilled_url()); + spilled_url_callbacks.push_back(callback); + } + + bool ReplyAddSpilledUrl(Status status = Status::OK()) { + if (spilled_url_callbacks.empty()) { + return false; + } + auto callback = spilled_url_callbacks.front(); + auto reply = rpc::AddSpilledUrlReply(); + callback(status, reply); + spilled_url_callbacks.pop_front(); + return true; + } + + std::deque> eviction_callbacks; + std::unordered_map object_urls; + std::deque> spilled_url_callbacks; }; class MockIOWorkerClient : public rpc::CoreWorkerClientInterface { @@ -334,7 +354,7 @@ TEST_F(LocalObjectManagerTest, TestPin) { new RayObject(nullptr, meta_buffer, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); for (size_t i = 0; i < free_objects_batch_size; i++) { @@ -349,6 +369,8 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { // First, spill objects. std::vector object_ids; std::vector> objects; + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); @@ -358,7 +380,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); @@ -368,7 +390,11 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // Then try restoring objects from local. @@ -416,6 +442,8 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { TEST_F(LocalObjectManagerTest, TestExplicitSpill) { std::vector object_ids; std::vector> objects; + rpc::Address owner_address; + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); for (size_t i = 0; i < free_objects_batch_size; i++) { ObjectID object_id = ObjectID::FromRandom(); @@ -425,7 +453,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects(object_ids, [&](const Status &status) mutable { @@ -444,11 +472,19 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } ASSERT_EQ(num_times_fired, 1); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_EQ(owner_client->object_urls[object_ids[i]], urls[i]); + } else { + ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]); + } } for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 1); @@ -470,7 +506,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); int num_times_fired = 0; @@ -494,11 +530,19 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } ASSERT_EQ(num_times_fired, 1); for (size_t i = 0; i < object_ids.size(); i++) { - ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_EQ(owner_client->object_urls[object_ids[i]], urls[i]); + } else { + ASSERT_EQ(object_table.object_urls[object_ids[i]], urls[i]); + } } ASSERT_FALSE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (const auto &id : object_ids) { @@ -524,7 +568,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -541,13 +585,26 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { // to evict. ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < urls.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } - ASSERT_EQ(object_table.object_urls.size(), object_ids.size() / 2 + 1); - for (auto &object_url : object_table.object_urls) { - auto it = std::find(urls.begin(), urls.end(), object_url.second); - ASSERT_TRUE(it != urls.end()); - ASSERT_EQ((*unpins)[object_url.first], 1); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_EQ(owner_client->object_urls.size(), object_ids.size() / 2 + 1); + for (auto &object_url : owner_client->object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } + } else { + ASSERT_EQ(object_table.object_urls.size(), object_ids.size() / 2 + 1); + for (auto &object_url : object_table.object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } } // Make sure providing 0 bytes to SpillObjectsOfSize will spill one object. @@ -556,13 +613,23 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); const std::string url = BuildURL("url" + std::to_string(object_ids.size())); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); - ASSERT_EQ(object_table.object_urls.size(), 3); urls.push_back(url); - for (auto &object_url : object_table.object_urls) { - auto it = std::find(urls.begin(), urls.end(), object_url.second); - ASSERT_TRUE(it != urls.end()); - ASSERT_EQ((*unpins)[object_url.first], 1); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_EQ(owner_client->object_urls.size(), 3); + for (auto &object_url : owner_client->object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + ASSERT_EQ(object_table.object_urls.size(), 3); + for (auto &object_url : object_table.object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } } // Since there's no more object to spill, this should fail. @@ -587,7 +654,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); ASSERT_FALSE(manager.SpillObjectsOfSize(1000)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -616,7 +683,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); // This will spill until 2 workers are occupied. manager.SpillObjectUptoMaxThroughput(); @@ -633,12 +700,23 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { std::vector urls; urls.push_back(BuildURL("url" + std::to_string(0))); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[0]})); - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); - // Make sure object is spilled. - ASSERT_EQ(object_table.object_urls.size(), 1); - for (auto &object_url : object_table.object_urls) { - if (urls[0] == object_url.second) { - ASSERT_EQ((*unpins)[object_url.first], 1); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + // Make sure object is spilled. + ASSERT_EQ(owner_client->object_urls.size(), 1); + for (auto &object_url : owner_client->object_urls) { + if (urls[0] == object_url.second) { + ASSERT_EQ((*unpins)[object_url.first], 1); + } + } + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + // Make sure object is spilled. + ASSERT_EQ(object_table.object_urls.size(), 1); + for (auto &object_url : object_table.object_urls) { + if (urls[0] == object_url.second) { + ASSERT_EQ((*unpins)[object_url.first], 1); + } } } @@ -656,13 +734,26 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { } for (size_t i = 1; i < urls.size(); i++) { ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({urls[i]})); - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } - ASSERT_EQ(object_table.object_urls.size(), 3); - for (auto &object_url : object_table.object_urls) { - auto it = std::find(urls.begin(), urls.end(), object_url.second); - ASSERT_TRUE(it != urls.end()); - ASSERT_EQ((*unpins)[object_url.first], 1); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_EQ(owner_client->object_urls.size(), 3); + for (auto &object_url : owner_client->object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } + } else { + ASSERT_EQ(object_table.object_urls.size(), 3); + for (auto &object_url : object_table.object_urls) { + auto it = std::find(urls.begin(), urls.end(), object_url.second); + ASSERT_TRUE(it != urls.end()); + ASSERT_EQ((*unpins)[object_url.first], 1); + } } // We cannot spill anymore as there is no more pinned object. @@ -683,7 +774,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { std::vector> objects; objects.push_back(std::move(object)); - manager.PinObjects({object_id}, std::move(objects)); + manager.PinObjects({object_id}, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects({object_id}, [&](const Status &status) mutable { @@ -695,7 +786,11 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE( worker_pool.io_worker_client->ReplySpillObjects({}, Status::IOError("error"))); - ASSERT_FALSE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_FALSE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_FALSE(object_table.ReplyAsyncAddSpilledUrl()); + } ASSERT_EQ(num_times_fired, 1); ASSERT_EQ((*unpins)[object_id], 0); @@ -707,9 +802,14 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { std::string url = BuildURL("url"); EXPECT_CALL(worker_pool, PushSpillWorker(_)); ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url})); - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + ASSERT_EQ(owner_client->object_urls[object_id], url); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + ASSERT_EQ(object_table.object_urls[object_id], url); + } ASSERT_EQ(num_times_fired, 2); - ASSERT_EQ(object_table.object_urls[object_id], url); ASSERT_EQ((*unpins)[object_id], 1); } @@ -729,7 +829,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); for (size_t i = 0; i < free_objects_batch_size; i++) { @@ -757,7 +857,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); // 2 Objects are spilled out of 3. @@ -774,7 +874,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // All objects are out of scope now. @@ -805,7 +909,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); // Every object is spilled. @@ -826,7 +930,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // Everything is evicted except the last object. In this case, ref count is still > 0. @@ -862,7 +970,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); // Objects are spilled. @@ -881,7 +989,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < 1; i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // Every object has gone out of scope. for (size_t i = 0; i < free_objects_batch_size; i++) { @@ -900,7 +1012,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { new_urls.push_back(BuildURL("url" + std::to_string(i))); } for (size_t i = 1; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // Every object is now deleted. @@ -925,7 +1041,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { new RayObject(data_buffer, nullptr, std::vector())); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects)); + manager.PinObjects(object_ids, std::move(objects), owner_address); manager.WaitForObjectFree(owner_address, object_ids); std::vector object_ids_to_spill; @@ -943,7 +1059,11 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { } ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls)); for (size_t i = 0; i < object_ids_to_spill.size(); i++) { - ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + if (RayConfig::instance().ownership_based_object_directory_enabled()) { + ASSERT_TRUE(owner_client->ReplyAddSpilledUrl()); + } else { + ASSERT_TRUE(object_table.ReplyAsyncAddSpilledUrl()); + } } // Every reference has gone out of scope. diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index a014a1776..8f2796581 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -186,6 +186,9 @@ class CoreWorkerClientInterface { const DeleteSpilledObjectsRequest &request, const ClientCallback &callback) {} + virtual void AddSpilledUrl(const AddSpilledUrlRequest &request, + const ClientCallback &callback) {} + virtual void PlasmaObjectReady(const PlasmaObjectReadyRequest &request, const ClientCallback &callback) { } @@ -251,6 +254,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, VOID_RPC_CLIENT_METHOD(CoreWorkerService, DeleteSpilledObjects, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, AddSpilledUrl, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override) diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 8f9d236e0..37c01cf48 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -44,6 +44,7 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, DeleteSpilledObjects) \ + RPC_SERVICE_HANDLER(CoreWorkerService, AddSpilledUrl) \ RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \ RPC_SERVICE_HANDLER(CoreWorkerService, Exit) @@ -65,6 +66,7 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DeleteSpilledObjects) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddSpilledUrl) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit)