[Java] Load driver resources from local path. (#3001)

## What do these changes do?
1. Add a configuration item `driver.resource-path`.
2. Load driver resources from the local path which is specified in the `ray.conf`.

Before this change, we should add all driver resources(like user's jar package, dependencies package and config files) into `classpath`.

After this change, we should add the driver resources into the mount path which we can configure it in `ray.conf`, and we shouldn't configure `classpath` for driver resources any more.

## Related issue number
N/A
This commit is contained in:
Wang Qing
2018-10-09 04:05:26 +08:00
committed by Hao Chen
parent 2d35a97a76
commit 84bf5fc8f3
7 changed files with 97 additions and 9 deletions
@@ -48,7 +48,7 @@ public abstract class AbstractRayRuntime implements RayRuntime {
public AbstractRayRuntime(RayConfig rayConfig) {
this.rayConfig = rayConfig;
functionManager = new FunctionManager();
functionManager = new FunctionManager(rayConfig.driverResourcePath);
worker = new Worker(this);
workerContext = new WorkerContext(rayConfig.workerMode, rayConfig.driverId);
}
@@ -51,6 +51,7 @@ public class RayConfig {
public final String redisModulePath;
public final String plasmaStoreExecutablePath;
public final String rayletExecutablePath;
public final String driverResourcePath;
private void validate() {
if (workerMode == WorkerMode.WORKER) {
@@ -156,6 +157,18 @@ public class RayConfig {
plasmaStoreExecutablePath = rayHome + "/build/src/plasma/plasma_store_server";
rayletExecutablePath = rayHome + "/build/src/ray/raylet/raylet";
// driver resource path
String localDriverResourcePath;
if (config.hasPath("ray.driver.resource-path")) {
localDriverResourcePath = config.getString("ray.driver.resource-path");
} else {
localDriverResourcePath = rayHome + "/driver/resource";
LOGGER.warn("Didn't configure ray.driver.resource-path, set it to default value: {}",
localDriverResourcePath);
}
driverResourcePath = localDriverResourcePath;
// validate config
validate();
LOGGER.debug("Created config: {}", this);
@@ -15,13 +15,18 @@ import org.apache.commons.lang3.tuple.Pair;
import org.objectweb.asm.Type;
import org.ray.api.function.RayFunc;
import org.ray.api.id.UniqueId;
import org.ray.runtime.util.JarLoader;
import org.ray.runtime.util.LambdaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages functions by driver id.
*/
public class FunctionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(FunctionManager.class);
static final String CONSTRUCTOR_NAME = "<init>";
/**
@@ -36,6 +41,21 @@ public class FunctionManager {
*/
private Map<UniqueId, DriverFunctionTable> driverFunctionTables = new HashMap<>();
/**
* The resource path which we can load the driver's jar resources.
*/
private String driverResourcePath;
/**
* Construct a FunctionManager with the specified driver resource path.
*
* @param driverResourcePath The specified driver resource that
* can store the driver's resources.
*/
public FunctionManager(String driverResourcePath) {
this.driverResourcePath = driverResourcePath;
}
/**
* Get the RayFunction from a RayFunc instance (a lambda).
*
@@ -66,8 +86,19 @@ public class FunctionManager {
public RayFunction getFunction(UniqueId driverId, FunctionDescriptor functionDescriptor) {
DriverFunctionTable driverFunctionTable = driverFunctionTables.get(driverId);
if (driverFunctionTable == null) {
//TODO(hchen): distinguish class loader by driver id.
ClassLoader classLoader = getClass().getClassLoader();
String resourcePath = driverResourcePath + "/" + driverId.toString() + "/";
ClassLoader classLoader;
try {
classLoader = JarLoader.loadJars(resourcePath, false);
LOGGER.info("Succeeded to load driver({}) resource. Resource path is {}",
driverId, resourcePath);
} catch (Exception e) {
LOGGER.error("Failed to load driver({}) resource. Resource path is {}",
driverId, resourcePath);
classLoader = getClass().getClassLoader();
}
driverFunctionTable = new DriverFunctionTable(classLoader);
driverFunctionTables.put(driverId, driverFunctionTable);
}
@@ -14,13 +14,16 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.ray.runtime.util.logger.RayLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* load and unload jars from a dir.
*/
public class JarLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(JarLoader.class);
public static URLClassLoader loadJars(String dir, boolean explicitLoad) {
// get all jars
Collection<File> jars = FileUtils.listFiles(
@@ -42,7 +45,7 @@ public class JarLoader {
for (File appJar : appJars) {
try {
RayLog.core.info("load jar " + appJar.getAbsolutePath());
LOGGER.info("succeeded to load jar {}.", appJar.getAbsolutePath());
JarFile jar = new JarFile(appJar.getAbsolutePath());
jars.add(jar);
urls.add(appJar.toURI().toURL());
@@ -25,9 +25,15 @@ ray {
// Available resources on this node, for example "CPU:4,GPU:0".
resources: ""
// If worker.mode is DRIVER, specify the driver id.
// If not provided, a random id will be used.
driver.id: ""
// Configuration items about driver.
driver {
// If worker.mode is DRIVER, specify the driver id.
// If not provided, a random id will be used.
id: ""
// If worker.mode is WORKER, it means that worker will load
// the resources from this path to execute tasks.
resource-path: /tmp/ray/driver/resource
}
// Root dir of log files.
log-dir: /tmp/ray/logs
@@ -76,4 +82,5 @@ ray {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet
}
}
@@ -1,5 +1,9 @@
package org.ray.runtime.functionmanager;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -41,6 +45,8 @@ public class FunctionManagerTest {
private static FunctionDescriptor barDescriptor;
private static FunctionDescriptor barConstructorDescriptor;
private final static String resourcePath = "/tmp/ray/test/resource";
private FunctionManager functionManager;
@BeforeClass
@@ -59,7 +65,7 @@ public class FunctionManagerTest {
@Before
public void before() {
functionManager = new FunctionManager();
functionManager = new FunctionManager(FunctionManagerTest.resourcePath);
}
@Test
@@ -116,4 +122,27 @@ public class FunctionManagerTest {
Assert.assertTrue(res.containsKey(
ImmutablePair.of(barConstructorDescriptor.name, barConstructorDescriptor.typeDescriptor)));
}
//TODO(qwang): This is an integration test case, and we should move it to test folder in the future.
@Test
public void testGetFunctionFromLocalResource() throws Exception{
UniqueId driverId = UniqueId.fromHexString("0123456789012345678901234567890123456789");
//TODO(qwang): We should use a independent app demo instead of `tutorial`.
final String srcJarPath = System.getProperty("user.dir") +
"/../tutorial/target/ray-tutorial-0.1-SNAPSHOT.jar";
final String destJarPath = resourcePath + "/" + driverId.toString() +
"/ray-tutorial-0.1-SNAPSHOT.jar";
File file = new File(resourcePath + "/" + driverId.toString());
file.mkdirs();
Files.copy(Paths.get(srcJarPath), Paths.get(destJarPath), StandardCopyOption.REPLACE_EXISTING);
FunctionDescriptor sayHelloDescriptor = new FunctionDescriptor("org.ray.exercise.Exercise02",
"sayHello", "()Ljava/lang/String;");
RayFunction func = functionManager.getFunction(driverId, sayHelloDescriptor);
Assert.assertEquals(func.getFunctionDescriptor(), sayHelloDescriptor);
}
}