mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
Fix the issue when passing multiple options in one string (#5241)
* Fix * Fix linting * Fix linting * Address * Fix test
This commit is contained in:
@@ -22,7 +22,9 @@ public class WorkerJvmOptionsTest extends BaseTest {
|
||||
public void testJvmOptions() {
|
||||
TestUtils.skipTestUnderSingleProcess();
|
||||
ActorCreationOptions options = new ActorCreationOptions.Builder()
|
||||
.setJvmOptions("-Dtest.suffix=suffix")
|
||||
// The whitespaces in following argument are intentionally added to test
|
||||
// that raylet can correctly handle dynamic options with whitespaces.
|
||||
.setJvmOptions(" -Dtest.suffix=suffix -Dtest.suffix1=suffix1 ")
|
||||
.createActorCreationOptions();
|
||||
RayActor<Echo> actor = Ray.createActor(Echo::new, options);
|
||||
RayObject<String> obj = Ray.call(Echo::getOptions, actor);
|
||||
|
||||
+2
-10
@@ -32,14 +32,6 @@ DEFINE_bool(enable_stdout_exporter, false,
|
||||
|
||||
#ifndef RAYLET_TEST
|
||||
|
||||
/// A helper function that parse the worker command string into a vector of arguments.
|
||||
static std::vector<std::string> parse_worker_command(std::string worker_command) {
|
||||
std::istringstream iss(worker_command);
|
||||
std::vector<std::string> result(std::istream_iterator<std::string>{iss},
|
||||
std::istream_iterator<std::string>());
|
||||
return result;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
|
||||
ray::RayLog::ShutDownRayLog, argv[0],
|
||||
@@ -118,11 +110,11 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
if (!python_worker_command.empty()) {
|
||||
node_manager_config.worker_commands.emplace(
|
||||
make_pair(ray::Language::PYTHON, parse_worker_command(python_worker_command)));
|
||||
make_pair(ray::Language::PYTHON, SplitStrByWhitespaces(python_worker_command)));
|
||||
}
|
||||
if (!java_worker_command.empty()) {
|
||||
node_manager_config.worker_commands.emplace(
|
||||
make_pair(ray::Language::JAVA, parse_worker_command(java_worker_command)));
|
||||
make_pair(ray::Language::JAVA, SplitStrByWhitespaces(java_worker_command)));
|
||||
}
|
||||
if (python_worker_command.empty() && java_worker_command.empty()) {
|
||||
RAY_CHECK(0)
|
||||
|
||||
@@ -120,7 +120,7 @@ int WorkerPool::StartWorkerProcess(const Language &language,
|
||||
<< " non-actor workers";
|
||||
|
||||
// Extract pointers from the worker command to pass into execvp.
|
||||
std::vector<const char *> worker_command_args;
|
||||
std::vector<std::string> worker_command_args;
|
||||
size_t dynamic_option_index = 0;
|
||||
for (auto const &token : state.worker_command) {
|
||||
const auto option_placeholder =
|
||||
@@ -129,14 +129,15 @@ int WorkerPool::StartWorkerProcess(const Language &language,
|
||||
if (token == option_placeholder) {
|
||||
if (!dynamic_options.empty()) {
|
||||
RAY_CHECK(dynamic_option_index < dynamic_options.size());
|
||||
worker_command_args.push_back(dynamic_options[dynamic_option_index].c_str());
|
||||
auto options = SplitStrByWhitespaces(dynamic_options[dynamic_option_index]);
|
||||
worker_command_args.insert(worker_command_args.end(), options.begin(),
|
||||
options.end());
|
||||
++dynamic_option_index;
|
||||
}
|
||||
} else {
|
||||
worker_command_args.push_back(token.c_str());
|
||||
worker_command_args.push_back(token);
|
||||
}
|
||||
}
|
||||
worker_command_args.push_back(nullptr);
|
||||
|
||||
pid_t pid = StartProcess(worker_command_args);
|
||||
if (pid < 0) {
|
||||
@@ -152,7 +153,16 @@ int WorkerPool::StartWorkerProcess(const Language &language,
|
||||
return -1;
|
||||
}
|
||||
|
||||
pid_t WorkerPool::StartProcess(const std::vector<const char *> &worker_command_args) {
|
||||
pid_t WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args) {
|
||||
if (RAY_LOG_ENABLED(DEBUG)) {
|
||||
std::stringstream stream;
|
||||
stream << "Starting worker process with command:";
|
||||
for (const auto &arg : worker_command_args) {
|
||||
stream << " " << arg;
|
||||
}
|
||||
RAY_LOG(DEBUG) << stream.str();
|
||||
}
|
||||
|
||||
// Launch the process to create the worker.
|
||||
pid_t pid = fork();
|
||||
|
||||
@@ -165,8 +175,14 @@ pid_t WorkerPool::StartProcess(const std::vector<const char *> &worker_command_a
|
||||
signal(SIGCHLD, SIG_DFL);
|
||||
|
||||
// Try to execute the worker command.
|
||||
int rv = execvp(worker_command_args[0],
|
||||
const_cast<char *const *>(worker_command_args.data()));
|
||||
std::vector<const char *> worker_command_args_str;
|
||||
for (const auto &arg : worker_command_args) {
|
||||
worker_command_args_str.push_back(arg.c_str());
|
||||
}
|
||||
worker_command_args_str.push_back(nullptr);
|
||||
int rv = execvp(worker_command_args_str[0],
|
||||
const_cast<char *const *>(worker_command_args_str.data()));
|
||||
|
||||
// The worker failed to start. This is a fatal error.
|
||||
RAY_LOG(FATAL) << "Failed to start worker with return value " << rv << ": "
|
||||
<< strerror(errno);
|
||||
|
||||
@@ -148,7 +148,7 @@ class WorkerPool {
|
||||
///
|
||||
/// \param worker_command_args The command arguments of new worker process.
|
||||
/// \return The process ID of started worker process.
|
||||
virtual pid_t StartProcess(const std::vector<const char *> &worker_command_args);
|
||||
virtual pid_t StartProcess(const std::vector<std::string> &worker_command_args);
|
||||
|
||||
/// Push an warning message to user if worker pool is getting to big.
|
||||
virtual void WarnAboutSize();
|
||||
|
||||
@@ -33,16 +33,9 @@ class WorkerPoolMock : public WorkerPool {
|
||||
WorkerPool::StartWorkerProcess(language, dynamic_options);
|
||||
}
|
||||
|
||||
pid_t StartProcess(const std::vector<const char *> &worker_command_args) override {
|
||||
pid_t StartProcess(const std::vector<std::string> &worker_command_args) override {
|
||||
last_worker_pid_ += 1;
|
||||
std::vector<std::string> local_worker_commands_args;
|
||||
for (auto item : worker_command_args) {
|
||||
if (item == nullptr) {
|
||||
break;
|
||||
}
|
||||
local_worker_commands_args.push_back(std::string(item));
|
||||
}
|
||||
worker_commands_by_pid[last_worker_pid_] = std::move(local_worker_commands_args);
|
||||
worker_commands_by_pid[last_worker_pid_] = worker_command_args;
|
||||
return last_worker_pid_;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <chrono>
|
||||
#include <iterator>
|
||||
#include <sstream>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "ray/common/status.h"
|
||||
@@ -51,6 +53,18 @@ inline ray::Status boost_to_ray_status(const boost::system::error_code &error) {
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper function to split a string by whitespaces.
|
||||
///
|
||||
/// \param str The string with whitespaces.
|
||||
///
|
||||
/// \return A vector that contains strings split by whitespaces.
|
||||
inline std::vector<std::string> SplitStrByWhitespaces(const std::string &str) {
|
||||
std::istringstream iss(str);
|
||||
std::vector<std::string> result(std::istream_iterator<std::string>{iss},
|
||||
std::istream_iterator<std::string>());
|
||||
return result;
|
||||
}
|
||||
|
||||
class InitShutdownRAII {
|
||||
public:
|
||||
/// Type of the Shutdown function.
|
||||
|
||||
Reference in New Issue
Block a user