diff --git a/java/api/src/main/java/io/ray/api/Ray.java b/java/api/src/main/java/io/ray/api/Ray.java index da9047a66..fb71a3bac 100644 --- a/java/api/src/main/java/io/ray/api/Ray.java +++ b/java/api/src/main/java/io/ray/api/Ray.java @@ -87,6 +87,24 @@ public final class Ray extends RayCall { return internal().get(objectList); } + /** + * Wait for a list of RayObjects to be available, until specified number of objects are ready, or + * specified timeout has passed. + * + * @param waitList A list of object references to wait for. + * @param numReturns The number of objects that should be returned. + * @param timeoutMs The maximum time in milliseconds to wait before returning. + * @param fetchLocal If true, wait for the object to be downloaded onto the local node before + * returning it as ready. If false, ray.wait() will not trigger fetching of objects to the + * local node and will return immediately once the object is available anywhere in the + * cluster. + * @return Two lists, one containing locally available objects, one containing the rest. + */ + public static WaitResult wait( + List> waitList, int numReturns, int timeoutMs, boolean fetchLocal) { + return internal().wait(waitList, numReturns, timeoutMs, fetchLocal); + } + /** * Wait for a list of RayObjects to be locally available, until specified number of objects are * ready, or specified timeout has passed. @@ -97,30 +115,29 @@ public final class Ray extends RayCall { * @return Two lists, one containing locally available objects, one containing the rest. */ public static WaitResult wait(List> waitList, int numReturns, int timeoutMs) { - return internal().wait(waitList, numReturns, timeoutMs); + return wait(waitList, numReturns, timeoutMs, true); } /** - * A convenient helper method for Ray.wait. It will wait infinitely until specified number of - * objects are locally available. + * Wait for a list of RayObjects to be locally available, until specified number of objects are + * ready. * * @param waitList A list of object references to wait for. * @param numReturns The number of objects that should be returned. * @return Two lists, one containing locally available objects, one containing the rest. */ public static WaitResult wait(List> waitList, int numReturns) { - return internal().wait(waitList, numReturns, Integer.MAX_VALUE); + return wait(waitList, numReturns, Integer.MAX_VALUE); } /** - * A convenient helper method for Ray.wait. It will wait infinitely until all objects are locally - * available. + * Wait for a list of RayObjects to be locally available. * * @param waitList A list of object references to wait for. * @return Two lists, one containing locally available objects, one containing the rest. */ public static WaitResult wait(List> waitList) { - return internal().wait(waitList, waitList.size(), Integer.MAX_VALUE); + return wait(waitList, waitList.size()); } /** diff --git a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java index 53da3d48d..ac5f44f3f 100644 --- a/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/io/ray/api/runtime/RayRuntime.java @@ -53,15 +53,20 @@ public interface RayRuntime { List get(List> objectRefs); /** - * Wait for a list of RayObjects to be locally available, until specified number of objects are - * ready, or specified timeout has passed. + * Wait for a list of RayObjects to be available, until specified number of objects are ready, or + * specified timeout has passed. * * @param waitList A list of ObjectRef to wait for. * @param numReturns The number of objects that should be returned. * @param timeoutMs The maximum time in milliseconds to wait before returning. + * @param fetchLocal If true, wait for the object to be downloaded onto the local node before + * returning it as ready. If false, ray.wait() will not trigger fetching of objects to the + * local node and will return immediately once the object is available anywhere in the + * cluster. * @return Two lists, one containing locally available objects, one containing the rest. */ - WaitResult wait(List> waitList, int numReturns, int timeoutMs); + WaitResult wait( + List> waitList, int numReturns, int timeoutMs, boolean fetchLocal); /** * Free a list of objects from Plasma Store. diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index f3478e4c6..15d9e9d76 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -105,8 +105,9 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { } @Override - public WaitResult wait(List> waitList, int numReturns, int timeoutMs) { - return objectStore.wait(waitList, numReturns, timeoutMs); + public WaitResult wait( + List> waitList, int numReturns, int timeoutMs, boolean fetchLocal) { + return objectStore.wait(waitList, numReturns, timeoutMs, fetchLocal); } @Override diff --git a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java index e1bfc64fa..cb5752d00 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/LocalModeObjectStore.java @@ -60,7 +60,8 @@ public class LocalModeObjectStore extends ObjectStore { } @Override - public List wait(List objectIds, int numObjects, long timeoutMs) { + public List wait( + List objectIds, int numObjects, long timeoutMs, boolean fetchLocal) { waitInternal(objectIds, numObjects, timeoutMs); return objectIds.stream().map(pool::containsKey).collect(Collectors.toList()); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index 24dd5b8a2..c68709e10 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -45,8 +45,9 @@ public class NativeObjectStore extends ObjectStore { } @Override - public List wait(List objectIds, int numObjects, long timeoutMs) { - return nativeWait(toBinaryList(objectIds), numObjects, timeoutMs); + public List wait( + List objectIds, int numObjects, long timeoutMs, boolean fetchLocal) { + return nativeWait(toBinaryList(objectIds), numObjects, timeoutMs, fetchLocal); } @Override @@ -113,7 +114,7 @@ public class NativeObjectStore extends ObjectStore { private static native List nativeGet(List ids, long timeoutMs); private static native List nativeWait( - List objectIds, int numObjects, long timeoutMs); + List objectIds, int numObjects, long timeoutMs, boolean fetchLocal); private static native void nativeDelete(List objectIds, boolean localOnly); diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java index 8711811b2..5e7b62603 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectStore.java @@ -117,25 +117,36 @@ public abstract class ObjectStore { } /** - * Wait for a list of objects to appear in the object store. + * Wait for a list of RayObjects to be available, until specified number of objects are ready, or + * specified timeout has passed. * * @param objectIds IDs of the objects to wait for. * @param numObjects Number of objects that should appear. * @param timeoutMs Timeout in milliseconds, wait infinitely if it's negative. + * @param fetchLocal If true, wait for the object to be downloaded onto the local node before + * returning it as ready. If false, ray.wait() will not trigger fetching of objects to the + * local node and will return immediately once the object is available anywhere in the + * cluster. * @return A bitset that indicates each object has appeared or not. */ - public abstract List wait(List objectIds, int numObjects, long timeoutMs); + public abstract List wait( + List objectIds, int numObjects, long timeoutMs, boolean fetchLocal); /** - * Wait for a list of RayObjects to be locally available, until specified number of objects are - * ready, or specified timeout has passed. + * Wait for a list of RayObjects to be available, until specified number of objects are ready, or + * specified timeout has passed. * * @param waitList A list of object references to wait for. * @param numReturns The number of objects that should be returned. * @param timeoutMs The maximum time in milliseconds to wait before returning. + * @param fetchLocal If true, wait for the object to be downloaded onto the local node before + * returning it as ready. If false, ray.wait() will not trigger fetching of objects to the + * local node and will return immediately once the object is available anywhere in the + * cluster. * @return Two lists, one containing locally available objects, one containing the rest. */ - public WaitResult wait(List> waitList, int numReturns, int timeoutMs) { + public WaitResult wait( + List> waitList, int numReturns, int timeoutMs, boolean fetchLocal) { Preconditions.checkNotNull(waitList); if (waitList.isEmpty()) { return new WaitResult<>(Collections.emptyList(), Collections.emptyList()); @@ -144,7 +155,7 @@ public abstract class ObjectStore { List ids = waitList.stream().map(ref -> ((ObjectRefImpl) ref).getId()).collect(Collectors.toList()); - List ready = wait(ids, numReturns, timeoutMs); + List ready = wait(ids, numReturns, timeoutMs, fetchLocal); List> readyList = new ArrayList<>(); List> unreadyList = new ArrayList<>(); diff --git a/java/test.sh b/java/test.sh index 8336c1da1..f946fd91a 100755 --- a/java/test.sh +++ b/java/test.sh @@ -41,6 +41,15 @@ bazel build //java:gen_maven_deps echo "Build test jar." bazel build //java:all_tests_deploy.jar +java/generate_jni_header_files.sh + +if ! git diff --exit-code -- java src/ray/core_worker/lib/java; then + echo "Files are changed after build. Common cases are:" + echo " * Java native methods doesn't match JNI files. You need to either update Java code or JNI code." + echo " * pom_template.xml and pom.xml doesn't match. You need to either update pom_template.xml or pom.xml." + exit 1 +fi + # Enable multi-worker feature in Java test TEST_ARGS=(-Dray.job.num-java-workers-per-process=10) diff --git a/java/test/pom.xml b/java/test/pom.xml index c9e34821b..f401f3cff 100644 --- a/java/test/pom.xml +++ b/java/test/pom.xml @@ -117,41 +117,6 @@ - - - com.diffplug.spotless - spotless-maven-plugin - 2.6.1 - - - - - - - - - .java - - - - - - - - true - 4 - - - - - - - 1.7 - - - - - diff --git a/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java b/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java index 3e49ff798..b8235b8d8 100644 --- a/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/io/ray/test/PlasmaFreeTest.java @@ -25,7 +25,7 @@ public class PlasmaFreeTest extends BaseTest { () -> !TestUtils.getRuntime() .getObjectStore() - .wait(ImmutableList.of(((ObjectRefImpl) helloId).getId()), 1, 0) + .wait(ImmutableList.of(((ObjectRefImpl) helloId).getId()), 1, 0, true) .get(0), 50); if (TestUtils.isSingleProcessMode()) { diff --git a/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java b/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java index aa5658195..a98f95959 100644 --- a/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java +++ b/java/test/src/main/java/io/ray/test/ReferenceCountingTest.java @@ -119,7 +119,7 @@ public class ReferenceCountingTest extends BaseTest { TestUtils.getRuntime().getObjectStore().getRaw(ImmutableList.of(objectId), Long.MAX_VALUE); } else { List result = - TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100); + TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100, true); Assert.assertFalse(result.get(0)); } } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h index 69c05cf93..daa4e05a9 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h @@ -25,7 +25,7 @@ extern "C" { * Class: io_ray_runtime_RayNativeRuntime * Method: nativeInitialize * Signature: - * (ILjava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;[BLio/ray/runtime/gcs/GcsClientOptions;ILjava/lang/String;Ljava/util/Map;)V + * (ILjava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;[BLio/ray/runtime/gcs/GcsClientOptions;ILjava/lang/String;Ljava/util/Map;[B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( JNIEnv *, jclass, jint, jstring, jint, jstring, jstring, jstring, jbyteArray, jobject, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h index b1da06e57..fd194de55 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.h @@ -52,7 +52,7 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeGet /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeWait - * Signature: (Ljava/util/List;IJ)Ljava/util/List; + * Signature: (Ljava/util/List;IJZ)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeWait( JNIEnv *, jclass, jobject, jint, jlong, jboolean); @@ -68,7 +68,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeDelete /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeAddLocalReference - * Signature: ([B)V + * Signature: ([B[B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(JNIEnv *, jclass, @@ -78,7 +78,7 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeAddLocalReference(JNIEnv *, j /* * Class: io_ray_runtime_object_NativeObjectStore * Method: nativeRemoveLocalReference - * Signature: ([B)V + * Signature: ([B[B)V */ JNIEXPORT void JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativeRemoveLocalReference(JNIEnv *, jclass, diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.h b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.h index bf376aa12..ab7ec077d 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskExecutor.h @@ -21,25 +21,6 @@ #ifdef __cplusplus extern "C" { #endif -#undef io_ray_runtime_task_NativeTaskExecutor_NUM_ACTOR_CHECKPOINTS_TO_KEEP -#define io_ray_runtime_task_NativeTaskExecutor_NUM_ACTOR_CHECKPOINTS_TO_KEEP 20L -/* - * Class: io_ray_runtime_task_NativeTaskExecutor - * Method: nativePrepareCheckpoint - * Signature: ()[B - */ -JNIEXPORT jbyteArray JNICALL -Java_io_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint(JNIEnv *, jclass); - -/* - * Class: io_ray_runtime_task_NativeTaskExecutor - * Method: nativeNotifyActorResumedFromCheckpoint - * Signature: ([B)V - */ -JNIEXPORT void JNICALL -Java_io_ray_runtime_task_NativeTaskExecutor_nativeNotifyActorResumedFromCheckpoint( - JNIEnv *, jclass, jbyteArray); - #ifdef __cplusplus } #endif diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h index 8ea517b60..d57e2d573 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h @@ -74,13 +74,13 @@ Java_io_ray_runtime_task_NativeTaskSubmitter_nativeRemovePlacementGroup(JNIEnv * /* * Class: io_ray_runtime_task_NativeTaskSubmitter * Method: nativeWaitPlacementGroupReady - * Signature: (J)Z + * Signature: ([BI)Z */ JNIEXPORT jboolean JNICALL -Java_io_ray_runtime_task_NativeTaskSubmitter__nativeWaitPlacementGroupReady(JNIEnv *, - jclass, - jbyteArray, - jint); +Java_io_ray_runtime_task_NativeTaskSubmitter_nativeWaitPlacementGroupReady(JNIEnv *, + jclass, + jbyteArray, + jint); #ifdef __cplusplus }