diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index a0afc55dd..a7d6ed313 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -108,7 +108,7 @@ public final class Ray extends RayCall { /** * Get the underlying runtime instance. */ - static RayRuntime internal() { + public static RayRuntime internal() { return runtime; } } diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 1ff44aa35..d609d4de5 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -52,6 +52,14 @@ public interface RayRuntime { */ WaitResult wait(List> waitList, int numReturns, int timeoutMs); + /** + * Free a list of objects from Plasma Store. + * + * @param objectIds The object ids to free. + * @param localOnly Whether only free objects for local object store or not. + */ + void free(List objectIds, boolean localOnly); + /** * Invoke a remote function. * diff --git a/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java b/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java index 9234a65a0..31ef679cf 100644 --- a/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java +++ b/java/runtime-common/src/main/java/org/ray/core/AbstractRayRuntime.java @@ -305,6 +305,11 @@ public abstract class AbstractRayRuntime implements RayRuntime { } } + @Override + public void free(List objectIds, boolean localOnly) { + localSchedulerClient.freePlasmaObjects(objectIds, localOnly); + } + private List> splitIntoBatches(List objectIds, int batchSize) { List> batches = new ArrayList<>(); int objectsSize = objectIds.size(); diff --git a/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java b/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java index 04e201149..aa741b1a2 100644 --- a/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java +++ b/java/runtime-common/src/main/java/org/ray/spi/LocalSchedulerLink.java @@ -24,4 +24,6 @@ public interface LocalSchedulerLink { UniqueId generateTaskId(UniqueId driverId, UniqueId parentTaskId, int taskIndex); List wait(byte[][] objectIds, int timeoutMs, int numReturns); + + void freePlasmaObjects(List objectIds, boolean localOnly); } diff --git a/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java b/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java index 795fd5975..0f60a8a3b 100644 --- a/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java +++ b/java/runtime-dev/src/main/java/org/ray/spi/impl/MockLocalScheduler.java @@ -99,4 +99,9 @@ public class MockLocalScheduler implements LocalSchedulerLink { public List wait(byte[][] objectIds, int timeoutMs, int numReturns) { return store.wait(objectIds, timeoutMs, numReturns); } + + @Override + public void freePlasmaObjects(List objectIds, boolean localOnly) { + return; + } } diff --git a/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java b/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java index 35eb25f2b..8ee86babc 100644 --- a/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java +++ b/java/runtime-native/src/main/java/org/ray/spi/impl/DefaultLocalSchedulerClient.java @@ -120,6 +120,12 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { nativeNotifyUnblocked(client); } + @Override + public void freePlasmaObjects(List objectIds, boolean localOnly) { + byte[][] objectIdsArray = getIdBytes(objectIds); + nativeFreePlasmaObjects(client, objectIdsArray, localOnly); + } + public static TaskSpec taskInfo2Spec(ByteBuffer bb) { bb.order(ByteOrder.LITTLE_ENDIAN); TaskInfo info = TaskInfo.getRootAsTaskInfo(bb); @@ -277,9 +283,10 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { /// 1) pushd $Dir/java/runtime-native/target/classes /// 2) javah -classpath .:$Dir/java/runtime-common/target/classes/:$Dir/java/api/target/classes/ /// org.ray.spi.impl.DefaultLocalSchedulerClient - /// 3) cp org_ray_spi_impl_DefaultLocalSchedulerClient.h $Dir/src/local_scheduler/lib/java/ - /// 4) vim $Dir/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc - /// 5) popd + /// 3) clang-format -i org_ray_spi_impl_DefaultLocalSchedulerClient.h + /// 4) cp org_ray_spi_impl_DefaultLocalSchedulerClient.h $Dir/src/local_scheduler/lib/java/ + /// 5) vim $Dir/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc + /// 6) popd private static native long nativeInit(String localSchedulerSocket, byte[] workerId, boolean isWorker, byte[] driverTaskId, boolean useRaylet); @@ -305,4 +312,7 @@ public class DefaultLocalSchedulerClient implements LocalSchedulerLink { private static native byte[] nativeGenerateTaskId(byte[] driverId, byte[] parentTaskId, int taskIndex); + private static native void nativeFreePlasmaObjects(long conn, byte[][] objectIds, + boolean localOnly); + } diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java new file mode 100644 index 000000000..5e1453904 --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java @@ -0,0 +1,53 @@ +package org.ray.api.test; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ray.api.Ray; +import org.ray.api.RayObject; +import org.ray.api.WaitResult; +import org.ray.api.annotation.RayRemote; +import org.ray.api.id.UniqueId; +import org.ray.core.AbstractRayRuntime; + + +@RunWith(MyRunner.class) +public class PlasmaFreeTest { + + @RayRemote + private static String hello() { + return "hello"; + } + + @Test + public void test() { + Assume.assumeTrue(AbstractRayRuntime.getParams().use_raylet); + RayObject helloId = Ray.call(PlasmaFreeTest::hello); + String helloString = helloId.get(); + Assert.assertEquals("hello", helloString); + List> waitFor = ImmutableList.of(helloId); + WaitResult waitResult = Ray.wait(waitFor, 1, 2 * 1000); + List> readyOnes = waitResult.getReady(); + List> unreadyOnes = waitResult.getUnready(); + Assert.assertEquals(1, readyOnes.size()); + Assert.assertEquals(0, unreadyOnes.size()); + + List freeList = new ArrayList<>(); + freeList.add(helloId.getId()); + Ray.internal().free(freeList, true); + // Flush: trigger the release function because Plasma Client has cache. + for (int i = 0; i < 128; i++) { + Ray.call(PlasmaFreeTest::hello).get(); + } + + waitResult = Ray.wait(waitFor, 1, 2 * 1000); + readyOnes = waitResult.getReady(); + unreadyOnes = waitResult.getUnready(); + Assert.assertEquals(0, readyOnes.size()); + Assert.assertEquals(1, unreadyOnes.size()); + } +} diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc index 4ffee0a05..6e54cd6d0 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc @@ -279,6 +279,31 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeGenerateTaskId( return result; } +/* + * Class: org_ray_spi_impl_DefaultLocalSchedulerClient + * Method: nativeFreePlasmaObjects + * Signature: ([[BZ)V + */ +JNIEXPORT void JNICALL +Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeFreePlasmaObjects( + JNIEnv *env, + jclass, + jlong client, + jobjectArray objectIds, + jboolean localOnly) { + std::vector object_ids; + auto len = env->GetArrayLength(objectIds); + for (int i = 0; i < len; i++) { + jbyteArray object_id_bytes = + static_cast(env->GetObjectArrayElement(objectIds, i)); + UniqueIdFromJByteArray object_id(env, object_id_bytes); + object_ids.push_back(*object_id.PID); + env->DeleteLocalRef(object_id_bytes); + } + auto conn = reinterpret_cast(client); + local_scheduler_free_objects_in_object_store(conn, object_ids, localOnly); +} + #ifdef __cplusplus } #endif diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h index 223896158..cafeb643f 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.h @@ -120,6 +120,19 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeGenerateTaskId( jbyteArray, jint); +/* + * Class: org_ray_spi_impl_DefaultLocalSchedulerClient + * Method: nativeFreePlasmaObjects + * Signature: (J[[BZ)V + */ +JNIEXPORT void JNICALL +Java_org_ray_spi_impl_DefaultLocalSchedulerClient_nativeFreePlasmaObjects( + JNIEnv *, + jclass, + jlong, + jobjectArray, + jboolean); + #ifdef __cplusplus } #endif