Add placement group java api (#9611)

* add part code

* add part code

* add part code

* fix code style

* fix review comment

* fix review comment

* add part code

* add part code

* add part code

* add part code

* fix review comment

* fix review comment

* fix code style

* fix review comment

* fix lint error

* fix lint error

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-07-25 15:39:05 +08:00
committed by GitHub
parent 5dc4b6686e
commit 28d5f9696d
22 changed files with 403 additions and 26 deletions
@@ -17,6 +17,8 @@ import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.placementgroup.PlacementStrategy;
import io.ray.api.runtimecontext.RuntimeContext;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.RuntimeContextImpl;
@@ -35,6 +37,7 @@ import io.ray.runtime.task.FunctionArg;
import io.ray.runtime.task.TaskExecutor;
import io.ray.runtime.task.TaskSubmitter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
@@ -155,6 +158,12 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
return (PyActorHandle) createActorImpl(functionDescriptor, args, options);
}
@Override
public PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy) {
return taskSubmitter.createPlacementGroup(bundles, strategy);
}
@SuppressWarnings("unchecked")
@Override
public <T extends BaseActorHandle> T getActorHandle(ActorId actorId) {
@@ -0,0 +1,58 @@
package io.ray.runtime.placementgroup;
import io.ray.api.id.BaseId;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
/**
* Represents the id of a placement group.
*/
public class PlacementGroupId extends BaseId implements Serializable {
public static final int LENGTH = 16;
public static final PlacementGroupId NIL = nil();
private PlacementGroupId(byte[] id) {
super(id);
}
/**
* Creates a PlacementGroupId from the given ByteBuffer.
*/
public static PlacementGroupId fromByteBuffer(ByteBuffer bb) {
return new PlacementGroupId(byteBuffer2Bytes(bb));
}
/**
* Create a PlacementGroupId instance according to the given bytes.
*/
public static PlacementGroupId fromBytes(byte[] bytes) {
return new PlacementGroupId(bytes);
}
/**
* Generate a nil PlacementGroupId.
*/
private static PlacementGroupId nil() {
byte[] b = new byte[LENGTH];
Arrays.fill(b, (byte) 0xFF);
return new PlacementGroupId(b);
}
/**
* Generate an PlacementGroupId with random value. Used for local mode and test only.
*/
public static PlacementGroupId fromRandom() {
byte[] b = new byte[LENGTH];
new Random().nextBytes(b);
return new PlacementGroupId(b);
}
@Override
public int size() {
return LENGTH;
}
}
@@ -0,0 +1,28 @@
package io.ray.runtime.placementgroup;
import io.ray.api.placementgroup.PlacementGroup;
/**
* The default implementation of `PlacementGroup` interface.
*/
public class PlacementGroupImpl implements PlacementGroup {
private PlacementGroupId id;
private int bundleCount = 0;
public PlacementGroupImpl() {
}
public PlacementGroupImpl(PlacementGroupId id, int bundleCount) {
this.id = id;
this.bundleCount = bundleCount;
}
public PlacementGroupId getId() {
return id;
}
public int getBundleCount() {
return bundleCount;
}
}
@@ -12,6 +12,8 @@ import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.placementgroup.PlacementStrategy;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.actor.LocalModeActorHandle;
import io.ray.runtime.context.LocalModeWorkerContext;
@@ -27,6 +29,7 @@ import io.ray.runtime.generated.Common.TaskSpec;
import io.ray.runtime.generated.Common.TaskType;
import io.ray.runtime.object.LocalModeObjectStore;
import io.ray.runtime.object.NativeRayObject;
import io.ray.runtime.placementgroup.PlacementGroupImpl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -102,7 +105,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
// Check whether task arguments are ready.
for (TaskArg arg : taskSpec.getArgsList()) {
ByteString idByteString = arg.getObjectRef().getObjectId();
if (idByteString != ByteString.EMPTY) {
if (idByteString != ByteString.EMPTY) {
ObjectId id = new ObjectId(idByteString.toByteArray());
if (!objectStore.isObjectReady(id)) {
unreadyObjects.add(id);
@@ -209,6 +212,12 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
}
}
@Override
public PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy) {
return new PlacementGroupImpl();
}
@Override
public BaseActorHandle getActor(ActorId actorId) {
return actorHandles.get(actorId).copy();
@@ -8,9 +8,14 @@ import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.placementgroup.PlacementStrategy;
import io.ray.runtime.actor.NativeActorHandle;
import io.ray.runtime.functionmanager.FunctionDescriptor;
import io.ray.runtime.placementgroup.PlacementGroupId;
import io.ray.runtime.placementgroup.PlacementGroupImpl;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -34,11 +39,20 @@ public class NativeTaskSubmitter implements TaskSubmitter {
@Override
public BaseActorHandle createActor(FunctionDescriptor functionDescriptor, List<FunctionArg> args,
ActorCreationOptions options) throws IllegalArgumentException {
if (options != null && StringUtils.isNotBlank(options.name)) {
Optional<BaseActorHandle> actor =
options.global ? Ray.getGlobalActor(options.name) : Ray.getActor(options.name);
Preconditions.checkArgument(!actor.isPresent(),
String.format("Actor of name %s exists", options.name));
if (options != null) {
if (options.group != null) {
PlacementGroupImpl group = (PlacementGroupImpl)options.group;
Preconditions.checkArgument(options.bundleIndex >= 0
&& options.bundleIndex < group.getBundleCount(),
String.format("Bundle index %s is invalid", options.bundleIndex));
}
if (StringUtils.isNotBlank(options.name)) {
Optional<BaseActorHandle> actor =
options.global ? Ray.getGlobalActor(options.name) : Ray.getActor(options.name);
Preconditions.checkArgument(!actor.isPresent(),
String.format("Actor of name %s exists", options.name));
}
}
byte[] actorId = nativeCreateActor(functionDescriptor, functionDescriptor.hashCode(), args,
options);
@@ -63,6 +77,13 @@ public class NativeTaskSubmitter implements TaskSubmitter {
return returnIds.stream().map(ObjectId::new).collect(Collectors.toList());
}
@Override
public PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy) {
byte[] bytes = nativeCreatePlacementGroup(bundles, strategy.value());
return new PlacementGroupImpl(PlacementGroupId.fromBytes(bytes), bundles.size());
}
private static native List<byte[]> nativeSubmitTask(FunctionDescriptor functionDescriptor,
int functionDescriptorHash, List<FunctionArg> args, int numReturns, CallOptions callOptions);
@@ -73,4 +94,7 @@ public class NativeTaskSubmitter implements TaskSubmitter {
private static native List<byte[]> nativeSubmitActorTask(byte[] actorId,
FunctionDescriptor functionDescriptor, int functionDescriptorHash, List<FunctionArg> args,
int numReturns, CallOptions callOptions);
private static native byte[] nativeCreatePlacementGroup(List<Map<String, Double>> bundles,
int strategy);
}
@@ -5,8 +5,11 @@ import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.placementgroup.PlacementStrategy;
import io.ray.runtime.functionmanager.FunctionDescriptor;
import java.util.List;
import java.util.Map;
/**
* A set of methods to submit tasks and create actors.
@@ -47,6 +50,15 @@ public interface TaskSubmitter {
List<ObjectId> submitActorTask(BaseActorHandle actor, FunctionDescriptor functionDescriptor,
List<FunctionArg> args, int numReturns, CallOptions options);
/**
* Create a placement group.
* @param bundles Preallocated resource list.
* @param strategy Actor placement strategy.
* @return A handle to the created placement group.
*/
PlacementGroup createPlacementGroup(List<Map<String, Double>> bundles,
PlacementStrategy strategy);
BaseActorHandle getActor(ActorId actorId);
}