[Core] Multi-tenancy: Pass env variables from job config to worker processes (#10022)

This commit is contained in:
Kai Yang
2020-08-11 05:31:37 +08:00
committed by GitHub
parent df676bc9aa
commit 3bc17fa62a
7 changed files with 154 additions and 29 deletions
@@ -16,6 +16,9 @@ public class JobConfigTest extends BaseTest {
System.setProperty("ray.raylet.config.enable_multi_tenancy", "true");
System.setProperty("ray.job.num-java-workers-per-process", "3");
System.setProperty("ray.job.jvm-options.0", "-DX=999");
System.setProperty("ray.job.jvm-options.1", "-DY=998");
System.setProperty("ray.job.worker-env.foo1", "bar1");
System.setProperty("ray.job.worker-env.foo2", "bar2");
}
@AfterClass
@@ -23,10 +26,17 @@ public class JobConfigTest extends BaseTest {
System.clearProperty("ray.raylet.config.enable_multi_tenancy");
System.clearProperty("ray.job.num-java-workers-per-process");
System.clearProperty("ray.job.jvm-options.0");
System.clearProperty("ray.job.jvm-options.1");
System.clearProperty("ray.job.worker-env.foo1");
System.clearProperty("ray.job.worker-env.foo2");
}
public static String getJvmOptions() {
return System.getProperty("X");
public static String getJvmOptions(String propertyName) {
return System.getProperty(propertyName);
}
public static String getEnvVariable(String key) {
return System.getenv(key);
}
public static Integer getWorkersNum() {
@@ -39,14 +49,23 @@ public class JobConfigTest extends BaseTest {
return TestUtils.getRuntime().getRayConfig().numWorkersPerProcess;
}
public String getJvmOptions() {
return System.getProperty("X");
public String getJvmOptions(String propertyName) {
return System.getProperty(propertyName);
}
public static String getEnvVariable(String key) {
return System.getenv(key);
}
}
public void testJvmOptions() {
ObjectRef<String> obj = Ray.task(JobConfigTest::getJvmOptions).remote();
Assert.assertEquals("999", obj.get());
Assert.assertEquals("999", Ray.task(JobConfigTest::getJvmOptions, "X").remote().get());
Assert.assertEquals("998", Ray.task(JobConfigTest::getJvmOptions, "Y").remote().get());
}
public void testWorkerEnvVariable() {
Assert.assertEquals("bar1", Ray.task(JobConfigTest::getEnvVariable, "foo1").remote().get());
Assert.assertEquals("bar2", Ray.task(JobConfigTest::getEnvVariable, "foo2").remote().get());
}
public void testNumJavaWorkerPerProcess() {
@@ -59,8 +78,12 @@ public class JobConfigTest extends BaseTest {
ActorHandle<MyActor> actor = Ray.actor(MyActor::new).remote();
// test jvm options.
ObjectRef<String> obj1 = actor.task(MyActor::getJvmOptions).remote();
Assert.assertEquals("999", obj1.get());
Assert.assertEquals("999", actor.task(MyActor::getJvmOptions, "X").remote().get());
Assert.assertEquals("998", actor.task(MyActor::getJvmOptions, "Y").remote().get());
// test worker env variables
Assert.assertEquals("bar1", Ray.task(MyActor::getEnvVariable, "foo1").remote().get());
Assert.assertEquals("bar2", Ray.task(MyActor::getEnvVariable, "foo2").remote().get());
// test workers number.
ObjectRef<Integer> obj2 = actor.task(MyActor::getWorkersNum).remote();
+19
View File
@@ -1,5 +1,6 @@
# coding: utf-8
import json
import os
import sys
import grpc
@@ -112,5 +113,23 @@ ray.shutdown()
all_worker_pids.add(worker_pid)
def test_worker_env(shutdown_only):
ray.init(
job_config=ray.job_config.JobConfig(worker_env={
"foo1": "bar1",
"foo2": "bar2"
}),
_internal_config=json.dumps({
"enable_multi_tenancy": True
}))
@ray.remote
def get_env(key):
return os.environ.get(key)
assert ray.get(get_env.remote("foo1")) == "bar1"
assert ray.get(get_env.remote("foo2")) == "bar2"
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
+8 -4
View File
@@ -273,8 +273,11 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &jo
<< " placeholder is not found in worker command.";
}
// TODO(kfstorm): Set up environment variables in a later PR.
Process proc = StartProcess(worker_command_args);
std::map<std::string, std::string> env;
if (RayConfig::instance().enable_multi_tenancy()) {
env.insert(job_config->worker_env().begin(), job_config->worker_env().end());
}
Process proc = StartProcess(worker_command_args, env);
if (RayConfig::instance().enable_multi_tenancy()) {
// If the pid is reused between processes, the old process must have exited.
// So it's safe to bind the pid with another job ID.
@@ -310,7 +313,8 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
});
}
Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args) {
Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args,
const std::map<std::string, std::string> &env) {
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
stream << "Starting worker process with command:";
@@ -327,7 +331,7 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
argv.push_back(arg.c_str());
}
argv.push_back(NULL);
Process child(argv.data(), io_service_, ec);
Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
if (!child.IsValid() || ec) {
// The worker failed to start. This is a fatal error.
RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
+4 -1
View File
@@ -241,8 +241,11 @@ class WorkerPool : public WorkerPoolInterface {
/// unless the caller manually detaches the process after the call.
///
/// \param worker_command_args The command arguments of new worker process.
/// \param[in] env Additional environment variables to be set on this process besides
/// the environment variables of the parent process.
/// \return An object representing the started worker process.
virtual Process StartProcess(const std::vector<std::string> &worker_command_args);
virtual Process StartProcess(const std::vector<std::string> &worker_command_args,
const std::map<std::string, std::string> &env);
/// Push an warning message to user if worker pool is getting to big.
virtual void WarnAboutSize();
+2 -1
View File
@@ -48,7 +48,8 @@ class WorkerPoolMock : public WorkerPool {
using WorkerPool::StartWorkerProcess; // we need this to be public for testing
Process StartProcess(const std::vector<std::string> &worker_command_args) override {
Process StartProcess(const std::vector<std::string> &worker_command_args,
const std::map<std::string, std::string> &env) override {
// Use a bogus process ID that won't conflict with those in the system
pid_t pid = static_cast<pid_t>(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size());
last_worker_process_ = Process::FromPid(pid);
+81 -12
View File
@@ -40,6 +40,21 @@
#include "ray/util/logging.h"
#include "ray/util/util.h"
#ifdef __APPLE__
extern char **environ;
// macOS dosn't come with execvpe.
// https://stackoverflow.com/questions/7789750/execve-with-path-search
int execvpe(const char *program, char **argv, char **envp) {
char **saved = environ;
int rc;
environ = envp;
rc = execvp(program, argv);
environ = saved;
return rc;
}
#endif
namespace ray {
class ProcessFD {
@@ -60,11 +75,42 @@ class ProcessFD {
pid_t GetId() const;
// Fork + exec combo. Returns -1 for the PID on failure.
static ProcessFD spawnvp(const char *argv[], std::error_code &ec, bool decouple) {
static ProcessFD spawnvpe(const char *argv[], std::error_code &ec, bool decouple,
const std::map<std::string, std::string> &env) {
ec = std::error_code();
intptr_t fd;
pid_t pid;
#ifdef _WIN32
LPTCH env_strings = GetEnvironmentStrings();
RAY_CHECK(env_strings) << GetLastError();
std::vector<char> new_env_vector;
// Copy parent process environment variables
LPTSTR env_pointer = env_strings;
while (*env_pointer) {
LPTSTR env_pointer2 = env_pointer;
while (*env_pointer2) {
new_env_vector.push_back(*env_pointer2);
env_pointer2++;
}
new_env_vector.push_back('\0');
env_pointer2++;
env_pointer = env_pointer2;
}
RAY_CHECK(FreeEnvironmentStrings(env_strings)) << GetLastError();
// Add additional environment variables
for (const auto &item : env) {
for (const char &ch : item.first) {
new_env_vector.push_back(ch);
}
new_env_vector.push_back('=');
for (const char &ch : item.second) {
new_env_vector.push_back(ch);
}
new_env_vector.push_back('\0');
}
new_env_vector.push_back('\0');
auto new_env_strings = new_env_vector.data();
(void)decouple; // Windows doesn't require anything particular for decoupling.
std::vector<std::string> args;
for (size_t i = 0; argv[i]; ++i) {
@@ -86,7 +132,8 @@ class ProcessFD {
(void)cmd.c_str(); // We'll need this to be null-terminated (but mutable) below
TCHAR *cmdline = &*cmd.begin();
STARTUPINFO si = {sizeof(si)};
if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi)) {
if (CreateProcessA(NULL, cmdline, NULL, NULL, FALSE, 0, new_env_strings, NULL,
&si, &pi)) {
succeeded = true;
break;
}
@@ -102,6 +149,27 @@ class ProcessFD {
pid = -1;
}
#else
size_t environ_size = 0;
char **env_pointer = environ;
while (*env_pointer) {
environ_size++;
env_pointer++;
}
const char *envp[environ_size + env.size() + 1];
// Copy parent process environment variables
for (size_t i = 0; i < environ_size; i++) {
envp[i] = *(environ + i);
}
// Add additional environment variables
std::vector<std::string> env_strings;
for (const auto &item : env) {
env_strings.emplace_back(item.first + "=" + item.second);
}
for (size_t i = 0; i < env_strings.size(); i++) {
envp[environ_size + i] = env_strings[i].c_str();
}
envp[environ_size + env.size()] = NULL;
// TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating
// file descriptors into the child process, as that can be problematic.
int pipefds[2]; // Create pipe to get PID & track lifetime
@@ -127,7 +195,7 @@ class ProcessFD {
// This is the spawned process. Any intermediate parent is now dead.
pid_t my_pid = getpid();
if (write(pipefds[1], &my_pid, sizeof(my_pid)) == sizeof(my_pid)) {
execvp(argv[0], const_cast<char *const *>(argv));
execvpe(argv[0], const_cast<char **>(argv), const_cast<char **>(envp));
}
_exit(errno); // fork() succeeded and exec() failed, so abort the child
}
@@ -274,23 +342,24 @@ Process &Process::operator=(Process other) {
Process::Process(pid_t pid) { p_ = std::make_shared<ProcessFD>(pid); }
Process::Process(const char *argv[], void *io_service, std::error_code &ec,
bool decouple) {
Process::Process(const char *argv[], void *io_service, std::error_code &ec, bool decouple,
const std::map<std::string, std::string> &env) {
(void)io_service;
ProcessFD procfd = ProcessFD::spawnvp(argv, ec, decouple);
ProcessFD procfd = ProcessFD::spawnvpe(argv, ec, decouple, env);
if (!ec) {
p_ = std::make_shared<ProcessFD>(std::move(procfd));
}
}
std::error_code Process::Call(const std::vector<std::string> &args) {
std::error_code Process::Call(const std::vector<std::string> &args,
const std::map<std::string, std::string> &env) {
std::vector<const char *> argv;
for (size_t i = 0; i != args.size(); ++i) {
argv.push_back(args[i].c_str());
}
argv.push_back(NULL);
std::error_code ec;
Process proc(&*argv.begin(), NULL, ec, true);
Process proc(&*argv.begin(), NULL, ec, true, env);
if (!ec) {
int return_code = proc.Wait();
if (return_code != 0) {
@@ -320,16 +389,16 @@ bool Process::IsNull() const { return !p_; }
bool Process::IsValid() const { return GetId() != -1; }
std::pair<Process, std::error_code> Process::Spawn(const std::vector<std::string> &args,
bool decouple,
const std::string &pid_file) {
std::pair<Process, std::error_code> Process::Spawn(
const std::vector<std::string> &args, bool decouple, const std::string &pid_file,
const std::map<std::string, std::string> &env) {
std::vector<const char *> argv;
for (size_t i = 0; i != args.size(); ++i) {
argv.push_back(args[i].c_str());
}
argv.push_back(NULL);
std::error_code error;
Process proc(&*argv.begin(), NULL, error, decouple);
Process proc(&*argv.begin(), NULL, error, decouple, env);
if (!error && !pid_file.empty()) {
std::ofstream file(pid_file, std::ios_base::out | std::ios_base::trunc);
file << proc.GetId() << std::endl;
+9 -3
View File
@@ -21,6 +21,7 @@
#endif
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <system_error>
@@ -59,10 +60,14 @@ class Process {
/// \param[in] io_service Boost.Asio I/O service (optional).
/// \param[in] ec Returns any error that occurred when spawning the process.
/// \param[in] decouple True iff the parent will not wait for the child to exit.
/// \param[in] env Additional environment variables to be set on this process besides
/// the environment variables of the parent process.
explicit Process(const char *argv[], void *io_service, std::error_code &ec,
bool decouple = false);
bool decouple = false,
const std::map<std::string, std::string> &env = {});
/// Convenience function to run the given command line and wait for it to finish.
static std::error_code Call(const std::vector<std::string> &args);
static std::error_code Call(const std::vector<std::string> &args,
const std::map<std::string, std::string> &env = {});
static Process CreateNewDummy();
static Process FromPid(pid_t pid);
pid_t GetId() const;
@@ -77,7 +82,8 @@ class Process {
/// \param pid_file A file to write the PID of the spawned process in.
static std::pair<Process, std::error_code> Spawn(
const std::vector<std::string> &args, bool decouple,
const std::string &pid_file = std::string());
const std::string &pid_file = std::string(),
const std::map<std::string, std::string> &env = {});
/// Waits for process to terminate. Not supported for unowned processes.
/// \return The process's exit code. Returns 0 for a dummy process, -1 for a null one.
int Wait() const;