mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 14:42:31 +08:00
Use subprocess.check_output in tests (#5465)
This commit is contained in:
committed by
Robert Nishihara
parent
c7ae4e5e1f
commit
0440c00019
@@ -9,7 +9,6 @@ import subprocess
|
||||
|
||||
import ray
|
||||
from ray.tests.cluster_utils import Cluster
|
||||
from ray.tests.utils import run_and_get_output
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -155,7 +154,8 @@ def ray_start_object_store_memory(request):
|
||||
def call_ray_start(request):
|
||||
parameter = getattr(request, "param", "ray start --head --num-cpus=1")
|
||||
command_args = parameter.split(" ")
|
||||
out = run_and_get_output(command_args)
|
||||
out = ray.utils.decode(
|
||||
subprocess.check_output(command_args, stderr=subprocess.STDOUT))
|
||||
# Get the redis address from the output.
|
||||
redis_substring_prefix = "redis_address=\""
|
||||
redis_address_location = (
|
||||
@@ -168,7 +168,7 @@ def call_ray_start(request):
|
||||
# Disconnect from the Ray cluster.
|
||||
ray.shutdown()
|
||||
# Kill the Ray cluster.
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
|
||||
@@ -10,18 +10,19 @@ import time
|
||||
|
||||
import ray
|
||||
|
||||
from ray.tests.utils import run_and_get_output
|
||||
|
||||
|
||||
def _test_cleanup_on_driver_exit(num_redis_shards):
|
||||
stdout = run_and_get_output([
|
||||
"ray",
|
||||
"start",
|
||||
"--head",
|
||||
"--num-redis-shards",
|
||||
str(num_redis_shards),
|
||||
])
|
||||
lines = [m.strip() for m in stdout.split("\n")]
|
||||
output = ray.utils.decode(
|
||||
subprocess.check_output(
|
||||
[
|
||||
"ray",
|
||||
"start",
|
||||
"--head",
|
||||
"--num-redis-shards",
|
||||
str(num_redis_shards),
|
||||
],
|
||||
stderr=subprocess.STDOUT))
|
||||
lines = [m.strip() for m in output.split("\n")]
|
||||
init_cmd = [m for m in lines if m.startswith("ray.init")]
|
||||
assert 1 == len(init_cmd)
|
||||
redis_address = init_cmd[0].split("redis_address=\"")[-1][:-2]
|
||||
@@ -90,7 +91,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
|
||||
assert (0, 1) == StateSummary()
|
||||
|
||||
ray.shutdown()
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
|
||||
@@ -9,7 +9,7 @@ import time
|
||||
|
||||
import ray
|
||||
from ray.utils import _random_string
|
||||
from ray.tests.utils import (run_and_get_output, run_string_as_driver,
|
||||
from ray.tests.utils import (run_string_as_driver,
|
||||
run_string_as_driver_nonblocking)
|
||||
|
||||
|
||||
@@ -272,60 +272,60 @@ def test_calling_start_ray_head():
|
||||
# should also test the non-head node code path.
|
||||
|
||||
# Test starting Ray with no arguments.
|
||||
run_and_get_output(["ray", "start", "--head"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "start", "--head"])
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with a redis port specified.
|
||||
run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "start", "--head", "--redis-port", "6379"])
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with a node IP address specified.
|
||||
run_and_get_output(
|
||||
subprocess.check_output(
|
||||
["ray", "start", "--head", "--node-ip-address", "127.0.0.1"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with the object manager and node manager ports
|
||||
# specified.
|
||||
run_and_get_output([
|
||||
subprocess.check_output([
|
||||
"ray", "start", "--head", "--object-manager-port", "12345",
|
||||
"--node-manager-port", "54321"
|
||||
])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with the number of CPUs specified.
|
||||
run_and_get_output(["ray", "start", "--head", "--num-cpus", "2"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "start", "--head", "--num-cpus", "2"])
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with the number of GPUs specified.
|
||||
run_and_get_output(["ray", "start", "--head", "--num-gpus", "100"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "start", "--head", "--num-gpus", "100"])
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with the max redis clients specified.
|
||||
run_and_get_output(
|
||||
subprocess.check_output(
|
||||
["ray", "start", "--head", "--redis-max-clients", "100"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
if "RAY_USE_NEW_GCS" not in os.environ:
|
||||
# Test starting Ray with redis shard ports specified.
|
||||
run_and_get_output([
|
||||
subprocess.check_output([
|
||||
"ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382"
|
||||
])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with all arguments specified.
|
||||
run_and_get_output([
|
||||
subprocess.check_output([
|
||||
"ray", "start", "--head", "--redis-port", "6379",
|
||||
"--redis-shard-ports", "6380,6381,6382", "--object-manager-port",
|
||||
"12345", "--num-cpus", "2", "--num-gpus", "0",
|
||||
"--redis-max-clients", "100", "--resources", "{\"Custom\": 1}"
|
||||
])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test starting Ray with invalid arguments.
|
||||
with pytest.raises(Exception):
|
||||
run_and_get_output(
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
subprocess.check_output(
|
||||
["ray", "start", "--head", "--redis-address", "127.0.0.1:6379"])
|
||||
subprocess.Popen(["ray", "stop"]).wait()
|
||||
subprocess.check_output(["ray", "stop"])
|
||||
|
||||
# Test --block. Killing any child process should cause the command to exit.
|
||||
blocked = subprocess.Popen(["ray", "start", "--head", "--block"])
|
||||
|
||||
@@ -37,16 +37,6 @@ def wait_for_pid_to_exit(pid, timeout=20):
|
||||
raise Exception("Timed out while waiting for process to exit.")
|
||||
|
||||
|
||||
def run_and_get_output(command):
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
p = subprocess.Popen(command, stdout=tmp, stderr=tmp)
|
||||
if p.wait() != 0:
|
||||
raise RuntimeError("ray start did not terminate properly")
|
||||
with open(tmp.name, "r") as f:
|
||||
result = f.readlines()
|
||||
return "\n".join(result)
|
||||
|
||||
|
||||
def run_string_as_driver(driver_script):
|
||||
"""Run a driver as a separate process.
|
||||
|
||||
@@ -61,7 +51,8 @@ def run_string_as_driver(driver_script):
|
||||
f.write(driver_script.encode("ascii"))
|
||||
f.flush()
|
||||
out = ray.utils.decode(
|
||||
subprocess.check_output([sys.executable, f.name]))
|
||||
subprocess.check_output(
|
||||
[sys.executable, f.name], stderr=subprocess.STDOUT))
|
||||
return out
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user