diff --git a/java/test/src/main/java/io/ray/test/JobConfigTest.java b/java/test/src/main/java/io/ray/test/JobConfigTest.java index ce0dfb0da..5bb4ea48a 100644 --- a/java/test/src/main/java/io/ray/test/JobConfigTest.java +++ b/java/test/src/main/java/io/ray/test/JobConfigTest.java @@ -16,6 +16,9 @@ public class JobConfigTest extends BaseTest { System.setProperty("ray.raylet.config.enable_multi_tenancy", "true"); System.setProperty("ray.job.num-java-workers-per-process", "3"); System.setProperty("ray.job.jvm-options.0", "-DX=999"); + System.setProperty("ray.job.jvm-options.1", "-DY=998"); + System.setProperty("ray.job.worker-env.foo1", "bar1"); + System.setProperty("ray.job.worker-env.foo2", "bar2"); } @AfterClass @@ -23,10 +26,17 @@ public class JobConfigTest extends BaseTest { System.clearProperty("ray.raylet.config.enable_multi_tenancy"); System.clearProperty("ray.job.num-java-workers-per-process"); System.clearProperty("ray.job.jvm-options.0"); + System.clearProperty("ray.job.jvm-options.1"); + System.clearProperty("ray.job.worker-env.foo1"); + System.clearProperty("ray.job.worker-env.foo2"); } - public static String getJvmOptions() { - return System.getProperty("X"); + public static String getJvmOptions(String propertyName) { + return System.getProperty(propertyName); + } + + public static String getEnvVariable(String key) { + return System.getenv(key); } public static Integer getWorkersNum() { @@ -39,14 +49,23 @@ public class JobConfigTest extends BaseTest { return TestUtils.getRuntime().getRayConfig().numWorkersPerProcess; } - public String getJvmOptions() { - return System.getProperty("X"); + public String getJvmOptions(String propertyName) { + return System.getProperty(propertyName); + } + + public static String getEnvVariable(String key) { + return System.getenv(key); } } public void testJvmOptions() { - ObjectRef obj = Ray.task(JobConfigTest::getJvmOptions).remote(); - Assert.assertEquals("999", obj.get()); + Assert.assertEquals("999", Ray.task(JobConfigTest::getJvmOptions, "X").remote().get()); + Assert.assertEquals("998", Ray.task(JobConfigTest::getJvmOptions, "Y").remote().get()); + } + + public void testWorkerEnvVariable() { + Assert.assertEquals("bar1", Ray.task(JobConfigTest::getEnvVariable, "foo1").remote().get()); + Assert.assertEquals("bar2", Ray.task(JobConfigTest::getEnvVariable, "foo2").remote().get()); } public void testNumJavaWorkerPerProcess() { @@ -59,8 +78,12 @@ public class JobConfigTest extends BaseTest { ActorHandle actor = Ray.actor(MyActor::new).remote(); // test jvm options. - ObjectRef obj1 = actor.task(MyActor::getJvmOptions).remote(); - Assert.assertEquals("999", obj1.get()); + Assert.assertEquals("999", actor.task(MyActor::getJvmOptions, "X").remote().get()); + Assert.assertEquals("998", actor.task(MyActor::getJvmOptions, "Y").remote().get()); + + // test worker env variables + Assert.assertEquals("bar1", Ray.task(MyActor::getEnvVariable, "foo1").remote().get()); + Assert.assertEquals("bar2", Ray.task(MyActor::getEnvVariable, "foo2").remote().get()); // test workers number. ObjectRef obj2 = actor.task(MyActor::getWorkersNum).remote(); diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index ade11cc8b..2deb786dc 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -1,5 +1,6 @@ # coding: utf-8 import json +import os import sys import grpc @@ -112,5 +113,23 @@ ray.shutdown() all_worker_pids.add(worker_pid) +def test_worker_env(shutdown_only): + ray.init( + job_config=ray.job_config.JobConfig(worker_env={ + "foo1": "bar1", + "foo2": "bar2" + }), + _internal_config=json.dumps({ + "enable_multi_tenancy": True + })) + + @ray.remote + def get_env(key): + return os.environ.get(key) + + assert ray.get(get_env.remote("foo1")) == "bar1" + assert ray.get(get_env.remote("foo2")) == "bar2" + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 350bb5d2e..c8ebdc0cc 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -273,8 +273,11 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo << " placeholder is not found in worker command."; } - // TODO(kfstorm): Set up environment variables in a later PR. - Process proc = StartProcess(worker_command_args); + std::map env; + if (RayConfig::instance().enable_multi_tenancy()) { + env.insert(job_config->worker_env().begin(), job_config->worker_env().end()); + } + Process proc = StartProcess(worker_command_args, env); if (RayConfig::instance().enable_multi_tenancy()) { // If the pid is reused between processes, the old process must have exited. // So it's safe to bind the pid with another job ID. @@ -310,7 +313,8 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, }); } -Process WorkerPool::StartProcess(const std::vector &worker_command_args) { +Process WorkerPool::StartProcess(const std::vector &worker_command_args, + const std::map &env) { if (RAY_LOG_ENABLED(DEBUG)) { std::stringstream stream; stream << "Starting worker process with command:"; @@ -327,7 +331,7 @@ Process WorkerPool::StartProcess(const std::vector &worker_command_ argv.push_back(arg.c_str()); } argv.push_back(NULL); - Process child(argv.data(), io_service_, ec); + Process child(argv.data(), io_service_, ec, /*decouple=*/false, env); if (!child.IsValid() || ec) { // The worker failed to start. This is a fatal error. RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": " diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 64c497d42..59338db30 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -241,8 +241,11 @@ class WorkerPool : public WorkerPoolInterface { /// unless the caller manually detaches the process after the call. /// /// \param worker_command_args The command arguments of new worker process. + /// \param[in] env Additional environment variables to be set on this process besides + /// the environment variables of the parent process. /// \return An object representing the started worker process. - virtual Process StartProcess(const std::vector &worker_command_args); + virtual Process StartProcess(const std::vector &worker_command_args, + const std::map &env); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index c8fae0ae7..1cf777821 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -48,7 +48,8 @@ class WorkerPoolMock : public WorkerPool { using WorkerPool::StartWorkerProcess; // we need this to be public for testing - Process StartProcess(const std::vector &worker_command_args) override { + Process StartProcess(const std::vector &worker_command_args, + const std::map &env) override { // Use a bogus process ID that won't conflict with those in the system pid_t pid = static_cast(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size()); last_worker_process_ = Process::FromPid(pid); diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 829020da3..22b818db4 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -40,6 +40,21 @@ #include "ray/util/logging.h" #include "ray/util/util.h" +#ifdef __APPLE__ +extern char **environ; + +// macOS dosn't come with execvpe. +// https://stackoverflow.com/questions/7789750/execve-with-path-search +int execvpe(const char *program, char **argv, char **envp) { + char **saved = environ; + int rc; + environ = envp; + rc = execvp(program, argv); + environ = saved; + return rc; +} +#endif + namespace ray { class ProcessFD { @@ -60,11 +75,42 @@ class ProcessFD { pid_t GetId() const; // Fork + exec combo. Returns -1 for the PID on failure. - static ProcessFD spawnvp(const char *argv[], std::error_code &ec, bool decouple) { + static ProcessFD spawnvpe(const char *argv[], std::error_code &ec, bool decouple, + const std::map &env) { ec = std::error_code(); intptr_t fd; pid_t pid; #ifdef _WIN32 + LPTCH env_strings = GetEnvironmentStrings(); + RAY_CHECK(env_strings) << GetLastError(); + std::vector new_env_vector; + // Copy parent process environment variables + LPTSTR env_pointer = env_strings; + while (*env_pointer) { + LPTSTR env_pointer2 = env_pointer; + while (*env_pointer2) { + new_env_vector.push_back(*env_pointer2); + env_pointer2++; + } + new_env_vector.push_back('\0'); + env_pointer2++; + env_pointer = env_pointer2; + } + RAY_CHECK(FreeEnvironmentStrings(env_strings)) << GetLastError(); + // Add additional environment variables + for (const auto &item : env) { + for (const char &ch : item.first) { + new_env_vector.push_back(ch); + } + new_env_vector.push_back('='); + for (const char &ch : item.second) { + new_env_vector.push_back(ch); + } + new_env_vector.push_back('\0'); + } + new_env_vector.push_back('\0'); + auto new_env_strings = new_env_vector.data(); + (void)decouple; // Windows doesn't require anything particular for decoupling. std::vector args; for (size_t i = 0; argv[i]; ++i) { @@ -86,7 +132,8 @@ class ProcessFD { (void)cmd.c_str(); // We'll need this to be null-terminated (but mutable) below TCHAR *cmdline = &*cmd.begin(); STARTUPINFO si = {sizeof(si)}; - if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi)) { + if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, new_env_strings, NULL, + &si, &pi)) { succeeded = true; break; } @@ -102,6 +149,27 @@ class ProcessFD { pid = -1; } #else + size_t environ_size = 0; + char **env_pointer = environ; + while (*env_pointer) { + environ_size++; + env_pointer++; + } + const char *envp[environ_size + env.size() + 1]; + // Copy parent process environment variables + for (size_t i = 0; i < environ_size; i++) { + envp[i] = *(environ + i); + } + // Add additional environment variables + std::vector env_strings; + for (const auto &item : env) { + env_strings.emplace_back(item.first + "=" + item.second); + } + for (size_t i = 0; i < env_strings.size(); i++) { + envp[environ_size + i] = env_strings[i].c_str(); + } + envp[environ_size + env.size()] = NULL; + // TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating // file descriptors into the child process, as that can be problematic. int pipefds[2]; // Create pipe to get PID & track lifetime @@ -127,7 +195,7 @@ class ProcessFD { // This is the spawned process. Any intermediate parent is now dead. pid_t my_pid = getpid(); if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) { - execvp(argv[0], const_cast(argv)); + execvpe(argv[0], const_cast(argv), const_cast(envp)); } _exit(errno); // fork() succeeded and exec() failed, so abort the child } @@ -274,23 +342,24 @@ Process &Process::operator=(Process other) { Process::Process(pid_t pid) { p_ = std::make_shared(pid); } -Process::Process(const char *argv[], void *io_service, std::error_code &ec, - bool decouple) { +Process::Process(const char *argv[], void *io_service, std::error_code &ec, bool decouple, + const std::map &env) { (void)io_service; - ProcessFD procfd = ProcessFD::spawnvp(argv, ec, decouple); + ProcessFD procfd = ProcessFD::spawnvpe(argv, ec, decouple, env); if (!ec) { p_ = std::make_shared(std::move(procfd)); } } -std::error_code Process::Call(const std::vector &args) { +std::error_code Process::Call(const std::vector &args, + const std::map &env) { std::vector argv; for (size_t i = 0; i != args.size(); ++i) { argv.push_back(args[i].c_str()); } argv.push_back(NULL); std::error_code ec; - Process proc(&*argv.begin(), NULL, ec, true); + Process proc(&*argv.begin(), NULL, ec, true, env); if (!ec) { int return_code = proc.Wait(); if (return_code != 0) { @@ -320,16 +389,16 @@ bool Process::IsNull() const { return !p_; } bool Process::IsValid() const { return GetId() != -1; } -std::pair Process::Spawn(const std::vector &args, - bool decouple, - const std::string &pid_file) { +std::pair Process::Spawn( + const std::vector &args, bool decouple, const std::string &pid_file, + const std::map &env) { std::vector argv; for (size_t i = 0; i != args.size(); ++i) { argv.push_back(args[i].c_str()); } argv.push_back(NULL); std::error_code error; - Process proc(&*argv.begin(), NULL, error, decouple); + Process proc(&*argv.begin(), NULL, error, decouple, env); if (!error && !pid_file.empty()) { std::ofstream file(pid_file, std::ios_base::out | std::ios_base::trunc); file << proc.GetId() << std::endl; diff --git a/src/ray/util/process.h b/src/ray/util/process.h index 5603baa65..d545492a5 100644 --- a/src/ray/util/process.h +++ b/src/ray/util/process.h @@ -21,6 +21,7 @@ #endif #include +#include #include #include #include @@ -59,10 +60,14 @@ class Process { /// \param[in] io_service Boost.Asio I/O service (optional). /// \param[in] ec Returns any error that occurred when spawning the process. /// \param[in] decouple True iff the parent will not wait for the child to exit. + /// \param[in] env Additional environment variables to be set on this process besides + /// the environment variables of the parent process. explicit Process(const char *argv[], void *io_service, std::error_code &ec, - bool decouple = false); + bool decouple = false, + const std::map &env = {}); /// Convenience function to run the given command line and wait for it to finish. - static std::error_code Call(const std::vector &args); + static std::error_code Call(const std::vector &args, + const std::map &env = {}); static Process CreateNewDummy(); static Process FromPid(pid_t pid); pid_t GetId() const; @@ -77,7 +82,8 @@ class Process { /// \param pid_file A file to write the PID of the spawned process in. static std::pair Spawn( const std::vector &args, bool decouple, - const std::string &pid_file = std::string()); + const std::string &pid_file = std::string(), + const std::map &env = {}); /// Waits for process to terminate. Not supported for unowned processes. /// \return The process's exit code. Returns 0 for a dummy process, -1 for a null one. int Wait() const;