From 06a58016d851bd54c4796591ab96fd5551d08021 Mon Sep 17 00:00:00 2001 From: Wang Qing Date: Fri, 17 Aug 2018 12:59:44 +0800 Subject: [PATCH] [multi-language part 2] Change the command line arguments to start raylet (#2670) --- .../main/java/org/ray/runner/RunManager.java | 21 +++++++++++-------- python/ray/services.py | 3 ++- src/ray/raylet/main.cc | 17 ++++++++++++--- 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java index d625f3c90..6785bb7bb 100644 --- a/java/runtime-native/src/main/java/org/ray/runner/RunManager.java +++ b/java/runtime-native/src/main/java/org/ray/runner/RunManager.java @@ -198,11 +198,12 @@ public class RunManager { } int processIndex = runInfo.allProcesses.get(type.ordinal()).size(); + ProcessBuilder builder; - List newCmd = Arrays.stream(cmd).filter(s -> s.length() > 0) - .collect(Collectors.toList()); - builder = new ProcessBuilder(newCmd); + List newCommand = Arrays.asList(cmd); + builder = new ProcessBuilder(newCommand); builder.directory(new File(workDir)); + if (redirect) { String stdoutFile; String stderrFile; @@ -689,10 +690,10 @@ public class RunManager { String rayletSocketName = "/tmp/raylet" + rpcPort; String filePath = paths.raylet; - - String workerCmd = null; - workerCmd = buildWorkerCommandRaylet(info.storeName, rayletSocketName, UniqueID.nil, - "", workDir + rpcPort, ip, redisAddress); + + //Create the worker command that the raylet will use to start workers. + String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName, + UniqueID.nil, "", workDir + rpcPort, ip, redisAddress); int sep = redisAddress.indexOf(':'); assert (sep != -1); @@ -701,8 +702,10 @@ public class RunManager { String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources); - String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp, - gcsPort, "" + numWorkers, workerCmd, resourceArgument}; + // The second-last arugment is the worker command for Python, not needed for Java. + String[] cmds = new String[]{filePath,rayletSocketName, storeName, ip, gcsIp, + gcsPort, "" + numWorkers, resourceArgument, + "", workerCommand}; Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET, workDir + rpcPort, redisAddress, ip, redirect, cleanup); diff --git a/python/ray/services.py b/python/ray/services.py index 3a23c04ee..90380c9c8 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -990,8 +990,9 @@ def start_raylet(redis_address, gcs_ip_address, gcs_port, str(num_workers), - start_worker_command, resource_argument, + start_worker_command, + "", # Worker command for Java, not needed for Python. ] if use_valgrind: diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 46a8e139a..3d5588508 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -6,7 +6,7 @@ #ifndef RAYLET_TEST int main(int argc, char *argv[]) { - RAY_CHECK(argc == 9); + RAY_CHECK(argc == 10); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); @@ -14,8 +14,9 @@ int main(int argc, char *argv[]) { const std::string redis_address = std::string(argv[4]); int redis_port = std::stoi(argv[5]); int num_initial_workers = std::stoi(argv[6]); - const std::string worker_command = std::string(argv[7]); - const std::string static_resource_list = std::string(argv[8]); + const std::string static_resource_list = std::string(argv[7]); + const std::string python_worker_command = std::string(argv[8]); + const std::string java_worker_command = std::string(argv[9]); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -39,6 +40,16 @@ int main(int argc, char *argv[]) { RayConfig::instance().num_workers_per_process(); // Use a default worker that can execute empty tasks with dependencies. + std::string worker_command; + if (!python_worker_command.empty()) { + worker_command = python_worker_command; + } else if (!java_worker_command.empty()) { + worker_command = java_worker_command; + } else { + RAY_CHECK(0) + << "Either Python worker command or Java worker command should be provided."; + } + std::istringstream iss(worker_command); std::vector results(std::istream_iterator{iss}, std::istream_iterator());