From c1418b04df54795e5fe38973278e1cc6f26b33a0 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 28 Oct 2019 12:48:41 -0500 Subject: [PATCH] Remove CoreWorkerObjectInterface (#6023) --- python/ray/_raylet.pyx | 25 +- python/ray/includes/libcoreworker.pxd | 41 ++- src/ray/core_worker/common.h | 2 - src/ray/core_worker/core_worker.cc | 225 ++++++++++++- src/ray/core_worker/core_worker.h | 133 +++++++- src/ray/core_worker/lib/java/jni_utils.h | 1 - ...rg_ray_runtime_object_NativeObjectStore.cc | 15 +- src/ray/core_worker/object_interface.cc | 291 ---------------- src/ray/core_worker/object_interface.h | 186 ----------- .../memory_store/memory_store.cc | 1 - .../memory_store/memory_store.h | 1 - .../store_provider/memory_store_provider.cc | 35 +- .../store_provider/memory_store_provider.h | 23 +- .../store_provider/plasma_store_provider.cc | 12 +- .../store_provider/plasma_store_provider.h | 25 +- .../store_provider/store_provider.h | 110 ------ src/ray/core_worker/test/core_worker_test.cc | 314 +++++++++--------- src/ray/core_worker/test/mock_worker.cc | 1 - .../transport/direct_actor_transport.cc | 8 +- .../transport/direct_actor_transport.h | 11 +- .../core_worker/transport/raylet_transport.cc | 23 +- .../core_worker/transport/raylet_transport.h | 6 +- 22 files changed, 568 insertions(+), 921 deletions(-) delete mode 100644 src/ray/core_worker/object_interface.cc delete mode 100644 src/ray/core_worker/object_interface.h delete mode 100644 src/ray/core_worker/store_provider/store_provider.h diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 6a27f55ef..e7f30ec51 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -510,7 +510,8 @@ cdef execute_task( if not execution_info: function_descriptor = FunctionDescriptor.from_bytes_list( ray_function.GetFunctionDescriptor()) - execution_info = manager.get_execution_info(job_id, function_descriptor) + execution_info = manager.get_execution_info( + job_id, function_descriptor) execution_infos[descriptor] = execution_info function_name = execution_info.function_name @@ -696,7 +697,7 @@ cdef class CoreWorker: c_vector[CObjectID] c_object_ids = ObjectIDsToVector(object_ids) with nogil: - check_status(self.core_worker.get().Objects().Get( + check_status(self.core_worker.get().Get( c_object_ids, timeout_ms, &results)) return RayObjectsToDataMetadataPairs(results) @@ -707,7 +708,7 @@ cdef class CoreWorker: CObjectID c_object_id = object_id.native() with nogil: - check_status(self.core_worker.get().Objects().Contains( + check_status(self.core_worker.get().Contains( c_object_id, &has_object)) return has_object @@ -721,12 +722,12 @@ cdef class CoreWorker: try: if object_id is None: with nogil: - check_status(self.core_worker.get().Objects().Create( + check_status(self.core_worker.get().Create( metadata, data_size, c_object_id, data)) else: c_object_id[0] = object_id.native() with nogil: - check_status(self.core_worker.get().Objects().Create( + check_status(self.core_worker.get().Create( metadata, data_size, c_object_id[0], data)) break except ObjectStoreFullError as e: @@ -763,7 +764,7 @@ cdef class CoreWorker: with nogil: check_status( - self.core_worker.get().Objects().Seal(c_object_id)) + self.core_worker.get().Seal(c_object_id)) return ObjectID(c_object_id.Binary()) @@ -788,7 +789,7 @@ cdef class CoreWorker: with nogil: check_status( - self.core_worker.get().Objects().Seal(c_object_id)) + self.core_worker.get().Seal(c_object_id)) return ObjectID(c_object_id.Binary()) @@ -811,7 +812,7 @@ cdef class CoreWorker: writer.write_to(inband, data, memcopy_threads) with nogil: check_status( - self.core_worker.get().Objects().Seal(c_object_id)) + self.core_worker.get().Seal(c_object_id)) return ObjectID(c_object_id.Binary()) @@ -825,7 +826,7 @@ cdef class CoreWorker: wait_ids = ObjectIDsToVector(object_ids) with nogil: - check_status(self.core_worker.get().Objects().Wait( + check_status(self.core_worker.get().Wait( wait_ids, num_returns, timeout_ms, &results)) assert len(results) == len(object_ids) @@ -845,7 +846,7 @@ cdef class CoreWorker: c_vector[CObjectID] free_ids = ObjectIDsToVector(object_ids) with nogil: - check_status(self.core_worker.get().Objects().Delete( + check_status(self.core_worker.get().Delete( free_ids, local_only, delete_creating_tasks)) def set_object_store_client_options(self, client_name, @@ -853,7 +854,7 @@ cdef class CoreWorker: try: logger.debug("Setting plasma memory limit to {} for {}".format( limit_bytes, client_name)) - check_status(self.core_worker.get().Objects().SetClientOptions( + check_status(self.core_worker.get().SetClientOptions( client_name.encode("ascii"), limit_bytes)) except RayError as e: self.dump_object_store_memory_usage() @@ -866,7 +867,7 @@ cdef class CoreWorker: limit_bytes, client_name, e)) def dump_object_store_memory_usage(self): - message = self.core_worker.get().Objects().MemoryUsageString() + message = self.core_worker.get().MemoryUsageString() logger.warning("Local object store memory usage:\n{}\n".format( message.decode("utf-8"))) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9b513ab36..106792282 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -48,27 +48,6 @@ cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfileEvent "ray::worker::ProfileEvent": void SetExtraData(const c_string &extra_data) -cdef extern from "ray/core_worker/object_interface.h" nogil: - cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface": - CRayStatus SetClientOptions(c_string client_name, int64_t limit) - CRayStatus Put(const CRayObject &object, CObjectID *object_id) - CRayStatus Put(const CRayObject &object, const CObjectID &object_id) - CRayStatus Create(const shared_ptr[CBuffer] &metadata, - const size_t data_size, CObjectID *object_id, - shared_ptr[CBuffer] *data) - CRayStatus Create(const shared_ptr[CBuffer] &metadata, - const size_t data_size, const CObjectID &object_id, - shared_ptr[CBuffer] *data) - CRayStatus Seal(const CObjectID &object_id) - CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, - c_vector[shared_ptr[CRayObject]] *results) - CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) - CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, - int64_t timeout_ms, c_vector[c_bool] *results) - CRayStatus Delete(const c_vector[CObjectID] &object_ids, - c_bool local_only, c_bool delete_creating_tasks) - c_string MemoryUsageString() - cdef extern from "ray/core_worker/core_worker.h" nogil: cdef cppclass CCoreWorker "ray::CoreWorker": CCoreWorker(const CWorkerType worker_type, const CLanguage language, @@ -88,7 +67,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void Disconnect() CWorkerType &GetWorkerType() CLanguage &GetLanguage() - CObjectInterface &Objects() void StartExecutingTasks() @@ -119,3 +97,22 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: *bytes) void AddActiveObjectID(const CObjectID &object_id) void RemoveActiveObjectID(const CObjectID &object_id) + + CRayStatus SetClientOptions(c_string client_name, int64_t limit) + CRayStatus Put(const CRayObject &object, CObjectID *object_id) + CRayStatus Put(const CRayObject &object, const CObjectID &object_id) + CRayStatus Create(const shared_ptr[CBuffer] &metadata, + const size_t data_size, CObjectID *object_id, + shared_ptr[CBuffer] *data) + CRayStatus Create(const shared_ptr[CBuffer] &metadata, + const size_t data_size, const CObjectID &object_id, + shared_ptr[CBuffer] *data) + CRayStatus Seal(const CObjectID &object_id) + CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, + c_vector[shared_ptr[CRayObject]] *results) + CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) + CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, + int64_t timeout_ms, c_vector[c_bool] *results) + CRayStatus Delete(const c_vector[CObjectID] &object_ids, + c_bool local_only, c_bool delete_creating_tasks) + c_string MemoryUsageString() diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index c49fbacd9..c063c2ab4 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -81,8 +81,6 @@ class TaskArg { const std::shared_ptr value_; }; -enum class StoreProviderType { PLASMA, MEMORY }; - enum class TaskTransportType { RAYLET, DIRECT_ACTOR }; /// Options for all tasks (actor and non-actor) except for actor creation. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 951f9048d..9a8413b82 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -37,6 +37,33 @@ void BuildCommonTaskSpec( } } +// Group object ids according the the corresponding store providers. +void GroupObjectIdsByStoreProvider(const std::vector &object_ids, + std::unordered_set *plasma_object_ids, + std::unordered_set *memory_object_ids) { + // There are two cases: + // - for task return objects from direct actor call, use memory store provider; + // - all the others use plasma store provider. + for (const auto &object_id : object_ids) { + // For raylet transport we always use plasma store provider, for direct actor call + // there are a few cases: + // - objects manually added to store by `ray.put`: for these objects they always use + // plasma store provider; + // - task arguments: these objects are passed by value, and are not put into store; + // - task return objects: these are put into memory store of the task submitter + // and are only used locally. + // Thus we need to check whether this object is a task return object in additional + // to whether it's from direct actor call before we can choose memory store provider. + if (object_id.IsReturnObject() && + object_id.GetTransportType() == + static_cast(ray::TaskTransportType::DIRECT_ACTOR)) { + memory_object_ids->insert(object_id); + } else { + plasma_object_ids->insert(object_id); + } + } +} + } // namespace namespace ray { @@ -50,12 +77,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, : worker_type_(worker_type), language_(language), log_dir_(log_dir), + check_signals_(check_signals), worker_context_(worker_type, job_id), io_work_(io_service_), heartbeat_timer_(io_service_), worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), gcs_client_(gcs_options), - object_interface_(worker_context_, raylet_client_, store_socket, check_signals), + memory_store_(std::make_shared()), task_execution_service_work_(task_execution_service_), task_execution_callback_(task_execution_callback) { // Initialize logging if log_dir is passed. Otherwise, it must be initialized @@ -84,12 +112,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, std::placeholders::_2, std::placeholders::_3); raylet_task_receiver_ = std::unique_ptr(new CoreWorkerRayletTaskReceiver( - worker_context_, raylet_client_, object_interface_, task_execution_service_, - worker_server_, execute_task)); + worker_context_, raylet_client_, task_execution_service_, worker_server_, + execute_task)); direct_actor_task_receiver_ = std::unique_ptr( - new CoreWorkerDirectActorTaskReceiver(worker_context_, object_interface_, - task_execution_service_, worker_server_, - execute_task)); + new CoreWorkerDirectActorTaskReceiver(worker_context_, task_execution_service_, + worker_server_, execute_task)); } // Start RPC server after all the task receivers are properly initialized. @@ -115,6 +142,10 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, io_thread_ = std::thread(&CoreWorker::RunIOService, this); + plasma_store_provider_.reset( + new CoreWorkerPlasmaStoreProvider(store_socket, raylet_client_, check_signals_)); + memory_store_provider_.reset(new CoreWorkerMemoryStoreProvider(memory_store_)); + // Create an entry for the driver task in the task table. This task is // added immediately with status RUNNING. This allows us to push errors // related to this driver task back to the driver. For example, if the @@ -137,9 +168,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, SetCurrentTaskId(task_id); } + // TODO(edoakes): why don't we just share the memory store provider? direct_actor_submitter_ = std::unique_ptr( new CoreWorkerDirectActorTaskSubmitter( - io_service_, object_interface_.CreateStoreProvider(StoreProviderType::MEMORY))); + io_service_, std::unique_ptr( + new CoreWorkerMemoryStoreProvider(memory_store_)))); } CoreWorker::~CoreWorker() { @@ -224,6 +257,163 @@ void CoreWorker::ReportActiveObjectIDs() { active_object_ids_updated_ = false; } +Status CoreWorker::SetClientOptions(std::string name, int64_t limit_bytes) { + // Currently only the Plasma store supports client options. + return plasma_store_provider_->SetClientOptions(name, limit_bytes); +} + +Status CoreWorker::Put(const RayObject &object, ObjectID *object_id) { + *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), + worker_context_.GetNextPutIndex(), + static_cast(TaskTransportType::RAYLET)); + return Put(object, *object_id); +} + +Status CoreWorker::Put(const RayObject &object, const ObjectID &object_id) { + RAY_CHECK(object_id.GetTransportType() == + static_cast(TaskTransportType::RAYLET)) + << "Invalid transport type flag in object ID: " << object_id.GetTransportType(); + return plasma_store_provider_->Put(object, object_id); +} + +Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, + ObjectID *object_id, std::shared_ptr *data) { + *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), + worker_context_.GetNextPutIndex(), + static_cast(TaskTransportType::RAYLET)); + return Create(metadata, data_size, *object_id, data); +} + +Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data) { + return plasma_store_provider_->Create(metadata, data_size, object_id, data); +} + +Status CoreWorker::Seal(const ObjectID &object_id) { + return plasma_store_provider_->Seal(object_id); +} + +Status CoreWorker::Get(const std::vector &ids, int64_t timeout_ms, + std::vector> *results) { + results->resize(ids.size(), nullptr); + + std::unordered_set plasma_object_ids; + std::unordered_set memory_object_ids; + GroupObjectIdsByStoreProvider(ids, &plasma_object_ids, &memory_object_ids); + + bool got_exception = false; + std::unordered_map> result_map; + auto start_time = current_time_ms(); + RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids, timeout_ms, + worker_context_.GetCurrentTaskID(), + &result_map, &got_exception)); + + if (!got_exception) { + if (timeout_ms >= 0) { + timeout_ms = std::max(static_cast(0), + timeout_ms - (current_time_ms() - start_time)); + } + RAY_RETURN_NOT_OK(memory_store_provider_->Get(memory_object_ids, timeout_ms, + worker_context_.GetCurrentTaskID(), + &result_map, &got_exception)); + } + + // Loop through `ids` and fill each entry for the `results` vector, + // this ensures that entries `results` have exactly the same order as + // they are in `ids`. When there are duplicate object ids, all the entries + // for the same id are filled in. + for (size_t i = 0; i < ids.size(); i++) { + if (result_map.find(ids[i]) != result_map.end()) { + (*results)[i] = result_map[ids[i]]; + } + } + + return Status::OK(); +} + +Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { + // Currently only the Plasma store supports Contains(). + return plasma_store_provider_->Contains(object_id, has_object); +} + +Status CoreWorker::Wait(const std::vector &ids, int num_objects, + int64_t timeout_ms, std::vector *results) { + results->resize(ids.size(), false); + + if (num_objects <= 0 || num_objects > static_cast(ids.size())) { + return Status::Invalid( + "Number of objects to wait for must be between 1 and the number of ids."); + } + + std::unordered_set plasma_object_ids; + std::unordered_set memory_object_ids; + GroupObjectIdsByStoreProvider(ids, &plasma_object_ids, &memory_object_ids); + + if (plasma_object_ids.size() + memory_object_ids.size() != ids.size()) { + return Status::Invalid("Duplicate object IDs not supported in wait."); + } + + // TODO(edoakes): this logic is not ideal, and will have to be addressed + // before we enable direct actor calls in the Python code. If we are waiting + // on a list of objects mixed between multiple store providers, we could + // easily end up in the situation where we're blocked waiting on one store + // provider while another actually has enough objects ready to fulfill + // 'num_objects'. This is partially addressed by trying them all once with + // a timeout of 0, but that does not address the situation where objects + // become available on the second store provider while waiting on the first. + + std::unordered_set ready; + // Wait from both store providers with timeout set to 0. This is to avoid the case + // where we might use up the entire timeout on trying to get objects from one store + // provider before even trying another (which might have all of the objects available). + RAY_RETURN_NOT_OK( + plasma_store_provider_->Wait(plasma_object_ids, num_objects, /*timeout_ms=*/0, + worker_context_.GetCurrentTaskID(), &ready)); + RAY_RETURN_NOT_OK(memory_store_provider_->Wait( + memory_object_ids, std::max(0, static_cast(ready.size()) - num_objects), + /*timeout_ms=*/0, worker_context_.GetCurrentTaskID(), &ready)); + + if (static_cast(ready.size()) < num_objects && timeout_ms != 0) { + int64_t start_time = current_time_ms(); + RAY_RETURN_NOT_OK( + plasma_store_provider_->Wait(plasma_object_ids, num_objects, timeout_ms, + worker_context_.GetCurrentTaskID(), &ready)); + if (timeout_ms > 0) { + timeout_ms = + std::max(0, static_cast(timeout_ms - (current_time_ms() - start_time))); + } + RAY_RETURN_NOT_OK( + memory_store_provider_->Wait(memory_object_ids, num_objects, timeout_ms, + worker_context_.GetCurrentTaskID(), &ready)); + } + + for (size_t i = 0; i < ids.size(); i++) { + if (ready.find(ids[i]) != ready.end()) { + results->at(i) = true; + } + } + + return Status::OK(); +} + +Status CoreWorker::Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks) { + std::unordered_set plasma_object_ids; + std::unordered_set memory_object_ids; + GroupObjectIdsByStoreProvider(object_ids, &plasma_object_ids, &memory_object_ids); + + RAY_RETURN_NOT_OK(plasma_store_provider_->Delete(plasma_object_ids, local_only, + delete_creating_tasks)); + RAY_RETURN_NOT_OK(memory_store_provider_->Delete(memory_object_ids)); + + return Status::OK(); +} + +std::string CoreWorker::MemoryUsageString() { + // Currently only the Plasma store returns a debug string. + return plasma_store_provider_->MemoryUsageString(); +} + TaskID CoreWorker::GetCallerId() const { TaskID caller_id; ActorID actor_id = GetActorId(); @@ -438,6 +628,25 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, SetCurrentTaskId(TaskID::Nil()); worker_context_.ResetCurrentTask(task_spec); + // TODO(edoakes): also check if not direct actor call. + // TODO(edoakes): this is only used by java. + if (results->size() != 0) { + for (size_t i = 0; i < results->size(); i++) { + ObjectID id = ObjectID::ForTaskReturn( + task_spec.TaskId(), /*index=*/i + 1, + /*transport_type=*/static_cast(TaskTransportType::RAYLET)); + if (!Put(*results->at(i), id).ok()) { + // NOTE(hchen): `PlasmaObjectExists` error is already ignored inside + // Put`, we treat other error types as fatal here. + RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id + << " in store: " << status.message(); + } else { + RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " put object " << id + << " in store."; + } + } + } + // TODO(zhijunfu): // 1. Check and handle failure. // 2. Save or load checkpoint. @@ -481,7 +690,7 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, } std::vector> results; - auto status = object_interface_.Get(object_ids_to_fetch, -1, &results); + auto status = Get(object_ids_to_fetch, -1, &results); if (status.ok()) { for (size_t i = 0; i < results.size(); i++) { args->at(indices[i]) = results[i]; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index f3ba51011..700f7cdca 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -5,8 +5,9 @@ #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" -#include "ray/core_worker/object_interface.h" #include "ray/core_worker/profiling.h" +#include "ray/core_worker/store_provider/memory_store_provider.h" +#include "ray/core_worker/store_provider/plasma_store_provider.h" #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" #include "ray/gcs/redis_gcs_client.h" @@ -67,8 +68,6 @@ class CoreWorker { RayletClient &GetRayletClient() { return *raylet_client_; } - CoreWorkerObjectInterface &Objects() { return object_interface_; } - const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); } void SetCurrentTaskId(const TaskID &task_id); @@ -88,6 +87,109 @@ class CoreWorker { // in the heartbeat messsage. void RemoveActiveObjectID(const ObjectID &object_id); + /* Public methods related to storing and retrieving objects. */ + + /// Set options for this client's interactions with the object store. + /// + /// \param[in] name Unique name for this object store client. + /// \param[in] limit The maximum amount of memory in bytes that this client + /// can use in the object store. + Status SetClientOptions(std::string name, int64_t limit_bytes); + + /// Put an object into object store. + /// + /// \param[in] object The ray object. + /// \param[out] object_id Generated ID of the object. + /// \return Status. + Status Put(const RayObject &object, ObjectID *object_id); + + /// Put an object with specified ID into object store. + /// + /// \param[in] object The ray object. + /// \param[in] object_id Object ID specified by the user. + /// \return Status. + Status Put(const RayObject &object, const ObjectID &object_id); + + /// Create and return a buffer in the object store that can be directly written + /// into. After writing to the buffer, the caller must call `Seal()` to finalize + /// the object. The `Create()` and `Seal()` combination is an alternative interface + /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// + /// \param[in] metadata Metadata of the object to be written. + /// \param[in] data_size Size of the object to be written. + /// \param[out] object_id Object ID generated for the put. + /// \param[out] data Buffer for the user to write the object into. + /// \return Status. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + ObjectID *object_id, std::shared_ptr *data); + + /// Create and return a buffer in the object store that can be directly written + /// into. After writing to the buffer, the caller must call `Seal()` to finalize + /// the object. The `Create()` and `Seal()` combination is an alternative interface + /// to `Put()` that allows frontends to avoid an extra copy when possible. + /// + /// \param[in] metadata Metadata of the object to be written. + /// \param[in] data_size Size of the object to be written. + /// \param[in] object_id Object ID specified by the user. + /// \param[out] data Buffer for the user to write the object into. + /// \return Status. + Status Create(const std::shared_ptr &metadata, const size_t data_size, + const ObjectID &object_id, std::shared_ptr *data); + + /// Finalize placing an object into the object store. This should be called after + /// a corresponding `Create()` call and then writing into the returned buffer. + /// + /// \param[in] object_id Object ID corresponding to the object. + /// \return Status. + Status Seal(const ObjectID &object_id); + + /// Get a list of objects from the object store. Objects that failed to be retrieved + /// will be returned as nullptrs. + /// + /// \param[in] ids IDs of the objects to get. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[out] results Result list of objects data. + /// \return Status. + Status Get(const std::vector &ids, int64_t timeout_ms, + std::vector> *results); + + /// Return whether or not the object store contains the given object. + /// + /// \param[in] object_id ID of the objects to check for. + /// \param[out] has_object Whether or not the object is present. + /// \return Status. + Status Contains(const ObjectID &object_id, bool *has_object); + + /// Wait for a list of objects to appear in the object store. + /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this + /// case. + /// TODO(zhijunfu): it is probably more clear in semantics to just fail when there + /// are duplicates, and require it to be handled at application level. + /// + /// \param[in] IDs of the objects to wait for. + /// \param[in] num_objects Number of objects that should appear. + /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. + /// \param[out] results A bitset that indicates each object has appeared or not. + /// \return Status. + Status Wait(const std::vector &object_ids, int num_objects, + int64_t timeout_ms, std::vector *results); + + /// Delete a list of objects from the object store. + /// + /// \param[in] object_ids IDs of the objects to delete. + /// \param[in] local_only Whether only delete the objects in local node, or all nodes in + /// the cluster. + /// \param[in] delete_creating_tasks Whether also delete the tasks that + /// created these objects. + /// \return Status. + Status Delete(const std::vector &object_ids, bool local_only, + bool delete_creating_tasks); + + /// Get a string describing object store memory usage for debugging purposes. + /// + /// \return std::string The string describing memory usage. + std::string MemoryUsageString(); + /* Public methods related to task submission. */ /// Get the caller ID used to submit tasks from this worker to an actor. @@ -240,6 +342,11 @@ class CoreWorker { /// Directory where log files are written. const std::string log_dir_; + /// Application-language callback to check for signals that have been received + /// since calling into C++. This will be called periodically (at least every + /// 1s) during long-running operations. + std::function check_signals_; + /// Shared state of the worker. Includes process-level and thread-level state. /// TODO(edoakes): we should move process-level state into this class and make /// this a ThreadContext. @@ -279,8 +386,16 @@ class CoreWorker { /// last time it was sent to the raylet. bool active_object_ids_updated_ = false; - // Interface for storing and retrieving shared objects. - CoreWorkerObjectInterface object_interface_; + /* Fields related to storing and retrieving objects. */ + + /// In-memory store for return objects. This is used for `MEMORY` store provider. + std::shared_ptr memory_store_; + + /// Plasma store interface. + std::unique_ptr plasma_store_provider_; + + /// In-memory store interface. + std::unique_ptr memory_store_provider_; /* Fields related to task submission. */ @@ -301,14 +416,14 @@ class CoreWorker { /// The asio work to keep task_execution_service_ alive. boost::asio::io_service::work task_execution_service_work_; - // Profiler including a background thread that pushes profiling events to the GCS. + /// Profiler including a background thread that pushes profiling events to the GCS. std::shared_ptr profiler_; - // Profile event for when the worker is idle. Should be reset when the worker - // enters and exits an idle period. + /// Profile event for when the worker is idle. Should be reset when the worker + /// enters and exits an idle period. std::unique_ptr idle_profile_event_; - // Task execution callback. + /// Task execution callback. TaskExecutionCallback task_execution_callback_; /// A map from resource name to the resource IDs that are currently reserved diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 5edf9160d..365b885c0 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -6,7 +6,6 @@ #include "ray/common/id.h" #include "ray/common/ray_object.h" #include "ray/common/status.h" -#include "ray/core_worker/store_provider/store_provider.h" /// Boolean class extern jclass java_boolean_class; diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc index 24ad763bb..c6b256f95 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc @@ -4,7 +4,6 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/core_worker.h" #include "ray/core_worker/lib/java/jni_utils.h" -#include "ray/core_worker/object_interface.h" inline ray::CoreWorkerObjectInterface &GetObjectInterfaceFromPointer( jlong nativeCoreWorkerPointer) { @@ -26,8 +25,8 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__JLorg_ray_runtime_objec auto ray_object = JavaNativeRayObjectToNativeRayObject(env, obj); RAY_CHECK(ray_object != nullptr); ray::ObjectID object_id; - auto status = - GetObjectInterfaceFromPointer(nativeCoreWorkerPointer).Put(*ray_object, &object_id); + auto status = reinterpret_cast(nativeCoreWorkerPointer) + .Put(*ray_object, &object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return IdToJavaByteArray(env, object_id); } @@ -44,8 +43,8 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__J_3BLorg_ray_runtime_ob auto object_id = JavaByteArrayToId(env, objectId); auto ray_object = JavaNativeRayObjectToNativeRayObject(env, obj); RAY_CHECK(ray_object != nullptr); - auto status = - GetObjectInterfaceFromPointer(nativeCoreWorkerPointer).Put(*ray_object, object_id); + auto status = reinterpret_cast(nativeCoreWorkerPointer) + .Put(*ray_object, object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } @@ -62,7 +61,7 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeGe return JavaByteArrayToId(env, static_cast(id)); }); std::vector> results; - auto status = GetObjectInterfaceFromPointer(nativeCoreWorkerPointer) + auto status = reinterpret_cast(nativeCoreWorkerPointer) .Get(object_ids, (int64_t)timeoutMs, &results); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeVectorToJavaList>( @@ -83,7 +82,7 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeWa return JavaByteArrayToId(env, static_cast(id)); }); std::vector results; - auto status = GetObjectInterfaceFromPointer(nativeCoreWorkerPointer) + auto status = reinterpret_cast(nativeCoreWorkerPointer) .Wait(object_ids, (int)numObjects, (int64_t)timeoutMs, &results); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeVectorToJavaList(env, results, [](JNIEnv *env, const bool &item) { @@ -104,7 +103,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeDelet env, objectIds, &object_ids, [](JNIEnv *env, jobject id) { return JavaByteArrayToId(env, static_cast(id)); }); - auto status = GetObjectInterfaceFromPointer(nativeCoreWorkerPointer) + auto status = reinterpret_cast(nativeCoreWorkerPointer) .Delete(object_ids, (bool)localOnly, (bool)deleteCreatingTasks); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/object_interface.cc b/src/ray/core_worker/object_interface.cc deleted file mode 100644 index 43be90e11..000000000 --- a/src/ray/core_worker/object_interface.cc +++ /dev/null @@ -1,291 +0,0 @@ -#include - -#include "ray/common/ray_config.h" -#include "ray/core_worker/object_interface.h" -#include "ray/core_worker/store_provider/memory_store_provider.h" -#include "ray/core_worker/store_provider/plasma_store_provider.h" - -namespace ray { - -// Group object ids according the the corresponding store providers. -void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider( - const std::vector &object_ids, - EnumUnorderedMap> *results) { - // There are two cases: - // - for task return objects from direct actor call, use memory store provider; - // - all the others use plasma store provider. - for (const auto &object_id : object_ids) { - auto type = StoreProviderType::PLASMA; - // For raylet transport we always use plasma store provider, for direct actor call - // there are a few cases: - // - objects manually added to store by `ray.put`: for these objects they always use - // plasma store provider; - // - task arguments: these objects are passed by value, and are not put into store; - // - task return objects: these are put into memory store of the task submitter - // and are only used locally. - // Thus we need to check whether this object is a task return object in additional - // to whether it's from direct actor call before we can choose memory store provider. - if (object_id.IsReturnObject() && - object_id.GetTransportType() == - static_cast(TaskTransportType::DIRECT_ACTOR)) { - type = StoreProviderType::MEMORY; - } - - (*results)[type].insert(object_id); - } -} - -CoreWorkerObjectInterface::CoreWorkerObjectInterface( - WorkerContext &worker_context, std::unique_ptr &raylet_client, - const std::string &store_socket, std::function check_signals) - : worker_context_(worker_context), - raylet_client_(raylet_client), - store_socket_(store_socket), - memory_store_(std::make_shared()) { - check_signals_ = check_signals; - AddStoreProvider(StoreProviderType::PLASMA); - AddStoreProvider(StoreProviderType::MEMORY); -} - -Status CoreWorkerObjectInterface::SetClientOptions(std::string name, - int64_t limit_bytes) { - // Currently only the Plasma store supports client options. - return store_providers_[StoreProviderType::PLASMA]->SetClientOptions(name, limit_bytes); -} - -Status CoreWorkerObjectInterface::Put(const RayObject &object, ObjectID *object_id) { - *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), - worker_context_.GetNextPutIndex(), - static_cast(TaskTransportType::RAYLET)); - return Put(object, *object_id); -} - -Status CoreWorkerObjectInterface::Put(const RayObject &object, - const ObjectID &object_id) { - RAY_CHECK(object_id.GetTransportType() == - static_cast(TaskTransportType::RAYLET)) - << "Invalid transport type flag in object ID: " << object_id.GetTransportType(); - return store_providers_[StoreProviderType::PLASMA]->Put(object, object_id); -} - -Status CoreWorkerObjectInterface::Create(const std::shared_ptr &metadata, - const size_t data_size, ObjectID *object_id, - std::shared_ptr *data) { - *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), - worker_context_.GetNextPutIndex(), - static_cast(TaskTransportType::RAYLET)); - return Create(metadata, data_size, *object_id, data); -} - -Status CoreWorkerObjectInterface::Create(const std::shared_ptr &metadata, - const size_t data_size, - const ObjectID &object_id, - std::shared_ptr *data) { - return store_providers_[StoreProviderType::PLASMA]->Create(metadata, data_size, - object_id, data); -} - -Status CoreWorkerObjectInterface::Seal(const ObjectID &object_id) { - return store_providers_[StoreProviderType::PLASMA]->Seal(object_id); -} - -Status CoreWorkerObjectInterface::Get(const std::vector &ids, - int64_t timeout_ms, - std::vector> *results) { - (*results).resize(ids.size(), nullptr); - - // Divide the object ids by store provider type. For each store provider, - // maintain an unordered_set which does proper de-duplication, thus the - // store provider could simply assume its object ids don't have duplicates. - EnumUnorderedMap> - object_ids_per_store_provider; - GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider); - - std::unordered_map> result_map; - auto remaining_timeout_ms = timeout_ms; - bool got_exception = false; - - // Re-order the list so that we always get from plasma store provider first, - // since it uses a loop of `FetchOrReconstruct` and plasma `Get`, it's not - // desirable if other store providers use up the timeout and leaves no time - // for plasma provider to reconstruct the objects as necessary. - std::list>>> - ids_per_provider; - for (const auto &entry : object_ids_per_store_provider) { - auto list_entry = std::make_pair(entry.first, std::ref(entry.second)); - if (entry.first == StoreProviderType::PLASMA) { - ids_per_provider.emplace_front(list_entry); - } else { - ids_per_provider.emplace_back(list_entry); - } - } - - // Note that if one store provider uses up the timeout, we will still try the others - // with a timeout of 0. - for (const auto &entry : ids_per_provider) { - auto start_time = current_time_ms(); - RAY_RETURN_NOT_OK(store_providers_[entry.first]->Get( - entry.second, remaining_timeout_ms, worker_context_.GetCurrentTaskID(), - &result_map, &got_exception)); - if (got_exception) { - break; - } - if (remaining_timeout_ms > 0) { - int64_t duration = current_time_ms() - start_time; - remaining_timeout_ms = - std::max(static_cast(0), remaining_timeout_ms - duration); - } - } - - // Loop through `ids` and fill each entry for the `results` vector, - // this ensures that entries `results` have exactly the same order as - // they are in `ids`. When there are duplicate object ids, all the entries - // for the same id are filled in. - for (size_t i = 0; i < ids.size(); i++) { - if (result_map.find(ids[i]) != result_map.end()) { - (*results)[i] = result_map[ids[i]]; - } - } - - return Status::OK(); -} - -Status CoreWorkerObjectInterface::Contains(const ObjectID &object_id, bool *has_object) { - // Currently only the Plasma store supports Contains(). - return store_providers_[StoreProviderType::PLASMA]->Contains(object_id, has_object); -} - -Status CoreWorkerObjectInterface::Wait(const std::vector &ids, int num_objects, - int64_t timeout_ms, std::vector *results) { - (*results).resize(ids.size(), false); - - if (num_objects <= 0 || num_objects > static_cast(ids.size())) { - return Status::Invalid( - "Number of objects to wait for must be between 1 and the number of ids."); - } - - EnumUnorderedMap> - object_ids_per_store_provider; - GroupObjectIdsByStoreProvider(ids, &object_ids_per_store_provider); - - size_t total_count = 0; - for (const auto &entry : object_ids_per_store_provider) { - total_count += entry.second.size(); - } - if (total_count != ids.size()) { - return Status::Invalid("Duplicate object IDs not supported in wait."); - } - - // TODO(edoakes): this logic is not ideal, and will have to be addressed - // before we enable direct actor calls in the Python code. If we are waiting - // on a list of objects mixed between multiple store providers, we could - // easily end up in the situation where we're blocked waiting on one store - // provider while another actually has enough objects ready to fulfill - // 'num_objects'. This is partially addressed by trying them all once with - // a timeout of 0, but that does not address the situation where objects - // become available on the second store provider while waiting on the first. - - std::unordered_set ready; - // Wait from all the store providers with timeout set to 0. This is to avoid the case - // where we might use up the entire timeout on trying to get objects from one store - // provider before even trying another (which might have all of the objects available). - RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider, - /*timeout_ms=*/0, &num_objects, - &ready)); - - if (num_objects > 0) { - // Wait from all the store providers with the specified timeout - // if the required number of objects haven't been ready yet. - RAY_RETURN_NOT_OK(WaitFromMultipleStoreProviders(object_ids_per_store_provider, - /*timeout_ms=*/timeout_ms, - &num_objects, &ready)); - } - - for (size_t i = 0; i < ids.size(); i++) { - if (ready.find(ids[i]) != ready.end()) { - (*results)[i] = true; - } - } - - return Status::OK(); -} - -Status CoreWorkerObjectInterface::WaitFromMultipleStoreProviders( - EnumUnorderedMap> &ids_per_provider, - int64_t timeout_ms, int *num_objects, std::unordered_set *ready) { - int64_t remaining_timeout_ms = timeout_ms; - for (auto &provider_entry : ids_per_provider) { - if (*num_objects <= 0) { - break; - } - int64_t start_time = current_time_ms(); - int required_objects = - std::min(static_cast(provider_entry.second.size()), *num_objects); - std::unordered_set provider_ready; - RAY_RETURN_NOT_OK(store_providers_[provider_entry.first]->Wait( - provider_entry.second, required_objects, remaining_timeout_ms, - worker_context_.GetCurrentTaskID(), &provider_ready)); - - // Update num_objects and remove the ready objects from the list so they don't get - // double-counted. - *num_objects -= provider_ready.size(); - for (const ObjectID &ready_id : provider_ready) { - ready->insert(ready_id); - provider_entry.second.erase(ready_id); - } - - if (remaining_timeout_ms > 0) { - int64_t duration = current_time_ms() - start_time; - remaining_timeout_ms = - std::max(static_cast(0), remaining_timeout_ms - duration); - } - } - - return Status::OK(); -} - -Status CoreWorkerObjectInterface::Delete(const std::vector &object_ids, - bool local_only, bool delete_creating_tasks) { - EnumUnorderedMap> - object_ids_per_store_provider; - GroupObjectIdsByStoreProvider(object_ids, &object_ids_per_store_provider); - - for (const auto &entry : object_ids_per_store_provider) { - auto type = entry.first; - bool is_plasma = (type == StoreProviderType::PLASMA); - - std::vector ids(entry.second.begin(), entry.second.end()); - RAY_RETURN_NOT_OK(store_providers_[type]->Delete( - ids, is_plasma ? local_only : false, is_plasma ? delete_creating_tasks : false)); - } - - return Status::OK(); -} - -std::string CoreWorkerObjectInterface::MemoryUsageString() { - // Currently only the Plasma store returns a debug string. - return store_providers_[StoreProviderType::PLASMA]->MemoryUsageString(); -} - -void CoreWorkerObjectInterface::AddStoreProvider(StoreProviderType type) { - store_providers_.emplace(type, CreateStoreProvider(type)); -} - -std::unique_ptr CoreWorkerObjectInterface::CreateStoreProvider( - StoreProviderType type) const { - switch (type) { - case StoreProviderType::PLASMA: - return std::unique_ptr( - new CoreWorkerPlasmaStoreProvider(store_socket_, raylet_client_, check_signals_)); - case StoreProviderType::MEMORY: - return std::unique_ptr( - new CoreWorkerMemoryStoreProvider(memory_store_)); - break; - default: - RAY_LOG(FATAL) << "unknown store provider type " << static_cast(type); - return nullptr; - } -} - -} // namespace ray diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h deleted file mode 100644 index 48ae5de62..000000000 --- a/src/ray/core_worker/object_interface.h +++ /dev/null @@ -1,186 +0,0 @@ -#ifndef RAY_CORE_WORKER_OBJECT_INTERFACE_H -#define RAY_CORE_WORKER_OBJECT_INTERFACE_H - -#include "plasma/client.h" -#include "ray/common/buffer.h" -#include "ray/common/id.h" -#include "ray/common/status.h" -#include "ray/core_worker/common.h" -#include "ray/core_worker/context.h" -#include "ray/core_worker/store_provider/store_provider.h" - -namespace ray { - -class CoreWorker; -class CoreWorkerStoreProvider; -class CoreWorkerMemoryStore; - -/// The interface that contains all `CoreWorker` methods related to the object store. -class CoreWorkerObjectInterface { - public: - /// \param[in] worker_context WorkerContext of the parent CoreWorker. - /// \param[in] store_socket Path to the plasma store socket. - CoreWorkerObjectInterface(WorkerContext &worker_context, - std::unique_ptr &raylet_client, - const std::string &store_socket, - std::function check_signals = nullptr); - - /// Set options for this client's interactions with the object store. - /// - /// \param[in] name Unique name for this object store client. - /// \param[in] limit The maximum amount of memory in bytes that this client - /// can use in the object store. - Status SetClientOptions(std::string name, int64_t limit_bytes); - - /// Put an object into object store. - /// - /// \param[in] object The ray object. - /// \param[out] object_id Generated ID of the object. - /// \return Status. - Status Put(const RayObject &object, ObjectID *object_id); - - /// Put an object with specified ID into object store. - /// - /// \param[in] object The ray object. - /// \param[in] object_id Object ID specified by the user. - /// \return Status. - Status Put(const RayObject &object, const ObjectID &object_id); - - /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. - /// - /// \param[in] metadata Metadata of the object to be written. - /// \param[in] data_size Size of the object to be written. - /// \param[out] object_id Object ID generated for the put. - /// \param[out] data Buffer for the user to write the object into. - /// \return Status. - Status Create(const std::shared_ptr &metadata, const size_t data_size, - ObjectID *object_id, std::shared_ptr *data); - - /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. - /// - /// \param[in] metadata Metadata of the object to be written. - /// \param[in] data_size Size of the object to be written. - /// \param[in] object_id Object ID specified by the user. - /// \param[out] data Buffer for the user to write the object into. - /// \return Status. - Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data); - - /// Finalize placing an object into the object store. This should be called after - /// a corresponding `Create()` call and then writing into the returned buffer. - /// - /// \param[in] object_id Object ID corresponding to the object. - /// \return Status. - Status Seal(const ObjectID &object_id); - - /// Get a list of objects from the object store. Objects that failed to be retrieved - /// will be returned as nullptrs. - /// - /// \param[in] ids IDs of the objects to get. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. - /// \param[out] results Result list of objects data. - /// \return Status. - Status Get(const std::vector &ids, int64_t timeout_ms, - std::vector> *results); - - /// Return whether or not the object store contains the given object. - /// - /// \param[in] object_id ID of the objects to check for. - /// \param[out] has_object Whether or not the object is present. - /// \return Status. - Status Contains(const ObjectID &object_id, bool *has_object); - - /// Wait for a list of objects to appear in the object store. - /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this - /// case. - /// TODO(zhijunfu): it is probably more clear in semantics to just fail when there - /// are duplicates, and require it to be handled at application level. - /// - /// \param[in] IDs of the objects to wait for. - /// \param[in] num_objects Number of objects that should appear. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. - /// \param[out] results A bitset that indicates each object has appeared or not. - /// \return Status. - Status Wait(const std::vector &object_ids, int num_objects, - int64_t timeout_ms, std::vector *results); - - /// Delete a list of objects from the object store. - /// - /// \param[in] object_ids IDs of the objects to delete. - /// \param[in] local_only Whether only delete the objects in local node, or all nodes in - /// the cluster. - /// \param[in] delete_creating_tasks Whether also delete the tasks that - /// created these objects. - /// \return Status. - Status Delete(const std::vector &object_ids, bool local_only, - bool delete_creating_tasks); - - /// Get a string describing object store memory usage for debugging purposes. - /// - /// \return std::string The string describing memory usage. - std::string MemoryUsageString(); - - /// Create a new store provider for the specified type on demand. - std::unique_ptr CreateStoreProvider( - StoreProviderType type) const; - - private: - /// Helper function to group object IDs by the store provider that should be used - /// for them. - /// - /// \param[in] object_ids Object IDs to group. - /// \param[out] results Map of provider type to object IDs. - void GroupObjectIdsByStoreProvider( - const std::vector &object_ids, - EnumUnorderedMap> *results); - - /// Helper function to get a set of objects from different store providers. - /// - /// \param[in] ids_per_provider A map from store provider type to the set of - // object ids for that store provider. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's -1. - /// \param[in/out] num_objects Number of objects that should appear before returning. - /// Should be updated as objects are added to the ready set. - /// \param[in/out] results A set that holds objects that are ready. - /// \return Status. - Status WaitFromMultipleStoreProviders( - EnumUnorderedMap> &ids_per_provider, - int64_t timeout_ms, int *num_objects, std::unordered_set *results); - - /// Add a store provider for the specified type. - void AddStoreProvider(StoreProviderType type); - - /// Reference to the parent CoreWorker's context. - WorkerContext &worker_context_; - /// Reference to the parent CoreWorker's raylet client. - std::unique_ptr &raylet_client_; - - std::string store_socket_; - - /// In-memory store for return objects. This is used for `MEMORY` store provider. - std::shared_ptr memory_store_; - - /// All the store providers supported. - EnumUnorderedMap> - store_providers_; - - std::function check_signals_; - - friend class CoreWorkerTaskInterface; - - /// TODO(zhijunfu): This is necessary as direct call task submitter needs to create - /// a local plasma store provider, later we can refactor ObjectInterface to add a - /// `ObjectProviderLayer`, which will encapsulate the functionalities to get or create - /// a specific `StoreProvider`, and this can be removed then. - friend class CoreWorkerDirectActorTaskSubmitter; -}; - -} // namespace ray - -#endif // RAY_CORE_WORKER_OBJECT_INTERFACE_H diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 8ff178936..0b645be75 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -2,7 +2,6 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ray/core_worker/object_interface.h" #include "ray/core_worker/store_provider/memory_store_provider.h" namespace ray { diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 0a8422f81..3c4905db0 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -4,7 +4,6 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" -#include "ray/core_worker/store_provider/store_provider.h" namespace ray { diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index 6a8a8dbcf..17290a28a 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -3,7 +3,6 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ray/core_worker/object_interface.h" namespace ray { @@ -13,12 +12,6 @@ CoreWorkerMemoryStoreProvider::CoreWorkerMemoryStoreProvider( RAY_CHECK(store != nullptr); } -Status CoreWorkerMemoryStoreProvider::SetClientOptions(std::string name, - int64_t limit_bytes) { - return Status::NotImplemented( - "SetClientOptions() not implemented for in-memory store."); -} - Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object, const ObjectID &object_id) { Status status = store_->Put(object_id, object); @@ -29,19 +22,6 @@ Status CoreWorkerMemoryStoreProvider::Put(const RayObject &object, return status; } -Status CoreWorkerMemoryStoreProvider::Create(const std::shared_ptr &metadata, - const size_t data_size, - const ObjectID &object_id, - std::shared_ptr *data) { - return Status::NotImplemented( - "Create/Seal interface not implemented for in-memory store."); -} - -Status CoreWorkerMemoryStoreProvider::Seal(const ObjectID &object_id) { - return Status::NotImplemented( - "Create/Seal interface not implemented for in-memory store."); -} - Status CoreWorkerMemoryStoreProvider::Get( const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, @@ -63,11 +43,6 @@ Status CoreWorkerMemoryStoreProvider::Get( return Status::OK(); } -Status CoreWorkerMemoryStoreProvider::Contains(const ObjectID &object_id, - bool *has_object) { - return Status::NotImplemented("Contains() not implemented for in-memory store."); -} - Status CoreWorkerMemoryStoreProvider::Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, @@ -87,13 +62,11 @@ Status CoreWorkerMemoryStoreProvider::Wait(const std::unordered_set &o return Status::OK(); } -Status CoreWorkerMemoryStoreProvider::Delete(const std::vector &object_ids, - bool local_only, - bool delete_creating_tasks) { - store_->Delete(object_ids); +Status CoreWorkerMemoryStoreProvider::Delete( + const std::unordered_set &object_ids) { + std::vector object_id_vector(object_ids.begin(), object_ids.end()); + store_->Delete(object_id_vector); return Status::OK(); } -std::string CoreWorkerMemoryStoreProvider::MemoryUsageString() { return ""; } - } // namespace ray diff --git a/src/ray/core_worker/store_provider/memory_store_provider.h b/src/ray/core_worker/store_provider/memory_store_provider.h index 51d018064..68050472d 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.h +++ b/src/ray/core_worker/store_provider/memory_store_provider.h @@ -6,7 +6,6 @@ #include "ray/common/status.h" #include "ray/core_worker/common.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" -#include "ray/core_worker/store_provider/store_provider.h" namespace ray { @@ -16,36 +15,24 @@ class CoreWorker; /// An example usage for this is to retrieve the returned objects from direct /// actor call (see direct_actor_transport.cc). /// See `CoreWorkerStoreProvider` for the semantics of public methods. -class CoreWorkerMemoryStoreProvider : public CoreWorkerStoreProvider { +class CoreWorkerMemoryStoreProvider { public: CoreWorkerMemoryStoreProvider(std::shared_ptr store); - Status SetClientOptions(std::string name, int64_t limit_bytes) override; - - Status Put(const RayObject &object, const ObjectID &object_id) override; - - Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data) override; - - Status Seal(const ObjectID &object_id) override; + Status Put(const RayObject &object, const ObjectID &object_id); Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, std::unordered_map> *results, - bool *got_exception) override; - - Status Contains(const ObjectID &object_id, bool *has_object) override; + bool *got_exception); /// Note that `num_objects` must equal to number of items in `object_ids`. Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::unordered_set *ready) override; + std::unordered_set *ready); /// Note that `local_only` must be true, and `delete_creating_tasks` must be false here. - Status Delete(const std::vector &object_ids, bool local_only = true, - bool delete_creating_tasks = false) override; - - std::string MemoryUsageString() override; + Status Delete(const std::unordered_set &object_ids); private: /// Implementation. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 4e071578f..0a3d8c3bc 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -2,13 +2,12 @@ #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ray/core_worker/object_interface.h" #include "ray/protobuf/gcs.pb.h" namespace ray { CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider( - const std::string &store_socket, std::unique_ptr &raylet_client, + const std::string &store_socket, const std::unique_ptr &raylet_client, std::function check_signals) : raylet_client_(raylet_client) { check_signals_ = check_signals; @@ -240,10 +239,11 @@ Status CoreWorkerPlasmaStoreProvider::Wait(const std::unordered_set &o return Status::OK(); } -Status CoreWorkerPlasmaStoreProvider::Delete(const std::vector &object_ids, - bool local_only, - bool delete_creating_tasks) { - return raylet_client_->FreeObjects(object_ids, local_only, delete_creating_tasks); +Status CoreWorkerPlasmaStoreProvider::Delete( + const std::unordered_set &object_ids, bool local_only, + bool delete_creating_tasks) { + std::vector object_id_vector(object_ids.begin(), object_ids.end()); + return raylet_client_->FreeObjects(object_id_vector, local_only, delete_creating_tasks); } std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() { diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index b7fabc2f8..d6c2de84d 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -6,7 +6,6 @@ #include "ray/common/id.h" #include "ray/common/status.h" #include "ray/core_worker/common.h" -#include "ray/core_worker/store_provider/store_provider.h" #include "ray/raylet/raylet_client.h" namespace ray { @@ -17,38 +16,38 @@ class CoreWorker; /// local and remote stores. Local access goes is done via a /// CoreWorkerLocalPlasmaStoreProvider and remote access goes through the raylet. /// See `CoreWorkerStoreProvider` for the semantics of public methods. -class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { +class CoreWorkerPlasmaStoreProvider { public: CoreWorkerPlasmaStoreProvider(const std::string &store_socket, - std::unique_ptr &raylet_client, + const std::unique_ptr &raylet_client, std::function check_signals); ~CoreWorkerPlasmaStoreProvider(); Status SetClientOptions(std::string name, int64_t limit_bytes); - Status Put(const RayObject &object, const ObjectID &object_id) override; + Status Put(const RayObject &object, const ObjectID &object_id); Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data) override; + const ObjectID &object_id, std::shared_ptr *data); - Status Seal(const ObjectID &object_id) override; + Status Seal(const ObjectID &object_id); Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, const TaskID &task_id, std::unordered_map> *results, - bool *got_exception) override; + bool *got_exception); - Status Contains(const ObjectID &object_id, bool *has_object) override; + Status Contains(const ObjectID &object_id, bool *has_object); Status Wait(const std::unordered_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, - std::unordered_set *ready) override; + std::unordered_set *ready); - Status Delete(const std::vector &object_ids, bool local_only = true, - bool delete_creating_tasks = false) override; + Status Delete(const std::unordered_set &object_ids, bool local_only, + bool delete_creating_tasks); - std::string MemoryUsageString() override; + std::string MemoryUsageString(); private: /// Ask the raylet to fetch a set of objects and then attempt to get them @@ -81,7 +80,7 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { static void WarnIfAttemptedTooManyTimes(int num_attempts, const std::unordered_set &remaining); - std::unique_ptr &raylet_client_; + const std::unique_ptr &raylet_client_; plasma::PlasmaClient store_client_; std::mutex store_client_mutex_; std::function check_signals_; diff --git a/src/ray/core_worker/store_provider/store_provider.h b/src/ray/core_worker/store_provider/store_provider.h deleted file mode 100644 index ca389f232..000000000 --- a/src/ray/core_worker/store_provider/store_provider.h +++ /dev/null @@ -1,110 +0,0 @@ -#ifndef RAY_CORE_WORKER_STORE_PROVIDER_H -#define RAY_CORE_WORKER_STORE_PROVIDER_H - -#include "ray/common/buffer.h" -#include "ray/common/id.h" -#include "ray/common/status.h" -#include "ray/core_worker/common.h" - -namespace ray { - -/// Provider interface for store access. Store provider should inherit from this class and -/// provide implementions for the methods. The actual store provider may use a plasma -/// store or local memory store in worker process, or possibly other types of storage. - -class CoreWorkerStoreProvider { - public: - CoreWorkerStoreProvider() {} - - virtual ~CoreWorkerStoreProvider() {} - - /// Set options for this client's interactions with the object store. - /// - /// \param[in] name Unique name for this object store client. - /// \param[in] limit The maximum amount of memory in bytes that this client - /// can use in the object store. - virtual Status SetClientOptions(std::string name, int64_t limit_bytes) = 0; - - /// Put an object with specified ID into object store. - /// - /// \param[in] object The ray object. - /// \param[in] object_id Object ID specified by user. - /// \return Status. - virtual Status Put(const RayObject &object, const ObjectID &object_id) = 0; - - /// Create and return a buffer in the object store that can be directly written - /// into. After writing to the buffer, the caller must call `Seal()` to finalize - /// the object. The `Create()` and `Seal()` combination is an alternative interface - /// to `Put()` that allows frontends to avoid an extra copy when possible. - /// - /// \param[in] metadata Metadata of the object to be written. - /// \param[in] data_size Size of the object to be written. - /// \param[in] object_id Object ID specified by the user. - /// \param[out] data Buffer for the user to write the object into. - /// \return Status. - virtual Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data) = 0; - - /// Finalize placing an object into the object store. This should be called after - /// a corresponding `Create()` call and then writing into the returned buffer. - /// - /// \param[in] object_id Object ID corresponding to the object. - /// \return Status. - virtual Status Seal(const ObjectID &object_id) = 0; - - /// Get a set of objects from the object store. - /// - /// \param[in] object_ids IDs of the objects to get. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. - /// \param[in] task_id ID for the current task. - /// \param[out] results Map of objects to write results into. Get will only add to this - /// map, not clear or remove from it, so the caller can pass in a non-empty map. - /// \param[out] got_exception Set to true if any of the fetched results were an - /// exception. - /// \return Status. - virtual Status Get(const std::unordered_set &object_ids, int64_t timeout_ms, - const TaskID &task_id, - std::unordered_map> *results, - bool *got_exception) = 0; - - /// Return whether or not the object store contains the given object. - /// - /// \param[in] object_id ID of the objects to check for. - /// \param[out] has_object Whether or not the object is present. - /// \return Status. - virtual Status Contains(const ObjectID &object_id, bool *has_object) = 0; - - /// Wait for a list of objects to appear in the object store. Objects that appear will - /// be added to the ready set. - /// - /// \param[in] object_ids IDs of the objects to wait for. - /// \param[in] num_objects Number of objects that should appear before returning. - /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. - /// \param[in] task_id ID for the current task. - /// \param[out] ready IDs of objects that have appeared. Wait will only add to this - /// set, not clear or remove from it, so the caller can pass in a non-empty set. - /// \return Status. - virtual Status Wait(const std::unordered_set &object_ids, int num_objects, - int64_t timeout_ms, const TaskID &task_id, - std::unordered_set *ready) = 0; - - /// Delete a list of objects from the object store. - /// - /// \param[in] object_ids IDs of the objects to delete. - /// \param[in] local_only Whether only delete the objects in local node, or all nodes in - /// the cluster. - /// \param[in] delete_creating_tasks Whether also delete the tasks that - /// created these objects. - /// \return Status. - virtual Status Delete(const std::vector &object_ids, bool local_only = true, - bool delete_creating_tasks = false) = 0; - - /// Get a string describing object store memory usage for debugging purposes. - /// - /// \return std::string The string describing memory usage. - virtual std::string MemoryUsageString() = 0; -}; - -} // namespace ray - -#endif // RAY_CORE_WORKER_STORE_PROVIDER_H diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index ee6cc0e07..cd78b30cb 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -168,9 +168,6 @@ class CoreWorkerTest : public ::testing::Test { void TearDown() {} - // Test tore provider. - void TestStoreProvider(StoreProviderType type); - // Test normal tasks. void TestNormalTask(std::unordered_map &resources); @@ -223,7 +220,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res auto buffer2 = GenerateRandomBuffer(); ObjectID object_id; - RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer2, nullptr), &object_id)); + RAY_CHECK_OK(driver.Put(RayObject(buffer2, nullptr), &object_id)); std::vector args; args.emplace_back( @@ -239,7 +236,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res ASSERT_EQ(return_ids.size(), 1); std::vector> results; - RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size()); @@ -286,7 +283,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso is_direct_call ? TaskTransportType::DIRECT_ACTOR : TaskTransportType::RAYLET); std::vector> results; - RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size()); @@ -307,7 +304,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso auto buffer2 = std::make_shared(array2, sizeof(array2)); ObjectID object_id; - RAY_CHECK_OK(driver.Objects().Put(RayObject(buffer1, nullptr), &object_id)); + RAY_CHECK_OK(driver.Put(RayObject(buffer1, nullptr), &object_id)); // Create arguments with PassByRef and PassByValue. std::vector args; @@ -330,7 +327,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso ASSERT_EQ(return_ids.size(), 1); std::vector> results; - RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size() + buffer2->Size()); @@ -390,7 +387,7 @@ void CoreWorkerTest::TestActorReconstruction( // Verify if it's expected data. std::vector> results; - RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); ASSERT_EQ(results[0]->GetData()->Size(), buffer1->Size()); ASSERT_EQ(*results[0]->GetData(), *buffer1); } @@ -441,7 +438,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r std::vector return_ids; return_ids.push_back(entry.first); std::vector> results; - RAY_CHECK_OK(driver.Objects().Get(return_ids, -1, &results)); + RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); ASSERT_EQ(results.size(), 1); if (results[0]->HasMetadata()) { @@ -457,143 +454,6 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r } } -void CoreWorkerTest::TestStoreProvider(StoreProviderType type) { - std::unique_ptr provider_ptr; - std::shared_ptr memory_store; - - switch (type) { - case StoreProviderType::MEMORY: - memory_store = std::make_shared(); - provider_ptr = std::unique_ptr( - new CoreWorkerMemoryStoreProvider(memory_store)); - break; - default: - RAY_LOG(FATAL) << "unspported store provider type " << static_cast(type); - break; - } - - auto &provider = *provider_ptr; - - uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; - uint8_t array2[] = {10, 11, 12, 13, 14, 15}; - - std::vector buffers; - buffers.emplace_back(std::make_shared(array1, sizeof(array1)), - std::make_shared(array1, sizeof(array1) / 2)); - buffers.emplace_back(std::make_shared(array2, sizeof(array2)), - std::make_shared(array2, sizeof(array2) / 2)); - - std::vector ids(buffers.size()); - for (size_t i = 0; i < ids.size(); i++) { - ids[i] = ObjectID::FromRandom(); - RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); - } - - std::unordered_set wait_ids(ids.begin(), ids.end()); - std::unordered_set wait_results; - - ObjectID nonexistent_id = ObjectID::FromRandom(); - wait_ids.insert(nonexistent_id); - RAY_CHECK_OK( - provider.Wait(wait_ids, ids.size() + 1, 100, RandomTaskId(), &wait_results)); - ASSERT_EQ(wait_results.size(), ids.size()); - ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); - - // Test Wait() where the required `num_objects` is less than size of `wait_ids`. - wait_results.clear(); - RAY_CHECK_OK(provider.Wait(wait_ids, ids.size(), -1, RandomTaskId(), &wait_results)); - ASSERT_EQ(wait_results.size(), ids.size()); - ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); - - // Test Get(). - bool got_exception = false; - std::unordered_map> results; - std::unordered_set ids_set(ids.begin(), ids.end()); - RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results, &got_exception)); - - ASSERT_TRUE(!got_exception); - ASSERT_EQ(results.size(), ids.size()); - for (size_t i = 0; i < ids.size(); i++) { - const auto &expected = buffers[i]; - ASSERT_EQ(results[ids[i]]->GetData()->Size(), expected.GetData()->Size()); - ASSERT_EQ(memcmp(results[ids[i]]->GetData()->Data(), expected.GetData()->Data(), - expected.GetData()->Size()), - 0); - ASSERT_EQ(results[ids[i]]->GetMetadata()->Size(), expected.GetMetadata()->Size()); - ASSERT_EQ(memcmp(results[ids[i]]->GetMetadata()->Data(), - expected.GetMetadata()->Data(), expected.GetMetadata()->Size()), - 0); - } - - // Test Delete(). - // clear the reference held. - results.clear(); - - RAY_CHECK_OK(provider.Delete(ids, true, false)); - - usleep(200 * 1000); - RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results, &got_exception)); - ASSERT_TRUE(!got_exception); - ASSERT_EQ(results.size(), 0); - - // Test Wait() with objects which will become ready later. - std::vector ready_ids(buffers.size()); - std::vector unready_ids(buffers.size()); - for (size_t i = 0; i < unready_ids.size(); i++) { - ready_ids[i] = ObjectID::FromRandom(); - RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i])); - unready_ids[i] = ObjectID::FromRandom(); - } - - auto thread_func = [&unready_ids, &provider, &buffers]() { - sleep(1); - - for (size_t i = 0; i < unready_ids.size(); i++) { - RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i])); - } - }; - - std::thread async_thread(thread_func); - - wait_ids.clear(); - wait_ids.insert(ready_ids.begin(), ready_ids.end()); - wait_ids.insert(unready_ids.begin(), unready_ids.end()); - wait_results.clear(); - - // Check that only the ready ids are returned when timeout ends before thread runs. - RAY_CHECK_OK( - provider.Wait(wait_ids, ready_ids.size() + 1, 100, RandomTaskId(), &wait_results)); - ASSERT_EQ(ready_ids.size(), wait_results.size()); - for (const auto &ready_id : ready_ids) { - ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); - } - for (const auto &unready_id : unready_ids) { - ASSERT_TRUE(wait_results.find(unready_id) == wait_results.end()); - } - - wait_results.clear(); - // Check that enough objects are returned after the thread inserts at least one object. - RAY_CHECK_OK( - provider.Wait(wait_ids, ready_ids.size() + 1, 5000, RandomTaskId(), &wait_results)); - ASSERT_TRUE(wait_results.size() >= ready_ids.size() + 1); - for (const auto &ready_id : ready_ids) { - ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); - } - - wait_results.clear(); - // Check that all objects are returned after the thread completes. - async_thread.join(); - RAY_CHECK_OK( - provider.Wait(wait_ids, wait_ids.size(), -1, RandomTaskId(), &wait_results)); - ASSERT_EQ(wait_results.size(), ready_ids.size() + unready_ids.size()); - for (const auto &ready_id : ready_ids) { - ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); - } - for (const auto &unready_id : unready_ids) { - ASSERT_TRUE(wait_results.find(unready_id) != wait_results.end()); - } -} - class ZeroNodeTest : public CoreWorkerTest { public: ZeroNodeTest() : CoreWorkerTest(0) {} @@ -716,7 +576,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { for (const auto &object_id : object_ids) { std::vector> results; - RAY_CHECK_OK(driver.Objects().Get({object_id}, -1, &results)); + RAY_CHECK_OK(driver.Get({object_id}, -1, &results)); ASSERT_EQ(results.size(), 1); } RAY_LOG(INFO) << "finish executing " << num_tasks << " tasks" @@ -766,7 +626,132 @@ TEST_F(ZeroNodeTest, TestActorHandle) { } TEST_F(SingleNodeTest, TestMemoryStoreProvider) { - TestStoreProvider(StoreProviderType::MEMORY); + std::shared_ptr memory_store = + std::make_shared(); + std::unique_ptr provider_ptr = + std::unique_ptr( + new CoreWorkerMemoryStoreProvider(memory_store)); + + auto &provider = *provider_ptr; + + uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; + uint8_t array2[] = {10, 11, 12, 13, 14, 15}; + + std::vector buffers; + buffers.emplace_back(std::make_shared(array1, sizeof(array1)), + std::make_shared(array1, sizeof(array1) / 2)); + buffers.emplace_back(std::make_shared(array2, sizeof(array2)), + std::make_shared(array2, sizeof(array2) / 2)); + + std::vector ids(buffers.size()); + for (size_t i = 0; i < ids.size(); i++) { + ids[i] = ObjectID::FromRandom(); + RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); + } + + std::unordered_set wait_ids(ids.begin(), ids.end()); + std::unordered_set wait_results; + + ObjectID nonexistent_id = ObjectID::FromRandom(); + wait_ids.insert(nonexistent_id); + RAY_CHECK_OK( + provider.Wait(wait_ids, ids.size() + 1, 100, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ids.size()); + ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); + + // Test Wait() where the required `num_objects` is less than size of `wait_ids`. + wait_results.clear(); + RAY_CHECK_OK(provider.Wait(wait_ids, ids.size(), -1, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ids.size()); + ASSERT_TRUE(wait_results.count(nonexistent_id) == 0); + + // Test Get(). + bool got_exception = false; + std::unordered_map> results; + std::unordered_set ids_set(ids.begin(), ids.end()); + RAY_CHECK_OK(provider.Get(ids_set, -1, RandomTaskId(), &results, &got_exception)); + + ASSERT_TRUE(!got_exception); + ASSERT_EQ(results.size(), ids.size()); + for (size_t i = 0; i < ids.size(); i++) { + const auto &expected = buffers[i]; + ASSERT_EQ(results[ids[i]]->GetData()->Size(), expected.GetData()->Size()); + ASSERT_EQ(memcmp(results[ids[i]]->GetData()->Data(), expected.GetData()->Data(), + expected.GetData()->Size()), + 0); + ASSERT_EQ(results[ids[i]]->GetMetadata()->Size(), expected.GetMetadata()->Size()); + ASSERT_EQ(memcmp(results[ids[i]]->GetMetadata()->Data(), + expected.GetMetadata()->Data(), expected.GetMetadata()->Size()), + 0); + } + + // Test Delete(). + // clear the reference held. + results.clear(); + + RAY_CHECK_OK(provider.Delete(ids_set)); + + usleep(200 * 1000); + RAY_CHECK_OK(provider.Get(ids_set, 0, RandomTaskId(), &results, &got_exception)); + ASSERT_TRUE(!got_exception); + ASSERT_EQ(results.size(), 0); + + // Test Wait() with objects which will become ready later. + std::vector ready_ids(buffers.size()); + std::vector unready_ids(buffers.size()); + for (size_t i = 0; i < unready_ids.size(); i++) { + ready_ids[i] = ObjectID::FromRandom(); + RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i])); + unready_ids[i] = ObjectID::FromRandom(); + } + + auto thread_func = [&unready_ids, &provider, &buffers]() { + sleep(1); + + for (size_t i = 0; i < unready_ids.size(); i++) { + RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i])); + } + }; + + std::thread async_thread(thread_func); + + wait_ids.clear(); + wait_ids.insert(ready_ids.begin(), ready_ids.end()); + wait_ids.insert(unready_ids.begin(), unready_ids.end()); + wait_results.clear(); + + // Check that only the ready ids are returned when timeout ends before thread runs. + RAY_CHECK_OK( + provider.Wait(wait_ids, ready_ids.size() + 1, 100, RandomTaskId(), &wait_results)); + ASSERT_EQ(ready_ids.size(), wait_results.size()); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + for (const auto &unready_id : unready_ids) { + ASSERT_TRUE(wait_results.find(unready_id) == wait_results.end()); + } + + wait_results.clear(); + // Check that enough objects are returned after the thread inserts at least one object. + RAY_CHECK_OK( + provider.Wait(wait_ids, ready_ids.size() + 1, 5000, RandomTaskId(), &wait_results)); + ASSERT_TRUE(wait_results.size() >= ready_ids.size() + 1); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + + wait_results.clear(); + // Check that all objects are returned after the thread completes. + async_thread.join(); + RAY_CHECK_OK( + provider.Wait(wait_ids, wait_ids.size(), -1, RandomTaskId(), &wait_results)); + ASSERT_EQ(wait_results.size(), ready_ids.size() + unready_ids.size()); + for (const auto &ready_id : ready_ids) { + ASSERT_TRUE(wait_results.find(ready_id) != wait_results.end()); + } + for (const auto &unready_id : unready_ids) { + ASSERT_TRUE(wait_results.find(unready_id) != wait_results.end()); + } } TEST_F(SingleNodeTest, TestObjectInterface) { @@ -785,12 +770,12 @@ TEST_F(SingleNodeTest, TestObjectInterface) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(core_worker.Objects().Put(buffers[i], &ids[i])); + RAY_CHECK_OK(core_worker.Put(buffers[i], &ids[i])); } // Test Get(). std::vector> results; - RAY_CHECK_OK(core_worker.Objects().Get(ids, -1, &results)); + RAY_CHECK_OK(core_worker.Get(ids, -1, &results)); ASSERT_EQ(results.size(), ids.size()); for (size_t i = 0; i < ids.size(); i++) { ASSERT_EQ(*results[i]->GetData(), *buffers[i].GetData()); @@ -808,9 +793,8 @@ TEST_F(SingleNodeTest, TestObjectInterface) { nullptr, std::make_shared( reinterpret_cast(error_buffer), len)); - RAY_CHECK_OK(core_worker.Objects().Put(buffers_with_exception.back(), - ids_with_exception.back())); - RAY_CHECK_OK(core_worker.Objects().Get(ids_with_exception, -1, &results)); + RAY_CHECK_OK(core_worker.Put(buffers_with_exception.back(), ids_with_exception.back())); + RAY_CHECK_OK(core_worker.Get(ids_with_exception, -1, &results)); // Test Wait(). ObjectID non_existent_id = ObjectID::FromRandom(); @@ -818,24 +802,24 @@ TEST_F(SingleNodeTest, TestObjectInterface) { all_ids.push_back(non_existent_id); std::vector wait_results; - RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 2, -1, &wait_results)); + RAY_CHECK_OK(core_worker.Wait(all_ids, 2, -1, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); - RAY_CHECK_OK(core_worker.Objects().Wait(all_ids, 3, 100, &wait_results)); + RAY_CHECK_OK(core_worker.Wait(all_ids, 3, 100, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); // Test Delete(). // clear the reference held by PlasmaBuffer. results.clear(); - RAY_CHECK_OK(core_worker.Objects().Delete(ids, true, false)); + RAY_CHECK_OK(core_worker.Delete(ids, true, false)); // Note that Delete() calls RayletClient::FreeObjects and would not // wait for objects being deleted, so wait a while for plasma store // to process the command. usleep(200 * 1000); - RAY_CHECK_OK(core_worker.Objects().Get(ids, 0, &results)); + RAY_CHECK_OK(core_worker.Get(ids, 0, &results)); ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]); @@ -859,12 +843,12 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(worker1.Objects().Put(RayObject(buffers[i], nullptr), &ids[i])); + RAY_CHECK_OK(worker1.Put(RayObject(buffers[i], nullptr), &ids[i])); } // Test Get() from remote node. std::vector> results; - RAY_CHECK_OK(worker2.Objects().Get(ids, -1, &results)); + RAY_CHECK_OK(worker2.Get(ids, -1, &results)); ASSERT_EQ(results.size(), 2); for (size_t i = 0; i < ids.size(); i++) { @@ -878,30 +862,30 @@ TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { all_ids.push_back(non_existent_id); std::vector wait_results; - RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 2, -1, &wait_results)); + RAY_CHECK_OK(worker2.Wait(all_ids, 2, -1, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); - RAY_CHECK_OK(worker2.Objects().Wait(all_ids, 3, 100, &wait_results)); + RAY_CHECK_OK(worker2.Wait(all_ids, 3, 100, &wait_results)); ASSERT_EQ(wait_results.size(), 3); ASSERT_EQ(wait_results, std::vector({true, true, false})); // Test Delete() from all machines. // clear the reference held by PlasmaBuffer. results.clear(); - RAY_CHECK_OK(worker2.Objects().Delete(ids, false, false)); + RAY_CHECK_OK(worker2.Delete(ids, false, false)); // Note that Delete() calls RayletClient::FreeObjects and would not // wait for objects being deleted, so wait a while for plasma store // to process the command. usleep(1000 * 1000); // Verify objects are deleted from both machines. - RAY_CHECK_OK(worker2.Objects().Get(ids, 0, &results)); + RAY_CHECK_OK(worker2.Get(ids, 0, &results)); ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]); - RAY_CHECK_OK(worker1.Objects().Get(ids, 0, &results)); + RAY_CHECK_OK(worker1.Get(ids, 0, &results)); ASSERT_EQ(results.size(), 2); ASSERT_TRUE(!results[0]); ASSERT_TRUE(!results[1]); diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 74f4b5671..c50d4187c 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -1,7 +1,6 @@ #define BOOST_BIND_NO_PLACEHOLDERS #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" -#include "ray/core_worker/store_provider/store_provider.h" #include "src/ray/util/test_util.h" using namespace std::placeholders; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 1b14dee76..993d780d5 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -16,7 +16,7 @@ bool HasByReferenceArgs(const TaskSpecification &spec) { CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter( boost::asio::io_service &io_service, - std::unique_ptr store_provider) + std::unique_ptr store_provider) : io_service_(io_service), client_call_manager_(io_service), store_provider_(std::move(store_provider)) {} @@ -199,12 +199,10 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c } CoreWorkerDirectActorTaskReceiver::CoreWorkerDirectActorTaskReceiver( - WorkerContext &worker_context, CoreWorkerObjectInterface &object_interface, - boost::asio::io_service &io_service, rpc::GrpcServer &server, - const TaskHandler &task_handler) + WorkerContext &worker_context, boost::asio::io_service &io_service, + rpc::GrpcServer &server, const TaskHandler &task_handler) : worker_context_(worker_context), io_service_(io_service), - object_interface_(object_interface), task_service_(io_service, *this), task_handler_(task_handler) { server.RegisterService(task_service_); diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 137dc9c19..51d929e8a 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -6,7 +6,9 @@ #include #include "ray/common/id.h" -#include "ray/core_worker/object_interface.h" +#include "ray/common/ray_object.h" +#include "ray/core_worker/context.h" +#include "ray/core_worker/store_provider/memory_store_provider.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/worker/direct_actor_client.h" #include "ray/rpc/worker/direct_actor_server.h" @@ -36,7 +38,7 @@ class CoreWorkerDirectActorTaskSubmitter { public: CoreWorkerDirectActorTaskSubmitter( boost::asio::io_service &io_service, - std::unique_ptr store_provider); + std::unique_ptr store_provider); /// Submit a task to an actor for execution. /// @@ -118,7 +120,7 @@ class CoreWorkerDirectActorTaskSubmitter { std::unordered_map> waiting_reply_tasks_; /// The store provider. - std::unique_ptr store_provider_; + std::unique_ptr store_provider_; friend class CoreWorkerTest; }; @@ -202,7 +204,6 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { std::vector> *results)>; CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context, - CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, rpc::GrpcServer &server, const TaskHandler &task_handler); @@ -222,8 +223,6 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { WorkerContext &worker_context_; /// The IO event loop. boost::asio::io_service &io_service_; - // Object interface. - CoreWorkerObjectInterface &object_interface_; /// The rpc service for `DirectActorService`. rpc::DirectActorGrpcService task_service_; /// The callback function to process a task. diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index d65a20d5f..21b955d1b 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -7,11 +7,10 @@ namespace ray { CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( WorkerContext &worker_context, std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, - rpc::GrpcServer &server, const TaskHandler &task_handler) + boost::asio::io_service &io_service, rpc::GrpcServer &server, + const TaskHandler &task_handler) : worker_context_(worker_context), raylet_client_(raylet_client), - object_interface_(object_interface), task_service_(io_service, *this), task_handler_(task_handler) { server.RegisterService(task_service_); @@ -67,24 +66,6 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( RAY_LOG(DEBUG) << "Assigned task " << task_spec.TaskId() << " finished execution. num_returns: " << num_returns; - if (results.size() != 0) { - RAY_CHECK(results.size() == num_returns); - for (size_t i = 0; i < num_returns; i++) { - ObjectID id = ObjectID::ForTaskReturn( - task_spec.TaskId(), /*index=*/i + 1, - /*transport_type=*/static_cast(TaskTransportType::RAYLET)); - Status status = object_interface_.Put(*results[i], id); - if (!status.ok()) { - // NOTE(hchen): `PlasmaObjectExists` error is already ignored inside - // `ObjectInterface::Put`, we treat other error types as fatal here. - RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << id - << " in store: " << status.message(); - } else { - RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " put object " << id - << " in store."; - } - } - } // Notify raylet that current task is done via a `TaskDone` message. This is to // ensure that the task is marked as finished by raylet only after previous diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 21cd31bdb..5597813ba 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -3,7 +3,8 @@ #include -#include "ray/core_worker/object_interface.h" +#include "ray/common/ray_object.h" +#include "ray/core_worker/context.h" #include "ray/raylet/raylet_client.h" #include "ray/rpc/worker/worker_server.h" @@ -17,7 +18,6 @@ class CoreWorkerRayletTaskReceiver : public rpc::WorkerTaskHandler { CoreWorkerRayletTaskReceiver(WorkerContext &worker_context, std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, rpc::GrpcServer &server, const TaskHandler &task_handler); @@ -37,8 +37,6 @@ class CoreWorkerRayletTaskReceiver : public rpc::WorkerTaskHandler { WorkerContext &worker_context_; /// Raylet client. std::unique_ptr &raylet_client_; - // Object interface. - CoreWorkerObjectInterface &object_interface_; /// The rpc service for `WorkerTaskService`. rpc::WorkerTaskGrpcService task_service_; /// The callback function to process a task.