[Java worker] Migrate task execution and submission on top of core worker (#5370)

This commit is contained in:
Kai Yang
2019-08-16 13:52:13 +08:00
committed by Hao Chen
parent 3a853121b9
commit b1aae0e398
95 changed files with 3069 additions and 2991 deletions
@@ -1,6 +1,8 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.ray.api.Ray;
import org.ray.api.RayActor;
@@ -10,8 +12,8 @@ import org.ray.api.annotation.RayRemote;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayActorImpl;
import org.ray.runtime.objectstore.NativeRayObject;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.object.NativeRayObject;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -72,6 +74,12 @@ public class ActorTest extends BaseTest {
return res.get();
}
@RayRemote
public static int testActorAsFieldOfParameter(List<RayActor<Counter>> actor, int delta) {
RayObject<Integer> res = Ray.call(Counter::increase, actor.get(0), delta);
return res.get();
}
@Test
public void testPassActorAsParameter() {
RayActor<Counter> actor = Ray.createActor(Counter::new, 0);
@@ -79,13 +87,17 @@ public class ActorTest extends BaseTest {
Ray.call(ActorTest::testActorAsFirstParameter, actor, 1).get());
Assert.assertEquals(Integer.valueOf(11),
Ray.call(ActorTest::testActorAsSecondParameter, 10, actor).get());
Assert.assertEquals(Integer.valueOf(111),
Ray.call(ActorTest::testActorAsFieldOfParameter, Collections.singletonList(actor), 100)
.get());
}
@Test
public void testForkingActorHandle() {
TestUtils.skipTestUnderSingleProcess();
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
Assert.assertEquals(Integer.valueOf(101), Ray.call(Counter::increase, counter, 1).get());
RayActor<Counter> counter2 = ((RayActorImpl<Counter>) counter).fork();
RayActor<Counter> counter2 = ((NativeRayActor) counter).fork();
Assert.assertEquals(Integer.valueOf(103), Ray.call(Counter::increase, counter2, 2).get());
}
@@ -100,9 +112,8 @@ public class ActorTest extends BaseTest {
Ray.internal().free(ImmutableList.of(value.getId()), false, false);
// Wait until the object is deleted, because the above free operation is async.
while (true) {
NativeRayObject result = ((AbstractRayRuntime)
Ray.internal()).getObjectStoreProxy().getObjectInterface()
.get(ImmutableList.of(value.getId()), 0).get(0);
NativeRayObject result = ((AbstractRayRuntime) Ray.internal()).getObjectStore()
.getRaw(ImmutableList.of(value.getId()), 0).get(0);
if (result == null) {
break;
}
@@ -7,7 +7,7 @@ import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.exception.RayException;
import org.ray.api.id.ObjectId;
import org.ray.runtime.RayObjectImpl;
import org.ray.runtime.object.RayObjectImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1,10 +1,12 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import org.ray.api.Ray;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.id.TaskId;
import org.ray.runtime.AbstractRayRuntime;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -24,8 +26,8 @@ public class PlasmaFreeTest extends BaseTest {
Ray.internal().free(ImmutableList.of(helloId.getId()), true, false);
final boolean result = TestUtils.waitForCondition(() ->
((AbstractRayRuntime) Ray.internal()).getObjectStoreProxy().getObjectInterface()
.get(ImmutableList.of(helloId.getId()), 0).get(0) == null, 50);
((AbstractRayRuntime) Ray.internal()).getObjectStore()
.getRaw(ImmutableList.of(helloId.getId()), 0).get(0) == null, 50);
Assert.assertTrue(result);
}
@@ -36,9 +38,10 @@ public class PlasmaFreeTest extends BaseTest {
Assert.assertEquals("hello", helloId.get());
Ray.internal().free(ImmutableList.of(helloId.getId()), true, true);
TaskId taskId = TaskId.fromBytes(Arrays.copyOf(helloId.getId().getBytes(), TaskId.LENGTH));
final boolean result = TestUtils.waitForCondition(
() -> !(((AbstractRayRuntime)Ray.internal()).getGcsClient())
.rayletTaskExistsInGcs(helloId.getId().getTaskId()), 50);
() -> !(((AbstractRayRuntime) Ray.internal()).getGcsClient())
.rayletTaskExistsInGcs(taskId), 50);
Assert.assertTrue(result);
}
@@ -1,10 +1,12 @@
package org.ray.api.test;
import java.util.Collections;
import org.ray.api.Ray;
import org.ray.api.TestUtils;
import org.ray.api.id.ObjectId;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.object.NativeRayObject;
import org.ray.runtime.object.ObjectStore;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -15,11 +17,15 @@ public class PlasmaStoreTest extends BaseTest {
TestUtils.skipTestUnderSingleProcess();
ObjectId objectId = ObjectId.fromRandom();
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
ObjectStoreProxy objectInterface = runtime.getObjectStoreProxy();
objectInterface.put(objectId, 1);
Assert.assertEquals(objectInterface.<Integer>get(objectId), (Integer) 1);
objectInterface.put(objectId, 2);
ObjectStore objectStore = runtime.getObjectStore();
objectStore.putRaw(new NativeRayObject(new byte[]{1}, null), objectId);
Assert.assertEquals(
objectStore.getRaw(Collections.singletonList(objectId), -1).get(0).data[0],
(byte) 1);
objectStore.putRaw(new NativeRayObject(new byte[]{2}, null), objectId);
// Putting 2 objects with duplicate ID should fail but ignored.
Assert.assertEquals(objectInterface.<Integer>get(objectId), (Integer) 1);
Assert.assertEquals(
objectStore.getRaw(Collections.singletonList(objectId), -1).get(0).data[0],
(byte) 1);
}
}
@@ -1,23 +1,24 @@
package org.ray.api.test;
import org.ray.api.Ray;
import org.ray.api.RayPyActor;
import org.ray.api.id.ActorId;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayPyActorImpl;
import org.ray.runtime.util.Serializer;
import org.ray.api.id.ObjectId;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.object.NativeRayObject;
import org.ray.runtime.object.ObjectStore;
import org.testng.Assert;
import org.testng.annotations.Test;
public class RaySerializerTest {
public class RaySerializerTest extends BaseMultiLanguageTest {
@Test
public void testSerializePyActor() {
final ActorId pyActorId = ActorId.generateActorId(JobId.fromInt(1));
RayPyActor pyActor = new RayPyActorImpl(pyActorId, "test", "RaySerializerTest");
byte[] bytes = Serializer.encode(pyActor);
RayPyActor result = Serializer.decode(bytes);
Assert.assertEquals(result.getId(), pyActorId);
RayPyActor pyActor = Ray.createPyActor("test", "RaySerializerTest");
ObjectStore objectStore = ((AbstractRayRuntime) Ray.internal()).getObjectStore();
NativeRayObject nativeRayObject = objectStore.serialize(pyActor);
RayPyActor result = (RayPyActor) objectStore
.deserialize(nativeRayObject, ObjectId.fromRandom());
Assert.assertEquals(result.getId(), pyActor.getId());
Assert.assertEquals(result.getModuleName(), "test");
Assert.assertEquals(result.getClassName(), "RaySerializerTest");
}
@@ -50,36 +50,10 @@ public class UniqueIdTest {
Assert.assertTrue(id6.isNil());
}
@Test
public void testComputeReturnId() {
// Mock a taskId, and the lowest 4 bytes should be 0.
TaskId taskId = TaskId.fromHexString("123456789ABCDE123456789ABCDE");
ObjectId returnId = ObjectId.forReturn(taskId, 1);
Assert.assertEquals("123456789abcde123456789abcde00c001000000", returnId.toString());
Assert.assertEquals(returnId.getTaskId(), taskId);
returnId = ObjectId.forReturn(taskId, 0x01020304);
Assert.assertEquals("123456789abcde123456789abcde00c004030201", returnId.toString());
}
@Test
public void testComputePutId() {
// Mock a taskId, the lowest 4 bytes should be 0.
TaskId taskId = TaskId.fromHexString("123456789ABCDE123456789ABCDE");
ObjectId putId = ObjectId.forPut(taskId, 1);
Assert.assertEquals("123456789abcde123456789abcde008001000000".toLowerCase(), putId.toString());
putId = ObjectId.forPut(taskId, 0x01020304);
Assert.assertEquals("123456789abcde123456789abcde008004030201".toLowerCase(), putId.toString());
}
@Test
void testMurmurHash() {
UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232");
long remainder = Long.remainderUnsigned(IdUtil.murmurHashCode(id), 1000000000);
Assert.assertEquals(remainder, 787616861);
}
}