From 6b1a57542e8596395ab018f9773dab80f67e790c Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 23 Dec 2019 23:12:57 -0600 Subject: [PATCH] Add `actor.__ray_kill__()` to terminate actors immediately (#6523) --- doc/source/actors.rst | 8 ++++- python/ray/_raylet.pyx | 8 +++++ python/ray/actor.py | 16 +++++++++- python/ray/includes/libcoreworker.pxd | 1 + python/ray/tests/test_actor.py | 16 ++++++++++ python/ray/worker.py | 22 ++++++++++++++ src/ray/core_worker/core_worker.cc | 27 +++++++++++++++++ src/ray/core_worker/core_worker.h | 11 +++++++ .../transport/direct_actor_transport.cc | 30 ++++++++++++++++++- .../transport/direct_actor_transport.h | 10 +++++++ src/ray/protobuf/core_worker.proto | 10 +++++++ src/ray/rpc/worker/core_worker_client.h | 19 ++++++++++-- 12 files changed, 173 insertions(+), 5 deletions(-) diff --git a/doc/source/actors.rst b/doc/source/actors.rst index e49583a60..3215dee9e 100644 --- a/doc/source/actors.rst +++ b/doc/source/actors.rst @@ -143,7 +143,13 @@ If necessary, you can manually terminate an actor by calling ``ray.actor.exit_actor()`` from within one of the actor methods. This will kill the actor process and release resources associated/assigned to the actor. This approach should generally not be necessary as actors are automatically garbage -collected. +collected. The ``ObjectID`` resulting from the task can be waited on to wait +for the actor to exit (calling ``ray.get()`` on it will raise a ``RayActorError``). +Note that this method of termination will wait until any previously submitted +tasks finish executing. If you want to terminate an actor immediately, you can +call ``actor_handle.__ray_kill__()``. This will cause the actor to exit immediately +and any pending tasks to fail. Any exit handlers installed in the actor using +``atexit`` will be called. 
Passing Around Actor Handles ---------------------------- diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a82a66b51..e0c054067 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1034,6 +1034,14 @@ cdef class CoreWorker: return VectorToObjectIDs(return_ids) + def kill_actor(self, ActorID actor_id): + cdef: + CActorID c_actor_id = actor_id.native() + + with nogil: + check_status(self.core_worker.get().KillActor( + c_actor_id)) + def resource_ids(self): cdef: ResourceMappingType resource_mapping = ( diff --git a/python/ray/actor.py b/python/ray/actor.py index b7b58d9be..8b4cdf312 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -642,7 +642,7 @@ class ActorHandle(object): self._actor_id.hex()) def __del__(self): - """Kill the worker that is running this actor.""" + """Terminate the worker that is running this actor.""" # TODO(swang): Also clean up forked actor handles. # Kill the worker if this is the original actor handle, created # with Class.remote(). TODO(rkn): Even without passing handles around, @@ -671,6 +671,20 @@ class ActorHandle(object): finally: self.__ray_terminate__._actor_hard_ref = None + def __ray_kill__(self): + """Kill the actor that this actor handle refers to immediately. + + This will cause any outstanding tasks submitted to the actor to fail + and the actor to exit in the same way as if it crashed. In general, + you should prefer to just delete the actor handle and let it clean up + gracefully. + + Returns: + None. 
+ """ + worker = ray.worker.get_global_worker() + worker.core_worker.kill_actor(self._ray_actor_id) + @property def _actor_id(self): return self._ray_actor_id diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 134755118..0bbc269be 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -98,6 +98,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CActorID &actor_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) + CRayStatus KillActor(const CActorID &actor_id) unique_ptr[CProfileEvent] CreateProfileEvent( const c_string &event_type) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 889fca8ee..a398b77c2 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -1431,6 +1431,22 @@ ray.get(actor.ping.remote()) assert ray.get(detached_actor.ping.remote()) == "pong" +def test_kill(ray_start_regular): + @ray.remote + class Actor(object): + def hang(self): + # Never returns. + ray.get(ray.ObjectID.from_random()) + + actor = Actor.remote() + result = actor.hang.remote() + ready, _ = ray.wait([result], timeout=0.1) + assert len(ready) == 0 + actor.__ray_kill__() + with pytest.raises(ray.exceptions.RayActorError): + ray.get(result, timeout=1) + + if __name__ == "__main__": import pytest import sys diff --git a/python/ray/worker.py b/python/ray/worker.py index 2fba5496e..01ee8f16b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1746,6 +1746,28 @@ def remote(*args, **kwargs): class Foo(object): def method(self): return 1 + + Remote task and actor objects returned by @ray.remote can also be + dynamically modified with the same arguments as above using + ``.options()`` as follows: + + .. 
code-block:: python + + @ray.remote(num_gpus=1, max_calls=1, num_return_vals=2) + def f(): + return 1, 2 + g = f.options(num_gpus=2, max_calls=None) + + @ray.remote(num_cpus=2, resources={"CustomResource": 1}) + class Foo(object): + def method(self): + return 1 + Bar = Foo.options(num_cpus=1, resources=None) + + Running remote actors will be terminated when the actor handle to them + in Python is deleted, which will cause them to complete any outstanding + work and then shut down. If you want to kill them immediately, you can + also call ``actor_handle.__ray_kill__()``. """ worker = get_global_worker() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 43d0af509..b4bf55b34 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -96,6 +96,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, RayLog::StartRayLog(app_name.str(), RayLogLevel::INFO, log_dir_); RayLog::InstallFailureSignalHandler(); } + RAY_LOG(INFO) << "Initializing worker " << worker_context_.GetWorkerID(); // Initialize gcs client. 
gcs_client_ = std::make_shared(gcs_options); @@ -715,6 +716,13 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f return status; } +Status CoreWorker::KillActor(const ActorID &actor_id) { + ActorHandle *actor_handle = nullptr; + RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); + RAY_CHECK(actor_handle->IsDirectCallActor()); + return direct_actor_submitter_->KillActor(actor_id); +} + ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized) { std::unique_ptr actor_handle(new ActorHandle(serialized)); const ActorID actor_id = actor_handle->GetActorID(); @@ -853,6 +861,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return_ids.pop_back(); task_type = TaskType::ACTOR_CREATION_TASK; SetActorId(task_spec.ActorCreationId()); + RAY_LOG(INFO) << "Creating actor: " << actor_id_; } else if (task_spec.IsActorTask()) { RAY_CHECK(return_ids.size() > 0); return_ids.pop_back(); @@ -1032,6 +1041,24 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques } } +void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request, + rpc::KillActorReply *reply, + rpc::SendReplyCallback send_reply_callback) { + ActorID intended_actor_id = ActorID::FromBinary(request.intended_actor_id()); + if (intended_actor_id != worker_context_.GetCurrentActorID()) { + std::ostringstream stream; + stream << "Mismatched ActorID: ignoring KillActor for previous actor " + << intended_actor_id + << ", current actor ID: " << worker_context_.GetCurrentActorID(); + auto msg = stream.str(); + RAY_LOG(ERROR) << msg; + send_reply_callback(Status::Invalid(msg), nullptr, nullptr); + return; + } + RAY_LOG(INFO) << "Got KillActor, shutting down..."; + Shutdown(); +} + void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, rpc::GetCoreWorkerStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/core_worker/core_worker.h 
b/src/ray/core_worker/core_worker.h index 03d4b9abd..fffe787a7 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -33,6 +33,7 @@ RAY_CORE_WORKER_RPC_HANDLER(PushTask, 9999) \ RAY_CORE_WORKER_RPC_HANDLER(DirectActorCallArgWaitComplete, 100) \ RAY_CORE_WORKER_RPC_HANDLER(GetObjectStatus, 9999) \ + RAY_CORE_WORKER_RPC_HANDLER(KillActor, 9999) \ RAY_CORE_WORKER_RPC_HANDLER(GetCoreWorkerStats, 100) namespace ray { @@ -324,6 +325,12 @@ class CoreWorker { const TaskOptions &task_options, std::vector *return_ids); + /// Tell an actor to exit immediately, without completing outstanding work. + /// + /// \param[in] actor_id ID of the actor to kill. + /// \return Status + Status KillActor(const ActorID &actor_id); + /// Add an actor handle from a serialized string. /// /// This should be called when an actor handle is given to us by another task @@ -406,6 +413,10 @@ class CoreWorker { rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback); + /// Implements gRPC server handler. + void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply, + rpc::SendReplyCallback send_reply_callback); + /// Get statistics from core worker. 
void HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, rpc::GetCoreWorkerStatsReply *reply, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index bac8fc748..9f49e444a 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -1,12 +1,31 @@ +#include "ray/core_worker/transport/direct_actor_transport.h" + #include #include "ray/common/task/task.h" -#include "ray/core_worker/transport/direct_actor_transport.h" using ray::rpc::ActorTableData; namespace ray { +Status CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id) { + absl::MutexLock lock(&mu_); + pending_force_kills_.insert(actor_id); + auto it = rpc_clients_.find(actor_id); + if (it == rpc_clients_.end()) { + // Actor is not yet created, or is being reconstructed, cache the request + // and submit after actor is alive. + // TODO(zhijunfu): it might be possible for a user to specify an invalid + // actor handle (e.g. from unpickling), in that case it might be desirable + // to have a timeout to mark it as invalid if it doesn't show up in the + // specified time. + RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; + } else { + SendPendingTasks(actor_id); + } + return Status::OK(); +} + Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId(); RAY_CHECK(task_spec.IsActorTask()); @@ -101,6 +120,15 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { auto &client = rpc_clients_[actor_id]; RAY_CHECK(client); + // Check if there is a pending force kill. If there is, send it and disconnect the + // client. 
+ if (pending_force_kills_.find(actor_id) != pending_force_kills_.end()) { + rpc::KillActorRequest request; + request.set_intended_actor_id(actor_id.Binary()); + RAY_CHECK_OK(client->KillActor(request, nullptr)); + pending_force_kills_.erase(actor_id); + } + // Submit all pending requests. auto &requests = pending_requests_[actor_id]; auto head = requests.begin(); diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index feb08c17b..8fd2c3c35 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -11,6 +11,7 @@ #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" #include "absl/synchronization/mutex.h" #include "ray/common/id.h" #include "ray/common/ray_object.h" @@ -48,6 +49,12 @@ class CoreWorkerDirectActorTaskSubmitter { /// \return Status::Invalid if the task is not yet supported. Status SubmitTask(TaskSpecification task_spec); + /// Tell this actor to exit immediately. + /// + /// \param[in] actor_id The actor_id of the actor to kill. + /// \return Status::Invalid if the actor could not be killed. + Status KillActor(const ActorID &actor_id); + /// Create connection to actor and send all pending tasks. /// /// \param[in] actor_id Actor ID. @@ -107,6 +114,9 @@ class CoreWorkerDirectActorTaskSubmitter { /// rpc_clients_ map. absl::flat_hash_map worker_ids_ GUARDED_BY(mu_); + /// Set of actor ids that should be force killed once a client is available. + absl::flat_hash_set pending_force_kills_ GUARDED_BY(mu_); + /// Map from actor id to the actor's pending requests. Each actor's requests /// are ordered by the task number in the request. 
absl::flat_hash_map>> diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 3a9c5dc68..5aebd2495 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -117,6 +117,14 @@ message GetObjectStatusReply { ObjectStatus status = 1; } +message KillActorRequest { + // ID of the actor that is intended to be killed. + bytes intended_actor_id = 1; +} + +message KillActorReply { +} + message GetCoreWorkerStatsRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; @@ -137,6 +145,8 @@ service CoreWorkerService { returns (DirectActorCallArgWaitCompleteReply); // Ask the object's owner about the object's current status. rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply); + // Request that the worker shut down without completing outstanding work. + rpc KillActor(KillActorRequest) returns (KillActorReply); // Get metrics from core workers. rpc GetCoreWorkerStats(GetCoreWorkerStatsRequest) returns (GetCoreWorkerStatsReply); } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 61262781d..2f0b99f23 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -1,15 +1,15 @@ #ifndef RAY_RPC_CORE_WORKER_CLIENT_H #define RAY_RPC_CORE_WORKER_CLIENT_H +#include + #include #include #include #include -#include #include "absl/base/thread_annotations.h" #include "absl/hash/hash.h" - #include "ray/common/status.h" #include "ray/rpc/client_call.h" #include "ray/util/logging.h" @@ -121,6 +121,12 @@ class CoreWorkerClientInterface { return Status::NotImplemented(""); } + /// Tell this actor to exit immediately. 
+ virtual ray::Status KillActor(const KillActorRequest &request, + const ClientCallback &callback) { + return Status::NotImplemented(""); + } + virtual ray::Status GetCoreWorkerStats( const GetCoreWorkerStatsRequest &request, const ClientCallback &callback) { @@ -203,6 +209,15 @@ class CoreWorkerClient : public std::enable_shared_from_this, return call->GetStatus(); } + virtual ray::Status KillActor(const KillActorRequest &request, + const ClientCallback &callback) override { + auto call = client_call_manager_ + .CreateCall( + *stub_, &CoreWorkerService::Stub::PrepareAsyncKillActor, request, + callback); + return call->GetStatus(); + } + virtual ray::Status GetCoreWorkerStats( const GetCoreWorkerStatsRequest &request, const ClientCallback &callback) override {