From 697f765efc26794ca1df2fc115780560b943309a Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 18 Oct 2019 00:03:57 -0400 Subject: [PATCH] Refactor CoreWorker to remove TaskInterface (#5924) * Remove TaskInterface * Remove Status return value * Remove CActorHandle, some return values, TaskSubmitter * lint * doc * doc * fix build * lint * Return Status, guarded by annotation, fail tasks for RECONSTRUCTING actors * fix * move annotation * revert * Fix core worker test * nits --- python/ray/_raylet.pyx | 41 ++--- python/ray/actor.py | 3 +- python/ray/includes/common.pxd | 7 - python/ray/includes/libcoreworker.pxd | 36 ++-- src/ray/core_worker/common.h | 40 +++++ src/ray/core_worker/core_worker.cc | 159 +++++++++++++++++- src/ray/core_worker/core_worker.h | 82 +++++++-- src/ray/core_worker/object_interface.h | 8 +- src/ray/core_worker/task_interface.cc | 139 --------------- src/ray/core_worker/task_interface.h | 152 ----------------- src/ray/core_worker/test/core_worker_test.cc | 72 +++----- .../transport/direct_actor_transport.cc | 19 +-- .../transport/direct_actor_transport.h | 7 +- .../core_worker/transport/raylet_transport.cc | 9 - .../core_worker/transport/raylet_transport.h | 19 --- src/ray/core_worker/transport/transport.h | 20 --- src/ray/rpc/worker/direct_actor_client.h | 9 +- 17 files changed, 343 insertions(+), 479 deletions(-) delete mode 100644 src/ray/core_worker/task_interface.cc delete mode 100644 src/ray/core_worker/task_interface.h diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a149bd550..93f25ab32 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -23,7 +23,6 @@ from libcpp.vector cimport vector as c_vector from cython.operator import dereference, postincrement from ray.includes.common cimport ( - CActorHandle, CLanguage, CRayObject, CRayStatus, @@ -630,7 +629,6 @@ cdef class CoreWorker: CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids - CTaskID caller_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -638,11 +636,9 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) - caller_id = self.core_worker.get().GetCallerId() with nogil: - check_status(self.core_worker.get().Tasks().SubmitTask( - caller_id, + check_status(self.core_worker.get().SubmitTask( ray_function, args_vector, task_options, &return_ids)) return VectorToObjectIDs(return_ids) @@ -654,13 +650,12 @@ cdef class CoreWorker: resources, placement_resources): cdef: - unique_ptr[CActorHandle] actor_handle CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[c_string] dynamic_worker_options unordered_map[c_string, double] c_resources unordered_map[c_string, double] c_placement_resources - CTaskID caller_id + CActorID c_actor_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -668,22 +663,16 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) - caller_id = self.core_worker.get().GetCallerId() with nogil: - check_status(self.core_worker.get().Tasks().CreateActor( - caller_id, + check_status(self.core_worker.get().CreateActor( ray_function, args_vector, CActorCreationOptions( max_reconstructions, False, c_resources, c_placement_resources, dynamic_worker_options), - &actor_handle)) + &c_actor_id)) - actor_id = ActorID(actor_handle.get().GetActorID().Binary()) - inserted = self.core_worker.get().AddActorHandle( - move(actor_handle)) - assert inserted, "Actor {} already exists".format(actor_id) - return actor_id + return ActorID(c_actor_id.Binary()) def submit_actor_task(self, ActorID actor_id, @@ -699,7 +688,6 @@ cdef class CoreWorker: CRayFunction ray_function c_vector[CTaskArg] args_vector c_vector[CObjectID] return_ids - CTaskID caller_id with profiling.profile("submit_task"): prepare_resources(resources, &c_resources) @@ -707,12 +695,10 @@ cdef class CoreWorker: ray_function = CRayFunction( LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) prepare_args(args, &args_vector) - caller_id = self.core_worker.get().GetCallerId() with nogil: - check_status(self.core_worker.get().Tasks().SubmitActorTask( - caller_id, - self.core_worker.get().GetActorHandle(c_actor_id), + check_status(self.core_worker.get().SubmitActorTask( + c_actor_id, ray_function, args_vector, task_options, &return_ids)) @@ -726,17 +712,16 @@ cdef class CoreWorker: self.core_worker.get().CreateProfileEvent(c_event_type), extra_data) - def deserialize_actor_handle(self, c_string bytes): - cdef: - unique_ptr[CActorHandle] actor_handle - actor_handle.reset(new CActorHandle(bytes)) - actor_id = ActorID(actor_handle.get().GetActorID().Binary()) - self.core_worker.get().AddActorHandle(move(actor_handle)) + def deserialize_and_register_actor_handle(self, const c_string &bytes): + c_actor_id = self.core_worker.get().DeserializeAndRegisterActorHandle( + bytes) + actor_id = ActorID(c_actor_id.Binary()) return actor_id def serialize_actor_handle(self, ActorID actor_id): cdef: CActorID c_actor_id = actor_id.native() c_string output - self.core_worker.get().GetActorHandle(c_actor_id).Serialize(&output) + check_status(self.core_worker.get().SerializeActorHandle( + c_actor_id, &output)) return output diff --git a/python/ray/actor.py b/python/ray/actor.py index 7da3967c3..02378c1e2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -631,7 +631,8 @@ class ActorHandle(object): # TODO(swang): Accessing the worker's current task ID is not # thread-safe. # Local mode just uses the actor ID. - worker.core_worker.deserialize_actor_handle(state["core_handle"]) + worker.core_worker.deserialize_and_register_actor_handle( + state["core_handle"]) if hasattr(worker, "core_worker") else state["core_handle"], state["module_name"], state["class_name"], diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index d259158a5..a7268a152 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -183,7 +183,6 @@ cdef extern from "ray/core_worker/common.h" nogil: @staticmethod CTaskArg PassByValue(const shared_ptr[CRayObject] &data) -cdef extern from "ray/core_worker/task_interface.h" nogil: cdef cppclass CTaskOptions "ray::TaskOptions": CTaskOptions() CTaskOptions(int num_returns, @@ -197,12 +196,6 @@ cdef extern from "ray/core_worker/task_interface.h" nogil: const unordered_map[c_string, double] &placement_resources, const c_vector[c_string] &dynamic_worker_options) - cdef cppclass CActorHandle "ray::ActorHandle": - CActorHandle(const c_string &serialized) - - CActorID GetActorID() const - void Serialize(c_string *output) - cdef extern from "ray/gcs/gcs_client_interface.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": CGcsClientOptions(const c_string &ip, int port, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index d8c1e2942..5c992b810 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -12,7 +12,6 @@ from ray.includes.unique_ids cimport ( ) from ray.includes.common cimport ( CActorCreationOptions, - CActorHandle, CBuffer, CRayFunction, CRayObject, @@ -30,23 +29,6 @@ cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfileEvent "ray::worker::ProfileEvent": void SetExtraData(const c_string &extra_data) -cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil: - cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface": - CRayStatus SubmitTask( - const CTaskID &caller_id, - const CRayFunction &function, const c_vector[CTaskArg] &args, - const CTaskOptions &options, c_vector[CObjectID] *return_ids) - CRayStatus CreateActor( - const CTaskID &caller_id, - const CRayFunction &function, const c_vector[CTaskArg] &args, - const CActorCreationOptions &options, - unique_ptr[CActorHandle] *handle) - CRayStatus SubmitActorTask( - const CTaskID &caller_id, - CActorHandle &handle, const CRayFunction &function, - const c_vector[CTaskArg] &args, const CTaskOptions &options, - c_vector[CObjectID] *return_ids) - cdef extern from "ray/core_worker/object_interface.h" nogil: cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface": CRayStatus SetClientOptions(c_string client_name, int64_t limit) @@ -78,7 +60,18 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CWorkerType &GetWorkerType() CLanguage &GetLanguage() CObjectInterface &Objects() - CTaskSubmissionInterface &Tasks() + + CRayStatus SubmitTask( + const CRayFunction &function, const c_vector[CTaskArg] &args, + const CTaskOptions &options, c_vector[CObjectID] *return_ids) + CRayStatus CreateActor( + const CRayFunction &function, const c_vector[CTaskArg] &args, + const CActorCreationOptions &options, CActorID *actor_id) + CRayStatus SubmitActorTask( + const CActorID &actor_id, const CRayFunction &function, + const c_vector[CTaskArg] &args, const CTaskOptions &options, + c_vector[CObjectID] *return_ids) + # CTaskExecutionInterface &Execution() unique_ptr[CProfileEvent] CreateProfileEvent( const c_string &event_type) @@ -94,5 +87,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void SetActorId(const CActorID &actor_id) const CActorID &GetActorId() CTaskID GetCallerId() - c_bool AddActorHandle(unique_ptr[CActorHandle] handle) - CActorHandle &GetActorHandle(const CActorID &actor_id) + CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) + CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string + *bytes) diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 42c1e642a..637c2af25 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -83,6 +83,46 @@ enum class StoreProviderType { PLASMA, MEMORY }; enum class TaskTransportType { RAYLET, DIRECT_ACTOR }; +/// Options for all tasks (actor and non-actor) except for actor creation. +struct TaskOptions { + TaskOptions() {} + TaskOptions(int num_returns, std::unordered_map &resources) + : num_returns(num_returns), resources(resources) {} + + /// Number of returns of this task. + int num_returns = 1; + /// Resources required by this task. + std::unordered_map resources; +}; + +/// Options for actor creation tasks. +struct ActorCreationOptions { + ActorCreationOptions() {} + ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, + const std::unordered_map &resources, + const std::unordered_map &placement_resources, + const std::vector &dynamic_worker_options) + : max_reconstructions(max_reconstructions), + is_direct_call(is_direct_call), + resources(resources), + placement_resources(placement_resources), + dynamic_worker_options(dynamic_worker_options) {} + + /// Maximum number of times that the actor should be reconstructed when it dies + /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. + const uint64_t max_reconstructions = 0; + /// Whether to use direct actor call. If this is set to true, callers will submit + /// tasks directly to the created actor without going through raylet. + const bool is_direct_call = false; + /// Resources required by the whole lifetime of this actor. + const std::unordered_map resources; + /// Resources required to place this actor. + const std::unordered_map placement_resources; + /// The dynamic options used in the worker command when starting a worker process for + /// an actor creation task. + const std::vector dynamic_worker_options; +}; + } // namespace ray #endif // RAY_CORE_WORKER_COMMON_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 62cd6d646..b4fbe4fbd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,8 +1,44 @@ #include +#include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +namespace { + +void BuildCommonTaskSpec( + ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, + const TaskID ¤t_task_id, const int task_index, const TaskID &caller_id, + const ray::RayFunction &function, const std::vector &args, + uint64_t num_returns, + const std::unordered_map &required_resources, + const std::unordered_map &required_placement_resources, + ray::TaskTransportType transport_type, std::vector *return_ids) { + // Build common task spec. + builder.SetCommonTaskSpec(task_id, function.GetLanguage(), + function.GetFunctionDescriptor(), job_id, current_task_id, + task_index, caller_id, num_returns, required_resources, + required_placement_resources); + // Set task arguments. + for (const auto &arg : args) { + if (arg.IsPassedByReference()) { + builder.AddByRefArg(arg.GetReference()); + } else { + builder.AddByValueArg(arg.GetValue()); + } + } + + // Compute return IDs. + return_ids->resize(num_returns); + for (size_t i = 0; i < num_returns; i++) { + (*return_ids)[i] = + ObjectID::ForTaskReturn(task_id, i + 1, + /*transport_type=*/static_cast(transport_type)); + } +} + +} // namespace + namespace ray { CoreWorker::CoreWorker( @@ -48,8 +84,6 @@ CoreWorker::CoreWorker( object_interface_ = std::unique_ptr(new CoreWorkerObjectInterface( worker_context_, raylet_client_, store_socket, use_memory_store)); - task_interface_ = std::unique_ptr(new CoreWorkerTaskInterface( - worker_context_, raylet_client_, *object_interface_, io_service_)); // Initialize task execution. int rpc_server_port = 0; @@ -97,6 +131,11 @@ CoreWorker::CoreWorker( RAY_CHECK_OK(gcs_client_->raylet_task_table().Add(job_id, task_id, data, nullptr)); worker_context_.SetCurrentTaskId(task_id); } + + direct_actor_submitter_ = std::unique_ptr( + new CoreWorkerDirectActorTaskSubmitter( + io_service_, + object_interface_->CreateStoreProvider(StoreProviderType::MEMORY))); } CoreWorker::~CoreWorker() { @@ -174,7 +213,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { // submit tasks to dead actors. } - task_interface_->HandleDirectActorUpdate(actor_id, actor_data); + direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data); RAY_LOG(INFO) << "received notification on actor, state=" << static_cast(actor_data.state()) << ", actor_id: " << actor_id @@ -188,10 +227,118 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { return inserted; } -ActorHandle &CoreWorker::GetActorHandle(const ActorID &actor_id) { +Status CoreWorker::GetActorHandle(const ActorID &actor_id, + ActorHandle **actor_handle) const { auto it = actor_handles_.find(actor_id); - RAY_CHECK(it != actor_handles_.end()); - return *it->second; + if (it == actor_handles_.end()) { + return Status::Invalid("Handle for actor does not exist"); + } + *actor_handle = it->second.get(); + return Status::OK(); +} + +Status CoreWorker::SubmitTask(const RayFunction &function, + const std::vector &args, + const TaskOptions &task_options, + std::vector *return_ids) { + TaskSpecBuilder builder; + const int next_task_index = worker_context_.GetNextTaskIndex(); + const auto task_id = + TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), + worker_context_.GetCurrentTaskID(), next_task_index); + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + function, args, task_options.num_returns, task_options.resources, + {}, TaskTransportType::RAYLET, return_ids); + return raylet_client_->SubmitTask(builder.Build()); +} + +Status CoreWorker::CreateActor(const RayFunction &function, + const std::vector &args, + const ActorCreationOptions &actor_creation_options, + ActorID *return_actor_id) { + const int next_task_index = worker_context_.GetNextTaskIndex(); + const ActorID actor_id = + ActorID::Of(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), + next_task_index); + const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(actor_id); + const JobID job_id = worker_context_.GetCurrentJobID(); + std::vector return_ids; + TaskSpecBuilder builder; + BuildCommonTaskSpec( + builder, job_id, actor_creation_task_id, worker_context_.GetCurrentTaskID(), + next_task_index, GetCallerId(), function, args, 1, actor_creation_options.resources, + actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids); + builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, + actor_creation_options.dynamic_worker_options, + actor_creation_options.is_direct_call); + + std::unique_ptr actor_handle(new ActorHandle( + actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), + actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); + RAY_CHECK(AddActorHandle(std::move(actor_handle))) + << "Actor " << actor_id << " already exists"; + + RAY_RETURN_NOT_OK(raylet_client_->SubmitTask(builder.Build())); + *return_actor_id = actor_id; + return Status::OK(); +} + +Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, + const std::vector &args, + const TaskOptions &task_options, + std::vector *return_ids) { + ActorHandle *actor_handle = nullptr; + RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); + + // Add one for actor cursor object id for tasks. + const int num_returns = task_options.num_returns + 1; + + const bool is_direct_call = actor_handle->IsDirectCallActor(); + const TaskTransportType transport_type = + is_direct_call ? TaskTransportType::DIRECT_ACTOR : TaskTransportType::RAYLET; + + // Build common task spec. + TaskSpecBuilder builder; + const int next_task_index = worker_context_.GetNextTaskIndex(); + const TaskID actor_task_id = TaskID::ForActorTask( + worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), + next_task_index, actor_handle->GetActorID()); + BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + function, args, num_returns, task_options.resources, {}, + transport_type, return_ids); + + const ObjectID new_cursor = return_ids->back(); + actor_handle->SetActorTaskSpec(builder, transport_type, new_cursor); + // Remove cursor from return ids. + return_ids->pop_back(); + + // Submit task. + Status status; + if (is_direct_call) { + status = direct_actor_submitter_->SubmitTask(builder.Build()); + } else { + status = raylet_client_->SubmitTask(builder.Build()); + } + return status; +} + +ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized) { + std::unique_ptr actor_handle(new ActorHandle(serialized)); + const ActorID actor_id = actor_handle->GetActorID(); + RAY_UNUSED(AddActorHandle(std::move(actor_handle))); + return actor_id; +} + +Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, + std::string *output) const { + ActorHandle *actor_handle = nullptr; + auto status = GetActorHandle(actor_id, &actor_handle); + if (status.ok()) { + actor_handle->Serialize(output); + } + return status; } } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 5c98af38f..82452390d 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -2,12 +2,13 @@ #define RAY_CORE_WORKER_CORE_WORKER_H #include "ray/common/buffer.h" +#include "ray/core_worker/actor_handle.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/object_interface.h" #include "ray/core_worker/profiling.h" #include "ray/core_worker/task_execution.h" -#include "ray/core_worker/task_interface.h" +#include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/raylet/raylet_client.h" @@ -58,10 +59,6 @@ class CoreWorker { RayletClient &GetRayletClient() { return *raylet_client_; } - /// Return the `CoreWorkerTaskInterface` that contains the methods related to task - /// submisson. - CoreWorkerTaskInterface &Tasks() { return *task_interface_; } - /// Return the `CoreWorkerObjectInterface` that contains methods related to object /// store. CoreWorkerObjectInterface &Objects() { return *object_interface_; } @@ -99,6 +96,69 @@ class CoreWorker { /// of the bytes zeroed out. TaskID GetCallerId() const; + /* Methods related to task submission. */ + + /// Submit a normal task. + /// + /// \param[in] function The remote function to execute. + /// \param[in] args Arguments of this task. + /// \param[in] task_options Options for this task. + /// \param[out] return_ids Ids of the return objects. + /// \return Status error if task submission fails, likely due to raylet failure. + Status SubmitTask(const RayFunction &function, const std::vector &args, + const TaskOptions &task_options, std::vector *return_ids); + + /// Create an actor. + /// + /// \param[in] caller_id ID of the task submitter. + /// \param[in] function The remote function that generates the actor object. + /// \param[in] args Arguments of this task. + /// \param[in] actor_creation_options Options for this actor creation task. + /// \param[out] actor_handle Handle to the actor. + /// \param[out] actor_id ID of the created actor. This can be used to submit + /// tasks on the actor. + /// \return Status error if actor creation fails, likely due to raylet failure. + Status CreateActor(const RayFunction &function, const std::vector &args, + const ActorCreationOptions &actor_creation_options, + ActorID *actor_id); + + /// Submit an actor task. + /// + /// \param[in] caller_id ID of the task submitter. + /// \param[in] actor_handle Handle to the actor. + /// \param[in] function The remote function to execute. + /// \param[in] args Arguments of this task. + /// \param[in] task_options Options for this task. + /// \param[out] return_ids Ids of the return objects. + /// \return Status error if the task is invalid or if the task submission + /// failed. Tasks can be invalid for direct actor calls because not all tasks + /// are currently supported. + Status SubmitActorTask(const ActorID &actor_id, const RayFunction &function, + const std::vector &args, + const TaskOptions &task_options, + std::vector *return_ids); + + /// Add an actor handle from a serialized string. + /// + /// This should be called when an actor handle is given to us by another task + /// or actor. This may be called even if we already have a handle to the same + /// actor. + /// + /// \param[in] serialized The serialized actor handle. + /// \return The ActorID of the deserialized handle. + ActorID DeserializeAndRegisterActorHandle(const std::string &serialized); + + /// Serialize an actor handle. + /// + /// This should be called when passing an actor handle to another task or + /// actor. + /// + /// \param[in] actor_id The ID of the actor handle to serialize. + /// \param[out] The serialized handle. + /// \return Status::Invalid if we don't have the specified handle. + Status SerializeActorHandle(const ActorID &actor_id, std::string *output) const; + + private: /// Give this worker a handle to an actor. /// /// This handle will remain as long as the current actor or task is @@ -114,11 +174,11 @@ class CoreWorker { /// Get a handle to an actor. This asserts that the worker actually has this /// handle. /// - /// \param actor_id The actor handle to get. - /// \return A handle to the requested actor. - ActorHandle &GetActorHandle(const ActorID &actor_id); + /// \param[in] actor_id The actor handle to get. + /// \param[out] actor_handle A handle to the requested actor. + /// \return Status::Invalid if we don't have this actor handle. + Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; - private: void StartIOService(); const WorkerType worker_type_; @@ -141,8 +201,8 @@ class CoreWorker { std::thread io_thread_; std::shared_ptr profiler_; std::unique_ptr raylet_client_; + std::unique_ptr direct_actor_submitter_; std::unique_ptr gcs_client_; - std::unique_ptr task_interface_; std::unique_ptr object_interface_; /// Map from actor ID to a handle to that actor. @@ -150,6 +210,8 @@ class CoreWorker { /// Only available if it's not a driver. std::unique_ptr task_execution_interface_; + + friend class CoreWorkerTest; }; } // namespace ray diff --git a/src/ray/core_worker/object_interface.h b/src/ray/core_worker/object_interface.h index 90665d69e..9d576c319 100644 --- a/src/ray/core_worker/object_interface.h +++ b/src/ray/core_worker/object_interface.h @@ -115,6 +115,10 @@ class CoreWorkerObjectInterface { /// \return std::string The string describing memory usage. std::string MemoryUsageString(); + /// Create a new store provider for the specified type on demand. + std::unique_ptr CreateStoreProvider( + StoreProviderType type) const; + private: /// Helper function to group object IDs by the store provider that should be used /// for them. @@ -138,10 +142,6 @@ class CoreWorkerObjectInterface { EnumUnorderedMap> &ids_per_provider, int64_t timeout_ms, int *num_objects, std::unordered_set *results); - /// Create a new store provider for the specified type on demand. - std::unique_ptr CreateStoreProvider( - StoreProviderType type) const; - /// Add a store provider for the specified type. void AddStoreProvider(StoreProviderType type); diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc deleted file mode 100644 index 7923afb29..000000000 --- a/src/ray/core_worker/task_interface.cc +++ /dev/null @@ -1,139 +0,0 @@ -#include "ray/core_worker/task_interface.h" -#include "ray/core_worker/context.h" -#include "ray/core_worker/task_interface.h" -#include "ray/core_worker/transport/direct_actor_transport.h" -#include "ray/core_worker/transport/raylet_transport.h" - -namespace ray { - -CoreWorkerTaskInterface::CoreWorkerTaskInterface( - WorkerContext &worker_context, std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service) - : worker_context_(worker_context) { - task_submitters_.emplace(TaskTransportType::RAYLET, - std::unique_ptr( - new CoreWorkerRayletTaskSubmitter(raylet_client))); - task_submitters_.emplace(TaskTransportType::DIRECT_ACTOR, - std::unique_ptr( - new CoreWorkerDirectActorTaskSubmitter( - io_service, object_interface.CreateStoreProvider( - StoreProviderType::MEMORY)))); -} - -void CoreWorkerTaskInterface::BuildCommonTaskSpec( - TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const int task_index, const TaskID &caller_id, const RayFunction &function, - const std::vector &args, uint64_t num_returns, - const std::unordered_map &required_resources, - const std::unordered_map &required_placement_resources, - TaskTransportType transport_type, std::vector *return_ids) { - // Build common task spec. - builder.SetCommonTaskSpec( - task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, - worker_context_.GetCurrentTaskID(), task_index, caller_id, num_returns, - required_resources, required_placement_resources); - // Set task arguments. - for (const auto &arg : args) { - if (arg.IsPassedByReference()) { - builder.AddByRefArg(arg.GetReference()); - } else { - builder.AddByValueArg(arg.GetValue()); - } - } - - // Compute return IDs. - (*return_ids).resize(num_returns); - for (size_t i = 0; i < num_returns; i++) { - (*return_ids)[i] = - ObjectID::ForTaskReturn(task_id, i + 1, - /*transport_type=*/static_cast(transport_type)); - } -} - -Status CoreWorkerTaskInterface::SubmitTask(const TaskID &caller_id, - const RayFunction &function, - const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids) { - TaskSpecBuilder builder; - const int next_task_index = worker_context_.GetNextTaskIndex(); - const auto task_id = - TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), - worker_context_.GetCurrentTaskID(), next_task_index); - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, - next_task_index, caller_id, function, args, - task_options.num_returns, task_options.resources, {}, - TaskTransportType::RAYLET, return_ids); - return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); -} - -Status CoreWorkerTaskInterface::CreateActor( - const TaskID &caller_id, const RayFunction &function, - const std::vector &args, const ActorCreationOptions &actor_creation_options, - std::unique_ptr *actor_handle) { - const int next_task_index = worker_context_.GetNextTaskIndex(); - const ActorID actor_id = - ActorID::Of(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), - next_task_index); - const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(actor_id); - const JobID job_id = worker_context_.GetCurrentJobID(); - std::vector return_ids; - TaskSpecBuilder builder; - BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, caller_id, - function, args, 1, actor_creation_options.resources, - actor_creation_options.placement_resources, - TaskTransportType::RAYLET, &return_ids); - builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, - actor_creation_options.dynamic_worker_options, - actor_creation_options.is_direct_call); - - *actor_handle = std::unique_ptr(new ActorHandle( - actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), - actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); - - return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); -} - -Status CoreWorkerTaskInterface::SubmitActorTask(const TaskID &caller_id, - ActorHandle &actor_handle, - const RayFunction &function, - const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids) { - // Add one for actor cursor object id for tasks. - const int num_returns = task_options.num_returns + 1; - - const bool is_direct_call = actor_handle.IsDirectCallActor(); - const TaskTransportType transport_type = - is_direct_call ? TaskTransportType::DIRECT_ACTOR : TaskTransportType::RAYLET; - - // Build common task spec. - TaskSpecBuilder builder; - const int next_task_index = worker_context_.GetNextTaskIndex(); - const TaskID actor_task_id = TaskID::ForActorTask( - worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), - next_task_index, actor_handle.GetActorID()); - BuildCommonTaskSpec(builder, actor_handle.CreationJobID(), actor_task_id, - next_task_index, caller_id, function, args, num_returns, - task_options.resources, {}, transport_type, return_ids); - - const ObjectID new_cursor = return_ids->back(); - actor_handle.SetActorTaskSpec(builder, transport_type, new_cursor); - - // Submit task. - auto status = task_submitters_[transport_type]->SubmitTask(builder.Build()); - // Remove cursor from return ids. - return_ids->pop_back(); - - return status; -} - -void CoreWorkerTaskInterface::HandleDirectActorUpdate( - const ActorID &actor_id, const gcs::ActorTableData &actor_data) { - auto &submitter = task_submitters_[TaskTransportType::DIRECT_ACTOR]; - auto &direct_actor_submitter = - reinterpret_cast &>(submitter); - direct_actor_submitter->HandleActorUpdate(actor_id, actor_data); -} - -} // namespace ray diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h deleted file mode 100644 index e9c08f9bc..000000000 --- a/src/ray/core_worker/task_interface.h +++ /dev/null @@ -1,152 +0,0 @@ -#ifndef RAY_CORE_WORKER_TASK_INTERFACE_H -#define RAY_CORE_WORKER_TASK_INTERFACE_H - -#include "ray/common/buffer.h" -#include "ray/common/grpc_util.h" -#include "ray/common/id.h" -#include "ray/common/status.h" -#include "ray/common/task/task.h" -#include "ray/common/task/task_spec.h" -#include "ray/common/task/task_util.h" -#include "ray/core_worker/actor_handle.h" -#include "ray/core_worker/common.h" -#include "ray/core_worker/context.h" -#include "ray/core_worker/object_interface.h" -#include "ray/core_worker/transport/transport.h" -#include "ray/gcs/redis_gcs_client.h" -#include "ray/protobuf/core_worker.pb.h" - -namespace ray { - -/// Options of a non-actor-creation task. -struct TaskOptions { - TaskOptions() {} - TaskOptions(int num_returns, std::unordered_map &resources) - : num_returns(num_returns), resources(resources) {} - - /// Number of returns of this task. - int num_returns = 1; - /// Resources required by this task. - std::unordered_map resources; -}; - -/// Options of an actor creation task. -struct ActorCreationOptions { - ActorCreationOptions() {} - ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, - const std::unordered_map &resources, - const std::unordered_map &placement_resources, - const std::vector &dynamic_worker_options) - : max_reconstructions(max_reconstructions), - is_direct_call(is_direct_call), - resources(resources), - placement_resources(placement_resources), - dynamic_worker_options(dynamic_worker_options) {} - - /// Maximum number of times that the actor should be reconstructed when it dies - /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. - const uint64_t max_reconstructions = 0; - /// Whether to use direct actor call. If this is set to true, callers will submit - /// tasks directly to the created actor without going through raylet. - const bool is_direct_call = false; - /// Resources required by the whole lifetime of this actor. - const std::unordered_map resources; - /// Resources required to place this actor. - const std::unordered_map placement_resources; - /// The dynamic options used in the worker command when starting a worker process for - /// an actor creation task. - const std::vector dynamic_worker_options; -}; - -/// The interface that contains all `CoreWorker` methods that are related to task -/// submission. -class CoreWorkerTaskInterface { - public: - CoreWorkerTaskInterface(WorkerContext &worker_context, - std::unique_ptr &raylet_client, - CoreWorkerObjectInterface &object_interface, - boost::asio::io_service &io_service); - - /// Submit a normal task. - /// - /// \param[in] caller_id ID of the task submitter. - /// \param[in] function The remote function to execute. - /// \param[in] args Arguments of this task. - /// \param[in] task_options Options for this task. - /// \param[out] return_ids Ids of the return objects. - /// \return Status. - Status SubmitTask(const TaskID &caller_id, const RayFunction &function, - const std::vector &args, const TaskOptions &task_options, - std::vector *return_ids); - - /// Create an actor. - /// - /// \param[in] caller_id ID of the task submitter. - /// \param[in] function The remote function that generates the actor object. - /// \param[in] args Arguments of this task. - /// \param[in] actor_creation_options Options for this actor creation task. - /// \param[out] actor_handle Handle to the actor. - /// \return Status. - Status CreateActor(const TaskID &caller_id, const RayFunction &function, - const std::vector &args, - const ActorCreationOptions &actor_creation_options, - std::unique_ptr *actor_handle); - - /// Submit an actor task. - /// - /// \param[in] caller_id ID of the task submitter. - /// \param[in] actor_handle Handle to the actor. - /// \param[in] function The remote function to execute. - /// \param[in] args Arguments of this task. - /// \param[in] task_options Options for this task. - /// \param[out] return_ids Ids of the return objects. - /// \return Status. - Status SubmitActorTask(const TaskID &caller_id, ActorHandle &actor_handle, - const RayFunction &function, const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids); - - /// Handle an update about an actor. - /// - /// \param[in] actor_id The ID of the actor whose status has changed. - /// \param[in] actor_data The actor's new status information. - void HandleDirectActorUpdate(const ActorID &actor_id, - const gcs::ActorTableData &actor_data); - - private: - /// Build common attributes of the task spec, and compute return ids. - /// - /// \param[in] builder Builder to build a `TaskSpec`. - /// \param[in] job_id The ID of the job submitting the task. - /// \param[in] task_id The ID of this task. - /// \param[in] task_index The task index used to build this task. - /// \param[in] function The remote function to execute. - /// \param[in] args Arguments of this task. - /// \param[in] num_returns Number of returns. - /// \param[in] required_resources Resources required by this task. - /// \param[in] required_placement_resources Resources required by placing this task on a - /// node. - /// \param[in] transport_type The transport used for this task. - /// \param[out] return_ids Return IDs. - /// \return Void. - void BuildCommonTaskSpec( - TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const int task_index, const TaskID &caller_id, const RayFunction &function, - const std::vector &args, uint64_t num_returns, - const std::unordered_map &required_resources, - const std::unordered_map &required_placement_resources, - TaskTransportType transport_type, std::vector *return_ids); - - /// Reference to the parent CoreWorker's context. - WorkerContext &worker_context_; - - /// All the task submitters supported. - EnumUnorderedMap> - task_submitters_; - - friend class CoreWorkerTest; -}; - -} // namespace ray - -#endif // RAY_CORE_WORKER_TASK_INTERFACE_H diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index b41cfa055..ee6cc0e07 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -48,9 +48,9 @@ std::shared_ptr GenerateRandomBuffer() { return std::make_shared(arg1.data(), arg1.size(), true); } -ActorHandle &CreateActorHelper(CoreWorker &worker, - std::unordered_map &resources, - bool is_direct_call, uint64_t max_reconstructions) { +ActorID CreateActorHelper(CoreWorker &worker, + std::unordered_map &resources, + bool is_direct_call, uint64_t max_reconstructions) { std::unique_ptr actor_handle; uint8_t array[] = {1, 2, 3}; @@ -64,11 +64,9 @@ ActorHandle &CreateActorHelper(CoreWorker &worker, max_reconstructions, is_direct_call, resources, resources, {}}; // Create an actor. - RAY_CHECK_OK(worker.Tasks().CreateActor(worker.GetCallerId(), func, args, actor_options, - &actor_handle)); - ActorID actor_id = actor_handle->GetActorID(); - RAY_CHECK(worker.AddActorHandle(std::move(actor_handle))); - return worker.GetActorHandle(actor_id); + ActorID actor_id; + RAY_CHECK_OK(worker.CreateActor(func, args, actor_options, &actor_id)); + return actor_id; } class CoreWorkerTest : public ::testing::Test { @@ -205,14 +203,7 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker, const ActorID &actor_id, bool wait_alive, int timeout_ms) { auto condition_func = [&worker, actor_id, wait_alive]() -> bool { - auto &task_submitters = worker.Tasks().task_submitters_; - RAY_CHECK(task_submitters.count(TaskTransportType::DIRECT_ACTOR) > 0); - auto submitter = - worker.Tasks().task_submitters_[TaskTransportType::DIRECT_ACTOR].get(); - auto direct_actor_submitter = - dynamic_cast(submitter); - RAY_CHECK(direct_actor_submitter != nullptr); - bool actor_alive = direct_actor_submitter->IsActorAlive(actor_id); + bool actor_alive = worker.direct_actor_submitter_->IsActorAlive(actor_id); return wait_alive ? actor_alive : !actor_alive; }; @@ -243,8 +234,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res TaskOptions options; std::vector return_ids; - RAY_CHECK_OK(driver.Tasks().SubmitTask(driver.GetCallerId(), func, args, options, - &return_ids)); + RAY_CHECK_OK(driver.SubmitTask(func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); @@ -268,7 +258,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", nullptr); - auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000); // Test submitting some tasks with by-value args for that actor. { @@ -288,8 +278,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, - func, args, options, &return_ids)); + RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); ASSERT_TRUE(return_ids[0].IsReturnObject()); ASSERT_EQ( @@ -329,14 +318,14 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func, - args, options, &return_ids); + auto status = driver.SubmitActorTask(actor_id, func, args, options, &return_ids); if (is_direct_call) { // For direct actor call, submitting a task with by-reference arguments // would fail. ASSERT_TRUE(!status.ok()); return; } + ASSERT_TRUE(status.ok()); ASSERT_EQ(return_ids.size(), 1); @@ -359,11 +348,10 @@ void CoreWorkerTest::TestActorReconstruction( nullptr); // creating actor. - auto &actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000); // Wait for actor alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, - 30 * 1000 /* 30s */)); + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been created"; // Test submitting some tasks with by-value args for that actor. @@ -377,10 +365,10 @@ void CoreWorkerTest::TestActorReconstruction( ASSERT_EQ(system("pkill mock_worker"), 0); // Wait for actor restruction event, and then for alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), false, - 30 * 1000 /* 30s */)); - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, - 30 * 1000 /* 30s */)); + ASSERT_TRUE( + WaitForDirectCallActorState(driver, actor_id, false, 30 * 1000 /* 30s */)); + ASSERT_TRUE( + WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been reconstructed"; } @@ -397,9 +385,7 @@ void CoreWorkerTest::TestActorReconstruction( std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, - func, args, options, &return_ids); - RAY_CHECK_OK(status); + RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); // Verify if it's expected data. std::vector> results; @@ -418,7 +404,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r nullptr); // creating actor. - auto &actor_handle = + auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 0 /* not reconstructable */); // Test submitting some tasks with by-value args for that actor. @@ -444,11 +430,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - auto status = driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, - func, args, options, &return_ids); - if (i < task_index_to_kill_worker) { - RAY_CHECK_OK(status); - } + RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); all_results.emplace_back(std::make_pair(return_ids[0], buffer1)); @@ -704,12 +686,11 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { std::vector object_ids; // Create an actor. std::unordered_map resources; - auto &actor_handle = CreateActorHelper(driver, resources, - /*is_direct_call=*/true, - /*max_reconstructions=*/0); + auto actor_id = CreateActorHelper(driver, resources, + /*is_direct_call=*/true, + /*max_reconstructions=*/0); // wait for actor creation finish. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle.GetActorID(), true, - 30 * 1000 /* 30s */)); + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */)); // Test submitting some tasks with by-value args for that actor. int64_t start_ms = current_time_ms(); const int num_tasks = 100000; @@ -726,8 +707,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { std::vector return_ids; RayFunction func(ray::Language::PYTHON, {}); - RAY_CHECK_OK(driver.Tasks().SubmitActorTask(driver.GetCallerId(), actor_handle, func, - args, options, &return_ids)); + RAY_CHECK_OK(driver.SubmitActorTask(actor_id, func, args, options, &return_ids)); ASSERT_EQ(return_ids.size(), 1); object_ids.emplace_back(return_ids[0]); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index d630711da..5e6e992c0 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -50,7 +50,6 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask( // specified time. pending_requests_[actor_id].emplace_back(std::move(request)); RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; - return Status::OK(); } else if (iter->second.state_ == ActorTableData::ALIVE) { // Actor is alive, submit the request. if (rpc_clients_.count(actor_id) == 0) { @@ -62,14 +61,15 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask( // Submit request. auto &client = rpc_clients_[actor_id]; PushTask(*client, std::move(request), actor_id, task_id, num_returns); - return Status::OK(); } else { // Actor is dead, treat the task as failure. RAY_CHECK(iter->second.state_ == ActorTableData::DEAD); TreatTaskAsFailed(task_id, num_returns, rpc::ErrorType::ACTOR_DIED); - // Return OK here so that we can get the error from store with get operation. - return Status::OK(); } + + // If the task submission subsequently fails, then the client will receive + // the error in a callback. + return Status::OK(); } void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( @@ -103,15 +103,14 @@ void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate( waiting_reply_tasks_.erase(actor_id); } - // If this actor is permanently dead and there are pending requests, treat - // the pending tasks as failed. - if (actor_data.state() == ActorTableData::DEAD && - pending_requests_.count(actor_id) > 0) { - for (const auto &request : pending_requests_[actor_id]) { + // If there are pending requests, treat the pending tasks as failed. + auto pending_it = pending_requests_.find(actor_id); + if (pending_it != pending_requests_.end()) { + for (const auto &request : pending_it->second) { TreatTaskAsFailed(TaskID::FromBinary(request->task_spec().task_id()), request->task_spec().num_returns(), rpc::ErrorType::ACTOR_DIED); } - pending_requests_.erase(actor_id); + pending_requests_.erase(pending_it); } } } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 489608cf3..40c219eda 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -32,7 +32,8 @@ struct ActorStateData { std::pair location_; }; -class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { +// This class is thread-safe. +class CoreWorkerDirectActorTaskSubmitter { public: CoreWorkerDirectActorTaskSubmitter( boost::asio::io_service &io_service, @@ -41,8 +42,8 @@ class CoreWorkerDirectActorTaskSubmitter : public CoreWorkerTaskSubmitter { /// Submit a task to an actor for execution. /// /// \param[in] task The task spec to submit. - /// \return Status. - Status SubmitTask(const TaskSpecification &task_spec) override; + /// \return Status::Invalid if the task is not yet supported. + Status SubmitTask(const TaskSpecification &task_spec); /// Handle an update about an actor. /// diff --git a/src/ray/core_worker/transport/raylet_transport.cc b/src/ray/core_worker/transport/raylet_transport.cc index f0f7a01cf..e17f3acca 100644 --- a/src/ray/core_worker/transport/raylet_transport.cc +++ b/src/ray/core_worker/transport/raylet_transport.cc @@ -4,15 +4,6 @@ namespace ray { -CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter( - std::unique_ptr &raylet_client) - : raylet_client_(raylet_client) {} - -Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpecification &task) { - RAY_CHECK(raylet_client_ != nullptr); - return raylet_client_->SubmitTask(task); -} - CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver( WorkerContext &worker_context, std::unique_ptr &raylet_client, CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, diff --git a/src/ray/core_worker/transport/raylet_transport.h b/src/ray/core_worker/transport/raylet_transport.h index 39a529cde..76e3dc8cf 100644 --- a/src/ray/core_worker/transport/raylet_transport.h +++ b/src/ray/core_worker/transport/raylet_transport.h @@ -10,25 +10,6 @@ namespace ray { -/// In raylet task submitter and receiver, a task is submitted to raylet, and possibly -/// gets forwarded to another raylet on which node the task should be executed, and -/// then a worker on that node gets this task and starts executing it. - -class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter { - public: - CoreWorkerRayletTaskSubmitter(std::unique_ptr &raylet_client); - - /// Submit a task for execution to raylet. - /// - /// \param[in] task The task spec to submit. - /// \return Status. - virtual Status SubmitTask(const TaskSpecification &task_spec) override; - - private: - /// Raylet client. - std::unique_ptr &raylet_client_; -}; - class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver, public rpc::WorkerTaskHandler { public: diff --git a/src/ray/core_worker/transport/transport.h b/src/ray/core_worker/transport/transport.h index 34353841f..51a8d9549 100644 --- a/src/ray/core_worker/transport/transport.h +++ b/src/ray/core_worker/transport/transport.h @@ -12,26 +12,6 @@ namespace ray { -/// Interfaces for task submitter and receiver. They are separate classes but should be -/// used in pairs - one type of task submitter should be used together with task -/// with the same type, so these classes are put together in this same file. -/// -/// Task submitter/receiver should inherit from these classes and provide implementions -/// for the methods. The actual task submitter/receiver can submit/get tasks via raylet, -/// or directly to/from another worker. - -/// This class is responsible to submit tasks. -class CoreWorkerTaskSubmitter { - public: - /// Submit a task for execution. - /// - /// \param[in] task The task spec to submit. - /// \return Status. - virtual Status SubmitTask(const TaskSpecification &task_spec) = 0; - - virtual ~CoreWorkerTaskSubmitter() {} -}; - /// This class receives tasks for execution. class CoreWorkerTaskReceiver { public: diff --git a/src/ray/rpc/worker/direct_actor_client.h b/src/ray/rpc/worker/direct_actor_client.h index ae685ccad..05eb49264 100644 --- a/src/ray/rpc/worker/direct_actor_client.h +++ b/src/ray/rpc/worker/direct_actor_client.h @@ -7,6 +7,7 @@ #include #include +#include "absl/base/thread_annotations.h" #include "ray/common/status.h" #include "ray/rpc/client_call.h" @@ -49,7 +50,7 @@ class DirectActorClient : public std::enable_shared_from_this return ray::Status::OK(); } - /// Send as many pending tasks as possible. This method is NOT thread-safe. + /// Send as many pending tasks as possible. This method is thread-safe. /// /// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being /// sent at once. This prevents the server scheduling queue from being overwhelmed. @@ -116,13 +117,13 @@ class DirectActorClient : public std::enable_shared_from_this /// Queue of requests to send. std::deque, ClientCallback>> - send_queue_; + send_queue_ GUARDED_BY(mutex_); /// The number of bytes currently in flight. - int64_t rpc_bytes_in_flight_ = 0; + int64_t rpc_bytes_in_flight_ GUARDED_BY(mutex_) = 0; /// The max sequence number we have processed responses for. - int64_t max_finished_seq_no_ = -1; + int64_t max_finished_seq_no_ GUARDED_BY(mutex_) = -1; }; } // namespace rpc