From 5f5160ead944bbc2e670ec269f438b72fc6c6d05 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Fri, 4 Sep 2020 20:34:06 +0800 Subject: [PATCH] [Core] Multi-tenancy: Worker capping (#10500) --- python/ray/tests/BUILD | 2 +- python/ray/tests/test_multi_tenancy.py | 110 ++++++++++++++++++++++--- src/ray/raylet/main.cc | 6 +- src/ray/raylet/node_manager.cc | 7 +- src/ray/raylet/node_manager.h | 2 + src/ray/raylet/worker_pool.cc | 61 ++++++++++++++ src/ray/raylet/worker_pool.h | 14 +++- src/ray/raylet/worker_pool_test.cc | 2 +- 8 files changed, 188 insertions(+), 16 deletions(-) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 055407a7f..f467ac7c6 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -45,6 +45,7 @@ py_test_module_list( "test_memory_scheduling.py", "test_metrics.py", "test_multi_node_2.py", + "test_multi_tenancy.py", "test_multinode_failures_2.py", "test_multinode_failures.py", "test_multi_node.py", @@ -85,7 +86,6 @@ py_test_module_list( "test_metrics_agent.py", "test_microbenchmarks.py", "test_mini.py", - "test_multi_tenancy.py", "test_node_manager.py", "test_numba.py", "test_ray_init.py", diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index ea89005cb..17b1dc919 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -1,6 +1,7 @@ # coding: utf-8 import os import sys +import time import grpc import pytest @@ -11,6 +12,19 @@ from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc from ray.test_utils import wait_for_condition, run_string_as_driver_nonblocking +def get_num_workers(): + raylet = ray.nodes()[0] + raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], + raylet["NodeManagerPort"]) + channel = grpc.insecure_channel(raylet_address) + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + return len([ + worker for worker in stub.GetNodeStats( + node_manager_pb2.GetNodeStatsRequest()).workers_stats + 
if not worker.is_driver + ]) + + # Test that when `redis_address` and `job_config` is not set in # `ray.init(...)`, Raylet will start `num_cpus` Python workers for the driver. def test_initial_workers(shutdown_only): @@ -19,17 +33,7 @@ def test_initial_workers(shutdown_only): num_cpus=1, include_dashboard=True, _system_config={"enable_multi_tenancy": True}) - raylet = ray.nodes()[0] - raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], - raylet["NodeManagerPort"]) - channel = grpc.insecure_channel(raylet_address) - stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) - wait_for_condition(lambda: len([ - worker for worker in stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest()).workers_stats - if not worker.is_driver - ]) == 1, - timeout=10) + wait_for_condition(lambda: get_num_workers() == 1) # This test case starts some driver processes. Each driver process submits @@ -123,5 +127,89 @@ def test_worker_env(shutdown_only): assert ray.get(get_env.remote("foo2")) == "bar2" +def test_worker_capping_kill_idle_workers(shutdown_only): + # Avoid starting initial workers by setting num_cpus to 0. 
+ ray.init(num_cpus=0, _system_config={"enable_multi_tenancy": True}) + assert get_num_workers() == 0 + + @ray.remote(num_cpus=0) + class Actor: + def ping(self): + pass + + actor = Actor.remote() + ray.get(actor.ping.remote()) + # Actor is now alive and worker 1 which holds the actor is alive + assert get_num_workers() == 1 + + @ray.remote(num_cpus=0) + def foo(): + # Wait for a while + time.sleep(10) + + obj1 = foo.remote() + # Worker 2 runs a normal task + wait_for_condition(lambda: get_num_workers() == 2) + + obj2 = foo.remote() + # Worker 3 runs a normal task + wait_for_condition(lambda: get_num_workers() == 3) + + ray.get(obj1) + # Worker 2 now becomes idle and should be killed + wait_for_condition(lambda: get_num_workers() == 2) + ray.get(obj2) + # Worker 3 now becomes idle and should be killed + wait_for_condition(lambda: get_num_workers() == 1) + + +def test_worker_capping_run_many_small_tasks(shutdown_only): + ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + + @ray.remote(num_cpus=0.5) + def foo(): + time.sleep(5) + + # Run more tasks than `num_cpus`, but the CPU resource requirement is + # still within `num_cpus`. + obj_refs = [foo.remote() for _ in range(4)] + wait_for_condition(lambda: get_num_workers() == 4) + + ray.get(obj_refs) + # After the tasks have finished, some workers are killed to keep the total + # number of workers <= num_cpus. + wait_for_condition(lambda: get_num_workers() == 2) + + time.sleep(1) + # The two remaining workers stay alive forever. + assert get_num_workers() == 2 + + +def test_worker_capping_run_chained_tasks(shutdown_only): + ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + + @ray.remote(num_cpus=0.5) + def foo(x): + if x > 1: + return ray.get(foo.remote(x - 1)) + x + else: + time.sleep(5) + return x + + # Run a chain of tasks which exceed `num_cpus` in amount, but the CPU + # resource requirement is still within `num_cpus`. 
+ obj = foo.remote(4) + wait_for_condition(lambda: get_num_workers() == 4) + + ray.get(obj) + # After the tasks have finished, some workers are killed to keep the total + # number of workers <= num_cpus. + wait_for_condition(lambda: get_num_workers() == 2) + + time.sleep(1) + # The two remaining workers stay alive forever. + assert get_num_workers() == 2 + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 4d75d8d65..f03a009fc 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -161,6 +161,10 @@ int main(int argc, char *argv[]) { // about this? static_resource_conf[resource_name] = std::stod(resource_quantity); } + auto num_cpus_it = static_resource_conf.find("CPU"); + int num_cpus = num_cpus_it != static_resource_conf.end() + ? static_cast(num_cpus_it->second) + : 0; node_manager_config.raylet_config = raylet_config; node_manager_config.resource_config = @@ -170,6 +174,7 @@ int main(int argc, char *argv[]) { node_manager_config.node_manager_address = node_ip_address; node_manager_config.node_manager_port = node_manager_port; node_manager_config.num_initial_workers = num_initial_workers; + node_manager_config.num_workers_soft_limit = num_cpus; node_manager_config.num_initial_python_workers_for_first_job = num_initial_python_workers_for_first_job; node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency; @@ -225,7 +230,6 @@ int main(int argc, char *argv[]) { object_manager_config.plasma_directory = plasma_directory; object_manager_config.huge_pages = huge_pages; - int num_cpus = static_cast(static_resource_conf["CPU"]); object_manager_config.rpc_service_threads_number = std::min(std::max(2, num_cpus / 4), 8); object_manager_config.object_chunk_size = diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e719aba2c..73527e0f2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -139,7 +139,7 @@ 
NodeManager::NodeManager(boost::asio::io_service &io_service, initial_config_(config), local_available_resources_(config.resource_config), worker_pool_( - io_service, config.num_initial_workers, + io_service, config.num_initial_workers, config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job, config.maximum_startup_concurrency, config.min_worker_port, config.max_worker_port, gcs_client_, config.worker_commands, @@ -1362,6 +1362,11 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & // Call task dispatch to assign work to the new worker. DispatchTasks(local_queues_.GetReadyTasksByClass()); } + if (RayConfig::instance().enable_multi_tenancy()) { + // If the worker remains idle after scheduling, we may kill it to ensure the + // registered workers are in a reasonable size. + worker_pool_.TryKillingIdleWorker(worker); + } } void NodeManager::ProcessDisconnectClientMessage( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index b79084eeb..025b7e7f9 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -67,6 +67,8 @@ struct NodeManagerConfig { int max_worker_port; /// The initial number of workers to create. int num_initial_workers; + /// The soft limit of the number of workers. + int num_workers_soft_limit; /// Number of initial Python workers for the first job. 
int num_initial_python_workers_for_first_job; /// The maximum number of workers that can be started concurrently by a diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 8b759ec91..a786f11ab 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -55,6 +55,7 @@ namespace ray { namespace raylet { WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, + int num_workers_soft_limit, int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, int max_worker_port, std::shared_ptr gcs_client, @@ -62,6 +63,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, const std::unordered_map &raylet_config, std::function starting_worker_timeout_callback) : io_service_(&io_service), + num_workers_soft_limit_(num_workers_soft_limit), maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)), raylet_config_(raylet_config), @@ -591,6 +593,64 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } } +void WorkerPool::TryKillingIdleWorker(std::shared_ptr worker) { + auto &worker_state = GetStateForLanguage(worker->GetLanguage()); + if (worker_state.pending_unregistration_workers.count(worker) > 0) { + // This worker has already been killed. + // This is possible because a Java worker process may hold multiple workers. + return; + } + + auto running_size = GetAllRegisteredWorkers().size(); + for (const auto &entry : states_by_lang_) { + running_size -= entry.second.pending_unregistration_workers.size(); + } + if (running_size <= static_cast(num_workers_soft_limit_)) { + return; + } + + auto worker_id = worker->WorkerId(); + const auto pid = worker->GetProcess().GetId(); + if (worker_state.idle.count(worker) == 0) { + return; + } + if (worker_state.starting_worker_processes.count(worker->GetProcess()) > 0) { + // A Java worker process may hold multiple workers. 
+ RAY_LOG(DEBUG) << "Some workers of pid " << pid + << " are pending registration. Skip killing worker " << worker_id; + return; + } + + // Make sure all workers in this worker process are idle. + // This block of code is needed by Java workers. + std::unordered_set> workers_in_the_same_process; + for (const auto &worker_in_the_same_process : worker_state.registered_workers) { + if (worker_in_the_same_process->GetProcess().GetId() == pid) { + if (worker_state.idle.count(worker_in_the_same_process) == 0) { + // Another worker in this process isn't idle, so this process can't be killed. + return; + } else { + workers_in_the_same_process.insert(worker_in_the_same_process); + } + } + } + + for (auto worker_it = workers_in_the_same_process.begin(); + worker_it != workers_in_the_same_process.end(); worker_it++) { + RAY_LOG(INFO) << "The worker pool has " << running_size + << " registered workers which exceeds the soft limit of " + << num_workers_soft_limit_ << ", and worker " + << (*worker_it)->WorkerId() << " with pid " << pid + << " is idle. Kill it."; + // Remove the worker from the idle pool so it can't be popped anymore. However, we + // don't remove it from the registered pool because we want the worker to go through + // the normal disconnection logic in Node Manager. 
+ RemoveWorker(worker_state.idle, *worker_it); + worker_state.pending_unregistration_workers.insert(*worker_it); + } + worker->GetProcess().Kill(); +} + std::shared_ptr WorkerPool::PopWorker( const TaskSpecification &task_spec) { auto &state = GetStateForLanguage(task_spec.GetLanguage()); @@ -671,6 +731,7 @@ std::shared_ptr WorkerPool::PopWorker( bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { auto &state = GetStateForLanguage(worker->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_workers, worker)); + RemoveWorker(state.pending_unregistration_workers, worker); stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index bdaa4d3cf..04bcf4ad0 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -73,6 +73,7 @@ class WorkerPool : public WorkerPoolInterface { /// the pool. /// /// \param num_workers The number of workers to start, per language. + /// \param num_workers_soft_limit The soft limit of the number of workers. /// \param num_initial_python_workers_for_first_job The number of initial Python /// workers for the first job. /// \param maximum_startup_concurrency The maximum number of worker processes @@ -88,7 +89,7 @@ class WorkerPool : public WorkerPoolInterface { /// \param starting_worker_timeout_callback The callback that will be triggered once /// it times out to start a worker. WorkerPool(boost::asio::io_service &io_service, int num_workers, - int num_initial_python_workers_for_first_job, + int num_workers_soft_limit, int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, int min_worker_port, int max_worker_port, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands, @@ -180,6 +181,11 @@ class WorkerPool : public WorkerPoolInterface { /// \param The idle worker to add. 
void PushWorker(const std::shared_ptr &worker); + /// Try to kill the worker if it's idle. + /// + /// \param worker The worker to be killed. + void TryKillingIdleWorker(std::shared_ptr worker); + /// Pop an idle worker from the pool. The caller is responsible for pushing /// the worker back onto the pool once the worker has completed its work. /// @@ -292,6 +298,10 @@ class WorkerPool : public WorkerPoolInterface { std::unordered_set> registered_workers; /// All drivers that have registered and are still connected. std::unordered_set> registered_drivers; + /// All workers that have been killed but have not been unregistered yet. + /// This field is used to calculate the size of running workers when trying to kill an + /// idle worker. + std::unordered_set> pending_unregistration_workers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. std::unordered_map starting_worker_processes; @@ -354,6 +364,8 @@ class WorkerPool : public WorkerPoolInterface { /// For Process class for managing subprocesses (e.g. reaping zombies). boost::asio::io_service *io_service_; + /// The soft limit of the number of registered workers. + int num_workers_soft_limit_; /// The maximum number of worker processes that can be started concurrently. int maximum_startup_concurrency_; /// Keeps track of unused ports that newly-created workers can bind on. 
diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index a04692c10..56b251e66 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -34,7 +34,7 @@ class WorkerPoolMock : public WorkerPool { public: explicit WorkerPoolMock(boost::asio::io_service &io_service, const WorkerCommandMap &worker_commands) - : WorkerPool(io_service, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, nullptr, + : WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, nullptr, worker_commands, {}, []() {}), last_worker_process_() { states_by_lang_[ray::Language::JAVA].num_workers_per_process =