mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 10:18:53 +08:00
[autoscaler] Fix worker capping fifo test in new scheduler (#12512)
This commit is contained in:
@@ -10,9 +10,8 @@ import ray
|
||||
import ray.test_utils
|
||||
from ray.core.generated import common_pb2
|
||||
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
|
||||
from ray.test_utils import (
|
||||
wait_for_condition, wait_for_pid_to_exit, run_string_as_driver,
|
||||
run_string_as_driver_nonblocking, new_scheduler_enabled)
|
||||
from ray.test_utils import (wait_for_condition, run_string_as_driver,
|
||||
run_string_as_driver_nonblocking)
|
||||
|
||||
|
||||
def get_workers():
|
||||
@@ -207,50 +206,6 @@ def test_worker_capping_run_chained_tasks(shutdown_only):
|
||||
assert len(get_workers()) == 2
|
||||
|
||||
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="fails")
|
||||
def test_worker_capping_fifo(shutdown_only):
|
||||
# Start 2 initial workers by setting num_cpus to 2.
|
||||
info = ray.init(num_cpus=2)
|
||||
wait_for_condition(lambda: len(get_workers()) == 2)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
@ray.remote
|
||||
def getpid():
|
||||
return os.getpid()
|
||||
|
||||
worker1, worker2 = get_workers()
|
||||
|
||||
if worker1.pid == ray.get(getpid.remote()):
|
||||
worker1, worker2 = [worker2, worker1]
|
||||
|
||||
# Worker 1 is before worker 2 in the FIFO queue.
|
||||
|
||||
driver_code = """
|
||||
import ray
|
||||
import time
|
||||
|
||||
ray.init(address="{}")
|
||||
|
||||
@ray.remote
|
||||
def foo():
|
||||
pass
|
||||
|
||||
ray.get(foo.remote())
|
||||
# Sleep a while to make sure an idle worker exits before this driver exits.
|
||||
time.sleep(2)
|
||||
ray.shutdown()
|
||||
""".format(info["redis_address"])
|
||||
|
||||
run_string_as_driver(driver_code)
|
||||
|
||||
# Worker 1 should have been killed.
|
||||
wait_for_pid_to_exit(worker1.pid)
|
||||
|
||||
wait_for_condition(lambda: len(get_workers()) == 1)
|
||||
assert worker2.pid == get_workers()[0].pid
|
||||
|
||||
|
||||
def test_worker_registration_failure_after_driver_exit(shutdown_only):
|
||||
info = ray.init(num_cpus=1)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user