[Java] Add fetchLocal parameter in Ray.wait() (#13604)

This commit is contained in:
Kai Yang
2021-01-22 17:55:00 +08:00
committed by GitHub
parent 00c14ce4a4
commit 90f1e408de
14 changed files with 78 additions and 87 deletions
+24 -7
View File
@@ -87,6 +87,24 @@ public final class Ray extends RayCall {
return internal().get(objectList);
}
/**
* Wait for a list of RayObjects to be available, until specified number of objects are ready, or
* specified timeout has passed.
*
* @param waitList A list of object references to wait for.
* @param numReturns The number of objects that should be returned.
* @param timeoutMs The maximum time in milliseconds to wait before returning.
* @param fetchLocal If true, wait for the object to be downloaded onto the local node before
* returning it as ready. If false, ray.wait() will not trigger fetching of objects to the
* local node and will return immediately once the object is available anywhere in the
* cluster.
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(
List<ObjectRef<T>> waitList, int numReturns, int timeoutMs, boolean fetchLocal) {
return internal().wait(waitList, numReturns, timeoutMs, fetchLocal);
}
/**
* Wait for a list of RayObjects to be locally available, until specified number of objects are
* ready, or specified timeout has passed.
@@ -97,30 +115,29 @@ public final class Ray extends RayCall {
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns, int timeoutMs) {
return internal().wait(waitList, numReturns, timeoutMs);
return wait(waitList, numReturns, timeoutMs, true);
}
/**
* A convenient helper method for Ray.wait. It will wait infinitely until specified number of
* objects are locally available.
* Wait for a list of RayObjects to be locally available, until specified number of objects are
* ready.
*
* @param waitList A list of object references to wait for.
* @param numReturns The number of objects that should be returned.
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns) {
return internal().wait(waitList, numReturns, Integer.MAX_VALUE);
return wait(waitList, numReturns, Integer.MAX_VALUE);
}
/**
* A convenient helper method for Ray.wait. It will wait infinitely until all objects are locally
* available.
* Wait for a list of RayObjects to be locally available.
*
* @param waitList A list of object references to wait for.
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public static <T> WaitResult<T> wait(List<ObjectRef<T>> waitList) {
return internal().wait(waitList, waitList.size(), Integer.MAX_VALUE);
return wait(waitList, waitList.size());
}
/**
@@ -53,15 +53,20 @@ public interface RayRuntime {
<T> List<T> get(List<ObjectRef<T>> objectRefs);
/**
* Wait for a list of RayObjects to be locally available, until specified number of objects are
* ready, or specified timeout has passed.
* Wait for a list of RayObjects to be available, until specified number of objects are ready, or
* specified timeout has passed.
*
* @param waitList A list of ObjectRef to wait for.
* @param numReturns The number of objects that should be returned.
* @param timeoutMs The maximum time in milliseconds to wait before returning.
* @param fetchLocal If true, wait for the object to be downloaded onto the local node before
* returning it as ready. If false, ray.wait() will not trigger fetching of objects to the
* local node and will return immediately once the object is available anywhere in the
* cluster.
* @return Two lists, one containing locally available objects, one containing the rest.
*/
<T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns, int timeoutMs);
<T> WaitResult<T> wait(
List<ObjectRef<T>> waitList, int numReturns, int timeoutMs, boolean fetchLocal);
/**
* Free a list of objects from Plasma Store.
@@ -105,8 +105,9 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
}
@Override
public <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns, int timeoutMs) {
return objectStore.wait(waitList, numReturns, timeoutMs);
public <T> WaitResult<T> wait(
List<ObjectRef<T>> waitList, int numReturns, int timeoutMs, boolean fetchLocal) {
return objectStore.wait(waitList, numReturns, timeoutMs, fetchLocal);
}
@Override
@@ -60,7 +60,8 @@ public class LocalModeObjectStore extends ObjectStore {
}
@Override
public List<Boolean> wait(List<ObjectId> objectIds, int numObjects, long timeoutMs) {
public List<Boolean> wait(
List<ObjectId> objectIds, int numObjects, long timeoutMs, boolean fetchLocal) {
waitInternal(objectIds, numObjects, timeoutMs);
return objectIds.stream().map(pool::containsKey).collect(Collectors.toList());
}
@@ -45,8 +45,9 @@ public class NativeObjectStore extends ObjectStore {
}
@Override
public List<Boolean> wait(List<ObjectId> objectIds, int numObjects, long timeoutMs) {
return nativeWait(toBinaryList(objectIds), numObjects, timeoutMs);
public List<Boolean> wait(
List<ObjectId> objectIds, int numObjects, long timeoutMs, boolean fetchLocal) {
return nativeWait(toBinaryList(objectIds), numObjects, timeoutMs, fetchLocal);
}
@Override
@@ -113,7 +114,7 @@ public class NativeObjectStore extends ObjectStore {
private static native List<NativeRayObject> nativeGet(List<byte[]> ids, long timeoutMs);
private static native List<Boolean> nativeWait(
List<byte[]> objectIds, int numObjects, long timeoutMs);
List<byte[]> objectIds, int numObjects, long timeoutMs, boolean fetchLocal);
private static native void nativeDelete(List<byte[]> objectIds, boolean localOnly);
@@ -117,25 +117,36 @@ public abstract class ObjectStore {
}
/**
* Wait for a list of objects to appear in the object store.
* Wait for a list of RayObjects to be available, until specified number of objects are ready, or
* specified timeout has passed.
*
* @param objectIds IDs of the objects to wait for.
* @param numObjects Number of objects that should appear.
* @param timeoutMs Timeout in milliseconds, wait infinitely if it's negative.
* @param fetchLocal If true, wait for the object to be downloaded onto the local node before
* returning it as ready. If false, ray.wait() will not trigger fetching of objects to the
* local node and will return immediately once the object is available anywhere in the
* cluster.
* @return A bitset that indicates each object has appeared or not.
*/
public abstract List<Boolean> wait(List<ObjectId> objectIds, int numObjects, long timeoutMs);
public abstract List<Boolean> wait(
List<ObjectId> objectIds, int numObjects, long timeoutMs, boolean fetchLocal);
/**
* Wait for a list of RayObjects to be locally available, until specified number of objects are
* ready, or specified timeout has passed.
* Wait for a list of RayObjects to be available, until specified number of objects are ready, or
* specified timeout has passed.
*
* @param waitList A list of object references to wait for.
* @param numReturns The number of objects that should be returned.
* @param timeoutMs The maximum time in milliseconds to wait before returning.
* @param fetchLocal If true, wait for the object to be downloaded onto the local node before
* returning it as ready. If false, ray.wait() will not trigger fetching of objects to the
* local node and will return immediately once the object is available anywhere in the
* cluster.
* @return Two lists, one containing locally available objects, one containing the rest.
*/
public <T> WaitResult<T> wait(List<ObjectRef<T>> waitList, int numReturns, int timeoutMs) {
public <T> WaitResult<T> wait(
List<ObjectRef<T>> waitList, int numReturns, int timeoutMs, boolean fetchLocal) {
Preconditions.checkNotNull(waitList);
if (waitList.isEmpty()) {
return new WaitResult<>(Collections.emptyList(), Collections.emptyList());
@@ -144,7 +155,7 @@ public abstract class ObjectStore {
List<ObjectId> ids =
waitList.stream().map(ref -> ((ObjectRefImpl<?>) ref).getId()).collect(Collectors.toList());
List<Boolean> ready = wait(ids, numReturns, timeoutMs);
List<Boolean> ready = wait(ids, numReturns, timeoutMs, fetchLocal);
List<ObjectRef<T>> readyList = new ArrayList<>();
List<ObjectRef<T>> unreadyList = new ArrayList<>();
+9
View File
@@ -41,6 +41,15 @@ bazel build //java:gen_maven_deps
echo "Build test jar."
bazel build //java:all_tests_deploy.jar
java/generate_jni_header_files.sh
if ! git diff --exit-code -- java src/ray/core_worker/lib/java; then
echo "Files are changed after build. Common cases are:"
echo " * Java native methods doesn't match JNI files. You need to either update Java code or JNI code."
echo " * pom_template.xml and pom.xml doesn't match. You need to either update pom_template.xml or pom.xml."
exit 1
fi
# Enable multi-worker feature in Java test
TEST_ARGS=(-Dray.job.num-java-workers-per-process=10)
-35
View File
@@ -117,41 +117,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.6.1</version>
<configuration>
<!-- optional: limit format enforcement to just the files changed by this feature branch -->
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
<formats>
<!-- you can define as many formats as you want, each is independent -->
<format>
<!-- define the files to apply to -->
<includes>
<include>.java</include>
</includes>
<excludes>
</excludes>
<!-- define the steps to apply to those files -->
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<tabs>true</tabs>
<spacesPerTab>4</spacesPerTab>
</indent>
</format>
</formats>
<!-- define a language-specific format -->
<java>
<googleJavaFormat>
<version>1.7</version> <!-- optional -->
<style>GOOGLE</style> <!-- or AOSP (optional) -->
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -25,7 +25,7 @@ public class PlasmaFreeTest extends BaseTest {
() ->
!TestUtils.getRuntime()
.getObjectStore()
.wait(ImmutableList.of(((ObjectRefImpl<String>) helloId).getId()), 1, 0)
.wait(ImmutableList.of(((ObjectRefImpl<String>) helloId).getId()), 1, 0, true)
.get(0),
50);
if (TestUtils.isSingleProcessMode()) {
@@ -119,7 +119,7 @@ public class ReferenceCountingTest extends BaseTest {
TestUtils.getRuntime().getObjectStore().getRaw(ImmutableList.of(objectId), Long.MAX_VALUE);
} else {
List<Boolean> result =
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100);
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100, true);
Assert.assertFalse(result.get(0));
}
}