From 78b6bfb7f94192cfcc8e6fab03e8063dcdae674a Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Sun, 19 Aug 2018 14:46:36 +0800 Subject: [PATCH] [Java] Change log dir to /tmp/raylogs (#2677) Currently, the log directory in Java is a relative path. This PR changes it to `/tmp/raylogs` (with the same format as Python, e.g., `local_scheduler-2018-51-17_17-8-6-05164.err`). It also cleans up some related code. --- .gitignore | 1 - .../cli/src/main/java/org/ray/cli/RayCli.java | 9 +- .../main/java/org/ray/util/logger/RayLog.java | 9 +- java/ray.config.ini | 2 - .../main/java/org/ray/core/RayRuntime.java | 2 +- .../org/ray/core/model/RayParameters.java | 4 +- .../main/java/org/ray/runner/ProcessInfo.java | 2 +- .../main/java/org/ray/runner/RunManager.java | 263 ++++++------------ 8 files changed, 101 insertions(+), 191 deletions(-) diff --git a/.gitignore b/.gitignore index 1bd48415a..5c2e9ec73 100644 --- a/.gitignore +++ b/.gitignore @@ -153,7 +153,6 @@ build # Java java/**/target -java/run java/**/lib java/**/.settings java/**/.classpath diff --git a/java/cli/src/main/java/org/ray/cli/RayCli.java b/java/cli/src/main/java/org/ray/cli/RayCli.java index c99541f35..f65e42570 100644 --- a/java/cli/src/main/java/org/ray/cli/RayCli.java +++ b/java/cli/src/main/java/org/ray/cli/RayCli.java @@ -6,14 +6,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import net.lingala.zip4j.core.ZipFile; -import net.lingala.zip4j.exception.ZipException; import org.ray.api.UniqueID; -import org.ray.cli.CommandStart; -import org.ray.cli.CommandStop; -import org.ray.core.RayRuntime; import org.ray.core.model.RayParameters; import org.ray.core.model.RunMode; -import org.ray.runner.RunInfo; import org.ray.runner.RunManager; import org.ray.runner.worker.DefaultDriver; import org.ray.spi.KeyValueStoreLink; @@ -73,7 +68,7 @@ public class RayCli { RayParameters params = new RayParameters(config); // Init RayLog before using it. 
- RayLog.init(params.working_directory); + RayLog.init(params.log_dir); RayLog.core.info("Using IP address {} for this node.", params.node_ip_address); RunManager manager; @@ -173,7 +168,7 @@ public class RayCli { UniqueID resourceId = functionManager.registerResource(zip); // Init RayLog before using it. - RayLog.init(params.working_directory); + RayLog.init(params.log_dir); RayLog.rapp.debug( "registerResource " + resourceId + " for package " + packageName + " done"); diff --git a/java/common/src/main/java/org/ray/util/logger/RayLog.java b/java/common/src/main/java/org/ray/util/logger/RayLog.java index 4259f83fb..fb1beeb29 100644 --- a/java/common/src/main/java/org/ray/util/logger/RayLog.java +++ b/java/common/src/main/java/org/ray/util/logger/RayLog.java @@ -22,14 +22,13 @@ public class RayLog { public static Logger rapp; /** - * it must be called before using Ray loggers, - * or the dynamic update does not work. - * @param workingDir store the logs under params.working_directory + * Initialize loggers + * @param logDir directory of the log files. 
*/ - public static void init(String workingDir) { + public static void init(String logDir) { String loggingPath = System.getProperty("logging.path"); if (loggingPath == null) { - System.setProperty("logging.path", workingDir + "/logs"); + System.setProperty("logging.path", logDir); } String loggingFileName = System.getProperty("logging.file.name"); if (loggingFileName != null && loggingFileName.contains("*pid_suffix*")) { diff --git a/java/ray.config.ini b/java/ray.config.ini index 8427ed635..5c7f9b696 100644 --- a/java/ray.config.ini +++ b/java/ray.config.ini @@ -42,8 +42,6 @@ num_workers = 2 driver_id = 0123456789abcdef0123456789abcdef01234567 -working_directory = %CONFIG_FILE_DIR%/run - redis_port = 34111 num_local_schedulers = 1 diff --git a/java/runtime-common/src/main/java/org/ray/core/RayRuntime.java b/java/runtime-common/src/main/java/org/ray/core/RayRuntime.java index 25bacf062..d155a22cf 100644 --- a/java/runtime-common/src/main/java/org/ray/core/RayRuntime.java +++ b/java/runtime-common/src/main/java/org/ray/core/RayRuntime.java @@ -80,7 +80,7 @@ public abstract class RayRuntime implements RayApi { configReader = new ConfigReader(configPath, updateConfigStr); RayRuntime.params = new RayParameters(configReader); - RayLog.init(params.working_directory); + RayLog.init(params.log_dir); assert RayLog.core != null; ins = instantiate(params); diff --git a/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java b/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java index 2b722a41c..b8dd42dd8 100644 --- a/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java +++ b/java/runtime-common/src/main/java/org/ray/core/model/RayParameters.java @@ -46,8 +46,8 @@ public class RayParameters { @AConfig(comment = "driver ID when the worker is served as a driver") public UniqueID driver_id = UniqueID.nil; - @AConfig(comment = "working directory") - public String working_directory = "./run"; + @AConfig(comment = "logging 
directory") + public String log_dir = "/tmp/raylogs"; @AConfig(comment = "primary redis port") public int redis_port = 34222; diff --git a/java/runtime-native/src/main/java/org/ray/runner/ProcessInfo.java b/java/runtime-native/src/main/java/org/ray/runner/ProcessInfo.java index c2489c69d..94bae347a 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/ProcessInfo.java +++ b/java/runtime-native/src/main/java/org/ray/runner/ProcessInfo.java @@ -5,7 +5,7 @@ public class ProcessInfo { public Process process; public String[] cmd; public RunInfo.ProcessType type; - public String workDir; + public String name; public String redisAddress; public String ip; public boolean redirect; diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java index 6785bb7bb..48febe0dc 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java +++ b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java @@ -1,15 +1,17 @@ package org.ray.runner; +import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.ray.api.UniqueID; import org.ray.core.model.RayParameters; import org.ray.core.model.RunMode; @@ -29,18 +31,20 @@ public class RunManager { public static final int INT16_MAX = 32767; + private static final DateTimeFormatter DATE_TIME_FORMATTER = + DateTimeFormatter.ofPattern("Y-m-d_H-M-S"); + private RayParameters params; private PathConfig paths; private ConfigReader configReader; - private String procStdoutFileName = ""; - - private String procStderrFileName = ""; - private RunInfo runInfo = new RunInfo(); + private Random 
random = new Random(); + + public RunManager(RayParameters params, PathConfig paths, ConfigReader configReader) { this.params = params; this.paths = paths; @@ -60,18 +64,6 @@ public class RunManager { return runInfo; } - public PathConfig getPathManager() { - return paths; - } - - public String getProcStdoutFileName() { - return procStdoutFileName; - } - - public String getProcStderrFileName() { - return procStderrFileName; - } - public void startRayHead() throws Exception { if (params.redis_address.length() != 0) { throw new Exception("Redis address must be empty in head node."); @@ -110,9 +102,9 @@ public class RunManager { } public Process startDriver(String mainClass, String redisAddress, UniqueID driverId, - String workDir, String ip, - String driverClass, String driverArgs, String additonalClassPaths, - String additionalConfigs) { + String logDir, String ip, + String driverClass, String driverArgs, String additonalClassPaths, + String additionalConfigs) { String driverConfigs = "ray.java.start.driver_id=" + driverId + ";ray.java.start.driver_class=" + driverClass; if (driverArgs != null) { @@ -131,30 +123,29 @@ public class RunManager { additonalClassPaths, additionalConfigs, "", - workDir, ip, redisAddress, - true, + false, false, null ); } private Process startJavaProcess(RunInfo.ProcessType pt, String mainClass, - String additonalClassPaths, String additionalConfigs, - String additionalJvmArgs, String workDir, String ip, String - redisAddr, boolean redirect, - boolean cleanup, String agentlibAddr) { + String additonalClassPaths, String additionalConfigs, + String additionalJvmArgs, String ip, String + redisAddr, boolean redirect, + boolean cleanup, String agentlibAddr) { String cmd = buildJavaProcessCommand(pt, mainClass, additonalClassPaths, additionalConfigs, - additionalJvmArgs, workDir, ip, redisAddr, agentlibAddr); - return startProcess(cmd.split(" "), null, pt, workDir, redisAddr, ip, redirect, cleanup); + additionalJvmArgs, ip, redisAddr, 
agentlibAddr); + return startProcess(cmd.split(" "), null, pt, "", redisAddr, ip, redirect, cleanup); } private String buildJavaProcessCommand( RunInfo.ProcessType pt, String mainClass, String additionalClassPaths, String additionalConfigs, - String additionalJvmArgs, String workDir, String ip, String redisAddr, String agentlibAddr) { + String additionalJvmArgs, String ip, String redisAddr, String agentlibAddr) { String cmd = "java -ea -noverify " + params.jvm_parameters + " "; if (agentlibAddr != null && !agentlibAddr.equals("")) { cmd += " -agentlib:jdwp=transport=dt_socket,address=" + agentlibAddr + ",server=y,suspend=n"; @@ -178,7 +169,7 @@ public class RunManager { cmd += " --overwrite=" + section + "node_ip_address=" + ip + ";" + section + "redis_address=" + redisAddr + ";" - + section + "working_directory=" + workDir + ";" + + section + "log_dir=" + params.log_dir + ";" + section + "run_mode=" + params.run_mode; if (additionalConfigs.length() > 0) { @@ -189,34 +180,21 @@ public class RunManager { } private Process startProcess(String[] cmd, Map env, RunInfo.ProcessType type, - String workDir, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { - File wdir = new File(workDir); - if (!wdir.exists()) { - wdir.mkdirs(); - } - - int processIndex = runInfo.allProcesses.get(type.ordinal()).size(); - + String name, + String redisAddress, String ip, boolean redirect, + boolean cleanup) { ProcessBuilder builder; List newCommand = Arrays.asList(cmd); builder = new ProcessBuilder(newCommand); - builder.directory(new File(workDir)); if (redirect) { - String stdoutFile; - String stderrFile; - stdoutFile = workDir + "/" + processIndex + ".out.txt"; - stderrFile = workDir + "/" + processIndex + ".err.txt"; - builder.redirectOutput(new File(stdoutFile)); - builder.redirectError(new File(stderrFile)); - List stdFileList = new ArrayList<>(); - stdFileList.add(stdoutFile); - stdFileList.add(stderrFile); - record_log_files_in_redis(redisAddress, ip, 
stdFileList); - procStdoutFileName = stdoutFile; - procStderrFileName = stderrFile; + int logId = random.nextInt(10000); + String date = DATE_TIME_FORMATTER.format(LocalDateTime.now()); + String stdout = String.format("%s/%s-%s-%05d.out", params.log_dir, name, date, logId); + String stderr = String.format("%s/%s-%s-%05d.err", params.log_dir, name, date, logId); + builder.redirectOutput(new File(stdout)); + builder.redirectError(new File(stderr)); + recordLogFilesInRedis(redisAddress, ip, ImmutableList.of(stdout, stderr)); } if (env != null && !env.isEmpty()) { @@ -227,16 +205,11 @@ public class RunManager { try { p = builder.start(); } catch (IOException e) { - RayLog.core - .error("Start process " + Arrays.toString(cmd).replace(',', ' ') + " in working dir '" - + workDir + "' failed", - e); + RayLog.core.error("Failed to start process {}", name, e); return null; } - RayLog.core.info( - "Start process " + p.hashCode() + " OK, cmd = " + Arrays.toString(cmd).replace(',', ' ') - + ", working dir = '" + workDir + "'" + (redirect ? 
", redirect" : ", no redirect")); + RayLog.core.info("Process {} started", name); if (cleanup) { runInfo.toBeCleanedProcesses.get(type.ordinal()).add(p); @@ -245,7 +218,7 @@ public class RunManager { ProcessInfo processInfo = new ProcessInfo(); processInfo.cmd = cmd; processInfo.type = type; - processInfo.workDir = workDir; + processInfo.name = name; processInfo.redisAddress = redisAddress; processInfo.ip = ip; processInfo.redirect = redirect; @@ -256,28 +229,28 @@ public class RunManager { return p; } - private void record_log_files_in_redis(String redisAddress, String nodeIpAddress, - List logfiles) { + private void recordLogFilesInRedis(String redisAddress, String nodeIpAddress, + List logFiles) { if (redisAddress != null && !redisAddress.isEmpty() && nodeIpAddress != null - && !nodeIpAddress.isEmpty() && logfiles.size() > 0) { + && !nodeIpAddress.isEmpty() && logFiles.size() > 0) { String[] ipPort = redisAddress.split(":"); Jedis jedisClient = new Jedis(ipPort[0], Integer.parseInt(ipPort[1])); String logFileListKey = String.format("LOG_FILENAMES:{%s}", nodeIpAddress); - for (String logfile : logfiles) { + for (String logfile : logFiles) { jedisClient.rpush(logFileListKey, logfile); } jedisClient.close(); } } - public void startRayProcesses() { + private void startRayProcesses() { Jedis redisClient = null; RayLog.core.info("start ray processes @ " + params.node_ip_address + " ..."); // start primary redis if (params.redis_address.length() == 0) { - List primaryShards = startRedis(params.working_directory + "/redis", + List primaryShards = startRedis( params.node_ip_address, params.redis_port, 1, params.redirect, params.cleanup); params.redis_address = primaryShards.get(0); @@ -295,7 +268,7 @@ public class RunManager { // start redis shards if (params.start_redis_shards) { - runInfo.redisShards = startRedis(params.working_directory + "/redis/shards", + runInfo.redisShards = startRedis( params.node_ip_address, params.redis_port + 1, params.num_redis_shards, 
params.redirect, params.cleanup); @@ -310,7 +283,7 @@ public class RunManager { // start global scheduler if (params.include_global_scheduler && !params.use_raylet) { - startGlobalScheduler(params.working_directory + "/globalScheduler", + startGlobalScheduler( params.redis_address, params.node_ip_address, params.redirect, params.cleanup); } @@ -349,7 +322,7 @@ public class RunManager { int rpcPort = params.object_store_rpc_port; String storeName = "/tmp/plasma_store" + rpcPort; - startObjectStore(0, info, params.working_directory + "/store", + startObjectStore(0, info, params.redis_address, params.node_ip_address, params.redirect, params.cleanup); Map staticResources = @@ -357,18 +330,18 @@ public class RunManager { //Start raylet startRaylet(storeName, info, params.num_workers, - params.working_directory + "/raylet", params.redis_address, + params.redis_address, params.node_ip_address, params.redirect, staticResources, params.cleanup); runInfo.localStores.add(info); } else { for (int i = 0; i < params.num_local_schedulers; i++) { // Start object stores - startObjectStore(i, info, params.working_directory + "/store", + startObjectStore(i, info, params.redis_address, params.node_ip_address, params.redirect, params.cleanup); startObjectManager(i, info, - params.working_directory + "/storeManager", params.redis_address, + params.redis_address, params.node_ip_address, params.redirect, params.cleanup); // Start local scheduler @@ -381,7 +354,7 @@ public class RunManager { startLocalScheduler(i, info, params.num_cpus[i], params.num_gpus[i], workerCount, - params.working_directory + "/localsc", params.redis_address, + params.redis_address, params.node_ip_address, params.redirect, params.cleanup); runInfo.localStores.add(info); @@ -395,8 +368,8 @@ public class RunManager { localStores.workerCount = localNumWorkers[i]; for (int j = 0; j < localNumWorkers[i]; j++) { startWorker(localStores.storeName, localStores.managerName, localStores.schedulerName, - 
params.working_directory + "/worker" + i + "." + j, params.redis_address, - params.node_ip_address, UniqueID.nil, "", params.redirect, params.cleanup); + "/worker" + i + "." + j, params.redis_address, + params.node_ip_address, UniqueID.nil, "", params.redirect, params.cleanup); } } } @@ -415,7 +388,7 @@ public class RunManager { } } - public boolean checkAlive(HashSet excludeTypes) { + private boolean checkAlive(HashSet excludeTypes) { RunInfo.ProcessType[] types = RunInfo.ProcessType.values(); for (int i = 0; i < types.length; i++) { if (excludeTypes.contains(types[i])) { @@ -439,43 +412,6 @@ public class RunManager { return runInfo.deadProcess.isEmpty(); } - public boolean tryRecoverDeadProcess() { - - if (runInfo.deadProcess.isEmpty()) { - return true; - } - - /* check the dead process */ - for (ProcessInfo info : runInfo.deadProcess) { - if (info.type == RunInfo.ProcessType.PT_LOCAL_SCHEDULER - || info.type == RunInfo.ProcessType.PT_PLASMA_STORE - || info.type == RunInfo.ProcessType.PT_PLASMA_MANAGER) { - /* When local scheduler or plasma store or plasma manager process dead, we can not - * recover this node simply by restarting the dead process. 
Instead, We need to restart - * all the node processes - * */ - RayLog.core - .error(info.type.name() + "process dead, we can not simply restart this process"); - return false; - } - } - - /* try to recover */ - ProcessInfo info; - for (int i = 0; i < runInfo.deadProcess.size(); i++) { - info = runInfo.deadProcess.get(i); - if (info.type == RunInfo.ProcessType.PT_GLOBAL_SCHEDULER) { - RayLog.core.error(info.type.name() + "process dead, restart this process"); - startProcess(info.cmd, null, info.type, info.workDir, info.redisAddress, info.ip, - info.redirect, info.cleanup); - } else { - RayLog.core.error(info.type.name() + "process dead, we don't deal with it"); - } - } - runInfo.deadProcess.clear(); - return true; - } - // kill all processes started by startRayHead public void cleanup(boolean killAll) { // clean up the process in reverse order @@ -512,12 +448,12 @@ public class RunManager { // when the worker exits // @return primary redis shard address // - private List startRedis(String workDir, String ip, int port, int numOfShards, - boolean redirect, boolean cleanup) { + private List startRedis(String ip, int port, int numOfShards, + boolean redirect, boolean cleanup) { ArrayList shards = new ArrayList<>(); String addr; for (int i = 0; i < numOfShards; i++) { - addr = startRedisInstance(workDir, ip, port + i, redirect, cleanup); + addr = startRedisInstance(ip, port + i, redirect, cleanup); if (addr.length() == 0) { cleanup(cleanup); @@ -540,21 +476,20 @@ public class RunManager { // @param port given port for this redis instance, 0 for auto-selected port // @return redis server address // - private String startRedisInstance(String workDir, String ip, int port, - boolean redirect, boolean cleanup) { + private String startRedisInstance(String ip, int port, + boolean redirect, boolean cleanup) { String redisFilePath = paths.redis_server; String redisModule = paths.redis_module; assert (new File(redisFilePath).exists()) : "file don't exsits : " + redisFilePath; 
assert (new File(redisModule).exists()) : "file don't exsits : " + redisModule; - String cmd = redisFilePath + " --protected-mode no --port " + port + " --loglevel warning" + " --loadmodule " + redisModule; Map env = null; Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_REDIS_SERVER, - workDir + port, "", ip, redirect, cleanup); + "redis", "", ip, redirect, cleanup); if (p == null || !p.isAlive()) { return ""; @@ -578,31 +513,17 @@ public class RunManager { return ip + ":" + port; } - private void startGlobalScheduler(String workDir, String redisAddress, String ip, - boolean redirect, boolean cleanup) { + private void startGlobalScheduler(String redisAddress, String ip, + boolean redirect, boolean cleanup) { String filePath = paths.global_scheduler; String cmd = filePath + " -r " + redisAddress + " -h " + ip; Map env = null; - startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_GLOBAL_SCHEDULER, workDir, + startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_GLOBAL_SCHEDULER, "global_scheduler", redisAddress, ip, redirect, cleanup); } - private Map retrieveEnv(String conf, Map env) { - String[] splits = conf.split(" "); - for (String item : splits) { - int idx = item.trim().indexOf('='); - if (idx == -1) { - continue; - } - String key = item.substring(0, idx); - String val = item.substring(idx + 1); - env.put(key, val); - } - return env; - } - /* * @param storeName The name of the plasma store socket to connect to * @@ -625,9 +546,9 @@ public class RunManager { * start */ private void startLocalScheduler(int index, AddressInfo info, int numCpus, - int numGpus, int numWorkers, String workDir, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { + int numGpus, int numWorkers, + String redisAddress, String ip, boolean redirect, + boolean cleanup) { //if (numCpus <= 0) // numCpus = Runtime.getRuntime().availableProcessors(); if (numGpus <= 0) { @@ -649,7 +570,7 @@ public class RunManager { String workerCmd = null; 
workerCmd = buildWorkerCommand(true, info.storeName, info.managerName, name, - UniqueID.nil, "", workDir + rpcPort, ip, redisAddress); + UniqueID.nil, "", ip, redisAddress); cmd += " -w \"" + workerCmd + "\""; if (redisAddress.length() > 0) { @@ -662,7 +583,7 @@ public class RunManager { Map env = null; String[] cmds = StringUtil.split(cmd, " ", "\"", "\"").toArray(new String[0]); Process p = startProcess(cmds, env, RunInfo.ProcessType.PT_LOCAL_SCHEDULER, - workDir + rpcPort, redisAddress, ip, redirect, cleanup); + "local_scheduler", redisAddress, ip, redirect, cleanup); if (p != null && p.isAlive()) { try { @@ -683,8 +604,8 @@ public class RunManager { } private void startRaylet(String storeName, AddressInfo info, int numWorkers, - String workDir, String redisAddress, String ip, boolean redirect, - Map staticResources, boolean cleanup) { + String redisAddress, String ip, boolean redirect, + Map staticResources, boolean cleanup) { int rpcPort = params.raylet_port; String rayletSocketName = "/tmp/raylet" + rpcPort; @@ -692,8 +613,8 @@ public class RunManager { String filePath = paths.raylet; //Create the worker command that the raylet will use to start workers. - String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName, - UniqueID.nil, "", workDir + rpcPort, ip, redisAddress); + String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName, + UniqueID.nil, "", ip, redisAddress); int sep = redisAddress.indexOf(':'); assert (sep != -1); @@ -703,12 +624,12 @@ public class RunManager { String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources); // The second-last arugment is the worker command for Python, not needed for Java. 
- String[] cmds = new String[]{filePath,rayletSocketName, storeName, ip, gcsIp, - gcsPort, "" + numWorkers, resourceArgument, - "", workerCommand}; + String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp, + gcsPort, "" + numWorkers, resourceArgument, + "", workerCommand}; Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET, - workDir + rpcPort, redisAddress, ip, redirect, cleanup); + "raylet", redisAddress, ip, redirect, cleanup); if (p != null && p.isAlive()) { try { @@ -729,8 +650,8 @@ public class RunManager { } private String buildWorkerCommandRaylet(String storeName, String rayletSocketName, - UniqueID actorId, String actorClass, String workDir, - String ip, String redisAddress) { + UniqueID actorId, String actorClass, + String ip, String redisAddress) { String workerConfigs = "ray.java.start.object_store_name=" + storeName + ";ray.java.start.raylet_socket_name=" + rayletSocketName + ";ray.java.start.worker_mode=WORKER;ray.java.start.use_raylet=true"; @@ -743,7 +664,7 @@ public class RunManager { } String jvmArgs = ""; - jvmArgs += " -Dlogging.path=" + params.working_directory + "/logs/workers"; + jvmArgs += " -Dlogging.path=" + params.log_dir; jvmArgs += " -Dlogging.file.name=core-*pid_suffix*"; return buildJavaProcessCommand( @@ -752,7 +673,6 @@ public class RunManager { "", workerConfigs, jvmArgs, - workDir, ip, redisAddress, null @@ -760,9 +680,9 @@ public class RunManager { } private String buildWorkerCommand(boolean isFromLocalScheduler, String storeName, - String storeManagerName, String localSchedulerName, - UniqueID actorId, String actorClass, String workDir, String - ip, String redisAddress) { + String storeManagerName, String localSchedulerName, + UniqueID actorId, String actorClass, String + ip, String redisAddress) { String workerConfigs = "ray.java.start.object_store_name=" + storeName + ";ray.java.start.object_store_manager_name=" + storeManagerName + ";ray.java.start.worker_mode=WORKER" @@ -776,7 +696,7 @@ 
public class RunManager { } String jvmArgs = ""; - jvmArgs += " -Dlogging.path=" + params.working_directory + "/logs/workers"; + jvmArgs += " -Dlogging.path=" + params.log_dir; jvmArgs += " -Dlogging.file.name=core-*pid_suffix*"; return buildJavaProcessCommand( @@ -785,15 +705,14 @@ public class RunManager { "", workerConfigs, jvmArgs, - workDir, ip, redisAddress, null ); } - private void startObjectStore(int index, AddressInfo info, String workDir, String redisAddress, - String ip, boolean redirect, boolean cleanup) { + private void startObjectStore(int index, AddressInfo info, String redisAddress, + String ip, boolean redirect, boolean cleanup) { int occupiedMemoryMb = params.object_store_occupied_memory_MB; long memoryBytes = occupiedMemoryMb * 1000000; String filePath = paths.store; @@ -804,7 +723,7 @@ public class RunManager { Map env = null; Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_STORE, - workDir + rpcPort, redisAddress, ip, redirect, cleanup); + "plasma_store", redisAddress, ip, redirect, cleanup); if (p != null && p.isAlive()) { try { @@ -824,9 +743,9 @@ public class RunManager { } } - private AddressInfo startObjectManager(int index, AddressInfo info, String workDir, - String redisAddress, String ip, boolean redirect, - boolean cleanup) { + private AddressInfo startObjectManager(int index, AddressInfo info, + String redisAddress, String ip, boolean redirect, + boolean cleanup) { String filePath = paths.store_manager; int rpcPort = params.object_store_manager_rpc_port + index; String name = "/tmp/plasma_manager" + rpcPort; @@ -838,7 +757,7 @@ public class RunManager { Map env = null; Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_MANAGER, - workDir + rpcPort, redisAddress, ip, redirect, cleanup); + "object_manager", redisAddress, ip, redirect, cleanup); if (p != null && p.isAlive()) { try { @@ -859,12 +778,12 @@ public class RunManager { } public void startWorker(String storeName, String 
storeManagerName, - String localSchedulerName, String workDir, String redisAddress, - String ip, UniqueID actorId, String actorClass, - boolean redirect, boolean cleanup) { + String localSchedulerName, String workerName, String redisAddress, + String ip, UniqueID actorId, String actorClass, + boolean redirect, boolean cleanup) { String cmd = buildWorkerCommand(false, storeName, storeManagerName, localSchedulerName, actorId, - actorClass, workDir, ip, redisAddress); - startProcess(cmd.split(" "), null, RunInfo.ProcessType.PT_WORKER, workDir, redisAddress, ip, + actorClass, ip, redisAddress); + startProcess(cmd.split(" "), null, RunInfo.ProcessType.PT_WORKER, workerName, redisAddress, ip, redirect, cleanup); } }