[Java] Add Plasma Free to Java code path (#2802)

This commit is contained in:
Yuhong Guo
2018-09-04 15:28:23 +08:00
committed by Hao Chen
parent 25ffe57a5c
commit dfb7c2be1e
9 changed files with 125 additions and 4 deletions
+1 -1
View File
@@ -108,7 +108,7 @@ public final class Ray extends RayCall {
/**
* Get the underlying runtime instance.
*/
static RayRuntime internal() {
public static RayRuntime internal() {
return runtime;
}
}
@@ -52,6 +52,14 @@ public interface RayRuntime {
*/
<T> WaitResult<T> wait(List<RayObject<T>> waitList, int numReturns, int timeoutMs);
/**
* Free a list of objects from Plasma Store.
*
* @param objectIds The object ids to free.
* @param localOnly Whether only free objects for local object store or not.
*/
void free(List<UniqueId> objectIds, boolean localOnly);
/**
* Invoke a remote function.
*
@@ -305,6 +305,11 @@ public abstract class AbstractRayRuntime implements RayRuntime {
}
}
@Override
public void free(List<UniqueId> objectIds, boolean localOnly) {
localSchedulerClient.freePlasmaObjects(objectIds, localOnly);
}
private List<List<UniqueId>> splitIntoBatches(List<UniqueId> objectIds, int batchSize) {
List<List<UniqueId>> batches = new ArrayList<>();
int objectsSize = objectIds.size();
@@ -24,4 +24,6 @@ public interface LocalSchedulerLink {
UniqueId generateTaskId(UniqueId driverId, UniqueId parentTaskId, int taskIndex);
List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns);
void freePlasmaObjects(List<UniqueId> objectIds, boolean localOnly);
}
@@ -99,4 +99,9 @@ public class MockLocalScheduler implements LocalSchedulerLink {
public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
return store.wait(objectIds, timeoutMs, numReturns);
}
@Override
public void freePlasmaObjects(List<UniqueId> objectIds, boolean localOnly) {
return;
}
}
@@ -120,6 +120,12 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink {
nativeNotifyUnblocked(client);
}
@Override
public void freePlasmaObjects(List<UniqueId> objectIds, boolean localOnly) {
byte[][] objectIdsArray = getIdBytes(objectIds);
nativeFreePlasmaObjects(client, objectIdsArray, localOnly);
}
public static TaskSpec taskInfo2Spec(ByteBuffer bb) {
bb.order(ByteOrder.LITTLE_ENDIAN);
TaskInfo info = TaskInfo.getRootAsTaskInfo(bb);
@@ -277,9 +283,10 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink {
/// 1) pushd $Dir/java/runtime-native/target/classes
/// 2) javah -classpath .:$Dir/java/runtime-common/target/classes/:$Dir/java/api/target/classes/
/// org.ray.spi.impl.DefaultLocalSchedulerClient
/// 3) cp org_ray_spi_impl_DefaultLocalSchedulerClient.h $Dir/src/local_scheduler/lib/java/
/// 4) vim $Dir/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc
/// 5) popd
/// 3) clang-format -i org_ray_spi_impl_DefaultLocalSchedulerClient.h
/// 4) cp org_ray_spi_impl_DefaultLocalSchedulerClient.h $Dir/src/local_scheduler/lib/java/
/// 5) vim $Dir/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc
/// 6) popd
private static native long nativeInit(String localSchedulerSocket, byte[] workerId,
boolean isWorker, byte[] driverTaskId, boolean useRaylet);
@@ -305,4 +312,7 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink {
private static native byte[] nativeGenerateTaskId(byte[] driverId, byte[] parentTaskId,
int taskIndex);
private static native void nativeFreePlasmaObjects(long conn, byte[][] objectIds,
boolean localOnly);
}
@@ -0,0 +1,53 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ray.api.Ray;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.annotation.RayRemote;
import org.ray.api.id.UniqueId;
import org.ray.core.AbstractRayRuntime;
@RunWith(MyRunner.class)
public class PlasmaFreeTest {
@RayRemote
private static String hello() {
return "hello";
}
@Test
public void test() {
Assume.assumeTrue(AbstractRayRuntime.getParams().use_raylet);
RayObject<String> helloId = Ray.call(PlasmaFreeTest::hello);
String helloString = helloId.get();
Assert.assertEquals("hello", helloString);
List<RayObject<String>> waitFor = ImmutableList.of(helloId);
WaitResult<String> waitResult = Ray.wait(waitFor, 1, 2 * 1000);
List<RayObject<String>> readyOnes = waitResult.getReady();
List<RayObject<String>> unreadyOnes = waitResult.getUnready();
Assert.assertEquals(1, readyOnes.size());
Assert.assertEquals(0, unreadyOnes.size());
List<UniqueId> freeList = new ArrayList<>();
freeList.add(helloId.getId());
Ray.internal().free(freeList, true);
// Flush: trigger the release function because Plasma Client has cache.
for (int i = 0; i < 128; i++) {
Ray.call(PlasmaFreeTest::hello).get();
}
waitResult = Ray.wait(waitFor, 1, 2 * 1000);
readyOnes = waitResult.getReady();
unreadyOnes = waitResult.getUnready();
Assert.assertEquals(0, readyOnes.size());
Assert.assertEquals(1, unreadyOnes.size());
}
}