diff --git a/doc/source/cross-language.rst b/doc/source/cross-language.rst index 54b426791..904b91ce8 100644 --- a/doc/source/cross-language.rst +++ b/doc/source/cross-language.rst @@ -3,6 +3,25 @@ Cross-language programming This page will show you how to use Ray's cross-language programming feature. +Setup the cluster +----------------- + +We need to set the ``--code-search-path`` option on the ``ray start`` command. The ``--code-search-path`` option instructs workers to load Java or Python code from the specified code search path. + +.. code-block:: bash + + ray start ... --code-search-path=/path/to/code + +You can also provide multiple directories for this option. + +.. code-block:: bash + + ray start ... --code-search-path=/path/to/jars1:/path/to/jars2:/path/to/pys1:/path/to/pys2 + +.. note:: + + If ``--code-search-path`` is specified, you can only run remote functions which can be found in ``--code-search-path``. + Python calling Java ------------------- @@ -41,7 +60,7 @@ from the above Java class. import ray - ray.init(_load_code_from_local=True) + ray.init(address="auto") # Define a Java class. counter_class = ray.java_actor_class( @@ -57,13 +76,13 @@ from the above Java class. # Define a Java function. add_function = ray.java_function( "io.ray.demo.Math", "add") - + # Call the Java remote function. obj_ref3 = add_function.remote(1, 2) assert ray.get(obj_ref3) == 3 ray.shutdown() - + Java calling Python ------------------- @@ -92,7 +111,7 @@ Suppose we have a Python module as follows: * The function or class should be decorated by `@ray.remote`. -Then, in Java, we can call the above Python remote function, or create an actor +Then, in Java, we can call the above Python remote function, or create an actor from the above Python class. .. 
code-block:: java @@ -139,7 +158,7 @@ Cross-language data serialization The arguments and return values of ray call can be serialized & deserialized automatically if their types are the following: - + - Primitive data types =========== ======= ======= MessagePack Python Java @@ -168,7 +187,7 @@ automatically if their types are the following: float type to receive the input argument, the double precision Python data will be reduced to float precision in Java. * BigInteger can support max value of 2^64-1, please refer to: - https://github.com/msgpack/msgpack/blob/master/spec.md#int-format-family. + https://github.com/msgpack/msgpack/blob/master/spec.md#int-format-family. If the value larger than 2^64-1, then transfer the BigInteger: - From Java to Python: *raise an exception* @@ -272,7 +291,7 @@ Then, run the following code: import ray - ray.init(_load_code_from_local=True) + ray.init(address="auto") obj_ref = ray.java_function( "io.ray.demo.MyRayClass", diff --git a/doc/source/starting-ray.rst b/doc/source/starting-ray.rst index 63bc7ce6c..cb3173d5d 100644 --- a/doc/source/starting-ray.rst +++ b/doc/source/starting-ray.rst @@ -135,6 +135,8 @@ There are two steps needed to use Ray in a distributed setting: ray up cluster.yaml + To configure the Ray cluster to run Java code, you need to add the ``--code-search-path`` option. It's used to specify classpath for workers in the cluster. Your jar files must be distributed to all the nodes of the Ray cluster before running your code. You also need to make sure the paths of jar files are the same among nodes. + You can monitor the Ray cluster status with ``ray monitor cluster.yaml`` and ssh into the head node with ``ray attach cluster.yaml``. 2. Specify the address of the Ray cluster when initializing Ray in your code. This causes Ray to connect to the existing cluster instead of starting a new one on the local node. 
@@ -152,14 +154,11 @@ There are two steps needed to use Ray in a distributed setting: You need to add the ``ray.redis.address`` parameter to your command line (like ``-Dray.redis.address=...``). - You need to add the ``ray.job.resource-path`` parameter as well. Your jar files must be distributed to all the nodes of the Ray cluster before running your code. You also need to make sure the paths of jar files are the same among nodes. Let's say your jar files are located in ``/path/to/jars/``, all files under this path will be loaded by worker processes. - To connect your program to the Ray cluster, run it like this: .. code-block:: bash java -classpath /path/to/jars/ \ - -Dray.job.resource-path=/path/to/jars/ \ -Dray.redis.address=
\ @@ -189,6 +188,11 @@ The command will print out the address of the Redis server that was started (and $ ray start --address=
+If you want to run Java code, you need to specify the classpath via the ``--code-search-path`` option. + +.. code-block:: bash + + $ ray start ... --code-search-path=/path/to/jars Local mode ---------- diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index fffc0efd2..db2e5e841 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -69,7 +69,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { public AbstractRayRuntime(RayConfig rayConfig) { this.rayConfig = rayConfig; setIsContextSet(rayConfig.workerMode == Common.WorkerType.DRIVER); - functionManager = new FunctionManager(rayConfig.jobResourcePath); + functionManager = new FunctionManager(rayConfig.codeSearchPath); runtimeContext = new RuntimeContextImpl(this); } diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 724e82fb4..8c5be8f8f 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -157,7 +157,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime { JobConfig.newBuilder() .setNumJavaWorkersPerProcess(rayConfig.numWorkersPerProcess) .addAllJvmOptions(rayConfig.jvmOptionsForJavaWorker) - .putAllWorkerEnv(rayConfig.workerEnv); + .putAllWorkerEnv(rayConfig.workerEnv) + .addAllCodeSearchPath(rayConfig.codeSearchPath); serializedJobConfig = jobConfigBuilder.build().toByteArray(); } diff --git a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java index a91781ac9..10afc09ce 100644 --- a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java 
@@ -16,6 +16,8 @@ import io.ray.runtime.util.ResourceUtil; import java.io.File; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -73,7 +75,7 @@ public class RayConfig { public int nodeManagerPort; public final Map rayletConfigParameters; - public final String jobResourcePath; + public List codeSearchPath; public final String pythonWorkerCommand; private static volatile RayConfig instance = null; @@ -225,11 +227,12 @@ public class RayConfig { rayletConfigParameters.put(entry.getKey(), value == null ? "" : value.toString()); } - // Job resource path. - if (config.hasPath("ray.job.resource-path")) { - jobResourcePath = config.getString("ray.job.resource-path"); + // Job code search path. + if (config.hasPath("ray.job.code-search-path")) { + codeSearchPath = Arrays.asList( + config.getString("ray.job.code-search-path").split(":")); } else { - jobResourcePath = null; + codeSearchPath = Collections.emptyList(); } boolean enableMultiTenancy = false; @@ -311,7 +314,7 @@ public class RayConfig { dynamic.put("ray.object-store.socket-name", objectStoreSocketName); dynamic.put("ray.raylet.node-manager-port", nodeManagerPort); dynamic.put("ray.redis.address", redisAddress); - dynamic.put("ray.job.resource-path", jobResourcePath); + dynamic.put("ray.job.code-search-path", codeSearchPath); Config toRender = ConfigFactory.parseMap(dynamic).withFallback(config); return toRender.root().render(ConfigRenderOptions.concise()); } diff --git a/java/runtime/src/main/java/io/ray/runtime/functionmanager/FunctionManager.java b/java/runtime/src/main/java/io/ray/runtime/functionmanager/FunctionManager.java index f7f11fae1..05b248dc7 100644 --- a/java/runtime/src/main/java/io/ray/runtime/functionmanager/FunctionManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/functionmanager/FunctionManager.java @@ -1,6 +1,5 @@ package 
io.ray.runtime.functionmanager; -import com.google.common.base.Strings; import io.ray.api.function.RayFunc; import io.ray.api.id.JobId; import io.ray.runtime.util.LambdaUtils; @@ -12,6 +11,8 @@ import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -21,10 +22,11 @@ import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.DirectoryFileFilter; import org.apache.commons.io.filefilter.RegexFileFilter; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.objectweb.asm.Type; @@ -57,16 +59,16 @@ public class FunctionManager { /** * The resource path which we can load the job's jar resources. */ - private final String jobResourcePath; + private final List codeSearchPath; /** - * Construct a FunctionManager with the specified job resource path. + * Construct a FunctionManager with the specified code search path. * - * @param jobResourcePath The specified job resource that can store the job's + * @param codeSearchPath The specified job resource that can store the job's * resources. 
*/ - public FunctionManager(String jobResourcePath) { - this.jobResourcePath = jobResourcePath; + public FunctionManager(List codeSearchPath) { + this.codeSearchPath = codeSearchPath; } /** @@ -115,23 +117,35 @@ public class FunctionManager { private JobFunctionTable createJobFunctionTable(JobId jobId) { ClassLoader classLoader; - if (Strings.isNullOrEmpty(jobResourcePath)) { + if (codeSearchPath == null || codeSearchPath.isEmpty()) { classLoader = getClass().getClassLoader(); } else { - File resourceDir = new File(jobResourcePath + "/" + jobId.toString() + "/"); - Collection files = FileUtils.listFiles(resourceDir, - new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY); - files.add(resourceDir); - final List urlList = files.stream().map(file -> { - try { - return file.toURI().toURL(); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - }).collect(Collectors.toList()); - classLoader = new URLClassLoader(urlList.toArray(new URL[urlList.size()])); - LOGGER.debug("Resource loaded for job {} from path {}.", jobId, - resourceDir.getAbsolutePath()); + URL[] urls = codeSearchPath.stream() + .filter(p -> StringUtils.isNotBlank(p) && Files.exists(Paths.get(p))) + .flatMap(p -> { + try { + if (!Files.isDirectory(Paths.get(p))) { + if (!p.endsWith(".jar")) { + return Stream.of(Paths.get(p).getParent().toAbsolutePath().toUri().toURL()); + } else { + return Stream.of(Paths.get(p).toAbsolutePath().toUri().toURL()); + } + } else { + List subUrls = new ArrayList<>(); + subUrls.add(Paths.get(p).toAbsolutePath().toUri().toURL()); + Collection jars = FileUtils.listFiles(new File(p), + new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY); + for (File jar : jars) { + subUrls.add(jar.toPath().toUri().toURL()); + } + return subUrls.stream(); + } + } catch (MalformedURLException e) { + throw new RuntimeException(String.format("Illegal %s resource path", p)); + } + }).toArray(URL[]::new); + classLoader = new URLClassLoader(urls); + 
LOGGER.debug("Resource loaded for job {} from path {}.", jobId, urls); } return new JobFunctionTable(classLoader); diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index 21b47aaf9..fad44f730 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -357,7 +357,10 @@ public class RunManager { File workerConfigFile = new File(rayConfig.sessionDir + "/java_worker.conf"); FileUtils.write(workerConfigFile, rayConfig.render(), Charset.defaultCharset()); cmd.add("-Dray.config-file=" + workerConfigFile.getAbsolutePath()); - + if (!rayConfig.codeSearchPath.isEmpty()) { + cmd.add("-Dray.job.code-search-path=" + + String.join(":", rayConfig.codeSearchPath)); + } cmd.add("RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"); cmd.addAll(rayConfig.jvmParameters); diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 66002f48d..02438ecd2 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -22,10 +22,10 @@ ray { // If worker.mode is DRIVER, specify the job id. // If not provided, a random id will be used. id: "" - // If this config is set, worker will use different paths to load resources when - // executing tasks from different jobs. E.g. if it's set to '/tm/job_resources', - // the path for job 123 will be '/tmp/job_resources/123'. - resource-path: "" + // A list of directories or jar files separated by colon that specify the + // search path for user code. This will be used as `CLASSPATH` in Java, + // and `PYTHONPATH` in Python. + code-search-path: "" /// The number of java worker per worker process. num-java-workers-per-process: 1 /// The jvm options for java workers of the job. 
diff --git a/java/runtime/src/test/java/io/ray/runtime/config/RayConfigTest.java b/java/runtime/src/test/java/io/ray/runtime/config/RayConfigTest.java index a30ebbfe4..ee07bf8d0 100644 --- a/java/runtime/src/test/java/io/ray/runtime/config/RayConfigTest.java +++ b/java/runtime/src/test/java/io/ray/runtime/config/RayConfigTest.java @@ -1,6 +1,7 @@ package io.ray.runtime.config; import io.ray.runtime.generated.Common.WorkerType; +import java.util.Collections; import org.testng.Assert; import org.testng.annotations.Test; @@ -11,13 +12,14 @@ public class RayConfigTest { @Test public void testCreateRayConfig() { try { - System.setProperty("ray.job.resource-path", "path/to/ray/job/resource/path"); + System.setProperty("ray.job.code-search-path", "path/to/ray/job/resource/path"); RayConfig rayConfig = RayConfig.create(); Assert.assertEquals(WorkerType.DRIVER, rayConfig.workerMode); - Assert.assertEquals("path/to/ray/job/resource/path", rayConfig.jobResourcePath); + Assert.assertEquals(Collections.singletonList("path/to/ray/job/resource/path"), + rayConfig.codeSearchPath); } finally { // Unset system properties. 
- System.clearProperty("ray.job.resource-path"); + System.clearProperty("ray.job.code-search-path"); } } diff --git a/java/runtime/src/test/java/io/ray/runtime/functionmanager/FunctionManagerTest.java b/java/runtime/src/test/java/io/ray/runtime/functionmanager/FunctionManagerTest.java index 20d60a486..f369cc939 100644 --- a/java/runtime/src/test/java/io/ray/runtime/functionmanager/FunctionManagerTest.java +++ b/java/runtime/src/test/java/io/ray/runtime/functionmanager/FunctionManagerTest.java @@ -7,6 +7,7 @@ import io.ray.runtime.functionmanager.FunctionManager.JobFunctionTable; import java.io.File; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.Map; import javax.tools.JavaCompiler; import javax.tools.ToolProvider; @@ -151,9 +152,8 @@ public class FunctionManagerTest { @Test public void testGetFunctionFromLocalResource() throws Exception { JobId jobId = JobId.fromInt(1); - final String resourcePath = FileUtils.getTempDirectoryPath() + "/ray_test_resources"; - final String jobResourcePath = resourcePath + "/" + jobId.toString(); - File jobResourceDir = new File(jobResourcePath); + final String codeSearchPath = FileUtils.getTempDirectoryPath() + "/ray_test_resources/"; + File jobResourceDir = new File(codeSearchPath); FileUtils.deleteQuietly(jobResourceDir); jobResourceDir.mkdirs(); jobResourceDir.deleteOnExit(); @@ -165,13 +165,13 @@ public class FunctionManagerTest { demoJavaFile += " }\n"; demoJavaFile += "}"; - // Write the demo java file to the job resource path. - String javaFilePath = jobResourcePath + "/DemoApp.java"; + // Write the demo java file to the job code search path. + String javaFilePath = codeSearchPath + "/DemoApp.java"; Files.write(Paths.get(javaFilePath), demoJavaFile.getBytes()); // Compile the java file. 
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - int result = compiler.run(null, null, null, "-d", jobResourcePath, javaFilePath); + int result = compiler.run(null, null, null, "-d", codeSearchPath, javaFilePath); if (result != 0) { throw new RuntimeException("Couldn't compile Demo.java."); } @@ -179,7 +179,8 @@ public class FunctionManagerTest { // Test loading the function. JavaFunctionDescriptor descriptor = new JavaFunctionDescriptor( "DemoApp", "hello", "()Ljava/lang/String;"); - final FunctionManager functionManager = new FunctionManager(resourcePath); + final FunctionManager functionManager = new FunctionManager( + Collections.singletonList(codeSearchPath)); RayFunction func = functionManager.getFunction(jobId, descriptor); Assert.assertEquals(func.getFunctionDescriptor(), descriptor); } diff --git a/java/test.sh b/java/test.sh index 32105a158..70ef6ebbb 100755 --- a/java/test.sh +++ b/java/test.sh @@ -46,6 +46,16 @@ echo "Running tests under single-process mode." # bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --config=ci || single_exit_code=$? 
run_testng java -Dray.run-mode="SINGLE_PROCESS" -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar "${TEST_ARGS[@]}" org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml +echo "Running connecting existing cluster tests" +case "${OSTYPE}" in + linux*) ip=$(hostname -I | awk '{print $1}');; + darwin*) ip=$(ipconfig getifaddr en0);; + *) echo "Can't get ip address for ${OSTYPE}"; exit 1;; +esac +RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --include-java --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" +RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.redis.address="$ip:6379"\ + -Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest +ray stop popd pushd "$ROOT_DIR" diff --git a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java index 4d89bd1db..2aeb90988 100644 --- a/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java +++ b/java/test/src/main/java/io/ray/test/BaseMultiLanguageTest.java @@ -8,6 +8,7 @@ import io.ray.runtime.config.RayConfig; import io.ray.runtime.util.NetworkUtil; import java.io.File; import java.lang.ProcessBuilder.Redirect; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -66,12 +67,11 @@ public abstract class BaseMultiLanguageTest { // jars in the `ray` wheel doesn't contains test classes, so we add test classes explicitly. // Since mvn test classes contains `test` in path and bazel test classes is located at a jar // with `test` included in the name, we can check classpath `test` to filter out test classes. 
- String classpath = Stream.of(System.getProperty("java.class.path").split(":")) + List classpath = Stream.of(System.getProperty("java.class.path").split(":")) .filter(s -> !s.contains(" ") && s.contains("test")) - .collect(Collectors.joining(":")); - String workerOptions = new Gson().toJson(ImmutableList.of("-classpath", classpath)); + .collect(Collectors.toList()); // Start ray cluster. - List startCommand = ImmutableList.of( + List startCommand = Arrays.asList( "ray", "start", "--head", @@ -83,9 +83,10 @@ public abstract class BaseMultiLanguageTest { String.format("--node-manager-port=%s", nodeManagerPort), "--load-code-from-local", "--include-java", - "--java-worker-options=" + workerOptions, - "--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters) + "--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters), + "--code-search-path=" + String.join(":", classpath) ); + if (!executeCommand(startCommand, 10, getRayStartEnv())) { throw new RuntimeException("Couldn't start ray cluster."); } diff --git a/java/test/src/main/java/io/ray/test/ClassLoaderTest.java b/java/test/src/main/java/io/ray/test/ClassLoaderTest.java index 6777272fc..631422d79 100644 --- a/java/test/src/main/java/io/ray/test/ClassLoaderTest.java +++ b/java/test/src/main/java/io/ray/test/ClassLoaderTest.java @@ -3,7 +3,6 @@ package io.ray.test; import io.ray.api.ActorHandle; import io.ray.api.BaseActorHandle; import io.ray.api.ObjectRef; -import io.ray.api.Ray; import io.ray.api.options.ActorCreationOptions; import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.functionmanager.FunctionDescriptor; @@ -23,31 +22,31 @@ import org.testng.annotations.Test; public class ClassLoaderTest extends BaseTest { - private final String resourcePath = FileUtils.getTempDirectoryPath() + private final String codeSearchPath = FileUtils.getTempDirectoryPath() + "/ray_test/ClassLoaderTest"; @BeforeClass public void setUp() { // The potential issue of multiple 
`ClassLoader` instances for the same job on multi-threading - // scenario only occurs if the classes are loaded from the job resource path. - System.setProperty("ray.job.resource-path", resourcePath); + // scenario only occurs if the classes are loaded from the job code search path. + System.setProperty("ray.job.code-search-path", codeSearchPath); } @AfterClass public void tearDown() { - System.clearProperty("ray.job.resource-path"); + System.clearProperty("ray.job.code-search-path"); } @Test(groups = {"cluster"}) public void testClassLoaderInMultiThreading() throws Exception { - final String jobResourcePath = resourcePath + "/" + Ray.getRuntimeContext().getCurrentJobId(); - File jobResourceDir = new File(jobResourcePath); + File jobResourceDir = new File(codeSearchPath); FileUtils.deleteQuietly(jobResourceDir); jobResourceDir.mkdirs(); jobResourceDir.deleteOnExit(); - // In this test case the class is expected to be loaded from the job resource path, so we need - // to put the compiled class file into the job resource path and load it later. + // In this test case the class is expected to be loaded from the job code search path, + // so we need to put the compiled class file into the job code search path and load it + // later. String testJavaFile = "" + "import java.lang.management.ManagementFactory;\n" + "import java.lang.management.RuntimeMXBean;\n" @@ -83,14 +82,14 @@ public class ClassLoaderTest extends BaseTest { + " }\n" + "}"; - // Write the demo java file to the job resource path. - String javaFilePath = jobResourcePath + "/ClassLoaderTester.java"; + // Write the demo java file to the job code search path. + String javaFilePath = codeSearchPath + "/ClassLoaderTester.java"; Files.write(Paths.get(javaFilePath), testJavaFile.getBytes()); // Compile the java file. 
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); int result = compiler.run(null, null, null, "-d", - jobResourcePath, javaFilePath); + codeSearchPath, javaFilePath); if (result != 0) { throw new RuntimeException("Couldn't compile ClassLoaderTester.java."); } diff --git a/python/ray/job_config.py b/python/ray/job_config.py index eedee46ef..ab2c20d42 100644 --- a/python/ray/job_config.py +++ b/python/ray/job_config.py @@ -10,6 +10,9 @@ class JobConfig: num_java_workers_per_process (int): The number of java workers per worker process. jvm_options (str[]): The jvm options for java workers of the job. + code_search_path (list): A list of directories or jar files that + specify the search path for user code. This will be used as + `CLASSPATH` in Java and `PYTHONPATH` in Python. """ def __init__( @@ -17,6 +20,7 @@ class JobConfig: worker_env=None, num_java_workers_per_process=1, jvm_options=None, + code_search_path=None, ): if worker_env is None: self.worker_env = dict() @@ -27,6 +31,10 @@ class JobConfig: self.jvm_options = [] else: self.jvm_options = jvm_options + if code_search_path is None: + self.code_search_path = [] + else: + self.code_search_path = code_search_path def serialize(self): job_config = ray.gcs_utils.JobConfig() @@ -35,4 +43,5 @@ class JobConfig: job_config.num_java_workers_per_process = ( self.num_java_workers_per_process) job_config.jvm_options.extend(self.jvm_options) + job_config.code_search_path.extend(self.code_search_path) return job_config.SerializeToString() diff --git a/python/ray/node.py b/python/ray/node.py index fdc59330d..a65cc1e87 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -730,7 +730,8 @@ class Node: head_node=self.head, start_initial_python_workers_for_first_job=self._ray_params. 
start_initial_python_workers_for_first_job, - object_spilling_config=self._ray_params.object_spilling_config) + object_spilling_config=self._ray_params.object_spilling_config, + code_search_path=self._ray_params.code_search_path) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 9e7d38346..4a4ec4e85 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -147,7 +147,8 @@ class RayParams: metrics_agent_port=None, metrics_export_port=None, lru_evict=False, - object_spilling_config=None): + object_spilling_config=None, + code_search_path=None): self.object_ref_seed = object_ref_seed self.redis_address = redis_address self.num_cpus = num_cpus @@ -194,6 +195,9 @@ class RayParams: self._enable_object_reconstruction = enable_object_reconstruction self.object_spilling_config = object_spilling_config self._check_usage() + self.code_search_path = code_search_path + if code_search_path is None: + self.code_search_path = [] # Set the internal config options for LRU eviction. if lru_evict: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 02e7ddfe9..1d5a0bd05 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -354,6 +354,13 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): default=None, type=str, help="Overwrite the options to start Java workers.") +@click.option( + "--code-search-path", + default=None, + type=str, + help="A list of directories or jar files separated by colon that specify " + "the search path for user code. 
This will be used as `CLASSPATH` in " + "Java and `PYTHONPATH` in Python.") @click.option( "--system-config", default=None, @@ -391,9 +398,9 @@ def start(node_ip_address, redis_address, address, redis_port, port, dashboard_port, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, - java_worker_options, load_code_from_local, system_config, lru_evict, - enable_object_reconstruction, metrics_export_port, log_style, - log_color, verbose): + java_worker_options, code_search_path, load_code_from_local, + system_config, lru_evict, enable_object_reconstruction, + metrics_export_port, log_style, log_color, verbose): """Start Ray processes manually on the local machine.""" cli_logger.log_style = log_style cli_logger.color_mode = log_color @@ -504,6 +511,7 @@ def start(node_ip_address, redis_address, address, redis_port, port, dashboard_port=dashboard_port, java_worker_options=java_worker_options, load_code_from_local=load_code_from_local, + code_search_path=code_search_path, _system_config=system_config, lru_evict=lru_evict, enable_object_reconstruction=enable_object_reconstruction, diff --git a/python/ray/services.py b/python/ray/services.py index 46f002840..b2b1380a8 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -11,16 +11,15 @@ import socket import subprocess import sys import time -import redis import colorama +import colorful as cf +import psutil # Ray modules import ray import ray.ray_constants as ray_constants -import psutil - +import redis from ray.autoscaler.cli_logger import cli_logger -import colorful as cf resource = None if sys.platform != "win32": @@ -71,11 +70,6 @@ DEFAULT_WORKER_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/ray/cpp/default_worker" + EXE_SUFFIX) -DEFAULT_JAVA_WORKER_CLASSPATH = [ - os.path.join( - os.path.abspath(os.path.dirname(__file__)), "../../../build/java/*"), 
-] - # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at # entry/init points. @@ -1282,7 +1276,8 @@ def start_raylet(redis_address, socket_to_use=None, head_node=False, start_initial_python_workers_for_first_job=False, - object_spilling_config=None): + object_spilling_config=None, + code_search_path=None): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -1320,6 +1315,9 @@ def start_raylet(redis_address, include_java (bool): If True, the raylet backend can also support Java worker. java_worker_options (list): The command options for Java worker. + code_search_path (list): Code search path for worker. code_search_path + is added to worker command in non-multi-tenancy mode and job_config + in multi-tenancy mode. Returns: ProcessInfo for the process that was started. """ @@ -1348,16 +1346,15 @@ def start_raylet(redis_address, gcs_ip_address, gcs_port = redis_address.split(":") if include_java is True: - default_cp = os.pathsep.join(DEFAULT_JAVA_WORKER_CLASSPATH) java_worker_command = build_java_worker_command( - json.loads(java_worker_options) - if java_worker_options else ["-classpath", default_cp], + json.loads(java_worker_options) if java_worker_options else [], redis_address, node_manager_port, plasma_store_name, raylet_name, redis_password, session_dir, + code_search_path, ) else: java_worker_command = [] @@ -1384,6 +1381,8 @@ def start_raylet(redis_address, f"--config-list={config_str}", f"--temp-dir={temp_dir}", f"--metrics-agent-port={metrics_agent_port}" ] + if code_search_path: + start_worker_command.append(f"--code-search-path={code_search_path}") if redis_password: start_worker_command += [f"--redis-password={redis_password}"] @@ -1398,6 +1397,9 @@ def start_raylet(redis_address, if max_worker_port is None: max_worker_port = 0 + if code_search_path is not None and len(code_search_path) > 0: + load_code_from_local = True + if 
load_code_from_local: start_worker_command += ["--load-code-from-local"] @@ -1493,15 +1495,10 @@ def get_ray_jars_dir(): return os.path.abspath(os.path.join(current_dir, "jars")) -def build_java_worker_command( - java_worker_options, - redis_address, - node_manager_port, - plasma_store_name, - raylet_name, - redis_password, - session_dir, -): +def build_java_worker_command(java_worker_options, redis_address, + node_manager_port, plasma_store_name, + raylet_name, redis_password, session_dir, + code_search_path): """This method assembles the command used to start a Java worker. Args: @@ -1512,6 +1509,7 @@ def build_java_worker_command( raylet_name (str): The name of the raylet socket to create. redis_password (str): The password of connect to redis. session_dir (str): The path of this session. + code_search_path (list): The job code search path. Returns: The command string for starting Java worker. """ @@ -1532,7 +1530,7 @@ def build_java_worker_command( pairs.append(("ray.home", RAY_HOME)) pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs"))) pairs.append(("ray.session-dir", session_dir)) - + pairs.append(("ray.job.code-search-path", code_search_path)) command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs] command += ["RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER"] diff --git a/python/ray/worker.py b/python/ray/worker.py index d83d42260..851f4933e 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -493,6 +493,7 @@ def init( _redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, _include_java=False, _java_worker_options=None, + _code_search_path=None, _temp_dir=None, _load_code_from_local=False, _lru_evict=False, @@ -587,6 +588,7 @@ def init( _load_code_from_local: Whether code should be loaded from a local module or from the GCS. _java_worker_options: Overwrite the options to start Java workers. + _code_search_path (list): Java classpath or python import path. 
_lru_evict (bool): If True, when an object store is full, it will evict objects in LRU order to make more space and when under memory pressure, ray.ObjectLostError may be thrown. If False, then @@ -682,6 +684,7 @@ def init( temp_dir=_temp_dir, load_code_from_local=_load_code_from_local, java_worker_options=_java_worker_options, + code_search_path=_code_search_path, start_initial_python_workers_for_first_job=True, _system_config=_system_config, lru_evict=_lru_evict, diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 108bcdcd6..d2534e841 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -1,6 +1,8 @@ import argparse import json import time +import sys +import os import ray import ray.actor @@ -98,7 +100,13 @@ parser.add_argument( type=str, default="", help="The configuration of object spilling. Only used by I/O workers.") - +parser.add_argument( + "--code-search-path", + default=None, + type=str, + help="A list of directories or jar files separated by colon that specify " + "the search path for user code. This will be used as `CLASSPATH` in " + "Java and `PYTHONPATH` in Python.") if __name__ == "__main__": args = parser.parse_args() @@ -135,6 +143,13 @@ if __name__ == "__main__": if raylet_ip_address is None: raylet_ip_address = args.node_ip_address + code_search_path = args.code_search_path + if code_search_path is not None: + for p in code_search_path.split(":"): + if os.path.isfile(p): + p = os.path.dirname(p) + sys.path.append(p) + ray_params = RayParams( node_ip_address=args.node_ip_address, raylet_ip_address=raylet_ip_address, diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index c4d621538..6c3157d30 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -321,6 +321,10 @@ message JobConfig { uint32 num_java_workers_per_process = 2; // The jvm options for java workers of the job. 
repeated string jvm_options = 3; + // A list of directories or jar files that specify the search path for user + // code. This will be used as `CLASSPATH` in Java, and `PYTHONPATH` in + // Python. + repeated string code_search_path = 4; } message JobTableData { diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index a786f11ab..caf4f67e8 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -224,6 +224,33 @@ Process WorkerPool::StartWorkerProcess(const Language &language, dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(), job_config->jvm_options().end()); } + // For non-multi-tenancy mode, job code search path is embedded in worker_command. + if (RayConfig::instance().enable_multi_tenancy() && job_config) { + std::string code_search_path_str; + for (int i = 0; i < job_config->code_search_path_size(); i++) { + auto path = job_config->code_search_path(i); + if (i != 0) { + code_search_path_str += ":"; + } + code_search_path_str += path; + } + if (!code_search_path_str.empty()) { + switch (language) { + case Language::PYTHON: { + code_search_path_str = "--code-search-path=" + code_search_path_str; + break; + } + case Language::JAVA: { + code_search_path_str = "-Dray.job.code-search-path" + code_search_path_str; + break; + } + default: + RAY_LOG(FATAL) << "code_search_path is not supported for worker language " + << language; + } + dynamic_options.push_back(code_search_path_str); + } + } // Extract pointers from the worker command to pass into execvp. std::vector worker_command_args;