mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 22:08:16 +08:00
[Object Spilling] Skip normal ray.get path when spilling objects. (#13831)
This commit is contained in:
@@ -898,6 +898,18 @@ cdef class CoreWorker:
|
||||
|
||||
return RayObjectsToDataMetadataPairs(results)
|
||||
|
||||
def get_if_local(self, object_refs):
|
||||
"""Get objects from local plasma store directly
|
||||
without a fetch request to raylet."""
|
||||
cdef:
|
||||
c_vector[shared_ptr[CRayObject]] results
|
||||
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
|
||||
with nogil:
|
||||
check_status(
|
||||
CCoreWorkerProcess.GetCoreWorker().GetIfLocal(
|
||||
c_object_ids, &results))
|
||||
return RayObjectsToDataMetadataPairs(results)
|
||||
|
||||
def object_exists(self, ObjectRef object_ref):
|
||||
cdef:
|
||||
c_bool has_object
|
||||
|
||||
@@ -82,11 +82,11 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
|
||||
def _get_objects_from_store(self, object_refs):
|
||||
worker = ray.worker.global_worker
|
||||
ray_object_pairs = worker.core_worker.get_objects(
|
||||
object_refs,
|
||||
worker.current_task_id,
|
||||
timeout_ms=0,
|
||||
plasma_objects_only=True)
|
||||
# Since the object should always exist in the plasma store before
|
||||
# spilling, it can directly get the object from the local plasma
|
||||
# store.
|
||||
# issue: https://github.com/ray-project/ray/pull/13831
|
||||
ray_object_pairs = worker.core_worker.get_if_local(object_refs)
|
||||
return ray_object_pairs
|
||||
|
||||
def _put_object_to_store(self, metadata, data_size, file_like, object_ref):
|
||||
|
||||
@@ -183,6 +183,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
|
||||
c_vector[shared_ptr[CRayObject]] *results,
|
||||
c_bool plasma_objects_only)
|
||||
CRayStatus GetIfLocal(
|
||||
const c_vector[CObjectID] &ids,
|
||||
c_vector[shared_ptr[CRayObject]] *results)
|
||||
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
|
||||
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
|
||||
int64_t timeout_ms, c_vector[c_bool] *results,
|
||||
|
||||
Reference in New Issue
Block a user