diff --git a/.bazelrc b/.bazelrc index 3f96f006a..f25d7f87e 100644 --- a/.bazelrc +++ b/.bazelrc @@ -13,6 +13,8 @@ build:linux --force_pic build:macos --force_pic build:clang-cl --compiler=clang-cl build:msvc --compiler=msvc-cl +# This is needed for some core tests to run correctly +test:windows --enable_runfiles # TODO(mehrdadn): Revert the "-\\.(asm|S)$" exclusion when this Bazel bug # for compiling assembly files is fixed on Windows: # https://github.com/bazelbuild/bazel/issues/8924 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6304d83ba..a658e1c0a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -115,7 +115,7 @@ jobs: . ./ci/travis/ci.sh build . ./ci/travis/ci.sh upload_wheels || true . ./ci/travis/ci.sh test_python - #. ./ci/travis/ci.sh test_core || true + . ./ci/travis/ci.sh test_core . ./ci/travis/ci.sh test_wheels - name: Run Clang Include-What-You-Use continue-on-error: true diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index c49a11582..6760609bf 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -114,20 +114,31 @@ upload_wheels() { } test_core() { - bazel test --config=ci --build_tests_only -- //:all -rllib/... + local args=( + "//:*" + ) + case "${OSTYPE}" in + msys) + args+=( + -//:redis_gcs_client_test + -//:core_worker_test + -//:gcs_pub_sub_test + -//:gcs_server_test + -//:gcs_server_rpc_test + -//:subscription_executor_test + ) + ;; + esac + bazel test --config=ci --build_tests_only -- "${args[@]}" } test_python() { if [ "${OSTYPE}" = msys ]; then local args=(python/ray/tests/...) 
args+=( - -python/ray/tests:test_actor_advanced - -python/ray/tests:test_actor_failures -python/ray/tests:test_advanced_2 - -python/ray/tests:test_advanced_3 - -python/ray/tests:test_array # timeout + -python/ray/tests:test_advanced_3 # test_invalid_unicode_in_worker_log() fails on Windows -python/ray/tests:test_autoscaler_aws - -python/ray/tests:test_autoscaler_yaml -python/ray/tests:test_component_failures -python/ray/tests:test_cython -python/ray/tests:test_failure @@ -137,15 +148,13 @@ test_python() { -python/ray/tests:test_metrics -python/ray/tests:test_multi_node -python/ray/tests:test_multi_node_2 - -python/ray/tests:test_multiprocessing # flaky + -python/ray/tests:test_multiprocessing # test_connect_to_ray() fails to connect to raylet -python/ray/tests:test_node_manager -python/ray/tests:test_object_manager -python/ray/tests:test_projects - -python/ray/tests:test_queue # timeout - -python/ray/tests:test_ray_init # flaky - -python/ray/tests:test_reconstruction # UnreconstructableError - -python/ray/tests:test_stress - -python/ray/tests:test_stress_sharded + -python/ray/tests:test_ray_init # test_redis_port() seems to fail here, but pass in isolation + -python/ray/tests:test_stress # timeout + -python/ray/tests:test_stress_sharded # timeout -python/ray/tests:test_webui ) bazel test -k --config=ci --test_timeout=600 --build_tests_only -- "${args[@]}"; diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 2e386c8ef..d2b538c12 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -1,5 +1,6 @@ import asyncio import errno +import io import json import fnmatch import os @@ -9,8 +10,12 @@ import time import socket import math +from contextlib import redirect_stdout, redirect_stderr + import ray import ray.services +import ray.utils +from ray.scripts.scripts import main as ray_main import psutil # We must import psutil after ray because we bundle it with ray. 
@@ -52,6 +57,103 @@ def _pid_alive(pid):
     return alive
 
 
+def check_call_module(main, argv, capture_stdout=False, capture_stderr=False):
+    """Run ``main()`` in-process as a command invoked with ``argv``.
+
+    We use this function instead of calling the "ray" command to work around
+    some deadlocks that occur when piping ray's output on Windows.
+
+    Args:
+        main: Zero-argument callable acting as the command's entry point.
+        argv: Argument list installed as ``sys.argv`` for the duration of
+            the call; ``sys.argv`` is restored afterwards.
+        capture_stdout: If true, capture what ``main`` writes to stdout.
+        capture_stderr: If true, capture what ``main`` writes to stderr.
+
+    Returns:
+        The captured output as bytes (empty if nothing was captured).
+        Captured output is also echoed to ``sys.stdout`` (or to
+        ``sys.stderr`` when only stderr is captured).
+
+    Raises:
+        subprocess.CalledProcessError: If ``main`` exits with a truthy
+            status via ``SystemExit`` or raises an ``Exception``.
+    """
+    stream = io.TextIOWrapper(io.BytesIO(), encoding=sys.stdout.encoding)
+    old_argv = sys.argv[:]
+    try:
+        sys.argv = argv[:]
+        try:
+            with redirect_stderr(stream if capture_stderr else sys.stderr):
+                with redirect_stdout(stream if capture_stdout else sys.stdout):
+                    main()
+        finally:
+            stream.flush()
+    except SystemExit as ex:
+        if ex.code:
+            output = stream.buffer.getvalue()
+            # sys.exit() accepts arbitrary objects; a real process reports
+            # any non-integer status as exit code 1.
+            code = ex.code if isinstance(ex.code, int) else 1
+            raise subprocess.CalledProcessError(code, argv, output)
+    except Exception as ex:
+        output = stream.buffer.getvalue()
+        # Exceptions raised without arguments have empty ``args``; indexing
+        # them blindly would replace the real failure with an IndexError.
+        detail = ex.args[0] if ex.args else repr(ex)
+        raise subprocess.CalledProcessError(1, argv, output, detail) from ex
+    finally:
+        sys.argv = old_argv
+        if capture_stdout:
+            sys.stdout.buffer.write(stream.buffer.getvalue())
+        elif capture_stderr:
+            sys.stderr.buffer.write(stream.buffer.getvalue())
+    return stream.buffer.getvalue()
+
+
+def check_call_ray(args, capture_stdout=False, capture_stderr=False):
+    """Run the "ray" command-line tool with ``args`` and return its output.
+
+    We use this function instead of calling the "ray" command to work around
+    some deadlocks that occur when piping ray's output on Windows: there the
+    CLI entry point is run in-process, elsewhere it is run as a subprocess.
+
+    Args:
+        args: Sequence of arguments that follow "ray" on the command line.
+        capture_stdout: If true, capture the command's stdout.
+        capture_stderr: If true, capture the command's stderr.
+
+    Returns:
+        The captured output as bytes (empty if nothing was captured).
+
+    Raises:
+        subprocess.CalledProcessError: If the command fails.
+    """
+    argv = ["ray"] + list(args)
+    if sys.platform == "win32":
+        result = check_call_module(
+            ray_main,
+            argv,
+            capture_stdout=capture_stdout,
+            capture_stderr=capture_stderr)
+    else:
+        stdout_redir = None
+        stderr_redir = None
+        if capture_stdout:
+            stdout_redir = subprocess.PIPE
+        if capture_stderr and capture_stdout:
+            stderr_redir = subprocess.STDOUT
+        elif capture_stderr:
+            stderr_redir = subprocess.PIPE
+        proc = subprocess.Popen(argv, stdout=stdout_redir, stderr=stderr_redir)
+        (stdout, stderr) = proc.communicate()
+        if proc.returncode:
+            raise subprocess.CalledProcessError(proc.returncode, argv, stdout,
+                                                stderr)
+        result = b"".join([s for s in [stdout, stderr] if s is not None])
+    return result
+
+
 def
wait_for_pid_to_exit(pid, timeout=20): start_time = time.time() while time.time() - start_time < timeout: diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 68e4da4ec..089dd1671 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -6,7 +6,6 @@ import shutil import json import sys import socket -import subprocess import tempfile import time @@ -20,7 +19,8 @@ import ray.cluster_utils import ray.test_utils import setproctitle -from ray.test_utils import RayTestTimeoutException, wait_for_num_actors +from ray.test_utils import (check_call_ray, RayTestTimeoutException, + wait_for_num_actors) logger = logging.getLogger(__name__) @@ -404,7 +404,8 @@ def test_ray_stack(ray_start_2_cpus): start_time = time.time() while time.time() - start_time < 30: # Attempt to parse the "ray stack" call. - output = ray.utils.decode(subprocess.check_output(["ray", "stack"])) + output = ray.utils.decode( + check_call_ray(["stack"], capture_stdout=True)) if ("unique_name_1" in output and "unique_name_2" in output and "unique_name_3" in output): success = True @@ -434,12 +435,13 @@ def test_pandas_parquet_serialization(): def test_socket_dir_not_existing(shutdown_only): - random_name = ray.ObjectRef.from_random().hex() - temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(), - "tests", random_name) - temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir, - "raylet_socket") - ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name) + if sys.platform != "win32": + random_name = ray.ObjectRef.from_random().hex() + temp_raylet_socket_dir = os.path.join(ray.utils.get_ray_temp_dir(), + "tests", random_name) + temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir, + "raylet_socket") + ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name) def test_raylet_is_robust_to_random_messages(ray_start_regular): diff --git a/python/ray/tests/test_multi_node.py 
b/python/ray/tests/test_multi_node.py index b3be872e7..d600d2cf7 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -1,11 +1,12 @@ import os import pytest import subprocess +import sys import time import ray from ray.test_utils import ( - RayTestTimeoutException, run_string_as_driver, + RayTestTimeoutException, check_call_ray, run_string_as_driver, run_string_as_driver_nonblocking, wait_for_children_of_pid, wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore) @@ -357,66 +358,62 @@ def test_calling_start_ray_head(call_ray_stop_only): # should also test the non-head node code path. # Test starting Ray with no arguments. - subprocess.check_call(["ray", "start", "--head"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head"]) + check_call_ray(["stop"]) # Test starting Ray with a redis port specified. - subprocess.check_call(["ray", "start", "--head"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head"]) + check_call_ray(["stop"]) # Test starting Ray with a node IP address specified. - subprocess.check_call( - ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head", "--node-ip-address", "127.0.0.1"]) + check_call_ray(["stop"]) # Test starting Ray with the object manager and node manager ports # specified. - subprocess.check_call([ - "ray", "start", "--head", "--object-manager-port", "12345", + check_call_ray([ + "start", "--head", "--object-manager-port", "12345", "--node-manager-port", "54321" ]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["stop"]) # Test starting Ray with the worker port range specified. 
- subprocess.check_call([ - "ray", "start", "--head", "--min-worker-port", "50000", - "--max-worker-port", "51000" + check_call_ray([ + "start", "--head", "--min-worker-port", "50000", "--max-worker-port", + "51000" ]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["stop"]) # Test starting Ray with the number of CPUs specified. - subprocess.check_call(["ray", "start", "--head", "--num-cpus", "2"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head", "--num-cpus", "2"]) + check_call_ray(["stop"]) # Test starting Ray with the number of GPUs specified. - subprocess.check_call(["ray", "start", "--head", "--num-gpus", "100"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head", "--num-gpus", "100"]) + check_call_ray(["stop"]) # Test starting Ray with the max redis clients specified. - subprocess.check_call( - ["ray", "start", "--head", "--redis-max-clients", "100"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head", "--redis-max-clients", "100"]) + check_call_ray(["stop"]) if "RAY_USE_NEW_GCS" not in os.environ: # Test starting Ray with redis shard ports specified. - subprocess.check_call([ - "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382" - ]) - subprocess.check_call(["ray", "stop"]) + check_call_ray( + ["start", "--head", "--redis-shard-ports", "6380,6381,6382"]) + check_call_ray(["stop"]) # Test starting Ray with all arguments specified. - subprocess.check_call([ - "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382", + check_call_ray([ + "start", "--head", "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" ]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["stop"]) # Test starting Ray with invalid arguments. 
with pytest.raises(subprocess.CalledProcessError): - subprocess.check_call( - ["ray", "start", "--head", "--address", "127.0.0.1:6379"]) - subprocess.check_call(["ray", "stop"]) + check_call_ray(["start", "--head", "--address", "127.0.0.1:6379"]) + check_call_ray(["stop"]) # Test --block. Killing a child process should cause the command to exit. blocked = subprocess.Popen(["ray", "start", "--head", "--block"]) @@ -694,18 +691,22 @@ ray.get(main_wait.release.remote()) ray.get(main_wait.acquire.remote()) ray.get(main_wait.acquire.remote()) - driver1_out = p1.stdout.read().decode("ascii").split("\n") - driver2_out = p2.stdout.read().decode("ascii").split("\n") + driver1_out = p1.stdout.read().decode("ascii") + driver2_out = p2.stdout.read().decode("ascii") + if sys.platform == "win32": + driver1_out = driver1_out.replace("\r", "") + driver2_out = driver2_out.replace("\r", "") + driver1_out_split = driver1_out.split("\n") + driver2_out_split = driver2_out.split("\n") - assert driver1_out[0][-1] == "1" - assert driver1_out[1][-1] == "2" - assert driver2_out[0][-1] == "3" - assert driver2_out[1][-1] == "4" + assert driver1_out_split[0][-1] == "1" + assert driver1_out_split[1][-1] == "2" + assert driver2_out_split[0][-1] == "3" + assert driver2_out_split[1][-1] == "4" if __name__ == "__main__": import pytest - import sys # Make subprocess happy in bazel. 
os.environ["LC_ALL"] = "en_US.UTF-8" os.environ["LANG"] = "en_US.UTF-8" diff --git a/python/ray/tests/test_projects.py b/python/ray/tests/test_projects.py index 48021fcf1..4e306546c 100644 --- a/python/ray/tests/test_projects.py +++ b/python/ray/tests/test_projects.py @@ -11,6 +11,7 @@ from contextlib import contextmanager from ray.projects.scripts import (session_start, session_commands, session_execute) +from ray.test_utils import check_call_ray import ray TEST_DIR = os.path.join( @@ -57,13 +58,14 @@ def test_project_root(): def test_project_validation(): - path = os.path.join(TEST_DIR, "project1") - subprocess.check_call(["ray", "project", "validate"], cwd=path) + with _chdir_and_back(os.path.join(TEST_DIR, "project1")): + check_call_ray(["project", "validate"]) def test_project_no_validation(): - with pytest.raises(subprocess.CalledProcessError): - subprocess.check_call(["ray", "project", "validate"], cwd=TEST_DIR) + with _chdir_and_back(TEST_DIR): + with pytest.raises(subprocess.CalledProcessError): + check_call_ray(["project", "validate"]) @contextmanager diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index b5036f342..9215d74eb 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -1,12 +1,12 @@ import os import shutil -import subprocess import sys import time import pytest import ray from ray.cluster_utils import Cluster +from ray.test_utils import check_call_ray def unix_socket_create_path(name): @@ -73,8 +73,8 @@ def test_tempdir(shutdown_only): def test_tempdir_commandline(): shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True) - subprocess.check_call([ - "ray", "start", "--head", "--temp-dir=" + os.path.join( + check_call_ray([ + "start", "--head", "--temp-dir=" + os.path.join( ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2") ]) assert os.path.exists( @@ -82,7 +82,7 @@ def test_tempdir_commandline(): "i_am_a_temp_dir2")), "Specified temp dir not found." 
assert not os.path.exists( ray.utils.get_ray_temp_dir()), "Default temp dir should not exist." - subprocess.check_call(["ray", "stop"]) + check_call_ray(["stop"]) shutil.rmtree( os.path.join(ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"), ignore_errors=True) diff --git a/python/ray/util/sgd/torch/training_operator.py b/python/ray/util/sgd/torch/training_operator.py index 493d261e4..983409461 100644 --- a/python/ray/util/sgd/torch/training_operator.py +++ b/python/ray/util/sgd/torch/training_operator.py @@ -212,7 +212,7 @@ class TrainingOperator: updating the model. By default, this method implementation assumes that batches - are in (\*features, labels) format. So we also support multiple inputs + are in (\\*features, labels) format. So we also support multiple inputs model. If using amp/fp16 training, it will also scale the loss automatically. @@ -309,7 +309,7 @@ class TrainingOperator: You can override this method to provide arbitrary metrics. Same as ``train_batch``, this method implementation assumes that - batches are in (\*features, labels) format by default. So we also + batches are in (\\*features, labels) format by default. So we also support multiple inputs model. Args: