diff --git a/ci/travis/ci.sh b/ci/travis/ci.sh index 82286c8c2..2527a4c5b 100755 --- a/ci/travis/ci.sh +++ b/ci/travis/ci.sh @@ -145,8 +145,11 @@ test_python() { -python/ray/tests:test_advanced_3 # test_invalid_unicode_in_worker_log() fails on Windows -python/ray/tests:test_autoscaler_aws -python/ray/tests:test_component_failures + -python/ray/tests:test_component_failures_3 # timeout -python/ray/tests:test_basic_2 # hangs on shared cluster tests -python/ray/tests:test_basic_2_client_mode + -python/ray/tests:test_basic_3 # timeout + -python/ray/tests:test_basic_3_client_mode -python/ray/tests:test_cli -python/ray/tests:test_failure -python/ray/tests:test_global_gc diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 97980a641..6bb68b854 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -23,6 +23,7 @@ py_test_module_list( "test_autoscaling_policy.py", "test_basic.py", "test_basic_2.py", + "test_basic_3.py", "test_cancel.py", "test_cli.py", "test_component_failures_2.py", @@ -174,11 +175,12 @@ py_test_module_list( "test_advanced.py", "test_basic.py", "test_basic_2.py", + "test_basic_3.py", ], size = "medium", extra_srcs = SRCS, name_suffix = "_client_mode", - # TODO(barakmich): py_test will support env in Bazel 4.0.0... + # TODO(barakmich): py_test will support env in Bazel 4.0.0... # Until then, we can use tags. #env = {"RAY_CLIENT_MODE": "1"}, tags = ["exclusive", "client_tests"], diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 4475bb6ea..e33af42de 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -9,11 +9,7 @@ import numpy as np import pytest import ray.cluster_utils -from ray.test_utils import ( - client_test_enabled, - dicts_equal, - wait_for_pid_to_exit, -) +from ray.test_utils import (client_test_enabled) import ray @@ -170,126 +166,6 @@ def test_invalid_arguments(shutdown_only): x = 1 -def test_many_fractional_resources(shutdown_only): - ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) - - @ray.remote - def g(): - return 1 - - @ray.remote - def f(block, accepted_resources): - true_resources = { - resource: value[0][1] - for resource, value in ray.get_resource_ids().items() - } - if block: - ray.get(g.remote()) - return dicts_equal(true_resources, accepted_resources) - - # Check that the resource are assigned correctly. - result_ids = [] - for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)): - resource_set = {"CPU": int(rand1 * 10000) / 10000} - result_ids.append(f._remote([False, resource_set], num_cpus=rand1)) - - resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000} - result_ids.append(f._remote([False, resource_set], num_gpus=rand1)) - - resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000} - result_ids.append( - f._remote([False, resource_set], resources={"Custom": rand1})) - - resource_set = { - "CPU": int(rand1 * 10000) / 10000, - "GPU": int(rand2 * 10000) / 10000, - "Custom": int(rand3 * 10000) / 10000 - } - result_ids.append( - f._remote( - [False, resource_set], - num_cpus=rand1, - num_gpus=rand2, - resources={"Custom": rand3})) - result_ids.append( - f._remote( - [True, resource_set], - num_cpus=rand1, - num_gpus=rand2, - resources={"Custom": rand3})) - assert all(ray.get(result_ids)) - - # Check that the available resources at the end are the same as the - # beginning. - stop_time = time.time() + 10 - correct_available_resources = False - while time.time() < stop_time: - available_resources = ray.available_resources() - if ("CPU" in available_resources - and ray.available_resources()["CPU"] == 2.0 - and "GPU" in available_resources - and ray.available_resources()["GPU"] == 2.0 - and "Custom" in available_resources - and ray.available_resources()["Custom"] == 2.0): - correct_available_resources = True - break - if not correct_available_resources: - assert False, "Did not get correct available resources." - - -def test_background_tasks_with_max_calls(shutdown_only): - ray.init(num_cpus=2) - - @ray.remote - def g(): - time.sleep(.1) - return 0 - - @ray.remote(max_calls=1, max_retries=0) - def f(): - return [g.remote()] - - nested = ray.get([f.remote() for _ in range(10)]) - - # Should still be able to retrieve these objects, since f's workers will - # wait for g to finish before exiting. - ray.get([x[0] for x in nested]) - - @ray.remote(max_calls=1, max_retries=0) - def f(): - return os.getpid(), g.remote() - - nested = ray.get([f.remote() for _ in range(10)]) - while nested: - pid, g_id = nested.pop(0) - ray.get(g_id) - del g_id - wait_for_pid_to_exit(pid) - - -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -def test_fair_queueing(shutdown_only): - ray.init(num_cpus=1, _system_config={"fair_queueing_enabled": 1}) - - @ray.remote - def h(): - return 0 - - @ray.remote - def g(): - return ray.get(h.remote()) - - @ray.remote - def f(): - return ray.get(g.remote()) - - # This will never finish without fair queueing of {f, g, h}: - # https://github.com/ray-project/ray/issues/3644 - ready, _ = ray.wait( - [f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000) - assert len(ready) == 1000, len(ready) - - def test_put_get(shutdown_only): ray.init(num_cpus=0) diff --git a/python/ray/tests/test_basic_3.py b/python/ray/tests/test_basic_3.py new file mode 100644 index 000000000..3b4b7ac94 --- /dev/null +++ b/python/ray/tests/test_basic_3.py @@ -0,0 +1,142 @@ +# coding: utf-8 +import logging +import os +import sys +import time + +import numpy as np +import pytest + +import ray.cluster_utils +from ray.test_utils import ( + dicts_equal, + wait_for_pid_to_exit, +) + +import ray + +logger = logging.getLogger(__name__) + + +def test_many_fractional_resources(shutdown_only): + ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) + + @ray.remote + def g(): + return 1 + + @ray.remote + def f(block, accepted_resources): + true_resources = { + resource: value[0][1] + for resource, value in ray.get_resource_ids().items() + } + if block: + ray.get(g.remote()) + return dicts_equal(true_resources, accepted_resources) + + # Check that the resource are assigned correctly. + result_ids = [] + for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)): + resource_set = {"CPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_cpus=rand1)) + + resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_gpus=rand1)) + + resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000} + result_ids.append( + f._remote([False, resource_set], resources={"Custom": rand1})) + + resource_set = { + "CPU": int(rand1 * 10000) / 10000, + "GPU": int(rand2 * 10000) / 10000, + "Custom": int(rand3 * 10000) / 10000 + } + result_ids.append( + f._remote( + [False, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + result_ids.append( + f._remote( + [True, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + assert all(ray.get(result_ids)) + + # Check that the available resources at the end are the same as the + # beginning. + stop_time = time.time() + 10 + correct_available_resources = False + while time.time() < stop_time: + available_resources = ray.available_resources() + if ("CPU" in available_resources + and ray.available_resources()["CPU"] == 2.0 + and "GPU" in available_resources + and ray.available_resources()["GPU"] == 2.0 + and "Custom" in available_resources + and ray.available_resources()["Custom"] == 2.0): + correct_available_resources = True + break + if not correct_available_resources: + assert False, "Did not get correct available resources." + + +def test_background_tasks_with_max_calls(shutdown_only): + ray.init(num_cpus=2) + + @ray.remote + def g(): + time.sleep(.1) + return 0 + + @ray.remote(max_calls=1, max_retries=0) + def f(): + return [g.remote()] + + nested = ray.get([f.remote() for _ in range(10)]) + + # Should still be able to retrieve these objects, since f's workers will + # wait for g to finish before exiting. + ray.get([x[0] for x in nested]) + + @ray.remote(max_calls=1, max_retries=0) + def f(): + return os.getpid(), g.remote() + + nested = ray.get([f.remote() for _ in range(10)]) + while nested: + pid, g_id = nested.pop(0) + ray.get(g_id) + del g_id + wait_for_pid_to_exit(pid) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +def test_fair_queueing(shutdown_only): + ray.init(num_cpus=1, _system_config={"fair_queueing_enabled": 1}) + + @ray.remote + def h(): + return 0 + + @ray.remote + def g(): + return ray.get(h.remote()) + + @ray.remote + def f(): + return ray.get(g.remote()) + + # This will never finish without fair queueing of {f, g, h}: + # https://github.com/ray-project/ray/issues/3644 + ready, _ = ray.wait( + [f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000) + assert len(ready) == 1000, len(ready) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-v", __file__]))