[Java] Allow actor handle to be serialized without forking (#3686)

This commit is contained in:
Wang Qing
2019-01-06 00:29:08 +08:00
committed by Hao Chen
parent 03fe760616
commit 692fdc6bc3
5 changed files with 58 additions and 5 deletions
@@ -17,4 +17,5 @@ public interface RayActor<T> {
* @return The id of this actor handle.
*/
UniqueId getHandleId();
}
@@ -67,7 +67,17 @@ public final class RayActorImpl<T> implements RayActor<T>, Externalizable {
return taskCounter++;
}
private UniqueId computeNextActorHandleId() {
public RayActorImpl<T> fork() {
RayActorImpl<T> ret = new RayActorImpl<>();
ret.id = this.id;
ret.taskCounter = 0;
ret.numForks = 0;
ret.taskCursor = this.taskCursor;
ret.handleId = this.computeNextActorHandleId();
return ret;
}
protected UniqueId computeNextActorHandleId() {
byte[] bytes = Sha1Digestor.digest(handleId.getBytes(), ++numForks);
return new UniqueId(bytes);
}
@@ -75,8 +85,10 @@ public final class RayActorImpl<T> implements RayActor<T>, Externalizable {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.id);
out.writeObject(this.computeNextActorHandleId());
out.writeObject(this.handleId);
out.writeObject(this.taskCursor);
out.writeObject(this.taskCounter);
out.writeObject(this.numForks);
}
@Override
@@ -84,5 +96,7 @@ public final class RayActorImpl<T> implements RayActor<T>, Externalizable {
this.id = (UniqueId) in.readObject();
this.handleId = (UniqueId) in.readObject();
this.taskCursor = (UniqueId) in.readObject();
this.taskCounter = (int) in.readObject();
this.numForks = (int) in.readObject();
}
}
@@ -0,0 +1,25 @@
package org.ray.runtime.util;
import java.io.IOException;
import org.nustaq.serialization.FSTBasicObjectSerializer;
import org.nustaq.serialization.FSTClazzInfo;
import org.nustaq.serialization.FSTClazzInfo.FSTFieldInfo;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;
import org.ray.runtime.RayActorImpl;
public class RayActorSerializer extends FSTBasicObjectSerializer {
@Override
public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo,
FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException {
((RayActorImpl) toWrite).fork().writeExternal(out);
}
@Override
public void readObject(FSTObjectInput in, Object toRead, FSTClazzInfo clzInfo,
FSTFieldInfo referencedBy) throws Exception {
super.readObject(in, toRead, clzInfo, referencedBy);
((RayActorImpl) toRead).readExternal(in);
}
}
@@ -1,14 +1,18 @@
package org.ray.runtime.util;
import org.nustaq.serialization.FSTConfiguration;
import org.ray.runtime.RayActorImpl;
/**
* Java object serialization TODO: use others (e.g. Arrow) for higher performance
*/
public class Serializer {
static final ThreadLocal<FSTConfiguration> conf = ThreadLocal.withInitial(
FSTConfiguration::createDefaultConfiguration);
private static final ThreadLocal<FSTConfiguration> conf = ThreadLocal.withInitial(() -> {
FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();
conf.registerSerializer(RayActorImpl.class, new RayActorSerializer(), true);
return conf;
});
public static byte[] encode(Object obj) {
return conf.get().asByteArray(obj);
@@ -8,6 +8,7 @@ import org.ray.api.RayObject;
import org.ray.api.annotation.RayRemote;
import org.ray.api.function.RayFunc2;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayActorImpl;
public class ActorTest extends BaseTest {
@@ -69,10 +70,18 @@ public class ActorTest extends BaseTest {
@Test
public void testPassActorAsParameter() {
RayActor<Counter> actor = Ray.createActor(Counter::new, 0);
RayFunc2<RayActor, Integer, Integer> f = ActorTest::testActorAsFirstParameter;
Assert.assertEquals(Integer.valueOf(1),
Ray.call(ActorTest::testActorAsFirstParameter, actor, 1).get());
Assert.assertEquals(Integer.valueOf(11),
Ray.call(ActorTest::testActorAsSecondParameter, 10, actor).get());
}
@Test
public void testForkingActorHandle() {
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
Assert.assertEquals(Integer.valueOf(101), Ray.call(Counter::increase, counter, 1).get());
RayActor<Counter> counter2 = ((RayActorImpl<Counter>) counter).fork();
Assert.assertEquals(Integer.valueOf(103), Ray.call(Counter::increase, counter2, 2).get());
}
}