From 081708bdefb1bc14b9086e203c22113daa0a4d35 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Tue, 21 May 2019 17:13:48 +0800 Subject: [PATCH] [Java] Dynamic resource API in Java (#4824) --- java/api/src/main/java/org/ray/api/Ray.java | 15 +++++++ .../java/org/ray/api/runtime/RayRuntime.java | 9 ++++ .../org/ray/runtime/AbstractRayRuntime.java | 9 ++++ .../ray/runtime/raylet/MockRayletClient.java | 5 +++ .../org/ray/runtime/raylet/RayletClient.java | 2 + .../ray/runtime/raylet/RayletClientImpl.java | 7 +++ .../org/ray/api/test/DynamicResourceTest.java | 44 +++++++++++++++++++ ...org_ray_runtime_raylet_RayletClientImpl.cc | 18 ++++++++ .../org_ray_runtime_raylet_RayletClientImpl.h | 8 ++++ 9 files changed, 117 insertions(+) create mode 100644 java/test/src/main/java/org/ray/api/test/DynamicResourceTest.java diff --git a/java/api/src/main/java/org/ray/api/Ray.java b/java/api/src/main/java/org/ray/api/Ray.java index fa82ea685..3ebfc1668 100644 --- a/java/api/src/main/java/org/ray/api/Ray.java +++ b/java/api/src/main/java/org/ray/api/Ray.java @@ -123,6 +123,21 @@ public final class Ray extends RayCall { return runtime; } + /** + * Update the resource for the specified client. + * Set the resource for the specific node. + */ + public static void setResource(UniqueId nodeId, String resourceName, double capacity) { + runtime.setResource(resourceName, capacity, nodeId); + } + + /** + * Set the resource for local node. + */ + public static void setResource(String resourceName, double capacity) { + runtime.setResource(resourceName, capacity, UniqueId.NIL); + } + /** * Get the runtime context. */ diff --git a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java index 521032316..7767253c5 100644 --- a/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java +++ b/java/api/src/main/java/org/ray/api/runtime/RayRuntime.java @@ -65,6 +65,15 @@ public interface RayRuntime { */ void free(List objectIds, boolean localOnly, boolean deleteCreatingTasks); + /** + * Set the resource for the specific node. + * + * @param resourceName The name of resource. + * @param capacity The capacity of the resource. + * @param nodeId The node that we want to set its resource. + */ + void setResource(String resourceName, double capacity, UniqueId nodeId); + /** * Invoke a remote function. * diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index af8cff9d7..e77d9a6f5 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -210,6 +210,15 @@ public abstract class AbstractRayRuntime implements RayRuntime { rayletClient.freePlasmaObjects(objectIds, localOnly, deleteCreatingTasks); } + @Override + public void setResource(String resourceName, double capacity, UniqueId nodeId) { + Preconditions.checkArgument(Double.compare(capacity, 0) >= 0); + if (nodeId == null) { + nodeId = UniqueId.NIL; + } + rayletClient.setResource(resourceName, capacity, nodeId); + } + private List> splitIntoBatches(List objectIds) { List> batches = new ArrayList<>(); int objectsSize = objectIds.size(); diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java index 385431c70..640789c3b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/MockRayletClient.java @@ -209,6 +209,11 @@ public class MockRayletClient implements RayletClient { throw new NotImplementedException("Not implemented."); } + @Override + public void setResource(String resourceName, double capacity, UniqueId nodeId) { + LOGGER.error("Not implemented under SINGLE_PROCESS mode."); + } + @Override public void destroy() { exec.shutdown(); diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClient.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClient.java index fc6fc75b0..19db27f6d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClient.java @@ -30,5 +30,7 @@ public interface RayletClient { void notifyActorResumedFromCheckpoint(UniqueId actorId, UniqueId checkpointId); + void setResource(String resourceName, double capacity, UniqueId nodeId); + void destroy(); } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index 0ed1f9c86..b46d6b611 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -308,6 +308,10 @@ public class RayletClientImpl implements RayletClient { return buffer; } + public void setResource(String resourceName, double capacity, UniqueId nodeId) { + nativeSetResource(client, resourceName, capacity, nodeId.getBytes()); + } + public void destroy() { nativeDestroy(client); } @@ -357,4 +361,7 @@ public class RayletClientImpl implements RayletClient { private static native void nativeNotifyActorResumedFromCheckpoint(long conn, byte[] actorId, byte[] checkpointId); + + private static native void nativeSetResource(long conn, String resourceName, double capacity, + byte[] nodeId) throws RayException; } diff --git a/java/test/src/main/java/org/ray/api/test/DynamicResourceTest.java b/java/test/src/main/java/org/ray/api/test/DynamicResourceTest.java new file mode 100644 index 000000000..ffda07322 --- /dev/null +++ b/java/test/src/main/java/org/ray/api/test/DynamicResourceTest.java @@ -0,0 +1,44 @@ +package org.ray.api.test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import org.ray.api.Ray; +import org.ray.api.RayObject; +import org.ray.api.TestUtils; +import org.ray.api.WaitResult; +import org.ray.api.annotation.RayRemote; +import org.ray.api.options.CallOptions; +import org.ray.api.runtimecontext.NodeInfo; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class DynamicResourceTest extends BaseTest { + + @RayRemote + public static String sayHi() { + return "hi"; + } + + @Test + public void testSetResource() { + TestUtils.skipTestUnderSingleProcess(); + CallOptions op1 = new CallOptions(ImmutableMap.of("A", 10.0)); + RayObject obj = Ray.call(DynamicResourceTest::sayHi, op1); + WaitResult result = Ray.wait(ImmutableList.of(obj), 1, 1000); + Assert.assertEquals(result.getReady().size(), 0); + + Ray.setResource("A", 10.0); + + // Assert node info. + List nodes = Ray.getRuntimeContext().getAllNodeInfo(); + Assert.assertEquals(nodes.size(), 1); + Assert.assertEquals(nodes.get(0).resources.get("A"), 10.0); + + // Assert ray call result. + result = Ray.wait(ImmutableList.of(obj), 1, 1000); + Assert.assertEquals(result.getReady().size(), 1); + Assert.assertEquals(Ray.get(obj.getId()), "hi"); + } + +} diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc index eb9d2f0e5..ac32911ef 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc @@ -302,6 +302,24 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyActorResumedFromCheckpo ThrowRayExceptionIfNotOK(env, status); } +/* + * Class: org_ray_runtime_raylet_RayletClientImpl + * Method: nativeSetResource + * Signature: (JLjava/lang/String;D[B)V + */ +JNIEXPORT void JNICALL +Java_org_ray_runtime_raylet_RayletClientImpl_nativeSetResource(JNIEnv *env, jclass, + jlong client, jstring resourceName, jdouble capacity, jbyteArray nodeId) { + auto raylet_client = reinterpret_cast(client); + UniqueIdFromJByteArray node_id(env, nodeId); + const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE); + + auto status = raylet_client->SetResource(native_resource_name, + static_cast(capacity), node_id.GetId()); + env->ReleaseStringUTFChars(resourceName, native_resource_name); + ThrowRayExceptionIfNotOK(env, status); +} + #ifdef __cplusplus } #endif diff --git a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h index c00c7c009..91338a12e 100644 --- a/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h +++ b/src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.h @@ -116,6 +116,14 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyActorResumedFromCheckpoint( JNIEnv *, jclass, jlong, jbyteArray, jbyteArray); +/* + * Class: org_ray_runtime_raylet_RayletClientImpl + * Method: nativeSetResource + * Signature: (JLjava/lang/String;D[B)V + */ +JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSetResource( + JNIEnv *, jclass, jlong, jstring, jdouble, jbyteArray); + #ifdef __cplusplus } #endif