diff --git a/java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java b/java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java index 90a2817a8..4a39ae2e9 100644 --- a/java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java +++ b/java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java @@ -22,7 +22,9 @@ public class WorkerJvmOptionsTest extends BaseTest { public void testJvmOptions() { TestUtils.skipTestUnderSingleProcess(); ActorCreationOptions options = new ActorCreationOptions.Builder() - .setJvmOptions("-Dtest.suffix=suffix") + // The whitespaces in following argument are intentionally added to test + // that raylet can correctly handle dynamic options with whitespaces. + .setJvmOptions(" -Dtest.suffix=suffix -Dtest.suffix1=suffix1 ") .createActorCreationOptions(); RayActor actor = Ray.createActor(Echo::new, options); RayObject obj = Ray.call(Echo::getOptions, actor); diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4be890228..7efded15b 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -32,14 +32,6 @@ DEFINE_bool(enable_stdout_exporter, false, #ifndef RAYLET_TEST -/// A helper function that parse the worker command string into a vector of arguments. -static std::vector parse_worker_command(std::string worker_command) { - std::istringstream iss(worker_command); - std::vector result(std::istream_iterator{iss}, - std::istream_iterator()); - return result; -} - int main(int argc, char *argv[]) { InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, ray::RayLog::ShutDownRayLog, argv[0], @@ -118,11 +110,11 @@ int main(int argc, char *argv[]) { if (!python_worker_command.empty()) { node_manager_config.worker_commands.emplace( - make_pair(ray::Language::PYTHON, parse_worker_command(python_worker_command))); + make_pair(ray::Language::PYTHON, SplitStrByWhitespaces(python_worker_command))); } if (!java_worker_command.empty()) { node_manager_config.worker_commands.emplace( - make_pair(ray::Language::JAVA, parse_worker_command(java_worker_command))); + make_pair(ray::Language::JAVA, SplitStrByWhitespaces(java_worker_command))); } if (python_worker_command.empty() && java_worker_command.empty()) { RAY_CHECK(0) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index ee2f10ea0..a02014020 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -120,7 +120,7 @@ int WorkerPool::StartWorkerProcess(const Language &language, << " non-actor workers"; // Extract pointers from the worker command to pass into execvp. - std::vector worker_command_args; + std::vector worker_command_args; size_t dynamic_option_index = 0; for (auto const &token : state.worker_command) { const auto option_placeholder = @@ -129,14 +129,15 @@ int WorkerPool::StartWorkerProcess(const Language &language, if (token == option_placeholder) { if (!dynamic_options.empty()) { RAY_CHECK(dynamic_option_index < dynamic_options.size()); - worker_command_args.push_back(dynamic_options[dynamic_option_index].c_str()); + auto options = SplitStrByWhitespaces(dynamic_options[dynamic_option_index]); + worker_command_args.insert(worker_command_args.end(), options.begin(), + options.end()); ++dynamic_option_index; } } else { - worker_command_args.push_back(token.c_str()); + worker_command_args.push_back(token); } } - worker_command_args.push_back(nullptr); pid_t pid = StartProcess(worker_command_args); if (pid < 0) { @@ -152,7 +153,16 @@ int WorkerPool::StartWorkerProcess(const Language &language, return -1; } -pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { +pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { + if (RAY_LOG_ENABLED(DEBUG)) { + std::stringstream stream; + stream << "Starting worker process with command:"; + for (const auto &arg : worker_command_args) { + stream << " " << arg; + } + RAY_LOG(DEBUG) << stream.str(); + } + // Launch the process to create the worker. pid_t pid = fork(); @@ -165,8 +175,14 @@ pid_t WorkerPool::StartProcess(const std::vector &worker_command_a signal(SIGCHLD, SIG_DFL); // Try to execute the worker command. - int rv = execvp(worker_command_args[0], - const_cast(worker_command_args.data())); + std::vector worker_command_args_str; + for (const auto &arg : worker_command_args) { + worker_command_args_str.push_back(arg.c_str()); + } + worker_command_args_str.push_back(nullptr); + int rv = execvp(worker_command_args_str[0], + const_cast(worker_command_args_str.data())); + // The worker failed to start. This is a fatal error. RAY_LOG(FATAL) << "Failed to start worker with return value " << rv << ": " << strerror(errno); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 3221c5cf0..9569cd5c2 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -148,7 +148,7 @@ class WorkerPool { /// /// \param worker_command_args The command arguments of new worker process. /// \return The process ID of started worker process. - virtual pid_t StartProcess(const std::vector &worker_command_args); + virtual pid_t StartProcess(const std::vector &worker_command_args); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 386bec31b..16a80aebb 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -33,16 +33,9 @@ class WorkerPoolMock : public WorkerPool { WorkerPool::StartWorkerProcess(language, dynamic_options); } - pid_t StartProcess(const std::vector &worker_command_args) override { + pid_t StartProcess(const std::vector &worker_command_args) override { last_worker_pid_ += 1; - std::vector local_worker_commands_args; - for (auto item : worker_command_args) { - if (item == nullptr) { - break; - } - local_worker_commands_args.push_back(std::string(item)); - } - worker_commands_by_pid[last_worker_pid_] = std::move(local_worker_commands_args); + worker_commands_by_pid[last_worker_pid_] = worker_command_args; return last_worker_pid_; } diff --git a/src/ray/util/util.h b/src/ray/util/util.h index c8c9930ff..9a5ae95b9 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -3,6 +3,8 @@ #include #include +#include +#include #include #include "ray/common/status.h" @@ -51,6 +53,18 @@ inline ray::Status boost_to_ray_status(const boost::system::error_code &error) { } } +/// A helper function to split a string by whitespaces. +/// +/// \param str The string with whitespaces. +/// +/// \return A vector that contains strings split by whitespaces. +inline std::vector SplitStrByWhitespaces(const std::string &str) { + std::istringstream iss(str); + std::vector result(std::istream_iterator{iss}, + std::istream_iterator()); + return result; +} + class InitShutdownRAII { public: /// Type of the Shutdown function.