fix named actor single process mode bug (#9652)

This commit is contained in:
fangfengbin
2020-07-23 20:28:50 +08:00
committed by GitHub
parent 5ab17e0dd8
commit 01d6edae9c
2 changed files with 29 additions and 25 deletions
@@ -174,9 +174,11 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
= new LocalModeActorHandle(actorId, getReturnIds(taskSpec).get(0));
actorHandles.put(actorId, actorHandle.copy());
if (StringUtils.isNotBlank(options.name)) {
Preconditions.checkArgument(!namedActors.containsKey(options.name),
String.format("Actor of name %s exists", options.name));
namedActors.put(options.name, actorHandle);
String fullName = options.global ? options.name :
String.format("%s-%s", Ray.getRuntimeContext().getCurrentJobId(), options.name);
Preconditions.checkArgument(!namedActors.containsKey(fullName),
String.format("Actor of name %s exists", fullName));
namedActors.put(fullName, actorHandle);
}
return actorHandle;
}
@@ -215,11 +217,11 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
public Optional<BaseActorHandle> getActor(String name, boolean global) {
String fullName = global ? name :
String.format("%s-%s", Ray.getRuntimeContext().getCurrentJobId(), name);
if (namedActors.containsKey(fullName)) {
return Optional.of(namedActors.get(fullName));
} else {
ActorHandle actorHandle = namedActors.get(fullName);
if (null == actorHandle) {
return Optional.empty();
}
return Optional.of(actorHandle);
}
public void shutdown() {
@@ -52,26 +52,28 @@ public class NamedActorTest extends BaseTest {
Assert.assertEquals(namedActor.get().task(Counter::increment).remote().get(),
Integer.valueOf(2));
// Get the global actor from another driver.
RayConfig rayConfig = TestUtils.getRuntime().getRayConfig();
ProcessBuilder builder = new ProcessBuilder(
"java",
"-cp",
System.getProperty("java.class.path"),
"-Dray.redis.address=" + rayConfig.getRedisAddress(),
"-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName,
"-Dray.raylet.socket-name=" + rayConfig.rayletSocketName,
"-Dray.raylet.node-manager-port=" + rayConfig.getNodeManagerPort(),
NamedActorTest.class.getName(),
name);
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
Process driver = builder.start();
Assert.assertTrue(driver.waitFor(60, TimeUnit.SECONDS));
Assert.assertEquals(driver.exitValue(), 0,
"The driver exited with code " + driver.exitValue());
if (!TestUtils.isSingleProcessMode()) {
// Get the global actor from another driver.
RayConfig rayConfig = TestUtils.getRuntime().getRayConfig();
ProcessBuilder builder = new ProcessBuilder(
"java",
"-cp",
System.getProperty("java.class.path"),
"-Dray.redis.address=" + rayConfig.getRedisAddress(),
"-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName,
"-Dray.raylet.socket-name=" + rayConfig.rayletSocketName,
"-Dray.raylet.node-manager-port=" + rayConfig.getNodeManagerPort(),
NamedActorTest.class.getName(),
name);
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
Process driver = builder.start();
Assert.assertTrue(driver.waitFor(60, TimeUnit.SECONDS));
Assert.assertEquals(driver.exitValue(), 0,
"The driver exited with code " + driver.exitValue());
Assert.assertEquals(namedActor.get().task(Counter::increment).remote().get(),
Integer.valueOf(4));
Assert.assertEquals(namedActor.get().task(Counter::increment).remote().get(),
Integer.valueOf(4));
}
}
public static void main(String[] args) {