[Core] Multi-tenancy: Kill idle workers in FIFO order (#10597)

* Kill idle workers in FIFO order

* Update test

* minor update

* Address comments

* fix after merge

* fix worker_pool_test
This commit is contained in:
Kai Yang
2020-09-23 01:59:11 +08:00
committed by GitHub
parent 1deb281ea6
commit 864d1d2b59
4 changed files with 170 additions and 97 deletions
+60 -19
View File
@@ -9,21 +9,22 @@ import pytest
import ray
import ray.test_utils
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
from ray.test_utils import (wait_for_condition, run_string_as_driver,
from ray.test_utils import (wait_for_condition, wait_for_pid_to_exit,
run_string_as_driver,
run_string_as_driver_nonblocking)
def get_num_workers():
def get_workers():
raylet = ray.nodes()[0]
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
raylet["NodeManagerPort"])
channel = grpc.insecure_channel(raylet_address)
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
return len([
return [
worker for worker in stub.GetNodeStats(
node_manager_pb2.GetNodeStatsRequest()).workers_stats
if not worker.is_driver
])
]
# Test that when `redis_address` and `job_config` is not set in
@@ -34,7 +35,7 @@ def test_initial_workers(shutdown_only):
num_cpus=1,
include_dashboard=True,
_system_config={"enable_multi_tenancy": True})
wait_for_condition(lambda: get_num_workers() == 1)
wait_for_condition(lambda: len(get_workers()) == 1)
# This test case starts some driver processes. Each driver process submits
@@ -131,7 +132,7 @@ def test_worker_env(shutdown_only):
def test_worker_capping_kill_idle_workers(shutdown_only):
# Avoid starting initial workers by setting num_cpus to 0.
ray.init(num_cpus=0, _system_config={"enable_multi_tenancy": True})
assert get_num_workers() == 0
assert len(get_workers()) == 0
@ray.remote(num_cpus=0)
class Actor:
@@ -141,7 +142,7 @@ def test_worker_capping_kill_idle_workers(shutdown_only):
actor = Actor.remote()
ray.get(actor.ping.remote())
# Actor is now alive and worker 1 which holds the actor is alive
assert get_num_workers() == 1
assert len(get_workers()) == 1
@ray.remote(num_cpus=0)
def foo():
@@ -150,18 +151,18 @@ def test_worker_capping_kill_idle_workers(shutdown_only):
obj1 = foo.remote()
# Worker 2 runs a normal task
wait_for_condition(lambda: get_num_workers() == 2)
wait_for_condition(lambda: len(get_workers()) == 2)
obj2 = foo.remote()
# Worker 3 runs a normal task
wait_for_condition(lambda: get_num_workers() == 3)
wait_for_condition(lambda: len(get_workers()) == 3)
ray.get(obj1)
# Worker 2 now becomes idle and should be killed
wait_for_condition(lambda: get_num_workers() == 2)
wait_for_condition(lambda: len(get_workers()) == 2)
ray.get(obj2)
# Worker 3 now becomes idle and should be killed
wait_for_condition(lambda: get_num_workers() == 1)
wait_for_condition(lambda: len(get_workers()) == 1)
def test_worker_capping_run_many_small_tasks(shutdown_only):
@@ -174,16 +175,16 @@ def test_worker_capping_run_many_small_tasks(shutdown_only):
# Run more tasks than `num_cpus`, but the CPU resource requirement is
# still within `num_cpus`.
obj_refs = [foo.remote() for _ in range(4)]
wait_for_condition(lambda: get_num_workers() == 4)
wait_for_condition(lambda: len(get_workers()) == 4)
ray.get(obj_refs)
# After finished the tasks, some workers are killed to keep the total
# number of workers <= num_cpus.
wait_for_condition(lambda: get_num_workers() == 2)
wait_for_condition(lambda: len(get_workers()) == 2)
time.sleep(1)
# The two remaining workers stay alive forever.
assert get_num_workers() == 2
assert len(get_workers()) == 2
def test_worker_capping_run_chained_tasks(shutdown_only):
@@ -200,16 +201,56 @@ def test_worker_capping_run_chained_tasks(shutdown_only):
# Run a chain of tasks which exceed `num_cpus` in amount, but the CPU
# resource requirement is still within `num_cpus`.
obj = foo.remote(4)
wait_for_condition(lambda: get_num_workers() == 4)
wait_for_condition(lambda: len(get_workers()) == 4)
ray.get(obj)
# After finished the tasks, some workers are killed to keep the total
# number of workers <= num_cpus.
wait_for_condition(lambda: get_num_workers() == 2)
wait_for_condition(lambda: len(get_workers()) == 2)
time.sleep(1)
# The two remaining workers stay alive forever.
assert get_num_workers() == 2
assert len(get_workers()) == 2
def test_worker_capping_fifo(shutdown_only):
# Start 2 initial workers by setting num_cpus to 2.
info = ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True})
wait_for_condition(lambda: len(get_workers()) == 2)
time.sleep(1)
@ray.remote
def getpid():
return os.getpid()
worker1, worker2 = get_workers()
if worker1.pid == ray.get(getpid.remote()):
worker1, worker2 = [worker2, worker1]
# Worker 1 is before worker 2 in the FIFO queue.
driver_code = """
import ray
ray.init(address="{}")
@ray.remote
def foo():
pass
ray.get(foo.remote())
ray.shutdown()
""".format(info["redis_address"])
run_string_as_driver(driver_code)
# Worker 1 should have been killed.
wait_for_pid_to_exit(worker1.pid)
wait_for_condition(lambda: len(get_workers()) == 1)
assert worker2.pid == get_workers()[0].pid
def test_worker_registration_failure_after_driver_exit(shutdown_only):
@@ -231,14 +272,14 @@ def foo():
ray.shutdown()
""".format(info["redis_address"])
before = get_num_workers()
before = len(get_workers())
assert before == 1
run_string_as_driver(driver_code)
# wait for a while to let workers register
time.sleep(2)
wait_for_condition(lambda: get_num_workers() == before, timeout=10)
wait_for_condition(lambda: len(get_workers()) == before)
if __name__ == "__main__":