diff --git a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java index b03debddc..69448ecb7 100644 --- a/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java +++ b/java/runtime/src/main/java/org/ray/runtime/DefaultRayRuntimeFactory.java @@ -18,7 +18,7 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory { @Override public RayRuntime createRayRuntime() { - RayConfig rayConfig = RayConfig.create(); + RayConfig rayConfig = RayConfig.getInstance(); try { FunctionManager functionManager = new FunctionManager(rayConfig.jobResourcePath); RayRuntime runtime; diff --git a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java index 99eaea59b..563d2b99b 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java @@ -41,6 +41,7 @@ public class RayDevRuntime extends AbstractRayRuntime { taskSubmitter = null; } taskExecutor = null; + RayConfig.reset(); } @Override diff --git a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java index 4c8b6e42f..7e315c5ee 100644 --- a/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java @@ -45,17 +45,24 @@ public final class RayNativeRuntime extends AbstractRayRuntime { // Expose ray ABI symbols which may be depended by other shared // libraries such as libstreaming_java.so. // See BUILD.bazel:libcore_worker_library_java.so + final RayConfig rayConfig = RayConfig.getInstance(); + if (rayConfig.getRedisAddress() != null && rayConfig.workerMode == WorkerType.DRIVER) { + // Fetch session dir from GCS if this is a driver that is connecting to the existing GCS. + RedisClient client = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword); + final String sessionDir = client.get("session_dir", null); + Preconditions.checkNotNull(sessionDir); + rayConfig.setSessionDir(sessionDir); + } + JniUtils.loadLibrary("core_worker_library_java", true); LOGGER.debug("Native libraries loaded."); - RayConfig globalRayConfig = RayConfig.create(); - resetLibraryPath(globalRayConfig); - + resetLibraryPath(rayConfig); try { - FileUtils.forceMkdir(new File(globalRayConfig.logDir)); + FileUtils.forceMkdir(new File(rayConfig.logDir)); } catch (IOException e) { throw new RuntimeException("Failed to create the log directory.", e); } - nativeSetup(globalRayConfig.logDir); + nativeSetup(rayConfig.logDir); Runtime.getRuntime().addShutdownHook(new Thread(RayNativeRuntime::nativeShutdownHook)); } @@ -67,7 +74,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime { public RayNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) { super(rayConfig, functionManager); - // Reset library path at runtime. resetLibraryPath(rayConfig); @@ -111,7 +117,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime { manager.cleanup(); manager = null; } - + RayConfig.reset(); LOGGER.info("RayNativeRuntime shutdown"); } diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index 23c2eec9a..d6e76d1eb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -8,10 +8,12 @@ import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValue; import java.io.File; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; - +import java.util.Random; import org.ray.api.id.JobId; import org.ray.runtime.generated.Common.WorkerType; import org.ray.runtime.util.NetworkUtil; @@ -30,12 +32,22 @@ public class RayConfig { public static final String DEFAULT_CONFIG_FILE = "ray.default.conf"; public static final String CUSTOM_CONFIG_FILE = "ray.conf"; + private static final Random RANDOM = new Random(); + + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("YYYY-MM-dd_HH-mm-ss"); + + private static final String DEFAULT_TEMP_DIR = "/tmp/ray"; + + private Config config; + public final String nodeIp; public final WorkerType workerMode; public final RunMode runMode; public final Map resources; private JobId jobId; - public final String logDir; + public String sessionDir; + public String logDir; public final boolean redirectOutput; public final List libraryPath; public final List classpath; @@ -50,16 +62,35 @@ public class RayConfig { public final String headRedisPassword; public final String redisPassword; - public final String objectStoreSocketName; + public String objectStoreSocketName; public final Long objectStoreSize; - public final String rayletSocketName; + public String rayletSocketName; private int nodeManagerPort; public final List rayletConfigParameters; public final String jobResourcePath; public final String pythonWorkerCommand; + private static volatile RayConfig instance = null; + + public static RayConfig getInstance() { + if (instance == null) { + synchronized (RayConfig.class) { + if (instance == null) { + instance = RayConfig.create(); + } + } + } + return instance; + } + + public static void reset() { + synchronized (RayConfig.class) { + instance = null; + } + } + /** * Number of threads that execute tasks. */ @@ -83,6 +114,7 @@ public class RayConfig { } public RayConfig(Config config) { + this.config = config; // Worker mode. WorkerType localWorkerMode; try { @@ -118,8 +150,11 @@ public class RayConfig { } else { this.jobId = JobId.NIL; } - // Log dir. - logDir = removeTrailingSlash(config.getString("ray.log-dir")); + + updateSessionDir(); + // Object store configurations. + objectStoreSize = config.getBytes("ray.object-store.size"); + // Redirect output. redirectOutput = config.getBoolean("ray.redirect-output"); // Library path. @@ -160,12 +195,6 @@ public class RayConfig { headRedisPassword = config.getString("ray.redis.head-password"); redisPassword = config.getString("ray.redis.password"); - // Object store configurations. - objectStoreSocketName = config.getString("ray.object-store.socket-name"); - objectStoreSize = config.getBytes("ray.object-store.size"); - - // Raylet socket name. - rayletSocketName = config.getString("ray.raylet.socket-name"); // Raylet node manager port. nodeManagerPort = config.getInt("ray.raylet.node-manager-port"); if (nodeManagerPort == 0) { @@ -234,6 +263,66 @@ public class RayConfig { return nodeManagerPort; } + public void setSessionDir(String sessionDir) { + this.sessionDir = sessionDir; + } + + public String getSessionDir() { + return sessionDir; + } + + private void updateSessionDir() { + // session dir + String localSessionDir = System.getProperty("ray.session-dir"); + if (workerMode == WorkerType.DRIVER) { + Preconditions.checkState(localSessionDir == null); + final int minBound = 100000; + final int maxBound = 999999; + final String sessionName = String.format("session_%s_%d", DATE_TIME_FORMATTER.format( + LocalDateTime.now()), RANDOM.nextInt(maxBound - minBound) + minBound); + sessionDir = String.format("%s/%s", DEFAULT_TEMP_DIR, sessionName); + } else if (workerMode == WorkerType.WORKER) { + Preconditions.checkState(localSessionDir != null); + sessionDir = removeTrailingSlash(localSessionDir); + } else { + throw new RuntimeException("Unknown worker type."); + } + + // Log dir. + String localLogDir = null; + if (config.hasPath("ray.log-dir")) { + localLogDir = removeTrailingSlash(config.getString("ray.log-dir")); + } + if (Strings.isNullOrEmpty(localLogDir)) { + logDir = String.format("%s/logs", sessionDir); + } else { + logDir = localLogDir; + } + + // Object store socket name. + String localObjectStoreSocketName = null; + if (config.hasPath("ray.object-store.socket-name")) { + localObjectStoreSocketName = config.getString("ray.object-store.socket-name"); + } + if (Strings.isNullOrEmpty(localObjectStoreSocketName)) { + objectStoreSocketName = String.format("%s/sockets/object_store", sessionDir); + } else { + objectStoreSocketName = localObjectStoreSocketName; + } + + // Raylet socket name. + String localRayletSocketName = null; + if (config.hasPath("ray.raylet.socket-name")) { + localRayletSocketName = config.getString("ray.raylet.socket-name"); + } + if (Strings.isNullOrEmpty(localRayletSocketName)) { + rayletSocketName = String.format("%s/sockets/raylet", sessionDir); + } else { + rayletSocketName = localRayletSocketName; + } + + } + @Override public String toString() { return "RayConfig{" diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 66c45f258..514b0bb59 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -1,11 +1,14 @@ package org.ray.runtime.runner; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -18,7 +21,7 @@ import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.ray.runtime.config.RayConfig; -import org.ray.runtime.util.FileUtil; +import org.ray.runtime.util.BinaryFileUtil; import org.ray.runtime.util.ResourceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +39,11 @@ public class RunManager { private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker"; + private static final String SESSION_LATEST = "session_latest"; + private RayConfig rayConfig; - private Random random; + private Random random = new Random(); private List> processes; @@ -47,7 +52,7 @@ public class RunManager { public RunManager(RayConfig rayConfig) { this.rayConfig = rayConfig; processes = new ArrayList<>(); - random = new Random(); + createTempDirs(); } public void cleanup() { @@ -95,6 +100,17 @@ public class RunManager { FileUtils.forceMkdir(new File(rayConfig.logDir)); FileUtils.forceMkdir(new File(rayConfig.rayletSocketName).getParentFile()); FileUtils.forceMkdir(new File(rayConfig.objectStoreSocketName).getParentFile()); + + // Remove session_latest first, and then create a new symbolic link for session_latest. + final String parentOfSessionDir = new File(rayConfig.sessionDir).getParent(); + final File sessionLatest = new File( + String.format("%s/%s", parentOfSessionDir, SESSION_LATEST)); + if (sessionLatest.exists()) { + sessionLatest.delete(); + } + Files.createSymbolicLink( + Paths.get(sessionLatest.getAbsolutePath()), + Paths.get(rayConfig.sessionDir)); } catch (IOException e) { LOGGER.error("Couldn't create temp directories.", e); throw new RuntimeException(e); @@ -171,7 +187,6 @@ public class RunManager { public void startRayProcesses(boolean isHead) { LOGGER.info("Starting ray processes @ {}.", rayConfig.nodeIp); try { - createTempDirs(); if (isHead) { startGcs(); } @@ -218,50 +233,47 @@ public class RunManager { } // See `src/ray/gcs/gcs_server/gcs_server_main.cc` for the meaning of each parameter. - try (FileUtil.TempFile gcsServerFile = FileUtil.getTempFileFromResource("gcs_server")) { - gcsServerFile.getFile().setExecutable(true); - List command = ImmutableList.of( - gcsServerFile.getFile().getAbsolutePath(), - String.format("--redis_address=%s", rayConfig.getRedisIp()), - String.format("--redis_port=%d", rayConfig.getRedisPort()), - String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)), - String.format("--redis_password=%s", redisPasswordOption) - ); - - startProcess(command, null, "gcs_server"); - } + final File gcsServerFile = BinaryFileUtil.getFile( + rayConfig.sessionDir, BinaryFileUtil.GCS_SERVER_BINARY_NAME); + Preconditions.checkState(gcsServerFile.setExecutable(true)); + List command = ImmutableList.of( + gcsServerFile.getAbsolutePath(), + String.format("--redis_address=%s", rayConfig.getRedisIp()), + String.format("--redis_port=%d", rayConfig.getRedisPort()), + String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)), + String.format("--redis_password=%s", redisPasswordOption) + ); + startProcess(command, null, "gcs_server"); } } private String startRedisInstance(String ip, int port, String password, Integer shard) { - try (FileUtil.TempFile redisServerFile = FileUtil.getTempFileFromResource("redis-server")) { - try (FileUtil.TempFile redisModuleFile = FileUtil.getTempFileFromResource( - "libray_redis_module.so")) { - redisServerFile.getFile().setExecutable(true); - List command = Lists.newArrayList( - // The redis-server executable file. - redisServerFile.getFile().getAbsolutePath(), - "--protected-mode", - "no", - "--port", - String.valueOf(port), - "--loglevel", - "warning", - "--loadmodule", - // The redis module file. - redisModuleFile.getFile().getAbsolutePath() - ); + final File redisServerFile = BinaryFileUtil.getFile( + rayConfig.sessionDir, BinaryFileUtil.REDIS_SERVER_BINARY_NAME); + Preconditions.checkState(redisServerFile.setExecutable(true)); + List command = Lists.newArrayList( + // The redis-server executable file. + redisServerFile.getAbsolutePath(), + "--protected-mode", + "no", + "--port", + String.valueOf(port), + "--loglevel", + "warning", + "--loadmodule", + // The redis module file. + BinaryFileUtil.getFile( + rayConfig.sessionDir, BinaryFileUtil.REDIS_MODULE_LIBRARY_NAME).getAbsolutePath() + ); - if (!Strings.isNullOrEmpty(password)) { - command.add("--requirepass "); - command.add(password); - } - - String name = shard == null ? "redis" : "redis-" + shard; - startProcess(command, null, name); - } + if (!Strings.isNullOrEmpty(password)) { + command.add("--requirepass "); + command.add(password); } + String name = shard == null ? "redis" : "redis-" + shard; + startProcess(command, null, name); + try (Jedis client = new Jedis("127.0.0.1", port)) { if (!Strings.isNullOrEmpty(password)) { client.auth(password); @@ -287,30 +299,30 @@ public class RunManager { } // See `src/ray/raylet/main.cc` for the meaning of each parameter. - try (FileUtil.TempFile rayletFile = FileUtil.getTempFileFromResource("raylet")) { - rayletFile.getFile().setExecutable(true); - List command = ImmutableList.of( - rayletFile.getFile().getAbsolutePath(), - String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName), - String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName), - String.format("--object_manager_port=%d", 0), // The object manager port. - // The node manager port. - String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), - String.format("--node_ip_address=%s", rayConfig.nodeIp), - String.format("--redis_address=%s", rayConfig.getRedisIp()), - String.format("--redis_port=%d", rayConfig.getRedisPort()), - String.format("--num_initial_workers=%d", 0), // number of initial workers - String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency), - String.format("--static_resource_list=%s", - ResourceUtil.getResourcesStringFromMap(rayConfig.resources)), - String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)), - String.format("--python_worker_command=%s", buildPythonWorkerCommand()), - String.format("--java_worker_command=%s", buildWorkerCommandRaylet()), - String.format("--redis_password=%s", redisPasswordOption) - ); + final File rayletFile = BinaryFileUtil.getFile( + rayConfig.sessionDir, BinaryFileUtil.RAYLET_BINARY_NAME); + Preconditions.checkState(rayletFile.setExecutable(true)); + List command = ImmutableList.of( + rayletFile.getAbsolutePath(), + String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName), + String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName), + String.format("--object_manager_port=%d", 0), // The object manager port. + // The node manager port. + String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), + String.format("--node_ip_address=%s", rayConfig.nodeIp), + String.format("--redis_address=%s", rayConfig.getRedisIp()), + String.format("--redis_port=%d", rayConfig.getRedisPort()), + String.format("--num_initial_workers=%d", 0), // number of initial workers + String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency), + String.format("--static_resource_list=%s", + ResourceUtil.getResourcesStringFromMap(rayConfig.resources)), + String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)), + String.format("--python_worker_command=%s", buildPythonWorkerCommand()), + String.format("--java_worker_command=%s", buildWorkerCommand()), + String.format("--redis_password=%s", redisPasswordOption) + ); - startProcess(command, null, "raylet"); - } + startProcess(command, null, "raylet"); } private String concatPath(Stream stream) { @@ -319,7 +331,7 @@ public class RunManager { return stream.filter(s -> !s.contains(" ")).collect(Collectors.joining(":")); } - private String buildWorkerCommandRaylet() { + private String buildWorkerCommand() { List cmd = new ArrayList<>(); cmd.add("java"); cmd.add("-classpath"); @@ -335,6 +347,9 @@ public class RunManager { String libraryPath = concatPath(rayConfig.libraryPath.stream()); cmd.add("-Djava.library.path=" + libraryPath); + // session path + cmd.add("-Dray.session-dir=" + rayConfig.sessionDir); + // logging path if (rayConfig.redirectOutput) { cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender"); @@ -379,21 +394,21 @@ public class RunManager { } private void startObjectStore() { - try (FileUtil.TempFile plasmaStoreFile = FileUtil - .getTempFileFromResource("plasma_store_server")) { - plasmaStoreFile.getFile().setExecutable(true); - List command = ImmutableList.of( - // The plasma store executable file. - plasmaStoreFile.getFile().getAbsolutePath(), - "-s", - rayConfig.objectStoreSocketName, - "-m", - rayConfig.objectStoreSize.toString() - ); - startProcess(command, null, "plasma_store"); - } + final File objectStoreFile = BinaryFileUtil.getFile( + rayConfig.sessionDir, BinaryFileUtil.PLASMA_STORE_SERVER_BINARY_NAME); + Preconditions.checkState(objectStoreFile.setExecutable(true)); + List command = ImmutableList.of( + // The plasma store executable file. + objectStoreFile.getAbsolutePath(), + "-s", + rayConfig.objectStoreSocketName, + "-m", + rayConfig.objectStoreSize.toString() + ); + startProcess(command, null, "plasma_store"); } + private String buildPythonWorkerCommand() { // disable python worker start from raylet, which starts from java if (rayConfig.pythonWorkerCommand == null) { diff --git a/java/runtime/src/main/java/org/ray/runtime/util/BinaryFileUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/BinaryFileUtil.java new file mode 100644 index 000000000..4c5cc402f --- /dev/null +++ b/java/runtime/src/main/java/org/ray/runtime/util/BinaryFileUtil.java @@ -0,0 +1,48 @@ +package org.ray.runtime.util; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import org.apache.commons.io.FileUtils; + +public class BinaryFileUtil { + public static final String REDIS_SERVER_BINARY_NAME = "redis-server"; + + public static final String GCS_SERVER_BINARY_NAME = "gcs_server"; + + public static final String PLASMA_STORE_SERVER_BINARY_NAME = "plasma_store_server"; + + public static final String RAYLET_BINARY_NAME = "raylet"; + + public static final String REDIS_MODULE_LIBRARY_NAME = "libray_redis_module.so"; + + public static final String CORE_WORKER_JAVA_LIBRARY = + System.mapLibraryName("core_worker_library_java"); + + public static File getFile(String destDir, String fileName) { + File file = new File(String.format("%s/%s", destDir, fileName)); + if (file.exists()) { + return file; + } + + final File dir = file.getParentFile(); + try { + if (!dir.exists()) { + FileUtils.forceMkdir(dir); + } + } catch (IOException e) { + throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e); + } + // File does not exist. + try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + fileName)) { + Preconditions.checkNotNull(is, "{} doesn't exist.", fileName); + Files.copy(is, Paths.get(file.getCanonicalPath())); + } catch (IOException e) { + throw new RuntimeException("Couldn't get temp file from resource " + fileName, e); + } + return file; + } +} diff --git a/java/runtime/src/main/java/org/ray/runtime/util/FileUtil.java b/java/runtime/src/main/java/org/ray/runtime/util/FileUtil.java deleted file mode 100644 index c65976330..000000000 --- a/java/runtime/src/main/java/org/ray/runtime/util/FileUtil.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.ray.runtime.util; - -import com.google.common.base.Preconditions; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FileUtil { - - private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class); - - /** - * Represents a temp file. - * - * This class implements the `AutoCloseable` interface. It can be used in a `try-with-resource` - * block. When exiting the block, the temp file will be automatically removed. - */ - public static class TempFile implements AutoCloseable { - - File file; - - TempFile(File file) { - this.file = file; - } - - public File getFile() { - return file; - } - - @Override - public void close() { - if (!file.delete()) { - LOGGER.warn("Couldn't delete temp file {}", file.getAbsolutePath()); - } - } - } - - /** - * Get a temp file from resource. - * - * @param resourceFileName File name. - * @return A `TempFile` object. - */ - public static TempFile getTempFileFromResource(String resourceFileName) { - File file; - try { - file = File.createTempFile(resourceFileName, ""); - } catch (IOException e) { - throw new RuntimeException("Couldn't create temp file " + resourceFileName, e); - } - - try (InputStream in = FileUtil.class.getResourceAsStream("/" + resourceFileName)) { - Preconditions.checkNotNull(in, "{} doesn't exist.", resourceFileName); - Files.copy(in, Paths.get(file.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING); - - } catch (IOException e) { - throw new RuntimeException("Couldn't get temp file from resource " + resourceFileName, e); - } - - return new TempFile(file); - } -} - diff --git a/java/runtime/src/main/java/org/ray/runtime/util/JniUtils.java b/java/runtime/src/main/java/org/ray/runtime/util/JniUtils.java index ccc68867c..b281eb3c2 100644 --- a/java/runtime/src/main/java/org/ray/runtime/util/JniUtils.java +++ b/java/runtime/src/main/java/org/ray/runtime/util/JniUtils.java @@ -3,8 +3,10 @@ package org.ray.runtime.util; import com.google.common.base.Strings; import com.google.common.collect.Sets; import com.sun.jna.NativeLibrary; +import java.io.File; import java.lang.reflect.Field; import java.util.Set; +import org.ray.runtime.config.RayConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,18 +38,17 @@ public class JniUtils { LOGGER.debug("Loading native library {}.", libraryName); // Load native library. String fileName = System.mapLibraryName(libraryName); - String libPath = null; - try (FileUtil.TempFile libFile = FileUtil.getTempFileFromResource(fileName)) { - libPath = libFile.getFile().getAbsolutePath(); - if (exportSymbols) { - // Expose library symbols using RTLD_GLOBAL which may be depended by other shared - // libraries. - NativeLibrary.getInstance(libFile.getFile().getAbsolutePath()); - } - System.load(libPath); + final String sessionDir = RayConfig.getInstance().sessionDir; + final File file = BinaryFileUtil.getFile(sessionDir, fileName); + + if (exportSymbols) { + // Expose library symbols using RTLD_GLOBAL which may be depended by other shared + // libraries. + NativeLibrary.getInstance(file.getAbsolutePath()); } + System.load(file.getAbsolutePath()); LOGGER.debug("Native library loaded."); - resetLibraryPath(libPath); + resetLibraryPath(file.getAbsolutePath()); loadedLibs.add(libraryName); } } diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index 066665913..2ea2c4374 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -32,7 +32,8 @@ ray { } // Root dir of log files. - log-dir: /tmp/ray/logs + // If this is not set, the default log-dir will be `${temp-dir}/session_xxx/logs`. + log-dir: "" // If true, output of worker processes will be redirected to log files. // Otherwise, output will be printed to console. @@ -74,8 +75,9 @@ ray { // Object store configurations // ---------------------------- object-store { - // RPC socket name of object store - socket-name: /tmp/ray/sockets/object_store + // RPC socket name of object store. + // If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/object_store`. + socket-name: "" // Initial size of the object store. size: 10 MB } @@ -84,8 +86,9 @@ ray { // Raylet configurations // ---------------------------- raylet { - // RPC socket name of Raylet - socket-name: /tmp/ray/sockets/raylet + // RPC socket name of Raylet. + // If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/raylet`. + socket-name: "" // Listening port for node manager. node-manager-port: 0 diff --git a/python/ray/services.py b/python/ray/services.py index 5285a8529..c93d673f1 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1328,7 +1328,7 @@ def build_java_worker_command( command += "-Dray.home={} ".format(RAY_HOME) command += "-Dray.log-dir={} ".format(os.path.join(session_dir, "logs")) - + command += "-Dray.session-dir={}".format(session_dir) command += ("-Dray.raylet.config.num_workers_per_process_java=" + "RAY_WORKER_NUM_WORKERS_PLACEHOLDER ") diff --git a/python/setup.py b/python/setup.py index ee015dd7a..62b384275 100644 --- a/python/setup.py +++ b/python/setup.py @@ -15,7 +15,6 @@ import setuptools.command.build_ext as _build_ext # manually. # NOTE: The lists below must be kept in sync with ray/BUILD.bazel. - ray_files = [ "ray/core/src/ray/thirdparty/redis/src/redis-server", "ray/core/src/ray/gcs/redis_module/libray_redis_module.so",