diff --git a/java/api/src/main/java/io/ray/api/Ray.java b/java/api/src/main/java/io/ray/api/Ray.java
index 1e9d68482..623256db4 100644
--- a/java/api/src/main/java/io/ray/api/Ray.java
+++ b/java/api/src/main/java/io/ray/api/Ray.java
@@ -241,4 +241,16 @@ public final class Ray extends RayCall {
PlacementStrategy strategy) {
return runtime.createPlacementGroup(bundles, strategy);
}
+
+ /**
+ * Intentionally exits the current actor.
+ *
+ * <p>This method is used to disconnect an actor and exit the worker. It does no work of its
+ * own: the call is forwarded to {@code exitActor()} on the current runtime, so what actually
+ * happens depends on which runtime is in use.
+ *
+ * @throws RuntimeException An exception is raised if this is a driver or this worker is not
+ * an actor.
+ */
+ public static void exitActor() {
+ runtime.exitActor();
+ }
}
diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java
index 477fb029d..19ec5390a 100644
--- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java
+++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java
@@ -193,4 +193,9 @@ public interface RayRuntime {
* @return The wrapped callable.
*/
Callable wrapCallable(Callable callable);
+
+ /**
+ * Intentionally exits the actor that is running on the current worker.
+ *
+ * <p>How the exit is carried out is left to the implementing runtime.
+ */
+ void exitActor();
}
diff --git a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java
index 778f3f07e..26fa0430b 100644
--- a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java
+++ b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java
@@ -84,6 +84,11 @@ public class RayDevRuntime extends AbstractRayRuntime {
super.setAsyncContext(asyncContext);
}
+ @Override
+ public void exitActor() {
+ // NOTE(review): no-op here, so the caller keeps running after exitActor() - confirm intended.
+ }
+
private JobId nextJobId() {
return JobId.fromInt(jobCounter.getAndIncrement());
}
diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java
index 5eada0cbb..724e82fb4 100644
--- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java
+++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java
@@ -9,6 +9,7 @@ import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.NativeWorkerContext;
import io.ray.runtime.exception.RayException;
+import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.gcs.GcsClientOptions;
import io.ray.runtime.gcs.RedisClient;
@@ -245,6 +246,16 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
super.setAsyncContext(asyncContext);
}
+  /**
+   * Exits the actor hosted by this worker by throwing a
+   * {@link RayIntentionalSystemExitException} out of the running task.
+   *
+   * @throws RuntimeException If this process is not running in worker mode, or if the current
+   *     actor id is nil (i.e. no actor is bound to this worker).
+   * @throws RayIntentionalSystemExitException Always, once the checks above have passed; the
+   *     exception itself is the exit signal.
+   */
+  @Override
+  public void exitActor() {
+    // The mode check comes first so that the actor id is only looked up on real workers.
+    final boolean runningAsWorker = rayConfig.workerMode == WorkerType.WORKER;
+    if (!runningAsWorker || runtimeContext.getCurrentActorId().isNil()) {
+      throw new RuntimeException("This shouldn't be called on a non-actor worker.");
+    }
+
+    LOGGER.info("Actor {} is exiting.", runtimeContext.getCurrentActorId());
+
+    final String exitMessage =
+        String.format("Actor %s is exiting.", runtimeContext.getCurrentActorId());
+    throw new RayIntentionalSystemExitException(exitMessage);
+  }
+
@Override
public void run() {
Preconditions.checkState(rayConfig.workerMode == WorkerType.WORKER);
diff --git a/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java b/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java
new file mode 100644
index 000000000..b2c168c70
--- /dev/null
+++ b/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java
@@ -0,0 +1,15 @@
+package io.ray.runtime.exception;
+
+/**
+ * The exception represents that there is an intentional system exit.
+ *
+ * <p>It is an unchecked exception (it extends {@link RuntimeException}) and can be created
+ * either from a message alone or from a message together with the underlying cause.
+ */
+public class RayIntentionalSystemExitException extends RuntimeException {
+
+ public RayIntentionalSystemExitException(String message) {
+ super(message);
+ }
+
+ public RayIntentionalSystemExitException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java
index 2b5a97ea2..27c2b8bd0 100644
--- a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java
+++ b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java
@@ -6,6 +6,7 @@ import io.ray.api.id.JobId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
+import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.exception.RayTaskException;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.functionmanager.RayFunction;
@@ -159,6 +160,12 @@ public abstract class TaskExecutor {
}
LOGGER.debug("Finished executing task {}", taskId);
} catch (Throwable e) {
+ if (e instanceof RayIntentionalSystemExitException) {
+ // We don't need to fill the `returnObjects` with an exception metadata
+ // because the node manager or the direct actor task submitter will fill
+ // the return object with the ACTOR_DIED metadata.
+ throw (RayIntentionalSystemExitException) e;
+ }
LOGGER.error("Error executing task " + taskId, e);
if (taskType != TaskType.ACTOR_CREATION_TASK) {
boolean hasReturn = rayFunction != null && rayFunction.hasReturn();
diff --git a/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java b/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java
index a8423e2d6..37bb88bb1 100644
--- a/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java
+++ b/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java
@@ -1,5 +1,6 @@
package io.ray.runtime.util;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,4 +35,15 @@ public class SystemUtil {
return pid;
}
+
+  /**
+   * Checks whether a process with the given pid is currently running.
+   *
+   * <p>Runs {@code ps -p &lt;pid&gt;} and treats exit status 0 as "alive", so it only works
+   * on platforms where a {@code ps} command accepting that option is on the path.
+   *
+   * @param pid The id of the process to probe.
+   * @return True if {@code ps} exited with status 0, false otherwise.
+   * @throws RuntimeException If {@code ps} could not be started, or if the calling thread was
+   *     interrupted while waiting for it. In the latter case the thread's interrupt status is
+   *     set again before the exception is thrown.
+   */
+  public static boolean isProcessAlive(int pid) {
+    Process process = null;
+    try {
+      process = Runtime.getRuntime().exec(new String[] {"ps", "-p", String.valueOf(pid)});
+      return process.waitFor() == 0;
+    } catch (InterruptedException e) {
+      // waitFor() clears the interrupt status when it throws; put it back so that code
+      // further up the stack can still see that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (process != null) {
+        // Make sure `ps` does not outlive this call if we bailed out early.
+        process.destroy();
+      }
+    }
+  }
}
diff --git a/java/test/src/main/java/io/ray/test/ExitActorTest.java b/java/test/src/main/java/io/ray/test/ExitActorTest.java
new file mode 100644
index 000000000..4ca537bc8
--- /dev/null
+++ b/java/test/src/main/java/io/ray/test/ExitActorTest.java
@@ -0,0 +1,114 @@
+package io.ray.test;
+
+import static io.ray.runtime.util.SystemUtil.pid;
+
+import io.ray.api.ActorHandle;
+import io.ray.api.Checkpointable;
+import io.ray.api.ObjectRef;
+import io.ray.api.Ray;
+import io.ray.api.id.ActorId;
+import io.ray.api.id.UniqueId;
+import io.ray.runtime.exception.RayActorException;
+import io.ray.runtime.util.SystemUtil;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Cluster tests for {@code Ray.exitActor()}: an actor that exits intentionally must fail its
+ * pending call with {@link RayActorException} and must not be restarted, even when
+ * {@code maxRestarts} is large.
+ */
+@Test(groups = {"cluster"})
+public class ExitActorTest extends BaseTest {
+
+  /**
+   * Counter actor that can report its pid and exit on demand. Its dummy checkpoint restore
+   * always resets the counter to 1.
+   */
+  private static class ExitingActor implements Checkpointable {
+
+    int counter = 0;
+
+    public Integer incr() {
+      return ++counter;
+    }
+
+    public int getPid() {
+      return pid();
+    }
+
+    @Override
+    public boolean shouldCheckpoint(CheckpointContext checkpointContext) {
+      return true;
+    }
+
+    @Override
+    public void saveCheckpoint(ActorId actorId, UniqueId checkpointId) {
+    }
+
+    @Override
+    public UniqueId loadCheckpoint(ActorId actorId, List<Checkpoint> availableCheckpoints) {
+      // Dummy load checkpoint.
+      this.counter = 1;
+      return availableCheckpoints.get(availableCheckpoints.size() - 1).checkpointId;
+    }
+
+    @Override
+    public void checkpointExpired(ActorId actorId, UniqueId checkpointId) {
+    }
+
+    /** Asks the runtime to exit this actor; the return statement only satisfies the compiler. */
+    public boolean exit() {
+      Ray.exitActor();
+      return false;
+    }
+  }
+
+  /** A killed actor is reconstructed, but an actor that calls {@code exitActor} is not. */
+  public void testExitActor() throws IOException, InterruptedException {
+    ActorHandle<ExitingActor> actor = Ray.actor(ExitingActor::new)
+        .setMaxRestarts(10000).remote();
+    // TestNG's assertEquals takes (actual, expected).
+    Assert.assertEquals((int) actor.task(ExitingActor::incr).remote().get(), 1);
+    int pid = actor.task(ExitingActor::getPid).remote().get();
+    // Pass the command as an argument vector and wait until `kill` itself has finished.
+    Runtime.getRuntime().exec(new String[] {"kill", "-9", String.valueOf(pid)}).waitFor();
+    TimeUnit.SECONDS.sleep(1);
+    // Make sure this actor can be reconstructed: the dummy checkpoint restore sets the
+    // counter to 1, so the next increment yields 2.
+    Assert.assertEquals((int) actor.task(ExitingActor::incr).remote().get(), 2);
+
+    // `exitActor` will exit the actor without reconstructing.
+    ObjectRef<Boolean> obj = actor.task(ExitingActor::exit).remote();
+    Assert.assertThrows(RayActorException.class, obj::get);
+  }
+
+  /** Exiting one actor must not take down another actor that lives in the same process. */
+  public void testExitActorInMultiWorker() {
+    Assert.assertTrue(TestUtils.getRuntime().getRayConfig().numWorkersPerProcess > 1);
+    ActorHandle<ExitingActor> actor1 = Ray.actor(ExitingActor::new)
+        .setMaxRestarts(10000).remote();
+    int pid = actor1.task(ExitingActor::getPid).remote().get();
+    ActorHandle<ExitingActor> actor2;
+    while (true) {
+      // Create another actor which share the same process of actor 1.
+      actor2 = Ray.actor(ExitingActor::new).setMaxRestarts(0).remote();
+      int actor2Pid = actor2.task(ExitingActor::getPid).remote().get();
+      if (actor2Pid == pid) {
+        break;
+      }
+    }
+    ObjectRef<Boolean> obj1 = actor1.task(ExitingActor::exit).remote();
+    Assert.assertThrows(RayActorException.class, obj1::get);
+    Assert.assertTrue(SystemUtil.isProcessAlive(pid));
+    // Actor 2 shouldn't exit or be reconstructed.
+    Assert.assertEquals((int) actor2.task(ExitingActor::incr).remote().get(), 1);
+    Assert.assertEquals((int) actor2.task(ExitingActor::getPid).remote().get(), pid);
+    Assert.assertTrue(SystemUtil.isProcessAlive(pid));
+  }
+
+  /** With a dedicated worker process, exiting the actor also ends that process. */
+  public void testExitActorWithDynamicOptions() {
+    ActorHandle<ExitingActor> actor = Ray.actor(ExitingActor::new)
+        .setMaxRestarts(10000)
+        // Set dummy JVM options to start a worker process with only one worker.
+        .setJvmOptions(" ")
+        .remote();
+    int pid = actor.task(ExitingActor::getPid).remote().get();
+    Assert.assertTrue(SystemUtil.isProcessAlive(pid));
+    ObjectRef<Boolean> obj1 = actor.task(ExitingActor::exit).remote();
+    Assert.assertThrows(RayActorException.class, obj1::get);
+    // Now the actor shouldn't be reconstructed anymore.
+    Assert.assertThrows(RayActorException.class,
+        () -> actor.task(ExitingActor::getPid).remote().get());
+    // Now the worker process should be dead.
+    Assert.assertTrue(TestUtils.waitForCondition(() -> !SystemUtil.isProcessAlive(pid), 5000));
+  }
+}
diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc
index 7f510b2c1..78ca14b41 100644
--- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc
+++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc
@@ -145,6 +145,13 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
jobject java_return_objects =
env->CallObjectMethod(java_task_executor, java_task_executor_execute,
ray_function_array_list, args_array_list);
+        // Check whether the exception is `IntentionalSystemExit`.
+        jthrowable throwable = env->ExceptionOccurred();
+        if (throwable) {
+          // Only a small set of JNI functions may be called while an exception is
+          // pending, and `IsInstanceOf` is not one of them, so clear it before
+          // inspecting the throwable.
+          env->ExceptionClear();
+          if (env->IsInstanceOf(throwable,
+                                java_ray_intentional_system_exit_exception_class)) {
+            // The exception stays cleared: it has been fully handled by turning it
+            // into a status, and must not leak into later JNI calls on this thread.
+            env->DeleteLocalRef(throwable);
+            return ray::Status::IntentionalSystemExit();
+          }
+          // Any other exception: raise it again so that the generic check that
+          // follows still sees and reports it.
+          env->Throw(throwable);
+          env->DeleteLocalRef(throwable);
+        }
RAY_CHECK_JAVA_EXCEPTION(env);
// Process return objects.
diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc
index ed1860b15..596973020 100644
--- a/src/ray/core_worker/lib/java/jni_init.cc
+++ b/src/ray/core_worker/lib/java/jni_init.cc
@@ -54,6 +54,7 @@ jclass java_system_class;
jmethodID java_system_gc;
jclass java_ray_exception_class;
+jclass java_ray_intentional_system_exit_exception_class;
jclass java_jni_exception_util_class;
jmethodID java_jni_exception_util_get_stack_trace;
@@ -171,6 +172,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_system_gc = env->GetStaticMethodID(java_system_class, "gc", "()V");
java_ray_exception_class = LoadClass(env, "io/ray/runtime/exception/RayException");
+ java_ray_intentional_system_exit_exception_class =
+ LoadClass(env, "io/ray/runtime/exception/RayIntentionalSystemExitException");
java_jni_exception_util_class = LoadClass(env, "io/ray/runtime/util/JniExceptionUtil");
java_jni_exception_util_get_stack_trace = env->GetStaticMethodID(
@@ -272,6 +275,7 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) {
env->DeleteGlobalRef(java_map_entry_class);
env->DeleteGlobalRef(java_system_class);
env->DeleteGlobalRef(java_ray_exception_class);
+ env->DeleteGlobalRef(java_ray_intentional_system_exit_exception_class);
env->DeleteGlobalRef(java_jni_exception_util_class);
env->DeleteGlobalRef(java_base_id_class);
env->DeleteGlobalRef(java_function_descriptor_class);
diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h
index a5c14d76f..406cd71d4 100644
--- a/src/ray/core_worker/lib/java/jni_utils.h
+++ b/src/ray/core_worker/lib/java/jni_utils.h
@@ -94,6 +94,9 @@ extern jmethodID java_system_gc;
/// RayException class
extern jclass java_ray_exception_class;
+/// RayIntentionalSystemExitException class (thrown in Java to request an intentional exit)
+extern jclass java_ray_intentional_system_exit_exception_class;
+
/// JniExceptionUtil class
extern jclass java_jni_exception_util_class;
/// getStackTrace method of JniExceptionUtil class