[Core] Multi-tenancy: Pass job ID from Raylet to worker via env variable (#11829)

* Pass job ID from Raylet to worker via env variable

* fix

* fix

* fix

* lint

* fix

* fix test_object_spilling

* address comments

* lint

* fix
This commit is contained in:
Kai Yang
2020-11-10 03:02:15 +08:00
committed by GitHub
parent 77e3163630
commit 904f48ebd9
15 changed files with 125 additions and 71 deletions
+2
View File
@@ -42,3 +42,5 @@ constexpr char kWorkerRayletConfigPlaceholder[] = "RAY_WORKER_RAYLET_CONFIG_PLAC
/// Public DNS address which is is used to connect and get local IP.
constexpr char kPublicDNSServerIp[] = "8.8.8.8";
constexpr int kPublicDNSServerPort = 53;
constexpr char kEnvVarKeyJobId[] = "RAY_JOB_ID";
+40
View File
@@ -61,6 +61,7 @@ class BaseID {
// Warning: this can duplicate IDs after a fork() call. We assume this never happens.
static T FromRandom();
static T FromBinary(const std::string &binary);
static T FromHex(const std::string &hex_str);
static const T &Nil();
static size_t Size() { return T::Size(); }
@@ -394,6 +395,45 @@ T BaseID<T>::FromBinary(const std::string &binary) {
return t;
}
inline unsigned char hex_to_uchar(const char c, bool &err) {
unsigned char num = 0;
if (c >= '0' && c <= '9') {
num = c - '0';
} else if (c >= 'a' && c <= 'f') {
num = c - 'a' + 0xa;
} else if (c >= 'A' && c <= 'F') {
num = c - 'A' + 0xA;
} else {
err = true;
}
return num;
}
template <typename T>
T BaseID<T>::FromHex(const std::string &hex_str) {
T id;
if (2 * T::Size() != hex_str.size()) {
RAY_LOG(ERROR) << "incorrect hex string length: 2 * " << T::Size()
<< " != " << hex_str.size() << ", hex string: " << hex_str;
return T::Nil();
}
uint8_t *data = id.MutableData();
for (size_t i = 0; i < T::Size(); i++) {
char first = hex_str[2 * i];
char second = hex_str[2 * i + 1];
bool err = false;
data[i] = (hex_to_uchar(first, err) << 4) + hex_to_uchar(second, err);
if (err) {
RAY_LOG(ERROR) << "incorrect hex character, hex string: " << hex_str;
return T::Nil();
}
}
return id;
}
template <typename T>
const T &BaseID<T>::Nil() {
static const T nil_id;
+19 -8
View File
@@ -111,7 +111,9 @@ WorkerContext::WorkerContext(WorkerType worker_type, const WorkerID &worker_id,
const JobID &job_id)
: worker_type_(worker_type),
worker_id_(worker_id),
current_job_id_(worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil()),
current_job_id_(RayConfig::instance().enable_multi_tenancy()
? job_id
: (worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil())),
current_actor_id_(ActorID::Nil()),
current_actor_placement_group_id_(PlacementGroupID::Nil()),
placement_group_capture_child_tasks_(true),
@@ -161,7 +163,10 @@ const std::unordered_map<std::string, std::string>
return override_environment_variables_;
}
void WorkerContext::SetCurrentJobId(const JobID &job_id) { current_job_id_ = job_id; }
void WorkerContext::SetCurrentJobId(const JobID &job_id) {
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
current_job_id_ = job_id;
}
void WorkerContext::SetCurrentTaskId(const TaskID &task_id) {
GetThreadContext().SetCurrentTaskId(task_id);
@@ -170,13 +175,17 @@ void WorkerContext::SetCurrentTaskId(const TaskID &task_id) {
void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
GetThreadContext().SetCurrentTask(task_spec);
if (task_spec.IsNormalTask()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
}
current_task_is_direct_call_ = true;
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorCreationTask()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_.IsNil());
SetCurrentJobId(task_spec.JobId());
}
RAY_CHECK(current_actor_id_.IsNil());
current_actor_id_ = task_spec.ActorCreationId();
current_actor_is_direct_call_ = true;
@@ -187,7 +196,9 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
if (!RayConfig::instance().enable_multi_tenancy()) {
RAY_CHECK(current_job_id_ == task_spec.JobId());
}
RAY_CHECK(current_actor_id_ == task_spec.ActorId());
} else {
RAY_CHECK(false);
@@ -196,7 +207,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
void WorkerContext::ResetCurrentTask(const TaskSpecification &task_spec) {
GetThreadContext().ResetCurrentTask();
if (task_spec.IsNormalTask()) {
if (!RayConfig::instance().enable_multi_tenancy() && task_spec.IsNormalTask()) {
SetCurrentJobId(JobID::Nil());
}
}
+1
View File
@@ -42,6 +42,7 @@ class WorkerContext {
const std::unordered_map<std::string, std::string>
&GetCurrentOverrideEnvironmentVariables() const;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentJobId(const JobID &job_id);
+25 -1
View File
@@ -59,6 +59,30 @@ void BuildCommonTaskSpec(
}
}
ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) {
if (options.worker_type == ray::WorkerType::DRIVER) {
RAY_CHECK(!options.job_id.IsNil());
} else {
RAY_CHECK(options.job_id.IsNil());
}
if (options.worker_type == ray::WorkerType::WORKER) {
// For workers, the job ID is assigned by Raylet via an environment variable.
const char *job_id_env = std::getenv(kEnvVarKeyJobId);
// TODO(kfstorm): Use `RAY_CHECK` instead once the `enable_multi_tenancy` flag is
// removed.
// RAY_CHECK(job_id_env);
if (!job_id_env) {
// Multi-tenancy is disabled.
// NOTE(kfstorm): We can't read `RayConfig::instance().enable_multi_tenancy()` here
// because `RayConfig` is not initialized yet.
return ray::JobID::Nil();
}
return ray::JobID::FromHex(job_id_env);
}
return options.job_id;
}
} // namespace
namespace ray {
@@ -273,7 +297,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
get_call_site_(RayConfig::instance().record_ref_creation_sites()
? options_.get_lang_stack
: nullptr),
worker_context_(options_.worker_type, worker_id, options_.job_id),
worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)),
io_work_(io_service_),
client_call_manager_(new rpc::ClientCallManager(io_service_)),
death_check_timer_(io_service_),
-1
View File
@@ -39,7 +39,6 @@ class MockWorker {
options.language = Language::PYTHON;
options.store_socket = store_socket;
options.raylet_socket = raylet_socket;
options.job_id = JobID::FromInt(1);
options.gcs_options = gcs_options;
options.enable_logging = true;
options.install_failure_signal_handler = true;
+15 -6
View File
@@ -1166,13 +1166,22 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
Language language = static_cast<Language>(message->language());
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
WorkerID worker_id = from_flatbuf<WorkerID>(*message->worker_id());
pid_t pid = message->worker_pid();
std::string worker_ip_address = string_from_flatbuf(*message->ip_address());
// TODO(suquark): Use `WorkerType` in `common.proto` without type converting.
rpc::WorkerType worker_type = static_cast<rpc::WorkerType>(message->worker_type());
auto worker = std::dynamic_pointer_cast<WorkerInterface>(std::make_shared<Worker>(
worker_id, language, worker_type, worker_ip_address, client, client_call_manager_));
if ((RayConfig::instance().enable_multi_tenancy() &&
worker_type != rpc::WorkerType::IO_WORKER) ||
worker_type == rpc::WorkerType::DRIVER) {
RAY_CHECK(!job_id.IsNil());
} else {
RAY_CHECK(job_id.IsNil());
}
auto worker = std::dynamic_pointer_cast<WorkerInterface>(
std::make_shared<Worker>(job_id, worker_id, language, worker_type,
worker_ip_address, client, client_call_manager_));
auto send_reply_callback = [this, client](Status status, int assigned_port) {
flatbuffers::FlatBufferBuilder fbb;
@@ -1211,14 +1220,12 @@ void NodeManager::ProcessRegisterClientRequestMessage(
// Register the new driver.
RAY_CHECK(pid >= 0);
worker->SetProcess(Process::FromPid(pid));
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
// Compute a dummy driver task id from a given driver.
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id);
worker->AssignTaskId(driver_task_id);
rpc::JobConfig job_config;
job_config.ParseFromString(message->serialized_job_config()->str());
Status status =
worker_pool_.RegisterDriver(worker, job_id, job_config, send_reply_callback);
Status status = worker_pool_.RegisterDriver(worker, job_config, send_reply_callback);
if (status.ok()) {
local_queues_.AddDriverTaskId(driver_task_id);
auto job_data_ptr =
@@ -2847,7 +2854,9 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<WorkerInterface> &worke
// We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId());
worker->SetOwnerAddress(spec.CallerAddress());
worker->AssignJobId(spec.JobId());
if (!RayConfig::instance().enable_multi_tenancy()) {
worker->AssignJobId(spec.JobId());
}
// TODO(swang): For actors with multiple actor handles, to
// guarantee that tasks are replayed in the same order after a
// failure, we must update the task's execution dependency to be
+1
View File
@@ -32,6 +32,7 @@ class MockWorker : public WorkerInterface {
void AssignTaskId(const TaskID &task_id) {}
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
void AssignJobId(const JobID &job_id) {}
void SetAssignedTask(Task &assigned_task) {}
+4 -13
View File
@@ -26,7 +26,7 @@ namespace ray {
namespace raylet {
/// A constructor responsible for initializing the state of a worker.
Worker::Worker(const WorkerID &worker_id, const Language &language,
Worker::Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language,
rpc::WorkerType worker_type, const std::string &ip_address,
std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager)
@@ -37,6 +37,7 @@ Worker::Worker(const WorkerID &worker_id, const Language &language,
assigned_port_(-1),
port_(-1),
connection_(connection),
assigned_job_id_(job_id),
placement_group_id_(PlacementGroupID::Nil()),
dead_(false),
blocked_(false),
@@ -111,18 +112,8 @@ const std::unordered_set<TaskID> &Worker::GetBlockedTaskIds() const {
}
void Worker::AssignJobId(const JobID &job_id) {
if (!RayConfig::instance().enable_multi_tenancy()) {
assigned_job_id_ = job_id;
} else {
if (!assigned_job_id_.IsNil()) {
RAY_CHECK(assigned_job_id_ == job_id)
<< "The worker " << worker_id_ << " is already assigned to job "
<< assigned_job_id_ << ". It cannot be reassigned to job " << job_id;
} else {
assigned_job_id_ = job_id;
RAY_LOG(INFO) << "Assigned worker " << worker_id_ << " to job " << job_id;
}
}
RAY_CHECK(!RayConfig::instance().enable_multi_tenancy());
assigned_job_id_ = job_id;
}
const JobID &Worker::GetAssignedJobId() const { return assigned_job_id_; }
+5 -2
View File
@@ -60,6 +60,7 @@ class WorkerInterface {
virtual bool AddBlockedTaskId(const TaskID &task_id) = 0;
virtual bool RemoveBlockedTaskId(const TaskID &task_id) = 0;
virtual const std::unordered_set<TaskID> &GetBlockedTaskIds() const = 0;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
virtual void AssignJobId(const JobID &job_id) = 0;
virtual const JobID &GetAssignedJobId() const = 0;
virtual void AssignActorId(const ActorID &actor_id) = 0;
@@ -121,8 +122,9 @@ class Worker : public WorkerInterface {
public:
/// A constructor that initializes a worker object.
/// NOTE: You MUST manually set the worker process.
Worker(const WorkerID &worker_id, const Language &language, rpc::WorkerType worker_type,
const std::string &ip_address, std::shared_ptr<ClientConnection> connection,
Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language,
rpc::WorkerType worker_type, const std::string &ip_address,
std::shared_ptr<ClientConnection> connection,
rpc::ClientCallManager &client_call_manager);
/// A destructor responsible for freeing all worker state.
~Worker() {}
@@ -149,6 +151,7 @@ class Worker : public WorkerInterface {
bool AddBlockedTaskId(const TaskID &task_id);
bool RemoveBlockedTaskId(const TaskID &task_id);
const std::unordered_set<TaskID> &GetBlockedTaskIds() const;
// TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
void AssignJobId(const JobID &job_id);
const JobID &GetAssignedJobId() const;
void AssignActorId(const ActorID &actor_id);
+8 -29
View File
@@ -329,6 +329,12 @@ Process WorkerPool::StartWorkerProcess(
}
ProcessEnvironment env;
if (RayConfig::instance().enable_multi_tenancy() &&
worker_type != rpc::WorkerType::IO_WORKER) {
// We pass the job ID to worker processes via an environment variable, so we don't
// need to add a new CLI parameter for both Python and Java workers.
env.emplace(kEnvVarKeyJobId, job_id.Hex());
}
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
env.insert(job_config->worker_env().begin(), job_config->worker_env().end());
}
@@ -338,12 +344,6 @@ Process WorkerPool::StartWorkerProcess(
}
Process proc = StartProcess(worker_command_args, env);
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
// If the pid is reused between processes, the old process must have exited.
// So it's safe to bind the pid with another job ID.
RAY_LOG(DEBUG) << "Worker process " << proc.GetId() << " is bound to job " << job_id;
state.worker_pids_to_assigned_jobs[proc.GetId()] = job_id;
}
RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language, worker_type);
@@ -486,27 +486,6 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
state.registered_workers.insert(worker);
if (RayConfig::instance().enable_multi_tenancy() &&
worker->GetWorkerType() != rpc::WorkerType::IO_WORKER) {
auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid);
RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end());
auto job_id = dedicated_workers_it->second;
// If the job is unknown to Raylet, we don't allow new registrations.
if (!all_jobs_.contains(job_id)) {
auto process = Process::FromPid(pid);
state.starting_worker_processes.erase(process);
Status status =
Status::Invalid("The provided job ID is unknown. Reject registration.");
send_reply_callback(status, /*port=*/0);
return status;
}
worker->AssignJobId(job_id);
// We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here
// because we allow multi-workers per worker process.
}
// Send the reply immediately for worker registrations.
send_reply_callback(Status::OK(), port);
return Status::OK();
@@ -549,7 +528,7 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker)
}
Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver,
const JobID &job_id, const rpc::JobConfig &job_config,
const rpc::JobConfig &job_config,
std::function<void(Status, int)> send_reply_callback) {
int port;
RAY_CHECK(!driver->GetAssignedTaskId().IsNil());
@@ -561,7 +540,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
driver->SetAssignedPort(port);
auto &state = GetStateForLanguage(driver->GetLanguage());
state.registered_drivers.insert(std::move(driver));
driver->AssignJobId(job_id);
const auto job_id = driver->GetAssignedJobId();
all_jobs_[job_id] = job_config;
// This is a workaround to start initial workers on this node if and only if Raylet is
+1 -4
View File
@@ -158,13 +158,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Register a new driver.
///
/// \param[in] worker The driver to be registered.
/// \param[in] job_id The job ID of the driver.
/// \param[in] job_config The config of the job.
/// \param[in] send_reply_callback The callback to invoke after registration is
/// finished/failed.
/// \return If the registration is successful.
Status RegisterDriver(const std::shared_ptr<WorkerInterface> &worker,
const JobID &job_id, const rpc::JobConfig &job_config,
const rpc::JobConfig &job_config,
std::function<void(Status, int)> send_reply_callback);
/// Get the client connection's registered worker.
@@ -334,8 +333,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unordered_map<Process, TaskID> dedicated_workers_to_tasks;
/// A map for speeding up looking up the pending worker for the given task.
std::unordered_map<TaskID, Process> tasks_to_dedicated_workers;
/// A map for looking up the owner JobId by the pid of worker.
std::unordered_map<pid_t, JobID> worker_pids_to_assigned_jobs;
/// We'll push a warning to the user every time a multiple of this many
/// worker processes has been started.
int multiple_for_warning;
+3 -5
View File
@@ -119,11 +119,10 @@ class WorkerPoolTest : public ::testing::TestWithParam<bool> {
ClientConnection::Create(client_handler, message_handler, std::move(socket),
"worker", {}, error_message_type_);
std::shared_ptr<Worker> worker_ = std::make_shared<Worker>(
WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", client,
client_call_manager_);
job_id, WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1",
client, client_call_manager_);
std::shared_ptr<WorkerInterface> worker =
std::dynamic_pointer_cast<WorkerInterface>(worker_);
worker->AssignJobId(job_id);
if (!proc.IsNull()) {
worker->SetProcess(proc);
}
@@ -135,8 +134,7 @@ class WorkerPoolTest : public ::testing::TestWithParam<bool> {
const rpc::JobConfig &job_config = rpc::JobConfig()) {
auto driver = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id);
driver->AssignTaskId(TaskID::ForDriverTask(job_id));
RAY_CHECK_OK(
worker_pool_->RegisterDriver(driver, job_id, job_config, [](Status, int) {}));
RAY_CHECK_OK(worker_pool_->RegisterDriver(driver, job_config, [](Status, int) {}));
return driver;
}
+1 -1
View File
@@ -180,7 +180,7 @@ class RayletClient : public PinObjectsInterface,
/// \param worker_id A unique ID to represent the worker.
/// \param worker_type The type of the worker. If it is a certain worker type, an
/// additional message will be sent to register as one.
/// \param job_id The ID of the driver. This is non-nil if the client is a driver.
/// \param job_id The job ID of the driver or worker.
/// \param language Language of the worker.
/// \param ip_address The IP address of the worker.
/// \param status This will be populated with the result of connection attempt.
-1
View File
@@ -493,7 +493,6 @@ class StreamingWorker {
options.language = Language::PYTHON;
options.store_socket = store_socket;
options.raylet_socket = raylet_socket;
options.job_id = JobID::FromInt(1);
options.gcs_options = gcs_options;
options.enable_logging = true;
options.install_failure_signal_handler = true;