mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
Make more tests compatible with Windows (#9303)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import errno
|
||||
import io
|
||||
import json
|
||||
import fnmatch
|
||||
import os
|
||||
@@ -9,8 +10,12 @@ import time
|
||||
import socket
|
||||
import math
|
||||
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
|
||||
import ray
|
||||
import ray.services
|
||||
import ray.utils
|
||||
from ray.scripts.scripts import main as ray_main
|
||||
|
||||
import psutil # We must import psutil after ray because we bundle it with ray.
|
||||
|
||||
@@ -52,6 +57,63 @@ def _pid_alive(pid):
|
||||
return alive
|
||||
|
||||
|
||||
def check_call_module(main, argv, capture_stdout=False, capture_stderr=False):
|
||||
# We use this function instead of calling the "ray" command to work around
|
||||
# some deadlocks that occur when piping ray's output on Windows
|
||||
stream = io.TextIOWrapper(io.BytesIO(), encoding=sys.stdout.encoding)
|
||||
old_argv = sys.argv[:]
|
||||
try:
|
||||
sys.argv = argv[:]
|
||||
try:
|
||||
with redirect_stderr(stream if capture_stderr else sys.stderr):
|
||||
with redirect_stdout(stream if capture_stdout else sys.stdout):
|
||||
main()
|
||||
finally:
|
||||
stream.flush()
|
||||
except SystemExit as ex:
|
||||
if ex.code:
|
||||
output = stream.buffer.getvalue()
|
||||
raise subprocess.CalledProcessError(ex.code, argv, output)
|
||||
except Exception as ex:
|
||||
output = stream.buffer.getvalue()
|
||||
raise subprocess.CalledProcessError(1, argv, output, ex.args[0])
|
||||
finally:
|
||||
sys.argv = old_argv
|
||||
if capture_stdout:
|
||||
sys.stdout.buffer.write(stream.buffer.getvalue())
|
||||
elif capture_stderr:
|
||||
sys.stderr.buffer.write(stream.buffer.getvalue())
|
||||
return stream.buffer.getvalue()
|
||||
|
||||
|
||||
def check_call_ray(args, capture_stdout=False, capture_stderr=False):
|
||||
# We use this function instead of calling the "ray" command to work around
|
||||
# some deadlocks that occur when piping ray's output on Windows
|
||||
argv = ["ray"] + args
|
||||
if sys.platform == "win32":
|
||||
result = check_call_module(
|
||||
ray_main,
|
||||
argv,
|
||||
capture_stdout=capture_stdout,
|
||||
capture_stderr=capture_stderr)
|
||||
else:
|
||||
stdout_redir = None
|
||||
stderr_redir = None
|
||||
if capture_stdout:
|
||||
stdout_redir = subprocess.PIPE
|
||||
if capture_stderr and capture_stdout:
|
||||
stderr_redir = subprocess.STDOUT
|
||||
elif capture_stderr:
|
||||
stderr_redir = subprocess.PIPE
|
||||
proc = subprocess.Popen(argv, stdout=stdout_redir, stderr=stderr_redir)
|
||||
(stdout, stderr) = proc.communicate()
|
||||
if proc.returncode:
|
||||
raise subprocess.CalledProcessError(proc.returncode, argv, stdout,
|
||||
stderr)
|
||||
result = b"".join([s for s in [stdout, stderr] if s is not None])
|
||||
return result
|
||||
|
||||
|
||||
def wait_for_pid_to_exit(pid, timeout=20):
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
|
||||
@@ -6,7 +6,6 @@ import shutil
|
||||
import json
|
||||
import sys
|
||||
import socket
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
@@ -20,7 +19,8 @@ import ray.cluster_utils
|
||||
import ray.test_utils
|
||||
import setproctitle
|
||||
|
||||
from ray.test_utils import RayTestTimeoutException, wait_for_num_actors
|
||||
from ray.test_utils import (check_call_ray, RayTestTimeoutException,
|
||||
wait_for_num_actors)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -404,7 +404,8 @@ def test_ray_stack(ray_start_2_cpus):
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
# Attempt to parse the "ray stack" call.
|
||||
output = ray.utils.decode(subprocess.check_output(["ray", "stack"]))
|
||||
output = ray.utils.decode(
|
||||
check_call_ray(["stack"], capture_stdout=True))
|
||||
if ("unique_name_1" in output and "unique_name_2" in output
|
||||
and "unique_name_3" in output):
|
||||
success = True
|
||||
@@ -434,12 +435,13 @@ def test_pandas_parquet_serialization():
|
||||
|
||||
|
||||
def test_socket_dir_not_existing(shutdown_only):
|
||||
random_name = ray.ObjectRef.from_random().hex()
|
||||
temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
"tests", random_name)
|
||||
temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir,
|
||||
"raylet_socket")
|
||||
ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name)
|
||||
if sys.platform != "win32":
|
||||
random_name = ray.ObjectRef.from_random().hex()
|
||||
temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(),
|
||||
"tests", random_name)
|
||||
temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir,
|
||||
"raylet_socket")
|
||||
ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name)
|
||||
|
||||
|
||||
def test_raylet_is_robust_to_random_messages(ray_start_regular):
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import os
|
||||
import pytest
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import ray
|
||||
from ray.test_utils import (
|
||||
RayTestTimeoutException, run_string_as_driver,
|
||||
RayTestTimeoutException, check_call_ray, run_string_as_driver,
|
||||
run_string_as_driver_nonblocking, wait_for_children_of_pid,
|
||||
wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore)
|
||||
|
||||
@@ -357,66 +358,62 @@ def test_calling_start_ray_head(call_ray_stop_only):
|
||||
# should also test the non-head node code path.
|
||||
|
||||
# Test starting Ray with no arguments.
|
||||
subprocess.check_call(["ray", "start", "--head"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with a redis port specified.
|
||||
subprocess.check_call(["ray", "start", "--head"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with a node IP address specified.
|
||||
subprocess.check_call(
|
||||
["ray", "start", "--head", "--node-ip-address", "127.0.0.1"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head", "--node-ip-address", "127.0.0.1"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the object manager and node manager ports
|
||||
# specified.
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--object-manager-port", "12345",
|
||||
check_call_ray([
|
||||
"start", "--head", "--object-manager-port", "12345",
|
||||
"--node-manager-port", "54321"
|
||||
])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the worker port range specified.
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--min-worker-port", "50000",
|
||||
"--max-worker-port", "51000"
|
||||
check_call_ray([
|
||||
"start", "--head", "--min-worker-port", "50000", "--max-worker-port",
|
||||
"51000"
|
||||
])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the number of CPUs specified.
|
||||
subprocess.check_call(["ray", "start", "--head", "--num-cpus", "2"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head", "--num-cpus", "2"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the number of GPUs specified.
|
||||
subprocess.check_call(["ray", "start", "--head", "--num-gpus", "100"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head", "--num-gpus", "100"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the max redis clients specified.
|
||||
subprocess.check_call(
|
||||
["ray", "start", "--head", "--redis-max-clients", "100"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head", "--redis-max-clients", "100"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
if "RAY_USE_NEW_GCS" not in os.environ:
|
||||
# Test starting Ray with redis shard ports specified.
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382"
|
||||
])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(
|
||||
["start", "--head", "--redis-shard-ports", "6380,6381,6382"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with all arguments specified.
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382",
|
||||
check_call_ray([
|
||||
"start", "--head", "--redis-shard-ports", "6380,6381,6382",
|
||||
"--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus",
|
||||
"0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}"
|
||||
])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with invalid arguments.
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
subprocess.check_call(
|
||||
["ray", "start", "--head", "--address", "127.0.0.1:6379"])
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["start", "--head", "--address", "127.0.0.1:6379"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test --block. Killing a child process should cause the command to exit.
|
||||
blocked = subprocess.Popen(["ray", "start", "--head", "--block"])
|
||||
@@ -694,18 +691,22 @@ ray.get(main_wait.release.remote())
|
||||
ray.get(main_wait.acquire.remote())
|
||||
ray.get(main_wait.acquire.remote())
|
||||
|
||||
driver1_out = p1.stdout.read().decode("ascii").split("\n")
|
||||
driver2_out = p2.stdout.read().decode("ascii").split("\n")
|
||||
driver1_out = p1.stdout.read().decode("ascii")
|
||||
driver2_out = p2.stdout.read().decode("ascii")
|
||||
if sys.platform == "win32":
|
||||
driver1_out = driver1_out.replace("\r", "")
|
||||
driver2_out = driver2_out.replace("\r", "")
|
||||
driver1_out_split = driver1_out.split("\n")
|
||||
driver2_out_split = driver2_out.split("\n")
|
||||
|
||||
assert driver1_out[0][-1] == "1"
|
||||
assert driver1_out[1][-1] == "2"
|
||||
assert driver2_out[0][-1] == "3"
|
||||
assert driver2_out[1][-1] == "4"
|
||||
assert driver1_out_split[0][-1] == "1"
|
||||
assert driver1_out_split[1][-1] == "2"
|
||||
assert driver2_out_split[0][-1] == "3"
|
||||
assert driver2_out_split[1][-1] == "4"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
import sys
|
||||
# Make subprocess happy in bazel.
|
||||
os.environ["LC_ALL"] = "en_US.UTF-8"
|
||||
os.environ["LANG"] = "en_US.UTF-8"
|
||||
|
||||
@@ -11,6 +11,7 @@ from contextlib import contextmanager
|
||||
|
||||
from ray.projects.scripts import (session_start, session_commands,
|
||||
session_execute)
|
||||
from ray.test_utils import check_call_ray
|
||||
import ray
|
||||
|
||||
TEST_DIR = os.path.join(
|
||||
@@ -57,13 +58,14 @@ def test_project_root():
|
||||
|
||||
|
||||
def test_project_validation():
|
||||
path = os.path.join(TEST_DIR, "project1")
|
||||
subprocess.check_call(["ray", "project", "validate"], cwd=path)
|
||||
with _chdir_and_back(os.path.join(TEST_DIR, "project1")):
|
||||
check_call_ray(["project", "validate"])
|
||||
|
||||
|
||||
def test_project_no_validation():
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
subprocess.check_call(["ray", "project", "validate"], cwd=TEST_DIR)
|
||||
with _chdir_and_back(TEST_DIR):
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
check_call_ray(["project", "validate"])
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pytest
|
||||
import ray
|
||||
from ray.cluster_utils import Cluster
|
||||
from ray.test_utils import check_call_ray
|
||||
|
||||
|
||||
def unix_socket_create_path(name):
|
||||
@@ -73,8 +73,8 @@ def test_tempdir(shutdown_only):
|
||||
|
||||
def test_tempdir_commandline():
|
||||
shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True)
|
||||
subprocess.check_call([
|
||||
"ray", "start", "--head", "--temp-dir=" + os.path.join(
|
||||
check_call_ray([
|
||||
"start", "--head", "--temp-dir=" + os.path.join(
|
||||
ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2")
|
||||
])
|
||||
assert os.path.exists(
|
||||
@@ -82,7 +82,7 @@ def test_tempdir_commandline():
|
||||
"i_am_a_temp_dir2")), "Specified temp dir not found."
|
||||
assert not os.path.exists(
|
||||
ray.utils.get_ray_temp_dir()), "Default temp dir should not exist."
|
||||
subprocess.check_call(["ray", "stop"])
|
||||
check_call_ray(["stop"])
|
||||
shutil.rmtree(
|
||||
os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"),
|
||||
ignore_errors=True)
|
||||
|
||||
@@ -212,7 +212,7 @@ class TrainingOperator:
|
||||
updating the model.
|
||||
|
||||
By default, this method implementation assumes that batches
|
||||
are in (\*features, labels) format. So we also support multiple inputs
|
||||
are in (\\*features, labels) format. So we also support multiple inputs
|
||||
model. If using amp/fp16 training, it will also scale the loss
|
||||
automatically.
|
||||
|
||||
@@ -309,7 +309,7 @@ class TrainingOperator:
|
||||
You can override this method to provide arbitrary metrics.
|
||||
|
||||
Same as ``train_batch``, this method implementation assumes that
|
||||
batches are in (\*features, labels) format by default. So we also
|
||||
batches are in (\\*features, labels) format by default. So we also
|
||||
support multiple inputs model.
|
||||
|
||||
Args:
|
||||
|
||||
Reference in New Issue
Block a user