diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index e9db97ba5..7fc0a991b 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -60,8 +60,8 @@ cdef class CoreWorker: cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectID object_id, + c_vector[CObjectID] contained_ids, CObjectID *c_object_id, shared_ptr[CBuffer] *data) - # TODO: handle noreturn better cdef store_task_outputs( self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0c4d7f01e..4c4fe7519 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -287,7 +287,13 @@ cdef void prepare_args( else: serialized_arg = worker.get_serialization_context().serialize(arg) size = serialized_arg.total_bytes - if size <= put_threshold: + + # TODO(edoakes): any objects containing ObjectIDs are spilled to + # plasma here. This is inefficient for small objects, but inlined + # arguments aren't associated ObjectIDs right now so this is a + # simple fix for reference counting purposes. + if (size <= put_threshold and + len(serialized_arg.contained_object_ids) == 0): arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( make_shared[LocalMemoryBuffer](size)) write_serialized_object(serialized_arg, arg_data) @@ -645,6 +651,7 @@ cdef class CoreWorker: cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectID object_id, + c_vector[CObjectID] contained_ids, CObjectID *c_object_id, shared_ptr[CBuffer] *data): delay = ray_constants.DEFAULT_PUT_OBJECT_DELAY for attempt in reversed( @@ -653,13 +660,14 @@ cdef class CoreWorker: if object_id is None: with nogil: check_status(self.core_worker.get().Create( - metadata, data_size, + metadata, data_size, contained_ids, c_object_id, data)) else: c_object_id[0] = object_id.native() with nogil: check_status(self.core_worker.get().Create( - metadata, data_size, c_object_id[0], data)) + metadata, data_size, contained_ids, + c_object_id[0], data)) break except ObjectStoreFullError as e: if attempt: @@ -685,22 +693,22 @@ cdef class CoreWorker: CObjectID c_object_id shared_ptr[CBuffer] data shared_ptr[CBuffer] metadata - # The object won't be pinned if an ObjectID is provided by the - # user (because we can't track its lifetime to unpin). Note that - # the API to do this isn't supported as a public API. - c_bool owns_object = object_id is None metadata = string_to_buffer(serialized_object.metadata) total_bytes = serialized_object.total_bytes object_already_exists = self._create_put_buffer( metadata, total_bytes, object_id, + ObjectIDsToVector(serialized_object.contained_object_ids), &c_object_id, &data) + if not object_already_exists: write_serialized_object(serialized_object, data) with nogil: + # Using custom object IDs is not supported because we can't + # track their lifecycle, so don't pin the object in that case. check_status( self.core_worker.get().Seal( - c_object_id, owns_object, pin_object)) + c_object_id, pin_object and object_id is None)) return ObjectID(c_object_id.Binary()) @@ -942,6 +950,7 @@ cdef class CoreWorker: cdef: c_vector[size_t] data_sizes c_vector[shared_ptr[CBuffer]] metadatas + c_vector[c_vector[CObjectID]] contained_ids if return_ids.size() == 0: return @@ -963,9 +972,11 @@ cdef class CoreWorker: metadatas.push_back( string_to_buffer(serialized_object.metadata)) serialized_objects.append(serialized_object) + contained_ids.push_back( + ObjectIDsToVector(serialized_object.contained_object_ids)) check_status(self.core_worker.get().AllocateReturnObjects( - return_ids, data_sizes, metadatas, returns)) + return_ids, data_sizes, metadatas, contained_ids, returns)) for i, serialized_object in enumerate(serialized_objects): # A nullptr is returned if the object already exists. diff --git a/python/ray/cloudpickle/cloudpickle_fast.py b/python/ray/cloudpickle/cloudpickle_fast.py index 465c7d215..a0a5a44f6 100644 --- a/python/ray/cloudpickle/cloudpickle_fast.py +++ b/python/ray/cloudpickle/cloudpickle_fast.py @@ -52,7 +52,8 @@ def dump(obj, file, protocol=None, buffer_callback=None): Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure compatibility with older versions of Python. """ - CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj) + CloudPickler(file, protocol=protocol, + buffer_callback=buffer_callback).dump(obj) def dumps(obj, protocol=None, buffer_callback=None): @@ -66,7 +67,8 @@ def dumps(obj, protocol=None, buffer_callback=None): compatibility with older versions of Python. """ with io.BytesIO() as file: - cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback) + cp = CloudPickler(file, protocol=protocol, + buffer_callback=buffer_callback) cp.dump(obj) return file.getvalue() @@ -79,9 +81,9 @@ def _class_getnewargs(obj): if hasattr(obj, "__slots__"): type_kwargs["__slots__"] = obj.__slots__ - __dict__ = obj.__dict__.get('__dict__', None) + __dict__ = obj.__dict__.get("__dict__", None) if isinstance(__dict__, property): - type_kwargs['__dict__'] = __dict__ + type_kwargs["__dict__"] = __dict__ return (type(obj), obj.__name__, obj.__bases__, type_kwargs, _ensure_tracking(obj), None) @@ -141,7 +143,7 @@ def _function_getstate(func): def _class_getstate(obj): clsdict = _extract_class_dict(obj) - clsdict.pop('__weakref__', None) + clsdict.pop("__weakref__", None) # For ABCMeta in python3.7+, remove _abc_impl as it is not picklable. # This is a fix which breaks the cache but this only makes the first @@ -160,7 +162,7 @@ def _class_getstate(obj): for k in obj.__slots__: clsdict.pop(k, None) - clsdict.pop('__dict__', None) # unpicklable property object + clsdict.pop("__dict__", None) # unpicklable property object return (clsdict, {}) @@ -428,10 +430,10 @@ def _numpy_ndarray_reduce(array): # the PickleBuffer instance will hold a view on the transpose # of the initial array, that is C-contiguous. if not array.flags.c_contiguous and array.flags.f_contiguous: - order = 'F' + order = "F" picklebuf_args = array.transpose() else: - order = 'C' + order = "C" picklebuf_args = array try: buffer = picklebuf_class(picklebuf_args) @@ -485,7 +487,8 @@ class CloudPickler(Pickler): def __init__(self, file, protocol=None, buffer_callback=None): if protocol is None: protocol = DEFAULT_PROTOCOL - Pickler.__init__(self, file, protocol=protocol, buffer_callback=buffer_callback) + Pickler.__init__(self, file, protocol=protocol, + buffer_callback=buffer_callback) # map functions __globals__ attribute ids, to ensure that functions # sharing the same global namespace at pickling time also share their # global namespace at unpickling time. @@ -531,8 +534,9 @@ class CloudPickler(Pickler): # This is a patch for python3.5 if isinstance(obj, numpy.ndarray): if (self.proto < 5 or - (not obj.flags.c_contiguous and not obj.flags.f_contiguous) or - obj.dtype == 'O' or obj.itemsize == 0): + (not obj.flags.c_contiguous and + not obj.flags.f_contiguous) or + obj.dtype == "O" or obj.itemsize == 0): return NotImplemented return _numpy_ndarray_reduce(obj) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 540624483..e1df8e4c5 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -106,6 +106,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[CObjectID] &object_ids, const c_vector[size_t] &data_sizes, const c_vector[shared_ptr[CBuffer]] &metadatas, + const c_vector[c_vector[CObjectID]] &contained_object_ids, c_vector[shared_ptr[CRayObject]] *return_objects) CJobID GetCurrentJobId() @@ -129,16 +130,22 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CAddress &owner_address) CRayStatus SetClientOptions(c_string client_name, int64_t limit) - CRayStatus Put(const CRayObject &object, CObjectID *object_id) - CRayStatus Put(const CRayObject &object, const CObjectID &object_id) + CRayStatus Put(const CRayObject &object, + const c_vector[CObjectID] &contained_object_ids, + CObjectID *object_id) + CRayStatus Put(const CRayObject &object, + const c_vector[CObjectID] &contained_object_ids, + const CObjectID &object_id) CRayStatus Create(const shared_ptr[CBuffer] &metadata, const size_t data_size, + const c_vector[CObjectID] &contained_object_ids, CObjectID *object_id, shared_ptr[CBuffer] *data) CRayStatus Create(const shared_ptr[CBuffer] &metadata, - const size_t data_size, const CObjectID &object_id, + const size_t data_size, + const c_vector[CObjectID] &contained_object_ids, + const CObjectID &object_id, shared_ptr[CBuffer] *data) - CRayStatus Seal(const CObjectID &object_id, c_bool owns_object, - c_bool pin_object) + CRayStatus Seal(const CObjectID &object_id, c_bool pin_object) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results) CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 37f4e9589..5a9e056d8 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -1,6 +1,7 @@ import hashlib import logging import time +import threading import pyarrow.plasma as plasma @@ -34,8 +35,9 @@ class DeserializationError(Exception): class SerializedObject: - def __init__(self, metadata): + def __init__(self, metadata, contained_object_ids=None): self._metadata = metadata + self._contained_object_ids = contained_object_ids or [] @property def total_bytes(self): @@ -45,11 +47,15 @@ class SerializedObject: def metadata(self): return self._metadata + @property + def contained_object_ids(self): + return self._contained_object_ids + class Pickle5SerializedObject(SerializedObject): - def __init__(self, inband, writer): - super(Pickle5SerializedObject, - self).__init__(ray_constants.PICKLE5_BUFFER_METADATA) + def __init__(self, inband, writer, contained_object_ids): + super(Pickle5SerializedObject, self).__init__( + ray_constants.PICKLE5_BUFFER_METADATA, contained_object_ids) self.inband = inband self.writer = writer # cached total bytes @@ -126,6 +132,7 @@ class SerializationContext: self.worker = worker assert worker.use_pickle self.use_pickle = worker.use_pickle + self._thread_local = threading.local() def actor_handle_serializer(obj): return obj._serialization_helper(True) @@ -147,6 +154,7 @@ class SerializationContext: return serialized_obj[0](*serialized_obj[1]) def object_id_serializer(obj): + self.add_contained_object_id(obj) owner_id = "" owner_address = "" if obj.is_direct_call_type(): @@ -192,6 +200,21 @@ class SerializationContext: # construct a reducer pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer + def get_and_clear_contained_object_ids(self): + if not hasattr(self._thread_local, "object_ids"): + self._thread_local.object_ids = set() + return set() + + object_ids = self._thread_local.object_ids + self._thread_local.object_ids = set() + return object_ids + + def add_contained_object_id(self, object_id): + if not hasattr(self._thread_local, "object_ids"): + self._thread_local.object_ids = set() + + self._thread_local.object_ids.add(object_id) + def _deserialize_object(self, data, metadata, object_id): if metadata: if metadata == ray_constants.PICKLE5_BUFFER_METADATA: @@ -291,7 +314,8 @@ class SerializationContext: writer = Pickle5Writer() inband = pickle.dumps( value, protocol=5, buffer_callback=writer.buffer_callback) - return Pickle5SerializedObject(inband, writer) + return Pickle5SerializedObject( + inband, writer, self.get_and_clear_contained_object_ids()) def register_custom_serializer(self, cls, diff --git a/python/ray/worker.py b/python/ray/worker.py index 0dc1b6294..94e14e8a8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -293,13 +293,9 @@ class Worker: should_warn_of_slow_puts = False return result - def deserialize_objects(self, - data_metadata_pairs, - object_ids, - error_timeout=10): + def deserialize_objects(self, data_metadata_pairs, object_ids): context = self.get_serialization_context() - return context.deserialize_objects(data_metadata_pairs, object_ids, - error_timeout) + return context.deserialize_objects(data_metadata_pairs, object_ids) def get_objects(self, object_ids, timeout=None): """Get the values in the object store associated with the IDs. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 46c5fb0ef..71b23358d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -366,45 +366,55 @@ Status CoreWorker::SetClientOptions(std::string name, int64_t limit_bytes) { return plasma_store_provider_->SetClientOptions(name, limit_bytes); } -Status CoreWorker::Put(const RayObject &object, ObjectID *object_id) { +Status CoreWorker::Put(const RayObject &object, + const std::vector &contained_object_ids, + ObjectID *object_id) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), static_cast(TaskTransportType::RAYLET)); reference_counter_->AddOwnedObject(*object_id, GetCallerId(), rpc_address_); - RAY_RETURN_NOT_OK(Put(object, *object_id)); + RAY_RETURN_NOT_OK(Put(object, contained_object_ids, *object_id)); // Tell the raylet to pin the object **after** it is created. RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {*object_id})); return Status::OK(); } -Status CoreWorker::Put(const RayObject &object, const ObjectID &object_id) { +Status CoreWorker::Put(const RayObject &object, + const std::vector &contained_object_ids, + const ObjectID &object_id) { RAY_CHECK(object_id.GetTransportType() == static_cast(TaskTransportType::RAYLET)) << "Invalid transport type flag in object ID: " << object_id.GetTransportType(); + // TODO(edoakes,swang): add contained object IDs to the reference counter. return plasma_store_provider_->Put(object, object_id); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, ObjectID *object_id, std::shared_ptr *data) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), static_cast(TaskTransportType::RAYLET)); - return Create(metadata, data_size, *object_id, data); + RAY_RETURN_NOT_OK(Create(metadata, data_size, contained_object_ids, *object_id, data)); + // Only add the object to the reference counter if it didn't already exist. + if (data) { + reference_counter_->AddOwnedObject(*object_id, GetCallerId(), rpc_address_); + } + return Status::OK(); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, const ObjectID &object_id, std::shared_ptr *data) { + // TODO(edoakes,swang): add contained object IDs to the reference counter. return plasma_store_provider_->Create(metadata, data_size, object_id, data); } -Status CoreWorker::Seal(const ObjectID &object_id, bool owns_object, bool pin_object) { +Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object) { RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id)); - if (owns_object) { - reference_counter_->AddOwnedObject(object_id, GetCallerId(), rpc_address_); - if (pin_object) { - // Tell the raylet to pin the object **after** it is created. - RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id})); - } + if (pin_object) { + // Tell the raylet to pin the object **after** it is created. + RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id})); } return Status::OK(); } @@ -868,6 +878,7 @@ void CoreWorker::StartExecutingTasks() { task_execution_service_.run(); } Status CoreWorker::AllocateReturnObjects( const std::vector &object_ids, const std::vector &data_sizes, const std::vector> &metadatas, + const std::vector> &contained_object_ids, std::vector> *return_objects) { RAY_CHECK(object_ids.size() == metadatas.size()); RAY_CHECK(object_ids.size() == data_sizes.size()); @@ -879,11 +890,12 @@ Status CoreWorker::AllocateReturnObjects( if (data_sizes[i] > 0) { if (worker_context_.CurrentTaskIsDirectCall() && static_cast(data_sizes[i]) < - RayConfig::instance().max_direct_call_object_size()) { + RayConfig::instance().max_direct_call_object_size() && + contained_object_ids[i].empty()) { data_buffer = std::make_shared(data_sizes[i]); } else { - RAY_RETURN_NOT_OK( - Create(metadatas[i], data_sizes[i], object_ids[i], &data_buffer)); + RAY_RETURN_NOT_OK(Create(metadatas[i], data_sizes[i], contained_object_ids[i], + object_ids[i], &data_buffer)); object_already_exists = !data_buffer; } } @@ -953,12 +965,12 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, continue; } if (return_objects->at(i)->GetData()->IsPlasmaBuffer()) { - if (!Seal(return_ids[i], /*owns_object=*/false, /*pin_object=*/false).ok()) { + if (!Seal(return_ids[i], /*pin_object=*/false).ok()) { RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } } else if (!worker_context_.CurrentTaskIsDirectCall()) { - if (!Put(*return_objects->at(i), return_ids[i]).ok()) { + if (!Put(*return_objects->at(i), {}, return_ids[i]).ok()) { RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << return_ids[i] << " in store: " << status.message(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 04c4fd746..5428bf08c 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -177,16 +177,20 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Put an object into object store. /// /// \param[in] object The ray object. + /// \param[in] contained_object_ids The IDs serialized in this object. /// \param[out] object_id Generated ID of the object. /// \return Status. - Status Put(const RayObject &object, ObjectID *object_id); + Status Put(const RayObject &object, const std::vector &contained_object_ids, + ObjectID *object_id); /// Put an object with specified ID into object store. /// /// \param[in] object The ray object. + /// \param[in] contained_object_ids The IDs serialized in this object. /// \param[in] object_id Object ID specified by the user. /// \return Status. - Status Put(const RayObject &object, const ObjectID &object_id); + Status Put(const RayObject &object, const std::vector &contained_object_ids, + const ObjectID &object_id); /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `Seal()` to finalize @@ -195,11 +199,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. + /// \param[in] contained_object_ids The IDs serialized in this object. /// \param[out] object_id Object ID generated for the put. /// \param[out] data Buffer for the user to write the object into. /// \return Status. Status Create(const std::shared_ptr &metadata, const size_t data_size, - ObjectID *object_id, std::shared_ptr *data); + const std::vector &contained_object_ids, ObjectID *object_id, + std::shared_ptr *data); /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `Seal()` to finalize @@ -208,24 +214,21 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. + /// \param[in] contained_object_ids The IDs serialized in this object. /// \param[in] object_id Object ID specified by the user. /// \param[out] data Buffer for the user to write the object into. /// \return Status. Status Create(const std::shared_ptr &metadata, const size_t data_size, + const std::vector &contained_object_ids, const ObjectID &object_id, std::shared_ptr *data); /// Finalize placing an object into the object store. This should be called after /// a corresponding `Create()` call and then writing into the returned buffer. /// /// \param[in] object_id Object ID corresponding to the object. - /// \param[in] owns_object Whether or not this worker owns the object. If true, - /// the object will be added as owned to the reference counter as an - /// owned object and this worker will be responsible for managing its - /// lifetime. - /// \param[in] pin_object Whether or not to pin the object at the local raylet. This - /// only applies when owns_object is true. + /// \param[in] pin_object Whether or not to pin the object at the local raylet. /// \return Status. - Status Seal(const ObjectID &object_id, bool owns_object, bool pin_object); + Status Seal(const ObjectID &object_id, bool pin_object); /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. @@ -409,12 +412,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] object_ids Object IDs of the return values. /// \param[in] data_sizes Sizes of the return values. /// \param[in] metadatas Metadata buffers of the return values. + /// \param[in] contained_object_ids IDs serialized within each return object. /// \param[out] return_objects RayObjects containing buffers to write results into. /// \return Status. - Status AllocateReturnObjects(const std::vector &object_ids, - const std::vector &data_sizes, - const std::vector> &metadatas, - std::vector> *return_objects); + Status AllocateReturnObjects( + const std::vector &object_ids, const std::vector &data_sizes, + const std::vector> &metadatas, + const std::vector> &contained_object_ids, + std::vector> *return_objects); /// Get a handle to an actor. /// diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc index 9bcde92ab..4182b90f0 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc @@ -16,7 +16,7 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__JLorg_ray_runtime_objec RAY_CHECK(ray_object != nullptr); ray::ObjectID object_id; auto status = reinterpret_cast(nativeCoreWorkerPointer) - ->Put(*ray_object, &object_id); + ->Put(*ray_object, {}, &object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return IdToJavaByteArray(env, object_id); } @@ -29,7 +29,7 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__J_3BLorg_ray_runtime_ob auto ray_object = JavaNativeRayObjectToNativeRayObject(env, obj); RAY_CHECK(ray_object != nullptr); auto status = reinterpret_cast(nativeCoreWorkerPointer) - ->Put(*ray_object, object_id); + ->Put(*ray_object, {}, object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 079e441c2..165264cc0 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -279,7 +279,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res auto buffer2 = GenerateRandomBuffer(); ObjectID object_id; - RAY_CHECK_OK(driver.Put(RayObject(buffer2, nullptr), &object_id)); + RAY_CHECK_OK(driver.Put(RayObject(buffer2, nullptr), {}, &object_id)); std::vector args; args.emplace_back( @@ -367,7 +367,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso auto buffer2 = std::make_shared(array2, sizeof(array2)); ObjectID object_id; - RAY_CHECK_OK(driver.Put(RayObject(buffer1, nullptr), &object_id)); + RAY_CHECK_OK(driver.Put(RayObject(buffer1, nullptr), {}, &object_id)); // Create arguments with PassByRef and PassByValue. std::vector args; @@ -836,7 +836,7 @@ TEST_F(SingleNodeTest, TestObjectInterface) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(core_worker.Put(buffers[i], &ids[i])); + RAY_CHECK_OK(core_worker.Put(buffers[i], {}, &ids[i])); } // Test Get(). @@ -859,7 +859,8 @@ TEST_F(SingleNodeTest, TestObjectInterface) { nullptr, std::make_shared( reinterpret_cast(error_buffer), len)); - RAY_CHECK_OK(core_worker.Put(buffers_with_exception.back(), ids_with_exception.back())); + RAY_CHECK_OK( + core_worker.Put(buffers_with_exception.back(), {}, ids_with_exception.back())); RAY_CHECK_OK(core_worker.Get(ids_with_exception, -1, &results)); // Test Wait(). @@ -909,7 +910,7 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(worker1.Put(RayObject(buffers[i], nullptr), &ids[i])); + RAY_CHECK_OK(worker1.Put(RayObject(buffers[i], nullptr), {}, &ids[i])); } // Test Get() from remote node.