From 8f19f1eafbbaa46c5260da8b63093fcac5f41ae0 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 7 Jul 2020 11:11:41 -0700 Subject: [PATCH] [Core] Actor handle refactoring (#8895) * Marking needed changes. * Resolve basic dependencies. * In progress. * linting. * In progress 2. * Linting. * Refactor done. Cleanup needed. * Linting. * Recover kill actor in core worker because it is used inside raylet * Cleanup. * Use unique pointer instead. Unit tests are broken now. * Fix the upstream change. * Addressed code review 1. * Lint. * Addressed code review 2. * Fix weird github history. * Lint. * Linting using clang 7.0. * Use a better check message. * Revert cpp stuff. * Fix weird linting errors. * Manuall fix all lint issues. * Update a newline. * Refactor some interface. * Addressed all code review. * Addressed code review --- BUILD.bazel | 10 + python/ray/_raylet.pxd | 22 +- python/ray/_raylet.pyx | 25 +- python/ray/includes/libcoreworker.pxd | 6 +- python/ray/util/named_actors.py | 4 +- src/ray/core_worker/actor_manager.cc | 171 +++++++++- src/ray/core_worker/actor_manager.h | 141 ++++++++- src/ray/core_worker/actor_reporter.cc | 37 +++ src/ray/core_worker/actor_reporter.h | 44 +++ src/ray/core_worker/core_worker.cc | 190 +++-------- src/ray/core_worker/core_worker.h | 51 +-- .../io_ray_runtime_actor_NativeActorHandle.cc | 19 +- src/ray/core_worker/reference_count.h | 27 +- src/ray/core_worker/task_manager.cc | 2 +- src/ray/core_worker/task_manager.h | 8 +- .../core_worker/test/actor_manager_test.cc | 299 ++++++++++++++++++ src/ray/core_worker/test/task_manager_test.cc | 10 +- .../transport/direct_actor_transport.h | 14 +- 18 files changed, 822 insertions(+), 258 deletions(-) create mode 100644 src/ray/core_worker/actor_reporter.cc create mode 100644 src/ray/core_worker/actor_reporter.h create mode 100644 src/ray/core_worker/test/actor_manager_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index 11a413ae8..152039c93 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -800,6 +800,16 @@ cc_test( ], ) +cc_test( + name = "actor_manager_test", + srcs = ["src/ray/core_worker/test/actor_manager_test.cc"], + copts = COPTS, + deps = [ + ":core_worker_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "scheduling_test", srcs = ["src/ray/common/scheduling/scheduling_test.cc"], diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 969801655..9d7883fec 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -27,16 +27,16 @@ from ray.includes.function_descriptor cimport ( ) cdef extern from *: - """ - #if __OPTIMIZE__ && __OPTIMIZE__ == 1 - #undef __OPTIMIZE__ - int __OPTIMIZE__ = 1; - #define __OPTIMIZE__ 1 - #else - int __OPTIMIZE__ = 0; - #endif - """ - int __OPTIMIZE__ + """ + #if __OPTIMIZE__ && __OPTIMIZE__ == 1 + #undef __OPTIMIZE__ + int __OPTIMIZE__ = 1; + #define __OPTIMIZE__ 1 + #else + int __OPTIMIZE__ = 0; + #endif + """ + int __OPTIMIZE__ cdef extern from "Python.h": # Note(simon): This is used to configure asyncio actor stack size. @@ -98,7 +98,7 @@ cdef class CoreWorker: self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) cdef yield_current_fiber(self, CFiberEvent &fiber_event) - cdef make_actor_handle(self, CActorHandle *c_actor_handle) + cdef make_actor_handle(self, const CActorHandle *c_actor_handle) cdef class FunctionDescriptor: cdef: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c6db879e4..fc4f23a6f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -112,8 +112,8 @@ include "includes/serialization.pxi" include "includes/libcoreworker.pxi" include "includes/global_state_accessor.pxi" -# Expose GCC & Clang macro to report whether C++ optimizations were enabled -# during compilation. +# Expose GCC & Clang macro to report +# whether C++ optimizations were enabled during compilation. OPTIMIZED = __OPTIMIZE__ logger = logging.getLogger(__name__) @@ -1014,7 +1014,7 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference( c_actor_id) - cdef make_actor_handle(self, CActorHandle *c_actor_handle): + cdef make_actor_handle(self, const CActorHandle *c_actor_handle): worker = ray.worker.global_worker worker.check_connected() manager = worker.function_actor_manager @@ -1058,24 +1058,27 @@ cdef class CoreWorker: ObjectID outer_object_id): cdef: - CActorHandle* c_actor_handle CObjectID c_outer_object_id = (outer_object_id.native() if outer_object_id else CObjectID.Nil()) - c_actor_id = (CCoreWorkerProcess.GetCoreWorker() + c_actor_id = (CCoreWorkerProcess + .GetCoreWorker() .DeserializeAndRegisterActorHandle( bytes, c_outer_object_id)) - check_status(CCoreWorkerProcess.GetCoreWorker().GetActorHandle( - c_actor_id, &c_actor_handle)) + cdef: + # NOTE: This handle should not be stored anywhere. + const CActorHandle* c_actor_handle = ( + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) return self.make_actor_handle(c_actor_handle) def get_named_actor_handle(self, const c_string &name): cdef: - CActorHandle* c_actor_handle + # NOTE: This handle should not be stored anywhere. + const CActorHandle* c_actor_handle = ( + CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name)) - with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker() - .GetNamedActorHandle(name, &c_actor_handle)) + if c_actor_handle == NULL: + raise ValueError("Named Actor Handle Not Found") return self.make_actor_handle(c_actor_handle) def serialize_actor_handle(self, ActorID actor_id): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 2e612df54..d34fa37cd 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -123,10 +123,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes, CObjectID *c_actor_handle_id) - CRayStatus GetActorHandle(const CActorID &actor_id, - CActorHandle **actor_handle) const - CRayStatus GetNamedActorHandle(const c_string &name, - CActorHandle **actor_handle) + const CActorHandle* GetActorHandle(const CActorID &actor_id) const + const CActorHandle* GetNamedActorHandle(const c_string &name) void AddLocalReference(const CObjectID &object_id) void RemoveLocalReference(const CObjectID &object_id) const CAddress &GetRpcAddress() const diff --git a/python/ray/util/named_actors.py b/python/ray/util/named_actors.py index d929815a1..ff84fd9f5 100644 --- a/python/ray/util/named_actors.py +++ b/python/ray/util/named_actors.py @@ -74,7 +74,9 @@ def _register_actor(name, actor_handle): exists = False if exists: - raise ValueError("An actor with name={} already exists".format(name)) + raise ValueError("An actor with name={} already exists or there " + "was timeout in getting this actor handle." + .format(name)) # Add the actor to Redis if it does not already exist. _internal_kv_put(actor_name, pickle.dumps(actor_handle), overwrite=True) diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 633875be6..89ce290ba 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -13,24 +13,171 @@ // limitations under the License. #include "ray/core_worker/actor_manager.h" + #include "ray/gcs/pb_util.h" #include "ray/gcs/redis_accessor.h" namespace ray { -void ActorManager::PublishTerminatedActor(const TaskSpecification &actor_creation_task) { - auto actor_id = actor_creation_task.ActorCreationId(); - auto data = gcs::CreateActorTableData(actor_creation_task, rpc::Address(), - rpc::ActorTableData::DEAD, 0); +ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_handle, + const ObjectID &outer_object_id, + const TaskID &caller_id, + const std::string &call_site, + const rpc::Address &caller_address) { + const ActorID actor_id = actor_handle->GetActorID(); + const rpc::Address owner_address = actor_handle->GetOwnerAddress(); + const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); - auto update_callback = [actor_id](Status status) { - if (!status.ok()) { - // Only one node at a time should succeed at creating or updating the actor. - RAY_LOG(ERROR) << "Failed to update state to DEAD for actor " << actor_id - << ", error: " << status.ToString(); - } - }; - RAY_CHECK_OK(actor_accessor_.AsyncRegister(data, update_callback)); + RAY_UNUSED(AddActorHandle(std::move(actor_handle), + /*is_owner_handle=*/false, caller_id, call_site, + caller_address, actor_id, actor_creation_return_id)); + ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); + reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address); + return actor_id; +} + +const std::unique_ptr &ActorManager::GetActorHandle( + const ActorID &actor_id) { + absl::MutexLock lock(&mutex_); + auto it = actor_handles_.find(actor_id); + RAY_CHECK(it != actor_handles_.end()) + << "Cannot find an actor handle of id, " << actor_id + << ". This method should be called only when you ensure actor handles exists."; + return it->second; +} + +bool ActorManager::CheckActorHandleExists(const ActorID &actor_id) { + absl::MutexLock lock(&mutex_); + return actor_handles_.find(actor_id) != actor_handles_.end(); +} + +bool ActorManager::AddNewActorHandle(std::unique_ptr actor_handle, + const TaskID &caller_id, + const std::string &call_site, + const rpc::Address &caller_address, + bool is_detached) { + const auto &actor_id = actor_handle->GetActorID(); + const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); + // Detached actor doesn't need ref counting. + if (!is_detached) { + reference_counter_->AddOwnedObject(actor_creation_return_id, + /*inner_ids=*/{}, caller_address, call_site, + /*object_size*/ -1, + /*is_reconstructable=*/true); + } + + return AddActorHandle(std::move(actor_handle), + /*is_owner_handle=*/!is_detached, caller_id, call_site, + caller_address, actor_id, actor_creation_return_id); +} + +bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, + bool is_owner_handle, const TaskID &caller_id, + const std::string &call_site, + const rpc::Address &caller_address, + const ActorID &actor_id, + const ObjectID &actor_creation_return_id) { + reference_counter_->AddLocalReference(actor_creation_return_id, call_site); + direct_actor_submitter_->AddActorQueueIfNotExists(actor_id); + bool inserted; + { + absl::MutexLock lock(&mutex_); + inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; + } + if (inserted) { + // Register a callback to handle actor notifications. + auto actor_notification_callback = + std::bind(&ActorManager::HandleActorStateNotification, this, + std::placeholders::_1, std::placeholders::_2); + + RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( + actor_id, actor_notification_callback, nullptr)); + + RAY_CHECK(reference_counter_->SetDeleteCallback( + actor_creation_return_id, + [this, actor_id, is_owner_handle](const ObjectID &object_id) { + if (is_owner_handle) { + // If we own the actor and the actor handle is no longer in scope, + // terminate the actor. We do not do this if the GCS service is + // enabled since then the GCS will terminate the actor for us. + // TODO(sang): Remove this block once gcs_actor_service is enabled by default. + if (!RayConfig::instance().gcs_actor_service_enabled()) { + RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id + << " has gone out of scope, sending message to actor " + << actor_id << " to do a clean exit."; + RAY_CHECK(CheckActorHandleExists(actor_id)); + direct_actor_submitter_->KillActor(actor_id, + /*force_kill=*/false, + /*no_restart=*/false); + } + } + + absl::MutexLock lock(&mutex_); + // TODO(swang): Erase the actor handle once all refs to the actor + // have gone out of scope. We cannot erase it here in case the + // language frontend receives another ref to the same actor. In this + // case, we must remember the last task counter that we sent to the + // actor. + // TODO(ekl) we can't unsubscribe to actor notifications here due to + // https://github.com/ray-project/ray/pull/6885 + auto callback = actor_out_of_scope_callbacks_.extract(actor_id); + if (callback) { + callback.mapped()(actor_id); + } + })); + } + + return inserted; +} + +void ActorManager::AddActorOutOfScopeCallback( + const ActorID &actor_id, + std::function actor_out_of_scope_callbacks) { + absl::MutexLock lock(&mutex_); + auto it = actor_handles_.find(actor_id); + if (it == actor_handles_.end()) { + actor_out_of_scope_callbacks(actor_id); + } else { + RAY_CHECK(actor_out_of_scope_callbacks_ + .emplace(actor_id, std::move(actor_out_of_scope_callbacks)) + .second); + } +} + +void ActorManager::HandleActorStateNotification(const ActorID &actor_id, + const gcs::ActorTableData &actor_data) { + if (actor_data.state() == gcs::ActorTableData::PENDING) { + // The actor is being created and not yet ready, just ignore! + } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { + direct_actor_submitter_->DisconnectActor(actor_id, false); + } else if (actor_data.state() == gcs::ActorTableData::DEAD) { + direct_actor_submitter_->DisconnectActor(actor_id, true); + // We cannot erase the actor handle here because clients can still + // submit tasks to dead actors. This also means we defer unsubscription, + // otherwise we crash when bulk unsubscribing all actor handles. + } else { + direct_actor_submitter_->ConnectActor(actor_id, actor_data.address()); + } + + const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state()); + RAY_LOG(INFO) << "received notification on actor, state: " << actor_state + << ", actor_id: " << actor_id + << ", ip address: " << actor_data.address().ip_address() + << ", port: " << actor_data.address().port() << ", worker_id: " + << WorkerID::FromBinary(actor_data.address().worker_id()) + << ", raylet_id: " + << ClientID::FromBinary(actor_data.address().raylet_id()); +} + +std::vector ActorManager::GetActorHandleIDsFromHandles() { + absl::MutexLock lock(&mutex_); + std::vector actor_handle_ids; + for (const auto &handle : actor_handles_) { + auto actor_id = handle.first; + auto actor_handle_id = ObjectID::ForActorHandle(actor_id); + actor_handle_ids.push_back(actor_handle_id); + } + return actor_handle_ids; } } // namespace ray diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index a297c6b1e..0a6426d33 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -14,34 +14,145 @@ #pragma once +#include "absl/container/flat_hash_map.h" #include "ray/core_worker/actor_handle.h" +#include "ray/core_worker/reference_count.h" +#include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/gcs/redis_gcs_client.h" namespace ray { -// Interface for testing. -class ActorManagerInterface { - public: - virtual void PublishTerminatedActor(const TaskSpecification &actor_creation_task) = 0; - - virtual ~ActorManagerInterface() {} -}; - /// Class to manage lifetimes of actors that we create (actor children). /// Currently this class is only used to publish actor DEAD event /// for actor creation task failures. All other cases are managed /// by raylet. -class ActorManager : public ActorManagerInterface { +class ActorManager { public: - ActorManager(gcs::ActorInfoAccessor &actor_accessor) - : actor_accessor_(actor_accessor) {} + explicit ActorManager( + std::shared_ptr gcs_client, + std::shared_ptr direct_actor_submitter, + std::shared_ptr reference_counter) + : gcs_client_(gcs_client), + direct_actor_submitter_(direct_actor_submitter), + reference_counter_(reference_counter) {} - /// Called when an actor that we own can no longer be restarted. - void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override; + ~ActorManager() = default; + + friend class ActorManagerTest; + + /// Register an actor handle. + /// + /// This should be called when an actor handle is given to us by another task + /// or actor. This may be called even if we already have a handle to the same + /// actor. + /// + /// \param[in] actor_handle The actor handle. + /// \param[in] outer_object_id The object ID that contained the serialized + /// actor handle, if any. + /// \param[in] caller_id The caller's task ID + /// \param[in] call_site The caller's site. + /// \return The ActorID of the deserialized handle. + ActorID RegisterActorHandle(std::unique_ptr actor_handle, + const ObjectID &outer_object_id, const TaskID &caller_id, + const std::string &call_site, + const rpc::Address &caller_address); + + /// Get a handle to an actor. + /// + /// \param[in] actor_id The actor handle to get. + /// \return reference to the actor_handle's pointer. + /// NOTE: Returned actorHandle should not be stored anywhere. + const std::unique_ptr &GetActorHandle(const ActorID &actor_id); + + /// Check if an actor handle that corresponds to an actor_id exists. + /// \param[in] actor_id The actor id of a handle. + /// \return True if the actor_handle for an actor_id exists. False otherwise. + bool CheckActorHandleExists(const ActorID &actor_id); + + /// Give this worker a new handle to an actor. + /// + /// This handle will remain as long as the current actor or task is + /// executing, even if the Python handle goes out of scope. Tasks submitted + /// through this handle are guaranteed to execute in the same order in which + /// they are submitted. + /// + /// NOTE: Getting an actor handle from GCS (named actor) is considered as adding a new + /// actor handle. + /// + /// \param actor_handle The handle to the actor. + /// \param[in] caller_id The caller's task ID + /// \param[in] call_site The caller's site. + /// \param[in] is_detached Whether or not the actor of a handle is detached (named) + /// actor. \return True if the handle was added and False if we already had a handle to + /// the same actor. + bool AddNewActorHandle(std::unique_ptr actor_handle, + const TaskID &caller_id, const std::string &call_site, + const rpc::Address &caller_address, bool is_detached); + + /// Add a callback that is called when an actor goes out of scope. + /// + /// \param actor_id The actor id that owns the callback. + /// \param actor_out_of_scope_callbacks The callback function that will be called when + /// an actor_id goes out of scope. + void AddActorOutOfScopeCallback( + const ActorID &actor_id, + std::function actor_out_of_scope_callbacks); + + /// Get a list of actor_ids from existing actor handles. + /// This is used for debugging purpose. + std::vector GetActorHandleIDsFromHandles(); private: - /// Global database of actors. - gcs::ActorInfoAccessor &actor_accessor_; + /// Give this worker a handle to an actor. + /// + /// This handle will remain as long as the current actor or task is + /// executing, even if the Python handle goes out of scope. Tasks submitted + /// through this handle are guaranteed to execute in the same order in which + /// they are submitted. + /// + /// \param actor_handle The handle to the actor. + /// \param is_owner_handle Whether this is the owner's handle to the actor. + /// The owner is the creator of the actor and is responsible for telling the + /// actor to disconnect once all handles are out of scope. + /// \param[in] caller_id The caller's task ID + /// \param[in] call_site The caller's site. + /// \param[in] actor_id The id of an actor + /// \param[in] actor_creation_return_id object id of this actor creation + /// \return True if the handle was added and False if we already had a handle + /// to the same actor. + bool AddActorHandle(std::unique_ptr actor_handle, bool is_owner_handle, + const TaskID &caller_id, const std::string &call_site, + const rpc::Address &caller_address, const ActorID &actor_id, + const ObjectID &actor_creation_return_id); + + /// Handle actor state notification published from GCS. + /// + /// \param[in] actor_id The actor id of this notification. + /// \param[in] actor_data The GCS actor data. + void HandleActorStateNotification(const ActorID &actor_id, + const gcs::ActorTableData &actor_data); + + /// GCS client + std::shared_ptr gcs_client_; + + /// Interface to submit tasks directly to other actors. + std::shared_ptr direct_actor_submitter_; + + /// Used to keep track of actor handle reference counts. + /// All actor handle related ref counting logic should be included here. + std::shared_ptr reference_counter_; + + mutable absl::Mutex mutex_; + + /// Map from actor ID to a handle to that actor. + /// Actor handle is a logical abstraction that holds actor handle's states. + absl::flat_hash_map> actor_handles_ + GUARDED_BY(mutex_); + + /// Map from actor ID to a callback. Callback is called when + /// the corresponding handles are gone out of scope. + absl::flat_hash_map> + actor_out_of_scope_callbacks_ GUARDED_BY(mutex_); }; } // namespace ray diff --git a/src/ray/core_worker/actor_reporter.cc b/src/ray/core_worker/actor_reporter.cc new file mode 100644 index 000000000..049256a11 --- /dev/null +++ b/src/ray/core_worker/actor_reporter.cc @@ -0,0 +1,37 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/actor_reporter.h" + +#include "ray/gcs/pb_util.h" +#include "ray/gcs/redis_accessor.h" + +namespace ray { + +void ActorReporter::PublishTerminatedActor(const TaskSpecification &actor_creation_task) { + auto actor_id = actor_creation_task.ActorCreationId(); + auto data = gcs::CreateActorTableData(actor_creation_task, rpc::Address(), + rpc::ActorTableData::DEAD, 0); + + auto update_callback = [actor_id](Status status) { + if (!status.ok()) { + // Only one node at a time should succeed at creating or updating the actor. + RAY_LOG(ERROR) << "Failed to update state to DEAD for actor " << actor_id + << ", error: " << status.ToString(); + } + }; + RAY_CHECK_OK(gcs_client_->Actors().AsyncRegister(data, update_callback)); +} + +} // namespace ray diff --git a/src/ray/core_worker/actor_reporter.h b/src/ray/core_worker/actor_reporter.h new file mode 100644 index 000000000..1a1323de9 --- /dev/null +++ b/src/ray/core_worker/actor_reporter.h @@ -0,0 +1,44 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { + +// TODO(SANG): This class won't be needed once GCS actor mangement becomes the default. +// Interface for testing. +class ActorReporterInterface { + public: + virtual void PublishTerminatedActor(const TaskSpecification &actor_creation_task) = 0; + + virtual ~ActorReporterInterface() {} +}; + +class ActorReporter : public ActorReporterInterface { + public: + ActorReporter(std::shared_ptr gcs_client) : gcs_client_(gcs_client) {} + + ~ActorReporter() {} + + /// Called when an actor that we own can no longer be restarted. + void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override; + + private: + /// GCS client + std::shared_ptr gcs_client_; +}; + +} // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a2553ffb0..c761309f1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -329,8 +329,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); - actor_manager_ = std::unique_ptr(new ActorManager(gcs_client_->Actors())); - // Initialize profiler. profiler_ = std::make_shared( worker_context_, options_.node_ip_address, io_service_, gcs_client_); @@ -353,6 +351,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ boost::asio::chrono::milliseconds(kInternalHeartbeatMillis)); internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1)); + actor_reporter_ = std::unique_ptr(new ActorReporter(gcs_client_)); + plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( options_.store_socket, local_raylet_client_, options_.check_signals, /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), @@ -377,7 +377,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }); }; task_manager_.reset(new TaskManager( - memory_store_, reference_counter_, actor_manager_, + memory_store_, reference_counter_, actor_reporter_, [this](const TaskSpecification &spec, bool delay) { if (delay) { // Retry after a delay to emulate the existing Raylet reconstruction @@ -434,7 +434,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }; } - direct_actor_submitter_ = std::unique_ptr( + direct_actor_submitter_ = std::shared_ptr( new CoreWorkerDirectActorTaskSubmitter(client_factory, memory_store_, task_manager_)); @@ -450,6 +450,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ direct_task_receiver_->Init(client_factory, rpc_address_, local_raylet_client_); } + actor_manager_ = std::unique_ptr( + new ActorManager(gcs_client_, direct_actor_submitter_, reference_counter_)); + auto object_lookup_fn = [this](const ObjectID &object_id, const ObjectLookupCallback &callback) { return gcs_client_->Objects().AsyncGetLocations( @@ -692,12 +695,10 @@ void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) { std::unordered_map> CoreWorker::GetAllReferenceCounts() const { auto counts = reference_counter_->GetAllReferenceCounts(); - absl::MutexLock lock(&actor_handles_mutex_); + std::vector actor_handle_ids = actor_manager_->GetActorHandleIDsFromHandles(); // Strip actor IDs from the ref counts since there is no associated ObjectID // in the language frontend. - for (const auto &handle : actor_handles_) { - auto actor_id = handle.first; - auto actor_handle_id = ObjectID::ForActorHandle(actor_id); + for (const auto &actor_handle_id : actor_handle_ids) { counts.erase(actor_handle_id); } return counts; @@ -1158,8 +1159,9 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), function.GetFunctionDescriptor(), extension_data, actor_creation_options.max_task_retries)); - RAY_CHECK(AddActorHandle(std::move(actor_handle), - /*is_owner_handle=*/!actor_creation_options.is_detached)) + RAY_CHECK(actor_manager_->AddNewActorHandle(std::move(actor_handle), GetCallerId(), + CurrentCallSite(), rpc_address_, + actor_creation_options.is_detached)) << "Actor " << actor_id << " already exists"; *return_actor_id = actor_id; @@ -1186,8 +1188,8 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun const std::vector> &args, const TaskOptions &task_options, std::vector *return_ids) { - ActorHandle *actor_handle = nullptr; - RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle)); + const std::unique_ptr &actor_handle = + actor_manager_->GetActorHandle(actor_id); // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; @@ -1223,9 +1225,8 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun } Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { - ActorHandle *h = nullptr; if (!object_id.CreatedByTask() || - GetActorHandle(object_id.TaskId().ActorId(), &h).ok()) { + actor_manager_->CheckActorHandleExists(object_id.TaskId().ActorId())) { return Status::Invalid("Actor task cancellation is not supported."); } rpc::Address obj_addr; @@ -1244,8 +1245,11 @@ Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { } Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) { - ActorHandle *actor_handle = nullptr; - RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); + if (!actor_manager_->CheckActorHandleExists(actor_id)) { + std::stringstream stream; + stream << "Failed to find a corresponding actor handle for " << actor_id; + return Status::Invalid(stream.str()); + } direct_actor_submitter_->KillActor(actor_id, force_kill, no_restart); return Status::OK(); } @@ -1258,125 +1262,25 @@ void CoreWorker::RemoveActorHandleReference(const ActorID &actor_id) { ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized, const ObjectID &outer_object_id) { std::unique_ptr actor_handle(new ActorHandle(serialized)); - const auto actor_id = actor_handle->GetActorID(); - const auto owner_address = actor_handle->GetOwnerAddress(); - - RAY_UNUSED(AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/false)); - - ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); - reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address); - - return actor_id; + return actor_manager_->RegisterActorHandle(std::move(actor_handle), outer_object_id, + GetCallerId(), CurrentCallSite(), + rpc_address_); } Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, std::string *output, ObjectID *actor_handle_id) const { - ActorHandle *actor_handle = nullptr; - auto status = GetActorHandle(actor_id, &actor_handle); - if (status.ok()) { - actor_handle->Serialize(output); - *actor_handle_id = ObjectID::ForActorHandle(actor_id); - } - return status; -} - -bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, - bool is_owner_handle) { - const auto &actor_id = actor_handle->GetActorID(); - const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); - if (is_owner_handle) { - reference_counter_->AddOwnedObject(actor_creation_return_id, - /*inner_ids=*/{}, rpc_address_, CurrentCallSite(), - -1, - /*is_reconstructable=*/true); - } - - reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite()); - direct_actor_submitter_->AddActorQueueIfNotExists(actor_id); - - bool inserted; - { - absl::MutexLock lock(&actor_handles_mutex_); - inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; - } - - if (inserted) { - // Register a callback to handle actor notifications. - auto actor_notification_callback = [this](const ActorID &actor_id, - const gcs::ActorTableData &actor_data) { - if (actor_data.state() == gcs::ActorTableData::PENDING) { - // The actor is being created and not yet ready, just ignore! - } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { - direct_actor_submitter_->DisconnectActor(actor_id, false); - } else if (actor_data.state() == gcs::ActorTableData::DEAD) { - direct_actor_submitter_->DisconnectActor(actor_id, true); - // We cannot erase the actor handle here because clients can still - // submit tasks to dead actors. This also means we defer unsubscription, - // otherwise we crash when bulk unsubscribing all actor handles. - } else { - direct_actor_submitter_->ConnectActor(actor_id, actor_data.address()); - } - - const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state()); - RAY_LOG(INFO) << "received notification on actor, state: " << actor_state - << ", actor_id: " << actor_id - << ", ip address: " << actor_data.address().ip_address() - << ", port: " << actor_data.address().port() << ", worker_id: " - << WorkerID::FromBinary(actor_data.address().worker_id()) - << ", raylet_id: " - << ClientID::FromBinary(actor_data.address().raylet_id()); - }; - - RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( - actor_id, actor_notification_callback, nullptr)); - - RAY_CHECK(reference_counter_->SetDeleteCallback( - actor_creation_return_id, - [this, actor_id, is_owner_handle](const ObjectID &object_id) { - if (is_owner_handle) { - // If we own the actor and the actor handle is no longer in scope, - // terminate the actor. We do not do this if the GCS service is - // enabled since then the GCS will terminate the actor for us. - if (!RayConfig::instance().gcs_actor_service_enabled()) { - RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id - << " has gone out of scope, sending message to actor " - << actor_id << " to do a clean exit."; - RAY_CHECK_OK( - KillActor(actor_id, /*force_kill=*/false, /*no_restart=*/false)); - } - } - - absl::MutexLock lock(&actor_handles_mutex_); - // TODO(swang): Erase the actor handle once all refs to the actor - // have gone out of scope. We cannot erase it here in case the - // language frontend receives another ref to the same actor. In this - // case, we must remember the last task counter that we sent to the - // actor. - // TODO(ekl) we can't unsubscribe to actor notifications here due to - // https://github.com/ray-project/ray/pull/6885 - auto callback = actor_out_of_scope_callbacks_.extract(actor_id); - if (callback) { - callback.mapped()(actor_id); - } - })); - } - - return inserted; -} - -Status CoreWorker::GetActorHandle(const ActorID &actor_id, - ActorHandle **actor_handle) const { - absl::MutexLock lock(&actor_handles_mutex_); - auto it = actor_handles_.find(actor_id); - if (it == actor_handles_.end()) { - return Status::Invalid("Handle for actor does not exist"); - } - *actor_handle = it->second.get(); + const std::unique_ptr &actor_handle = + actor_manager_->GetActorHandle(actor_id); + actor_handle->Serialize(output); + *actor_handle_id = ObjectID::ForActorHandle(actor_id); return Status::OK(); } -Status CoreWorker::GetNamedActorHandle(const std::string &name, - ActorHandle **actor_handle) { +const ActorHandle *CoreWorker::GetActorHandle(const ActorID &actor_id) const { + return actor_manager_->GetActorHandle(actor_id).get(); +} + +const ActorHandle *CoreWorker::GetNamedActorHandle(const std::string &name) { RAY_CHECK(RayConfig::instance().gcs_actor_service_enabled()); RAY_CHECK(!name.empty()); @@ -1393,7 +1297,9 @@ Status CoreWorker::GetNamedActorHandle(const std::string &name, if (status.ok() && result) { auto actor_handle = std::unique_ptr(new ActorHandle(*result)); actor_id = actor_handle->GetActorID(); - AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/false); + actor_manager_->AddNewActorHandle(std::move(actor_handle), GetCallerId(), + CurrentCallSite(), rpc_address_, + /*is_detached*/ true); } else { RAY_LOG(INFO) << "Failed to look up actor with name: " << name; // Use a NIL actor ID to signal that the actor wasn't found. @@ -1401,26 +1307,24 @@ Status CoreWorker::GetNamedActorHandle(const std::string &name, } ready_promise.set_value(); })); - // Block until the RPC completes. Set a timeout to avoid hangs if the // GCS service crashes. if (ready_promise.get_future().wait_for(std::chrono::seconds(5)) != std::future_status::ready) { - return Status::TimedOut("Timed out trying to get named actor."); + RAY_LOG(ERROR) << "There was timeout in getting the actor handle. It is probably " + "because GCS server is dead or there's a high load there."; + return nullptr; } - Status status; if (actor_id.IsNil()) { - std::stringstream stream; - stream + RAY_LOG(WARNING) << "Failed to look up actor with name '" << name - << "'. It is either you look up the named actor you didn't create or the named " + << "'. It is either you look up the named actor you didn't create or the named" "actor hasn't been created because named actor creation is asynchronous."; - status = Status::NotFound(stream.str()); - } else { - status = GetActorHandle(actor_id, actor_handle); + return nullptr; } - return status; + + return GetActorHandle(actor_id); } const ResourceMappingType CoreWorker::GetResourceIDs() const { @@ -1807,13 +1711,7 @@ void CoreWorker::HandleWaitForActorOutOfScope( const auto actor_id = ActorID::FromBinary(request.actor_id()); RAY_LOG(DEBUG) << "Received HandleWaitForActorOutOfScope for " << actor_id; - absl::MutexLock lock(&actor_handles_mutex_); - auto it = actor_handles_.find(actor_id); - if (it == actor_handles_.end()) { - respond(actor_id); - } else { - RAY_CHECK(actor_out_of_scope_callbacks_.emplace(actor_id, std::move(respond)).second); - } + actor_manager_->AddActorOutOfScopeCallback(actor_id, std::move(respond)); } void CoreWorker::HandleWaitForObjectEviction( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 66f38ed7d..5fa20af72 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -19,6 +19,7 @@ #include "ray/common/buffer.h" #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/actor_manager.h" +#include "ray/core_worker/actor_reporter.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/future_resolver.h" @@ -672,17 +673,22 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Get a handle to an actor. /// + /// NOTE: This function should be called ONLY WHEN we know actor handle exists. + /// NOTE: The actor_handle obtained by this function should not be stored anywhere + /// because this method returns the raw pointer to what a unique pointer points to. + /// /// \param[in] actor_id The actor handle to get. - /// \param[out] actor_handle A handle to the requested actor. /// \return Status::Invalid if we don't have this actor handle. - Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; + const ActorHandle *GetActorHandle(const ActorID &actor_id) const; /// Get a handle to a named actor. /// + /// NOTE: The actor_handle obtained by this function should not be stored anywhere. + /// /// \param[in] name The name of the actor whose handle to get. /// \param[out] actor_handle A handle to the requested actor. - /// \return Status::NotFound if an actor with the specified name wasn't found. - Status GetNamedActorHandle(const std::string &name, ActorHandle **actor_handle); + /// \return The raw pointer to the actor handle if found, nullptr otherwise. + const ActorHandle *GetNamedActorHandle(const std::string &name); /// /// The following methods are handlers for the core worker's gRPC server, which follow @@ -814,21 +820,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { reference_counter_->AddLocalReference(object_id, call_site); } - /// Give this worker a handle to an actor. - /// - /// This handle will remain as long as the current actor or task is - /// executing, even if the Python handle goes out of scope. Tasks submitted - /// through this handle are guaranteed to execute in the same order in which - /// they are submitted. - /// - /// \param actor_handle The handle to the actor. - /// \param is_owner_handle Whether this is the owner's handle to the actor. - /// The owner is the creator of the actor and is responsible for telling the - /// actor to disconnect once all handles are out of scope. - /// \return True if the handle was added and False if we already had a handle - /// to the same actor. - bool AddActorHandle(std::unique_ptr actor_handle, bool is_owner_handle); - /// /// Private methods related to task execution. Should not be used by driver processes. /// @@ -992,10 +983,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::shared_ptr task_manager_; // Interface for publishing actor death event for actor creation failure. - std::shared_ptr actor_manager_; + std::shared_ptr actor_reporter_; // Interface to submit tasks directly to other actors. - std::unique_ptr direct_actor_submitter_; + std::shared_ptr direct_actor_submitter_; // Interface to submit non-actor tasks directly to leased workers. std::unique_ptr direct_task_submitter_; @@ -1003,20 +994,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Manages recovery of objects stored in remote plasma nodes. std::unique_ptr object_recovery_manager_; - // TODO(swang): Refactor to merge actor_handles_mutex_ and all fields that it - // protects into the ActorManager. - /// The `actor_handles_` field could be mutated concurrently due to multi-threading, we - /// need a mutex to protect it. - mutable absl::Mutex actor_handles_mutex_; + /// + /// Fields related to actor handles. + /// - /// Map from actor ID to a handle to that actor. - absl::flat_hash_map> actor_handles_ - GUARDED_BY(actor_handles_mutex_); - - /// Map from actor ID to a callback to call when all local handles to that - /// actor have gone out of scpoe. - absl::flat_hash_map> - actor_out_of_scope_callbacks_ GUARDED_BY(actor_handles_mutex_); + /// Interface to manage actor handles. + std::unique_ptr actor_manager_; /// /// Fields related to task execution. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc index d84a9537c..ee13b5c51 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc @@ -13,7 +13,9 @@ // limitations under the License. #include "ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.h" + #include + #include "ray/common/id.h" #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/common.h" @@ -27,10 +29,8 @@ extern "C" { JNIEXPORT jint JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeGetLanguage( JNIEnv *env, jclass o, jbyteArray actorId) { auto actor_id = JavaByteArrayToId(env, actorId); - ray::ActorHandle *native_actor_handle = nullptr; - auto status = ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle( - actor_id, &native_actor_handle); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, false); + const ray::ActorHandle *native_actor_handle = + ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id); return native_actor_handle->ActorLanguage(); } @@ -38,10 +38,8 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeGetActorCreationTaskFunctionDescriptor( JNIEnv *env, jclass o, jbyteArray actorId) { auto actor_id = JavaByteArrayToId(env, actorId); - ray::ActorHandle *native_actor_handle = nullptr; - auto status = ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle( - actor_id, &native_actor_handle); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); + const ray::ActorHandle *native_actor_handle = + ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id); auto function_descriptor = native_actor_handle->ActorCreationTaskFunctionDescriptor(); return NativeRayFunctionDescriptorToJavaStringList(env, function_descriptor); } @@ -59,8 +57,9 @@ JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeS return bytes; } -JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize( - JNIEnv *env, jclass o, jbyteArray data) { +JNIEXPORT jbyteArray JNICALL +Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env, jclass o, + jbyteArray data) { auto buffer = JavaByteArrayToNativeBuffer(env, data); RAY_CHECK(buffer->Size() > 0); auto binary = std::string(reinterpret_cast(buffer->Data()), buffer->Size()); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index f35d0065d..915572472 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" @@ -24,13 +26,32 @@ #include "ray/rpc/worker/core_worker_client.h" #include "ray/util/logging.h" -#include - namespace ray { +// Interface for mocking. +class ReferenceCounterInterface { + public: + virtual void AddLocalReference(const ObjectID &object_id, + const std::string &call_site) = 0; + virtual bool AddBorrowedObject(const ObjectID &object_id, const ObjectID &outer_id, + const rpc::Address &owner_address) = 0; + virtual void AddOwnedObject(const ObjectID &object_id, + const std::vector &contained_ids, + const rpc::Address &owner_address, + const std::string &call_site, const int64_t object_size, + bool is_reconstructable, + const absl::optional &pinned_at_raylet_id = + absl::optional()) = 0; + virtual bool SetDeleteCallback( + const ObjectID &object_id, + const std::function callback) = 0; + + virtual ~ReferenceCounterInterface() {} +}; + /// Class used by the core worker to keep track of ObjectID reference counts for garbage /// collection. This class is thread safe. -class ReferenceCounter { +class ReferenceCounter : public ReferenceCounterInterface { public: using ReferenceTableProto = ::google::protobuf::RepeatedPtrField; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 8c4ad8dc0..323327653 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -447,7 +447,7 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, if (spec.IsActorCreationTask()) { // Publish actor death if actor creation task failed after // a number of retries. - actor_manager_->PublishTerminatedActor(spec); + actor_reporter_->PublishTerminatedActor(spec); } } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 17b975a18..c89c121cc 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -19,7 +19,7 @@ #include "absl/synchronization/mutex.h" #include "ray/common/id.h" #include "ray/common/task/task.h" -#include "ray/core_worker/actor_manager.h" +#include "ray/core_worker/actor_reporter.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/protobuf/core_worker.pb.h" #include "ray/protobuf/gcs.pb.h" @@ -58,13 +58,13 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa public: TaskManager(std::shared_ptr in_memory_store, std::shared_ptr reference_counter, - std::shared_ptr actor_manager, + std::shared_ptr actor_reporter, RetryTaskCallback retry_task_callback, const std::function &check_node_alive, ReconstructObjectCallback reconstruct_object_callback) : in_memory_store_(in_memory_store), reference_counter_(reference_counter), - actor_manager_(actor_manager), + actor_reporter_(actor_reporter), retry_task_callback_(retry_task_callback), check_node_alive_(check_node_alive), reconstruct_object_callback_(reconstruct_object_callback) { @@ -235,7 +235,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa std::shared_ptr reference_counter_; // Interface for publishing actor creation. - std::shared_ptr actor_manager_; + std::shared_ptr actor_reporter_; /// Called when a task should be retried. const RetryTaskCallback retry_task_callback_; diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc new file mode 100644 index 000000000..9320f9925 --- /dev/null +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -0,0 +1,299 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/core_worker/actor_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "ray/common/task/task_spec.h" +#include "ray/common/test_util.h" +#include "ray/core_worker/actor_reporter.h" +#include "ray/core_worker/reference_count.h" +#include "ray/core_worker/transport/direct_actor_transport.h" +#include "ray/gcs/redis_accessor.h" +#include "ray/gcs/redis_gcs_client.h" + +namespace ray { + +using ::testing::_; + +class MockActorInfoAccessor : public gcs::RedisActorInfoAccessor { + public: + MockActorInfoAccessor(gcs::RedisGcsClient *client) + : gcs::RedisActorInfoAccessor(client) {} + + ~MockActorInfoAccessor() {} + + ray::Status AsyncSubscribe( + const ActorID &actor_id, + const gcs::SubscribeCallback &subscribe, + const gcs::StatusCallback &done) { + auto callback_entry = std::make_pair(actor_id, subscribe); + callback_map_.emplace(actor_id, subscribe); + return Status::OK(); + } + + bool ActorStateNotificationPublished(const ActorID &actor_id, + const gcs::ActorTableData &actor_data) { + auto it = callback_map_.find(actor_id); + if (it == callback_map_.end()) return false; + auto actor_state_notification_callback = it->second; + actor_state_notification_callback(actor_id, actor_data); + return true; + } + + bool CheckSubscriptionRequested(const ActorID &actor_id) { + return callback_map_.find(actor_id) != callback_map_.end(); + } + + absl::flat_hash_map> + callback_map_; +}; + +class MockGcsClient : public gcs::RedisGcsClient { + public: + MockGcsClient(const gcs::GcsClientOptions &options) : gcs::RedisGcsClient(options) {} + + void Init(MockActorInfoAccessor *actor_accesor_mock) { + actor_accessor_.reset(actor_accesor_mock); + } + + ~MockGcsClient() {} +}; + +class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterface { + public: + MockDirectActorSubmitter() : CoreWorkerDirectActorTaskSubmitterInterface() {} + + MOCK_METHOD1(AddActorQueueIfNotExists, void(const ActorID &actor_id)); + MOCK_METHOD2(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address)); + MOCK_METHOD2(DisconnectActor, void(const ActorID &actor_id, bool dead)); + MOCK_METHOD3(KillActor, + void(const ActorID &actor_id, bool force_kill, bool no_restart)); + + virtual ~MockDirectActorSubmitter() {} +}; + +class MockReferenceCounter : public ReferenceCounterInterface { + public: + MockReferenceCounter() : ReferenceCounterInterface() {} + + MOCK_METHOD2(AddLocalReference, + void(const ObjectID &object_id, const std::string &call_sit)); + + MOCK_METHOD3(AddBorrowedObject, + bool(const ObjectID &object_id, const ObjectID &outer_id, + const rpc::Address &owner_address)); + + MOCK_METHOD7(AddOwnedObject, + void(const ObjectID &object_id, const std::vector &contained_ids, + const rpc::Address &owner_address, const std::string &call_site, + const int64_t object_size, bool is_reconstructable, + const absl::optional &pinned_at_raylet_id)); + + MOCK_METHOD2(SetDeleteCallback, + bool(const ObjectID &object_id, + const std::function callback)); + + virtual ~MockReferenceCounter() {} +}; + +class ActorManagerTest : public ::testing::Test { + public: + ActorManagerTest() + : options_("", 1, ""), + gcs_client_mock_(new MockGcsClient(options_)), + actor_info_accessor_(new MockActorInfoAccessor(gcs_client_mock_.get())), + direct_actor_submitter_(new MockDirectActorSubmitter()), + reference_counter_(new MockReferenceCounter()) { + gcs_client_mock_->Init(actor_info_accessor_); + } + + ~ActorManagerTest() {} + + void SetUp() { + actor_manager_ = std::make_shared( + gcs_client_mock_, direct_actor_submitter_, reference_counter_); + } + + void TearDown() { actor_manager_.reset(); } + + ActorID AddActorHandle() const { + JobID job_id = JobID::FromInt(1); + const TaskID task_id = TaskID::ForDriverTask(job_id); + ActorID actor_id = ActorID::Of(job_id, task_id, 1); + const auto caller_address = rpc::Address(); + const auto call_site = ""; + RayFunction function(ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::BuildPython("", "", "", "")); + + auto actor_handle = absl::make_unique( + actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0); + EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) + .WillRepeatedly(testing::Return(true)); + actor_manager_->AddNewActorHandle(move(actor_handle), task_id, call_site, + caller_address, /*is_detached*/ false); + return actor_id; + } + + gcs::GcsClientOptions options_; + std::shared_ptr gcs_client_mock_; + MockActorInfoAccessor *actor_info_accessor_; + std::shared_ptr direct_actor_submitter_; + std::shared_ptr reference_counter_; + std::shared_ptr actor_manager_; +}; + +TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { + JobID job_id = JobID::FromInt(1); + const TaskID task_id = TaskID::ForDriverTask(job_id); + ActorID actor_id = ActorID::Of(job_id, task_id, 1); + const auto caller_address = rpc::Address(); + const auto call_site = ""; + RayFunction function(ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::BuildPython("", "", "", "")); + auto actor_handle = absl::make_unique( + actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0); + EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) + .WillRepeatedly(testing::Return(true)); + + // Add an actor handle. + ASSERT_TRUE(actor_manager_->AddNewActorHandle(move(actor_handle), task_id, call_site, + caller_address, false)); + // Make sure the subscription request is sent to GCS. + ASSERT_TRUE(actor_info_accessor_->CheckSubscriptionRequested(actor_id)); + ASSERT_TRUE(actor_manager_->CheckActorHandleExists(actor_id)); + + auto actor_handle2 = absl::make_unique( + actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0); + // Make sure the same actor id adding will return false. + ASSERT_FALSE(actor_manager_->AddNewActorHandle(move(actor_handle2), task_id, call_site, + caller_address, false)); + // Make sure we can get an actor handle correctly. + const std::unique_ptr &actor_handle_to_get = + actor_manager_->GetActorHandle(actor_id); + ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id); + + // Check after the actor is created, if it is connected to an actor. + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1); + rpc::ActorTableData actor_table_data; + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); + + // Now actor state is updated to DEAD. Make sure it is diconnected. + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); +} + +TEST_F(ActorManagerTest, TestCheckActorHandleDoesntExists) { + JobID job_id = JobID::FromInt(2); + const TaskID task_id = TaskID::ForDriverTask(job_id); + ActorID actor_id = ActorID::Of(job_id, task_id, 1); + ASSERT_FALSE(actor_manager_->CheckActorHandleExists(actor_id)); +} + +TEST_F(ActorManagerTest, RegisterActorHandles) { + JobID job_id = JobID::FromInt(1); + const TaskID task_id = TaskID::ForDriverTask(job_id); + ActorID actor_id = ActorID::Of(job_id, task_id, 1); + const auto caller_address = rpc::Address(); + const auto call_site = ""; + RayFunction function(ray::Language::PYTHON, + ray::FunctionDescriptorBuilder::BuildPython("", "", "", "")); + auto actor_handle = absl::make_unique( + actor_id, TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), + function.GetLanguage(), function.GetFunctionDescriptor(), "", 0); + EXPECT_CALL(*reference_counter_, SetDeleteCallback(_, _)) + .WillRepeatedly(testing::Return(true)); + ObjectID outer_object_id = ObjectID::Nil(); + + // Sinece RegisterActor happens in a non-owner worker, we should + // make sure it borrows an object. + EXPECT_CALL(*reference_counter_, AddBorrowedObject(_, _, _)); + ActorID returned_actor_id = actor_manager_->RegisterActorHandle( + std::move(actor_handle), outer_object_id, task_id, call_site, caller_address); + ASSERT_TRUE(returned_actor_id == actor_id); + // Let's try to get the handle and make sure it works. + const std::unique_ptr &actor_handle_to_get = + actor_manager_->GetActorHandle(actor_id); + ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id); + ASSERT_TRUE(actor_handle_to_get->CreationJobID() == job_id); +} + +TEST_F(ActorManagerTest, TestActorStateNotificationPending) { + ActorID actor_id = AddActorHandle(); + // Nothing happens if state is pending. + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0); + rpc::ActorTableData actor_table_data; + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_PENDING); + ASSERT_TRUE( + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); +} + +TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) { + ActorID actor_id = AddActorHandle(); + // Should disconnect to an actor when actor is restarting. + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + rpc::ActorTableData actor_table_data; + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_RESTARTING); + ASSERT_TRUE( + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); +} + +TEST_F(ActorManagerTest, TestActorStateNotificationDead) { + ActorID actor_id = AddActorHandle(); + // Should disconnect to an actor when actor is dead. + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1); + rpc::ActorTableData actor_table_data; + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + ASSERT_TRUE( + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); +} + +TEST_F(ActorManagerTest, TestActorStateNotificationAlive) { + ActorID actor_id = AddActorHandle(); + // Should connect to an actor when actor is alive. + EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1); + EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0); + rpc::ActorTableData actor_table_data; + actor_table_data.set_actor_id(actor_id.Binary()); + actor_table_data.set_state( + rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE( + actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 3062eb9fe..227ad5a37 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -17,7 +17,7 @@ #include "gtest/gtest.h" #include "ray/common/task/task_spec.h" #include "ray/common/test_util.h" -#include "ray/core_worker/actor_manager.h" +#include "ray/core_worker/actor_reporter.h" #include "ray/core_worker/reference_count.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" @@ -35,7 +35,7 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns, return task; } -class MockActorManager : public ActorManagerInterface { +class MockActorManager : public ActorReporterInterface { void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override { num_terminations += 1; } @@ -50,8 +50,8 @@ class TaskManagerTest : public ::testing::Test { reference_counter_(std::shared_ptr(new ReferenceCounter( rpc::Address(), /*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))), - actor_manager_(std::shared_ptr(new MockActorManager())), - manager_(store_, reference_counter_, actor_manager_, + actor_reporter_(std::shared_ptr(new MockActorManager())), + manager_(store_, reference_counter_, actor_reporter_, [this](const TaskSpecification &spec, bool delay) { num_retries_++; return Status::OK(); @@ -63,7 +63,7 @@ class TaskManagerTest : public ::testing::Test { std::shared_ptr store_; std::shared_ptr reference_counter_; - std::shared_ptr actor_manager_; + std::shared_ptr actor_reporter_; bool all_nodes_alive_ = true; std::vector objects_to_recover_; TaskManager manager_; diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 7549dd488..f4ff3d34e 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -46,8 +46,20 @@ const int kMaxReorderWaitSeconds = 30; /// In direct actor call task submitter and receiver, a task is directly submitted /// to the actor that will execute it. +// Interface for testing. +class CoreWorkerDirectActorTaskSubmitterInterface { + public: + virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0; + virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address) = 0; + virtual void DisconnectActor(const ActorID &actor_id, bool dead = false) = 0; + virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0; + + virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {} +}; + // This class is thread-safe. -class CoreWorkerDirectActorTaskSubmitter { +class CoreWorkerDirectActorTaskSubmitter + : public CoreWorkerDirectActorTaskSubmitterInterface { public: CoreWorkerDirectActorTaskSubmitter(rpc::ClientFactoryFn client_factory, std::shared_ptr store,