[Core] Allow users to specify the classpath and import path (#10560)

* move job resource path to job config

* job resource path support list

* job resource path support for python

* fix job_resource_path support

* fix worker command

* fix job config

* use jar file instead of parent path

* fix job resource path

* add test to test.sh

* lint

* Update java/runtime/src/main/resources/ray.default.conf

Co-authored-by: Kai Yang <kfstorm@outlook.com>

* fix testGetFunctionFromLocalResource

* lint

* fix rebase

* add jars in resource path to classloader

* add job_resource_path to worker

* add ray stop

* rename job_resource_path to resource_path

* fix resource_path

* refine resource_path comments

* rename job resource path to code search path

* Add instruction about starting a cross-language cluster

* fix ClassLoaderTest.java

* add code-search-path to RunManager

* refine comments for code-search-path

* rename resourcePath to codeSearchPath

* Update doc

* fix

* rename resourcePath to codeSearchPath

* update doc

* filter out empty path

* fix comments

* fix comments

* fix tests

* revert pom

* lint

* fix doc

* update doc

* Apply suggestions from code review

* lint

Co-authored-by: Kai Yang <kfstorm@outlook.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
This commit is contained in:
chaokunyang
2020-09-09 00:46:32 +08:00
committed by GitHub
parent 3645a05644
commit bbfbc98a41
22 changed files with 226 additions and 102 deletions
@@ -69,7 +69,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
public AbstractRayRuntime(RayConfig rayConfig) {
this.rayConfig = rayConfig;
setIsContextSet(rayConfig.workerMode == Common.WorkerType.DRIVER);
functionManager = new FunctionManager(rayConfig.jobResourcePath);
functionManager = new FunctionManager(rayConfig.codeSearchPath);
runtimeContext = new RuntimeContextImpl(this);
}
@@ -157,7 +157,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
JobConfig.newBuilder()
.setNumJavaWorkersPerProcess(rayConfig.numWorkersPerProcess)
.addAllJvmOptions(rayConfig.jvmOptionsForJavaWorker)
.putAllWorkerEnv(rayConfig.workerEnv);
.putAllWorkerEnv(rayConfig.workerEnv)
.addAllCodeSearchPath(rayConfig.codeSearchPath);
serializedJobConfig = jobConfigBuilder.build().toByteArray();
}
@@ -16,6 +16,8 @@ import io.ray.runtime.util.ResourceUtil;
import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -73,7 +75,7 @@ public class RayConfig {
public int nodeManagerPort;
public final Map<String, String> rayletConfigParameters;
public final String jobResourcePath;
public List<String> codeSearchPath;
public final String pythonWorkerCommand;
private static volatile RayConfig instance = null;
@@ -225,11 +227,12 @@ public class RayConfig {
rayletConfigParameters.put(entry.getKey(), value == null ? "" : value.toString());
}
// Job resource path.
if (config.hasPath("ray.job.resource-path")) {
jobResourcePath = config.getString("ray.job.resource-path");
// Job code search path.
if (config.hasPath("ray.job.code-search-path")) {
codeSearchPath = Arrays.asList(
config.getString("ray.job.code-search-path").split(":"));
} else {
jobResourcePath = null;
codeSearchPath = Collections.emptyList();
}
boolean enableMultiTenancy = false;
@@ -311,7 +314,7 @@ public class RayConfig {
dynamic.put("ray.object-store.socket-name", objectStoreSocketName);
dynamic.put("ray.raylet.node-manager-port", nodeManagerPort);
dynamic.put("ray.redis.address", redisAddress);
dynamic.put("ray.job.resource-path", jobResourcePath);
dynamic.put("ray.job.code-search-path", codeSearchPath);
Config toRender = ConfigFactory.parseMap(dynamic).withFallback(config);
return toRender.root().render(ConfigRenderOptions.concise());
}
@@ -1,6 +1,5 @@
package io.ray.runtime.functionmanager;
import com.google.common.base.Strings;
import io.ray.api.function.RayFunc;
import io.ray.api.id.JobId;
import io.ray.runtime.util.LambdaUtils;
@@ -12,6 +11,8 @@ import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -21,10 +22,11 @@ import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.objectweb.asm.Type;
@@ -57,16 +59,16 @@ public class FunctionManager {
/**
* The resource path which we can load the job's jar resources.
*/
private final String jobResourcePath;
private final List<String> codeSearchPath;
/**
* Construct a FunctionManager with the specified job resource path.
* Construct a FunctionManager with the specified code search path.
*
* @param jobResourcePath The specified job resource that can store the job's
* @param codeSearchPath The specified job resource that can store the job's
* resources.
*/
public FunctionManager(String jobResourcePath) {
this.jobResourcePath = jobResourcePath;
public FunctionManager(List<String> codeSearchPath) {
this.codeSearchPath = codeSearchPath;
}
/**
@@ -115,23 +117,35 @@ public class FunctionManager {
private JobFunctionTable createJobFunctionTable(JobId jobId) {
ClassLoader classLoader;
if (Strings.isNullOrEmpty(jobResourcePath)) {
if (codeSearchPath == null || codeSearchPath.isEmpty()) {
classLoader = getClass().getClassLoader();
} else {
File resourceDir = new File(jobResourcePath + "/" + jobId.toString() + "/");
Collection<File> files = FileUtils.listFiles(resourceDir,
new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY);
files.add(resourceDir);
final List<URL> urlList = files.stream().map(file -> {
try {
return file.toURI().toURL();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
classLoader = new URLClassLoader(urlList.toArray(new URL[urlList.size()]));
LOGGER.debug("Resource loaded for job {} from path {}.", jobId,
resourceDir.getAbsolutePath());
URL[] urls = codeSearchPath.stream()
.filter(p -> StringUtils.isNotBlank(p) && Files.exists(Paths.get(p)))
.flatMap(p -> {
try {
if (!Files.isDirectory(Paths.get(p))) {
if (!p.endsWith(".jar")) {
return Stream.of(Paths.get(p).getParent().toAbsolutePath().toUri().toURL());
} else {
return Stream.of(Paths.get(p).toAbsolutePath().toUri().toURL());
}
} else {
List<URL> subUrls = new ArrayList<>();
subUrls.add(Paths.get(p).toAbsolutePath().toUri().toURL());
Collection<File> jars = FileUtils.listFiles(new File(p),
new RegexFileFilter(".*\\.jar"), DirectoryFileFilter.DIRECTORY);
for (File jar : jars) {
subUrls.add(jar.toPath().toUri().toURL());
}
return subUrls.stream();
}
} catch (MalformedURLException e) {
throw new RuntimeException(String.format("Illegal %s resource path", p));
}
}).toArray(URL[]::new);
classLoader = new URLClassLoader(urls);
LOGGER.debug("Resource loaded for job {} from path {}.", jobId, urls);
}
return new JobFunctionTable(classLoader);
@@ -357,7 +357,10 @@ public class RunManager {
File workerConfigFile = new File(rayConfig.sessionDir + "/java_worker.conf");
FileUtils.write(workerConfigFile, rayConfig.render(), Charset.defaultCharset());
cmd.add("-Dray.config-file=" + workerConfigFile.getAbsolutePath());
if (!rayConfig.codeSearchPath.isEmpty()) {
cmd.add("-Dray.job.code-search-path=" +
String.join(":", rayConfig.codeSearchPath));
}
cmd.add("RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER");
cmd.addAll(rayConfig.jvmParameters);
@@ -22,10 +22,10 @@ ray {
// If worker.mode is DRIVER, specify the job id.
// If not provided, a random id will be used.
id: ""
// If this config is set, worker will use different paths to load resources when
// executing tasks from different jobs. E.g. if it's set to '/tm/job_resources',
// the path for job 123 will be '/tmp/job_resources/123'.
resource-path: ""
// A list of directories or jar files separated by colon that specify the
// search path for user code. This will be used as `CLASSPATH` in Java,
// and `PYTHONPATH` in Python.
code-search-path: ""
/// The number of java worker per worker process.
num-java-workers-per-process: 1
/// The jvm options for java workers of the job.
@@ -1,6 +1,7 @@
package io.ray.runtime.config;
import io.ray.runtime.generated.Common.WorkerType;
import java.util.Collections;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -11,13 +12,14 @@ public class RayConfigTest {
@Test
public void testCreateRayConfig() {
try {
System.setProperty("ray.job.resource-path", "path/to/ray/job/resource/path");
System.setProperty("ray.job.code-search-path", "path/to/ray/job/resource/path");
RayConfig rayConfig = RayConfig.create();
Assert.assertEquals(WorkerType.DRIVER, rayConfig.workerMode);
Assert.assertEquals("path/to/ray/job/resource/path", rayConfig.jobResourcePath);
Assert.assertEquals(Collections.singletonList("path/to/ray/job/resource/path"),
rayConfig.codeSearchPath);
} finally {
// Unset system properties.
System.clearProperty("ray.job.resource-path");
System.clearProperty("ray.job.code-search-path");
}
}
@@ -7,6 +7,7 @@ import io.ray.runtime.functionmanager.FunctionManager.JobFunctionTable;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
@@ -151,9 +152,8 @@ public class FunctionManagerTest {
@Test
public void testGetFunctionFromLocalResource() throws Exception {
JobId jobId = JobId.fromInt(1);
final String resourcePath = FileUtils.getTempDirectoryPath() + "/ray_test_resources";
final String jobResourcePath = resourcePath + "/" + jobId.toString();
File jobResourceDir = new File(jobResourcePath);
final String codeSearchPath = FileUtils.getTempDirectoryPath() + "/ray_test_resources/";
File jobResourceDir = new File(codeSearchPath);
FileUtils.deleteQuietly(jobResourceDir);
jobResourceDir.mkdirs();
jobResourceDir.deleteOnExit();
@@ -165,13 +165,13 @@ public class FunctionManagerTest {
demoJavaFile += " }\n";
demoJavaFile += "}";
// Write the demo java file to the job resource path.
String javaFilePath = jobResourcePath + "/DemoApp.java";
// Write the demo java file to the job code search path.
String javaFilePath = codeSearchPath + "/DemoApp.java";
Files.write(Paths.get(javaFilePath), demoJavaFile.getBytes());
// Compile the java file.
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
int result = compiler.run(null, null, null, "-d", jobResourcePath, javaFilePath);
int result = compiler.run(null, null, null, "-d", codeSearchPath, javaFilePath);
if (result != 0) {
throw new RuntimeException("Couldn't compile Demo.java.");
}
@@ -179,7 +179,8 @@ public class FunctionManagerTest {
// Test loading the function.
JavaFunctionDescriptor descriptor = new JavaFunctionDescriptor(
"DemoApp", "hello", "()Ljava/lang/String;");
final FunctionManager functionManager = new FunctionManager(resourcePath);
final FunctionManager functionManager = new FunctionManager(
Collections.singletonList(codeSearchPath));
RayFunction func = functionManager.getFunction(jobId, descriptor);
Assert.assertEquals(func.getFunctionDescriptor(), descriptor);
}
+10
View File
@@ -46,6 +46,16 @@ echo "Running tests under single-process mode."
# bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --config=ci || single_exit_code=$?
run_testng java -Dray.run-mode="SINGLE_PROCESS" -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar "${TEST_ARGS[@]}" org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml
echo "Running connecting existing cluster tests"
case "${OSTYPE}" in
linux*) ip=$(hostname -I | awk '{print $1}');;
darwin*) ip=$(ipconfig getifaddr en0);;
*) echo "Can't get ip address for ${OSTYPE}"; exit 1;;
esac
RAY_BACKEND_LOG_LEVEL=debug ray start --head --redis-port=6379 --redis-password=123456 --include-java --code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar"
RAY_BACKEND_LOG_LEVEL=debug java -cp bazel-bin/java/all_tests_deploy.jar -Dray.redis.address="$ip:6379"\
-Dray.redis.password='123456' -Dray.job.code-search-path="$PWD/bazel-bin/java/all_tests_deploy.jar" io.ray.test.MultiDriverTest
ray stop
popd
pushd "$ROOT_DIR"
@@ -8,6 +8,7 @@ import io.ray.runtime.config.RayConfig;
import io.ray.runtime.util.NetworkUtil;
import java.io.File;
import java.lang.ProcessBuilder.Redirect;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -66,12 +67,11 @@ public abstract class BaseMultiLanguageTest {
// jars in the `ray` wheel doesn't contains test classes, so we add test classes explicitly.
// Since mvn test classes contains `test` in path and bazel test classes is located at a jar
// with `test` included in the name, we can check classpath `test` to filter out test classes.
String classpath = Stream.of(System.getProperty("java.class.path").split(":"))
List<String> classpath = Stream.of(System.getProperty("java.class.path").split(":"))
.filter(s -> !s.contains(" ") && s.contains("test"))
.collect(Collectors.joining(":"));
String workerOptions = new Gson().toJson(ImmutableList.of("-classpath", classpath));
.collect(Collectors.toList());
// Start ray cluster.
List<String> startCommand = ImmutableList.of(
List<String> startCommand = Arrays.asList(
"ray",
"start",
"--head",
@@ -83,9 +83,10 @@ public abstract class BaseMultiLanguageTest {
String.format("--node-manager-port=%s", nodeManagerPort),
"--load-code-from-local",
"--include-java",
"--java-worker-options=" + workerOptions,
"--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters)
"--system-config=" + new Gson().toJson(RayConfig.create().rayletConfigParameters),
"--code-search-path=" + String.join(":", classpath)
);
if (!executeCommand(startCommand, 10, getRayStartEnv())) {
throw new RuntimeException("Couldn't start ray cluster.");
}
@@ -3,7 +3,6 @@ package io.ray.test;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.options.ActorCreationOptions;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.functionmanager.FunctionDescriptor;
@@ -23,31 +22,31 @@ import org.testng.annotations.Test;
public class ClassLoaderTest extends BaseTest {
private final String resourcePath = FileUtils.getTempDirectoryPath()
private final String codeSearchPath = FileUtils.getTempDirectoryPath()
+ "/ray_test/ClassLoaderTest";
@BeforeClass
public void setUp() {
// The potential issue of multiple `ClassLoader` instances for the same job on multi-threading
// scenario only occurs if the classes are loaded from the job resource path.
System.setProperty("ray.job.resource-path", resourcePath);
// scenario only occurs if the classes are loaded from the job code search path.
System.setProperty("ray.job.code-search-path", codeSearchPath);
}
@AfterClass
public void tearDown() {
System.clearProperty("ray.job.resource-path");
System.clearProperty("ray.job.code-search-path");
}
@Test(groups = {"cluster"})
public void testClassLoaderInMultiThreading() throws Exception {
final String jobResourcePath = resourcePath + "/" + Ray.getRuntimeContext().getCurrentJobId();
File jobResourceDir = new File(jobResourcePath);
File jobResourceDir = new File(codeSearchPath);
FileUtils.deleteQuietly(jobResourceDir);
jobResourceDir.mkdirs();
jobResourceDir.deleteOnExit();
// In this test case the class is expected to be loaded from the job resource path, so we need
// to put the compiled class file into the job resource path and load it later.
// In this test case the class is expected to be loaded from the job code search path,
// so we need to put the compiled class file into the job code search path and load it
// later.
String testJavaFile = ""
+ "import java.lang.management.ManagementFactory;\n"
+ "import java.lang.management.RuntimeMXBean;\n"
@@ -83,14 +82,14 @@ public class ClassLoaderTest extends BaseTest {
+ " }\n"
+ "}";
// Write the demo java file to the job resource path.
String javaFilePath = jobResourcePath + "/ClassLoaderTester.java";
// Write the demo java file to the job code search path.
String javaFilePath = codeSearchPath + "/ClassLoaderTester.java";
Files.write(Paths.get(javaFilePath), testJavaFile.getBytes());
// Compile the java file.
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
int result = compiler.run(null, null, null, "-d",
jobResourcePath, javaFilePath);
codeSearchPath, javaFilePath);
if (result != 0) {
throw new RuntimeException("Couldn't compile ClassLoaderTester.java.");
}