[Java] Support dynamically defining resources when submitting task. (#3070)

## What do these changes do?
Before this PR, if we want to specify some resources, we must do as following codes:
```java
@RayRemote(Resources={ResourceItem("CPU", 10)})
public static void f1() {
// do sth
}

@RayRemote(Resources={ResourceItem("CPU", 10)})
class Demo {
// sth
}
```
Unfortunately, it's no way for us to create another actor or task with different resources required.

After this PR, the thing will be:
```java
ActorCreationOptions option = new ActorCreationOptions(); 
option.resources.put("CPU", 4.0);
RayActor<Echo> echo1 = Ray.createActor(Echo::new, option);
option.resources.put("Res-A", 4.0);
RayActor<Echo> echo2 = Ray.createActor(Echo::new, option);


//if we don't specify resource,  the resources will be `{"cpu":0.0}` by default.
Ray.call(Echo::echo, echo2, 100);
```


## Related issue number
N/A
This commit is contained in:
Wang Qing
2018-10-19 21:22:32 +08:00
committed by Hao Chen
parent 9d23fa03c9
commit b410ee0d29
11 changed files with 1433 additions and 399 deletions
File diff suppressed because it is too large Load Diff
@@ -15,10 +15,4 @@ import java.lang.annotation.Target;
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface RayRemote {
/**
* Defines the quantity of various custom resources to reserve
* for this task or for the lifetime of the actor.
* @return an array of custom resource items.
*/
ResourceItem[] resources() default {};
}
@@ -1,28 +0,0 @@
package org.ray.api.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Represents a custom resource, including its name and quantity.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
public @interface ResourceItem {
/**
* Name of this resource, must not be null or empty.
*/
String name();
/**
* Quantity of this resource.
*/
double value() default 0;
}
@@ -0,0 +1,18 @@
package org.ray.api.options;
import java.util.Map;
/**
* The options for creating actor.
*/
public class ActorCreationOptions extends BaseTaskOptions {
public ActorCreationOptions() {
super();
}
public ActorCreationOptions(Map<String, Double> resources) {
super(resources);
}
}
@@ -0,0 +1,20 @@
package org.ray.api.options;
import java.util.HashMap;
import java.util.Map;
/**
* The options class for RayCall or ActorCreation.
*/
public abstract class BaseTaskOptions {
public Map<String, Double> resources;
public BaseTaskOptions() {
resources = new HashMap<>();
}
public BaseTaskOptions(Map<String, Double> resources) {
this.resources = resources;
}
}
@@ -0,0 +1,18 @@
package org.ray.api.options;
import java.util.Map;
/**
* The options for RayCall.
*/
public class CallOptions extends BaseTaskOptions {
public CallOptions() {
super();
}
public CallOptions(Map<String, Double> resources) {
super(resources);
}
}
@@ -6,6 +6,9 @@ import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.function.RayFunc;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.BaseTaskOptions;
import org.ray.api.options.CallOptions;
/**
* Base interface of a Ray runtime.
@@ -65,9 +68,10 @@ public interface RayRuntime {
*
* @param func The remote function to run.
* @param args The arguments of the remote function.
* @param options The options for this call.
* @return The result object.
*/
RayObject call(RayFunc func, Object[] args);
RayObject call(RayFunc func, Object[] args, CallOptions options);
/**
* Invoke a remote function on an actor.
@@ -85,7 +89,9 @@ public interface RayRuntime {
* @param actorFactoryFunc A remote function whose return value is the actor object.
* @param args The arguments for the remote function.
* @param <T> The type of the actor object.
* @param options The options for creating actor.
* @return A handle to the actor.
*/
<T> RayActor<T> createActor(RayFunc actorFactoryFunc, Object[] args);
<T> RayActor<T> createActor(RayFunc actorFactoryFunc, Object[] args,
ActorCreationOptions options);
}
@@ -12,6 +12,9 @@ import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.function.RayFunc;
import org.ray.api.id.UniqueId;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.BaseTaskOptions;
import org.ray.api.options.CallOptions;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.functionmanager.FunctionManager;
@@ -186,8 +189,8 @@ public abstract class AbstractRayRuntime implements RayRuntime {
}
@Override
public RayObject call(RayFunc func, Object[] args) {
TaskSpec spec = createTaskSpec(func, RayActorImpl.NIL, args, false);
public RayObject call(RayFunc func, Object[] args, CallOptions options) {
TaskSpec spec = createTaskSpec(func, RayActorImpl.NIL, args, false, options);
rayletClient.submitTask(spec);
return new RayObjectImpl(spec.returnIds[0]);
}
@@ -198,7 +201,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
throw new IllegalArgumentException("Unsupported actor type: " + actor.getClass().getName());
}
RayActorImpl actorImpl = (RayActorImpl)actor;
TaskSpec spec = createTaskSpec(func, actorImpl, args, false);
TaskSpec spec = createTaskSpec(func, actorImpl, args, false, null);
spec.getExecutionDependencies().add(((RayActorImpl) actor).getTaskCursor());
actorImpl.setTaskCursor(spec.returnIds[1]);
rayletClient.submitTask(spec);
@@ -207,8 +210,10 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
@SuppressWarnings("unchecked")
public <T> RayActor<T> createActor(RayFunc actorFactoryFunc, Object[] args) {
TaskSpec spec = createTaskSpec(actorFactoryFunc, RayActorImpl.NIL, args, true);
public <T> RayActor<T> createActor(RayFunc actorFactoryFunc,
Object[] args, ActorCreationOptions options) {
TaskSpec spec = createTaskSpec(actorFactoryFunc, RayActorImpl.NIL,
args, true, options);
RayActorImpl<?> actor = new RayActorImpl(spec.returnIds[0]);
actor.increaseTaskCounter();
actor.setTaskCursor(spec.returnIds[0]);
@@ -236,11 +241,10 @@ public abstract class AbstractRayRuntime implements RayRuntime {
* @return A TaskSpec object.
*/
private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args,
boolean isActorCreationTask) {
boolean isActorCreationTask, BaseTaskOptions taskOptions) {
final TaskSpec current = workerContext.getCurrentTask();
UniqueId taskId = rayletClient.generateTaskId(current.driverId,
current.taskId,
workerContext.nextCallIndex());
current.taskId, workerContext.nextCallIndex());
int numReturns = actor.getId().isNil() ? 1 : 2;
UniqueId[] returnIds = genReturnIds(taskId, numReturns);
@@ -249,6 +253,18 @@ public abstract class AbstractRayRuntime implements RayRuntime {
actorCreationId = returnIds[0];
}
Map<String, Double> resources;
if (null == taskOptions) {
resources = new HashMap<>();
} else {
resources = new HashMap<>(taskOptions.resources);
}
if (!resources.containsKey(ResourceUtil.CPU_LITERAL)
&& !resources.containsKey(ResourceUtil.CPU_LITERAL.toLowerCase())) {
resources.put(ResourceUtil.CPU_LITERAL, 0.0);
}
RayFunction rayFunction = functionManager.getFunction(current.driverId, func);
return new TaskSpec(
current.driverId,
@@ -261,7 +277,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
actor.increaseTaskCounter(),
ArgumentsBuilder.wrap(args),
returnIds,
ResourceUtil.getResourcesMapFromArray(rayFunction.getRayRemoteAnnotation()),
resources,
rayFunction.getFunctionDescriptor()
);
}
@@ -2,59 +2,11 @@ package org.ray.runtime.util;
import java.util.HashMap;
import java.util.Map;
import org.ray.api.annotation.RayRemote;
import org.ray.api.annotation.ResourceItem;
public class ResourceUtil {
public static final String CPU_LITERAL = "CPU";
public static final String GPU_LITERAL = "GPU";
/**
* Convert the array that contains resource items to a map.
*
* @param remoteAnnotation The RayRemote annotation that contains the resource items.
* @return The map whose key represents the resource name
* and the value represents the resource quantity.
*/
public static Map<String, Double> getResourcesMapFromArray(RayRemote remoteAnnotation) {
Map<String, Double> resourceMap = new HashMap<>();
if (remoteAnnotation != null) {
for (ResourceItem item : remoteAnnotation.resources()) {
if (!item.name().isEmpty()) {
resourceMap.put(item.name(), item.value());
}
}
}
if (!resourceMap.containsKey(CPU_LITERAL)) {
resourceMap.put(CPU_LITERAL, 0.0);
}
return resourceMap;
}
/**
* Convert the resources map to a format string.
*
* @param resources The resource map to be Converted.
* @return The format resources string, like "{CPU:4, GPU:0}".
*/
public static String getResourcesFromatStringFromMap(Map<String, Double> resources) {
if (resources == null) {
return "{}";
}
StringBuilder builder = new StringBuilder();
builder.append("{");
int count = 1;
for (Map.Entry<String, Double> entry : resources.entrySet()) {
builder.append(entry.getKey()).append(":").append(entry.getValue());
count++;
if (count != resources.size()) {
builder.append(", ");
}
}
builder.append("}");
return builder.toString();
}
/**
* Convert resources map to a string that is used
* for the command line argument of starting raylet.
@@ -99,7 +51,7 @@ public class ResourceUtil {
String[] resourcePair = trimItem.split(":");
if (resourcePair.length != 2) {
throw new IllegalArgumentException("Format of static resurces configure is invalid.");
throw new IllegalArgumentException("Format of static resources configure is invalid.");
}
final String resourceName = resourcePair[0].trim();
@@ -21,7 +21,17 @@ public class RayCallGenerator extends BaseGenerator {
newLine("");
newLine("package org.ray.api;");
newLine("");
newLine("import org.ray.api.function.*;");
newLine("import org.ray.api.function.RayFunc;");
newLine("import org.ray.api.function.RayFunc0;");
newLine("import org.ray.api.function.RayFunc1;");
newLine("import org.ray.api.function.RayFunc2;");
newLine("import org.ray.api.function.RayFunc3;");
newLine("import org.ray.api.function.RayFunc4;");
newLine("import org.ray.api.function.RayFunc5;");
newLine("import org.ray.api.function.RayFunc6;");
newLine("import org.ray.api.options.ActorCreationOptions;");
newLine("import org.ray.api.options.BaseTaskOptions;");
newLine("import org.ray.api.options.CallOptions;");
newLine("");
newLine("/**");
@@ -33,19 +43,21 @@ public class RayCallGenerator extends BaseGenerator {
newLine(1, "// Methods for remote function invocation.");
newLine(1, "// =======================================");
for (int i = 0; i <= MAX_PARAMETERS; i++) {
buildCalls(i, false, false);
buildCalls(i, false, false, false);
buildCalls(i, false, false, true);
}
newLine(1, "// ===========================================");
newLine(1, "// Methods for remote actor method invocation.");
newLine(1, "// ===========================================");
for (int i = 0; i <= MAX_PARAMETERS - 1; i++) {
buildCalls(i, true, false);
buildCalls(i, true, false, false);
}
newLine(1, "// ===========================");
newLine(1, "// Methods for actor creation.");
newLine(1, "// ===========================");
for (int i = 0; i <= MAX_PARAMETERS; i++) {
buildCalls(i, false, true);
buildCalls(i, false, true, false);
buildCalls(i, false, true, true);
}
newLine("}");
return sb.toString();
@@ -57,7 +69,8 @@ public class RayCallGenerator extends BaseGenerator {
* @param forActor build actor api when true, otherwise build task api.
* @param forActorCreation build `Ray.createActor` when true, otherwise build `Ray.call`.
*/
private void buildCalls(int numParameters, boolean forActor, boolean forActorCreation) {
private void buildCalls(int numParameters, boolean forActor,
boolean forActorCreation, boolean hasOptionsParam) {
String genericTypes = "";
String argList = "";
for (int i = 0; i < numParameters; i++) {
@@ -82,18 +95,36 @@ public class RayCallGenerator extends BaseGenerator {
paramPrefix += ", ";
}
String optionsParam;
if (hasOptionsParam) {
optionsParam = forActorCreation ? ", ActorCreationOptions options" : ", CallOptions options";
} else {
optionsParam = "";
}
String optionsArg;
if (forActor) {
optionsArg = "";
} else {
if (hasOptionsParam) {
optionsArg = ", options";
} else {
optionsArg = ", null";
}
}
String returnType = !forActorCreation ? "RayObject<R>" : "RayActor<A>";
String funcName = !forActorCreation ? "call" : "createActor";
String funcArgs = !forActor ? "f, args" : "f, actor, args";
for (String param : generateParameters(0, numParameters)) {
// method signature
newLine(1, String.format(
"public static <%s> %s %s(%s) {",
genericTypes, returnType, funcName, paramPrefix + param
"public static <%s> %s %s(%s%s) {",
genericTypes, returnType, funcName, paramPrefix + param, optionsParam
));
// method body
newLine(2, String.format("Object[] args = new Object[]{%s};", argList));
newLine(2, String.format("return Ray.internal().%s(%s);", funcName, funcArgs));
newLine(2, String.format("return Ray.internal().%s(%s%s);", funcName, funcArgs, optionsArg));
newLine(1, "}");
}
}
@@ -1,6 +1,8 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import jdk.nashorn.internal.ir.annotations.Immutable;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -9,7 +11,8 @@ import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.annotation.RayRemote;
import org.ray.api.annotation.ResourceItem;
import org.ray.api.options.ActorCreationOptions;
import org.ray.api.options.CallOptions;
/**
* Resources Management Test.
@@ -17,36 +20,13 @@ import org.ray.api.annotation.ResourceItem;
@RunWith(MyRunner.class)
public class ResourcesManagementTest {
@RayRemote(resources = {@ResourceItem(name = "CPU", value = 4),
@ResourceItem(name = "GPU", value = 0)})
public static Integer echo1(Integer number) {
@RayRemote
public static Integer echo(Integer number) {
return number;
}
@RayRemote(resources = {@ResourceItem(name = "CPU", value = 4),
@ResourceItem(name = "GPU", value = 2)})
public static Integer echo2(Integer number) {
return number;
}
@RayRemote(resources = {@ResourceItem(name = "CPU", value = 2),
@ResourceItem(name = "GPU", value = 0)})
public static class Echo1 {
public Integer echo(Integer number) {
return number;
}
}
@RayRemote(resources = {@ResourceItem(name = "CPU", value = 8),
@ResourceItem(name = "GPU", value = 0)})
public static class Echo2 {
public Integer echo(Integer number) {
return number;
}
}
@RayRemote(resources = {@ResourceItem(name = "RES-A", value = 4)})
public static class Echo3 {
@RayRemote
public static class Echo {
public Integer echo(Integer number) {
return number;
}
@@ -54,12 +34,18 @@ public class ResourcesManagementTest {
@Test
public void testMethods() {
CallOptions callOptions1 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 0.0));
// This is a case that can satisfy required resources.
RayObject<Integer> result1 = Ray.call(ResourcesManagementTest::echo1, 100);
// The static resources for test are "CPU:4,RES-A:4".
RayObject<Integer> result1 = Ray.call(ResourcesManagementTest::echo, 100, callOptions1);
Assert.assertEquals(100, (int) result1.get());
CallOptions callOptions2 = new CallOptions(ImmutableMap.of("CPU", 4.0, "GPU", 2.0));
// This is a case that can't satisfy required resources.
final RayObject<Integer> result2 = Ray.call(ResourcesManagementTest::echo2, 200);
// The static resources for test are "CPU:4,RES-A:4".
final RayObject<Integer> result2 = Ray.call(ResourcesManagementTest::echo, 200, callOptions2);
WaitResult<Integer> waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000);
Assert.assertEquals(0, waitResult.getReady().size());
@@ -68,28 +54,29 @@ public class ResourcesManagementTest {
@Test
public void testActors() {
ActorCreationOptions actorCreationOptions1 =
new ActorCreationOptions(ImmutableMap.of("CPU", 2.0, "GPU", 0.0));
// This is a case that can satisfy required resources.
RayActor<ResourcesManagementTest.Echo1> echo1 = Ray.createActor(Echo1::new);
final RayObject<Integer> result1 = Ray.call(Echo1::echo, echo1, 100);
// The static resources for test are "CPU:4,RES-A:4".
RayActor<Echo> echo1 = Ray.createActor(Echo::new, actorCreationOptions1);
final RayObject<Integer> result1 = Ray.call(Echo::echo, echo1, 100);
Assert.assertEquals(100, (int) result1.get());
// This is a case that can't satisfy required resources.
RayActor<ResourcesManagementTest.Echo2> echo2 = Ray.createActor(Echo2::new);
final RayObject<Integer> result2 = Ray.call(Echo2::echo, echo2, 100);
// The static resources for test are "CPU:4,RES-A:4".
ActorCreationOptions actorCreationOptions2 =
new ActorCreationOptions(ImmutableMap.of("CPU", 8.0, "GPU", 0.0));
RayActor<ResourcesManagementTest.Echo> echo2 =
Ray.createActor(Echo::new, actorCreationOptions2);
final RayObject<Integer> result2 = Ray.call(Echo::echo, echo2, 100);
WaitResult<Integer> waitResult = Ray.wait(ImmutableList.of(result2), 1, 1000);
Assert.assertEquals(0, waitResult.getReady().size());
Assert.assertEquals(1, waitResult.getUnready().size());
}
@Test
public void testActorAndMemberMethods() {
// Note(qwang): This case depends on the following line.
// https://github.com/ray-project/ray/blob/master/java/test/src/main/java/org/ray/api/test/TestListener.java#L13
// If we change the static resources configuration item, this case may not pass.
// Then we should change this case too.
RayActor<Echo3> echo3 = Ray.createActor(Echo3::new);
Assert.assertEquals(100, (int) Ray.call(Echo3::echo, echo3, 100).get());
}
}