From 3504391fd2b65ef1777e3d361a23e126add107e5 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Wed, 30 Sep 2020 14:54:53 +0800 Subject: [PATCH] [Core] Multi-tenancy: enable multi-tenancy by default (#10570) * Add new job in Travis to enable multi-tenancy * fix * Update .bazelrc * Update .travis.yml * fix test_job_gc_with_detached_actor * fix test_multiple_downstream_tasks * fix lint * Enable multi-tenancy by default * Kill idle workers in FIFO order * Update test * minor update * Address comments * fix some cases * fix test_remote_cancel * Address comments * fix after merge * remove kill * fix worker_pool_test * fix java test timeout * fix test_two_custom_resources * Add a delay when killing idle workers * fix test_worker_failure * fix test_worker_failed again * fix DisconnectWorker * update test_worker_failed * Revert some python tests * lint * address comments --- .../java/io/ray/runtime/config/RayConfig.java | 9 +- python/ray/test_utils.py | 7 +- python/ray/tests/test_advanced_2.py | 16 ++- python/ray/tests/test_component_failures_2.py | 6 +- python/ray/tests/test_job.py | 4 +- python/ray/tests/test_multi_tenancy.py | 30 ++--- python/ray/tests/test_multinode_failures.py | 6 +- python/ray/tests/test_reconstruction.py | 6 +- src/ray/common/ray_config_def.h | 11 +- src/ray/core_worker/core_worker.cc | 6 + src/ray/core_worker/core_worker.h | 4 + src/ray/protobuf/core_worker.proto | 8 ++ src/ray/raylet/node_manager.cc | 5 - src/ray/raylet/worker_pool.cc | 124 +++++++++++++----- src/ray/raylet/worker_pool.h | 24 +++- src/ray/rpc/worker/core_worker_client.h | 5 + src/ray/rpc/worker/core_worker_server.h | 6 +- 17 files changed, 205 insertions(+), 72 deletions(-) diff --git a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java index 903b87847..e7f5cddb7 100644 --- a/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/io/ray/runtime/config/RayConfig.java 
@@ -235,10 +235,17 @@ public class RayConfig { codeSearchPath = Collections.emptyList(); } - boolean enableMultiTenancy = false; + boolean enableMultiTenancy; if (config.hasPath("ray.raylet.config.enable_multi_tenancy")) { enableMultiTenancy = Boolean.valueOf(config.getString("ray.raylet.config.enable_multi_tenancy")); + } else { + String envString = System.getenv("RAY_ENABLE_MULTI_TENANCY"); + if (StringUtils.isNotBlank(envString)) { + enableMultiTenancy = "1".equals(envString); + } else { + enableMultiTenancy = true; // Default value + } } if (!enableMultiTenancy) { diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index c9f35a557..dc4562be2 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -207,10 +207,13 @@ def run_string_as_driver_nonblocking(driver_script): return proc -def wait_for_num_actors(num_actors, timeout=10): +def wait_for_num_actors(num_actors, state=None, timeout=10): start_time = time.time() while time.time() - start_time < timeout: - if len(ray.actors()) >= num_actors: + if len([ + _ for _ in ray.actors().values() + if state is None or _["State"] == state + ]) >= num_actors: return time.sleep(0.1) raise RayTestTimeoutException("Timed out while waiting for global state.") diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 1dc0b895d..f1f96031d 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -11,7 +11,10 @@ import ray import ray.cluster_utils import ray.test_utils -from ray.test_utils import RayTestTimeoutException +from ray.test_utils import ( + RayTestTimeoutException, + wait_for_condition, +) logger = logging.getLogger(__name__) @@ -505,6 +508,17 @@ def test_two_custom_resources(ray_start_cluster): }) ray.init(address=cluster.address) + @ray.remote + def foo(): + # Sleep a while to emulate a slow operation. This is needed to make + # sure tasks are scheduled to different nodes. 
+ time.sleep(0.1) + return ray.worker.global_worker.node.unique_id + + # Make sure each node has at least one idle worker. + wait_for_condition( + lambda: len(set(ray.get([foo.remote() for _ in range(6)]))) == 2) + @ray.remote(resources={"CustomResource1": 1}) def f(): time.sleep(0.001) diff --git a/python/ray/tests/test_component_failures_2.py b/python/ray/tests/test_component_failures_2.py index 5bfd159cc..2235c5745 100644 --- a/python/ray/tests/test_component_failures_2.py +++ b/python/ray/tests/test_component_failures_2.py @@ -63,7 +63,11 @@ def test_worker_failed(ray_start_workers_separate_multinode): time.sleep(0.1) # Kill the workers as the tasks execute. for pid in pids: - os.kill(pid, SIGKILL) + try: + os.kill(pid, SIGKILL) + except OSError: + # The process may have already exited due to worker capping. + pass time.sleep(0.1) # Make sure that we either get the object or we get an appropriate # exception. diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index 395ae9f96..15b082b46 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -67,11 +67,13 @@ class Actor: return 1 _ = Actor.options(lifetime="detached", name="DetachedActor").remote() +# Make sure the actor is created before the driver exits. +ray.get(_.value.remote()) """.format(address) p = run_string_as_driver_nonblocking(driver) # Wait for actor to be created - wait_for_num_actors(1) + wait_for_num_actors(1, ray.gcs_utils.ActorTableData.ALIVE) actor_table = ray.actors() assert len(actor_table) == 1 diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index 18d0be1de..3e937976e 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -31,10 +31,7 @@ def get_workers(): # `ray.init(...)`, Raylet will start `num_cpus` Python workers for the driver. 
def test_initial_workers(shutdown_only): # `num_cpus` should be <=2 because a Travis CI machine only has 2 CPU cores - ray.init( - num_cpus=1, - include_dashboard=True, - _system_config={"enable_multi_tenancy": True}) + ray.init(num_cpus=1, include_dashboard=True) wait_for_condition(lambda: len(get_workers()) == 1) @@ -46,7 +43,7 @@ def test_initial_workers(shutdown_only): # different drivers were scheduled to the same worker process, that is, tasks # of different jobs were not correctly isolated during execution. def test_multi_drivers(shutdown_only): - info = ray.init(num_cpus=10, _system_config={"enable_multi_tenancy": True}) + info = ray.init(num_cpus=10) driver_code = """ import os @@ -118,8 +115,7 @@ def test_worker_env(shutdown_only): job_config=ray.job_config.JobConfig(worker_env={ "foo1": "bar1", "foo2": "bar2" - }), - _system_config={"enable_multi_tenancy": True}) + })) @ray.remote def get_env(key): @@ -131,7 +127,7 @@ def test_worker_env(shutdown_only): def test_worker_capping_kill_idle_workers(shutdown_only): # Avoid starting initial workers by setting num_cpus to 0. 
- ray.init(num_cpus=0, _system_config={"enable_multi_tenancy": True}) + ray.init(num_cpus=0) assert len(get_workers()) == 0 @ray.remote(num_cpus=0) @@ -157,16 +153,13 @@ def test_worker_capping_kill_idle_workers(shutdown_only): # Worker 3 runs a normal task wait_for_condition(lambda: len(get_workers()) == 3) - ray.get(obj1) - # Worker 2 now becomes idle and should be killed - wait_for_condition(lambda: len(get_workers()) == 2) - ray.get(obj2) - # Worker 3 now becomes idle and should be killed + ray.get([obj1, obj2]) + # Worker 2 and 3 now become idle and should be killed wait_for_condition(lambda: len(get_workers()) == 1) def test_worker_capping_run_many_small_tasks(shutdown_only): - ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + ray.init(num_cpus=2) @ray.remote(num_cpus=0.5) def foo(): @@ -188,7 +181,7 @@ def test_worker_capping_run_many_small_tasks(shutdown_only): def test_worker_capping_run_chained_tasks(shutdown_only): - ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + ray.init(num_cpus=2) @ray.remote(num_cpus=0.5) def foo(x): @@ -215,7 +208,7 @@ def test_worker_capping_run_chained_tasks(shutdown_only): def test_worker_capping_fifo(shutdown_only): # Start 2 initial workers by setting num_cpus to 2. - info = ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True}) + info = ray.init(num_cpus=2) wait_for_condition(lambda: len(get_workers()) == 2) time.sleep(1) @@ -233,6 +226,7 @@ def test_worker_capping_fifo(shutdown_only): driver_code = """ import ray +import time ray.init(address="{}") @@ -241,6 +235,8 @@ def foo(): pass ray.get(foo.remote()) +# Sleep a while to make sure an idle worker exits before this driver exits. 
+time.sleep(2) ray.shutdown() """.format(info["redis_address"]) @@ -254,7 +250,7 @@ ray.shutdown() def test_worker_registration_failure_after_driver_exit(shutdown_only): - info = ray.init(num_cpus=1, _system_config={"enable_multi_tenancy": True}) + info = ray.init(num_cpus=1) driver_code = """ import ray diff --git a/python/ray/tests/test_multinode_failures.py b/python/ray/tests/test_multinode_failures.py index 58f17cc1a..1d11ad96e 100644 --- a/python/ray/tests/test_multinode_failures.py +++ b/python/ray/tests/test_multinode_failures.py @@ -63,7 +63,11 @@ def test_worker_failed(ray_start_workers_separate_multinode): time.sleep(0.1) # Kill the workers as the tasks execute. for pid in pids: - os.kill(pid, SIGKILL) + try: + os.kill(pid, SIGKILL) + except OSError: + # The process may have already exited due to worker capping. + pass time.sleep(0.1) # Make sure that we either get the object or we get an appropriate # exception. diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index f6599483d..382225cea 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -410,7 +410,11 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): return obj = large_object.options(resources={"node2": 1}).remote() - downstream = [chain.remote(obj) for _ in range(4)] + downstream = [ + chain.options(resources={ + "node1": 1 + }).remote(obj) for _ in range(4) + ] for obj in downstream: ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f71d98a2e..83c1f1800 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -287,7 +287,16 @@ RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100) RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5) /// Whether to enable multi tenancy features. 
-RAY_CONFIG(bool, enable_multi_tenancy, false) +RAY_CONFIG(bool, enable_multi_tenancy, + getenv("RAY_ENABLE_MULTI_TENANCY") == nullptr || + getenv("RAY_ENABLE_MULTI_TENANCY") == std::string("1")) + +/// The interval of periodic idle worker killing. A non-positive value means worker +/// capping is disabled. +RAY_CONFIG(int64_t, kill_idle_workers_interval_ms, 200) + +/// The idle time threshold for an idle worker to be killed. +RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000) /// Whether start the Plasma Store as a Raylet thread. RAY_CONFIG(bool, ownership_based_object_directory_enabled, false) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index c5e862177..549f60903 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2256,6 +2256,12 @@ void CoreWorker::HandleRestoreSpilledObjects( } } +void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply, + rpc::SendReplyCallback send_reply_callback) { + send_reply_callback(Status::OK(), nullptr, nullptr); + Exit(/*intentional=*/true); +} + void CoreWorker::YieldCurrentFiber(FiberEvent &event) { RAY_CHECK(worker_context_.CurrentActorIsAsync()); boost::this_fiber::yield(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 419a6df9f..688300604 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -890,6 +890,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) override; + // Make this worker exit. + void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// /// Public methods related to async actor call. This should only be used when /// the actor is (1) direct actor and (2) using asyncio mode. 
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 007c97b70..f62ad8070 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -312,6 +312,12 @@ message RestoreSpilledObjectsRequest { message RestoreSpilledObjectsReply { } +message ExitRequest { +} + +message ExitReply { +} + service CoreWorkerService { // Push a task directly to this worker from another. rpc PushTask(PushTaskRequest) returns (PushTaskReply); @@ -357,4 +363,6 @@ service CoreWorkerService { returns (RestoreSpilledObjectsReply); // Notification from raylet that an object ID is available in local plasma. rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); + // Request for a worker to exit. + rpc Exit(ExitRequest) returns (ExitReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a14cc873e..5785a8cb8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1370,11 +1370,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & // Call task dispatch to assign work to the new worker. DispatchTasks(local_queues_.GetReadyTasksByClass()); } - if (RayConfig::instance().enable_multi_tenancy()) { - // We trigger killing here instead of inside `Worker::PushWorker` because we - // only kill an idle worker if it remains idle after scheduling. 
- worker_pool_.TryKillingIdleWorkers(); - } } void NodeManager::ProcessDisconnectClientMessage( diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index da428ee86..80f174810 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -71,8 +71,8 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, first_job_registered_python_worker_count_(0), first_job_driver_wait_num_python_workers_(std::min( num_initial_python_workers_for_first_job, maximum_startup_concurrency)), - num_initial_python_workers_for_first_job_( - num_initial_python_workers_for_first_job) { + num_initial_python_workers_for_first_job_(num_initial_python_workers_for_first_job), + kill_idle_workers_timer_(io_service) { RAY_CHECK(maximum_startup_concurrency > 0); #ifndef _WIN32 // Ignore SIGCHLD signals. If we don't do this, then worker processes will @@ -127,6 +127,8 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, } if (!RayConfig::instance().enable_multi_tenancy()) { Start(num_workers); + } else { + ScheduleIdleWorkerKilling(); } } @@ -639,7 +641,9 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { if (worker->GetActorId().IsNil()) { state.idle.insert(worker); if (RayConfig::instance().enable_multi_tenancy()) { - idle_of_all_languages.push_back(worker); + int64_t now = current_time_ms(); + idle_of_all_languages_.emplace_back(worker, now); + idle_of_all_languages_map_[worker] = now; } } else { state.idle_actor[worker->GetActorId()] = worker; @@ -647,7 +651,24 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { } } +void WorkerPool::ScheduleIdleWorkerKilling() { + if (RayConfig::instance().kill_idle_workers_interval_ms() > 0) { + kill_idle_workers_timer_.expires_from_now(boost::posix_time::milliseconds( + RayConfig::instance().kill_idle_workers_interval_ms())); + kill_idle_workers_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == 
boost::asio::error::operation_aborted) { + return; + } + TryKillingIdleWorkers(); + ScheduleIdleWorkerKilling(); + }); + } +} + void WorkerPool::TryKillingIdleWorkers() { + RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size()); + + int64_t now = current_time_ms(); size_t running_size = 0; for (const auto &worker : GetAllRegisteredWorkers()) { if (!worker->IsDead()) { @@ -656,18 +677,24 @@ void WorkerPool::TryKillingIdleWorkers() { } // Kill idle workers in FIFO order. - for (auto it = idle_of_all_languages.begin(); - it != idle_of_all_languages.end() && - running_size > static_cast(num_workers_soft_limit_); - it++) { - if ((*it)->IsDead()) { + for (const auto &idle_pair : idle_of_all_languages_) { + if (running_size <= static_cast(num_workers_soft_limit_)) { + break; + } + if (now - idle_pair.second < + RayConfig::instance().idle_worker_killing_time_threshold_ms()) { + break; + } + + const auto &idle_worker = idle_pair.first; + if (idle_worker->IsDead()) { // This worker has already been killed. // This is possible because a Java worker process may hold multiple workers. continue; } - auto process = (*it)->GetProcess(); + auto process = idle_worker->GetProcess(); - auto &worker_state = GetStateForLanguage((*it)->GetLanguage()); + auto &worker_state = GetStateForLanguage(idle_worker->GetLanguage()); if (worker_state.starting_worker_processes.count(process) > 0) { // A Java worker process may hold multiple workers. @@ -680,8 +707,11 @@ void WorkerPool::TryKillingIdleWorkers() { auto workers_in_the_same_process = GetWorkersByProcess(process); bool can_be_killed = true; for (const auto &worker : workers_in_the_same_process) { - if (worker_state.idle.count(worker) == 0) { - // Another worker in this process isn't idle, so this process can't be killed. 
+ if (worker_state.idle.count(worker) == 0 || + now - idle_of_all_languages_map_[worker] < + RayConfig::instance().idle_worker_killing_time_threshold_ms()) { + // Another worker in this process isn't idle, or hasn't been idle for a while, so + // this process can't be killed. can_be_killed = false; break; } @@ -690,31 +720,50 @@ void WorkerPool::TryKillingIdleWorkers() { continue; } - for (auto worker_it = workers_in_the_same_process.begin(); - worker_it != workers_in_the_same_process.end(); worker_it++) { + if (running_size - workers_in_the_same_process.size() < + static_cast(num_workers_soft_limit_)) { + // A Java worker process may contain multiple workers. Killing more workers than we + // expect may slow the job. + return; + } + + for (const auto &worker : workers_in_the_same_process) { RAY_LOG(INFO) << "The worker pool has " << running_size << " registered workers which exceeds the soft limit of " - << num_workers_soft_limit_ << ", and worker " - << (*worker_it)->WorkerId() << " with pid " << process.GetId() - << " is idle. Kill it."; + << num_workers_soft_limit_ << ", and worker " << worker->WorkerId() + << " with pid " << process.GetId() + << " has been idle for a a while. Kill it."; + // To avoid object lost issue caused by forcibly killing, send an RPC request to the + // worker to allow it to do cleanup before exiting. + auto rpc_client = worker->rpc_client(); + RAY_CHECK(rpc_client); + rpc::ExitRequest request; + rpc_client->Exit(request, [](const ray::Status &status, const rpc::ExitReply &r) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to send exit request: " << status.ToString(); + } + }); // Remove the worker from the idle pool so it can't be popped anymore. 
- RemoveWorker(worker_state.idle, *worker_it); - if (!(*worker_it)->IsDead()) { - (*worker_it)->MarkDead(); + RemoveWorker(worker_state.idle, worker); + if (!worker->IsDead()) { + worker->MarkDead(); running_size--; } } - process.Kill(); } - std::list> new_idle_of_all_languages; - for (auto it = idle_of_all_languages.begin(); it != idle_of_all_languages.end(); it++) { - if (!(*it)->IsDead()) { - new_idle_of_all_languages.push_back(*it); + std::list, int64_t>> + new_idle_of_all_languages; + idle_of_all_languages_map_.clear(); + for (const auto &idle_pair : idle_of_all_languages_) { + if (!idle_pair.first->IsDead()) { + new_idle_of_all_languages.push_back(idle_pair); + idle_of_all_languages_map_.emplace(idle_pair); } } - idle_of_all_languages = std::move(new_idle_of_all_languages); + idle_of_all_languages_ = std::move(new_idle_of_all_languages); + RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size()); } std::shared_ptr WorkerPool::PopWorker( @@ -760,18 +809,19 @@ std::shared_ptr WorkerPool::PopWorker( } else { // Find an available worker which is already assigned to this job. // Try to pop the most recently pushed worker. - for (auto it = idle_of_all_languages.rbegin(); it != idle_of_all_languages.rend(); + for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); it++) { - if (task_spec.GetLanguage() != (*it)->GetLanguage() || - (*it)->GetAssignedJobId() != task_spec.JobId()) { + if (task_spec.GetLanguage() != it->first->GetLanguage() || + it->first->GetAssignedJobId() != task_spec.JobId()) { continue; } - state.idle.erase(*it); + state.idle.erase(it->first); // We can't erase a reverse_iterator. 
auto lit = it.base(); lit--; - worker = std::move(*lit); - idle_of_all_languages.erase(lit); + worker = std::move(lit->first); + idle_of_all_languages_.erase(lit); + idle_of_all_languages_map_.erase(worker); break; } if (worker == nullptr) { @@ -804,6 +854,14 @@ std::shared_ptr WorkerPool::PopWorker( bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { auto &state = GetStateForLanguage(worker->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_workers, worker)); + for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end(); + it++) { + if (it->first == worker) { + idle_of_all_languages_.erase(it); + idle_of_all_languages_map_.erase(worker); + break; + } + } stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, @@ -958,7 +1016,7 @@ std::string WorkerPool::DebugString() const { result << "\n- num " << Language_Name(entry.first) << " drivers: " << entry.second.registered_drivers.size(); } - result << "- num idle workers: " << idle_of_all_languages.size(); + result << "- num idle workers: " << idle_of_all_languages_.size(); return result.str(); } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 5746931a5..b78d6cdd5 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -187,10 +187,6 @@ class WorkerPool : public WorkerPoolInterface { /// \param The idle worker to add. void PushWorker(const std::shared_ptr &worker); - /// Try killing idle workers to ensure the running workers are in a - /// reasonable size. - void TryKillingIdleWorkers(); - /// Pop an idle worker from the pool. The caller is responsible for pushing /// the worker back onto the pool once the worker has completed its work. /// @@ -363,6 +359,13 @@ class WorkerPool : public WorkerPoolInterface { /// started. void TryStartIOWorkers(const Language &language, State &state); + /// Try killing idle workers to ensure the running workers are in a + /// reasonable size. 
+ void TryKillingIdleWorkers(); + + /// Schedule the periodic killing of idle workers. + void ScheduleIdleWorkerKilling(); + /// Get all workers of the given process. /// /// \param process The process of workers. @@ -407,8 +410,17 @@ class WorkerPool : public WorkerPoolInterface { absl::flat_hash_map unfinished_jobs_; /// The pool of idle non-actor workers of all languages. This is used to kill idle - /// workers in FIFO order. - std::list> idle_of_all_languages; + /// workers in FIFO order. The second element of std::pair is the time a worker becomes + /// idle. + std::list, int64_t>> idle_of_all_languages_; + + /// This map stores the same data as `idle_of_all_languages_`, but in a map structure + /// for lookup performance. + std::unordered_map, int64_t> + idle_of_all_languages_map_; + + /// The timer to trigger idle worker killing. + boost::asio::deadline_timer kill_idle_workers_timer_; }; } // namespace raylet diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 6a2069845..3d0e599bd 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -186,6 +186,9 @@ class CoreWorkerClientInterface { const ClientCallback &callback) { } + virtual void Exit(const ExitRequest &request, + const ClientCallback &callback) {} + virtual ~CoreWorkerClientInterface(){}; }; @@ -244,6 +247,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, VOID_RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, Exit, grpc_client_, override) + void PushActorTask(std::unique_ptr request, bool skip_queue, const ClientCallback &callback) override { if (skip_queue) { diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 4926e047f..88ccde3c2 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -43,7 +43,8 @@ namespace rpc { 
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \ RPC_SERVICE_HANDLER(CoreWorkerService, SpillObjects) \ RPC_SERVICE_HANDLER(CoreWorkerService, RestoreSpilledObjects) \ - RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) + RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) \ + RPC_SERVICE_HANDLER(CoreWorkerService, Exit) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask) \ @@ -62,7 +63,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(SpillObjects) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RestoreSpilledObjects) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(Exit) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. class CoreWorkerServiceHandler {