From 4a28306186ee5f8329c3949e463be02f488b68f5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 7 Nov 2019 21:28:55 -0800 Subject: [PATCH] Allow large returns from direct actor calls (#6088) --- python/ray/includes/task.pxd | 2 +- python/ray/includes/task.pxi | 2 +- python/ray/tests/test_basic.py | 19 +++ python/ray/worker.py | 2 + src/ray/common/id.cc | 12 +- src/ray/common/id.h | 5 + src/ray/common/ray_config_def.h | 4 + src/ray/common/ray_object.cc | 9 ++ src/ray/common/ray_object.h | 4 + src/ray/common/task/task_spec.cc | 8 +- src/ray/common/task/task_spec.h | 6 +- src/ray/core_worker/core_worker.cc | 114 ++++++++++++------ src/ray/core_worker/core_worker.h | 8 +- .../memory_store/memory_store.cc | 7 ++ .../memory_store/memory_store.h | 6 + .../store_provider/memory_store_provider.cc | 6 + .../store_provider/memory_store_provider.h | 2 + .../transport/direct_actor_transport.cc | 77 +++++++----- .../transport/direct_actor_transport.h | 4 +- .../core_worker/transport/raylet_transport.cc | 2 - .../core_worker/transport/raylet_transport.h | 2 +- src/ray/protobuf/core_worker.proto | 7 +- src/ray/protobuf/gcs.proto | 4 + src/ray/raylet/lineage_cache_test.cc | 8 +- src/ray/raylet/node_manager.cc | 4 +- .../raylet/task_dependency_manager_test.cc | 10 +- 26 files changed, 237 insertions(+), 97 deletions(-) diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index bfd31fc93..4b742c3f4 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -52,7 +52,7 @@ cdef extern from "ray/common/task/task_spec.h" nogil: c_bool ArgByRef(uint64_t arg_index) const int ArgIdCount(uint64_t arg_index) const CObjectID ArgId(uint64_t arg_index, uint64_t id_index) const - CObjectID ReturnId(uint64_t return_index) const + CObjectID ReturnIdForPlasma(uint64_t return_index) const const uint8_t *ArgData(uint64_t arg_index) const size_t ArgDataSize(uint64_t arg_index) const const uint8_t *ArgMetadata(uint64_t arg_index) const diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index 6c37a3247..d6bfd3e31 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -108,7 +108,7 @@ cdef class TaskSpec: return_id_list = [] for i in range(self.task_spec.get().NumReturns()): return_id_list.append( - ObjectID(self.task_spec.get().ReturnId(i).Binary())) + ObjectID(self.task_spec.get().ReturnIdForPlasma(i).Binary())) return return_id_list def required_resources(self): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index d9554b7ef..c368d46b7 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1206,6 +1206,25 @@ def test_direct_actor_enabled(ray_start_regular): assert ray.get(obj_id) == 2 +def test_direct_actor_large_objects(ray_start_regular): + @ray.remote + class Actor(object): + def __init__(self): + pass + + def f(self): + time.sleep(1) + return np.zeros(10000000) + + a = Actor._remote(is_direct_call=True) + obj_id = a.f.remote() + assert not ray.worker.global_worker.core_worker.object_exists(obj_id) + done, _ = ray.wait([obj_id]) + assert len(done) == 1 + assert ray.worker.global_worker.core_worker.object_exists(obj_id) + assert isinstance(ray.get(obj_id), np.ndarray) + + def test_direct_actor_errors(ray_start_regular): @ray.remote class Actor(object): diff --git a/python/ray/worker.py b/python/ray/worker.py index 8b95fa431..4fd7827c7 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -463,6 +463,8 @@ class Worker(object): elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): return UnreconstructableError(ray.ObjectID(object_id.binary())) else: + assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ + "Tried to get object that has been promoted to plasma." assert False, "Unrecognized error type " + str(error_type) elif data: # If data is not empty, deserialize the object. diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 2612c7008..9be9cb138 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -73,9 +73,11 @@ inline void SetObjectTypeFlag(ObjectType object_type, ObjectIDFlagsType *flags) } inline void SetTransportTypeFlag(uint8_t transport_type, ObjectIDFlagsType *flags) { + // TODO(ekl) we should be masking for all the SET operations in this file. + auto mask = static_cast(1) << kTransportTypeBitsOffset; const ObjectIDFlagsType transport_type_bits = static_cast(transport_type) << kTransportTypeBitsOffset; - *flags = (*flags bitor transport_type_bits); + *flags = ((*flags bitand ~mask) bitor transport_type_bits); } inline bool CreatedByTask(ObjectIDFlagsType flags) { @@ -148,6 +150,14 @@ bool ObjectID::IsReturnObject() const { return ::ray::GetObjectType(this->GetFlags()) == ObjectType::RETURN_OBJECT; } +ObjectID ObjectID::WithTransportType(TaskTransportType transport_type) const { + ObjectID copy = ObjectID::FromBinary(Binary()); + ObjectIDFlagsType flags = GetFlags(); + SetTransportTypeFlag(static_cast(transport_type), &flags); + std::memcpy(copy.id_ + TaskID::kLength, &flags, sizeof(flags)); + return copy; +} + uint8_t ObjectID::GetTransportType() const { return ::ray::GetTransportType(this->GetFlags()); } diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 637efedae..5a68b0b05 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -294,6 +294,11 @@ class ObjectID : public BaseID { return GetTransportType() == static_cast(TaskTransportType::DIRECT_ACTOR); } + /// Return this object id with a changed transport type. + /// + /// \return Copy of this object id with the specified transport type. + ObjectID WithTransportType(TaskTransportType transport_type) const; + /// Get the transport type of this object. /// /// \return The type of the transport which is used to transfer this object. diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index ef62b960c..1a052c2f8 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -40,6 +40,10 @@ RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) /// type of task from starving other types (see issue #3664). RAY_CONFIG(bool, fair_queueing_enabled, true) +// The max allowed size in bytes of a return object from direct actor calls. +// Objects larger than this size will be spilled to plasma. +RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) + /// The initial period for a task execution lease. The lease will expire this /// many milliseconds after the first acquisition of the lease. Nodes that /// require an object will not try to reconstruct the task until at least diff --git a/src/ray/common/ray_object.cc b/src/ray/common/ray_object.cc index eb3057fd9..cf5c5c674 100644 --- a/src/ray/common/ray_object.cc +++ b/src/ray/common/ray_object.cc @@ -19,4 +19,13 @@ bool RayObject::IsException() { return false; } +bool RayObject::IsInPlasmaError() { + if (metadata_ == nullptr) { + return false; + } + const std::string metadata(reinterpret_cast(metadata_->Data()), + metadata_->Size()); + return metadata == std::to_string(ray::rpc::ErrorType::OBJECT_IN_PLASMA); +} + } // namespace ray diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h index 04ff0e9e3..72c44e672 100644 --- a/src/ray/common/ray_object.h +++ b/src/ray/common/ray_object.h @@ -64,6 +64,10 @@ class RayObject { /// Whether the object represents an exception. bool IsException(); + /// Whether the object has been promoted to plasma (i.e., since it was too + /// large to return directly as part of a gRPC response). + bool IsInPlasmaError(); + private: std::shared_ptr data_; std::shared_ptr metadata_; diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 6e6ec3a33..061900aaa 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -74,8 +74,10 @@ size_t TaskSpecification::NumArgs() const { return message_->args_size(); } size_t TaskSpecification::NumReturns() const { return message_->num_returns(); } -ObjectID TaskSpecification::ReturnId(size_t return_index) const { - return ObjectID::ForTaskReturn(TaskId(), return_index + 1, /*transport_type=*/0); +ObjectID TaskSpecification::ReturnId(size_t return_index, + TaskTransportType transport_type) const { + return ObjectID::ForTaskReturn(TaskId(), return_index + 1, + static_cast(transport_type)); } bool TaskSpecification::ArgByRef(size_t arg_index) const { @@ -181,7 +183,7 @@ ObjectID TaskSpecification::PreviousActorTaskDummyObjectId() const { ObjectID TaskSpecification::ActorDummyObject() const { RAY_CHECK(IsActorTask() || IsActorCreationTask()); - return ReturnId(NumReturns() - 1); + return ReturnId(NumReturns() - 1, TaskTransportType::RAYLET); } bool TaskSpecification::IsDirectCall() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index bd925ea35..b4720f7ba 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -72,7 +72,11 @@ class TaskSpecification : public MessageWrapper { ObjectID ArgId(size_t arg_index, size_t id_index) const; - ObjectID ReturnId(size_t return_index) const; + ObjectID ReturnId(size_t return_index, TaskTransportType transport_type) const; + + ObjectID ReturnIdForPlasma(size_t return_index) const { + return ReturnId(return_index, TaskTransportType::RAYLET); + } const uint8_t *ArgData(size_t arg_index) const; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 47971ef4c..c0633fb5c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -43,22 +43,8 @@ void BuildCommonTaskSpec( void GroupObjectIdsByStoreProvider(const std::vector &object_ids, absl::flat_hash_set *plasma_object_ids, absl::flat_hash_set *memory_object_ids) { - // There are two cases: - // - for task return objects from direct actor call, use memory store provider; - // - all the others use plasma store provider. for (const auto &object_id : object_ids) { - // For raylet transport we always use plasma store provider, for direct actor call - // there are a few cases: - // - objects manually added to store by `ray.put`: for these objects they always use - // plasma store provider; - // - task arguments: these objects are passed by value, and are not put into store; - // - task return objects: these are put into memory store of the task submitter - // and are only used locally. - // Thus we need to check whether this object is a task return object in additional - // to whether it's from direct actor call before we can choose memory store provider. - if (object_id.IsReturnObject() && - object_id.GetTransportType() == - static_cast(ray::TaskTransportType::DIRECT_ACTOR)) { + if (object_id.IsDirectActorType()) { memory_object_ids->insert(object_id); } else { plasma_object_ids->insert(object_id); @@ -296,7 +282,7 @@ Status CoreWorker::Seal(const ObjectID &object_id) { return plasma_store_provider_->Seal(object_id); } -Status CoreWorker::Get(const std::vector &ids, int64_t timeout_ms, +Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, std::vector> *results) { results->resize(ids.size(), nullptr); @@ -312,15 +298,48 @@ Status CoreWorker::Get(const std::vector &ids, int64_t timeout_ms, &result_map, &got_exception)); if (!got_exception) { + int64_t local_timeout_ms = timeout_ms; if (timeout_ms >= 0) { - timeout_ms = std::max(static_cast(0), - timeout_ms - (current_time_ms() - start_time)); + local_timeout_ms = std::max(static_cast(0), + timeout_ms - (current_time_ms() - start_time)); } - RAY_RETURN_NOT_OK(memory_store_provider_->Get(memory_object_ids, timeout_ms, + RAY_RETURN_NOT_OK(memory_store_provider_->Get(memory_object_ids, local_timeout_ms, worker_context_.GetCurrentTaskID(), &result_map, &got_exception)); } + // If any of the objects have been promoted to plasma, then we retry their + // gets at the provider plasma. Once we get the objects from plasma, we flip + // the transport type again and return them for the original direct call ids. + absl::flat_hash_set promoted_plasma_ids; + for (const auto &pair : result_map) { + if (pair.second->IsInPlasmaError()) { + promoted_plasma_ids.insert(pair.first.WithTransportType(TaskTransportType::RAYLET)); + } + } + if (!promoted_plasma_ids.empty()) { + int64_t local_timeout_ms = timeout_ms; + if (timeout_ms >= 0) { + local_timeout_ms = std::max(static_cast(0), + timeout_ms - (current_time_ms() - start_time)); + } + RAY_RETURN_NOT_OK(plasma_store_provider_->Get(promoted_plasma_ids, local_timeout_ms, + worker_context_.GetCurrentTaskID(), + &result_map, &got_exception)); + for (const auto &id : promoted_plasma_ids) { + auto it = result_map.find(id); + if (it == result_map.end()) { + result_map.erase(id.WithTransportType(TaskTransportType::DIRECT_ACTOR)); + } else { + result_map[id.WithTransportType(TaskTransportType::DIRECT_ACTOR)] = it->second; + } + result_map.erase(id); + } + for (const auto &pair : result_map) { + RAY_CHECK(!pair.second->IsInPlasmaError()); + } + } + // Loop through `ids` and fill each entry for the `results` vector, // this ensures that entries `results` have exactly the same order as // they are in `ids`. When there are duplicate object ids, all the entries @@ -335,8 +354,20 @@ Status CoreWorker::Get(const std::vector &ids, int64_t timeout_ms, } Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { - // Currently only the Plasma store supports Contains(). - return plasma_store_provider_->Contains(object_id, has_object); + bool found = false; + if (object_id.IsDirectActorType()) { + // Note that the memory store returns false if the object value is + // ErrorType::OBJECT_IN_PLASMA. + RAY_RETURN_NOT_OK(memory_store_provider_->Contains(object_id, &found)); + } + if (!found) { + // We check plasma as a fallback in all cases, since a direct call object + // may have been spilled to plasma. + RAY_RETURN_NOT_OK(plasma_store_provider_->Contains( + object_id.WithTransportType(TaskTransportType::RAYLET), &found)); + } + *has_object = found; + return Status::OK(); } Status CoreWorker::Wait(const std::vector &ids, int num_objects, @@ -375,6 +406,8 @@ Status CoreWorker::Wait(const std::vector &ids, int num_objects, worker_context_.GetCurrentTaskID(), &ready)); } if (memory_object_ids.size() > 0) { + // TODO(ekl) for memory objects that are ErrorType::OBJECT_IN_PLASMA, we should + // consider waiting on them in plasma as well to ensure they are local. RAY_RETURN_NOT_OK(memory_store_provider_->Wait( memory_object_ids, std::max(0, static_cast(ready.size()) - num_objects), /*timeout_ms=*/0, worker_context_.GetCurrentTaskID(), &ready)); @@ -636,12 +669,15 @@ Status CoreWorker::AllocateReturnObjects( bool object_already_exists = false; std::shared_ptr data_buffer; if (data_sizes[i] > 0) { - if (!worker_context_.CurrentActorUseDirectCall()) { - RAY_RETURN_NOT_OK( - Create(metadatas[i], data_sizes[i], object_ids[i], &data_buffer)); - object_already_exists = !data_buffer; - } else { + if (worker_context_.CurrentActorUseDirectCall() && + static_cast(data_sizes[i]) < + RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); + } else { + RAY_RETURN_NOT_OK(Create( + metadatas[i], data_sizes[i], + object_ids[i].WithTransportType(TaskTransportType::RAYLET), &data_buffer)); + object_already_exists = !data_buffer; } } // Leave the return object as a nullptr if there is no data or metadata. @@ -657,7 +693,7 @@ Status CoreWorker::AllocateReturnObjects( Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, const ResourceMappingType &resource_ids, - std::vector> *return_by_value) { + std::vector> *return_objects) { resource_ids_ = resource_ids; worker_context_.SetCurrentTask(task_spec); SetCurrentTaskId(task_spec.TaskId()); @@ -668,9 +704,12 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, std::vector arg_reference_ids; RAY_CHECK_OK(BuildArgsForExecutor(task_spec, &args, &arg_reference_ids)); + const auto transport_type = worker_context_.CurrentActorUseDirectCall() + ? TaskTransportType::DIRECT_ACTOR + : TaskTransportType::RAYLET; std::vector return_ids; for (size_t i = 0; i < task_spec.NumReturns(); i++) { - return_ids.push_back(task_spec.ReturnId(i)); + return_ids.push_back(task_spec.ReturnId(i, transport_type)); } Status status; @@ -686,28 +725,25 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, task_type = TaskType::ACTOR_TASK; } - std::vector> return_objects; status = task_execution_callback_(task_type, func, task_spec.GetRequiredResources().GetResourceMap(), - args, arg_reference_ids, return_ids, &return_objects); + args, arg_reference_ids, return_ids, return_objects); - for (size_t i = 0; i < return_objects.size(); i++) { + for (size_t i = 0; i < return_objects->size(); i++) { // The object is nullptr if it already existed in the object store. - if (!return_objects[i]) { + if (!return_objects->at(i)) { continue; } - if (return_objects[i]->GetData()->IsPlasmaBuffer()) { - if (!Seal(return_ids[i]).ok()) { - RAY_LOG(ERROR) << "Task " << task_spec.TaskId() << " failed to seal object " + if (return_objects->at(i)->GetData()->IsPlasmaBuffer()) { + if (!Seal(return_ids[i].WithTransportType(TaskTransportType::RAYLET)).ok()) { + RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } } else if (!worker_context_.CurrentActorUseDirectCall()) { - if (!Put(*return_objects[i], return_ids[i]).ok()) { - RAY_LOG(ERROR) << "Task " << task_spec.TaskId() << " failed to seal object " + if (!Put(*return_objects->at(i), return_ids[i]).ok()) { + RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " << return_ids[i] << " in store: " << status.message(); } - } else { - return_by_value->push_back(return_objects[i]); } } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 15789b051..a2869da37 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -176,7 +176,7 @@ class CoreWorker { /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[out] results Result list of objects data. /// \return Status. - Status Get(const std::vector &ids, int64_t timeout_ms, + Status Get(const std::vector &ids, const int64_t timeout_ms, std::vector> *results); /// Return whether or not the object store contains the given object. @@ -197,8 +197,8 @@ class CoreWorker { /// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative. /// \param[out] results A bitset that indicates each object has appeared or not. /// \return Status. - Status Wait(const std::vector &object_ids, int num_objects, - int64_t timeout_ms, std::vector *results); + Status Wait(const std::vector &object_ids, const int num_objects, + const int64_t timeout_ms, std::vector *results); /// Delete a list of objects from the object store. /// @@ -391,7 +391,7 @@ class CoreWorker { /// \return Status. Status ExecuteTask(const TaskSpecification &task_spec, const ResourceMappingType &resource_ids, - std::vector> *return_by_value); + std::vector> *return_objects); /// Build arguments for task executor. This would loop through all the arguments /// in task spec, and for each of them that's passed by reference (ObjectID), diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index ca21b81ec..0f0627d13 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -229,4 +229,11 @@ void CoreWorkerMemoryStore::Delete(const std::vector &object_ids) { } } +bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id) { + std::unique_lock lock(lock_); + auto it = objects_.find(object_id); + // If obj is in plasma, we defer to the plasma store for the Contains() call. + return it != objects_.end() && !it->second->IsInPlasmaError(); +} + } // namespace ray diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 6cad239b9..bd9596c43 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -45,6 +45,12 @@ class CoreWorkerMemoryStore { /// \return Void. void Delete(const std::vector &object_ids); + /// Check whether this store contains the object. + /// + /// \param[in] object_id The object to check. + /// \return Whether the store has the object. + bool Contains(const ObjectID &object_id); + private: /// Map from object ID to `RayObject`. absl::flat_hash_map> objects_; diff --git a/src/ray/core_worker/store_provider/memory_store_provider.cc b/src/ray/core_worker/store_provider/memory_store_provider.cc index 549c7e317..12b6d14bc 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.cc +++ b/src/ray/core_worker/store_provider/memory_store_provider.cc @@ -43,6 +43,12 @@ Status CoreWorkerMemoryStoreProvider::Get( return Status::OK(); } +Status CoreWorkerMemoryStoreProvider::Contains(const ObjectID &object_id, + bool *has_object) { + *has_object = store_->Contains(object_id); + return Status::OK(); +} + Status CoreWorkerMemoryStoreProvider::Wait( const absl::flat_hash_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, absl::flat_hash_set *ready) { diff --git a/src/ray/core_worker/store_provider/memory_store_provider.h b/src/ray/core_worker/store_provider/memory_store_provider.h index d93c1abd5..2ae0d66e3 100644 --- a/src/ray/core_worker/store_provider/memory_store_provider.h +++ b/src/ray/core_worker/store_provider/memory_store_provider.h @@ -26,6 +26,8 @@ class CoreWorkerMemoryStoreProvider { absl::flat_hash_map> *results, bool *got_exception); + Status Contains(const ObjectID &object_id, bool *has_object); + /// Note that `num_objects` must equal to number of items in `object_ids`. Status Wait(const absl::flat_hash_set &object_ids, int num_objects, int64_t timeout_ms, const TaskID &task_id, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 6c69213fa..a9e5ba8e4 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -10,7 +10,7 @@ CoreWorkerDirectActorTaskSubmitter::CoreWorkerDirectActorTaskSubmitter( std::unique_ptr store_provider) : io_service_(io_service), client_call_manager_(io_service), - store_provider_(std::move(store_provider)) {} + in_memory_store_(std::move(store_provider)) {} Status CoreWorkerDirectActorTaskSubmitter::SubmitTask( const TaskSpecification &task_spec) { @@ -147,22 +147,34 @@ void CoreWorkerDirectActorTaskSubmitter::DirectActorAssignTask( for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); - std::shared_ptr data_buffer; - if (return_object.data().size() > 0) { - data_buffer = std::make_shared( - const_cast( - reinterpret_cast(return_object.data().data())), - return_object.data().size()); + + if (return_object.in_plasma()) { + // Mark it as in plasma with a dummy object. + std::string meta = + std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); + auto metadata = + const_cast(reinterpret_cast(meta.data())); + auto meta_buffer = std::make_shared(metadata, meta.size()); + RAY_CHECK_OK( + in_memory_store_->Put(RayObject(nullptr, meta_buffer), object_id)); + } else { + std::shared_ptr data_buffer; + if (return_object.data().size() > 0) { + data_buffer = std::make_shared( + const_cast( + reinterpret_cast(return_object.data().data())), + return_object.data().size()); + } + std::shared_ptr metadata_buffer; + if (return_object.metadata().size() > 0) { + metadata_buffer = std::make_shared( + const_cast( + reinterpret_cast(return_object.metadata().data())), + return_object.metadata().size()); + } + RAY_CHECK_OK(in_memory_store_->Put(RayObject(data_buffer, metadata_buffer), + object_id)); } - std::shared_ptr metadata_buffer; - if (return_object.metadata().size() > 0) { - metadata_buffer = std::make_shared( - const_cast( - reinterpret_cast(return_object.metadata().data())), - return_object.metadata().size()); - } - RAY_CHECK_OK( - store_provider_->Put(RayObject(data_buffer, metadata_buffer), object_id)); } }); if (!status.ok()) { @@ -181,7 +193,7 @@ void CoreWorkerDirectActorTaskSubmitter::TreatTaskAsFailed( std::string meta = std::to_string(static_cast(error_type)); auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); - RAY_CHECK_OK(store_provider_->Put(RayObject(nullptr, meta_buffer), object_id)); + RAY_CHECK_OK(in_memory_store_->Put(RayObject(nullptr, meta_buffer), object_id)); } } @@ -257,8 +269,8 @@ void CoreWorkerDirectActorTaskReceiver::HandleDirectActorAssignTask( // TODO(edoakes): resource IDs are currently kept track of in the raylet, // need to come up with a solution for this. ResourceMappingType resource_ids; - std::vector> return_by_value; - auto status = task_handler_(task_spec, resource_ids, &return_by_value); + std::vector> return_objects; + auto status = task_handler_(task_spec, resource_ids, &return_objects); if (status.IsSystemExit()) { // In Python, SystemExit can only be raised on the main thread. To work // around this when we are executing tasks on worker threads, we re-post the @@ -266,22 +278,29 @@ void CoreWorkerDirectActorTaskReceiver::HandleDirectActorAssignTask( task_main_io_service_.post([this]() { exit_handler_(); }); return; } - RAY_CHECK(return_by_value.size() == num_returns) - << return_by_value.size() << " " << num_returns; + RAY_CHECK(return_objects.size() == num_returns) + << return_objects.size() << " " << num_returns; - for (size_t i = 0; i < return_by_value.size(); i++) { + for (size_t i = 0; i < return_objects.size(); i++) { auto return_object = reply->add_return_objects(); ObjectID id = ObjectID::ForTaskReturn( task_spec.TaskId(), /*index=*/i + 1, /*transport_type=*/static_cast(TaskTransportType::DIRECT_ACTOR)); return_object->set_object_id(id.Binary()); - const auto &result = return_by_value[i]; - if (result->GetData() != nullptr) { - return_object->set_data(result->GetData()->Data(), result->GetData()->Size()); - } - if (result->GetMetadata() != nullptr) { - return_object->set_metadata(result->GetMetadata()->Data(), - result->GetMetadata()->Size()); + + // The object is nullptr if it already existed in the object store. + const auto &result = return_objects[i]; + if (result == nullptr || result->GetData()->IsPlasmaBuffer()) { + return_object->set_in_plasma(true); + } else { + if (result->GetData() != nullptr) { + return_object->set_data(result->GetData()->Data(), + result->GetData()->Size()); + } + if (result->GetMetadata() != nullptr) { + return_object->set_metadata(result->GetMetadata()->Data(), + result->GetMetadata()->Size()); + } } } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index a178bc7f6..6192a2eb1 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -126,7 +126,7 @@ class CoreWorkerDirectActorTaskSubmitter { std::unordered_map> waiting_reply_tasks_; /// The store provider. - std::unique_ptr store_provider_; + std::unique_ptr in_memory_store_; friend class CoreWorkerTest; }; @@ -333,7 +333,7 @@ class CoreWorkerDirectActorTaskReceiver { public: using TaskHandler = std::function> *return_by_value)>; + std::vector> *return_objects)>; CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context, boost::asio::io_service &main_io_service, diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index c25cf239f..27b3f8bdb 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -51,8 +51,6 @@ void CoreWorkerRayletTaskReceiver::HandleAssignTask( exit_handler_(); return; } - // Raylet transport doesn't currently support returning objects inline. - RAY_CHECK(results.size() == 0); RAY_LOG(DEBUG) << "Assigned task " << task_spec.TaskId() << " finished execution."; diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 98d53ceb7..e218e9cd9 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -13,7 +13,7 @@ class CoreWorkerRayletTaskReceiver { public: using TaskHandler = std::function> *return_by_value)>; + std::vector> *return_objects)>; CoreWorkerRayletTaskReceiver(std::unique_ptr &raylet_client, const TaskHandler &task_handler, diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 1e976ff48..d48f68eb6 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -47,10 +47,13 @@ message AssignTaskReply { message ReturnObject { // Object ID. bytes object_id = 1; + // If set, indicates the data is in plasma instead of inline. This + // means that data and metadata will be empty. + bool in_plasma = 2; // Data of the object. - bytes data = 2; + bytes data = 3; // Metadata of the object. - bytes metadata = 3; + bytes metadata = 4; } message DirectActorAssignTaskRequest { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 55ef646a7..54f35bee1 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -283,4 +283,8 @@ enum ErrorType { OBJECT_UNRECONSTRUCTABLE = 2; // Indicates that a task failed due to user code failure. TASK_EXECUTION_EXCEPTION = 3; + // Indicates that the object has been placed in plasma. This error shouldn't ever be + // exposed to user code; it is only used internally to indicate the result of a direct + // call has been placed in plasma. + OBJECT_IN_PLASMA = 4; } diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 2e9974108..633505c51 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -167,7 +167,7 @@ std::vector InsertTaskChain(LineageCache &lineage_cache, inserted_tasks.push_back(task); arguments.clear(); for (size_t j = 0; j < task.GetTaskSpecification().NumReturns(); j++) { - arguments.push_back(task.GetTaskSpecification().ReturnId(j)); + arguments.push_back(task.GetTaskSpecification().ReturnIdForPlasma(j)); } } return arguments; @@ -321,7 +321,7 @@ TEST_F(LineageCacheTest, TestEvictChain) { for (int i = 0; i < 3; i++) { auto task = ExampleTask(arguments, 1); tasks.push_back(task); - arguments = {task.GetTaskSpecification().ReturnId(0)}; + arguments = {task.GetTaskSpecification().ReturnIdForPlasma(0)}; } Lineage uncommitted_lineage; @@ -374,7 +374,7 @@ TEST_F(LineageCacheTest, TestEvictManyParents) { for (int i = 0; i < 10; i++) { auto task = ExampleTask({}, 1); parent_tasks.push_back(task); - arguments.push_back(task.GetTaskSpecification().ReturnId(0)); + arguments.push_back(task.GetTaskSpecification().ReturnIdForPlasma(0)); auto lineage = CreateSingletonLineage(task); lineage_cache_.AddUncommittedLineage(task.GetTaskSpecification().TaskId(), lineage); } @@ -525,7 +525,7 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { // Add more tasks to the lineage cache that will remain local. Each of these // tasks is dependent one of the tasks that was forwarded above. for (const auto &task : tasks) { - auto return_id = task.GetTaskSpecification().ReturnId(0); + auto return_id = task.GetTaskSpecification().ReturnIdForPlasma(0); auto dependent_task = ExampleTask({return_id}, 1); auto lineage = CreateSingletonLineage(dependent_task); lineage_cache_.AddUncommittedLineage(dependent_task.GetTaskSpecification().TaskId(), diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 347c006ea..88791a793 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1560,7 +1560,7 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ // Determine which IDs should be marked as failed. std::vector objects_to_fail; for (int64_t i = 0; i < num_returns; i++) { - objects_to_fail.push_back(spec.ReturnId(i).ToPlasmaId()); + objects_to_fail.push_back(spec.ReturnId(i, TaskTransportType::RAYLET).ToPlasmaId()); } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); @@ -1611,7 +1611,7 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { // lookup callbacks are fired. auto task_marked_as_failed = std::make_shared(false); for (int64_t i = 0; i < num_returns; i++) { - const ObjectID object_id = spec.ReturnId(i); + const ObjectID object_id = spec.ReturnId(i, TaskTransportType::RAYLET); // Lookup the return value's locations. RAY_CHECK_OK(object_directory_->LookupLocations( object_id, [this, task_marked_as_failed, task]( diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 3cf67e6f1..aae322454 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -95,7 +95,7 @@ std::vector MakeTaskChain(int chain_size, task_chain.push_back(task); arguments.clear(); for (size_t j = 0; j < task.GetTaskSpecification().NumReturns(); j++) { - arguments.push_back(task.GetTaskSpecification().ReturnId(j)); + arguments.push_back(task.GetTaskSpecification().ReturnIdForPlasma(j)); } } return task_chain; @@ -245,7 +245,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { auto task = tasks.front(); tasks.erase(tasks.begin()); TaskID task_id = task.GetTaskSpecification().TaskId(); - auto return_id = task.GetTaskSpecification().ReturnId(0); + auto return_id = task.GetTaskSpecification().ReturnIdForPlasma(0); task_dependency_manager_.UnsubscribeGetDependencies(task_id); // Simulate the object notifications for the task's return values. @@ -305,7 +305,7 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) { // Get the first task. const auto task = tasks.front(); TaskID task_id = task.GetTaskSpecification().TaskId(); - ObjectID return_id = task.GetTaskSpecification().ReturnId(0); + ObjectID return_id = task.GetTaskSpecification().ReturnIdForPlasma(0); // Simulate forwarding the first task to a remote node. task_dependency_manager_.UnsubscribeGetDependencies(task_id); // The object returned by the first task should be considered remote once we @@ -444,7 +444,7 @@ TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) { // runnable. auto task = tasks.front(); TaskID task_id = task.GetTaskSpecification().TaskId(); - auto return_id = task.GetTaskSpecification().ReturnId(0); + auto return_id = task.GetTaskSpecification().ReturnIdForPlasma(0); task_dependency_manager_.UnsubscribeGetDependencies(task_id); // Simulate the object notifications for the task's return values. auto ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id); @@ -469,7 +469,7 @@ TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) { // Simulate the object notifications for the second task's return values. // Make sure that this does not return the third task, which should have been // removed. - return_id = tasks[1].GetTaskSpecification().ReturnId(0); + return_id = tasks[1].GetTaskSpecification().ReturnIdForPlasma(0); ready_tasks = task_dependency_manager_.HandleObjectLocal(return_id); ASSERT_TRUE(ready_tasks.empty()); }