diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py new file mode 100644 index 000000000..f9c34d223 --- /dev/null +++ b/python/ray/tests/conftest.py @@ -0,0 +1,163 @@ +""" +This file defines the common pytest fixtures used in current directory. +""" + +from contextlib import contextmanager +import json +import pytest +import subprocess + +import ray +from ray.tests.cluster_utils import Cluster +from ray.tests.utils import run_and_get_output + + +@pytest.fixture +def shutdown_only(): + yield None + # The code after the yield will run as teardown code. + ray.shutdown() + + +def generate_internal_config_map(**kwargs): + internal_config = json.dumps(kwargs) + ray_kwargs = { + "_internal_config": internal_config, + } + return ray_kwargs + + +def get_default_fixure_internal_config(): + internal_config = json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200, + "num_heartbeats_timeout": 10, + }) + return internal_config + + +def get_default_fixture_ray_kwargs(): + internal_config = get_default_fixure_internal_config() + ray_kwargs = { + "num_cpus": 1, + "object_store_memory": 10**8, + "_internal_config": internal_config, + } + return ray_kwargs + + +@contextmanager +def _ray_start(**kwargs): + init_kwargs = get_default_fixture_ray_kwargs() + init_kwargs.update(kwargs) + # Start the Ray processes. + address_info = ray.init(**init_kwargs) + yield address_info + # The code after the yield will run as teardown code. + ray.shutdown() + + +# The following fixture will start ray with 1 cpu. +@pytest.fixture +def ray_start_regular(request): + param = getattr(request, "param", {}) + with _ray_start(**param) as res: + yield res + + +@pytest.fixture +def ray_start_2_cpus(request): + param = getattr(request, "param", {}) + with _ray_start(num_cpus=2, **param) as res: + yield res + + +@pytest.fixture +def ray_start_10_cpus(request): + param = getattr(request, "param", {}) + with _ray_start(num_cpus=10, **param) as res: + yield res + + +@contextmanager +def _ray_start_cluster(**kwargs): + init_kwargs = get_default_fixture_ray_kwargs() + num_nodes = 0 + do_init = False + # num_nodes & do_init are not arguments for ray.init, so delete them. + if "num_nodes" in kwargs: + num_nodes = kwargs["num_nodes"] + del kwargs["num_nodes"] + if "do_init" in kwargs: + do_init = kwargs["do_init"] + del kwargs["do_init"] + elif num_nodes > 0: + do_init = True + init_kwargs.update(kwargs) + cluster = Cluster() + remote_nodes = [] + for _ in range(num_nodes): + remote_nodes.append(cluster.add_node(**init_kwargs)) + if do_init: + ray.init(redis_address=cluster.redis_address) + yield cluster + # The code after the yield will run as teardown code. + ray.shutdown() + cluster.shutdown() + + +# This fixture will start a cluster with empty nodes. +@pytest.fixture +def ray_start_cluster(request): + param = getattr(request, "param", {}) + with _ray_start_cluster(**param) as res: + yield res + + +@pytest.fixture +def ray_start_cluster_head(request): + param = getattr(request, "param", {}) + with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res: + yield res + + +@pytest.fixture +def ray_start_cluster_2_nodes(request): + param = getattr(request, "param", {}) + with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res: + yield res + + +@pytest.fixture +def ray_start_object_store_memory(request): + # Start the Ray processes. + store_size = request.param + internal_config = get_default_fixure_internal_config() + init_kwargs = { + "num_cpus": 1, + "_internal_config": internal_config, + "object_store_memory": store_size, + } + ray.init(**init_kwargs) + yield store_size + # The code after the yield will run as teardown code. + ray.shutdown() + + +@pytest.fixture +def call_ray_start(request): + parameter = getattr(request, "param", "ray start --head --num-cpus=1") + command_args = parameter.split(" ") + out = run_and_get_output(command_args) + # Get the redis address from the output. + redis_substring_prefix = "redis_address=\"" + redis_address_location = ( + out.find(redis_substring_prefix) + len(redis_substring_prefix)) + redis_address = out[redis_address_location:] + redis_address = redis_address.split("\"")[0] + + yield redis_address + + # Disconnect from the Ray cluster. + ray.shutdown() + # Kill the Ray cluster. + subprocess.Popen(["ray", "stop"]).wait() diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 2dfc35350..13d14f3d1 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -16,82 +16,13 @@ import ray import ray.ray_constants as ray_constants import ray.tests.utils import ray.tests.cluster_utils +from ray.tests.conftest import generate_internal_config_map from ray.tests.utils import ( - wait_for_errors, relevant_errors, + wait_for_errors, ) -@pytest.fixture -def ray_start_regular(request): - internal_config = { - "initial_reconstruction_timeout_milliseconds": 200, - "num_heartbeats_timeout": 10, - } - internal_config.update(getattr(request, "param", {})) - # Start the Ray processes. - ray.init( - num_cpus=1, - _internal_config=json.dumps(internal_config), - ) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture() -def ray_start_cluster(): - cluster = ray.tests.cluster_utils.Cluster() - yield cluster - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - -@pytest.fixture() -def two_node_cluster(): - internal_config = json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200, - "num_heartbeats_timeout": 10, - }) - cluster = ray.tests.cluster_utils.Cluster() - for _ in range(2): - remote_node = cluster.add_node( - num_cpus=1, _internal_config=internal_config) - ray.init(redis_address=cluster.redis_address) - yield cluster, remote_node - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - -@pytest.fixture -def head_node_cluster(request): - timeout = getattr(request, "param", 200) - cluster = ray.tests.cluster_utils.Cluster( - initialize_head=True, - connect=True, - head_node_args={ - "_internal_config": json.dumps({ - "initial_reconstruction_timeout_milliseconds": timeout, - "num_heartbeats_timeout": 10, - }) - }) - yield cluster - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - @pytest.fixture def ray_checkpointable_actor_cls(request): checkpoint_dir = "/tmp/ray_temp_checkpoint_dir/" @@ -610,16 +541,7 @@ def test_multiple_actors(ray_start_regular): assert v == num_actors * [j + 1] -@pytest.fixture -def ray_start_bigger(): - # Start the Ray processes. - ray.init(num_cpus=10) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_remote_function_within_actor(ray_start_bigger): +def test_remote_function_within_actor(ray_start_10_cpus): # Make sure we can use remote funtions within actors. # Create some values to close over. @@ -667,7 +589,7 @@ def test_remote_function_within_actor(ray_start_bigger): range(1, 6)) -def test_define_actor_within_actor(ray_start_bigger): +def test_define_actor_within_actor(ray_start_10_cpus): # Make sure we can use remote funtions within actors. @ray.remote @@ -694,7 +616,7 @@ def test_define_actor_within_actor(ray_start_bigger): assert ray.get(actor1.get_values.remote(5)) == (3, 5) -def test_use_actor_within_actor(ray_start_bigger): +def test_use_actor_within_actor(ray_start_10_cpus): # Make sure we can use actors within actors. @ray.remote @@ -718,7 +640,7 @@ def test_use_actor_within_actor(ray_start_bigger): assert ray.get(actor2.get_values.remote(5)) == (3, 4) -def test_define_actor_within_remote_function(ray_start_bigger): +def test_define_actor_within_remote_function(ray_start_10_cpus): # Make sure we can define and actors within remote funtions. @ray.remote @@ -739,7 +661,7 @@ def test_define_actor_within_remote_function(ray_start_bigger): [f.remote(i, 20) for i in range(10)]) == [20 * [i] for i in range(10)] -def test_use_actor_within_remote_function(ray_start_bigger): +def test_use_actor_within_remote_function(ray_start_10_cpus): # Make sure we can create and use actors within remote funtions. @ray.remote @@ -758,7 +680,7 @@ def test_use_actor_within_remote_function(ray_start_bigger): assert ray.get(f.remote(3)) == 3 -def test_actor_import_counter(ray_start_bigger): +def test_actor_import_counter(ray_start_10_cpus): # This is mostly a test of the export counters to make sure that when # an actor is imported, all of the necessary remote functions have been # imported. @@ -1379,8 +1301,9 @@ def test_blocking_actor_task(shutdown_only): assert remaining_ids == [x_id] -def test_exception_raised_when_actor_node_dies(head_node_cluster): - remote_node = head_node_cluster.add_node() +def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): + cluster = ray_start_cluster_head + remote_node = cluster.add_node() @ray.remote class Counter(object): @@ -1401,7 +1324,7 @@ def test_exception_raised_when_actor_node_dies(head_node_cluster): actor = Counter.remote() # Kill the second node. - head_node_cluster.remove_node(remote_node) + cluster.remove_node(remote_node) # Submit some new actor tasks both before and after the node failure is # detected. Make sure that getting the result raises an exception. @@ -1419,8 +1342,9 @@ def test_exception_raised_when_actor_node_dies(head_node_cluster): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_actor_init_fails(head_node_cluster): - remote_node = head_node_cluster.add_node() +def test_actor_init_fails(ray_start_cluster_head): + cluster = ray_start_cluster_head + remote_node = cluster.add_node() @ray.remote(max_reconstructions=1) class Counter(object): @@ -1436,16 +1360,17 @@ def test_actor_init_fails(head_node_cluster): # Allow some time to forward the actor creation tasks to the other node. time.sleep(0.1) # Kill the second node. - head_node_cluster.remove_node(remote_node) + cluster.remove_node(remote_node) # Get all of the results results = ray.get([actor.inc.remote() for actor in actors]) assert results == [1 for actor in actors] -def test_reconstruction_suppression(head_node_cluster): +def test_reconstruction_suppression(ray_start_cluster_head): + cluster = ray_start_cluster_head num_nodes = 10 - worker_nodes = [head_node_cluster.add_node() for _ in range(num_nodes)] + worker_nodes = [cluster.add_node() for _ in range(num_nodes)] @ray.remote(max_reconstructions=1) class Counter(object): @@ -1465,7 +1390,7 @@ def test_reconstruction_suppression(head_node_cluster): ray.get([actor.inc.remote() for actor in actors]) # Kill a node. - head_node_cluster.remove_node(worker_nodes[0]) + cluster.remove_node(worker_nodes[0]) # Submit several tasks per actor. These should be randomly scheduled to the # nodes, so that multiple nodes will detect and try to reconstruct the @@ -1537,8 +1462,8 @@ def setup_counter_actor(test_checkpoint=False, @pytest.mark.skip("Fork/join consistency not yet implemented.") -def test_distributed_handle(two_node_cluster): - cluster = two_node_cluster +def test_distributed_handle(ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes counter, ids = setup_counter_actor(test_checkpoint=False) @ray.remote @@ -1575,8 +1500,8 @@ def test_distributed_handle(two_node_cluster): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_remote_checkpoint_distributed_handle(two_node_cluster): - cluster = two_node_cluster +def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes counter, ids = setup_counter_actor(test_checkpoint=True) @ray.remote @@ -1616,8 +1541,8 @@ def test_remote_checkpoint_distributed_handle(two_node_cluster): @pytest.mark.skip("Fork/join consistency not yet implemented.") -def test_checkpoint_distributed_handle(two_node_cluster): - cluster = two_node_cluster +def test_checkpoint_distributed_handle(ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes counter, ids = setup_counter_actor(test_checkpoint=True) @ray.remote @@ -1719,16 +1644,17 @@ def _test_nondeterministic_reconstruction( @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Currently doesn't work with the new GCS.") -def test_nondeterministic_reconstruction(two_node_cluster): - cluster = two_node_cluster +def test_nondeterministic_reconstruction(ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes _test_nondeterministic_reconstruction(cluster, 10, 100, 10) @pytest.mark.skip("Nondeterministic reconstruction currently not supported " "when there are concurrent forks that didn't finish " "initial execution.") -def test_nondeterministic_reconstruction_concurrent_forks(two_node_cluster): - cluster = two_node_cluster +def test_nondeterministic_reconstruction_concurrent_forks( + ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes _test_nondeterministic_reconstruction(cluster, 10, 100, 1) @@ -2098,7 +2024,11 @@ def test_creating_more_actors_than_resources(shutdown_only): ray.get(results) -def test_actor_eviction(shutdown_only): +@pytest.mark.parametrize( + "ray_start_object_store_memory", [10**8], indirect=True) +def test_actor_eviction(ray_start_object_store_memory): + object_store_memory = ray_start_object_store_memory + @ray.remote class Actor(object): def __init__(self): @@ -2107,13 +2037,6 @@ def test_actor_eviction(shutdown_only): def create_object(self, size): return np.random.rand(size) - object_store_memory = 10**8 - ray.init( - object_store_memory=object_store_memory, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200 - })) - a = Actor.remote() # Submit enough methods on the actor so that they exceed the size of the # object store. @@ -2189,9 +2112,9 @@ def test_actor_reconstruction(ray_start_regular): ray.get(actor.increase.remote()) -def test_actor_reconstruction_on_node_failure(head_node_cluster): +def test_actor_reconstruction_on_node_failure(ray_start_cluster_head): """Test actor reconstruction when node dies unexpectedly.""" - cluster = head_node_cluster + cluster = ray_start_cluster_head max_reconstructions = 3 # Add a few nodes to the cluster. # Use custom resource to make sure the actor is only created on worker @@ -2253,8 +2176,14 @@ def test_actor_reconstruction_on_node_failure(head_node_cluster): # this test. Because if this value is too small, suprious task reconstruction # may happen and cause the test fauilure. If the value is too large, this test # could be very slow. We can remove this once we support dynamic timeout. -@pytest.mark.parametrize("head_node_cluster", [1000], indirect=True) -def test_multiple_actor_reconstruction(head_node_cluster): +@pytest.mark.parametrize( + "ray_start_cluster_head", [ + generate_internal_config_map( + initial_reconstruction_timeout_milliseconds=1000) + ], + indirect=True) +def test_multiple_actor_reconstruction(ray_start_cluster_head): + cluster = ray_start_cluster_head # This test can be made more stressful by increasing the numbers below. # The total number of actors created will be # num_actors_at_a_time * num_nodes. @@ -2263,7 +2192,7 @@ def test_multiple_actor_reconstruction(head_node_cluster): num_function_calls_at_a_time = 10 worker_nodes = [ - head_node_cluster.add_node( + cluster.add_node( num_cpus=3, _internal_config=json.dumps({ "initial_reconstruction_timeout_milliseconds": 200, @@ -2303,7 +2232,7 @@ def test_multiple_actor_reconstruction(head_node_cluster): for _ in range(num_function_calls_at_a_time): result_ids[actor].append(actor.inc.remote(j**2 * 0.000001)) # Kill a node. - head_node_cluster.remove_node(node) + cluster.remove_node(node) # Run some more methods. for j in range(len(actors)): @@ -2401,15 +2330,16 @@ def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True -def test_checkpointing_on_node_failure(two_node_cluster, +def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, ray_checkpointable_actor_cls): """Test actor checkpointing on a remote node.""" # Place the actor on the remote node. - cluster, remote_node = two_node_cluster + cluster = ray_start_cluster_2_nodes + remote_node = [node for node in cluster.worker_nodes] actor_cls = ray.remote(max_reconstructions=1)(ray_checkpointable_actor_cls) actor = actor_cls.remote() while (ray.get(actor.local_plasma.remote()) != - remote_node.plasma_store_socket_name): + remote_node[0].plasma_store_socket_name): actor = actor_cls.remote() # Call increase several times. @@ -2420,7 +2350,7 @@ def test_checkpointing_on_node_failure(two_node_cluster, # Assert that the actor wasn't resumed from a checkpoint. assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False # Kill actor process. - cluster.remove_node(remote_node) + cluster.remove_node(remote_node[0]) # Assert that the actor was resumed from a checkpoint and its value is # still correct. assert ray.get(actor.get.remote()) == expected @@ -2519,9 +2449,7 @@ def test_checkpointing_load_exception(ray_start_regular, "ray_start_regular", # This overwrite currently isn't effective, # see https://github.com/ray-project/ray/issues/3926. - [{ - "num_actor_checkpoints_to_keep": 20 - }], + [generate_internal_config_map(num_actor_checkpoints_to_keep=20)], indirect=True, ) def test_deleting_actor_checkpoint(ray_start_regular): diff --git a/python/ray/tests/test_array.py b/python/ray/tests/test_array.py index 1ddd9d45c..a0b600737 100644 --- a/python/ray/tests/test_array.py +++ b/python/ray/tests/test_array.py @@ -17,19 +17,12 @@ if sys.version_info >= (3, 0): @pytest.fixture -def ray_start_regular(): - for module in [ - ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg - ]: - reload(module) - # Start the Ray processes. - ray.init(num_cpus=2) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() +def reload_modules(): + modules = [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg] + [reload(module) for module in modules] -def test_remote_array_methods(ray_start_regular): +def test_remote_array_methods(ray_start_2_cpus, reload_modules): # test eye object_id = ra.eye.remote(3) val = ray.get(object_id) @@ -56,7 +49,7 @@ def test_remote_array_methods(ray_start_regular): assert_almost_equal(np.dot(q_val, r_val), a_val) -def test_distributed_array_assemble(ray_start_regular): +def test_distributed_array_assemble(ray_start_2_cpus, reload_modules): a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE], np.array([[a], [b]])) @@ -68,25 +61,7 @@ def test_distributed_array_assemble(ray_start_regular): ])) -@pytest.fixture -def ray_start_two_nodes(): - for module in [ - ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg - ]: - reload(module) - # Start the Ray processes. - cluster = ray.tests.cluster_utils.Cluster() - for _ in range(2): - cluster.add_node(num_cpus=10) - ray.init(redis_address=cluster.redis_address) - yield None - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - -def test_distributed_array_methods(ray_start_two_nodes): +def test_distributed_array_methods(ray_start_cluster_2_nodes, reload_modules): x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51])) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 4c05e7466..cc28f0374 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -33,23 +33,7 @@ from ray.utils import _random_string logger = logging.getLogger(__name__) -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=1) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_simple_serialization(ray_start): +def test_simple_serialization(ray_start_regular): primitive_objects = [ # Various primitive types. 0, @@ -116,7 +100,7 @@ def test_simple_serialization(ray_start): assert type(obj) == type(new_obj_2) -def test_complex_serialization(ray_start): +def test_complex_serialization(ray_start_regular): def assert_equal(obj1, obj2): module_numpy = (type(obj1).__module__ == np.__name__ or type(obj2).__module__ == np.__name__) @@ -319,7 +303,7 @@ def test_complex_serialization(ray_start): assert_equal(obj, ray.get(ray.put(obj))) -def test_ray_recursive_objects(ray_start): +def test_ray_recursive_objects(ray_start_regular): class ClassA(object): pass @@ -347,7 +331,7 @@ def test_ray_recursive_objects(ray_start): ray.put(obj) -def test_passing_arguments_by_value_out_of_the_box(ray_start): +def test_passing_arguments_by_value_out_of_the_box(ray_start_regular): @ray.remote def f(x): return x @@ -379,7 +363,7 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start): ray.get(ray.put(Foo)) -def test_putting_object_that_closes_over_object_id(ray_start): +def test_putting_object_that_closes_over_object_id(ray_start_regular): # This test is here to prevent a regression of # https://github.com/ray-project/ray/issues/1317. @@ -422,9 +406,7 @@ def test_put_get(shutdown_only): assert value_before == value_after -def test_custom_serializers(shutdown_only): - ray.init(num_cpus=1) - +def test_custom_serializers(ray_start_regular): class Foo(object): def __init__(self): self.x = 3 @@ -454,7 +436,7 @@ def test_custom_serializers(shutdown_only): assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2") -def test_serialization_final_fallback(ray_start): +def test_serialization_final_fallback(ray_start_regular): pytest.importorskip("catboost") # This test will only run when "catboost" is installed. from catboost import CatBoostClassifier @@ -471,9 +453,7 @@ def test_serialization_final_fallback(ray_start): reconstructed_model.get_params().items()) -def test_register_class(shutdown_only): - ray.init(num_cpus=2) - +def test_register_class(ray_start_2_cpus): # Check that putting an object of a class that has not been registered # throws an exception. class TempClass(object): @@ -616,7 +596,7 @@ def test_register_class(shutdown_only): assert not hasattr(c2, "method1") -def test_keyword_args(shutdown_only): +def test_keyword_args(ray_start_regular): @ray.remote def keyword_fct1(a, b="hello"): return "{} {}".format(a, b) @@ -629,8 +609,6 @@ def test_keyword_args(shutdown_only): def keyword_fct3(a, b, c="hello", d="world"): return "{} {} {} {}".format(a, b, c, d) - ray.init(num_cpus=1) - x = keyword_fct1.remote(1) assert ray.get(x) == "1 hello" x = keyword_fct1.remote(1, "hi") @@ -886,8 +864,7 @@ def test_submit_api(shutdown_only): assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2] -def test_get_multiple(shutdown_only): - ray.init(num_cpus=1) +def test_get_multiple(ray_start_regular): object_ids = [ray.put(i) for i in range(10)] assert ray.get(object_ids) == list(range(10)) @@ -898,8 +875,7 @@ def test_get_multiple(shutdown_only): assert results == indices -def test_get_multiple_experimental(shutdown_only): - ray.init(num_cpus=1) +def test_get_multiple_experimental(ray_start_regular): object_ids = [ray.put(i) for i in range(10)] object_ids_tuple = tuple(object_ids) @@ -909,8 +885,7 @@ def test_get_multiple_experimental(shutdown_only): assert ray.experimental.get(object_ids_nparray) == list(range(10)) -def test_get_dict(shutdown_only): - ray.init(num_cpus=1) +def test_get_dict(ray_start_regular): d = {str(i): ray.put(i) for i in range(5)} for i in range(5, 10): d[str(i)] = i @@ -919,9 +894,7 @@ def test_get_dict(shutdown_only): assert result == expected -def test_wait(shutdown_only): - ray.init(num_cpus=1) - +def test_wait(ray_start_regular): @ray.remote def f(delay): time.sleep(delay) @@ -976,9 +949,7 @@ def test_wait(shutdown_only): ray.wait([1]) -def test_wait_iterables(shutdown_only): - ray.init(num_cpus=1) - +def test_wait_iterables(ray_start_regular): @ray.remote def f(delay): time.sleep(delay) @@ -1075,9 +1046,7 @@ def test_caching_functions_to_run(shutdown_only): ray.worker.global_worker.run_function_on_all_workers(f) -def test_running_function_on_all_workers(shutdown_only): - ray.init(num_cpus=1) - +def test_running_function_on_all_workers(ray_start_regular): def f(worker_info): sys.path.append("fake_directory") @@ -1104,9 +1073,7 @@ def test_running_function_on_all_workers(shutdown_only): assert "fake_directory" not in ray.get(get_path2.remote()) -def test_profiling_api(shutdown_only): - ray.init(num_cpus=2) - +def test_profiling_api(ray_start_2_cpus): @ray.remote def f(): with ray.profile( @@ -1150,16 +1117,6 @@ def test_profiling_api(shutdown_only): break -@pytest.fixture() -def ray_start_cluster(): - cluster = ray.tests.cluster_utils.Cluster() - yield cluster - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - def test_wait_cluster(ray_start_cluster): cluster = ray_start_cluster cluster.add_node(num_cpus=1, resources={"RemoteResource": 1}) @@ -1227,10 +1184,9 @@ def test_object_transfer_dump(ray_start_cluster): }) == num_nodes -def test_identical_function_names(shutdown_only): +def test_identical_function_names(ray_start_regular): # Define a bunch of remote functions and make sure that we don't # accidentally call an older version. - ray.init(num_cpus=1) num_calls = 200 @@ -1294,8 +1250,7 @@ def test_identical_function_names(shutdown_only): assert result_values == num_calls * [5] -def test_illegal_api_calls(shutdown_only): - ray.init(num_cpus=1) +def test_illegal_api_calls(ray_start_regular): # Verify that we cannot call put on an ObjectID. x = ray.put(1) @@ -1310,10 +1265,9 @@ def test_illegal_api_calls(shutdown_only): # because plasma client isn't thread-safe. This needs to be fixed from the # Arrow side. See #4107 for relevant discussions. @pytest.mark.skipif(six.PY2, reason="Doesn't work in Python 2.") -def test_multithreading(shutdown_only): +def test_multithreading(ray_start_2_cpus): # This test requires at least 2 CPUs to finish since the worker does not # release resources when joining the threads. - ray.init(num_cpus=2) def run_test_in_multi_threads(test_case, num_threads=10, num_repeats=25): """A helper function that runs test cases in multiple threads.""" @@ -2273,9 +2227,7 @@ def test_specific_gpus(save_gpu_ids_shutdown_only): ray.get([g.remote() for _ in range(100)]) -def test_blocking_tasks(shutdown_only): - ray.init(num_cpus=1) - +def test_blocking_tasks(ray_start_regular): @ray.remote def f(i, j): return (i, j) @@ -2310,9 +2262,7 @@ def test_blocking_tasks(shutdown_only): ray.get(sleep.remote()) -def test_max_call_tasks(shutdown_only): - ray.init(num_cpus=1) - +def test_max_call_tasks(ray_start_regular): @ray.remote(max_calls=1) def f(): return os.getpid() @@ -2692,9 +2642,7 @@ def test_wait_reconstruction(shutdown_only): assert len(ready_ids) == 1 -def test_ray_setproctitle(shutdown_only): - ray.init(num_cpus=2) - +def test_ray_setproctitle(ray_start_2_cpus): @ray.remote class UniqueName(object): def __init__(self): @@ -2739,9 +2687,7 @@ def test_duplicate_error_messages(shutdown_only): @pytest.mark.skipif( os.getenv("TRAVIS") is None, reason="This test should only be run on Travis.") -def test_ray_stack(shutdown_only): - ray.init(num_cpus=2) - +def test_ray_stack(ray_start_2_cpus): def unique_name_1(): time.sleep(1000) @@ -2797,9 +2743,7 @@ def test_socket_dir_not_existing(shutdown_only): ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name) -def test_raylet_is_robust_to_random_messages(shutdown_only): - - ray.init(num_cpus=1) +def test_raylet_is_robust_to_random_messages(ray_start_regular): node_manager_address = None node_manager_port = None for client in ray.global_state.client_table(): @@ -2820,7 +2764,7 @@ def test_raylet_is_robust_to_random_messages(shutdown_only): assert ray.get(f.remote()) == 1 -def test_non_ascii_comment(ray_start): +def test_non_ascii_comment(ray_start_regular): @ray.remote def f(): # 日本語 Japanese comment diff --git a/python/ray/tests/test_batched_queue.py b/python/ray/tests/test_batched_queue.py index 62a7da234..0a52e4144 100644 --- a/python/ray/tests/test_batched_queue.py +++ b/python/ray/tests/test_batched_queue.py @@ -2,22 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pytest import time import ray from ray.experimental.streaming.batched_queue import BatchedQueue -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=2) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - @ray.remote class Reader(object): def __init__(self, queue): @@ -36,7 +26,7 @@ class Reader(object): time.sleep(0.001) -def test_batched_queue(ray_start): +def test_batched_queue(ray_start_regular): # Batched queue parameters max_queue_size = 10000 # Max number of batches in queue max_batch_size = 1000 # Max number of elements per batch diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index 4d9bbd80d..a7a6eeb9c 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function import os -import json import signal import sys import time @@ -17,43 +16,12 @@ from ray.tests.cluster_utils import Cluster from ray.tests.utils import run_string_as_driver_nonblocking -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture -def ray_start_cluster(): - node_args = { - "num_cpus": 4, - "_internal_config": json.dumps({ - "initial_reconstruction_timeout_milliseconds": 1000, - "num_heartbeats_timeout": 10 - }) - } - # Start with 3 worker nodes and 4 cores each. - cluster = Cluster( - initialize_head=True, connect=True, head_node_args=node_args) - workers = [] - for _ in range(3): - workers.append(cluster.add_node(**node_args)) - cluster.wait_for_nodes() - yield cluster - ray.shutdown() - cluster.shutdown() - - # This test checks that when a worker dies in the middle of a get, the plasma # store and raylet will not die. @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Not working with new GCS API.") -def test_dying_worker_get(shutdown_only): - # Start the Ray processes. - ray.init(num_cpus=2) - +def test_dying_worker_get(ray_start_2_cpus): @ray.remote def sleep_forever(): time.sleep(10**6) @@ -100,9 +68,9 @@ def test_dying_worker_get(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Not working with new GCS API.") -def test_dying_driver_get(shutdown_only): +def test_dying_driver_get(ray_start_regular): # Start the Ray processes. - address_info = ray.init(num_cpus=1) + address_info = ray_start_regular @ray.remote def sleep_forever(): @@ -143,9 +111,7 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}"))) @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Not working with new GCS API.") -def test_dying_worker_wait(shutdown_only): - ray.init(num_cpus=2) - +def test_dying_worker_wait(ray_start_2_cpus): @ray.remote def sleep_forever(): time.sleep(10**6) @@ -185,9 +151,9 @@ def test_dying_worker_wait(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Not working with new GCS API.") -def test_dying_driver_wait(shutdown_only): +def test_dying_driver_wait(ray_start_regular): # Start the Ray processes. - address_info = ray.init(num_cpus=1) + address_info = ray_start_regular @ray.remote def sleep_forever(): @@ -283,28 +249,6 @@ def test_worker_failed(ray_start_workers_separate_multinode): pass -@pytest.fixture -def ray_initialize_cluster(): - # Start with 4 workers and 4 cores. - num_nodes = 4 - num_workers_per_scheduler = 8 - - cluster = Cluster() - for _ in range(num_nodes): - cluster.add_node( - num_cpus=num_workers_per_scheduler, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 1000, - "num_heartbeats_timeout": 10, - })) - ray.init(redis_address=cluster.redis_address) - - yield cluster - - ray.shutdown() - cluster.shutdown() - - def _test_component_failed(cluster, component_type): """Kill a component on all worker nodes and check workload succeeds.""" # Submit many tasks with many dependencies. @@ -364,8 +308,13 @@ def check_components_alive(cluster, component_type, check_component_alive): assert not process.poll() is None -def test_raylet_failed(ray_initialize_cluster): - cluster = ray_initialize_cluster +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 8, + "num_nodes": 4 + }], indirect=True) +def test_raylet_failed(ray_start_cluster): + cluster = ray_start_cluster # Kill all local schedulers on worker nodes. _test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET) @@ -377,8 +326,13 @@ def test_raylet_failed(ray_initialize_cluster): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_plasma_store_failed(ray_initialize_cluster): - cluster = ray_initialize_cluster +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 8, + "num_nodes": 4 + }], indirect=True) +def test_plasma_store_failed(ray_start_cluster): + cluster = ray_start_cluster # Kill all plasma stores on worker nodes. _test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE) @@ -388,6 +342,13 @@ def test_plasma_store_failed(ray_initialize_cluster): check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False) +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 4, + "num_nodes": 3, + "do_init": True + }], + indirect=True) def test_actor_creation_node_failure(ray_start_cluster): # TODO(swang): Refactor test_raylet_failed, etc to reuse the below code. cluster = ray_start_cluster @@ -434,8 +395,7 @@ def test_actor_creation_node_failure(ray_start_cluster): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_driver_lives_sequential(shutdown_only): - ray.init(num_cpus=1) +def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_raylet() ray.worker._global_node.kill_plasma_store() ray.worker._global_node.kill_log_monitor() @@ -448,8 +408,7 @@ def test_driver_lives_sequential(shutdown_only): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -def test_driver_lives_parallel(shutdown_only): - ray.init(num_cpus=1) +def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] + all_processes[ray_constants.PROCESS_TYPE_RAYLET] + diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 81d9c9380..91b3228c6 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -17,23 +17,10 @@ import ray import ray.ray_constants as ray_constants from ray.utils import _random_string from ray.tests.cluster_utils import Cluster -from ray.tests.utils import (relevant_errors, wait_for_errors) - - -@pytest.fixture -def ray_start_regular(): - # Start the Ray processes. - ray.init(num_cpus=2) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() +from ray.tests.utils import ( + relevant_errors, + wait_for_errors, +) def test_failed_task(ray_start_regular): @@ -89,7 +76,7 @@ def test_failed_task(ray_start_regular): assert False -def test_fail_importing_remote_function(ray_start_regular): +def test_fail_importing_remote_function(ray_start_2_cpus): # Create the contents of a temporary Python file. temporary_python_file = """ def temporary_helper_function(): @@ -129,7 +116,7 @@ def temporary_helper_function(): sys.path.pop(-1) -def test_failed_function_to_run(ray_start_regular): +def test_failed_function_to_run(ray_start_2_cpus): def f(worker): if ray.worker.global_worker.mode == ray.WORKER_MODE: raise Exception("Function to run failed.") @@ -386,17 +373,9 @@ def test_actor_scope_or_intentionally_killed_message(ray_start_regular): "Should not have propogated an error - {}".format(ray.error_info())) -@pytest.fixture -def ray_start_object_store_memory(): - # Start the Ray processes. - store_size = 10**6 - ray.init(num_cpus=1, object_store_memory=store_size) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - @pytest.mark.skip("This test does not work yet.") +@pytest.mark.parametrize( + "ray_start_object_store_memory", [10**6], indirect=True) def test_put_error1(ray_start_object_store_memory): num_objects = 3 object_size = 4 * 10**5 @@ -439,6 +418,8 @@ def test_put_error1(ray_start_object_store_memory): @pytest.mark.skip("This test does not work yet.") +@pytest.mark.parametrize( + "ray_start_object_store_memory", [10**6], indirect=True) def test_put_error2(ray_start_object_store_memory): # This is the same as the previous test, but it calls ray.put directly. num_objects = 3 @@ -608,8 +589,8 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) -def test_redis_module_failure(shutdown_only): - address_info = ray.init(num_cpus=1) +def test_redis_module_failure(ray_start_regular): + address_info = ray_start_regular redis_address = address_info["redis_address"] redis_address = redis_address.split(":") assert len(redis_address) == 2 @@ -653,28 +634,10 @@ def test_redis_module_failure(shutdown_only): run_one_command("RAY.SET_ADD", 1, 1, 3, 1) -@pytest.fixture -def ray_start_two_nodes(): - # Start the Ray processes. - cluster = Cluster() - for _ in range(2): - cluster.add_node( - num_cpus=0, - _internal_config=json.dumps({ - "num_heartbeats_timeout": 40 - })) - ray.init(redis_address=cluster.redis_address) - - yield cluster - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - # Note that this test will take at least 10 seconds because it must wait for # the monitor to detect enough missed heartbeats. -def test_warning_for_dead_node(ray_start_two_nodes): - cluster = ray_start_two_nodes +def test_warning_for_dead_node(ray_start_cluster_2_nodes): + cluster = ray_start_cluster_2_nodes cluster.wait_for_nodes() client_ids = {item["ClientID"] for item in ray.global_state.client_table()} diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 11920a21c..ee3d304c2 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import json import pytest try: import pytest_timeout @@ -11,33 +10,6 @@ except ImportError: import time import ray -from ray.tests.cluster_utils import Cluster - - -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=1) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -@pytest.fixture -def cluster_start(): - # Start the Ray processes. - cluster = Cluster( - initialize_head=True, - connect=True, - head_node_args={ - "num_cpus": 1, - "_internal_config": json.dumps({ - "num_heartbeats_timeout": 10 - }) - }) - yield cluster - ray.shutdown() - cluster.shutdown() # TODO(rliaw): The proper way to do this is to have the pytest config setup. @@ -45,7 +17,7 @@ def cluster_start(): pytest_timeout is None, reason="Timeout package not installed; skipping test that may hang.") @pytest.mark.timeout(10) -def test_replenish_resources(ray_start): +def test_replenish_resources(ray_start_regular): cluster_resources = ray.global_state.cluster_resources() available_resources = ray.global_state.available_resources() assert cluster_resources == available_resources @@ -67,7 +39,7 @@ def test_replenish_resources(ray_start): pytest_timeout is None, reason="Timeout package not installed; skipping test that may hang.") @pytest.mark.timeout(10) -def test_uses_resources(ray_start): +def test_uses_resources(ray_start_regular): cluster_resources = ray.global_state.cluster_resources() @ray.remote @@ -89,9 +61,9 @@ def test_uses_resources(ray_start): pytest_timeout is None, reason="Timeout package not installed; skipping test that may hang.") @pytest.mark.timeout(20) -def test_add_remove_cluster_resources(cluster_start): +def test_add_remove_cluster_resources(ray_start_cluster_head): """Tests that Global State API is consistent with actual cluster.""" - cluster = cluster_start + cluster = ray_start_cluster_head assert ray.global_state.cluster_resources()["CPU"] == 1 nodes = [] nodes += [cluster.add_node(num_cpus=1)] diff --git a/python/ray/tests/test_microbenchmarks.py b/python/ray/tests/test_microbenchmarks.py index 6bbbc48bb..e42e0fba4 100644 --- a/python/ray/tests/test_microbenchmarks.py +++ b/python/ray/tests/test_microbenchmarks.py @@ -2,20 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pytest import os -import ray +import pytest import time import numpy as np - -@pytest.fixture -def ray_start_regular(): - # Start the Ray processes. - ray.init(num_cpus=3) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() +import ray def test_timing(ray_start_regular): @@ -96,6 +88,7 @@ def test_timing(ray_start_regular): # average_elapsed_time should be about 0.00087. +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_cache(ray_start_regular): A = np.random.rand(1, 1000000) v = np.random.rand(1000000) diff --git a/python/ray/tests/test_mini.py b/python/ray/tests/test_mini.py index cc4c5a039..b4e630080 100644 --- a/python/ray/tests/test_mini.py +++ b/python/ray/tests/test_mini.py @@ -2,22 +2,12 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pytest import ray test_values = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}] -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=1) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_basic_task_api(ray_start): +def test_basic_task_api(ray_start_regular): # Test a simple function. @@ -50,7 +40,7 @@ def test_basic_task_api(ray_start): # Test keyword arguments. -def test_put_api(ray_start): +def test_put_api(ray_start_regular): for obj in test_values: assert ray.get(ray.put(obj)) == obj @@ -61,7 +51,7 @@ def test_put_api(ray_start): assert ray.get(ray.put(obj)) == obj -def test_actor_api(ray_start): +def test_actor_api(ray_start_regular): @ray.remote class Foo(object): def __init__(self, val): diff --git a/python/ray/tests/test_modin.py b/python/ray/tests/test_modin.py index 06b8fe1cf..c0ea8d339 100644 --- a/python/ray/tests/test_modin.py +++ b/python/ray/tests/test_modin.py @@ -7,13 +7,6 @@ import pytest import ray -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - def test_modin_import_with_ray_init(shutdown_only): ray.init(num_cpus=1) import modin.pandas as pd diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 8e6cd7fcf..1e7252b31 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -12,26 +12,8 @@ from ray.tests.utils import (run_and_get_output, run_string_as_driver, run_string_as_driver_nonblocking) -@pytest.fixture -def ray_start_head(): - out = run_and_get_output(["ray", "start", "--head", "--num-cpus=2"]) - # Get the redis address from the output. - redis_substring_prefix = "redis_address=\"" - redis_address_location = ( - out.find(redis_substring_prefix) + len(redis_substring_prefix)) - redis_address = out[redis_address_location:] - redis_address = redis_address.split("\"")[0] - - yield redis_address - - # Disconnect from the Ray cluster. - ray.shutdown() - # Kill the Ray cluster. - subprocess.Popen(["ray", "stop"]).wait() - - -def test_error_isolation(ray_start_head): - redis_address = ray_start_head +def test_error_isolation(call_ray_start): + redis_address = call_ray_start # Connect a driver to the Ray cluster. ray.init(redis_address=redis_address) @@ -99,10 +81,10 @@ print("success") assert error_string1 in ray.error_info()[0]["message"] -def test_remote_function_isolation(ray_start_head): +def test_remote_function_isolation(call_ray_start): # This test will run multiple remote functions with the same names in # two different drivers. Connect a driver to the Ray cluster. - redis_address = ray_start_head + redis_address = call_ray_start ray.init(redis_address=redis_address) @@ -142,10 +124,10 @@ print("success") assert "success" in out -def test_driver_exiting_quickly(ray_start_head): +def test_driver_exiting_quickly(call_ray_start): # This test will create some drivers that submit some tasks and then # exit without waiting for the tasks to complete. - redis_address = ray_start_head + redis_address = call_ray_start ray.init(redis_address=redis_address) @@ -218,25 +200,11 @@ ray.get([a.log.remote(), f.remote()]) assert out.count(log_message) == 4 -@pytest.fixture -def ray_start_head_with_resources(): - out = run_and_get_output( - ["ray", "start", "--head", "--num-cpus=1", "--num-gpus=1"]) - # Get the redis address from the output. - redis_substring_prefix = "redis_address=\"" - redis_address_location = ( - out.find(redis_substring_prefix) + len(redis_substring_prefix)) - redis_address = out[redis_address_location:] - redis_address = redis_address.split("\"")[0] - - yield redis_address - - # Kill the Ray cluster. - subprocess.Popen(["ray", "stop"]).wait() - - -def test_drivers_release_resources(ray_start_head_with_resources): - redis_address = ray_start_head_with_resources +@pytest.mark.parametrize( + "call_ray_start", ["ray start --head --num-cpus=1 --num-gpus=1"], + indirect=True) +def test_drivers_release_resources(call_ray_start): + redis_address = call_ray_start # Define a driver that creates an actor and exits. driver_script1 = """ @@ -359,23 +327,13 @@ def test_calling_start_ray_head(): subprocess.Popen(["ray", "stop"]).wait() -@pytest.fixture -def ray_start_head_local(): - # Start the Ray processes on this machine. - run_and_get_output([ - "ray", "start", "--head", "--node-ip-address=localhost", - "--redis-port=6379" - ]) - - yield None - - # Disconnect from the Ray cluster. - ray.shutdown() - # Kill the Ray cluster. - subprocess.Popen(["ray", "stop"]).wait() - - -def test_using_hostnames(ray_start_head_local): +@pytest.mark.parametrize( + "call_ray_start", [ + "ray start --head --num-cpus=1 " + + "--node-ip-address=localhost --redis-port=6379" + ], + indirect=True) +def test_using_hostnames(call_ray_start): ray.init(node_ip_address="localhost", redis_address="localhost:6379") @ray.remote @@ -385,15 +343,6 @@ def test_using_hostnames(ray_start_head_local): assert ray.get(f.remote()) == 1 -@pytest.fixture -def ray_start_regular(): - # Start the Ray processes. - address_info = ray.init(num_cpus=1) - yield address_info - # The code after the yield will run as teardown code. - ray.shutdown() - - def test_connecting_in_local_case(ray_start_regular): address_info = ray_start_regular @@ -453,10 +402,10 @@ print("success") assert "success" in out -def test_driver_exiting_when_worker_blocked(ray_start_head): +def test_driver_exiting_when_worker_blocked(call_ray_start): # This test will create some drivers that submit some tasks and then # exit without waiting for the tasks to complete. - redis_address = ray_start_head + redis_address = call_ray_start ray.init(redis_address=redis_address) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 28e93050c..5bfb21c68 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import json import logging import pytest import time @@ -10,46 +9,11 @@ import time import ray import ray.ray_constants as ray_constants from ray.tests.cluster_utils import Cluster +from ray.tests.conftest import generate_internal_config_map logger = logging.getLogger(__name__) -@pytest.fixture -def start_connected_cluster(): - # Start the Ray processes. - g = Cluster( - initialize_head=True, - connect=True, - head_node_args={ - "num_cpus": 1, - "_internal_config": json.dumps({ - "num_heartbeats_timeout": 10 - }) - }) - yield g - # The code after the yield will run as teardown code. - ray.shutdown() - g.shutdown() - - -@pytest.fixture -def start_connected_longer_cluster(): - """Creates a cluster with a longer timeout.""" - g = Cluster( - initialize_head=True, - connect=True, - head_node_args={ - "num_cpus": 1, - "_internal_config": json.dumps({ - "num_heartbeats_timeout": 20 - }) - }) - yield g - # The code after the yield will run as teardown code. - ray.shutdown() - g.shutdown() - - def test_cluster(): """Basic test for adding and removing nodes in cluster.""" g = Cluster(initialize_head=False) @@ -70,7 +34,11 @@ def test_shutdown(): assert not any(n.any_processes_alive() for n in [node, node2]) -def test_internal_config(start_connected_longer_cluster): +@pytest.mark.parametrize( + "ray_start_cluster_head", + [generate_internal_config_map(num_heartbeats_timeout=20)], + indirect=True) +def test_internal_config(ray_start_cluster_head): """Checks that the internal configuration setting works. We set the cluster to timeout nodes after 2 seconds of no timeouts. We @@ -78,7 +46,7 @@ def test_internal_config(start_connected_longer_cluster): of sync, then wait another 2 seconds (giving 1 second of leeway) to check that the client has timed out. """ - cluster = start_connected_longer_cluster + cluster = ray_start_cluster_head worker = cluster.add_node() cluster.wait_for_nodes() @@ -90,13 +58,13 @@ def test_internal_config(start_connected_longer_cluster): assert ray.global_state.cluster_resources()["CPU"] == 1 -def test_wait_for_nodes(start_connected_cluster): +def test_wait_for_nodes(ray_start_cluster_head): """Unit test for `Cluster.wait_for_nodes`. Adds 4 workers, waits, then removes 4 workers, waits, then adds 1 worker, waits, and removes 1 worker, waits. """ - cluster = start_connected_cluster + cluster = ray_start_cluster_head workers = [cluster.add_node() for i in range(4)] cluster.wait_for_nodes() [cluster.remove_node(w) for w in workers] @@ -110,8 +78,8 @@ def test_wait_for_nodes(start_connected_cluster): assert ray.global_state.cluster_resources()["CPU"] == 1 -def test_worker_plasma_store_failure(start_connected_cluster): - cluster = start_connected_cluster +def test_worker_plasma_store_failure(ray_start_cluster_head): + cluster = ray_start_cluster_head worker = cluster.add_node() cluster.wait_for_nodes() # Log monitor doesn't die for some reason diff --git a/python/ray/tests/test_node_manager.py b/python/ray/tests/test_node_manager.py index 5a6b71f18..fc195d3a2 100644 --- a/python/ray/tests/test_node_manager.py +++ b/python/ray/tests/test_node_manager.py @@ -2,27 +2,14 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pytest - import ray -from ray.tests.cluster_utils import Cluster from ray.tests.utils import run_string_as_driver -@pytest.fixture() -def ray_start_empty_cluster(): - cluster = Cluster() - yield cluster - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - # This tests the queue transitions for infeasible tasks. This has been an issue # in the past, e.g., https://github.com/ray-project/ray/issues/3275. -def test_infeasible_tasks(ray_start_empty_cluster): - cluster = ray_start_empty_cluster +def test_infeasible_tasks(ray_start_cluster): + cluster = ray_start_cluster @ray.remote def f(): diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 1626312be..b096d2a78 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -33,7 +33,7 @@ def create_cluster(num_nodes): @pytest.fixture() -def ray_start_cluster(): +def ray_start_cluster_with_resource(): num_nodes = 5 cluster = create_cluster(num_nodes) yield cluster, num_nodes @@ -43,20 +43,10 @@ def ray_start_cluster(): cluster.shutdown() -@pytest.fixture() -def ray_start_empty_cluster(): - cluster = Cluster() - yield cluster - - # The code after the yield will run as teardown code. - ray.shutdown() - cluster.shutdown() - - # This test is here to make sure that when we broadcast an object to a bunch of # machines, we don't have too many excess object transfers. -def test_object_broadcast(ray_start_cluster): - cluster, num_nodes = ray_start_cluster +def test_object_broadcast(ray_start_cluster_with_resource): + cluster, num_nodes = ray_start_cluster_with_resource @ray.remote def f(x): @@ -137,8 +127,8 @@ def test_object_broadcast(ray_start_cluster): # to the actor's object manager. However, in the past we did not deduplicate # the pushes and so the same object could get shipped to the same object # manager many times. This test checks that that isn't happening. -def test_actor_broadcast(ray_start_cluster): - cluster, num_nodes = ray_start_cluster +def test_actor_broadcast(ray_start_cluster_with_resource): + cluster, num_nodes = ray_start_cluster_with_resource @ray.remote class Actor(object): @@ -212,8 +202,8 @@ def test_actor_broadcast(ray_start_cluster): # The purpose of this test is to make sure that an object that was already been # transferred to a node can be transferred again. -def test_object_transfer_retry(ray_start_empty_cluster): - cluster = ray_start_empty_cluster +def test_object_transfer_retry(ray_start_cluster): + cluster = ray_start_cluster repeated_push_delay = 4 @@ -300,8 +290,8 @@ def test_object_transfer_retry(ray_start_empty_cluster): # The purpose of this test is to make sure we can transfer many objects. In the # past, this has caused failures in which object managers create too many open # files and run out of resources. -def test_many_small_transfers(ray_start_cluster): - cluster, num_nodes = ray_start_cluster +def test_many_small_transfers(ray_start_cluster_with_resource): + cluster, num_nodes = ray_start_cluster_with_resource @ray.remote def f(*args): diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index 2e4e7aa6d..5bcb6b9f3 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -9,16 +9,7 @@ import ray from ray.experimental.queue import Queue, Empty, Full -@pytest.fixture -def ray_start(): - # Start the Ray process. - ray.init(num_cpus=1) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_queue(ray_start): +def test_queue(ray_start_regular): @ray.remote def get_async(queue, block, timeout, sleep): time.sleep(sleep) diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index ef50b0130..31a5e666e 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -18,13 +18,6 @@ def password(): return random_bytes.encode("hex") # Python 2 -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - class TestRedisPassword(object): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", diff --git a/python/ray/tests/test_recursion.py b/python/ray/tests/test_recursion.py index 5af4fa15b..56b28f5e6 100644 --- a/python/ray/tests/test_recursion.py +++ b/python/ray/tests/test_recursion.py @@ -6,23 +6,9 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import pytest - import ray -@pytest.fixture -def ray_start(): - # Start ray instance - ray.init(num_cpus=1) - - # Run test using this fixture - yield None - - # Shutdown ray instance - ray.shutdown() - - @ray.remote def factorial(n): if n == 0: @@ -30,7 +16,7 @@ def factorial(n): return n * ray.get(factorial.remote(n - 1)) -def test_recursion(ray_start): +def test_recursion(ray_start_regular): assert ray.get(factorial.remote(0)) == 1 assert ray.get(factorial.remote(1)) == 1 assert ray.get(factorial.remote(2)) == 2 diff --git a/python/ray/tests/test_set_task_returns.py b/python/ray/tests/test_set_task_returns.py index 7c8a6bb9d..9accfc74c 100644 --- a/python/ray/tests/test_set_task_returns.py +++ b/python/ray/tests/test_set_task_returns.py @@ -10,16 +10,7 @@ import ray.experimental.no_return import ray.worker -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=1) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_set_single_output(ray_start): +def test_set_single_output(ray_start_regular): @ray.remote def f(): return_object_ids = ray.worker.global_worker._current_task.returns() @@ -29,7 +20,7 @@ def test_set_single_output(ray_start): assert ray.get(f.remote()) == 123 -def test_set_multiple_outputs(ray_start): +def test_set_multiple_outputs(ray_start_regular): @ray.remote(num_return_vals=3) def f(set_out0, set_out1, set_out2): returns = [] @@ -51,7 +42,7 @@ def test_set_multiple_outputs(ray_start): ] -def test_set_actor_method(ray_start): +def test_set_actor_method(ray_start_regular): @ray.remote class Actor(object): def __init__(self): @@ -67,7 +58,7 @@ def test_set_actor_method(ray_start): assert ray.get(actor.ping.remote()) == 123 -def test_exception(ray_start): +def test_exception(ray_start_regular): @ray.remote(num_return_vals=2) def f(): return_object_ids = ray.worker.global_worker._current_task.returns() @@ -84,7 +75,7 @@ def test_exception(ray_start): ray.get(exception_id) -def test_no_set_and_no_return(ray_start): +def test_no_set_and_no_return(ray_start_regular): @ray.remote def f(): return ray.experimental.no_return.NoReturn diff --git a/python/ray/tests/test_signal.py b/python/ray/tests/test_signal.py index 9885cdde7..cb1daa838 100644 --- a/python/ray/tests/test_signal.py +++ b/python/ray/tests/test_signal.py @@ -1,4 +1,3 @@ -import pytest import time import ray @@ -10,15 +9,6 @@ class UserSignal(signal.Signal): self.value = value -@pytest.fixture -def ray_start(): - # Start the Ray processes. - ray.init(num_cpus=4) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - def receive_all_signals(sources, timeout): # Get all signals from sources, until there is no signal for a time # period of timeout. @@ -32,7 +22,7 @@ def receive_all_signals(sources, timeout): results.extend(r) -def test_task_to_driver(ray_start): +def test_task_to_driver(ray_start_regular): # Send a signal from a task to the driver. @ray.remote @@ -47,7 +37,7 @@ def test_task_to_driver(ray_start): assert len(result_list) == 1 -def test_send_signal_from_actor_to_driver(ray_start): +def test_send_signal_from_actor_to_driver(ray_start_regular): # Send several signals from an actor, and receive them in the driver. @ray.remote @@ -70,7 +60,7 @@ def test_send_signal_from_actor_to_driver(ray_start): assert signal_value + str(i) == result_list[i][1].value -def test_send_signals_from_actor_to_driver(ray_start): +def test_send_signals_from_actor_to_driver(ray_start_regular): # Send "count" signal at intervals from an actor and get # these signals in the driver. @@ -96,7 +86,7 @@ def test_send_signals_from_actor_to_driver(ray_start): assert True -def test_task_crash(ray_start): +def test_task_crash(ray_start_regular): # Get an error when ray.get() is called on the return of a failed task. @ray.remote @@ -114,7 +104,7 @@ def test_task_crash(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_task_crash_without_get(ray_start): +def test_task_crash_without_get(ray_start_regular): # Get an error when task failed. @ray.remote @@ -127,7 +117,7 @@ def test_task_crash_without_get(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_actor_crash(ray_start): +def test_actor_crash(ray_start_regular): # Get an error when ray.get() is called on a return parameter # of a method that failed. @@ -150,7 +140,7 @@ def test_actor_crash(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_actor_crash_init(ray_start): +def test_actor_crash_init(ray_start_regular): # Get an error when an actor's __init__ failed. @ray.remote @@ -168,7 +158,7 @@ def test_actor_crash_init(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_actor_crash_init2(ray_start): +def test_actor_crash_init2(ray_start_regular): # Get errors when (1) __init__ fails, and (2) subsequently when # ray.get() is called on the return parameter of another method # of the actor. @@ -192,7 +182,7 @@ def test_actor_crash_init2(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_actor_crash_init3(ray_start): +def test_actor_crash_init3(ray_start_regular): # Get errors when (1) __init__ fails, and (2) subsequently when # another method of the actor is invoked. @@ -213,7 +203,7 @@ def test_actor_crash_init3(ray_start): assert type(result_list[0][1]) == signal.ErrorSignal -def test_send_signals_from_actor_to_actor(ray_start): +def test_send_signals_from_actor_to_actor(ray_start_regular): # Send "count" signal at intervals of 100ms from two actors and get # these signals in another actor. @@ -260,7 +250,7 @@ def test_send_signals_from_actor_to_actor(ray_start): assert received_count == 2 * count -def test_forget(ray_start): +def test_forget(ray_start_regular): # Send "count" signals on behalf of an actor, then ignore all these # signals, and then send anther "count" signals on behalf of the same # actor. Then show that the driver only gets the last "count" signals. @@ -284,7 +274,7 @@ def test_forget(ray_start): assert len(result_list) == count -def test_send_signal_from_two_tasks_to_driver(ray_start): +def test_send_signal_from_two_tasks_to_driver(ray_start_regular): # Define a remote function that sends a user-defined signal. @ray.remote def send_signal(value): @@ -302,7 +292,7 @@ def test_send_signal_from_two_tasks_to_driver(ray_start): assert len(result_list) == 1 -def test_receiving_on_two_returns(ray_start): +def test_receiving_on_two_returns(ray_start_regular): @ray.remote(num_return_vals=2) def send_signal(value): signal.send(UserSignal(value)) @@ -318,7 +308,7 @@ def test_receiving_on_two_returns(ray_start): or (x == results[1][0] and y == results[0][0])) -def test_serial_tasks_reading_same_signal(ray_start): +def test_serial_tasks_reading_same_signal(ray_start_regular): @ray.remote def send_signal(value): signal.send(UserSignal(value)) diff --git a/python/ray/tests/test_stress.py b/python/ray/tests/test_stress.py index cdb5684e3..98a029946 100644 --- a/python/ray/tests/test_stress.py +++ b/python/ray/tests/test_stress.py @@ -473,21 +473,13 @@ def test_nondeterministic_task(ray_start_reconstruction): assert cluster.remaining_processes_alive() -@pytest.fixture -def ray_start_driver_put_errors(): - plasma_store_memory = 10**9 - # Start the Ray processes. - ray.init(num_cpus=1, object_store_memory=plasma_store_memory) - yield plasma_store_memory - # The code after the yield will run as teardown code. - ray.shutdown() - - @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") -def test_driver_put_errors(ray_start_driver_put_errors): - plasma_store_memory = ray_start_driver_put_errors +@pytest.mark.parametrize( + "ray_start_object_store_memory", [10**9], indirect=True) +def test_driver_put_errors(ray_start_object_store_memory): + plasma_store_memory = ray_start_object_store_memory # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory. diff --git a/python/ray/tests/test_tensorflow.py b/python/ray/tests/test_tensorflow.py index 258ca0124..07dfe60ef 100644 --- a/python/ray/tests/test_tensorflow.py +++ b/python/ray/tests/test_tensorflow.py @@ -3,7 +3,6 @@ from __future__ import division from __future__ import print_function from numpy.testing import assert_almost_equal -import pytest import tensorflow as tf import ray @@ -96,16 +95,7 @@ class TrainActor(object): return self.values[1].get_weights() -@pytest.fixture -def ray_start_regular(): - # Start the Ray processes. - ray.init(num_cpus=2) - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - -def test_tensorflow_variables(ray_start_regular): +def test_tensorflow_variables(ray_start_2_cpus): sess = tf.Session() loss, init, _, _ = make_linear_network() sess.run(init) @@ -144,7 +134,7 @@ def test_tensorflow_variables(ray_start_regular): # Test that the variable names for the two different nets are not # modified by TensorFlow to be unique (i.e., they should already # be unique because of the variable prefix). -def test_variable_name_collision(ray_start_regular): +def test_variable_name_collision(ray_start_2_cpus): net1 = NetActor() net2 = NetActor() @@ -155,7 +145,7 @@ def test_variable_name_collision(ray_start_regular): # Test that TensorFlowVariables can take in addition variables through # input_variables arg and with no loss. -def test_additional_variables_no_loss(ray_start_regular): +def test_additional_variables_no_loss(ray_start_2_cpus): net = LossActor(use_loss=False) assert len(net.values[0].variables.items()) == 1 assert len(net.values[0].placeholders.items()) == 1 @@ -165,7 +155,7 @@ def test_additional_variables_no_loss(ray_start_regular): # Test that TensorFlowVariables can take in addition variables through # input_variables arg and with a loss. -def test_additional_variables_with_loss(ray_start_regular): +def test_additional_variables_with_loss(ray_start_2_cpus): net = LossActor() assert len(net.values[0].variables.items()) == 3 assert len(net.values[0].placeholders.items()) == 3 @@ -175,7 +165,7 @@ def test_additional_variables_with_loss(ray_start_regular): # Test that different networks on the same worker are independent and # we can get/set their weights without any interaction. -def test_networks_independent(ray_start_regular): +def test_networks_independent(ray_start_2_cpus): # Note we use only one worker to ensure that all of the remote # functions run on the same worker. net1 = NetActor() @@ -204,7 +194,7 @@ def test_networks_independent(ray_start_regular): # This test creates an additional network on the driver so that the # tensorflow variables on the driver and the worker differ. -def test_network_driver_worker_independent(ray_start_regular): +def test_network_driver_worker_independent(ray_start_2_cpus): # Create a network on the driver locally. sess1 = tf.Session() loss1, init1, _, _ = make_linear_network() @@ -219,7 +209,7 @@ def test_network_driver_worker_independent(ray_start_regular): assert weights2 == new_weights2 -def test_variables_control_dependencies(ray_start_regular): +def test_variables_control_dependencies(ray_start_2_cpus): # Creates a network and appends a momentum optimizer. sess = tf.Session() loss, init, _, _ = make_linear_network() @@ -232,12 +222,12 @@ def test_variables_control_dependencies(ray_start_regular): assert len(net_vars.variables.items()) == 4 -def test_remote_training_step(ray_start_regular): +def test_remote_training_step(ray_start_2_cpus): net = ray.remote(TrainActor).remote() ray.get(net.training_step.remote(net.get_weights.remote())) -def test_remote_training_loss(ray_start_regular): +def test_remote_training_loss(ray_start_2_cpus): net = ray.remote(TrainActor).remote() net_values = TrainActor().values loss, variables, _, sess, grads, train, placeholders = net_values