Add a flag to disable reconstruction for a killed actor (#7346)

This commit is contained in:
Kai Yang
2020-03-13 19:10:21 +08:00
committed by GitHub
parent 575c89cf47
commit d6e8f47065
17 changed files with 135 additions and 52 deletions
@@ -181,16 +181,6 @@ public final class Ray extends RayCall {
runtime.setResource(resourceName, capacity, UniqueId.NIL);
}
/**
* Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to
* fail and the actor to exit in the same way as if it crashed.
*
* <p>This is a thin static entry point: the request is forwarded unchanged to
* {@code runtime.killActor(actor)}.
*
* @param actor The actor to be killed.
*/
public static void killActor(RayActor<?> actor) {
runtime.killActor(actor);
}
/**
* Get the runtime context.
*/
@@ -33,4 +33,13 @@ public interface RayActor<A> extends ActorCall<A> {
*/
ActorId getId();
/**
* Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to
* fail and the actor to exit in the same way as if it crashed.
*
* <p>The default implementation passes this handle and the flag to
* {@code Ray.internal().killActor(...)}.
*
* @param noReconstruction If set to true, the killed actor will not be reconstructed anymore.
*     If false, the flag itself does not forbid reconstruction (NOTE(review): whether the actor
*     actually comes back presumably depends on its max-reconstructions setting - confirm).
*/
default void kill(boolean noReconstruction) {
Ray.internal().killActor(this, noReconstruction);
}
}
@@ -80,8 +80,9 @@ public interface RayRuntime {
* Kill the actor immediately.
*
* @param actor The actor to be killed.
* @param noReconstruction If set to true, the killed actor will not be reconstructed anymore.
*/
void killActor(RayActor<?> actor);
void killActor(RayActor<?> actor, boolean noReconstruction);
/**
* Invoke a remote function.
@@ -1,7 +1,6 @@
package org.ray.runtime;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.api.RayActor;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
@@ -50,7 +49,7 @@ public class RayDevRuntime extends AbstractRayRuntime {
}
/**
* Killing actors is not supported by this runtime: the call fails regardless of its arguments.
*
* @throws UnsupportedOperationException always.
*/
@Override
public void killActor(RayActor<?> actor) {
public void killActor(RayActor<?> actor, boolean noReconstruction) {
throw new UnsupportedOperationException();
}
@@ -139,8 +139,8 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime {
}
// Delegates to whichever runtime getCurrentRuntime() returns, passing the arguments through
// unchanged.
@Override
public void killActor(RayActor<?> actor) {
getCurrentRuntime().killActor(actor);
public void killActor(RayActor<?> actor, boolean noReconstruction) {
getCurrentRuntime().killActor(actor, noReconstruction);
}
@Override
@@ -135,8 +135,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
}
// Hands the kill request to native code: the actor is identified by the raw bytes of its id,
// and nativeCoreWorkerPointer is passed along as the first argument.
@Override
public void killActor(RayActor<?> actor) {
nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes());
public void killActor(RayActor<?> actor, boolean noReconstruction) {
nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes(), noReconstruction);
}
@Override
@@ -200,5 +200,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
// Native method (no Java body): takes a resource name, a capacity and a node id as bytes.
private static native void nativeSetResource(long conn, String resourceName, double capacity,
byte[] nodeId);
// Native methods (no Java body) backing killActor. Both take the core-worker pointer and the
// actor id as bytes; the three-argument form additionally carries the noReconstruction flag.
private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId);
private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId,
boolean noReconstruction);
}
@@ -1,21 +1,35 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import java.util.function.BiConsumer;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.exception.RayActorException;
import org.ray.api.options.ActorCreationOptions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test
public class KillActorTest extends BaseTest {
// Class-level setup: sets the "ray.raylet.config.num_workers_per_process_java" system property
// to "1" before the tests of this class run.
// NOTE(review): presumably this makes each Java worker process host a single worker, so that
// killing one actor cannot take unrelated workers down with it - confirm against raylet config.
@BeforeClass
public void setUp() {
System.setProperty("ray.raylet.config.num_workers_per_process_java", "1");
}
// Class-level cleanup: removes the "ray.raylet.config.num_workers_per_process_java" system
// property again once the tests of this class have run.
@AfterClass
public void tearDown() {
System.clearProperty("ray.raylet.config.num_workers_per_process_java");
}
public static class HangActor {
public boolean alive() {
return true;
public String ping() {
return "pong";
}
public boolean hang() throws InterruptedException {
@@ -25,13 +39,60 @@ public class KillActorTest extends BaseTest {
}
}
public void testKillActor() {
/**
* Helper actor whose only task is to kill another actor, so that the kill is issued from inside
* an actor task rather than directly by the test code.
*/
public static class KillerActor {
/** Kills {@code actor}, passing {@code noReconstruction} straight to {@code actor.kill}. */
public void kill(RayActor<?> actor, boolean noReconstruction) {
actor.kill(noReconstruction);
}
}
// Kill strategy: kills the actor directly from the calling (test) code.
private static void localKill(RayActor<?> actor, boolean noReconstruction) {
actor.kill(noReconstruction);
}
// Kill strategy: creates a fresh KillerActor and submits a task to it that performs the kill.
// Nothing here waits for that task to finish; callers observe the kill through its effects.
private static void remoteKill(RayActor<?> actor, boolean noReconstruction) {
RayActor<KillerActor> killer = Ray.createActor(KillerActor::new);
killer.call(KillerActor::kill, actor, noReconstruction);
}
/**
 * Shared scenario for the kill tests: create an actor that may be reconstructed once, block it
 * in a task, kill it through the supplied strategy, then verify both the in-flight task and the
 * state of the actor afterwards.
 *
 * @param kill Strategy that performs the kill; it receives the actor and the flag.
 * @param noReconstruction Whether the kill should also forbid reconstruction of the actor.
 */
private void testKillActor(BiConsumer<RayActor<?>, Boolean> kill, boolean noReconstruction) {
  TestUtils.skipTestUnderSingleProcess();

  // Allow one reconstruction so that the two values of noReconstruction lead to different
  // outcomes below. The actor is created exactly once, with these options.
  ActorCreationOptions options =
      new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
  RayActor<HangActor> actor = Ray.createActor(HangActor::new, options);
  RayObject<Boolean> result = actor.call(HangActor::hang);
  // The actor will hang in this task.
  Assert.assertEquals(Ray.wait(ImmutableList.of(result), 1, 500).getReady().size(), 0);

  // Kill the actor exactly once, and only through the strategy under test, so the
  // noReconstruction flag is what decides the outcome.
  kill.accept(actor, noReconstruction);
  // The get operation will fail with RayActorException
  Assert.expectThrows(RayActorException.class, result::get);

  try {
    // Sleep 1s here to make sure the driver has received the actor notification
    // (of state RECONSTRUCTING or DEAD).
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    // Keep the interrupt status visible to whoever owns this thread before failing the test.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }

  if (noReconstruction) {
    // The actor should not be reconstructed. Probe with the quick ping task rather than the
    // blocking one, so a wrongly reconstructed actor fails this assertion instead of stalling
    // the test run.
    Assert.expectThrows(RayActorException.class, () -> actor.call(HangActor::ping).get());
  } else {
    // One reconstruction was allowed, so the actor is expected to answer again.
    Assert.assertEquals(actor.call(HangActor::ping).get(), "pong");
  }
}
/**
 * Runs the kill scenario with the direct kill strategy, first with reconstruction allowed and
 * then with reconstruction forbidden.
 */
public void testLocalKill() {
  for (boolean noReconstruction : new boolean[] {false, true}) {
    testKillActor(KillActorTest::localKill, noReconstruction);
  }
}
/**
 * Runs the kill scenario with the kill issued by a helper actor, first with reconstruction
 * allowed and then with reconstruction forbidden.
 */
public void testRemoteKill() {
  for (boolean noReconstruction : new boolean[] {false, true}) {
    testKillActor(KillActorTest::remoteKill, noReconstruction);
  }
}
}