fix java build failure (#6062)

This commit is contained in:
Zhijun Fu
2019-12-06 14:38:43 +08:00
committed by Hao Chen
parent 1c638a11a7
commit b88b8202cc
33 changed files with 251 additions and 378 deletions
@@ -14,10 +14,4 @@ public interface RayActor<T> {
* @return The id of this actor.
*/
ActorId getId();
/**
* @return The id of this actor handle.
*/
UniqueId getHandleId();
}
@@ -165,7 +165,8 @@ public abstract class AbstractRayRuntime implements RayRuntime {
private RayObject callNormalFunction(FunctionDescriptor functionDescriptor,
Object[] args, int numReturns, CallOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false);
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false);
List<ObjectId> returnIds = taskSubmitter.submitTask(functionDescriptor,
functionArgs, numReturns, options);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
@@ -178,7 +179,8 @@ public abstract class AbstractRayRuntime implements RayRuntime {
private RayObject callActorFunction(RayActor rayActor,
FunctionDescriptor functionDescriptor, Object[] args, int numReturns) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, isDirectCall(rayActor));
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), isDirectCall(rayActor));
List<ObjectId> returnIds = taskSubmitter.submitActorTask(rayActor,
functionDescriptor, functionArgs, numReturns, null);
Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1);
@@ -191,7 +193,8 @@ public abstract class AbstractRayRuntime implements RayRuntime {
private RayActor createActorImpl(FunctionDescriptor functionDescriptor,
Object[] args, ActorCreationOptions options) {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false);
List<FunctionArg> functionArgs = ArgumentsBuilder
.wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false);
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions));
}
@@ -109,6 +109,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
// TODO(qwang): Get object_store_socket_name and raylet_socket_name from Redis.
nativeCoreWorkerPointer = nativeInitCoreWorker(rayConfig.workerMode.getNumber(),
rayConfig.objectStoreSocketName, rayConfig.rayletSocketName,
rayConfig.nodeIp, rayConfig.getNodeManagerPort(),
(rayConfig.workerMode == WorkerType.DRIVER ? rayConfig.getJobId() : JobId.NIL).getBytes(),
new GcsClientOptions(rayConfig));
Preconditions.checkState(nativeCoreWorkerPointer != 0);
@@ -150,6 +151,10 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
nativeRunTaskExecutor(nativeCoreWorkerPointer, taskExecutor);
}
public long getNativeCoreWorkerPointer() {
return nativeCoreWorkerPointer;
}
/**
* Register this worker or driver to GCS.
*/
@@ -176,7 +181,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
}
private static native long nativeInitCoreWorker(int workerMode, String storeSocket,
String rayletSocket, byte[] jobId, GcsClientOptions gcsClientOptions);
String rayletSocket, String nodeIpAddress, int nodeManagerPort, byte[] jobId,
GcsClientOptions gcsClientOptions);
private static native void nativeRunTaskExecutor(long nativeCoreWorkerPointer,
TaskExecutor taskExecutor);
@@ -35,11 +35,6 @@ public class LocalModeRayActor implements RayActor, Externalizable {
return actorId;
}
@Override
public UniqueId getHandleId() {
return UniqueId.NIL;
}
public ObjectId exchangePreviousActorTaskDummyObjectId(ObjectId previousActorTaskDummyObjectId) {
return this.previousActorTaskDummyObjectId.getAndSet(previousActorTaskDummyObjectId);
}
@@ -6,10 +6,15 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayPyActor;
import org.ray.api.id.ActorId;
import org.ray.api.id.UniqueId;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.generated.Common.Language;
/**
@@ -18,13 +23,19 @@ import org.ray.runtime.generated.Common.Language;
public class NativeRayActor implements RayActor, RayPyActor, Externalizable {
/**
* Address of native actor handle.
* Address of core worker.
*/
private long nativeActorHandle;
private long nativeCoreWorkerPointer;
/**
* ID of the actor.
*/
private byte[] actorId;
public NativeRayActor(long nativeActorHandle) {
Preconditions.checkState(nativeActorHandle != 0);
this.nativeActorHandle = nativeActorHandle;
public NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId) {
Preconditions.checkState(nativeCoreWorkerPointer != 0);
Preconditions.checkState(!ActorId.fromBytes(actorId).isNil());
this.nativeCoreWorkerPointer = nativeCoreWorkerPointer;
this.actorId = actorId;
}
/**
@@ -33,75 +44,64 @@ public class NativeRayActor implements RayActor, RayPyActor, Externalizable {
public NativeRayActor() {
}
public long getNativeActorHandle() {
return nativeActorHandle;
}
@Override
public ActorId getId() {
return ActorId.fromBytes(nativeGetActorId(nativeActorHandle));
}
@Override
public UniqueId getHandleId() {
return new UniqueId(nativeGetActorHandleId(nativeActorHandle));
return ActorId.fromBytes(actorId);
}
public Language getLanguage() {
return Language.forNumber(nativeGetLanguage(nativeActorHandle));
return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId));
}
public boolean isDirectCallActor() {
return nativeIsDirectCallActor(nativeActorHandle);
return nativeIsDirectCallActor(nativeCoreWorkerPointer, actorId);
}
@Override
public String getModuleName() {
Preconditions.checkState(getLanguage() == Language.PYTHON);
return nativeGetActorCreationTaskFunctionDescriptor(nativeActorHandle).get(0);
return nativeGetActorCreationTaskFunctionDescriptor(
nativeCoreWorkerPointer, actorId).get(0);
}
@Override
public String getClassName() {
Preconditions.checkState(getLanguage() == Language.PYTHON);
return nativeGetActorCreationTaskFunctionDescriptor(nativeActorHandle).get(1);
}
public NativeRayActor fork() {
return new NativeRayActor(nativeFork(nativeActorHandle));
return nativeGetActorCreationTaskFunctionDescriptor(
nativeCoreWorkerPointer, actorId).get(1);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(nativeSerialize(nativeActorHandle));
out.writeObject(nativeSerialize(nativeCoreWorkerPointer, actorId));
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
nativeActorHandle = nativeDeserialize((byte[]) in.readObject());
RayRuntime runtime = Ray.internal();
if (runtime instanceof RayMultiWorkerNativeRuntime) {
runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime();
}
Preconditions.checkState(runtime instanceof RayNativeRuntime);
nativeCoreWorkerPointer = ((RayNativeRuntime)runtime).getNativeCoreWorkerPointer();
actorId = nativeDeserialize(nativeCoreWorkerPointer, (byte[]) in.readObject());
}
@Override
protected void finalize() {
nativeFree(nativeActorHandle);
// TODO(zhijunfu): do we need to free the ActorHandle in core worker?
}
private static native long nativeFork(long nativeActorHandle);
private static native int nativeGetLanguage(long nativeCoreWorkerPointer, byte[] actorId);
private static native byte[] nativeGetActorId(long nativeActorHandle);
private static native byte[] nativeGetActorHandleId(long nativeActorHandle);
private static native int nativeGetLanguage(long nativeActorHandle);
private static native boolean nativeIsDirectCallActor(long nativeActorHandle);
private static native boolean nativeIsDirectCallActor(long nativeCoreWorkerPointer, byte[] actorId);
private static native List<String> nativeGetActorCreationTaskFunctionDescriptor(
long nativeActorHandle);
long nativeCoreWorkerPointer, byte[] actorId);
private static native byte[] nativeSerialize(long nativeActorHandle);
private static native byte[] nativeSerialize(long nativeCoreWorkerPointer, byte[] actorId);
private static native long nativeDeserialize(byte[] data);
private static native void nativeFree(long nativeActorHandle);
private static native byte[] nativeDeserialize(long nativeCoreWorkerPointer, byte[] data);
}
@@ -15,7 +15,7 @@ public class NativeRayActorSerializer extends FSTBasicObjectSerializer {
@Override
public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo,
FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException {
((NativeRayActor) toWrite).fork().writeExternal(out);
((NativeRayActor) toWrite).writeExternal(out);
}
@Override
@@ -52,6 +52,7 @@ public class RayConfig {
public final Long objectStoreSize;
public final String rayletSocketName;
private int nodeManagerPort;
public final List<String> rayletConfigParameters;
public final String jobResourcePath;
@@ -154,11 +155,18 @@ public class RayConfig {
// Raylet socket name.
rayletSocketName = config.getString("ray.raylet.socket-name");
// Raylet node manager port.
nodeManagerPort = config.getInt("ray.raylet.node-manager-port");
if (nodeManagerPort == 0) {
Preconditions.checkState(this.redisAddress == null,
"Java worker started by raylet should accept the node manager port from raylet.");
nodeManagerPort = NetworkUtil.getUnusedPort();
}
// Raylet parameters.
rayletConfigParameters = new ArrayList<>();
Config rayletConfig = config.getConfig("ray.raylet.config");
for (Map.Entry<String,ConfigValue> entry : rayletConfig.entrySet()) {
for (Map.Entry<String, ConfigValue> entry : rayletConfig.entrySet()) {
String parameter = entry.getKey() + "," + entry.getValue().unwrapped();
rayletConfigParameters.add(parameter);
}
@@ -211,6 +219,10 @@ public class RayConfig {
return this.jobId;
}
public int getNodeManagerPort() {
return nodeManagerPort;
}
@Override
public String toString() {
return "RayConfig{"
@@ -243,7 +255,7 @@ public class RayConfig {
* 1. System properties.
* 2. `ray.conf` file.
* 3. `ray.default.conf` file.
*/
*/
public static RayConfig create() {
ConfigFactory.invalidateCaches();
Config config = ConfigFactory.systemProperties();
@@ -258,7 +258,7 @@ public class RunManager {
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
String.format("--object_manager_port=%d", 0), // The object manager port.
String.format("--node_manager_port=%d", 0), // The node manager port.
String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), // The node manager port.
String.format("--node_ip_address=%s", rayConfig.nodeIp),
String.format("--redis_address=%s", rayConfig.getRedisIp()),
String.format("--redis_port=%d", rayConfig.getRedisPort()),
@@ -312,6 +312,8 @@ public class RunManager {
cmd.add("-Dray.raylet.socket-name=" + rayConfig.rayletSocketName);
cmd.add("-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName);
cmd.add("-Dray.raylet.node-manager-port=" + rayConfig.getNodeManagerPort());
// Config overwrite
cmd.add("-Dray.redis.address=" + rayConfig.getRedisAddress());
@@ -337,7 +339,7 @@ public class RunManager {
private void startObjectStore() {
try (FileUtil.TempFile plasmaStoreFile = FileUtil
.getTempFileFromResource("plasma_store_server")) {
.getTempFileFromResource("external/plasma/plasma_store_server")) {
plasmaStoreFile.getFile().setExecutable(true);
List<String> command = ImmutableList.of(
// The plasma store executable file.
@@ -8,6 +8,7 @@ import org.ray.api.id.ObjectId;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.generated.Common.Language;
import org.ray.runtime.object.NativeRayObject;
import org.ray.runtime.object.ObjectSerializer;
@@ -22,10 +23,16 @@ public class ArgumentsBuilder {
*/
private static final int LARGEST_SIZE_PASS_BY_VALUE = 100 * 1024;
/**
* This dummy type is also defined in signature.py. Please keep it synced.
*/
private static final NativeRayObject PYTHON_DUMMY_TYPE = ObjectSerializer
.serialize("__RAY_DUMMY__".getBytes());
/**
* Convert real function arguments to task spec arguments.
*/
public static List<FunctionArg> wrap(Object[] args, boolean isDirectCall) {
public static List<FunctionArg> wrap(Object[] args, Language language, boolean isDirectCall) {
List<FunctionArg> ret = new ArrayList<>();
for (Object arg : args) {
ObjectId id = null;
@@ -48,6 +55,9 @@ public class ArgumentsBuilder {
value = null;
}
}
if (language == Language.PYTHON) {
ret.add(FunctionArg.passByValue(PYTHON_DUMMY_TYPE));
}
if (id != null) {
ret.add(FunctionArg.passByReference(id));
} else {
@@ -35,9 +35,9 @@ public class NativeTaskSubmitter implements TaskSubmitter {
@Override
public RayActor createActor(FunctionDescriptor functionDescriptor, List<FunctionArg> args,
ActorCreationOptions options) {
long nativeActorHandle = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args,
byte[] actorId = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args,
options);
return new NativeRayActor(nativeActorHandle);
return new NativeRayActor(nativeCoreWorkerPointer, actorId);
}
@Override
@@ -45,7 +45,7 @@ public class NativeTaskSubmitter implements TaskSubmitter {
List<FunctionArg> args, int numReturns, CallOptions options) {
Preconditions.checkState(actor instanceof NativeRayActor);
List<byte[]> returnIds = nativeSubmitActorTask(nativeCoreWorkerPointer,
((NativeRayActor) actor).getNativeActorHandle(), functionDescriptor, args, numReturns,
actor.getId().getBytes(), functionDescriptor, args, numReturns,
options);
return returnIds.stream().map(ObjectId::new).collect(Collectors.toList());
}
@@ -54,11 +54,11 @@ public class NativeTaskSubmitter implements TaskSubmitter {
FunctionDescriptor functionDescriptor, List<FunctionArg> args, int numReturns,
CallOptions callOptions);
private static native long nativeCreateActor(long nativeCoreWorkerPointer,
private static native byte[] nativeCreateActor(long nativeCoreWorkerPointer,
FunctionDescriptor functionDescriptor, List<FunctionArg> args,
ActorCreationOptions actorCreationOptions);
private static native List<byte[]> nativeSubmitActorTask(long nativeCoreWorkerPointer,
long nativeActorHandle, FunctionDescriptor functionDescriptor, List<FunctionArg> args,
byte[] actorId, FunctionDescriptor functionDescriptor, List<FunctionArg> args,
int numReturns, CallOptions callOptions);
}
@@ -5,6 +5,7 @@ import java.io.IOException;
import java.net.DatagramSocket;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.util.Enumeration;
@@ -47,6 +48,19 @@ public class NetworkUtil {
return "127.0.0.1";
}
public static int getUnusedPort() {
int port;
try {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress(0));
port = ss.getLocalPort();
ss.close();
} catch (Exception e) {
throw new RuntimeException("Failed to bind to an available port.", e);
}
return port;
}
public static boolean isPortAvailable(int port) {
if (port < 1 || port > 65535) {
throw new IllegalArgumentException("Invalid start port: " + port);
@@ -84,6 +84,8 @@ ray {
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet
// Listening port for node manager.
node-manager-port: 0
// See src/ray/ray_config_def.h for options.
config {
@@ -12,8 +12,6 @@ import org.ray.api.TestUtils.LargeObject;
import org.ray.api.annotation.RayRemote;
import org.ray.api.exception.UnreconstructableException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.object.NativeRayObject;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -107,14 +105,6 @@ public class ActorTest extends BaseTest {
.get());
}
public void testForkingActorHandle() {
TestUtils.skipTestUnderSingleProcess();
RayActor<Counter> counter = Ray.createActor(Counter::new, 100);
Assert.assertEquals(Integer.valueOf(101), Ray.call(Counter::increaseAndGet, counter, 1).get());
RayActor<Counter> counter2 = ((NativeRayActor) counter).fork();
Assert.assertEquals(Integer.valueOf(103), Ray.call(Counter::increaseAndGet, counter2, 2).get());
}
public void testUnreconstructableActorObject() throws InterruptedException {
TestUtils.skipTestUnderSingleProcess();
// The UnreconstructableException is created by raylet.
@@ -128,9 +118,9 @@ public class ActorTest extends BaseTest {
Ray.internal().free(ImmutableList.of(value.getId()), false, false);
// Wait until the object is deleted, because the above free operation is async.
while (true) {
NativeRayObject result = TestUtils.getRuntime().getObjectStore()
.getRaw(ImmutableList.of(value.getId()), 0).get(0);
if (result == null) {
Boolean result = TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(value.getId()), 1, 0).get(0);
if (!result) {
break;
}
TimeUnit.MILLISECONDS.sleep(100);
@@ -10,6 +10,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.ray.api.Ray;
import org.ray.runtime.util.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.SkipException;
@@ -65,6 +66,8 @@ public abstract class BaseMultiLanguageTest {
}
}
String nodeManagerPort = String.valueOf(NetworkUtil.getUnusedPort());
// Start ray cluster.
String workerOptions =
" -classpath " + System.getProperty("java.class.path");
@@ -75,6 +78,7 @@ public abstract class BaseMultiLanguageTest {
"--redis-port=6379",
String.format("--plasma-store-socket-name=%s", PLASMA_STORE_SOCKET_NAME),
String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME),
String.format("--node-manager-port=%s", nodeManagerPort),
"--load-code-from-local",
"--include-java",
"--java-worker-options=" + workerOptions
@@ -94,6 +98,7 @@ public abstract class BaseMultiLanguageTest {
System.setProperty("ray.redis.address", "127.0.0.1:6379");
System.setProperty("ray.object-store.socket-name", PLASMA_STORE_SOCKET_NAME);
System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME);
System.setProperty("ray.raylet.node-manager-port", nodeManagerPort);
Ray.init();
}
@@ -113,6 +118,7 @@ public abstract class BaseMultiLanguageTest {
System.clearProperty("ray.redis.address");
System.clearProperty("ray.object-store.socket-name");
System.clearProperty("ray.raylet.socket-name");
System.clearProperty("ray.raylet.node-manager-port");
// Stop ray cluster.
final List<String> stopCommand = ImmutableList.of(
@@ -26,7 +26,7 @@ public class PlasmaFreeTest extends BaseTest {
final boolean result = TestUtils.waitForCondition(() ->
TestUtils.getRuntime().getObjectStore()
.getRaw(ImmutableList.of(helloId.getId()), 0).get(0) == null, 50);
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0) == false, 50);
Assert.assertTrue(result);
}