From 4c03f7ca2ff923172b9591844cb715f40c12c9af Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Mon, 14 Sep 2020 14:49:31 +0800 Subject: [PATCH] [Core] Multi-tenancy: Reject worker registration if job has finished (#10569) --- python/ray/tests/test_multi_tenancy.py | 32 ++++- src/ray/core_worker/core_worker.cc | 22 +++- src/ray/raylet/format/node_manager.fbs | 4 + src/ray/raylet/node_manager.cc | 13 +- src/ray/raylet/worker_pool.cc | 169 +++++++++++++------------ src/ray/raylet/worker_pool.h | 4 +- src/ray/raylet/worker_pool_test.cc | 11 +- src/ray/raylet_client/raylet_client.cc | 21 ++- src/ray/raylet_client/raylet_client.h | 5 +- 9 files changed, 182 insertions(+), 99 deletions(-) diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index 17b1dc919..2e96d3096 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -9,7 +9,8 @@ import pytest import ray import ray.test_utils from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc -from ray.test_utils import wait_for_condition, run_string_as_driver_nonblocking +from ray.test_utils import (wait_for_condition, run_string_as_driver, + run_string_as_driver_nonblocking) def get_num_workers(): @@ -211,5 +212,34 @@ def test_worker_capping_run_chained_tasks(shutdown_only): assert get_num_workers() == 2 +def test_worker_registration_failure_after_driver_exit(shutdown_only): + info = ray.init(num_cpus=1, _system_config={"enable_multi_tenancy": True}) + + driver_code = """ +import ray +import time + + +ray.init(address="{}") + +@ray.remote +def foo(): + pass + +[foo.remote() for _ in range(100)] + +ray.shutdown() + """.format(info["redis_address"]) + + before = get_num_workers() + assert before == 1 + + run_string_as_driver(driver_code) + + # wait for a while to let workers register + time.sleep(2) + wait_for_condition(lambda: get_num_workers() == before, timeout=10) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 75c783955..fc32c5642 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -301,20 +301,30 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // instead of crashing. auto grpc_client = rpc::NodeManagerWorkerClient::make( options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_); + Status raylet_client_status; ClientID local_raylet_id; int assigned_port; std::unordered_map system_config; local_raylet_client_ = std::shared_ptr(new raylet::RayletClient( io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(), options_.worker_type, worker_context_.GetCurrentJobID(), options_.language, - options_.node_ip_address, &local_raylet_id, &assigned_port, &system_config, - options_.serialized_job_config)); + options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port, + &system_config, options_.serialized_job_config)); + + if (!raylet_client_status.ok()) { + // Avoid using FATAL log or RAY_CHECK here because they may create a core dump file. + RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. " + << raylet_client_status; + if (options_.enable_logging) { + RayLog::ShutDownRayLog(); + } + // Quit the process immediately. + _Exit(1); + } + connected_ = true; - RAY_CHECK(assigned_port != -1) - << "Failed to allocate a port for the worker. Please specify a wider port range " - "using the '--min-worker-port' and '--max-worker-port' arguments to 'ray " - "start'."; + RAY_CHECK(assigned_port >= 0); // NOTE(edoakes): any initialization depending on RayConfig must happen after this line. RayConfig::instance().initialize(system_config); diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index c78a06120..1f2cba6ce 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -150,6 +150,10 @@ table RegisterClientRequest { } table RegisterClientReply { + // Whether the registration succeeded. + success: bool; + // The reason of registration failure. + failure_reason: string; // GCS ClientID of the local node manager. raylet_id: string; // Port that this worker should listen on. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3e5514bf4..6eab5a345 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1260,7 +1260,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( auto worker = std::dynamic_pointer_cast(std::make_shared( worker_id, language, worker_type, worker_ip_address, client, client_call_manager_)); - auto send_reply_callback = [this, client](int assigned_port) { + auto send_reply_callback = [this, client](Status status, int assigned_port) { flatbuffers::FlatBufferBuilder fbb; std::vector system_config_keys; std::vector system_config_values; @@ -1269,7 +1269,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( system_config_values.push_back(kv.second); } auto reply = ray::protocol::CreateRegisterClientReply( - fbb, to_flatbuf(fbb, self_node_id_), assigned_port, + fbb, status.ok(), fbb.CreateString(status.ToString()), + to_flatbuf(fbb, self_node_id_), assigned_port, string_vec_to_flatbuf(fbb, system_config_keys), string_vec_to_flatbuf(fbb, system_config_values)); fbb.Finish(reply); @@ -1285,7 +1286,13 @@ void NodeManager::ProcessRegisterClientRequestMessage( if (worker_type == rpc::WorkerType::WORKER || worker_type == rpc::WorkerType::IO_WORKER) { // Register the new worker. - RAY_UNUSED(worker_pool_.RegisterWorker(worker, pid, send_reply_callback)); + auto status = worker_pool_.RegisterWorker(worker, pid, send_reply_callback); + if (!status.ok()) { + // If the worker failed to register to Raylet, trigger task dispatching here to + // allow new worker processes to be started (if capped by + // maximum_startup_concurrency). + DispatchTasks(local_queues_.GetReadyTasksByClass()); + } } else { // Register the new driver. RAY_CHECK(pid >= 0); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 6ade2367d..4129bb6c5 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -421,7 +421,6 @@ Status WorkerPool::GetNextFreePort(int *port) { // Return to pool to check later. free_ports_->push(*port); } - *port = -1; return Status::Invalid( "No available ports. Please specify a wider port range using --min-worker-port and " "--max-worker-port."); @@ -444,115 +443,127 @@ void WorkerPool::HandleJobFinished(const JobID &job_id) { Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, pid_t pid, - std::function send_reply_callback) { + std::function send_reply_callback) { RAY_CHECK(worker); - // The port that this worker's gRPC server should listen on. 0 if the worker - // should bind on a random port. - int port; - Status status; - auto &state = GetStateForLanguage(worker->GetLanguage()); + auto it = state.starting_worker_processes.find(Process::FromPid(pid)); if (it == state.starting_worker_processes.end()) { RAY_LOG(WARNING) << "Received a register request from an unknown worker " << pid; - // Return -1 to signal to the worker that registration failed. - port = -1; - status = Status::Invalid("Unknown worker"); - } else { - RAY_RETURN_NOT_OK(GetNextFreePort(&port)); - RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port - << ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType()); - worker->SetAssignedPort(port); - worker->SetProcess(it->first); - it->second--; - if (it->second == 0) { - state.starting_worker_processes.erase(it); - // We may have slots to start more workers now. - TryStartIOWorkers(worker->GetLanguage(), state); + Status status = Status::Invalid("Unknown worker"); + send_reply_callback(status, /*port=*/0); + return status; + } + + // The port that this worker's gRPC server should listen on. 0 if the worker + // should bind on a random port. + int port = 0; + Status status = GetNextFreePort(&port); + if (!status.ok()) { + send_reply_callback(status, /*port=*/0); + return status; + } + RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port + << ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType()); + worker->SetAssignedPort(port); + worker->SetProcess(it->first); + it->second--; + if (it->second == 0) { + state.starting_worker_processes.erase(it); + // We may have slots to start more workers now. + TryStartIOWorkers(worker->GetLanguage(), state); + } + + RAY_CHECK(worker->GetProcess().GetId() == pid); + state.registered_workers.insert(worker); + if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { + state.registered_io_workers.insert(worker); + state.num_starting_io_workers--; + } + + if (RayConfig::instance().enable_multi_tenancy()) { + auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid); + RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end()); + auto job_id = dedicated_workers_it->second; + + // If the job is finished, we don't allow new registrations. + if (!unfinished_jobs_.contains(job_id)) { + auto process = Process::FromPid(pid); + state.starting_worker_processes.erase(process); + Status status = + Status::Invalid("The job is not running anymore. Reject registration."); + send_reply_callback(status, /*port=*/0); + return status; } - RAY_CHECK(worker->GetProcess().GetId() == pid); - state.registered_workers.insert(worker); - if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) { - state.registered_io_workers.insert(worker); - state.num_starting_io_workers--; - } + worker->AssignJobId(job_id); + // We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here + // because we allow multi-workers per worker process. - if (RayConfig::instance().enable_multi_tenancy()) { - auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid); - RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end()); - auto job_id = dedicated_workers_it->second; - worker->AssignJobId(job_id); - // We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here - // because we allow multi-workers per worker process. - - // This is a workaround to finish driver registration after all initial workers are - // registered to Raylet if and only if Raylet is started by a Python driver and the - // job config is not set in `ray.init(...)`. - if (first_job_ == job_id && worker->GetLanguage() == Language::PYTHON) { - if (++first_job_registered_python_worker_count_ == - first_job_driver_wait_num_python_workers_) { - if (first_job_send_register_client_reply_to_driver_) { - first_job_send_register_client_reply_to_driver_(); - first_job_send_register_client_reply_to_driver_ = nullptr; - } + // This is a workaround to finish driver registration after all initial workers are + // registered to Raylet if and only if Raylet is started by a Python driver and the + // job config is not set in `ray.init(...)`. + if (first_job_ == job_id && worker->GetLanguage() == Language::PYTHON) { + if (++first_job_registered_python_worker_count_ == + first_job_driver_wait_num_python_workers_) { + if (first_job_send_register_client_reply_to_driver_) { + first_job_send_register_client_reply_to_driver_(); + first_job_send_register_client_reply_to_driver_ = nullptr; } } } - - status = Status::OK(); } // Send the reply immediately for worker registrations. - if (send_reply_callback) { - send_reply_callback(port); - } - return status; + send_reply_callback(Status::OK(), port); + return Status::OK(); } Status WorkerPool::RegisterDriver(const std::shared_ptr &driver, const JobID &job_id, const rpc::JobConfig &job_config, - std::function send_reply_callback) { + std::function send_reply_callback) { int port; RAY_CHECK(!driver->GetAssignedTaskId().IsNil()); - RAY_RETURN_NOT_OK(GetNextFreePort(&port)); + Status status = GetNextFreePort(&port); + if (!status.ok()) { + send_reply_callback(status, /*port=*/0); + return status; + } driver->SetAssignedPort(port); auto &state = GetStateForLanguage(driver->GetLanguage()); state.registered_drivers.insert(std::move(driver)); driver->AssignJobId(job_id); unfinished_jobs_[job_id] = job_config; - if (send_reply_callback) { - // This is a workaround to start initial workers on this node if and only if Raylet is - // started by a Python driver and the job config is not set in `ray.init(...)`. - // Invoke the `send_reply_callback` later to only finish driver - // registration after all initial workers are registered to Raylet. - bool delay_callback = false; - // Multi-tenancy is enabled. - if (RayConfig().instance().enable_multi_tenancy()) { - // If this is the first job. - if (first_job_.IsNil()) { - first_job_ = job_id; - // If the number of Python workers we need to wait is positive. - if (num_initial_python_workers_for_first_job_ > 0) { - delay_callback = true; - // Start initial Python workers for the first job. - for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { - StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); - } + // This is a workaround to start initial workers on this node if and only if Raylet is + // started by a Python driver and the job config is not set in `ray.init(...)`. + // Invoke the `send_reply_callback` later to only finish driver + // registration after all initial workers are registered to Raylet. + bool delay_callback = false; + // Multi-tenancy is enabled. + if (RayConfig().instance().enable_multi_tenancy()) { + // If this is the first job. + if (first_job_.IsNil()) { + first_job_ = job_id; + // If the number of Python workers we need to wait is positive. + if (num_initial_python_workers_for_first_job_ > 0) { + delay_callback = true; + // Start initial Python workers for the first job. + for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { + StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); } } } + } - if (delay_callback) { - RAY_CHECK(!first_job_send_register_client_reply_to_driver_); - first_job_send_register_client_reply_to_driver_ = [send_reply_callback, port]() { - send_reply_callback(port); - }; - } else { - send_reply_callback(port); - } + if (delay_callback) { + RAY_CHECK(!first_job_send_register_client_reply_to_driver_); + first_job_send_register_client_reply_to_driver_ = [send_reply_callback, port]() { + send_reply_callback(Status::OK(), port); + }; + } else { + send_reply_callback(Status::OK(), port); } return Status::OK(); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 04bcf4ad0..5d3d321b2 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -122,7 +122,7 @@ class WorkerPool : public WorkerPoolInterface { /// Returns 0 if the worker should bind on a random port. /// \return If the registration is successful. Status RegisterWorker(const std::shared_ptr &worker, pid_t pid, - std::function send_reply_callback); + std::function send_reply_callback); /// Register a new driver. /// @@ -134,7 +134,7 @@ class WorkerPool : public WorkerPoolInterface { /// \return If the registration is successful. Status RegisterDriver(const std::shared_ptr &worker, const JobID &job_id, const rpc::JobConfig &job_config, - std::function send_reply_callback); + std::function send_reply_callback); /// Get the client connection's registered worker. /// diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 2d1a831d3..c293c3161 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -135,7 +135,8 @@ class WorkerPoolTest : public ::testing::TestWithParam { const rpc::JobConfig &job_config = rpc::JobConfig()) { auto driver = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); driver->AssignTaskId(TaskID::ForDriverTask(job_id)); - RAY_CHECK_OK(worker_pool_->RegisterDriver(driver, job_id, job_config, nullptr)); + RAY_CHECK_OK( + worker_pool_->RegisterDriver(driver, job_id, job_config, [](Status, int) {})); return driver; } @@ -246,7 +247,7 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) { ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1); // Check that we cannot lookup the worker before it's registered. ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr); - RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), nullptr)); + RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), [](Status, int) {})); // Check that we can lookup the worker after it's registered. ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); } @@ -259,6 +260,12 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) { } } +TEST_P(WorkerPoolTest, HandleUnknownWorkerRegistration) { + auto worker = CreateWorker(Process(), Language::PYTHON); + auto status = worker_pool_->RegisterWorker(worker, 1234, [](Status, int) {}); + ASSERT_FALSE(status.ok()); +} + TEST_P(WorkerPoolTest, StartupPythonWorkerProcessCount) { TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"}); } diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 935b23897..6859f7557 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -82,7 +82,7 @@ raylet::RayletClient::RayletClient( std::shared_ptr grpc_client, const std::string &raylet_socket, const WorkerID &worker_id, rpc::WorkerType worker_type, const JobID &job_id, const Language &language, - const std::string &ip_address, ClientID *raylet_id, int *port, + const std::string &ip_address, Status *status, ClientID *raylet_id, int *port, std::unordered_map *system_config, const std::string &job_config) : grpc_client_(std::move(grpc_client)), @@ -103,10 +103,23 @@ raylet::RayletClient::RayletClient( // Register the process ID with the raylet. // NOTE(swang): If raylet exits and we are registered as a worker, we will get killed. std::vector reply; - auto status = conn_->AtomicRequestReply(MessageType::RegisterClientRequest, - MessageType::RegisterClientReply, &reply, &fbb); - RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet."); + auto request_status = conn_->AtomicRequestReply( + MessageType::RegisterClientRequest, MessageType::RegisterClientReply, &reply, &fbb); + if (!request_status.ok()) { + *status = + Status(request_status.code(), + std::string("[RayletClient] Unable to register worker with raylet. ") + + request_status.message()); + return; + } auto reply_message = flatbuffers::GetRoot(reply.data()); + bool success = reply_message->success(); + if (success) { + *status = Status::OK(); + } else { + *status = Status::Invalid(string_from_flatbuf(*reply_message->failure_reason())); + return; + } *raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str()); *port = reply_message->port(); diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 061cd23f4..2dad68da1 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -181,6 +181,7 @@ class RayletClient : public PinObjectsInterface, /// \param job_id The ID of the driver. This is non-nil if the client is a driver. /// \param language Language of the worker. /// \param ip_address The IP address of the worker. + /// \param status This will be populated with the result of connection attempt. /// \param raylet_id This will be populated with the local raylet's ClientID. /// \param system_config This will be populated with internal config parameters /// provided by the raylet. @@ -190,8 +191,8 @@ class RayletClient : public PinObjectsInterface, std::shared_ptr grpc_client, const std::string &raylet_socket, const WorkerID &worker_id, rpc::WorkerType worker_type, const JobID &job_id, const Language &language, - const std::string &ip_address, ClientID *raylet_id, int *port, - std::unordered_map *system_config, + const std::string &ip_address, Status *status, ClientID *raylet_id, + int *port, std::unordered_map *system_config, const std::string &job_config); /// Connect to the raylet via grpc only.