diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index f7c93fd50..031b41111 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -9,7 +9,6 @@ import subprocess import ray from ray.tests.cluster_utils import Cluster -from ray.tests.utils import run_and_get_output @pytest.fixture @@ -155,7 +154,8 @@ def ray_start_object_store_memory(request): def call_ray_start(request): parameter = getattr(request, "param", "ray start --head --num-cpus=1") command_args = parameter.split(" ") - out = run_and_get_output(command_args) + out = ray.utils.decode( + subprocess.check_output(command_args, stderr=subprocess.STDOUT)) # Get the redis address from the output. redis_substring_prefix = "redis_address=\"" redis_address_location = ( @@ -168,7 +168,7 @@ def call_ray_start(request): # Disconnect from the Ray cluster. ray.shutdown() # Kill the Ray cluster. - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) @pytest.fixture() diff --git a/python/ray/tests/test_monitors.py b/python/ray/tests/test_monitors.py index 9eebe7e45..f80eb5e57 100644 --- a/python/ray/tests/test_monitors.py +++ b/python/ray/tests/test_monitors.py @@ -10,18 +10,19 @@ import time import ray -from ray.tests.utils import run_and_get_output - def _test_cleanup_on_driver_exit(num_redis_shards): - stdout = run_and_get_output([ - "ray", - "start", - "--head", - "--num-redis-shards", - str(num_redis_shards), - ]) - lines = [m.strip() for m in stdout.split("\n")] + output = ray.utils.decode( + subprocess.check_output( + [ + "ray", + "start", + "--head", + "--num-redis-shards", + str(num_redis_shards), + ], + stderr=subprocess.STDOUT)) + lines = [m.strip() for m in output.split("\n")] init_cmd = [m for m in lines if m.startswith("ray.init")] assert 1 == len(init_cmd) redis_address = init_cmd[0].split("redis_address=\"")[-1][:-2] @@ -90,7 +91,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards): assert (0, 1) == StateSummary() ray.shutdown() - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) @pytest.mark.skipif( diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 50eac65ab..446c34d80 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -9,7 +9,7 @@ import time import ray from ray.utils import _random_string -from ray.tests.utils import (run_and_get_output, run_string_as_driver, +from ray.tests.utils import (run_string_as_driver, run_string_as_driver_nonblocking) @@ -272,60 +272,60 @@ def test_calling_start_ray_head(): # should also test the non-head node code path. # Test starting Ray with no arguments. - run_and_get_output(["ray", "start", "--head"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "start", "--head"]) + subprocess.check_output(["ray", "stop"]) # Test starting Ray with a redis port specified. - run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "start", "--head", "--redis-port", "6379"]) + subprocess.check_output(["ray", "stop"]) # Test starting Ray with a node IP address specified. - run_and_get_output( + subprocess.check_output( ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) # Test starting Ray with the object manager and node manager ports # specified. - run_and_get_output([ + subprocess.check_output([ "ray", "start", "--head", "--object-manager-port", "12345", "--node-manager-port", "54321" ]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) # Test starting Ray with the number of CPUs specified. - run_and_get_output(["ray", "start", "--head", "--num-cpus", "2"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "start", "--head", "--num-cpus", "2"]) + subprocess.check_output(["ray", "stop"]) # Test starting Ray with the number of GPUs specified. - run_and_get_output(["ray", "start", "--head", "--num-gpus", "100"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "start", "--head", "--num-gpus", "100"]) + subprocess.check_output(["ray", "stop"]) # Test starting Ray with the max redis clients specified. - run_and_get_output( + subprocess.check_output( ["ray", "start", "--head", "--redis-max-clients", "100"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) if "RAY_USE_NEW_GCS" not in os.environ: # Test starting Ray with redis shard ports specified. - run_and_get_output([ + subprocess.check_output([ "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382" ]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) # Test starting Ray with all arguments specified. - run_and_get_output([ + subprocess.check_output([ "ray", "start", "--head", "--redis-port", "6379", "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" ]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) # Test starting Ray with invalid arguments. - with pytest.raises(Exception): - run_and_get_output( + with pytest.raises(subprocess.CalledProcessError): + subprocess.check_output( ["ray", "start", "--head", "--redis-address", "127.0.0.1:6379"]) - subprocess.Popen(["ray", "stop"]).wait() + subprocess.check_output(["ray", "stop"]) # Test --block. Killing any child process should cause the command to exit. blocked = subprocess.Popen(["ray", "start", "--head", "--block"]) diff --git a/python/ray/tests/utils.py b/python/ray/tests/utils.py index f01b22fe9..e7dff2639 100644 --- a/python/ray/tests/utils.py +++ b/python/ray/tests/utils.py @@ -37,16 +37,6 @@ def wait_for_pid_to_exit(pid, timeout=20): raise Exception("Timed out while waiting for process to exit.") -def run_and_get_output(command): - with tempfile.NamedTemporaryFile() as tmp: - p = subprocess.Popen(command, stdout=tmp, stderr=tmp) - if p.wait() != 0: - raise RuntimeError("ray start did not terminate properly") - with open(tmp.name, "r") as f: - result = f.readlines() - return "\n".join(result) - - def run_string_as_driver(driver_script): """Run a driver as a separate process. @@ -61,7 +51,8 @@ def run_string_as_driver(driver_script): f.write(driver_script.encode("ascii")) f.flush() out = ray.utils.decode( - subprocess.check_output([sys.executable, f.name])) + subprocess.check_output( + [sys.executable, f.name], stderr=subprocess.STDOUT)) return out