diff --git a/java/api/src/main/java/org/ray/api/RayActor.java b/java/api/src/main/java/org/ray/api/RayActor.java index 8d44901ea..0c1a7bd84 100644 --- a/java/api/src/main/java/org/ray/api/RayActor.java +++ b/java/api/src/main/java/org/ray/api/RayActor.java @@ -14,10 +14,4 @@ public interface RayActor { * @return The id of this actor. */ ActorId getId(); - - /** - * @return The id of this actor handle. - */ - UniqueId getHandleId(); - } diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index be7c4e471..9c39e7a2a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -165,7 +165,8 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayObject callNormalFunction(FunctionDescriptor functionDescriptor, Object[] args, int numReturns, CallOptions options) { - List functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false); + List functionArgs = ArgumentsBuilder + .wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false); List returnIds = taskSubmitter.submitTask(functionDescriptor, functionArgs, numReturns, options); Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1); @@ -178,7 +179,8 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayObject callActorFunction(RayActor rayActor, FunctionDescriptor functionDescriptor, Object[] args, int numReturns) { - List functionArgs = ArgumentsBuilder.wrap(args, isDirectCall(rayActor)); + List functionArgs = ArgumentsBuilder + .wrap(args, functionDescriptor.getLanguage(), isDirectCall(rayActor)); List returnIds = taskSubmitter.submitActorTask(rayActor, functionDescriptor, functionArgs, numReturns, null); Preconditions.checkState(returnIds.size() == numReturns && returnIds.size() <= 1); @@ -191,7 +193,8 @@ public abstract class AbstractRayRuntime implements RayRuntime { private RayActor createActorImpl(FunctionDescriptor functionDescriptor, Object[] args, ActorCreationOptions options) { - List functionArgs = ArgumentsBuilder.wrap(args, /*isDirectCall*/false); + List functionArgs = ArgumentsBuilder + .wrap(args, functionDescriptor.getLanguage(), /*isDirectCall*/false); if (functionDescriptor.getLanguage() != Language.JAVA && options != null) { Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions)); } diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 56002a595..4775553c5 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -109,6 +109,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { // TODO(qwang): Get object_store_socket_name and raylet_socket_name from Redis. nativeCoreWorkerPointer = nativeInitCoreWorker(rayConfig.workerMode.getNumber(), rayConfig.objectStoreSocketName, rayConfig.rayletSocketName, + rayConfig.nodeIp, rayConfig.getNodeManagerPort(), (rayConfig.workerMode == WorkerType.DRIVER ? rayConfig.getJobId() : JobId.NIL).getBytes(), new GcsClientOptions(rayConfig)); Preconditions.checkState(nativeCoreWorkerPointer != 0); @@ -150,6 +151,10 @@ public final class RayNativeRuntime extends AbstractRayRuntime { nativeRunTaskExecutor(nativeCoreWorkerPointer, taskExecutor); } + public long getNativeCoreWorkerPointer() { + return nativeCoreWorkerPointer; + } + /** * Register this worker or driver to GCS. */ @@ -176,7 +181,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime { } private static native long nativeInitCoreWorker(int workerMode, String storeSocket, - String rayletSocket, byte[] jobId, GcsClientOptions gcsClientOptions); + String rayletSocket, String nodeIpAddress, int nodeManagerPort, byte[] jobId, + GcsClientOptions gcsClientOptions); private static native void nativeRunTaskExecutor(long nativeCoreWorkerPointer, TaskExecutor taskExecutor); diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java index 4ffe36d3d..f8bca58c9 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/LocalModeRayActor.java @@ -35,11 +35,6 @@ public class LocalModeRayActor implements RayActor, Externalizable { return actorId; } - @Override - public UniqueId getHandleId() { - return UniqueId.NIL; - } - public ObjectId exchangePreviousActorTaskDummyObjectId(ObjectId previousActorTaskDummyObjectId) { return this.previousActorTaskDummyObjectId.getAndSet(previousActorTaskDummyObjectId); } diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java index 8dd7ac8c3..a4d7c31b9 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActor.java @@ -6,10 +6,15 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.List; +import org.ray.api.Ray; import org.ray.api.RayActor; import org.ray.api.RayPyActor; import org.ray.api.id.ActorId; import org.ray.api.id.UniqueId; +import org.ray.api.runtime.RayRuntime; +import org.ray.runtime.AbstractRayRuntime; +import org.ray.runtime.RayNativeRuntime; +import org.ray.runtime.RayMultiWorkerNativeRuntime; import org.ray.runtime.generated.Common.Language; /** @@ -18,13 +23,19 @@ import org.ray.runtime.generated.Common.Language; public class NativeRayActor implements RayActor, RayPyActor, Externalizable { /** - * Address of native actor handle. + * Address of core worker. */ - private long nativeActorHandle; + private long nativeCoreWorkerPointer; + /** + * ID of the actor. + */ + private byte[] actorId; - public NativeRayActor(long nativeActorHandle) { - Preconditions.checkState(nativeActorHandle != 0); - this.nativeActorHandle = nativeActorHandle; + public NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId) { + Preconditions.checkState(nativeCoreWorkerPointer != 0); + Preconditions.checkState(!ActorId.fromBytes(actorId).isNil()); + this.nativeCoreWorkerPointer = nativeCoreWorkerPointer; + this.actorId = actorId; } /** @@ -33,75 +44,64 @@ public class NativeRayActor implements RayActor, RayPyActor, Externalizable { public NativeRayActor() { } - public long getNativeActorHandle() { - return nativeActorHandle; - } - @Override public ActorId getId() { - return ActorId.fromBytes(nativeGetActorId(nativeActorHandle)); - } - - @Override - public UniqueId getHandleId() { - return new UniqueId(nativeGetActorHandleId(nativeActorHandle)); + return ActorId.fromBytes(actorId); } public Language getLanguage() { - return Language.forNumber(nativeGetLanguage(nativeActorHandle)); + return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId)); } public boolean isDirectCallActor() { - return nativeIsDirectCallActor(nativeActorHandle); + return nativeIsDirectCallActor(nativeCoreWorkerPointer, actorId); } @Override public String getModuleName() { Preconditions.checkState(getLanguage() == Language.PYTHON); - return nativeGetActorCreationTaskFunctionDescriptor(nativeActorHandle).get(0); + return nativeGetActorCreationTaskFunctionDescriptor( + nativeCoreWorkerPointer, actorId).get(0); } @Override public String getClassName() { Preconditions.checkState(getLanguage() == Language.PYTHON); - return nativeGetActorCreationTaskFunctionDescriptor(nativeActorHandle).get(1); - } - - public NativeRayActor fork() { - return new NativeRayActor(nativeFork(nativeActorHandle)); + return nativeGetActorCreationTaskFunctionDescriptor( + nativeCoreWorkerPointer, actorId).get(1); } @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(nativeSerialize(nativeActorHandle)); + out.writeObject(nativeSerialize(nativeCoreWorkerPointer, actorId)); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nativeActorHandle = nativeDeserialize((byte[]) in.readObject()); + RayRuntime runtime = Ray.internal(); + if (runtime instanceof RayMultiWorkerNativeRuntime) { + runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime(); + } + + Preconditions.checkState(runtime instanceof RayNativeRuntime); + nativeCoreWorkerPointer = ((RayNativeRuntime)runtime).getNativeCoreWorkerPointer(); + + actorId = nativeDeserialize(nativeCoreWorkerPointer, (byte[]) in.readObject()); } @Override protected void finalize() { - nativeFree(nativeActorHandle); + // TODO(zhijunfu): do we need to free the ActorHandle in core worker? } - private static native long nativeFork(long nativeActorHandle); + private static native int nativeGetLanguage(long nativeCoreWorkerPointer, byte[] actorId); - private static native byte[] nativeGetActorId(long nativeActorHandle); - - private static native byte[] nativeGetActorHandleId(long nativeActorHandle); - - private static native int nativeGetLanguage(long nativeActorHandle); - - private static native boolean nativeIsDirectCallActor(long nativeActorHandle); + private static native boolean nativeIsDirectCallActor(long nativeCoreWorkerPointer, byte[] actorId); private static native List nativeGetActorCreationTaskFunctionDescriptor( - long nativeActorHandle); + long nativeCoreWorkerPointer, byte[] actorId); - private static native byte[] nativeSerialize(long nativeActorHandle); + private static native byte[] nativeSerialize(long nativeCoreWorkerPointer, byte[] actorId); - private static native long nativeDeserialize(byte[] data); - - private static native void nativeFree(long nativeActorHandle); + private static native byte[] nativeDeserialize(long nativeCoreWorkerPointer, byte[] data); } diff --git a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActorSerializer.java b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActorSerializer.java index 11102cdd8..45a3b932d 100644 --- a/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActorSerializer.java +++ b/java/runtime/src/main/java/org/ray/runtime/actor/NativeRayActorSerializer.java @@ -15,7 +15,7 @@ public class NativeRayActorSerializer extends FSTBasicObjectSerializer { @Override public void writeObject(FSTObjectOutput out, Object toWrite, FSTClazzInfo clzInfo, FSTClazzInfo.FSTFieldInfo referencedBy, int streamPosition) throws IOException { - ((NativeRayActor) toWrite).fork().writeExternal(out); + ((NativeRayActor) toWrite).writeExternal(out); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index 3be4bee26..27aec4ce8 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -52,6 +52,7 @@ public class RayConfig { public final Long objectStoreSize; public final String rayletSocketName; + private int nodeManagerPort; public final List rayletConfigParameters; public final String jobResourcePath; @@ -154,11 +155,18 @@ public class RayConfig { // Raylet socket name. rayletSocketName = config.getString("ray.raylet.socket-name"); + // Raylet node manager port. + nodeManagerPort = config.getInt("ray.raylet.node-manager-port"); + if (nodeManagerPort == 0) { + Preconditions.checkState(this.redisAddress == null, + "Java worker started by raylet should accept the node manager port from raylet."); + nodeManagerPort = NetworkUtil.getUnusedPort(); + } // Raylet parameters. rayletConfigParameters = new ArrayList<>(); Config rayletConfig = config.getConfig("ray.raylet.config"); - for (Map.Entry entry : rayletConfig.entrySet()) { + for (Map.Entry entry : rayletConfig.entrySet()) { String parameter = entry.getKey() + "," + entry.getValue().unwrapped(); rayletConfigParameters.add(parameter); } @@ -211,6 +219,10 @@ public class RayConfig { return this.jobId; } + public int getNodeManagerPort() { + return nodeManagerPort; + } + @Override public String toString() { return "RayConfig{" @@ -243,7 +255,7 @@ public class RayConfig { * 1. System properties. * 2. `ray.conf` file. * 3. `ray.default.conf` file. - */ + */ public static RayConfig create() { ConfigFactory.invalidateCaches(); Config config = ConfigFactory.systemProperties(); diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 7434e3890..f1bfab1f1 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -258,7 +258,7 @@ public class RunManager { String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName), String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName), String.format("--object_manager_port=%d", 0), // The object manager port. - String.format("--node_manager_port=%d", 0), // The node manager port. + String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), // The node manager port. String.format("--node_ip_address=%s", rayConfig.nodeIp), String.format("--redis_address=%s", rayConfig.getRedisIp()), String.format("--redis_port=%d", rayConfig.getRedisPort()), @@ -312,6 +312,8 @@ public class RunManager { cmd.add("-Dray.raylet.socket-name=" + rayConfig.rayletSocketName); cmd.add("-Dray.object-store.socket-name=" + rayConfig.objectStoreSocketName); + cmd.add("-Dray.raylet.node-manager-port=" + rayConfig.getNodeManagerPort()); + // Config overwrite cmd.add("-Dray.redis.address=" + rayConfig.getRedisAddress()); @@ -337,7 +339,7 @@ public class RunManager { private void startObjectStore() { try (FileUtil.TempFile plasmaStoreFile = FileUtil - .getTempFileFromResource("plasma_store_server")) { + .getTempFileFromResource("external/plasma/plasma_store_server")) { plasmaStoreFile.getFile().setExecutable(true); List command = ImmutableList.of( // The plasma store executable file. diff --git a/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java b/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java index 07ae3dfb1..978a5a56b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/ArgumentsBuilder.java @@ -8,6 +8,7 @@ import org.ray.api.id.ObjectId; import org.ray.api.runtime.RayRuntime; import org.ray.runtime.AbstractRayRuntime; import org.ray.runtime.RayMultiWorkerNativeRuntime; +import org.ray.runtime.generated.Common.Language; import org.ray.runtime.object.NativeRayObject; import org.ray.runtime.object.ObjectSerializer; @@ -22,10 +23,16 @@ public class ArgumentsBuilder { */ private static final int LARGEST_SIZE_PASS_BY_VALUE = 100 * 1024; + /** + * This dummy type is also defined in signature.py. Please keep it synced. + */ + private static final NativeRayObject PYTHON_DUMMY_TYPE = ObjectSerializer + .serialize("__RAY_DUMMY__".getBytes()); + /** * Convert real function arguments to task spec arguments. */ - public static List wrap(Object[] args, boolean isDirectCall) { + public static List wrap(Object[] args, Language language, boolean isDirectCall) { List ret = new ArrayList<>(); for (Object arg : args) { ObjectId id = null; @@ -48,6 +55,9 @@ public class ArgumentsBuilder { value = null; } } + if (language == Language.PYTHON) { + ret.add(FunctionArg.passByValue(PYTHON_DUMMY_TYPE)); + } if (id != null) { ret.add(FunctionArg.passByReference(id)); } else { diff --git a/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java b/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java index cae93e788..bc1083d9b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/NativeTaskSubmitter.java @@ -35,9 +35,9 @@ public class NativeTaskSubmitter implements TaskSubmitter { @Override public RayActor createActor(FunctionDescriptor functionDescriptor, List args, ActorCreationOptions options) { - long nativeActorHandle = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args, + byte[] actorId = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args, options); - return new NativeRayActor(nativeActorHandle); + return new NativeRayActor(nativeCoreWorkerPointer, actorId); } @Override @@ -45,7 +45,7 @@ public class NativeTaskSubmitter implements TaskSubmitter { List args, int numReturns, CallOptions options) { Preconditions.checkState(actor instanceof NativeRayActor); List returnIds = nativeSubmitActorTask(nativeCoreWorkerPointer, - ((NativeRayActor) actor).getNativeActorHandle(), functionDescriptor, args, numReturns, + actor.getId().getBytes(), functionDescriptor, args, numReturns, options); return returnIds.stream().map(ObjectId::new).collect(Collectors.toList()); } @@ -54,11 +54,11 @@ public class NativeTaskSubmitter implements TaskSubmitter { FunctionDescriptor functionDescriptor, List args, int numReturns, CallOptions callOptions); - private static native long nativeCreateActor(long nativeCoreWorkerPointer, + private static native byte[] nativeCreateActor(long nativeCoreWorkerPointer, FunctionDescriptor functionDescriptor, List args, ActorCreationOptions actorCreationOptions); private static native List nativeSubmitActorTask(long nativeCoreWorkerPointer, - long nativeActorHandle, FunctionDescriptor functionDescriptor, List args, + byte[] actorId, FunctionDescriptor functionDescriptor, List args, int numReturns, CallOptions callOptions); } diff --git a/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java index 23875aba1..8a6aca3f3 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/NetworkUtil.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.net.DatagramSocket; import java.net.Inet6Address; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.ServerSocket; import java.util.Enumeration; @@ -47,6 +48,19 @@ public class NetworkUtil { return "127.0.0.1"; } + public static int getUnusedPort() { + int port; + try { + ServerSocket ss = new ServerSocket(); + ss.bind(new InetSocketAddress(0)); + port = ss.getLocalPort(); + ss.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to bind to an available port.", e); + } + return port; + } + public static boolean isPortAvailable(int port) { if (port < 1 || port > 65535) { throw new IllegalArgumentException("Invalid start port: " + port); diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 3da6089d7..208a07b5a 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -84,6 +84,8 @@ ray { raylet { // RPC socket name of Raylet socket-name: /tmp/ray/sockets/raylet + // Listening port for node manager. + node-manager-port: 0 // See src/ray/ray_config_def.h for options. config { diff --git a/java/test/src/main/java/org/ray/api/test/ActorTest.java b/java/test/src/main/java/org/ray/api/test/ActorTest.java index 8b1b441e4..5dd81443f 100644 --- a/java/test/src/main/java/org/ray/api/test/ActorTest.java +++ b/java/test/src/main/java/org/ray/api/test/ActorTest.java @@ -12,8 +12,6 @@ import org.ray.api.TestUtils.LargeObject; import org.ray.api.annotation.RayRemote; import org.ray.api.exception.UnreconstructableException; import org.ray.api.id.UniqueId; -import org.ray.runtime.actor.NativeRayActor; -import org.ray.runtime.object.NativeRayObject; import org.testng.Assert; import org.testng.annotations.Test; @@ -107,14 +105,6 @@ public class ActorTest extends BaseTest { .get()); } - public void testForkingActorHandle() { - TestUtils.skipTestUnderSingleProcess(); - RayActor counter = Ray.createActor(Counter::new, 100); - Assert.assertEquals(Integer.valueOf(101), Ray.call(Counter::increaseAndGet, counter, 1).get()); - RayActor counter2 = ((NativeRayActor) counter).fork(); - Assert.assertEquals(Integer.valueOf(103), Ray.call(Counter::increaseAndGet, counter2, 2).get()); - } - public void testUnreconstructableActorObject() throws InterruptedException { TestUtils.skipTestUnderSingleProcess(); // The UnreconstructableException is created by raylet. @@ -128,9 +118,9 @@ public class ActorTest extends BaseTest { Ray.internal().free(ImmutableList.of(value.getId()), false, false); // Wait until the object is deleted, because the above free operation is async. while (true) { - NativeRayObject result = TestUtils.getRuntime().getObjectStore() - .getRaw(ImmutableList.of(value.getId()), 0).get(0); - if (result == null) { + Boolean result = TestUtils.getRuntime().getObjectStore() + .wait(ImmutableList.of(value.getId()), 1, 0).get(0); + if (!result) { break; } TimeUnit.MILLISECONDS.sleep(100); diff --git a/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java b/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java index 995f9588a..294430492 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseMultiLanguageTest.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.ray.api.Ray; +import org.ray.runtime.util.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.SkipException; @@ -65,6 +66,8 @@ public abstract class BaseMultiLanguageTest { } } + String nodeManagerPort = String.valueOf(NetworkUtil.getUnusedPort()); + // Start ray cluster. String workerOptions = " -classpath " + System.getProperty("java.class.path"); @@ -75,6 +78,7 @@ public abstract class BaseMultiLanguageTest { "--redis-port=6379", String.format("--plasma-store-socket-name=%s", PLASMA_STORE_SOCKET_NAME), String.format("--raylet-socket-name=%s", RAYLET_SOCKET_NAME), + String.format("--node-manager-port=%s", nodeManagerPort), "--load-code-from-local", "--include-java", "--java-worker-options=" + workerOptions @@ -94,6 +98,7 @@ public abstract class BaseMultiLanguageTest { System.setProperty("ray.redis.address", "127.0.0.1:6379"); System.setProperty("ray.object-store.socket-name", PLASMA_STORE_SOCKET_NAME); System.setProperty("ray.raylet.socket-name", RAYLET_SOCKET_NAME); + System.setProperty("ray.raylet.node-manager-port", nodeManagerPort); Ray.init(); } @@ -113,6 +118,7 @@ public abstract class BaseMultiLanguageTest { System.clearProperty("ray.redis.address"); System.clearProperty("ray.object-store.socket-name"); System.clearProperty("ray.raylet.socket-name"); + System.clearProperty("ray.raylet.node-manager-port"); // Stop ray cluster. final List stopCommand = ImmutableList.of( diff --git a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java index 395209ad5..9a125fabf 100644 --- a/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java +++ b/java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java @@ -26,7 +26,7 @@ public class PlasmaFreeTest extends BaseTest { final boolean result = TestUtils.waitForCondition(() -> TestUtils.getRuntime().getObjectStore() - .getRaw(ImmutableList.of(helloId.getId()), 0).get(0) == null, 50); + .wait(ImmutableList.of(helloId.getId()), 1, 0).get(0) == false, 50); Assert.assertTrue(result); } diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index abc7d1bbd..8e9f6541d 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -452,7 +452,7 @@ cdef deserialize_args( c_args[i].get().GetMetadata()).to_pybytes() == RAW_BUFFER_METADATA): data = Buffer.make(c_args[i].get().GetData()) - args.append(data) + args.append(data.to_pybytes()) elif (c_args[i].get().HasMetadata() and Buffer.make( c_args[i].get().GetMetadata()).to_pybytes() == PICKLE_BUFFER_METADATA): diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index e2ef2a167..b46d9ca72 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -470,7 +470,7 @@ def stop(force, verbose): debug_operator = "| tee /dev/stderr" if verbose else "" command = ( - "kill -s {} $(ps ax -o {} | grep {} | grep -v grep {} | grep ray |" + "kill -s {} $(ps ax -o {} | grep {} | grep -v grep {} |" "awk '{{ print $1 }}') 2> /dev/null".format( # ^^ This is how you escape braces in python format string. "KILL" if force else "TERM", diff --git a/python/ray/services.py b/python/ray/services.py index d3852186f..7e5577545 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1184,6 +1184,7 @@ def start_raylet(redis_address, java_worker_command = build_java_worker_command( java_worker_options, redis_address, + node_manager_port, plasma_store_name, raylet_name, redis_password, @@ -1251,6 +1252,7 @@ def start_raylet(redis_address, def build_java_worker_command( java_worker_options, redis_address, + node_manager_port, plasma_store_name, raylet_name, redis_password, @@ -1275,6 +1277,7 @@ def build_java_worker_command( if redis_address is not None: command += "-Dray.redis.address={} ".format(redis_address) + command += "-Dray.raylet.node-manager-port={} ".format(node_manager_port) if plasma_store_name is not None: command += ( diff --git a/python/ray/signature.py b/python/ray/signature.py index 2e274d457..daaacf0b1 100644 --- a/python/ray/signature.py +++ b/python/ray/signature.py @@ -37,7 +37,9 @@ Attributes: by 'functools.partial'. """ -DUMMY_TYPE = "__RAY_DUMMY__" +# This dummy type is also defined in ArgumentsBuilder.java. Please keep it +# synced. +DUMMY_TYPE = b"__RAY_DUMMY__" def get_signature(func): diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 5d96dae98..3506d42df 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -5,6 +5,7 @@ namespace ray { +absl::Mutex TaskSpecification::mutex_; std::unordered_map TaskSpecification::sched_cls_to_id_; std::unordered_map @@ -13,6 +14,7 @@ int TaskSpecification::next_sched_id_; SchedulingClassDescriptor &TaskSpecification::GetSchedulingClassDescriptor( SchedulingClass id) { + absl::MutexLock lock(&mutex_); auto it = sched_id_to_cls_.find(id); RAY_CHECK(it != sched_id_to_cls_.end()) << "invalid id: " << id; return it->second; @@ -30,6 +32,7 @@ void TaskSpecification::ComputeResources() { // Map the scheduling class descriptor to an integer for performance. auto sched_cls = std::make_pair(GetRequiredResources(), FunctionDescriptor()); + absl::MutexLock lock(&mutex_); auto it = sched_cls_to_id_.find(sched_cls); if (it == sched_cls_to_id_.end()) { sched_cls_id_ = ++next_sched_id_; diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 0c14912d0..2c41d6fee 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -6,6 +6,7 @@ #include #include +#include "absl/synchronization/mutex.h" #include "ray/common/grpc_util.h" #include "ray/common/id.h" #include "ray/common/task/scheduling_resources.h" @@ -180,10 +181,15 @@ class TaskSpecification : public MessageWrapper { /// Cached scheduling class of this task. SchedulingClass sched_cls_id_; + /// Below static fields could be mutated in `ComputeResources` concurrently due to + /// multi-threading, we need a mutex to protect it. + static absl::Mutex mutex_; /// Keep global static id mappings for SchedulingClass for performance. - static std::unordered_map sched_cls_to_id_; - static std::unordered_map sched_id_to_cls_; - static int next_sched_id_; + static std::unordered_map sched_cls_to_id_ + GUARDED_BY(mutex_); + static std::unordered_map sched_id_to_cls_ + GUARDED_BY(mutex_); + static int next_sched_id_ GUARDED_BY(mutex_); }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ce9851546..c17d569a2 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -260,6 +260,7 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { main_thread_task_id_ = task_id; // Clear all actor handles at the end of each non-actor task. if (actor_id_.IsNil() && task_id.IsNil()) { + absl::MutexLock lock(&actor_handles_mutex_); for (const auto &handle : actor_handles_) { RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr)); } @@ -722,13 +723,16 @@ Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, } bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { + absl::MutexLock lock(&actor_handles_mutex_); const auto &actor_id = actor_handle->GetActorID(); + auto inserted = actor_handles_.emplace(actor_id, std::move(actor_handle)).second; if (inserted) { // Register a callback to handle actor notifications. auto actor_notification_callback = [this](const ActorID &actor_id, const gcs::ActorTableData &actor_data) { if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) { + absl::MutexLock lock(&actor_handles_mutex_); auto it = actor_handles_.find(actor_id); RAY_CHECK(it != actor_handles_.end()); if (it->second->IsDirectCallActor()) { @@ -761,6 +765,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle) { Status CoreWorker::GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const { + absl::MutexLock lock(&actor_handles_mutex_); auto it = actor_handles_.find(actor_id); if (it == actor_handles_.end()) { return Status::Invalid("Handle for actor does not exist"); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c9e03967a..16c024fe1 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -362,6 +362,13 @@ class CoreWorker { const std::vector> &metadatas, std::vector> *return_objects); + /// Get a handle to an actor. + /// + /// \param[in] actor_id The actor handle to get. + /// \param[out] actor_handle A handle to the requested actor. + /// \return Status::Invalid if we don't have this actor handle. + Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; + /// /// The following methods are handlers for the core worker's gRPC server, which follow /// a macro-generated call convention. These are executed on the io_service_ and @@ -428,14 +435,6 @@ class CoreWorker { /// to the same actor. bool AddActorHandle(std::unique_ptr actor_handle); - /// Get a handle to an actor. This asserts that the worker actually has this - /// handle. - /// - /// \param[in] actor_id The actor handle to get. - /// \param[out] actor_handle A handle to the requested actor. - /// \return Status::Invalid if we don't have this actor handle. - Status GetActorHandle(const ActorID &actor_id, ActorHandle **actor_handle) const; - /// /// Private methods related to task execution. Should not be used by driver processes. /// @@ -559,8 +558,13 @@ class CoreWorker { // Interface to submit non-actor tasks directly to leased workers. std::unique_ptr direct_task_submitter_; + /// The `actor_handles_` field could be mutated concurrently due to multi-threading, we + /// need a mutex to protect it. + mutable absl::Mutex actor_handles_mutex_; + /// Map from actor ID to a handle to that actor. - absl::flat_hash_map> actor_handles_; + absl::flat_hash_map> actor_handles_ + GUARDED_BY(actor_handles_mutex_); /// Resolve local and remote dependencies for actor creation. std::unique_ptr resolver_; diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 365b885c0..e5de78931 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -152,6 +152,8 @@ class JavaByteArrayBuffer : public ray::Buffer { bool OwnsData() const override { return true; } + bool IsPlasmaBuffer() const { return false; } + ~JavaByteArrayBuffer() { env_->ReleaseByteArrayElements(java_byte_array_, native_bytes_, JNI_ABORT); } diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc index e19cc9e5d..40ee3351d 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.cc @@ -23,19 +23,15 @@ inline ray::gcs::GcsClientOptions ToGcsClientOptions(JNIEnv *env, extern "C" { #endif -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeInitCoreWorker - * Signature: - * (ILjava/lang/String;Ljava/lang/String;[BLorg/ray/runtime/gcs/GcsClientOptions;)J - */ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWorker( JNIEnv *env, jclass, jint workerMode, jstring storeSocket, jstring rayletSocket, - jbyteArray jobId, jobject gcsClientOptions) { + jstring nodeIpAddress, jint nodeManagerPort, jbyteArray jobId, + jobject gcsClientOptions) { auto native_store_socket = JavaStringToNativeString(env, storeSocket); auto native_raylet_socket = JavaStringToNativeString(env, rayletSocket); auto job_id = JavaByteArrayToId(env, jobId); auto gcs_client_options = ToGcsClientOptions(env, gcsClientOptions); + auto node_ip_address = JavaStringToNativeString(env, nodeIpAddress); auto task_execution_callback = [](ray::TaskType task_type, const ray::RayFunction &ray_function, @@ -74,8 +70,8 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork try { auto core_worker = new ray::CoreWorker( static_cast(workerMode), ::Language::JAVA, native_store_socket, - native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", - /*node_ip_address=*/"", task_execution_callback); + native_raylet_socket, job_id, gcs_client_options, /*log_dir=*/"", node_ip_address, + nodeManagerPort, task_execution_callback); return reinterpret_cast(core_worker); } catch (const std::exception &e) { std::ostringstream oss; @@ -85,11 +81,6 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork } } -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeRunTaskExecutor - * Signature: (JLorg/ray/runtime/task/TaskExecutor;)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecutor( JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jobject javaTaskExecutor) { local_env = env; @@ -100,11 +91,6 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeRunTaskExecut local_java_task_executor = nullptr; } -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeDestroyCoreWorker - * Signature: (J)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeDestroyCoreWorker( JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer) { auto core_worker = reinterpret_cast(nativeCoreWorkerPointer); @@ -112,11 +98,6 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeDestroyCoreWo delete core_worker; } -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeSetup - * Signature: (Ljava/lang/String;)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetup(JNIEnv *env, jclass, jstring logDir) { @@ -125,21 +106,11 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetup(JNIEnv // TODO (kfstorm): If we add InstallFailureSignalHandler here, Java test may crash. } -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeShutdownHook - * Signature: ()V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeShutdownHook(JNIEnv *, jclass) { ray::RayLog::ShutDownRayLog(); } -/* - * Class: org_ray_runtime_RayNativeRuntime - * Method: nativeSetResource - * Signature: (JLjava/lang/String;D[B)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jstring resourceName, jdouble capacity, jbyteArray nodeId) { diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h index 480564640..48bc81f04 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_RayNativeRuntime.h @@ -11,10 +11,10 @@ extern "C" { * Class: org_ray_runtime_RayNativeRuntime * Method: nativeInitCoreWorker * Signature: - * (ILjava/lang/String;Ljava/lang/String;[BLorg/ray/runtime/gcs/GcsClientOptions;)J + * (ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;I[BLorg/ray/runtime/gcs/GcsClientOptions;)J */ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWorker( - JNIEnv *, jclass, jint, jstring, jstring, jbyteArray, jobject); + JNIEnv *, jclass, jint, jstring, jstring, jstring, jint, jbyteArray, jobject); /* * Class: org_ray_runtime_RayNativeRuntime diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc index 41715f3b6..e4379f98e 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc @@ -2,119 +2,71 @@ #include #include "ray/common/id.h" #include "ray/core_worker/common.h" +#include "ray/core_worker/core_worker.h" #include "ray/core_worker/lib/java/jni_utils.h" -#include "ray/core_worker/task_interface.h" -inline ray::ActorHandle &GetActorHandle(jlong nativeActorHandle) { - return *(reinterpret_cast(nativeActorHandle)); +inline ray::CoreWorker &GetCoreWorker(jlong nativeCoreWorkerPointer) { + return *reinterpret_cast(nativeCoreWorkerPointer); } #ifdef __cplusplus extern "C" { #endif -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeFork - * Signature: (J)J - */ -JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFork( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return reinterpret_cast(GetActorHandle(nativeActorHandle).Fork().release()); -} - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetActorId - * Signature: (J)[B - */ -JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorId( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return IdToJavaByteArray(env, - GetActorHandle(nativeActorHandle).GetActorID()); -} - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetActorHandleId - * Signature: (J)[B - */ -JNIEXPORT jbyteArray JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorHandleId( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return IdToJavaByteArray( - env, GetActorHandle(nativeActorHandle).GetActorHandleID()); -} - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetLanguage - * Signature: (J)I - */ JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return (jint)GetActorHandle(nativeActorHandle).ActorLanguage(); + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto actor_id = JavaByteArrayToId(env, actorId); + ray::ActorHandle *native_actor_handle = nullptr; + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .GetActorHandle(actor_id, &native_actor_handle); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (jint)0); + return (jint)native_actor_handle->ActorLanguage(); } -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeIsDirectCallActor - * Signature: (J)Z - */ JNIEXPORT jboolean JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return GetActorHandle(nativeActorHandle).IsDirectCallActor(); + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto actor_id = JavaByteArrayToId(env, actorId); + ray::ActorHandle *native_actor_handle = nullptr; + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .GetActorHandle(actor_id, &native_actor_handle); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, false); + return native_actor_handle->IsDirectCallActor(); } -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetActorCreationTaskFunctionDescriptor - * Signature: (J)Ljava/util/List; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorCreationTaskFunctionDescriptor( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - return NativeStringVectorToJavaStringList( - env, GetActorHandle(nativeActorHandle).ActorCreationTaskFunctionDescriptor()); + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto actor_id = JavaByteArrayToId(env, actorId); + ray::ActorHandle *native_actor_handle = nullptr; + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .GetActorHandle(actor_id, &native_actor_handle); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); + auto function_descriptor = native_actor_handle->ActorCreationTaskFunctionDescriptor(); + return NativeStringVectorToJavaStringList(env, function_descriptor); } -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeSerialize - * Signature: (J)[B - */ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeSerialize( - JNIEnv *env, jclass o, jlong nativeActorHandle) { + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) { + auto actor_id = JavaByteArrayToId(env, actorId); std::string output; - GetActorHandle(nativeActorHandle).Serialize(&output); + ray::Status status = + GetCoreWorker(nativeCoreWorkerPointer).SerializeActorHandle(actor_id, &output); jbyteArray bytes = env->NewByteArray(output.size()); env->SetByteArrayRegion(bytes, 0, output.size(), reinterpret_cast(output.c_str())); return bytes; } -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeDeserialize - * Signature: ([B)J - */ -JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeDeserialize( - JNIEnv *env, jclass o, jbyteArray data) { +JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeDeserialize( + JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray data) { auto buffer = JavaByteArrayToNativeBuffer(env, data); RAY_CHECK(buffer->Size() > 0); auto binary = std::string(reinterpret_cast(buffer->Data()), buffer->Size()); - return reinterpret_cast(new ray::ActorHandle(binary, TaskID::Nil())); -} + auto actor_id = + GetCoreWorker(nativeCoreWorkerPointer).DeserializeAndRegisterActorHandle(binary); -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeFree - * Signature: (J)V - */ -JNIEXPORT void JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFree( - JNIEnv *env, jclass o, jlong nativeActorHandle) { - delete &GetActorHandle(nativeActorHandle); + return IdToJavaByteArray(env, actor_id); } #ifdef __cplusplus diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h index 245064fcf..8f75e3a82 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.h @@ -7,80 +7,47 @@ #ifdef __cplusplus extern "C" { #endif -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeFork - * Signature: (J)J - */ -JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFork(JNIEnv *, - jclass, - jlong); - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetActorId - * Signature: (J)[B - */ -JNIEXPORT jbyteArray JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorId(JNIEnv *, jclass, jlong); - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeGetActorHandleId - * Signature: (J)[B - */ -JNIEXPORT jbyteArray JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorHandleId(JNIEnv *, jclass, jlong); - /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeGetLanguage - * Signature: (J)I + * Signature: (J[B)I */ -JNIEXPORT jint JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage(JNIEnv *, jclass, jlong); +JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage( + JNIEnv *, jclass, jlong, jbyteArray); /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeIsDirectCallActor - * Signature: (J)Z + * Signature: (J[B)Z */ JNIEXPORT jboolean JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor(JNIEnv *, jclass, jlong); +Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor(JNIEnv *, jclass, jlong, + jbyteArray); /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeGetActorCreationTaskFunctionDescriptor - * Signature: (J)Ljava/util/List; + * Signature: (J[B)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorCreationTaskFunctionDescriptor( - JNIEnv *, jclass, jlong); + JNIEnv *, jclass, jlong, jbyteArray); /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeSerialize - * Signature: (J)[B + * Signature: (J[B)[B */ -JNIEXPORT jbyteArray JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeSerialize(JNIEnv *, jclass, jlong); +JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeSerialize( + JNIEnv *, jclass, jlong, jbyteArray); /* * Class: org_ray_runtime_actor_NativeRayActor * Method: nativeDeserialize - * Signature: ([B)J + * Signature: (J[B)[B */ -JNIEXPORT jlong JNICALL -Java_org_ray_runtime_actor_NativeRayActor_nativeDeserialize(JNIEnv *, jclass, jbyteArray); - -/* - * Class: org_ray_runtime_actor_NativeRayActor - * Method: nativeFree - * Signature: (J)V - */ -JNIEXPORT void JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFree(JNIEnv *, - jclass, - jlong); +JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeDeserialize( + JNIEnv *, jclass, jlong, jbyteArray); #ifdef __cplusplus } diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_context_NativeWorkerContext.cc b/src/ray/core_worker/lib/java/org_ray_runtime_context_NativeWorkerContext.cc index b7e791044..49e3e3118 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_context_NativeWorkerContext.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_context_NativeWorkerContext.cc @@ -13,11 +13,6 @@ inline ray::WorkerContext &GetWorkerContextFromPointer(jlong nativeCoreWorkerPoi extern "C" { #endif -/* - * Class: org_ray_runtime_context_NativeWorkerContext - * Method: nativeGetCurrentTaskType - * Signature: (J)I - */ JNIEXPORT jint JNICALL Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { @@ -26,11 +21,6 @@ Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskType( return static_cast(task_spec->GetMessage().type()); } -/* - * Class: org_ray_runtime_context_NativeWorkerContext - * Method: nativeGetCurrentTaskId - * Signature: (J)Ljava/nio/ByteBuffer; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskId( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { @@ -39,11 +29,6 @@ Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentTaskId( return IdToJavaByteBuffer(env, task_id); } -/* - * Class: org_ray_runtime_context_NativeWorkerContext - * Method: nativeGetCurrentJobId - * Signature: (J)Ljava/nio/ByteBuffer; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { @@ -52,11 +37,6 @@ Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentJobId( return IdToJavaByteBuffer(env, job_id); } -/* - * Class: org_ray_runtime_context_NativeWorkerContext - * Method: nativeGetCurrentWorkerId - * Signature: (J)Ljava/nio/ByteBuffer; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { @@ -65,11 +45,6 @@ Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentWorkerId( return IdToJavaByteBuffer(env, worker_id); } -/* - * Class: org_ray_runtime_context_NativeWorkerContext - * Method: nativeGetCurrentActorId - * Signature: (J)Ljava/nio/ByteBuffer; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc index c6b256f95..9bcde92ab 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_object_NativeObjectStore.cc @@ -5,20 +5,10 @@ #include "ray/core_worker/core_worker.h" #include "ray/core_worker/lib/java/jni_utils.h" -inline ray::CoreWorkerObjectInterface &GetObjectInterfaceFromPointer( - jlong nativeCoreWorkerPointer) { - return reinterpret_cast(nativeCoreWorkerPointer)->Objects(); -} - #ifdef __cplusplus extern "C" { #endif -/* - * Class: org_ray_runtime_object_NativeObjectStore - * Method: nativePut - * Signature: (JLorg/ray/runtime/object/NativeRayObject;)[B - */ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativePut__JLorg_ray_runtime_object_NativeRayObject_2( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jobject obj) { @@ -26,16 +16,11 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__JLorg_ray_runtime_objec RAY_CHECK(ray_object != nullptr); ray::ObjectID object_id; auto status = reinterpret_cast(nativeCoreWorkerPointer) - .Put(*ray_object, &object_id); + ->Put(*ray_object, &object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return IdToJavaByteArray(env, object_id); } -/* - * Class: org_ray_runtime_object_NativeObjectStore - * Method: nativePut - * Signature: (J[BLorg/ray/runtime/object/NativeRayObject;)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativePut__J_3BLorg_ray_runtime_object_NativeRayObject_2( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray objectId, @@ -44,15 +29,10 @@ Java_org_ray_runtime_object_NativeObjectStore_nativePut__J_3BLorg_ray_runtime_ob auto ray_object = JavaNativeRayObjectToNativeRayObject(env, obj); RAY_CHECK(ray_object != nullptr); auto status = reinterpret_cast(nativeCoreWorkerPointer) - .Put(*ray_object, object_id); + ->Put(*ray_object, object_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } -/* - * Class: org_ray_runtime_object_NativeObjectStore - * Method: nativeGet - * Signature: (JLjava/util/List;J)Ljava/util/List; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeGet( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jobject ids, jlong timeoutMs) { std::vector object_ids; @@ -62,17 +42,12 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeGe }); std::vector> results; auto status = reinterpret_cast(nativeCoreWorkerPointer) - .Get(object_ids, (int64_t)timeoutMs, &results); + ->Get(object_ids, (int64_t)timeoutMs, &results); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeVectorToJavaList>( env, results, NativeRayObjectToJavaNativeRayObject); } -/* - * Class: org_ray_runtime_object_NativeObjectStore - * Method: nativeWait - * Signature: (JLjava/util/List;IJ)Ljava/util/List; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeWait( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jobject objectIds, jint numObjects, jlong timeoutMs) { @@ -83,18 +58,13 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeWa }); std::vector results; auto status = reinterpret_cast(nativeCoreWorkerPointer) - .Wait(object_ids, (int)numObjects, (int64_t)timeoutMs, &results); + ->Wait(object_ids, (int)numObjects, (int64_t)timeoutMs, &results); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeVectorToJavaList(env, results, [](JNIEnv *env, const bool &item) { return env->NewObject(java_boolean_class, java_boolean_init, (jboolean)item); }); } -/* - * Class: org_ray_runtime_object_NativeObjectStore - * Method: nativeDelete - * Signature: (JLjava/util/List;ZZ)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeDelete( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jobject objectIds, jboolean localOnly, jboolean deleteCreatingTasks) { @@ -104,7 +74,7 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_object_NativeObjectStore_nativeDelet return JavaByteArrayToId(env, static_cast(id)); }); auto status = reinterpret_cast(nativeCoreWorkerPointer) - .Delete(object_ids, (bool)localOnly, (bool)deleteCreatingTasks); + ->Delete(object_ids, (bool)localOnly, (bool)deleteCreatingTasks); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc index 8658c2f8a..86229cfac 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskExecutor.cc @@ -12,11 +12,6 @@ extern "C" { using ray::ClientID; -/* - * Class: org_ray_runtime_task_NativeTaskExecutor - * Method: nativePrepareCheckpoint - * Signature: (J)[B - */ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer) { @@ -25,8 +20,8 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint( const auto &task_spec = core_worker.GetWorkerContext().GetCurrentTask(); RAY_CHECK(task_spec->IsActorTask()); ActorCheckpointID checkpoint_id; - auto status = core_worker.GetRayletClient().PrepareActorCheckpoint( - actor_id, checkpoint_id); + auto status = + core_worker.GetRayletClient().PrepareActorCheckpoint(actor_id, checkpoint_id); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); jbyteArray result = env->NewByteArray(checkpoint_id.Size()); env->SetByteArrayRegion(result, 0, checkpoint_id.Size(), @@ -34,11 +29,6 @@ Java_org_ray_runtime_task_NativeTaskExecutor_nativePrepareCheckpoint( return result; } -/* - * Class: org_ray_runtime_task_NativeTaskExecutor - * Method: nativeNotifyActorResumedFromCheckpoint - * Signature: (J[B)V - */ JNIEXPORT void JNICALL Java_org_ray_runtime_task_NativeTaskExecutor_nativeNotifyActorResumedFromCheckpoint( JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray checkpointId) { diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc index 129a49aa9..632f8db96 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc @@ -4,11 +4,9 @@ #include "ray/core_worker/common.h" #include "ray/core_worker/core_worker.h" #include "ray/core_worker/lib/java/jni_utils.h" -#include "ray/core_worker/task_interface.h" -inline ray::CoreWorkerTaskInterface &GetTaskInterfaceFromPointer( - jlong nativeCoreWorkerPointer) { - return reinterpret_cast(nativeCoreWorkerPointer)->Tasks(); +inline ray::CoreWorker &GetCoreWorker(jlong nativeCoreWorkerPointer) { + return *reinterpret_cast(nativeCoreWorkerPointer); } inline ray::RayFunction ToRayFunction(JNIEnv *env, jobject functionDescriptor) { @@ -70,7 +68,7 @@ inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject call resources = ToResources(env, java_resources); } - ray::TaskOptions task_options{numReturns, resources}; + ray::TaskOptions task_options{numReturns, /*is_direct_call=*/false, resources}; return task_options; } @@ -100,22 +98,22 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, java_actor_creation_options_default_use_direct_call); } - ray::ActorCreationOptions action_creation_options{ - static_cast(max_reconstructions), use_direct_call, resources, resources, - dynamic_worker_options}; - return action_creation_options; + ray::ActorCreationOptions actor_creation_options{ + static_cast(max_reconstructions), + use_direct_call, + /*max_concurrency=*/1, + resources, + resources, + dynamic_worker_options, + /*is_detached=*/false, + /*is_asyncio=*/false}; + return actor_creation_options; } #ifdef __cplusplus extern "C" { #endif -/* - * Class: org_ray_runtime_task_NativeTaskSubmitter - * Method: nativeSubmitTask - * Signature: - * (JLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeSubmitTask( JNIEnv *env, jclass p, jlong nativeCoreWorkerPointer, jobject functionDescriptor, jobject args, jint numReturns, jobject callOptions) { @@ -124,55 +122,44 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeSu auto task_options = ToTaskOptions(env, numReturns, callOptions); std::vector return_ids; - auto status = GetTaskInterfaceFromPointer(nativeCoreWorkerPointer) - .SubmitTask(ray_function, task_args, task_options, &return_ids); + auto status = GetCoreWorker(nativeCoreWorkerPointer) + .SubmitTask(ray_function, task_args, task_options, &return_ids, /*max_retries=*/1); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeIdVectorToJavaByteArrayList(env, return_ids); } -/* - * Class: org_ray_runtime_task_NativeTaskSubmitter - * Method: nativeCreateActor - * Signature: - * (JLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;Lorg/ray/api/options/ActorCreationOptions;)J - */ -JNIEXPORT jlong JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor( +JNIEXPORT jbyteArray JNICALL +Java_org_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor( JNIEnv *env, jclass p, jlong nativeCoreWorkerPointer, jobject functionDescriptor, jobject args, jobject actorCreationOptions) { auto ray_function = ToRayFunction(env, functionDescriptor); auto task_args = ToTaskArgs(env, args); auto actor_creation_options = ToActorCreationOptions(env, actorCreationOptions); - std::unique_ptr actor_handle; + ray::ActorID actor_id; auto status = - GetTaskInterfaceFromPointer(nativeCoreWorkerPointer) - .CreateActor(ray_function, task_args, actor_creation_options, &actor_handle); + GetCoreWorker(nativeCoreWorkerPointer) + .CreateActor(ray_function, task_args, actor_creation_options, &actor_id); - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, 0); - return reinterpret_cast(actor_handle.release()); + THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); + return IdToJavaByteArray(env, actor_id); } -/* - * Class: org_ray_runtime_task_NativeTaskSubmitter - * Method: nativeSubmitActorTask - * Signature: - * (JJLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; - */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask( - JNIEnv *env, jclass p, jlong nativeCoreWorkerPointer, jlong nativeActorHandle, + JNIEnv *env, jclass p, jlong nativeCoreWorkerPointer, jbyteArray actorId, jobject functionDescriptor, jobject args, jint numReturns, jobject callOptions) { - auto &actor_handle = *(reinterpret_cast(nativeActorHandle)); + auto actor_id = JavaByteArrayToId(env, actorId); auto ray_function = ToRayFunction(env, functionDescriptor); auto task_args = ToTaskArgs(env, args); auto task_options = ToTaskOptions(env, numReturns, callOptions); std::vector return_ids; - auto status = GetTaskInterfaceFromPointer(nativeCoreWorkerPointer) - .SubmitActorTask(actor_handle, ray_function, task_args, task_options, - &return_ids); + auto status = + GetCoreWorker(nativeCoreWorkerPointer) + .SubmitActorTask(actor_id, ray_function, task_args, task_options, &return_ids); THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); return NativeIdVectorToJavaByteArrayList(env, return_ids); diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.h b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.h index 908b5eee4..22b6337c2 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.h +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.h @@ -20,22 +20,24 @@ JNIEXPORT jobject JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeSu * Class: org_ray_runtime_task_NativeTaskSubmitter * Method: nativeCreateActor * Signature: - * (JLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;Lorg/ray/api/options/ActorCreationOptions;)J + * (JLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;Lorg/ray/api/options/ActorCreationOptions;)[B */ -JNIEXPORT jlong JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor( - JNIEnv *, jclass, jlong, jobject, jobject, jobject); +JNIEXPORT jbyteArray JNICALL +Java_org_ray_runtime_task_NativeTaskSubmitter_nativeCreateActor(JNIEnv *, jclass, jlong, + jobject, jobject, + jobject); /* * Class: org_ray_runtime_task_NativeTaskSubmitter * Method: nativeSubmitActorTask * Signature: - * (JJLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; + * (J[BLorg/ray/runtime/functionmanager/FunctionDescriptor;Ljava/util/List;ILorg/ray/api/options/CallOptions;)Ljava/util/List; */ JNIEXPORT jobject JNICALL Java_org_ray_runtime_task_NativeTaskSubmitter_nativeSubmitActorTask(JNIEnv *, jclass, - jlong, jlong, jobject, - jobject, jint, - jobject); + jlong, jbyteArray, + jobject, jobject, + jint, jobject); #ifdef __cplusplus }