diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index 2e96d3096..18d0be1de 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -9,21 +9,22 @@ import pytest import ray import ray.test_utils from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc -from ray.test_utils import (wait_for_condition, run_string_as_driver, +from ray.test_utils import (wait_for_condition, wait_for_pid_to_exit, + run_string_as_driver, run_string_as_driver_nonblocking) -def get_num_workers(): +def get_workers(): raylet = ray.nodes()[0] raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], raylet["NodeManagerPort"]) channel = grpc.insecure_channel(raylet_address) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) - return len([ + return [ worker for worker in stub.GetNodeStats( node_manager_pb2.GetNodeStatsRequest()).workers_stats if not worker.is_driver - ]) + ] # Test that when `redis_address` and `job_config` is not set in @@ -34,7 +35,7 @@ def test_initial_workers(shutdown_only): num_cpus=1, include_dashboard=True, _system_config={"enable_multi_tenancy": True}) - wait_for_condition(lambda: get_num_workers() == 1) + wait_for_condition(lambda: len(get_workers()) == 1) # This test case starts some driver processes. Each driver process submits @@ -131,7 +132,7 @@ def test_worker_env(shutdown_only): def test_worker_capping_kill_idle_workers(shutdown_only): # Avoid starting initial workers by setting num_cpus to 0. ray.init(num_cpus=0, _system_config={"enable_multi_tenancy": True}) - assert get_num_workers() == 0 + assert len(get_workers()) == 0 @ray.remote(num_cpus=0) class Actor: @@ -141,7 +142,7 @@ def test_worker_capping_kill_idle_workers(shutdown_only): actor = Actor.remote() ray.get(actor.ping.remote()) # Actor is now alive and worker 1 which holds the actor is alive - assert get_num_workers() == 1 + assert len(get_workers()) == 1 @ray.remote(num_cpus=0) def foo(): @@ -150,18 +151,18 @@ def test_worker_capping_kill_idle_workers(shutdown_only): obj1 = foo.remote() # Worker 2 runs a normal task - wait_for_condition(lambda: get_num_workers() == 2) + wait_for_condition(lambda: len(get_workers()) == 2) obj2 = foo.remote() # Worker 3 runs a normal task - wait_for_condition(lambda: get_num_workers() == 3) + wait_for_condition(lambda: len(get_workers()) == 3) ray.get(obj1) # Worker 2 now becomes idle and should be killed - wait_for_condition(lambda: get_num_workers() == 2) + wait_for_condition(lambda: len(get_workers()) == 2) ray.get(obj2) # Worker 3 now becomes idle and should be killed - wait_for_condition(lambda: get_num_workers() == 1) + wait_for_condition(lambda: len(get_workers()) == 1) def test_worker_capping_run_many_small_tasks(shutdown_only): @@ -174,16 +175,16 @@ def test_worker_capping_run_many_small_tasks(shutdown_only): # Run more tasks than `num_cpus`, but the CPU resource requirement is # still within `num_cpus`. obj_refs = [foo.remote() for _ in range(4)] - wait_for_condition(lambda: get_num_workers() == 4) + wait_for_condition(lambda: len(get_workers()) == 4) ray.get(obj_refs) # After finished the tasks, some workers are killed to keep the total # number of workers <= num_cpus. - wait_for_condition(lambda: get_num_workers() == 2) + wait_for_condition(lambda: len(get_workers()) == 2) time.sleep(1) # The two remaining workers stay alive forever. - assert get_num_workers() == 2 + assert len(get_workers()) == 2 def test_worker_capping_run_chained_tasks(shutdown_only): @@ -200,16 +201,56 @@ def test_worker_capping_run_chained_tasks(shutdown_only): # Run a chain of tasks which exceed `num_cpus` in amount, but the CPU # resource requirement is still within `num_cpus`. obj = foo.remote(4) - wait_for_condition(lambda: get_num_workers() == 4) + wait_for_condition(lambda: len(get_workers()) == 4) ray.get(obj) # After finished the tasks, some workers are killed to keep the total # number of workers <= num_cpus. - wait_for_condition(lambda: get_num_workers() == 2) + wait_for_condition(lambda: len(get_workers()) == 2) time.sleep(1) # The two remaining workers stay alive forever. - assert get_num_workers() == 2 + assert len(get_workers()) == 2 + + +def test_worker_capping_fifo(shutdown_only): + # Start 2 initial workers by setting num_cpus to 2. + info = ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + wait_for_condition(lambda: len(get_workers()) == 2) + + time.sleep(1) + + @ray.remote + def getpid(): + return os.getpid() + + worker1, worker2 = get_workers() + + if worker1.pid == ray.get(getpid.remote()): + worker1, worker2 = [worker2, worker1] + + # Worker 1 is before worker 2 in the FIFO queue. + + driver_code = """ +import ray + +ray.init(address="{}") + +@ray.remote +def foo(): + pass + +ray.get(foo.remote()) +ray.shutdown() + """.format(info["redis_address"]) + + run_string_as_driver(driver_code) + + # Worker 1 should have been killed. + wait_for_pid_to_exit(worker1.pid) + + wait_for_condition(lambda: len(get_workers()) == 1) + assert worker2.pid == get_workers()[0].pid def test_worker_registration_failure_after_driver_exit(shutdown_only): @@ -231,14 +272,14 @@ def foo(): ray.shutdown() """.format(info["redis_address"]) - before = get_num_workers() + before = len(get_workers()) assert before == 1 run_string_as_driver(driver_code) # wait for a while to let workers register time.sleep(2) - wait_for_condition(lambda: get_num_workers() == before, timeout=10) + wait_for_condition(lambda: len(get_workers()) == before) if __name__ == "__main__": diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 159cf3e34..267d79f53 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1370,9 +1370,9 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & DispatchTasks(local_queues_.GetReadyTasksByClass()); } if (RayConfig::instance().enable_multi_tenancy()) { - // If the worker remains idle after scheduling, we may kill it to ensure the - // registered workers are in a reasonable size. - worker_pool_.TryKillingIdleWorker(worker); + // We trigger killing here instead of inside `Worker::PushWorker` because we + // only kill an idle worker if it remains idle after scheduling. + worker_pool_.TryKillingIdleWorkers(); } } diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 4129bb6c5..058d5af9f 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -165,16 +165,6 @@ WorkerPool::~WorkerPool() { } } -uint32_t WorkerPool::Size(const Language &language) const { - const auto state = states_by_lang_.find(language); - if (state == states_by_lang_.end()) { - return 0; - } else { - return static_cast(state->second.idle.size() + - state->second.idle_actor.size()); - } -} - Process WorkerPool::StartWorkerProcess(const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, @@ -635,68 +625,83 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { // Put the worker to the corresponding idle pool. if (worker->GetActorId().IsNil()) { state.idle.insert(worker); + if (RayConfig::instance().enable_multi_tenancy()) { + idle_of_all_languages.push_back(worker); + } } else { state.idle_actor[worker->GetActorId()] = worker; } } } -void WorkerPool::TryKillingIdleWorker(std::shared_ptr worker) { - auto &worker_state = GetStateForLanguage(worker->GetLanguage()); - if (worker_state.pending_unregistration_workers.count(worker) > 0) { - // This worker has already been killed. - // This is possible because a Java worker process may hold multiple workers. - return; - } - - auto running_size = GetAllRegisteredWorkers().size(); - for (const auto &entry : states_by_lang_) { - running_size -= entry.second.pending_unregistration_workers.size(); - } - if (running_size <= static_cast(num_workers_soft_limit_)) { - return; - } - - auto worker_id = worker->WorkerId(); - const auto pid = worker->GetProcess().GetId(); - if (worker_state.idle.count(worker) == 0) { - return; - } - if (worker_state.starting_worker_processes.count(worker->GetProcess()) > 0) { - // A Java worker process may hold multiple workers. - RAY_LOG(DEBUG) << "Some workers of pid " << pid - << " are pending registration. Skip killing worker " << worker_id; - return; - } - - // Make sure all workers in this worker process are idle. - // This block of code is needed by Java workers. - std::unordered_set> workers_in_the_same_process; - for (const auto &worker_in_the_same_process : worker_state.registered_workers) { - if (worker_in_the_same_process->GetProcess().GetId() == pid) { - if (worker_state.idle.count(worker_in_the_same_process) == 0) { - // Another worker in this process isn't idle, so this process can't be killed. - return; - } else { - workers_in_the_same_process.insert(worker_in_the_same_process); - } +void WorkerPool::TryKillingIdleWorkers() { + size_t running_size = 0; + for (const auto worker : GetAllRegisteredWorkers()) { + if (!worker->IsDead()) { + running_size++; } } - for (auto worker_it = workers_in_the_same_process.begin(); - worker_it != workers_in_the_same_process.end(); worker_it++) { - RAY_LOG(INFO) << "The worker pool has " << running_size - << " registered workers which exceeds the soft limit of " - << num_workers_soft_limit_ << ", and worker " - << (*worker_it)->WorkerId() << " with pid " << pid - << " is idle. Kill it."; - // Remove the worker from the idle pool so it can't be popped anymore. However, we - // don't remove it from the registered pool because we want the worker to go through - // the normal disconnection logic in Node Manager. - RemoveWorker(worker_state.idle, *worker_it); - worker_state.pending_unregistration_workers.insert(*worker_it); + // Kill idle workers in FIFO order. + for (auto it = idle_of_all_languages.begin(); + it != idle_of_all_languages.end() && + running_size > static_cast(num_workers_soft_limit_); + it++) { + if ((*it)->IsDead()) { + // This worker has already been killed. + // This is possible because a Java worker process may hold multiple workers. + continue; + } + auto process = (*it)->GetProcess(); + + auto &worker_state = GetStateForLanguage((*it)->GetLanguage()); + + if (worker_state.starting_worker_processes.count(process) > 0) { + // A Java worker process may hold multiple workers. + // Some workers of this process are pending registration. Skip killing this worker. + continue; + } + + // Make sure all workers in this worker process are idle. + // This block of code is needed by Java workers. + auto workers_in_the_same_process = GetWorkersByProcess(process); + bool can_be_killed = true; + for (const auto &worker : workers_in_the_same_process) { + if (worker_state.idle.count(worker) == 0) { + // Another worker in this process isn't idle, so this process can't be killed. + can_be_killed = false; + break; + } + } + if (!can_be_killed) { + continue; + } + + for (auto worker_it = workers_in_the_same_process.begin(); + worker_it != workers_in_the_same_process.end(); worker_it++) { + RAY_LOG(INFO) << "The worker pool has " << running_size + << " registered workers which exceeds the soft limit of " + << num_workers_soft_limit_ << ", and worker " + << (*worker_it)->WorkerId() << " with pid " << process.GetId() + << " is idle. Kill it."; + // Remove the worker from the idle pool so it can't be popped anymore. + RemoveWorker(worker_state.idle, *worker_it); + if (!(*worker_it)->IsDead()) { + (*worker_it)->MarkDead(); + running_size--; + } + } + process.Kill(); } - worker->GetProcess().Kill(); + + std::list> new_idle_of_all_languages; + for (auto it = idle_of_all_languages.begin(); it != idle_of_all_languages.end(); it++) { + if (!(*it)->IsDead()) { + new_idle_of_all_languages.push_back(*it); + } + } + + idle_of_all_languages = std::move(new_idle_of_all_languages); } std::shared_ptr WorkerPool::PopWorker( @@ -741,12 +746,19 @@ std::shared_ptr WorkerPool::PopWorker( } } else { // Find an available worker which is already assigned to this job. - for (auto it = state.idle.begin(); it != state.idle.end(); it++) { - if ((*it)->GetAssignedJobId() != task_spec.JobId()) { + // Try to pop the most recently pushed worker. + for (auto it = idle_of_all_languages.rbegin(); it != idle_of_all_languages.rend(); + it++) { + if (task_spec.GetLanguage() != (*it)->GetLanguage() || + (*it)->GetAssignedJobId() != task_spec.JobId()) { continue; } - worker = std::move(*it); - state.idle.erase(it); + state.idle.erase(*it); + // We can't erase a reverse_iterator. + auto lit = it.base(); + lit--; + worker = std::move(*lit); + idle_of_all_languages.erase(lit); break; } if (worker == nullptr) { @@ -779,7 +791,6 @@ std::shared_ptr WorkerPool::PopWorker( bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { auto &state = GetStateForLanguage(worker->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_workers, worker)); - RemoveWorker(state.pending_unregistration_workers, worker); stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, @@ -911,6 +922,20 @@ void WorkerPool::TryStartIOWorkers(const Language &language, State &state) { } } +std::unordered_set> WorkerPool::GetWorkersByProcess( + const Process &process) { + std::unordered_set> workers_of_process; + for (auto &entry : states_by_lang_) { + auto &worker_state = entry.second; + for (const auto &worker : worker_state.registered_workers) { + if (worker->GetProcess().GetId() == process.GetId()) { + workers_of_process.insert(worker); + } + } + } + return workers_of_process; +} + std::string WorkerPool::DebugString() const { std::stringstream result; result << "WorkerPool:"; @@ -920,6 +945,7 @@ std::string WorkerPool::DebugString() const { result << "\n- num " << Language_Name(entry.first) << " drivers: " << entry.second.registered_drivers.size(); } + result << "- num idle workers: " << idle_of_all_languages.size(); return result.str(); } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 5d3d321b2..bc4da1057 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -181,10 +181,9 @@ class WorkerPool : public WorkerPoolInterface { /// \param The idle worker to add. void PushWorker(const std::shared_ptr &worker); - /// Try to kill the worker if it's idle. - /// - /// \param worker The worker to be killed. - void TryKillingIdleWorker(std::shared_ptr worker); + /// Try killing idle workers to ensure the running workers are in a + /// reasonable size. + void TryKillingIdleWorkers(); /// Pop an idle worker from the pool. The caller is responsible for pushing /// the worker back onto the pool once the worker has completed its work. @@ -298,10 +297,6 @@ class WorkerPool : public WorkerPoolInterface { std::unordered_set> registered_workers; /// All drivers that have registered and are still connected. std::unordered_set> registered_drivers; - /// All workers that have been killed but been unregistered yet. - /// This field is used to calculate the size of running workers when trying to kill an - /// idle worker. - std::unordered_set> pending_unregistration_workers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. std::unordered_map starting_worker_processes; @@ -362,6 +357,13 @@ class WorkerPool : public WorkerPoolInterface { /// started. void TryStartIOWorkers(const Language &language, State &state); + /// Get all workers of the given process. + /// + /// \param process The process of workers. + /// \return The workers of the given process. + std::unordered_set> GetWorkersByProcess( + const Process &process); + /// For Process class for managing subprocesses (e.g. reaping zombies). boost::asio::io_service *io_service_; /// The soft limit of the number of registered workers. @@ -397,6 +399,10 @@ class WorkerPool : public WorkerPoolInterface { /// This map tracks the latest infos of unfinished jobs. absl::flat_hash_map unfinished_jobs_; + + /// The pool of idle non-actor workers of all languages. This is used to kill idle + /// workers in FIFO order. + std::list> idle_of_all_languages; }; } // namespace raylet