From ddd4c42fe5aaf0d2b494b4a23bf8f4f12e62561f Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Tue, 14 Jan 2020 17:12:00 +0800 Subject: [PATCH] [Java] Add killActor API in Java (#6728) * Add killActor API in Java * fix javadoc * update test case * Address comments --- java/api/src/main/java/org/ray/api/Ray.java | 11 +++++ .../java/org/ray/api/runtime/RayRuntime.java | 7 ++++ .../java/org/ray/runtime/RayDevRuntime.java | 6 +++ .../runtime/RayMultiWorkerNativeRuntime.java | 5 +++ .../org/ray/runtime/RayNativeRuntime.java | 12 ++++++ .../src/main/java/org/ray/api/TestUtils.java | 13 +++++- .../java/org/ray/api/test/KillActorTest.java | 40 +++++++++++++++++++ .../java/org_ray_runtime_RayNativeRuntime.cc | 7 ++++ .../java/org_ray_runtime_RayNativeRuntime.h | 10 +++++ 9 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 java/test/src/main/java/org/ray/api/test/KillActorTest.java diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index 375f43786..7cddaf58b 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -130,6 +130,7 @@ public final class Ray extends RayCall { /** * Set the async context for the current thread. + * * @param asyncContext The async context to set. */ public static void setAsyncContext(Object asyncContext) { @@ -180,6 +181,16 @@ public final class Ray extends RayCall { runtime.setResource(resourceName, capacity, UniqueId.NIL); } + /** + * Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to + * fail and the actor to exit in the same way as if it crashed. + * + * @param actor The actor to be killed. + */ + public static void killActor(RayActor actor) { + runtime.killActor(actor); + } + /** * Get the runtime context. */ diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 059e85aea..1881064ed 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -76,6 +76,13 @@ public interface RayRuntime { */ void setResource(String resourceName, double capacity, UniqueId nodeId); + /** + * Kill the actor immediately. + * + * @param actor The actor to be killed. + */ + void killActor(RayActor actor); + /** * Invoke a remote function. * diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 8582238f0..99eaea59b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -2,6 +2,7 @@ package org.ray.runtime; import java.util.concurrent.atomic.AtomicInteger; +import org.ray.api.RayActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; import org.ray.runtime.config.RayConfig; @@ -47,6 +48,11 @@ public class RayDevRuntime extends AbstractRayRuntime { LOGGER.error("Not implemented under SINGLE_PROCESS mode."); } + @Override + public void killActor(RayActor actor) { + throw new UnsupportedOperationException(); + } + @Override public Object getAsyncContext() { return null; diff --git a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java index 335ed032a..f142cb67c 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java @@ -140,6 +140,11 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime { getCurrentRuntime().setResource(resourceName, capacity, nodeId); } + @Override + public void killActor(RayActor actor) { + getCurrentRuntime().killActor(actor); + } + @Override public RayObject call(RayFunc func, Object[] args, CallOptions options) { return getCurrentRuntime().call(func, args, options); diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 5e035f7da..4c8b6e42f 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -6,8 +6,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.ray.api.RayActor; import org.ray.api.id.JobId; import org.ray.api.id.UniqueId; +import org.ray.runtime.actor.NativeRayActor; import org.ray.runtime.config.RayConfig; import org.ray.runtime.context.NativeWorkerContext; import org.ray.runtime.functionmanager.FunctionManager; @@ -127,6 +129,14 @@ public final class RayNativeRuntime extends AbstractRayRuntime { nativeSetResource(nativeCoreWorkerPointer, resourceName, capacity, nodeId.getBytes()); } + @Override + public void killActor(RayActor actor) { + if (!((NativeRayActor) actor).isDirectCallActor()) { + throw new UnsupportedOperationException("Only direct call actors can be killed."); + } + nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes()); + } + @Override public Object getAsyncContext() { return null; @@ -184,4 +194,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { private static native void nativeSetResource(long conn, String resourceName, double capacity, byte[] nodeId); + + private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId); } diff --git a/java/test/src/main/java/org/ray/api/TestUtils.java b/java/test/src/main/java/org/ray/api/TestUtils.java index e036aa7d2..486e55fe0 100644 --- a/java/test/src/main/java/org/ray/api/TestUtils.java +++ b/java/test/src/main/java/org/ray/api/TestUtils.java @@ -28,8 +28,17 @@ public class TestUtils { } public static void skipTestIfDirectActorCallEnabled() { - if (ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) { - throw new SkipException("This test doesn't work when direct actor call is enabled."); + skipTestIfDirectActorCallEnabled(true); + } + + public static void skipTestIfDirectActorCallDisabled() { + skipTestIfDirectActorCallEnabled(false); + } + + private static void skipTestIfDirectActorCallEnabled(boolean enabled) { + if (enabled == ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) { + throw new SkipException(String.format("This test doesn't work when direct actor call is %s.", + enabled ? "enabled" : "disabled")); } } diff --git a/java/test/src/main/java/org/ray/api/test/KillActorTest.java b/java/test/src/main/java/org/ray/api/test/KillActorTest.java new file mode 100644 index 000000000..9cf7b4eec --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/KillActorTest.java @@ -0,0 +1,40 @@ +package org.ray.api.test; + +import com.google.common.collect.ImmutableList; +import org.ray.api.Ray; +import org.ray.api.RayActor; +import org.ray.api.RayObject; +import org.ray.api.TestUtils; +import org.ray.api.annotation.RayRemote; +import org.ray.api.exception.RayActorException; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Test(groups = { "directCall" }) +public class KillActorTest extends BaseTest { + + @RayRemote + public static class HangActor { + + public boolean alive() { + return true; + } + + public boolean hang() throws InterruptedException { + while (true) { + Thread.sleep(1000); + } + } + } + + public void testKillActor() { + TestUtils.skipTestUnderSingleProcess(); + TestUtils.skipTestIfDirectActorCallDisabled(); + RayActor actor = Ray.createActor(HangActor::new); + Assert.assertTrue(Ray.call(HangActor::alive, actor).get()); + RayObject result = Ray.call(HangActor::hang, actor); + Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size()); + Ray.killActor(actor); + Assert.expectThrows(RayActorException.class, result::get); + } +} diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index 7ba94f67a..0c44e8e4f 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -133,6 +133,13 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } +JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor( + JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto core_worker = reinterpret_cast(nativeCoreWorkerPointer); + auto status = core_worker->KillActor(JavaByteArrayToId(env, actorId)); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); +} + #ifdef __cplusplus } #endif diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h index 48bc81f04..55f9ca298 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h @@ -56,6 +56,16 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeShutdownHook( JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( JNIEnv *, jclass, jlong, jstring, jdouble, jbyteArray); +/* + * Class: org_ray_runtime_RayNativeRuntime + * Method: nativeKillActor + * Signature: (J[B)V + */ +JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor(JNIEnv *, + jclass, + jlong, + jbyteArray); + #ifdef __cplusplus } #endif