diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index dc9fceaca..3d2b9ea73 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -898,6 +898,18 @@ cdef class CoreWorker: return RayObjectsToDataMetadataPairs(results) + def get_if_local(self, object_refs): + """Get objects from local plasma store directly + without a fetch request to raylet.""" + cdef: + c_vector[shared_ptr[CRayObject]] results + c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs) + with nogil: + check_status( + CCoreWorkerProcess.GetCoreWorker().GetIfLocal( + c_object_ids, &results)) + return RayObjectsToDataMetadataPairs(results) + def object_exists(self, ObjectRef object_ref): cdef: c_bool has_object diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index f764e9c0f..26d5c4a4d 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -82,11 +82,11 @@ class ExternalStorage(metaclass=abc.ABCMeta): def _get_objects_from_store(self, object_refs): worker = ray.worker.global_worker - ray_object_pairs = worker.core_worker.get_objects( - object_refs, - worker.current_task_id, - timeout_ms=0, - plasma_objects_only=True) + # Since the object should always exist in the plasma store before + # spilling, it can directly get the object from the local plasma + # store. 
+ # See https://github.com/ray-project/ray/pull/13831 + ray_object_pairs = worker.core_worker.get_if_local(object_refs) return ray_object_pairs def _put_object_to_store(self, metadata, data_size, file_like, object_ref): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index f1acad1fa..0b7c3b0f5 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -183,6 +183,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results, c_bool plasma_objects_only) + CRayStatus GetIfLocal( + const c_vector[CObjectID] &ids, + c_vector[shared_ptr[CRayObject]] *results) CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, int64_t timeout_ms, c_vector[c_bool] *results, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2f5dcc57e..1961406d8 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1058,6 +1058,23 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m return Status::OK(); } +Status CoreWorker::GetIfLocal(const std::vector &ids, + std::vector> *results) { + results->resize(ids.size(), nullptr); + + absl::flat_hash_map> result_map; + RAY_RETURN_NOT_OK(plasma_store_provider_->GetIfLocal(ids, &result_map)); + for (size_t i = 0; i < ids.size(); i++) { + auto pair = result_map.find(ids[i]); + // The caller of this method should guarantee that the object exists in the plasma + // store when this method is called. 
+ RAY_CHECK(pair != result_map.end()); + RAY_CHECK(pair->second != nullptr); + (*results)[i] = pair->second; + } + return Status::OK(); +} + Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { bool found = false; bool in_plasma = false; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 088ba346a..89331b5ce 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -555,6 +555,20 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::vector> *results, bool plasma_objects_only = false); + /// Get objects directly from the local plasma store, without waiting for the + /// objects to be fetched from another node. This should only be used + /// internally, never by user code. + /// NOTE: Caller of this method should guarantee that the object already exists in the + /// plasma store, thus it doesn't need to fetch from other nodes. + /// + /// \param[in] ids The IDs of the objects to get. + /// \param[out] results The results will be stored here, in the same order as + /// `ids`. Every entry is non-null when this method returns. + /// \return Status OK if all objects were found. A RAY_CHECK fails if any + /// object was not in the local store; store errors are returned as-is. + Status GetIfLocal(const std::vector &ids, + std::vector> *results); + /// Return whether or not the object store contains the given object. /// /// \param[in] object_id ID of the objects to check for. 
diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index a8f116287..b42c4b509 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -225,6 +225,38 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( return Status::OK(); } +Status CoreWorkerPlasmaStoreProvider::GetIfLocal( + const std::vector &object_ids, + absl::flat_hash_map> *results) { + std::vector plasma_results; + { + std::lock_guard guard(store_client_mutex_); + RAY_RETURN_NOT_OK(store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results)); + } + + for (size_t i = 0; i < object_ids.size(); i++) { + if (plasma_results[i].data != nullptr || plasma_results[i].metadata != nullptr) { + const auto &object_id = object_ids[i]; + std::shared_ptr data = nullptr; + std::shared_ptr metadata = nullptr; + if (plasma_results[i].data && plasma_results[i].data->Size()) { + // We track the set of active data buffers in active_buffers_. On destruction, + // the buffer entry will be removed from the set via callback. 
+ data = std::make_shared(plasma_results[i].data, buffer_tracker_, + object_id); + buffer_tracker_->Record(object_id, data.get(), get_current_call_site_()); + } + if (plasma_results[i].metadata && plasma_results[i].metadata->Size()) { + metadata = plasma_results[i].metadata; + } + const auto result_object = + std::make_shared(data, metadata, std::vector()); + (*results)[object_id] = result_object; + } + } + return Status::OK(); +} + Status UnblockIfNeeded(const std::shared_ptr &client, const WorkerContext &ctx) { if (ctx.CurrentTaskIsDirectCall()) { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 2282a09a9..e67c561b6 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -143,6 +143,18 @@ class CoreWorkerPlasmaStoreProvider { absl::flat_hash_map> *results, bool *got_exception); + /// Get objects directly from the local plasma store, without waiting for the + /// objects to be fetched from another node. This should only be used + /// internally, never by user code. + /// + /// \param[in] ids The IDs of the objects to get. + /// \param[out] results The results will be stored here, keyed by object ID. + /// Objects that were not in the local store have no entry in the map. + /// \return Status OK if the request to the local object store was + /// successful. + Status GetIfLocal(const std::vector &ids, + absl::flat_hash_map> *results); + Status Contains(const ObjectID &object_id, bool *has_object); Status Wait(const absl::flat_hash_set &object_ids, int num_objects,