From cf3875bd8c998eb811f9ea98083b2cb9c5e71dcf Mon Sep 17 00:00:00 2001 From: chaokunyang Date: Fri, 4 Sep 2020 10:11:42 +0800 Subject: [PATCH] [Java] add exitActor API for java (#10496) --- java/api/src/main/java/io/ray/api/Ray.java | 12 ++ .../java/io/ray/api/runtime/RayRuntime.java | 5 + .../java/io/ray/runtime/RayDevRuntime.java | 5 + .../java/io/ray/runtime/RayNativeRuntime.java | 11 ++ .../RayIntentionalSystemExitException.java | 15 +++ .../io/ray/runtime/task/TaskExecutor.java | 7 ++ .../java/io/ray/runtime/util/SystemUtil.java | 12 ++ .../main/java/io/ray/test/ExitActorTest.java | 114 ++++++++++++++++++ .../java/io_ray_runtime_RayNativeRuntime.cc | 7 ++ src/ray/core_worker/lib/java/jni_init.cc | 4 + src/ray/core_worker/lib/java/jni_utils.h | 3 + 11 files changed, 195 insertions(+) create mode 100644 java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java create mode 100644 java/test/src/main/java/io/ray/test/ExitActorTest.java diff --git a/java/api/src/main/java/io/ray/api/Ray.java b/java/api/src/main/java/io/ray/api/Ray.java index 1e9d68482..623256db4 100644 --- a/java/api/src/main/java/io/ray/api/Ray.java +++ b/java/api/src/main/java/io/ray/api/Ray.java @@ -241,4 +241,16 @@ public final class Ray extends RayCall { PlacementStrategy strategy) { return runtime.createPlacementGroup(bundles, strategy); } + + /** + * Intentionally exit the current actor. + *

+ * This method is used to disconnect an actor and exit the worker. + * + * @throws RuntimeException An exception is raised if this is a driver or this worker is not + * an actor. + */ + public static void exitActor() { + runtime.exitActor(); + } } diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java index 477fb029d..19ec5390a 100644 --- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java @@ -193,4 +193,9 @@ public interface RayRuntime { * @return The wrapped callable. */ Callable wrapCallable(Callable callable); + + /** + * Intentionally exit the current actor. + */ + void exitActor(); } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java index 778f3f07e..26fa0430b 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayDevRuntime.java @@ -84,6 +84,11 @@ public class RayDevRuntime extends AbstractRayRuntime { super.setAsyncContext(asyncContext); } + @Override + public void exitActor() { + + } + private JobId nextJobId() { return JobId.fromInt(jobCounter.getAndIncrement()); } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 5eada0cbb..724e82fb4 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -9,6 +9,7 @@ import io.ray.api.runtimecontext.NodeInfo; import io.ray.runtime.config.RayConfig; import io.ray.runtime.context.NativeWorkerContext; import io.ray.runtime.exception.RayException; +import io.ray.runtime.exception.RayIntentionalSystemExitException; import io.ray.runtime.gcs.GcsClient; import io.ray.runtime.gcs.GcsClientOptions; import io.ray.runtime.gcs.RedisClient; @@ -245,6 +246,16 @@ public final class RayNativeRuntime extends AbstractRayRuntime { super.setAsyncContext(asyncContext); } + @Override + public void exitActor() { + if (rayConfig.workerMode != WorkerType.WORKER || runtimeContext.getCurrentActorId().isNil()) { + throw new RuntimeException("This shouldn't be called on a non-actor worker."); + } + LOGGER.info("Actor {} is exiting.", runtimeContext.getCurrentActorId()); + throw new RayIntentionalSystemExitException( + String.format("Actor %s is exiting.", runtimeContext.getCurrentActorId())); + } + @Override public void run() { Preconditions.checkState(rayConfig.workerMode == WorkerType.WORKER); diff --git a/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java b/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java new file mode 100644 index 000000000..b2c168c70 --- /dev/null +++ b/java/runtime/src/main/java/io/ray/runtime/exception/RayIntentionalSystemExitException.java @@ -0,0 +1,15 @@ +package io.ray.runtime.exception; + +/** + * The exception represents that there is an intentional system exit. + */ +public class RayIntentionalSystemExitException extends RuntimeException { + + public RayIntentionalSystemExitException(String message) { + super(message); + } + + public RayIntentionalSystemExitException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java index 2b5a97ea2..27c2b8bd0 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java @@ -6,6 +6,7 @@ import io.ray.api.id.JobId; import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.exception.RayIntentionalSystemExitException; import io.ray.runtime.exception.RayTaskException; import io.ray.runtime.functionmanager.JavaFunctionDescriptor; import io.ray.runtime.functionmanager.RayFunction; @@ -159,6 +160,12 @@ public abstract class TaskExecutor { } LOGGER.debug("Finished executing task {}", taskId); } catch (Throwable e) { + if (e instanceof RayIntentionalSystemExitException) { + // We don't need to fill the `returnObjects` with an exception metadata + // because the node manager or the direct actor task submitter will fill + // the return object with the ACTOR_DIED metadata. + throw (RayIntentionalSystemExitException) e; + } LOGGER.error("Error executing task " + taskId, e); if (taskType != TaskType.ACTOR_CREATION_TASK) { boolean hasReturn = rayFunction != null && rayFunction.hasReturn(); diff --git a/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java b/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java index a8423e2d6..37bb88bb1 100644 --- a/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java +++ b/java/runtime/src/main/java/io/ray/runtime/util/SystemUtil.java @@ -1,5 +1,6 @@ package io.ray.runtime.util; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.concurrent.locks.ReentrantLock; @@ -34,4 +35,15 @@ public class SystemUtil { return pid; } + + public static boolean isProcessAlive(int pid) { + Process process; + try { + process = Runtime.getRuntime().exec(new String[]{"ps", "-p", String.valueOf(pid)}); + process.waitFor(); + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + return process.exitValue() == 0; + } } diff --git a/java/test/src/main/java/io/ray/test/ExitActorTest.java b/java/test/src/main/java/io/ray/test/ExitActorTest.java new file mode 100644 index 000000000..4ca537bc8 --- /dev/null +++ b/java/test/src/main/java/io/ray/test/ExitActorTest.java @@ -0,0 +1,114 @@ +package io.ray.test; + +import static io.ray.runtime.util.SystemUtil.pid; + +import io.ray.api.ActorHandle; +import io.ray.api.Checkpointable; +import io.ray.api.ObjectRef; +import io.ray.api.Ray; +import io.ray.api.id.ActorId; +import io.ray.api.id.UniqueId; +import io.ray.runtime.exception.RayActorException; +import io.ray.runtime.util.SystemUtil; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = {"cluster"}) +public class ExitActorTest extends BaseTest { + + private static class ExitingActor implements Checkpointable { + + int counter = 0; + + public Integer incr() { + return ++counter; + } + + public int getPid() { + return pid(); + } + + @Override + public boolean shouldCheckpoint(CheckpointContext checkpointContext) { + return true; + } + + @Override + public void saveCheckpoint(ActorId actorId, UniqueId checkpointId) { + } + + @Override + public UniqueId loadCheckpoint(ActorId actorId, List availableCheckpoints) { + // Dummy load checkpoint. + this.counter = 1; + return availableCheckpoints.get(availableCheckpoints.size() - 1).checkpointId; + } + + @Override + public void checkpointExpired(ActorId actorId, UniqueId checkpointId) { + } + + public boolean exit() { + Ray.exitActor(); + return false; + } + } + + public void testExitActor() throws IOException, InterruptedException { + ActorHandle actor = Ray.actor(ExitingActor::new) + .setMaxRestarts(10000).remote(); + Assert.assertEquals(1, (int) (actor.task(ExitingActor::incr).remote().get())); + int pid = actor.task(ExitingActor::getPid).remote().get(); + Runtime.getRuntime().exec("kill -9 " + pid); + TimeUnit.SECONDS.sleep(1); + // Make sure this actor can be reconstructed. + Assert.assertEquals(2, (int) actor.task(ExitingActor::incr).remote().get()); + + // `exitActor` will exit the actor without reconstructing. + ObjectRef obj = actor.task(ExitingActor::exit).remote(); + Assert.assertThrows(RayActorException.class, obj::get); + } + + public void testExitActorInMultiWorker() { + Assert.assertTrue(TestUtils.getRuntime().getRayConfig().numWorkersPerProcess > 1); + ActorHandle actor1 = Ray.actor(ExitingActor::new) + .setMaxRestarts(10000).remote(); + int pid = actor1.task(ExitingActor::getPid).remote().get(); + ActorHandle actor2; + while (true) { + // Create another actor which share the same process of actor 1. + actor2 = Ray.actor(ExitingActor::new).setMaxRestarts(0).remote(); + int actor2Pid = actor2.task(ExitingActor::getPid).remote().get(); + if (actor2Pid == pid) { + break; + } + } + ObjectRef obj1 = actor1.task(ExitingActor::exit).remote(); + Assert.assertThrows(RayActorException.class, obj1::get); + Assert.assertTrue(SystemUtil.isProcessAlive(pid)); + // Actor 2 shouldn't exit or be reconstructed. + Assert.assertEquals(1, (int) actor2.task(ExitingActor::incr).remote().get()); + Assert.assertEquals(pid, (int) actor2.task(ExitingActor::getPid).remote().get()); + Assert.assertTrue(SystemUtil.isProcessAlive(pid)); + } + + public void testExitActorWithDynamicOptions() { + ActorHandle actor = Ray.actor(ExitingActor::new) + .setMaxRestarts(10000) + // Set dummy JVM options to start a worker process with only one worker. + .setJvmOptions(" ") + .remote(); + int pid = actor.task(ExitingActor::getPid).remote().get(); + Assert.assertTrue(SystemUtil.isProcessAlive(pid)); + ObjectRef obj1 = actor.task(ExitingActor::exit).remote(); + Assert.assertThrows(RayActorException.class, obj1::get); + // Now the actor shouldn't be reconstructed anymore. + Assert.assertThrows(RayActorException.class, + () -> actor.task(ExitingActor::getPid).remote().get()); + // Now the worker process should be dead. + Assert.assertTrue(TestUtils.waitForCondition(() -> !SystemUtil.isProcessAlive(pid), 5000)); + } +} diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index 7f510b2c1..78ca14b41 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -145,6 +145,13 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( jobject java_return_objects = env->CallObjectMethod(java_task_executor, java_task_executor_execute, ray_function_array_list, args_array_list); + // Check whether the exception is `IntentionalSystemExit`. + jthrowable throwable = env->ExceptionOccurred(); + if (throwable && + env->IsInstanceOf(throwable, + java_ray_intentional_system_exit_exception_class)) { + return ray::Status::IntentionalSystemExit(); + } RAY_CHECK_JAVA_EXCEPTION(env); // Process return objects. diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index ed1860b15..596973020 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -54,6 +54,7 @@ jclass java_system_class; jmethodID java_system_gc; jclass java_ray_exception_class; +jclass java_ray_intentional_system_exit_exception_class; jclass java_jni_exception_util_class; jmethodID java_jni_exception_util_get_stack_trace; @@ -171,6 +172,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_system_gc = env->GetStaticMethodID(java_system_class, "gc", "()V"); java_ray_exception_class = LoadClass(env, "io/ray/runtime/exception/RayException"); + java_ray_intentional_system_exit_exception_class = + LoadClass(env, "io/ray/runtime/exception/RayIntentionalSystemExitException"); java_jni_exception_util_class = LoadClass(env, "io/ray/runtime/util/JniExceptionUtil"); java_jni_exception_util_get_stack_trace = env->GetStaticMethodID( @@ -272,6 +275,7 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) { env->DeleteGlobalRef(java_map_entry_class); env->DeleteGlobalRef(java_system_class); env->DeleteGlobalRef(java_ray_exception_class); + env->DeleteGlobalRef(java_ray_intentional_system_exit_exception_class); env->DeleteGlobalRef(java_jni_exception_util_class); env->DeleteGlobalRef(java_base_id_class); env->DeleteGlobalRef(java_function_descriptor_class); diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index a5c14d76f..406cd71d4 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -94,6 +94,9 @@ extern jmethodID java_system_gc; /// RayException class extern jclass java_ray_exception_class; +/// RayIntentionalSystemExitException class +extern jclass java_ray_intentional_system_exit_exception_class; + /// JniExceptionUtil class extern jclass java_jni_exception_util_class; /// getStackTrace method of JniExceptionUtil class