diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index 7cddaf58b..f66be6c73 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -181,16 +181,6 @@ public final class Ray extends RayCall { runtime.setResource(resourceName, capacity, UniqueId.NIL); } - /** - * Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to - * fail and the actor to exit in the same way as if it crashed. - * - * @param actor The actor to be killed. - */ - public static void killActor(RayActor actor) { - runtime.killActor(actor); - } - /** * Get the runtime context. */ diff --git a/java/api/src/main/java/org/ray/api/RayActor.java b/java/api/src/main/java/org/ray/api/RayActor.java index a9ec9c94f..712423900 100644 --- a/java/api/src/main/java/org/ray/api/RayActor.java +++ b/java/api/src/main/java/org/ray/api/RayActor.java @@ -33,4 +33,13 @@ public interface RayActor extends ActorCall { */ ActorId getId(); + /** + * Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to + * fail and the actor to exit in the same way as if it crashed. + * + * @param noReconstruction If set to true, the killed actor will not be reconstructed anymore. + */ + default void kill(boolean noReconstruction) { + Ray.internal().killActor(this, noReconstruction); + } } diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 595049306..61c3b01e9 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -80,8 +80,9 @@ public interface RayRuntime { * Kill the actor immediately. * * @param actor The actor to be killed. + * @param noReconstruction If set to true, the killed actor will not be reconstructed anymore. */ - void killActor(RayActor actor); + void killActor(RayActor actor, boolean noReconstruction); /** * Invoke a remote function. diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 563d2b99b..0edc285bf 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -1,7 +1,6 @@ package org.ray.runtime; import java.util.concurrent.atomic.AtomicInteger; - import org.ray.api.RayActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; @@ -50,7 +49,7 @@ public class RayDevRuntime extends AbstractRayRuntime { } @Override - public void killActor(RayActor actor) { + public void killActor(RayActor actor, boolean noReconstruction) { throw new UnsupportedOperationException(); } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java index c2d706318..4fc5e8752 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java @@ -139,8 +139,8 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { } @Override - public void killActor(RayActor actor) { - getCurrentRuntime().killActor(actor); + public void killActor(RayActor actor, boolean noReconstruction) { + getCurrentRuntime().killActor(actor, noReconstruction); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 27af5f756..1255b31f1 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -135,8 +135,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } @Override - public void killActor(RayActor actor) { - nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes()); + public void killActor(RayActor actor, boolean noReconstruction) { + nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes(), noReconstruction); } @Override @@ -200,5 +200,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { private static native void nativeSetResource(long conn, String resourceName, double capacity, byte[] nodeId); - private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId); + private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId, + boolean noReconstruction); } diff --git a/java/test/src/main/java/org/ray/api/test/KillActorTest.java b/java/test/src/main/java/org/ray/api/test/KillActorTest.java index 8af24174b..baaf18a18 100644 --- a/java/test/src/main/java/org/ray/api/test/KillActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/KillActorTest.java @@ -1,21 +1,35 @@ package org.ray.api.test; import com.google.common.collect.ImmutableList; +import java.util.function.BiConsumer; import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayObject; import org.ray.api.TestUtils; import org.ray.api.exception.RayActorException; +import org.ray.api.options.ActorCreationOptions; import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test public class KillActorTest extends BaseTest { + @BeforeClass + public void setUp() { + System.setProperty("ray.raylet.config.num_workers_per_process_java", "1"); + } + + @AfterClass + public void tearDown() { + System.clearProperty("ray.raylet.config.num_workers_per_process_java"); + } + public static class HangActor { - public boolean alive() { - return true; + public String ping() { + return "pong"; } public boolean hang() throws InterruptedException { @@ -25,13 +39,60 @@ public class KillActorTest extends BaseTest { } } - public void testKillActor() { + public static class KillerActor { + + public void kill(RayActor actor, boolean noReconstruction) { + actor.kill(noReconstruction); + } + } + + private static void localKill(RayActor actor, boolean noReconstruction) { + actor.kill(noReconstruction); + } + + private static void remoteKill(RayActor actor, boolean noReconstruction) { + RayActor killer = Ray.createActor(KillerActor::new); + killer.call(KillerActor::kill, actor, noReconstruction); + } + + private void testKillActor(BiConsumer, Boolean> kill, boolean noReconstruction) { TestUtils.skipTestUnderSingleProcess(); - RayActor actor = Ray.createActor(HangActor::new); - Assert.assertTrue(actor.call(HangActor::alive).get()); + + ActorCreationOptions options = + new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions(); + RayActor actor = Ray.createActor(HangActor::new, options); RayObject result = actor.call(HangActor::hang); + // The actor will hang in this task. Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size()); - Ray.killActor(actor); + + // Kill the actor + kill.accept(actor, noReconstruction); + // The get operation will fail with RayActorException Assert.expectThrows(RayActorException.class, result::get); + + try { + // Sleep 1s here to make sure the driver has received the actor notification + // (of state RECONSTRUCTING or DEAD). + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (noReconstruction) { + // The actor should not be reconstructed. + Assert.expectThrows(RayActorException.class, () -> actor.call(HangActor::hang).get()); + } else { + Assert.assertEquals(actor.call(HangActor::ping).get(), "pong"); + } + } + + public void testLocalKill() { + testKillActor(KillActorTest::localKill, false); + testKillActor(KillActorTest::localKill, true); + } + + public void testRemoteKill() { + testKillActor(KillActorTest::remoteKill, false); + testKillActor(KillActorTest::remoteKill, true); } } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index aaff43e9a..86cdfebee 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -859,13 +859,13 @@ cdef class CoreWorker: return VectorToObjectIDs(return_ids) - def kill_actor(self, ActorID actor_id): + def kill_actor(self, ActorID actor_id, c_bool no_reconstruction): cdef: CActorID c_actor_id = actor_id.native() with nogil: check_status(self.core_worker.get().KillActor( - c_actor_id, True)) + c_actor_id, True, no_reconstruction)) def resource_ids(self): cdef: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 94fe4363c..7558ea4c8 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -116,7 +116,9 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CActorID &actor_id, const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids) - CRayStatus KillActor(const CActorID &actor_id, c_bool force_kill) + CRayStatus KillActor( + const CActorID &actor_id, c_bool force_kill, + c_bool no_reconstruction) unique_ptr[CProfileEvent] CreateProfileEvent( const c_string &event_type) diff --git a/python/ray/worker.py b/python/ray/worker.py index 79f092574..5ba7bc13d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1643,7 +1643,7 @@ def kill(actor): "Got: {}.".format(type(actor))) worker = ray.worker.get_global_worker() - worker.core_worker.kill_actor(actor._ray_actor_id) + worker.core_worker.kill_actor(actor._ray_actor_id, False) def _mode(worker=global_worker): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index af073dc47..ee249bd49 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -872,11 +872,12 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f return status; } -Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill) { +Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, + bool no_reconstruction) { ActorHandle *actor_handle = nullptr; RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); RAY_CHECK(actor_handle->IsDirectCallActor()); - direct_actor_submitter_->KillActor(actor_id, force_kill); + direct_actor_submitter_->KillActor(actor_id, force_kill, no_reconstruction); return Status::OK(); } @@ -977,7 +978,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id << " has gone out of scope, sending message to actor " << actor_id << " to do a clean exit."; - RAY_CHECK_OK(KillActor(actor_id, /*intentional=*/true)); + RAY_CHECK_OK( + KillActor(actor_id, /*force_kill=*/true, /*no_reconstruction=*/false)); } })); } @@ -1397,6 +1399,9 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request, if (request.force_kill()) { RAY_LOG(INFO) << "Got KillActor, exiting immediately..."; + if (request.no_reconstruction()) { + RAY_IGNORE_EXPR(local_raylet_client_->Disconnect()); + } if (log_dir_ != "") { RayLog::ShutDownRayLog(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 1fff54437..b481c2909 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -394,8 +394,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Tell an actor to exit immediately, without completing outstanding work. /// /// \param[in] actor_id ID of the actor to kill. + /// \param[in] no_reconstruction If set to true, the killed actor will not be + /// reconstructed anymore. /// \param[out] Status - Status KillActor(const ActorID &actor_id, bool force_kill); + Status KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction); /// Decrease the reference count for this actor. Should be called by the /// language frontend when a reference to the ActorHandle destroyed. diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index d54217000..555d8e522 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -158,10 +158,11 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( } JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor( - JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray actorId, + jboolean noReconstruction) { auto core_worker = reinterpret_cast(nativeCoreWorkerPointer); auto status = core_worker->KillActor(JavaByteArrayToId(env, actorId), - /*force_kill=*/true); + /*force_kill=*/true, noReconstruction); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h index ac075c785..821a1fadc 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h @@ -73,12 +73,10 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( /* * Class: org_ray_runtime_RayNativeRuntime * Method: nativeKillActor - * Signature: (J[B)V + * Signature: (J[BZ)V */ -JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor(JNIEnv *, - jclass, - jlong, - jbyteArray); +JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor( + JNIEnv *, jclass, jlong, jbyteArray, jboolean); #ifdef __cplusplus } diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index a6dfa5d2f..1c62d7ab9 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -23,13 +23,23 @@ using ray::rpc::ActorTableData; namespace ray { void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id, - bool force_kill) { + bool force_kill, + bool no_reconstruction) { absl::MutexLock lock(&mu_); - auto inserted = pending_force_kills_.emplace(actor_id, force_kill); + rpc::KillActorRequest request; + request.set_intended_actor_id(actor_id.Binary()); + request.set_force_kill(force_kill); + request.set_no_reconstruction(no_reconstruction); + auto inserted = pending_force_kills_.emplace(actor_id, request); if (!inserted.second && force_kill) { // Overwrite the previous request to kill the actor if the new request is a // force kill. - inserted.first->second = true; + inserted.first->second.set_force_kill(true); + if (no_reconstruction) { + // Overwrite the previous request to disable reconstruction if the new request's + // no_reconstruction flag is set to true. + inserted.first->second.set_no_reconstruction(true); + } } auto it = rpc_clients_.find(actor_id); if (it == rpc_clients_.end()) { @@ -102,9 +112,7 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, rpc_clients_[actor_id] = std::shared_ptr(client_factory_(address)); } - if (pending_requests_.count(actor_id) > 0) { - SendPendingTasks(actor_id); - } + SendPendingTasks(actor_id); } void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, @@ -135,6 +143,8 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id // replies. They will be treated as failed once the connection dies. // We retain the sequencing information so that we can properly fail // any tasks submitted after the actor death. + + pending_force_kills_.erase(actor_id); } } @@ -145,11 +155,9 @@ void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_i // client. auto it = pending_force_kills_.find(actor_id); if (it != pending_force_kills_.end()) { - rpc::KillActorRequest request; - request.set_intended_actor_id(actor_id.Binary()); - request.set_force_kill(it->second); + RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id; // It's okay if this fails because this means the worker is already dead. - RAY_UNUSED(client->KillActor(request, nullptr)); + RAY_UNUSED(client->KillActor(it->second, nullptr)); pending_force_kills_.erase(it); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 1712f7d65..175efe191 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -70,7 +70,9 @@ class CoreWorkerDirectActorTaskSubmitter { /// \param[in] actor_id The actor_id of the actor to kill. /// \param[in] force_kill Whether to force kill the actor, or let the actor /// try a clean exit. - void KillActor(const ActorID &actor_id, bool force_kill); + /// \param[in] no_reconstruction If set to true, the killed actor will not be + /// reconstructed anymore. + void KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction); /// Create connection to actor and send all pending tasks. /// @@ -134,8 +136,10 @@ class CoreWorkerDirectActorTaskSubmitter { /// rpc_clients_ map. absl::flat_hash_map worker_ids_ GUARDED_BY(mu_); - /// Set of actor ids that should be force killed once a client is available. - absl::flat_hash_map pending_force_kills_ GUARDED_BY(mu_); + /// Map from actor ids that should be force killed once a client is available to the + /// pending kill actor requests. + absl::flat_hash_map pending_force_kills_ + GUARDED_BY(mu_); /// Map from actor id to the actor's pending requests. Each actor's requests /// are ordered by the task number in the request. diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index f6a449d85..d418497b0 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -172,6 +172,8 @@ message KillActorRequest { bytes intended_actor_id = 1; // Whether to force kill the actor. bool force_kill = 2; + // If set to true, the killed actor will not be reconstructed anymore. + bool no_reconstruction = 3; } message KillActorReply {