[Java] add exitActor API for java (#10496)

This commit is contained in:
chaokunyang
2020-09-04 10:11:42 +08:00
committed by GitHub
parent 5e4db6ad24
commit cf3875bd8c
11 changed files with 195 additions and 0 deletions
@@ -241,4 +241,16 @@ public final class Ray extends RayCall {
PlacementStrategy strategy) {
return runtime.createPlacementGroup(bundles, strategy);
}
/**
 * Intentionally exit the current actor.
 *
 * <p>This method is used to disconnect an actor and exit the worker. The call is forwarded
 * unchanged to {@code runtime.exitActor()}.
 *
 * @throws RuntimeException An exception is raised if this is a driver or this worker is not
 *     an actor.
 */
public static void exitActor() {
runtime.exitActor();
}
}
@@ -193,4 +193,9 @@ public interface RayRuntime {
* @return The wrapped callable.
*/
<T> Callable<T> wrapCallable(Callable<T> callable);
/**
 * Intentionally exit the current actor.
 *
 * <p>How the exit is carried out is left to the runtime implementation.
 */
void exitActor();
}
@@ -84,6 +84,11 @@ public class RayDevRuntime extends AbstractRayRuntime {
super.setAsyncContext(asyncContext);
}
@Override
public void exitActor() {
// Deliberately empty: this runtime neither throws nor tears anything down when asked to exit.
// NOTE(review): presumably because the dev runtime has no separate actor worker process to
// shut down -- confirm that silently ignoring the request is the intended contract.
}
/**
 * Produces a fresh job id from the current value of {@code jobCounter}, advancing the counter
 * by one as a side effect.
 *
 * @return The job id built from the counter value read before the increment.
 */
private JobId nextJobId() {
  final int jobIndex = jobCounter.getAndIncrement();
  return JobId.fromInt(jobIndex);
}
@@ -9,6 +9,7 @@ import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.NativeWorkerContext;
import io.ray.runtime.exception.RayException;
import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.gcs.GcsClientOptions;
import io.ray.runtime.gcs.RedisClient;
@@ -245,6 +246,16 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
super.setAsyncContext(asyncContext);
}
/**
 * Exits the current actor by throwing a {@link RayIntentionalSystemExitException} out of the
 * running task.
 *
 * @throws RuntimeException If this process is not running in worker mode, or the current
 *     actor id is nil.
 * @throws RayIntentionalSystemExitException Always, once the checks above have passed.
 */
@Override
public void exitActor() {
  boolean inWorkerMode = rayConfig.workerMode == WorkerType.WORKER;
  // The short-circuit keeps the actor id lookup off the non-worker path.
  boolean boundToActor = inWorkerMode && !runtimeContext.getCurrentActorId().isNil();
  if (!boundToActor) {
    throw new RuntimeException("This shouldn't be called on a non-actor worker.");
  }

  LOGGER.info("Actor {} is exiting.", runtimeContext.getCurrentActorId());
  String exitMessage =
      String.format("Actor %s is exiting.", runtimeContext.getCurrentActorId());
  throw new RayIntentionalSystemExitException(exitMessage);
}
@Override
public void run() {
Preconditions.checkState(rayConfig.workerMode == WorkerType.WORKER);
@@ -0,0 +1,15 @@
package io.ray.runtime.exception;
/**
 * Unchecked exception signalling that the system is being exited on purpose rather than
 * because of a failure.
 */
public class RayIntentionalSystemExitException extends RuntimeException {

  /**
   * Creates the exception with a description of the intentional exit.
   *
   * @param message Human-readable reason for the exit.
   */
  public RayIntentionalSystemExitException(String message) {
    super(message);
  }

  /**
   * Creates the exception with a description of the intentional exit and its underlying cause.
   *
   * @param message Human-readable reason for the exit.
   * @param cause The throwable that led to this exit.
   */
  public RayIntentionalSystemExitException(String message, Throwable cause) {
    super(message, cause);
  }
}
@@ -6,6 +6,7 @@ import io.ray.api.id.JobId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.exception.RayTaskException;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.functionmanager.RayFunction;
@@ -159,6 +160,12 @@ public abstract class TaskExecutor<T extends TaskExecutor.ActorContext> {
}
LOGGER.debug("Finished executing task {}", taskId);
} catch (Throwable e) {
if (e instanceof RayIntentionalSystemExitException) {
// We don't need to fill the `returnObjects` with an exception metadata
// because the node manager or the direct actor task submitter will fill
// the return object with the ACTOR_DIED metadata.
throw (RayIntentionalSystemExitException) e;
}
LOGGER.error("Error executing task " + taskId, e);
if (taskType != TaskType.ACTOR_CREATION_TASK) {
boolean hasReturn = rayFunction != null && rayFunction.hasReturn();
@@ -1,5 +1,6 @@
package io.ray.runtime.util;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,4 +35,15 @@ public class SystemUtil {
return pid;
}
/**
 * Checks whether a process with the given pid exists by running {@code ps -p <pid>} and
 * inspecting its exit status.
 *
 * @param pid The id of the process to probe.
 * @return True if {@code ps} exits with status 0, false otherwise.
 * @throws RuntimeException If {@code ps} cannot be started, or if the calling thread is
 *     interrupted while waiting for it. In the latter case the thread's interrupt flag is
 *     set again before the exception is thrown.
 */
public static boolean isProcessAlive(int pid) {
  Process process = null;
  try {
    process = Runtime.getRuntime().exec(new String[]{"ps", "-p", String.valueOf(pid)});
    // waitFor() returns the exit value once the child has terminated.
    return process.waitFor() == 0;
  } catch (InterruptedException e) {
    // Restore the interrupt status so callers further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (IOException e) {
    throw new RuntimeException(e);
  } finally {
    if (process != null) {
      // Make sure the helper process does not outlive this call, e.g. after an interrupt.
      process.destroy();
    }
  }
}
}
@@ -0,0 +1,114 @@
package io.ray.test;
import static io.ray.runtime.util.SystemUtil.pid;
import io.ray.api.ActorHandle;
import io.ray.api.Checkpointable;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.ActorId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.exception.RayActorException;
import io.ray.runtime.util.SystemUtil;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = {"cluster"})
public class ExitActorTest extends BaseTest {
/**
 * Minimal checkpointable counter actor. It can report the pid of its hosting process and can
 * ask Ray to exit it via {@code Ray.exitActor()}.
 */
private static class ExitingActor implements Checkpointable {

  int counter = 0;

  /** Increments the counter and returns the updated value. */
  public Integer incr() {
    counter += 1;
    return counter;
  }

  /** Returns the value of {@code SystemUtil.pid()} as seen from inside the actor. */
  public int getPid() {
    return pid();
  }

  /** Always requests a checkpoint. */
  @Override
  public boolean shouldCheckpoint(CheckpointContext checkpointContext) {
    return true;
  }

  /** Intentionally stores nothing. */
  @Override
  public void saveCheckpoint(ActorId actorId, UniqueId checkpointId) {
  }

  /**
   * Dummy restore: resets the counter to 1 and reports the last entry of
   * {@code availableCheckpoints} as the checkpoint that was loaded.
   */
  @Override
  public UniqueId loadCheckpoint(ActorId actorId, List<Checkpoint> availableCheckpoints) {
    this.counter = 1;
    final int lastIndex = availableCheckpoints.size() - 1;
    final Checkpoint last = availableCheckpoints.get(lastIndex);
    return last.checkpointId;
  }

  /** Intentionally ignores expired checkpoints. */
  @Override
  public void checkpointExpired(ActorId actorId, UniqueId checkpointId) {
  }

  /** Asks Ray to exit this actor; the {@code false} result is only a placeholder value. */
  public boolean exit() {
    Ray.exitActor();
    return false;
  }
}
/**
 * Kills the actor's process from outside and checks the actor answers again afterwards, then
 * calls {@code exit} on the actor and checks that fetching the result fails with
 * {@link RayActorException}.
 */
public void testExitActor() throws IOException, InterruptedException {
  ActorHandle<ExitingActor> actor = Ray.actor(ExitingActor::new)
      .setMaxRestarts(10000).remote();
  // TestNG's assertEquals takes (actual, expected); keeping that order makes failure
  // messages report the right "expected" value.
  Assert.assertEquals((int) (actor.task(ExitingActor::incr).remote().get()), 1);
  int pid = actor.task(ExitingActor::getPid).remote().get();
  // Use the argv form of exec and wait for `kill` so we know the signal was actually sent.
  Process kill = Runtime.getRuntime().exec(new String[] {"kill", "-9", String.valueOf(pid)});
  Assert.assertEquals(kill.waitFor(), 0);
  TimeUnit.SECONDS.sleep(1);
  // Make sure this actor can be reconstructed.
  Assert.assertEquals((int) actor.task(ExitingActor::incr).remote().get(), 2);
  // `exitActor` will exit the actor without reconstructing.
  ObjectRef<Boolean> obj = actor.task(ExitingActor::exit).remote();
  Assert.assertThrows(RayActorException.class, obj::get);
}
/**
 * With several workers per process, exits one actor and checks that a second actor hosted in
 * the same process keeps its state and its pid, and that the process itself stays alive.
 */
public void testExitActorInMultiWorker() {
  Assert.assertTrue(TestUtils.getRuntime().getRayConfig().numWorkersPerProcess > 1);
  ActorHandle<ExitingActor> actor1 = Ray.actor(ExitingActor::new)
      .setMaxRestarts(10000).remote();
  int pid = actor1.task(ExitingActor::getPid).remote().get();
  ActorHandle<ExitingActor> actor2;
  while (true) {
    // Create another actor which share the same process of actor 1.
    actor2 = Ray.actor(ExitingActor::new).setMaxRestarts(0).remote();
    int actor2Pid = actor2.task(ExitingActor::getPid).remote().get();
    if (actor2Pid == pid) {
      break;
    }
  }
  ObjectRef<Boolean> obj1 = actor1.task(ExitingActor::exit).remote();
  Assert.assertThrows(RayActorException.class, obj1::get);
  Assert.assertTrue(SystemUtil.isProcessAlive(pid));
  // Actor 2 shouldn't exit or be reconstructed.
  // TestNG's assertEquals takes (actual, expected).
  Assert.assertEquals((int) actor2.task(ExitingActor::incr).remote().get(), 1);
  Assert.assertEquals((int) actor2.task(ExitingActor::getPid).remote().get(), pid);
  Assert.assertTrue(SystemUtil.isProcessAlive(pid));
}
/**
 * Starts an actor with custom JVM options, exits it, and checks that later calls fail with
 * {@link RayActorException} and that its worker process goes away.
 */
public void testExitActorWithDynamicOptions() {
  // Set dummy JVM options to start a worker process with only one worker.
  ActorHandle<ExitingActor> handle = Ray.actor(ExitingActor::new)
      .setMaxRestarts(10000)
      .setJvmOptions(" ")
      .remote();
  final int workerPid = handle.task(ExitingActor::getPid).remote().get();
  boolean aliveBeforeExit = SystemUtil.isProcessAlive(workerPid);
  Assert.assertTrue(aliveBeforeExit);

  ObjectRef<Boolean> exitResult = handle.task(ExitingActor::exit).remote();
  Assert.assertThrows(RayActorException.class, exitResult::get);

  // Now the actor shouldn't be reconstructed anymore.
  Assert.assertThrows(RayActorException.class,
      () -> handle.task(ExitingActor::getPid).remote().get());

  // Now the worker process should be dead.
  boolean processGone =
      TestUtils.waitForCondition(() -> !SystemUtil.isProcessAlive(workerPid), 5000);
  Assert.assertTrue(processGone);
}
}