From 692fdc6bc37d5bc6ab9fbcdf7cfa3526b829ed8f Mon Sep 17 00:00:00 2001 From: Wang Qing Date: Sun, 6 Jan 2019 00:29:08 +0800 Subject: [PATCH] [Java] Allow actor handle to be serialized without forking (#3686) --- .../src/main/java/org/ray/api/RayActor.java | 1 + .../java/org/ray/runtime/RayActorImpl.java | 18 +++++++++++-- .../ray/runtime/util/RayActorSerializer.java | 25 +++++++++++++++++++ .../java/org/ray/runtime/util/Serializer.java | 8 ++++-- .../main/java/org/ray/api/test/ActorTest.java | 11 +++++++- 5 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 java/runtime/src/main/java/org/ray/runtime/util/RayActorSerializer.java diff --git a/java/api/src/main/java/org/ray/api/RayActor.java b/java/api/src/main/java/org/ray/api/RayActor.java index 0295e43f9..eef8eb05a 100644 --- a/java/api/src/main/java/org/ray/api/RayActor.java +++ b/java/api/src/main/java/org/ray/api/RayActor.java @@ -17,4 +17,5 @@ public interface RayActor { * @return The id of this actor handle. */ UniqueId getHandleId(); + } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java b/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java index 5f0ae13ea..64fe45a86 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayActorImpl.java @@ -67,7 +67,17 @@ public final class RayActorImpl implements RayActor, Externalizable { return taskCounter++; } - private UniqueId computeNextActorHandleId() { + public RayActorImpl fork() { + RayActorImpl ret = new RayActorImpl<>(); + ret.id = this.id; + ret.taskCounter = 0; + ret.numForks = 0; + ret.taskCursor = this.taskCursor; + ret.handleId = this.computeNextActorHandleId(); + return ret; + } + + protected UniqueId computeNextActorHandleId() { byte[] bytes = Sha1Digestor.digest(handleId.getBytes(), ++numForks); return new UniqueId(bytes); } @@ -75,8 +85,10 @@ public final class RayActorImpl implements RayActor, Externalizable { @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.id); - out.writeObject(this.computeNextActorHandleId()); + out.writeObject(this.handleId); out.writeObject(this.taskCursor); + out.writeObject(this.taskCounter); + out.writeObject(this.numForks); } @Override @@ -84,5 +96,7 @@ public final class RayActorImpl implements RayActor, Externalizable { this.id = (UniqueId) in.readObject(); this.handleId = (UniqueId) in.readObject(); this.taskCursor = (UniqueId) in.readObject(); + this.taskCounter = (int) in.readObject(); + this.numForks = (int) in.readObject(); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/RayActorSerializer.java b/java/runtime/src/main/java/org/ray/runtime/util/RayActorSerializer.java new file mode 100644 index 000000000..24c9a3284 --- /dev/null +++ b/java/runtime/src/main/java/org/ray/runtime/util/RayActorSerializer.java @@ -0,0 +1,25 @@ +package org.ray.runtime.util; + +import java.io.IOException; +import org.nustaq.serialization.FSTBasicObjectSerializer; +import org.nustaq.serialization.FSTClazzInfo; +import org.nustaq.serialization.FSTClazzInfo.FSTFieldInfo; +import org.nustaq.serialization.FSTObjectInput; +import org.nustaq.serialization.FSTObjectOutput; +import org.ray.runtime.RayActorImpl; + +public class RayActorSerializer extends FSTBasicObjectSerializer { + + @Override + public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo, + FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException { + ((RayActorImpl) toWrite).fork().writeExternal(out); + } + + @Override + public void readObject(FSTObjectInput in, Object toRead, FSTClazzInfo clzInfo, + FSTFieldInfo referencedBy) throws Exception { + super.readObject(in, toRead, clzInfo, referencedBy); + ((RayActorImpl) toRead).readExternal(in); + } +} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/Serializer.java b/java/runtime/src/main/java/org/ray/runtime/util/Serializer.java index 4eb1d72b5..ab5080aaf 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/Serializer.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/Serializer.java @@ -1,14 +1,18 @@ package org.ray.runtime.util; import org.nustaq.serialization.FSTConfiguration; +import org.ray.runtime.RayActorImpl; /** * Java object serialization TODO: use others (e.g. Arrow) for higher performance */ public class Serializer { - static final ThreadLocal conf = ThreadLocal.withInitial( - FSTConfiguration::createDefaultConfiguration); + private static final ThreadLocal conf = ThreadLocal.withInitial(() -> { + FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration(); + conf.registerSerializer(RayActorImpl.class, new RayActorSerializer(), true); + return conf; + }); public static byte[] encode(Object obj) { return conf.get().asByteArray(obj); diff --git a/java/test/src/main/java/org/ray/api/test/ActorTest.java b/java/test/src/main/java/org/ray/api/test/ActorTest.java index c8fb6ce6e..b7b839a79 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorTest.java @@ -8,6 +8,7 @@ import org.ray.api.RayObject; import org.ray.api.annotation.RayRemote; import org.ray.api.function.RayFunc2; import org.ray.api.id.UniqueId; +import org.ray.runtime.RayActorImpl; public class ActorTest extends BaseTest { @@ -69,10 +70,18 @@ public class ActorTest extends BaseTest { @Test public void testPassActorAsParameter() { RayActor actor = Ray.createActor(Counter::new, 0); - RayFunc2 f = ActorTest::testActorAsFirstParameter; Assert.assertEquals(Integer.valueOf(1), Ray.call(ActorTest::testActorAsFirstParameter, actor, 1).get()); Assert.assertEquals(Integer.valueOf(11), Ray.call(ActorTest::testActorAsSecondParameter, 10, actor).get()); } + + @Test + public void testForkingActorHandle() { + RayActor counter = Ray.createActor(Counter::new, 100); + Assert.assertEquals(Integer.valueOf(101), Ray.call(Counter::increase, counter, 1).get()); + RayActor counter2 = ((RayActorImpl) counter).fork(); + Assert.assertEquals(Integer.valueOf(103), Ray.call(Counter::increase, counter2, 2).get()); + } + }