From 904f48ebd9dc25e91aa6841d39bf9c905c86d817 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Tue, 10 Nov 2020 03:02:15 +0800 Subject: [PATCH] [Core] Multi-tenancy: Pass job ID from Raylet to worker via env variable (#11829) * Pass job ID from Raylet to worker via env variable * fix * fix * fix * lint * fix * fix test_object_spilling * address comments * lint * fix --- src/ray/common/constants.h | 2 ++ src/ray/common/id.h | 40 +++++++++++++++++++++++++ src/ray/core_worker/context.cc | 27 ++++++++++++----- src/ray/core_worker/context.h | 1 + src/ray/core_worker/core_worker.cc | 26 +++++++++++++++- src/ray/core_worker/test/mock_worker.cc | 1 - src/ray/raylet/node_manager.cc | 21 +++++++++---- src/ray/raylet/test/util.h | 1 + src/ray/raylet/worker.cc | 17 +++-------- src/ray/raylet/worker.h | 7 +++-- src/ray/raylet/worker_pool.cc | 37 +++++------------------ src/ray/raylet/worker_pool.h | 5 +--- src/ray/raylet/worker_pool_test.cc | 8 ++--- src/ray/raylet_client/raylet_client.h | 2 +- streaming/src/test/mock_actor.cc | 1 - 15 files changed, 125 insertions(+), 71 deletions(-) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 0ac238b2a..1636846f0 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -42,3 +42,5 @@ constexpr char kWorkerRayletConfigPlaceholder[] = "RAY_WORKER_RAYLET_CONFIG_PLAC /// Public DNS address which is is used to connect and get local IP. constexpr char kPublicDNSServerIp[] = "8.8.8.8"; constexpr int kPublicDNSServerPort = 53; + +constexpr char kEnvVarKeyJobId[] = "RAY_JOB_ID"; diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 6506ca8ed..2eb6cd679 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -61,6 +61,7 @@ class BaseID { // Warning: this can duplicate IDs after a fork() call. We assume this never happens. static T FromRandom(); static T FromBinary(const std::string &binary); + static T FromHex(const std::string &hex_str); static const T &Nil(); static size_t Size() { return T::Size(); } @@ -394,6 +395,45 @@ T BaseID::FromBinary(const std::string &binary) { return t; } +inline unsigned char hex_to_uchar(const char c, bool &err) { + unsigned char num = 0; + if (c >= '0' && c <= '9') { + num = c - '0'; + } else if (c >= 'a' && c <= 'f') { + num = c - 'a' + 0xa; + } else if (c >= 'A' && c <= 'F') { + num = c - 'A' + 0xA; + } else { + err = true; + } + return num; +} + +template +T BaseID::FromHex(const std::string &hex_str) { + T id; + + if (2 * T::Size() != hex_str.size()) { + RAY_LOG(ERROR) << "incorrect hex string length: 2 * " << T::Size() + << " != " << hex_str.size() << ", hex string: " << hex_str; + return T::Nil(); + } + + uint8_t *data = id.MutableData(); + for (size_t i = 0; i < T::Size(); i++) { + char first = hex_str[2 * i]; + char second = hex_str[2 * i + 1]; + bool err = false; + data[i] = (hex_to_uchar(first, err) << 4) + hex_to_uchar(second, err); + if (err) { + RAY_LOG(ERROR) << "incorrect hex character, hex string: " << hex_str; + return T::Nil(); + } + } + + return id; +} + template const T &BaseID::Nil() { static const T nil_id; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index aa3f3f548..b62ce116e 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -111,7 +111,9 @@ WorkerContext::WorkerContext(WorkerType worker_type, const WorkerID &worker_id, const JobID &job_id) : worker_type_(worker_type), worker_id_(worker_id), - current_job_id_(worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil()), + current_job_id_(RayConfig::instance().enable_multi_tenancy() + ? job_id + : (worker_type_ == WorkerType::DRIVER ? job_id : JobID::Nil())), current_actor_id_(ActorID::Nil()), current_actor_placement_group_id_(PlacementGroupID::Nil()), placement_group_capture_child_tasks_(true), @@ -161,7 +163,10 @@ const std::unordered_map return override_environment_variables_; } -void WorkerContext::SetCurrentJobId(const JobID &job_id) { current_job_id_ = job_id; } +void WorkerContext::SetCurrentJobId(const JobID &job_id) { + RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); + current_job_id_ = job_id; +} void WorkerContext::SetCurrentTaskId(const TaskID &task_id) { GetThreadContext().SetCurrentTaskId(task_id); @@ -170,13 +175,17 @@ void WorkerContext::SetCurrentTaskId(const TaskID &task_id) { void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().SetCurrentTask(task_spec); if (task_spec.IsNormalTask()) { - RAY_CHECK(current_job_id_.IsNil()); - SetCurrentJobId(task_spec.JobId()); + if (!RayConfig::instance().enable_multi_tenancy()) { + RAY_CHECK(current_job_id_.IsNil()); + SetCurrentJobId(task_spec.JobId()); + } current_task_is_direct_call_ = true; override_environment_variables_ = task_spec.OverrideEnvironmentVariables(); } else if (task_spec.IsActorCreationTask()) { - RAY_CHECK(current_job_id_.IsNil()); - SetCurrentJobId(task_spec.JobId()); + if (!RayConfig::instance().enable_multi_tenancy()) { + RAY_CHECK(current_job_id_.IsNil()); + SetCurrentJobId(task_spec.JobId()); + } RAY_CHECK(current_actor_id_.IsNil()); current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = true; @@ -187,7 +196,9 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks(); override_environment_variables_ = task_spec.OverrideEnvironmentVariables(); } else if (task_spec.IsActorTask()) { - RAY_CHECK(current_job_id_ == task_spec.JobId()); + if (!RayConfig::instance().enable_multi_tenancy()) { + RAY_CHECK(current_job_id_ == task_spec.JobId()); + } RAY_CHECK(current_actor_id_ == task_spec.ActorId()); } else { RAY_CHECK(false); @@ -196,7 +207,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { void WorkerContext::ResetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().ResetCurrentTask(); - if (task_spec.IsNormalTask()) { + if (!RayConfig::instance().enable_multi_tenancy() && task_spec.IsNormalTask()) { SetCurrentJobId(JobID::Nil()); } } diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index fac3f2c08..11c481b06 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -42,6 +42,7 @@ class WorkerContext { const std::unordered_map &GetCurrentOverrideEnvironmentVariables() const; + // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. // TODO(edoakes): remove this once Python core worker uses the task interfaces. void SetCurrentJobId(const JobID &job_id); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 01213c190..f9840a695 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -59,6 +59,30 @@ void BuildCommonTaskSpec( } } +ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) { + if (options.worker_type == ray::WorkerType::DRIVER) { + RAY_CHECK(!options.job_id.IsNil()); + } else { + RAY_CHECK(options.job_id.IsNil()); + } + + if (options.worker_type == ray::WorkerType::WORKER) { + // For workers, the job ID is assigned by Raylet via an environment variable. + const char *job_id_env = std::getenv(kEnvVarKeyJobId); + // TODO(kfstorm): Use `RAY_CHECK` instead once the `enable_multi_tenancy` flag is + // removed. + // RAY_CHECK(job_id_env); + if (!job_id_env) { + // Multi-tenancy is disabled. + // NOTE(kfstorm): We can't read `RayConfig::instance().enable_multi_tenancy()` here + // because `RayConfig` is not initialized yet. + return ray::JobID::Nil(); + } + return ray::JobID::FromHex(job_id_env); + } + return options.job_id; +} + } // namespace namespace ray { @@ -273,7 +297,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ get_call_site_(RayConfig::instance().record_ref_creation_sites() ? options_.get_lang_stack : nullptr), - worker_context_(options_.worker_type, worker_id, options_.job_id), + worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)), io_work_(io_service_), client_call_manager_(new rpc::ClientCallManager(io_service_)), death_check_timer_(io_service_), diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index 200833703..47f111dfd 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -39,7 +39,6 @@ class MockWorker { options.language = Language::PYTHON; options.store_socket = store_socket; options.raylet_socket = raylet_socket; - options.job_id = JobID::FromInt(1); options.gcs_options = gcs_options; options.enable_logging = true; options.install_failure_signal_handler = true; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6fe7c9d19..b70c58cb5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1166,13 +1166,22 @@ void NodeManager::ProcessRegisterClientRequestMessage( auto message = flatbuffers::GetRoot(message_data); Language language = static_cast(message->language()); + const JobID job_id = from_flatbuf(*message->job_id()); WorkerID worker_id = from_flatbuf(*message->worker_id()); pid_t pid = message->worker_pid(); std::string worker_ip_address = string_from_flatbuf(*message->ip_address()); // TODO(suquark): Use `WorkerType` in `common.proto` without type converting. rpc::WorkerType worker_type = static_cast(message->worker_type()); - auto worker = std::dynamic_pointer_cast(std::make_shared( - worker_id, language, worker_type, worker_ip_address, client, client_call_manager_)); + if ((RayConfig::instance().enable_multi_tenancy() && + worker_type != rpc::WorkerType::IO_WORKER) || + worker_type == rpc::WorkerType::DRIVER) { + RAY_CHECK(!job_id.IsNil()); + } else { + RAY_CHECK(job_id.IsNil()); + } + auto worker = std::dynamic_pointer_cast( + std::make_shared(job_id, worker_id, language, worker_type, + worker_ip_address, client, client_call_manager_)); auto send_reply_callback = [this, client](Status status, int assigned_port) { flatbuffers::FlatBufferBuilder fbb; @@ -1211,14 +1220,12 @@ void NodeManager::ProcessRegisterClientRequestMessage( // Register the new driver. RAY_CHECK(pid >= 0); worker->SetProcess(Process::FromPid(pid)); - const JobID job_id = from_flatbuf(*message->job_id()); // Compute a dummy driver task id from a given driver. const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id); worker->AssignTaskId(driver_task_id); rpc::JobConfig job_config; job_config.ParseFromString(message->serialized_job_config()->str()); - Status status = - worker_pool_.RegisterDriver(worker, job_id, job_config, send_reply_callback); + Status status = worker_pool_.RegisterDriver(worker, job_config, send_reply_callback); if (status.ok()) { local_queues_.AddDriverTaskId(driver_task_id); auto job_data_ptr = @@ -2847,7 +2854,9 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worke // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); worker->SetOwnerAddress(spec.CallerAddress()); - worker->AssignJobId(spec.JobId()); + if (!RayConfig::instance().enable_multi_tenancy()) { + worker->AssignJobId(spec.JobId()); + } // TODO(swang): For actors with multiple actor handles, to // guarantee that tasks are replayed in the same order after a // failure, we must update the task's execution dependency to be diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 9339455c2..476cd1f4f 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -32,6 +32,7 @@ class MockWorker : public WorkerInterface { void AssignTaskId(const TaskID &task_id) {} + // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. void AssignJobId(const JobID &job_id) {} void SetAssignedTask(Task &assigned_task) {} diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 4ddc252bb..a3298247b 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -26,7 +26,7 @@ namespace ray { namespace raylet { /// A constructor responsible for initializing the state of a worker. -Worker::Worker(const WorkerID &worker_id, const Language &language, +Worker::Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language, rpc::WorkerType worker_type, const std::string &ip_address, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager) @@ -37,6 +37,7 @@ Worker::Worker(const WorkerID &worker_id, const Language &language, assigned_port_(-1), port_(-1), connection_(connection), + assigned_job_id_(job_id), placement_group_id_(PlacementGroupID::Nil()), dead_(false), blocked_(false), @@ -111,18 +112,8 @@ const std::unordered_set &Worker::GetBlockedTaskIds() const { } void Worker::AssignJobId(const JobID &job_id) { - if (!RayConfig::instance().enable_multi_tenancy()) { - assigned_job_id_ = job_id; - } else { - if (!assigned_job_id_.IsNil()) { - RAY_CHECK(assigned_job_id_ == job_id) - << "The worker " << worker_id_ << " is already assigned to job " - << assigned_job_id_ << ". It cannot be reassigned to job " << job_id; - } else { - assigned_job_id_ = job_id; - RAY_LOG(INFO) << "Assigned worker " << worker_id_ << " to job " << job_id; - } - } + RAY_CHECK(!RayConfig::instance().enable_multi_tenancy()); + assigned_job_id_ = job_id; } const JobID &Worker::GetAssignedJobId() const { return assigned_job_id_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index f739c98cb..512230db7 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -60,6 +60,7 @@ class WorkerInterface { virtual bool AddBlockedTaskId(const TaskID &task_id) = 0; virtual bool RemoveBlockedTaskId(const TaskID &task_id) = 0; virtual const std::unordered_set &GetBlockedTaskIds() const = 0; + // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. virtual void AssignJobId(const JobID &job_id) = 0; virtual const JobID &GetAssignedJobId() const = 0; virtual void AssignActorId(const ActorID &actor_id) = 0; @@ -121,8 +122,9 @@ class Worker : public WorkerInterface { public: /// A constructor that initializes a worker object. /// NOTE: You MUST manually set the worker process. - Worker(const WorkerID &worker_id, const Language &language, rpc::WorkerType worker_type, - const std::string &ip_address, std::shared_ptr connection, + Worker(const JobID &job_id, const WorkerID &worker_id, const Language &language, + rpc::WorkerType worker_type, const std::string &ip_address, + std::shared_ptr connection, rpc::ClientCallManager &client_call_manager); /// A destructor responsible for freeing all worker state. ~Worker() {} @@ -149,6 +151,7 @@ class Worker : public WorkerInterface { bool AddBlockedTaskId(const TaskID &task_id); bool RemoveBlockedTaskId(const TaskID &task_id); const std::unordered_set &GetBlockedTaskIds() const; + // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted. void AssignJobId(const JobID &job_id); const JobID &GetAssignedJobId() const; void AssignActorId(const ActorID &actor_id); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 4778e6b5a..6b5922ee7 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -329,6 +329,12 @@ Process WorkerPool::StartWorkerProcess( } ProcessEnvironment env; + if (RayConfig::instance().enable_multi_tenancy() && + worker_type != rpc::WorkerType::IO_WORKER) { + // We pass the job ID to worker processes via an environment variable, so we don't + // need to add a new CLI parameter for both Python and Java workers. + env.emplace(kEnvVarKeyJobId, job_id.Hex()); + } if (RayConfig::instance().enable_multi_tenancy() && job_config) { env.insert(job_config->worker_env().begin(), job_config->worker_env().end()); } @@ -338,12 +344,6 @@ Process WorkerPool::StartWorkerProcess( } Process proc = StartProcess(worker_command_args, env); - if (RayConfig::instance().enable_multi_tenancy() && job_config) { - // If the pid is reused between processes, the old process must have exited. - // So it's safe to bind the pid with another job ID. - RAY_LOG(DEBUG) << "Worker process " << proc.GetId() << " is bound to job " << job_id; - state.worker_pids_to_assigned_jobs[proc.GetId()] = job_id; - } RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start << " worker(s) with pid " << proc.GetId(); MonitorStartingWorkerProcess(proc, language, worker_type); @@ -486,27 +486,6 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker state.registered_workers.insert(worker); - if (RayConfig::instance().enable_multi_tenancy() && - worker->GetWorkerType() != rpc::WorkerType::IO_WORKER) { - auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid); - RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end()); - auto job_id = dedicated_workers_it->second; - - // If the job is unknown to Raylet, we don't allow new registrations. - if (!all_jobs_.contains(job_id)) { - auto process = Process::FromPid(pid); - state.starting_worker_processes.erase(process); - Status status = - Status::Invalid("The provided job ID is unknown. Reject registration."); - send_reply_callback(status, /*port=*/0); - return status; - } - - worker->AssignJobId(job_id); - // We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here - // because we allow multi-workers per worker process. - } - // Send the reply immediately for worker registrations. send_reply_callback(Status::OK(), port); return Status::OK(); @@ -549,7 +528,7 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr &worker) } Status WorkerPool::RegisterDriver(const std::shared_ptr &driver, - const JobID &job_id, const rpc::JobConfig &job_config, + const rpc::JobConfig &job_config, std::function send_reply_callback) { int port; RAY_CHECK(!driver->GetAssignedTaskId().IsNil()); @@ -561,7 +540,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver driver->SetAssignedPort(port); auto &state = GetStateForLanguage(driver->GetLanguage()); state.registered_drivers.insert(std::move(driver)); - driver->AssignJobId(job_id); + const auto job_id = driver->GetAssignedJobId(); all_jobs_[job_id] = job_config; // This is a workaround to start initial workers on this node if and only if Raylet is diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 75fd641d1..12aef89b5 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -158,13 +158,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Register a new driver. /// /// \param[in] worker The driver to be registered. - /// \param[in] job_id The job ID of the driver. /// \param[in] job_config The config of the job. /// \param[in] send_reply_callback The callback to invoke after registration is /// finished/failed. /// \return If the registration is successful. Status RegisterDriver(const std::shared_ptr &worker, - const JobID &job_id, const rpc::JobConfig &job_config, + const rpc::JobConfig &job_config, std::function send_reply_callback); /// Get the client connection's registered worker. @@ -334,8 +333,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_map dedicated_workers_to_tasks; /// A map for speeding up looking up the pending worker for the given task. std::unordered_map tasks_to_dedicated_workers; - /// A map for looking up the owner JobId by the pid of worker. - std::unordered_map worker_pids_to_assigned_jobs; /// We'll push a warning to the user every time a multiple of this many /// worker processes has been started. int multiple_for_warning; diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 219c4a444..274cf3e3f 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -119,11 +119,10 @@ class WorkerPoolTest : public ::testing::TestWithParam { ClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); std::shared_ptr worker_ = std::make_shared( - WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", client, - client_call_manager_); + job_id, WorkerID::FromRandom(), language, rpc::WorkerType::WORKER, "127.0.0.1", + client, client_call_manager_); std::shared_ptr worker = std::dynamic_pointer_cast(worker_); - worker->AssignJobId(job_id); if (!proc.IsNull()) { worker->SetProcess(proc); } @@ -135,8 +134,7 @@ class WorkerPoolTest : public ::testing::TestWithParam { const rpc::JobConfig &job_config = rpc::JobConfig()) { auto driver = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); driver->AssignTaskId(TaskID::ForDriverTask(job_id)); - RAY_CHECK_OK( - worker_pool_->RegisterDriver(driver, job_id, job_config, [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterDriver(driver, job_config, [](Status, int) {})); return driver; } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 8feb437e5..936f6a3c1 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -180,7 +180,7 @@ class RayletClient : public PinObjectsInterface, /// \param worker_id A unique ID to represent the worker. /// \param worker_type The type of the worker. If it is a certain worker type, an /// additional message will be sent to register as one. - /// \param job_id The ID of the driver. This is non-nil if the client is a driver. + /// \param job_id The job ID of the driver or worker. /// \param language Language of the worker. /// \param ip_address The IP address of the worker. /// \param status This will be populated with the result of connection attempt. diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index 3a7646758..28911758e 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -493,7 +493,6 @@ class StreamingWorker { options.language = Language::PYTHON; options.store_socket = store_socket; options.raylet_socket = raylet_socket; - options.job_id = JobID::FromInt(1); options.gcs_options = gcs_options; options.enable_logging = true; options.install_failure_signal_handler = true;