diff --git a/bazel/python.bzl b/bazel/python.bzl new file mode 100644 index 000000000..2b6f548a2 --- /dev/null +++ b/bazel/python.bzl @@ -0,0 +1,12 @@ +# py_test_module_list creates a py_test target for each +# Python file in `files` +def py_test_module_list(files, size, deps, extra_srcs, **kwargs): + for file in files: + # remove .py + name = file[:-3] + native.py_test( + name=name, + size=size, + srcs=extra_srcs+[file], + **kwargs + ) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 0408ec920..e8ac4c6cf 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -1,3 +1,5 @@ +load("//bazel:python.bzl", "py_test_module_list") + SRCS = [] + select({ "@bazel_tools//src/conditions:windows": glob([ # TODO(mehrdadn): This should be added for all platforms once resulting errors are fixed @@ -6,214 +8,98 @@ SRCS = [] + select({ "//conditions:default": [], }) -py_test( - name = "test_actor", - size = "medium", - srcs = SRCS + ["test_actor.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], +py_test_module_list( + files = [ + "test_actor_advanced.py", + "test_actor_failures.py", + "test_actor.py", + "test_actor_resources.py", + "test_advanced_2.py", + "test_advanced_3.py", + "test_advanced.py", + "test_array.py", + "test_async.py", + "test_basic_2.py", + "test_basic.py", + "test_cancel.py", + "test_component_failures_2.py", + "test_component_failures_3.py", + "test_dynres.py", + "test_gcs_fault_tolerance.py", + "test_global_gc.py", + "test_iter.py", + "test_joblib.py", + "test_memory_limits.py", + "test_memory_scheduling.py", + "test_metrics.py", + "test_multi_node_2.py", + "test_multinode_failures_2.py", + "test_multinode_failures.py", + "test_multi_node.py", + "test_multiprocessing.py", + "test_object_manager.py", + "test_output.py", + "test_placement_group.py", + "test_reconstruction.py", + "test_reference_counting_2.py", + "test_reference_counting.py", + "test_stress.py", + "test_stress_sharded.py", + "test_unreconstructable_errors.py", + ], + size = "medium", + extra_srcs = SRCS, + tags = ["exclusive"], + deps = ["//:ray_lib"], ) -py_test( - name = "test_actor_advanced", - size = "medium", - srcs = SRCS + ["test_actor_advanced.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], +py_test_module_list( + files = [ + "test_actor_pool.py", + "test_args.py", + "test_asyncio.py", + "test_autoscaler.py", + "test_autoscaler_yaml.py", + "test_component_failures.py", + "test_coordinator_server.py", + "test_dask_scheduler.py", + "test_debug_tools.py", + "test_global_state.py", + "test_job.py", + "test_memstat.py", + "test_metrics_agent.py", + "test_microbenchmarks.py", + "test_mini.py", + "test_multi_tenancy.py", + "test_node_manager.py", + "test_numba.py", + "test_projects.py", + "test_ray_init.py", + "test_serialization.py", + "test_tempfile.py", + "test_tensorflow.py", + "test_webui.py", + ], + size = "small", + extra_srcs = SRCS, + tags = ["exclusive"], + deps = ["//:ray_lib"], ) -py_test( - name = "test_actor_pool", - size = "small", - srcs = SRCS + ["test_actor_pool.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_memstat", - size = "small", - srcs = SRCS + ["test_memstat.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_dask_scheduler", - size = "small", - srcs = SRCS + ["test_dask_scheduler.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_iter", - size = "medium", - srcs = SRCS + ["test_iter.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_actor_resources", - size = "medium", - srcs = SRCS + ["test_actor_resources.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_actor_failures", - size = "medium", - srcs = SRCS + ["test_actor_failures.py"], - # TODO(ekl) enable this once we support actor reconstruction again - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_serialization", - size = "small", - srcs = SRCS + ["test_serialization.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_basic", - size = "medium", - srcs = SRCS + ["test_basic.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_basic_2", - size = "medium", - srcs = SRCS + ["test_basic_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_advanced", - size = "medium", - srcs = SRCS + ["test_advanced.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_advanced_2", - size = "medium", - srcs = SRCS + ["test_advanced_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_advanced_3", - size = "medium", - srcs = SRCS + ["test_advanced_3.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multi_tenancy", - size = "small", - srcs = SRCS + ["test_multi_tenancy.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_component_failures", - size = "small", - srcs = SRCS + ["test_component_failures.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_component_failures_2", - size = "medium", - srcs = SRCS + ["test_component_failures_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_component_failures_3", - size = "medium", - srcs = SRCS + ["test_component_failures_3.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multinode_failures", - size = "medium", - srcs = SRCS + ["test_multinode_failures.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multinode_failures_2", - size = "medium", - srcs = SRCS + ["test_multinode_failures_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_stress", - size = "medium", - srcs = SRCS + ["test_stress.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_stress_sharded", - size = "medium", - srcs = SRCS + ["test_stress_sharded.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_stress_failure", - size = "large", - srcs = SRCS + ["test_stress_failure.py"], - # TODO(ekl) enable again once we support direct call reconstruction - tags = ["exclusive", "manual"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_array", - size = "medium", - srcs = SRCS + ["test_array.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_autoscaler", - size = "small", - srcs = SRCS + ["test_autoscaler.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_coordinator_server", - size = "small", - srcs = SRCS + ["test_coordinator_server.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], +py_test_module_list( + files = [ + "test_stress_faiure.py", + "test_failure.py" + ], + size = "large", + extra_srcs = SRCS, + # TODO(ekl) enable again once we support direct call reconstruction + tags = ["exclusive", "manual"], + deps = ["//:ray_lib"], ) +# TODO(barakmich): aws/ might want its own buildfile, or +# py_test_module_list should support subdirectories. py_test( name = "test_autoscaler_aws", size = "small", @@ -221,293 +107,3 @@ py_test( deps = ["//:ray_lib"], ) -py_test( - name = "test_autoscaler_yaml", - size = "small", - srcs = SRCS + ["test_autoscaler_yaml.py"], - data = ["additional_property.yaml"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_debug_tools", - size = "small", - srcs = SRCS + ["test_debug_tools.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_dynres", - size = "medium", - srcs = SRCS + ["test_dynres.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_failure", - size = "large", - srcs = SRCS + ["test_failure.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_reconstruction", - size = "medium", - srcs = SRCS + ["test_reconstruction.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_reference_counting", - size = "medium", - srcs = SRCS + ["test_reference_counting.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_reference_counting_2", - size = "medium", - srcs = SRCS + ["test_reference_counting_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_global_gc", - size = "medium", - srcs = SRCS + ["test_global_gc.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_global_state", - size = "small", - srcs = SRCS + ["test_global_state.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_memory_limits", - size = "medium", - srcs = SRCS + ["test_memory_limits.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_memory_scheduling", - size = "medium", - srcs = SRCS + ["test_memory_scheduling.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_metrics", - size = "small", - srcs = SRCS + ["test_metrics.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_metrics_agent", - size = "small", - srcs = SRCS + ["test_metrics_agent.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_microbenchmarks", - size = "small", - srcs = SRCS + ["test_microbenchmarks.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_mini", - size = "small", - srcs = SRCS + ["test_mini.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multiprocessing", - size = "medium", - srcs = SRCS + ["test_multiprocessing.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_job", - size = "small", - srcs = SRCS + ["test_job.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_joblib", - size = "medium", - srcs = SRCS + ["test_joblib.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multi_node_2", - size = "medium", - srcs = SRCS + ["test_multi_node_2.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_multi_node", - size = "medium", - srcs = SRCS + ["test_multi_node.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_node_manager", - size = "small", - srcs = SRCS + ["test_node_manager.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_object_manager", - size = "medium", - srcs = SRCS + ["test_object_manager.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_projects", - size = "small", - srcs = SRCS + ["test_projects.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_queue", - size = "small", - srcs = SRCS + ["test_queue.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_ray_init", - size = "small", - srcs = SRCS + ["test_ray_init.py"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_tempfile", - size = "small", - srcs = SRCS + ["test_tempfile.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_tensorflow", - size = "small", - srcs = SRCS + ["test_tensorflow.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_unreconstructable_errors", - size = "medium", - srcs = SRCS + ["test_unreconstructable_errors.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_webui", - size = "small", - srcs = SRCS + ["test_webui.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_args", - size = "small", - srcs = SRCS + ["test_args.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_asyncio", - size = "small", - srcs = SRCS + ["test_asyncio.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_numba", - size = "small", - srcs = SRCS + ["test_numba.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_cancel", - size = "medium", - srcs = SRCS + ["test_cancel.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_gcs_fault_tolerance", - size = "medium", - srcs = SRCS + ["test_gcs_fault_tolerance.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_output", - size = "medium", - srcs = SRCS + ["test_output.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_async", - size = "medium", - srcs = SRCS + ["test_async.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) - -py_test( - name = "test_placement_group", - size = "medium", - srcs = SRCS + ["test_placement_group.py"], - tags = ["exclusive"], - deps = ["//:ray_lib"], -) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index a67b88c82..5ebaa39b1 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -74,13 +74,25 @@ def ray_start_regular(request): yield res -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def ray_start_regular_shared(request): param = getattr(request, "param", {}) with _ray_start(**param) as res: yield res +@pytest.fixture( + scope="module", params=[{ + "local_mode": True + }, { + "local_mode": False + }]) +def ray_start_shared_local_modes(request): + param = getattr(request, "param", {}) + with _ray_start(**param) as res: + yield res + + @pytest.fixture def ray_start_2_cpus(request): param = getattr(request, "param", {}) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 9fc77d72a..5e00e30aa 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -15,308 +15,6 @@ import ray.test_utils import ray.cluster_utils -def test_actor_exit_from_task(ray_start_regular): - @ray.remote - class Actor: - def __init__(self): - print("Actor created") - - def f(self): - return 0 - - @ray.remote - def f(): - a = Actor.remote() - x_id = a.f.remote() - return [x_id] - - x_id = ray.get(f.remote())[0] - print(ray.get(x_id)) # This should not hang. - - -def test_actor_init_error_propagated(ray_start_regular): - @ray.remote - class Actor: - def __init__(self, error=False): - if error: - raise Exception("oops") - - def foo(self): - return "OK" - - actor = Actor.remote(error=False) - ray.get(actor.foo.remote()) - - actor = Actor.remote(error=True) - with pytest.raises(Exception, match=".*oops.*"): - ray.get(actor.foo.remote()) - - -def test_keyword_args(ray_start_regular): - @ray.remote - class Actor: - def __init__(self, arg0, arg1=1, arg2="a"): - self.arg0 = arg0 - self.arg1 = arg1 - self.arg2 = arg2 - - def get_values(self, arg0, arg1=2, arg2="b"): - return self.arg0 + arg0, self.arg1 + arg1, self.arg2 + arg2 - - actor = Actor.remote(0) - assert ray.get(actor.get_values.remote(1)) == (1, 3, "ab") - - actor = Actor.remote(1, 2) - assert ray.get(actor.get_values.remote(2, 3)) == (3, 5, "ab") - - actor = Actor.remote(1, 2, "c") - assert ray.get(actor.get_values.remote(2, 3, "d")) == (3, 5, "cd") - - actor = Actor.remote(1, arg2="c") - assert ray.get(actor.get_values.remote(0, arg2="d")) == (1, 3, "cd") - assert ray.get(actor.get_values.remote(0, arg2="d", arg1=0)) == (1, 1, - "cd") - - actor = Actor.remote(1, arg2="c", arg1=2) - assert ray.get(actor.get_values.remote(0, arg2="d")) == (1, 4, "cd") - assert ray.get(actor.get_values.remote(0, arg2="d", arg1=0)) == (1, 2, - "cd") - assert ray.get(actor.get_values.remote(arg2="d", arg1=0, arg0=2)) == (3, 2, - "cd") - - # Make sure we get an exception if the constructor is called - # incorrectly. - with pytest.raises(Exception): - actor = Actor.remote() - - with pytest.raises(Exception): - actor = Actor.remote(0, 1, 2, arg3=3) - - with pytest.raises(Exception): - actor = Actor.remote(0, arg0=1) - - # Make sure we get an exception if the method is called incorrectly. - actor = Actor.remote(1) - with pytest.raises(Exception): - ray.get(actor.get_values.remote()) - - -def test_actor_method_metadata_cache(ray_start_regular): - class Actor(object): - pass - - # The cache of ActorClassMethodMetadata. - cache = ray.actor.ActorClassMethodMetadata._cache - cache.clear() - - # Check cache hit during ActorHandle deserialization. - A1 = ray.remote(Actor) - a = A1.remote() - assert len(cache) == 1 - cached_data_id = [id(x) for x in list(cache.items())[0]] - for x in range(10): - a = pickle.loads(pickle.dumps(a)) - assert len(ray.actor.ActorClassMethodMetadata._cache) == 1 - assert [id(x) for x in list(cache.items())[0]] == cached_data_id - - -def test_actor_name_conflict(ray_start_regular): - @ray.remote - class A(object): - def foo(self): - return 100000 - - a = A.remote() - r = a.foo.remote() - - results = [r] - for x in range(10): - - @ray.remote - class A(object): - def foo(self): - return x - - a = A.remote() - r = a.foo.remote() - results.append(r) - - assert ray.get(results) == [100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - - -def test_variable_number_of_args(ray_start_regular): - @ray.remote - class Actor: - def __init__(self, arg0, arg1=1, *args): - self.arg0 = arg0 - self.arg1 = arg1 - self.args = args - - def get_values(self, arg0, arg1=2, *args): - return self.arg0 + arg0, self.arg1 + arg1, self.args, args - - actor = Actor.remote(0) - assert ray.get(actor.get_values.remote(1)) == (1, 3, (), ()) - - actor = Actor.remote(1, 2) - assert ray.get(actor.get_values.remote(2, 3)) == (3, 5, (), ()) - - actor = Actor.remote(1, 2, "c") - assert ray.get(actor.get_values.remote(2, 3, "d")) == (3, 5, ("c", ), - ("d", )) - - actor = Actor.remote(1, 2, "a", "b", "c", "d") - assert ray.get(actor.get_values.remote( - 2, 3, 1, 2, 3, 4)) == (3, 5, ("a", "b", "c", "d"), (1, 2, 3, 4)) - - @ray.remote - class Actor: - def __init__(self, *args): - self.args = args - - def get_values(self, *args): - return self.args, args - - a = Actor.remote() - assert ray.get(a.get_values.remote()) == ((), ()) - a = Actor.remote(1) - assert ray.get(a.get_values.remote(2)) == ((1, ), (2, )) - a = Actor.remote(1, 2) - assert ray.get(a.get_values.remote(3, 4)) == ((1, 2), (3, 4)) - - -def test_no_args(ray_start_regular): - @ray.remote - class Actor: - def __init__(self): - pass - - def get_values(self): - pass - - actor = Actor.remote() - assert ray.get(actor.get_values.remote()) is None - - -def test_no_constructor(ray_start_regular): - @ray.remote - class Actor: - def get_values(self): - pass - - actor = Actor.remote() - assert ray.get(actor.get_values.remote()) is None - - -def test_custom_classes(ray_start_regular): - class Foo: - def __init__(self, x): - self.x = x - - @ray.remote - class Actor: - def __init__(self, f2): - self.f1 = Foo(1) - self.f2 = f2 - - def get_values1(self): - return self.f1, self.f2 - - def get_values2(self, f3): - return self.f1, self.f2, f3 - - actor = Actor.remote(Foo(2)) - results1 = ray.get(actor.get_values1.remote()) - assert results1[0].x == 1 - assert results1[1].x == 2 - results2 = ray.get(actor.get_values2.remote(Foo(3))) - assert results2[0].x == 1 - assert results2[1].x == 2 - assert results2[2].x == 3 - - -def test_actor_class_attributes(ray_start_regular): - class Grandparent: - GRANDPARENT = 2 - - class Parent1(Grandparent): - PARENT1 = 6 - - class Parent2: - PARENT2 = 7 - - @ray.remote - class TestActor(Parent1, Parent2): - X = 3 - - @classmethod - def f(cls): - assert TestActor.GRANDPARENT == 2 - assert TestActor.PARENT1 == 6 - assert TestActor.PARENT2 == 7 - assert TestActor.X == 3 - return 4 - - def g(self): - assert TestActor.GRANDPARENT == 2 - assert TestActor.PARENT1 == 6 - assert TestActor.PARENT2 == 7 - assert TestActor.f() == 4 - return TestActor.X - - t = TestActor.remote() - assert ray.get(t.g.remote()) == 3 - - -def test_actor_static_attributes(ray_start_regular): - class Grandparent: - GRANDPARENT = 2 - - @staticmethod - def grandparent_static(): - assert Grandparent.GRANDPARENT == 2 - return 1 - - class Parent1(Grandparent): - PARENT1 = 6 - - @staticmethod - def parent1_static(): - assert Parent1.PARENT1 == 6 - return 2 - - def parent1(self): - assert Parent1.PARENT1 == 6 - - class Parent2: - PARENT2 = 7 - - def parent2(self): - assert Parent2.PARENT2 == 7 - - @ray.remote - class TestActor(Parent1, Parent2): - X = 3 - - @staticmethod - def f(): - assert TestActor.GRANDPARENT == 2 - assert TestActor.PARENT1 == 6 - assert TestActor.PARENT2 == 7 - assert TestActor.X == 3 - return 4 - - def g(self): - assert TestActor.GRANDPARENT == 2 - assert TestActor.PARENT1 == 6 - assert TestActor.PARENT2 == 7 - assert TestActor.f() == 4 - return TestActor.X - - t = TestActor.remote() - assert ray.get(t.g.remote()) == 3 - - def test_caching_actors(shutdown_only): # Test defining actors before ray.init() has been called. @@ -340,261 +38,6 @@ def test_caching_actors(shutdown_only): assert ray.get(f.get_val.remote()) == 3 -def test_decorator_args(ray_start_regular): - # This is an invalid way of using the actor decorator. - with pytest.raises(Exception): - - @ray.remote() - class Actor: - def __init__(self): - pass - - # This is an invalid way of using the actor decorator. - with pytest.raises(Exception): - - @ray.remote(invalid_kwarg=0) # noqa: F811 - class Actor: # noqa: F811 - def __init__(self): - pass - - # This is an invalid way of using the actor decorator. - with pytest.raises(Exception): - - @ray.remote(num_cpus=0, invalid_kwarg=0) # noqa: F811 - class Actor: # noqa: F811 - def __init__(self): - pass - - # This is a valid way of using the decorator. - @ray.remote(num_cpus=1) # noqa: F811 - class Actor: # noqa: F811 - def __init__(self): - pass - - # This is a valid way of using the decorator. - @ray.remote(num_gpus=1) # noqa: F811 - class Actor: # noqa: F811 - def __init__(self): - pass - - # This is a valid way of using the decorator. - @ray.remote(num_cpus=1, num_gpus=1) # noqa: F811 - class Actor: # noqa: F811 - def __init__(self): - pass - - -def test_random_id_generation(ray_start_regular): - @ray.remote - class Foo: - def __init__(self): - pass - - # Make sure that seeding numpy does not interfere with the generation - # of actor IDs. - np.random.seed(1234) - random.seed(1234) - f1 = Foo.remote() - np.random.seed(1234) - random.seed(1234) - f2 = Foo.remote() - - assert f1._actor_id != f2._actor_id - - -def test_actor_class_name(ray_start_regular): - @ray.remote - class Foo: - def __init__(self): - pass - - Foo.remote() - - r = ray.worker.global_worker.redis_client - actor_keys = r.keys("ActorClass*") - assert len(actor_keys) == 1 - actor_class_info = r.hgetall(actor_keys[0]) - assert actor_class_info[b"class_name"] == b"Foo" - assert b"test_actor" in actor_class_info[b"module"] - - -def test_actor_inheritance(ray_start_regular): - class NonActorBase: - def __init__(self): - pass - - # Test that an actor class can inherit from a non-actor class. - @ray.remote - class ActorBase(NonActorBase): - def __init__(self): - pass - - # Test that you can't instantiate an actor class directly. - with pytest.raises( - Exception, match="Actors cannot be instantiated directly."): - ActorBase() - - # Test that you can't inherit from an actor class. - with pytest.raises( - TypeError, - match="Inheriting from actor classes is not " - "currently supported."): - - class Derived(ActorBase): - def __init__(self): - pass - - -def test_multiple_return_values(ray_start_regular): - @ray.remote - class Foo: - def method0(self): - return 1 - - @ray.method(num_return_vals=1) - def method1(self): - return 1 - - @ray.method(num_return_vals=2) - def method2(self): - return 1, 2 - - @ray.method(num_return_vals=3) - def method3(self): - return 1, 2, 3 - - f = Foo.remote() - - id0 = f.method0.remote() - assert ray.get(id0) == 1 - - id1 = f.method1.remote() - assert ray.get(id1) == 1 - - id2a, id2b = f.method2.remote() - assert ray.get([id2a, id2b]) == [1, 2] - - id3a, id3b, id3c = f.method3.remote() - assert ray.get([id3a, id3b, id3c]) == [1, 2, 3] - - -def test_define_actor(ray_start_regular): - @ray.remote - class Test: - def __init__(self, x): - self.x = x - - def f(self, y): - return self.x + y - - t = Test.remote(2) - assert ray.get(t.f.remote(1)) == 3 - - # Make sure that calling an actor method directly raises an exception. - with pytest.raises(Exception): - t.f(1) - - -def test_actor_deletion(ray_start_regular): - # Make sure that when an actor handles goes out of scope, the actor - # destructor is called. - - @ray.remote - class Actor: - def getpid(self): - return os.getpid() - - a = Actor.remote() - pid = ray.get(a.getpid.remote()) - a = None - ray.test_utils.wait_for_pid_to_exit(pid) - - actors = [Actor.remote() for _ in range(10)] - pids = ray.get([a.getpid.remote() for a in actors]) - a = None - actors = None - [ray.test_utils.wait_for_pid_to_exit(pid) for pid in pids] - - -def test_actor_method_deletion(ray_start_regular): - @ray.remote - class Actor: - def method(self): - return 1 - - # Make sure that if we create an actor and call a method on it - # immediately, the actor doesn't get killed before the method is - # called. - assert ray.get(Actor.remote().method.remote()) == 1 - - -def test_distributed_actor_handle_deletion(ray_start_regular): - @ray.remote - class Actor: - def method(self): - return 1 - - def getpid(self): - return os.getpid() - - @ray.remote - def f(actor, signal): - ray.get(signal.wait.remote()) - return ray.get(actor.method.remote()) - - signal = ray.test_utils.SignalActor.remote() - a = Actor.remote() - pid = ray.get(a.getpid.remote()) - # Pass the handle to another task that cannot run yet. - x_id = f.remote(a, signal) - # Delete the original handle. The actor should not get killed yet. - del a - - # Once the task finishes, the actor process should get killed. - ray.get(signal.send.remote()) - assert ray.get(x_id) == 1 - ray.test_utils.wait_for_pid_to_exit(pid) - - -def test_multiple_actors(ray_start_regular): - @ray.remote - class Counter: - def __init__(self, value): - self.value = value - - def increase(self): - self.value += 1 - return self.value - - def reset(self): - self.value = 0 - - num_actors = 5 - num_increases = 50 - # Create multiple actors. - actors = [Counter.remote(i) for i in range(num_actors)] - results = [] - # Call each actor's method a bunch of times. - for i in range(num_actors): - results += [actors[i].increase.remote() for _ in range(num_increases)] - result_values = ray.get(results) - for i in range(num_actors): - v = result_values[(num_increases * i):(num_increases * (i + 1))] - assert v == list(range(i + 1, num_increases + i + 1)) - - # Reset the actor values. - [actor.reset.remote() for actor in actors] - - # Interweave the method calls on the different actors. - results = [] - for j in range(num_increases): - results += [actor.increase.remote() for actor in actors] - result_values = ray.get(results) - for j in range(num_increases): - v = result_values[(num_actors * j):(num_actors * (j + 1))] - assert v == num_actors * [j + 1] - - def test_remote_function_within_actor(ray_start_10_cpus): # Make sure we can use remote funtions within actors. @@ -790,7 +233,564 @@ def test_actor_import_counter(ray_start_10_cpus): assert ray.get(g.remote()) == num_remote_functions - 1 -def test_inherit_actor_from_class(ray_start_regular): +def test_actor_method_metadata_cache(ray_start_regular): + class Actor(object): + pass + + # The cache of ActorClassMethodMetadata. + cache = ray.actor.ActorClassMethodMetadata._cache + cache.clear() + + # Check cache hit during ActorHandle deserialization. + A1 = ray.remote(Actor) + a = A1.remote() + assert len(cache) == 1 + cached_data_id = [id(x) for x in list(cache.items())[0]] + for x in range(10): + a = pickle.loads(pickle.dumps(a)) + assert len(ray.actor.ActorClassMethodMetadata._cache) == 1 + assert [id(x) for x in list(cache.items())[0]] == cached_data_id + + +def test_actor_class_name(ray_start_regular): + @ray.remote + class Foo: + def __init__(self): + pass + + Foo.remote() + + r = ray.worker.global_worker.redis_client + actor_keys = r.keys("ActorClass*") + assert len(actor_keys) == 1 + actor_class_info = r.hgetall(actor_keys[0]) + assert actor_class_info[b"class_name"] == b"Foo" + assert b"test_actor" in actor_class_info[b"module"] + + +def test_actor_exit_from_task(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self): + print("Actor created") + + def f(self): + return 0 + + @ray.remote + def f(): + a = Actor.remote() + x_id = a.f.remote() + return [x_id] + + x_id = ray.get(f.remote())[0] + print(ray.get(x_id)) # This should not hang. + + +def test_actor_init_error_propagated(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self, error=False): + if error: + raise Exception("oops") + + def foo(self): + return "OK" + + actor = Actor.remote(error=False) + ray.get(actor.foo.remote()) + + actor = Actor.remote(error=True) + with pytest.raises(Exception, match=".*oops.*"): + ray.get(actor.foo.remote()) + + +def test_keyword_args(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self, arg0, arg1=1, arg2="a"): + self.arg0 = arg0 + self.arg1 = arg1 + self.arg2 = arg2 + + def get_values(self, arg0, arg1=2, arg2="b"): + return self.arg0 + arg0, self.arg1 + arg1, self.arg2 + arg2 + + actor = Actor.remote(0) + assert ray.get(actor.get_values.remote(1)) == (1, 3, "ab") + + actor = Actor.remote(1, 2) + assert ray.get(actor.get_values.remote(2, 3)) == (3, 5, "ab") + + actor = Actor.remote(1, 2, "c") + assert ray.get(actor.get_values.remote(2, 3, "d")) == (3, 5, "cd") + + actor = Actor.remote(1, arg2="c") + assert ray.get(actor.get_values.remote(0, arg2="d")) == (1, 3, "cd") + assert ray.get(actor.get_values.remote(0, arg2="d", arg1=0)) == (1, 1, + "cd") + + actor = Actor.remote(1, arg2="c", arg1=2) + assert ray.get(actor.get_values.remote(0, arg2="d")) == (1, 4, "cd") + assert ray.get(actor.get_values.remote(0, arg2="d", arg1=0)) == (1, 2, + "cd") + assert ray.get(actor.get_values.remote(arg2="d", arg1=0, arg0=2)) == (3, 2, + "cd") + + # Make sure we get an exception if the constructor is called + # incorrectly. + with pytest.raises(Exception): + actor = Actor.remote() + + with pytest.raises(Exception): + actor = Actor.remote(0, 1, 2, arg3=3) + + with pytest.raises(Exception): + actor = Actor.remote(0, arg0=1) + + # Make sure we get an exception if the method is called incorrectly. + actor = Actor.remote(1) + with pytest.raises(Exception): + ray.get(actor.get_values.remote()) + + +def test_actor_name_conflict(ray_start_regular_shared): + @ray.remote + class A(object): + def foo(self): + return 100000 + + a = A.remote() + r = a.foo.remote() + + results = [r] + for x in range(10): + + @ray.remote + class A(object): + def foo(self): + return x + + a = A.remote() + r = a.foo.remote() + results.append(r) + + assert ray.get(results) == [100000, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + +def test_variable_number_of_args(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self, arg0, arg1=1, *args): + self.arg0 = arg0 + self.arg1 = arg1 + self.args = args + + def get_values(self, arg0, arg1=2, *args): + return self.arg0 + arg0, self.arg1 + arg1, self.args, args + + actor = Actor.remote(0) + assert ray.get(actor.get_values.remote(1)) == (1, 3, (), ()) + + actor = Actor.remote(1, 2) + assert ray.get(actor.get_values.remote(2, 3)) == (3, 5, (), ()) + + actor = Actor.remote(1, 2, "c") + assert ray.get(actor.get_values.remote(2, 3, "d")) == (3, 5, ("c", ), + ("d", )) + + actor = Actor.remote(1, 2, "a", "b", "c", "d") + assert ray.get(actor.get_values.remote( + 2, 3, 1, 2, 3, 4)) == (3, 5, ("a", "b", "c", "d"), (1, 2, 3, 4)) + + @ray.remote + class Actor: + def __init__(self, *args): + self.args = args + + def get_values(self, *args): + return self.args, args + + a = Actor.remote() + assert ray.get(a.get_values.remote()) == ((), ()) + a = Actor.remote(1) + assert ray.get(a.get_values.remote(2)) == ((1, ), (2, )) + a = Actor.remote(1, 2) + assert ray.get(a.get_values.remote(3, 4)) == ((1, 2), (3, 4)) + + +def test_no_args(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self): + pass + + def get_values(self): + pass + + actor = Actor.remote() + assert ray.get(actor.get_values.remote()) is None + + +def test_no_constructor(ray_start_regular_shared): + @ray.remote + class Actor: + def get_values(self): + pass + + actor = Actor.remote() + assert ray.get(actor.get_values.remote()) is None + + +def test_custom_classes(ray_start_regular_shared): + class Foo: + def __init__(self, x): + self.x = x + + @ray.remote + class Actor: + def __init__(self, f2): + self.f1 = Foo(1) + self.f2 = f2 + + def get_values1(self): + return self.f1, self.f2 + + def get_values2(self, f3): + return self.f1, self.f2, f3 + + actor = Actor.remote(Foo(2)) + results1 = ray.get(actor.get_values1.remote()) + assert results1[0].x == 1 + assert results1[1].x == 2 + results2 = ray.get(actor.get_values2.remote(Foo(3))) + assert results2[0].x == 1 + assert results2[1].x == 2 + assert results2[2].x == 3 + + +def test_actor_class_attributes(ray_start_regular_shared): + class Grandparent: + GRANDPARENT = 2 + + class Parent1(Grandparent): + PARENT1 = 6 + + class Parent2: + PARENT2 = 7 + + @ray.remote + class TestActor(Parent1, Parent2): + X = 3 + + @classmethod + def f(cls): + assert TestActor.GRANDPARENT == 2 + assert TestActor.PARENT1 == 6 + assert TestActor.PARENT2 == 7 + assert TestActor.X == 3 + return 4 + + def g(self): + assert TestActor.GRANDPARENT == 2 + assert TestActor.PARENT1 == 6 + assert TestActor.PARENT2 == 7 + assert TestActor.f() == 4 + return TestActor.X + + t = TestActor.remote() + assert ray.get(t.g.remote()) == 3 + + +def test_actor_static_attributes(ray_start_regular_shared): + class Grandparent: + GRANDPARENT = 2 + + @staticmethod + def grandparent_static(): + assert Grandparent.GRANDPARENT == 2 + return 1 + + class Parent1(Grandparent): + PARENT1 = 6 + + @staticmethod + def parent1_static(): + assert Parent1.PARENT1 == 6 + return 2 + + def parent1(self): + assert Parent1.PARENT1 == 6 + + class Parent2: + PARENT2 = 7 + + def parent2(self): + assert Parent2.PARENT2 == 7 + + @ray.remote + class TestActor(Parent1, Parent2): + X = 3 + + @staticmethod + def f(): + assert TestActor.GRANDPARENT == 2 + assert TestActor.PARENT1 == 6 + assert TestActor.PARENT2 == 7 + assert TestActor.X == 3 + return 4 + + def g(self): + assert TestActor.GRANDPARENT == 2 + assert TestActor.PARENT1 == 6 + assert TestActor.PARENT2 == 7 + assert TestActor.f() == 4 + return TestActor.X + + t = TestActor.remote() + assert ray.get(t.g.remote()) == 3 + + +def test_decorator_args(ray_start_regular_shared): + # This is an invalid way of using the actor decorator. + with pytest.raises(Exception): + + @ray.remote() + class Actor: + def __init__(self): + pass + + # This is an invalid way of using the actor decorator. + with pytest.raises(Exception): + + @ray.remote(invalid_kwarg=0) # noqa: F811 + class Actor: # noqa: F811 + def __init__(self): + pass + + # This is an invalid way of using the actor decorator. + with pytest.raises(Exception): + + @ray.remote(num_cpus=0, invalid_kwarg=0) # noqa: F811 + class Actor: # noqa: F811 + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.remote(num_cpus=1) # noqa: F811 + class Actor: # noqa: F811 + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.remote(num_gpus=1) # noqa: F811 + class Actor: # noqa: F811 + def __init__(self): + pass + + # This is a valid way of using the decorator. + @ray.remote(num_cpus=1, num_gpus=1) # noqa: F811 + class Actor: # noqa: F811 + def __init__(self): + pass + + +def test_random_id_generation(ray_start_regular_shared): + @ray.remote + class Foo: + def __init__(self): + pass + + # Make sure that seeding numpy does not interfere with the generation + # of actor IDs. + np.random.seed(1234) + random.seed(1234) + f1 = Foo.remote() + np.random.seed(1234) + random.seed(1234) + f2 = Foo.remote() + + assert f1._actor_id != f2._actor_id + + +def test_actor_inheritance(ray_start_regular_shared): + class NonActorBase: + def __init__(self): + pass + + # Test that an actor class can inherit from a non-actor class. + @ray.remote + class ActorBase(NonActorBase): + def __init__(self): + pass + + # Test that you can't instantiate an actor class directly. + with pytest.raises( + Exception, match="Actors cannot be instantiated directly."): + ActorBase() + + # Test that you can't inherit from an actor class. + with pytest.raises( + TypeError, + match="Inheriting from actor classes is not " + "currently supported."): + + class Derived(ActorBase): + def __init__(self): + pass + + +def test_multiple_return_values(ray_start_regular_shared): + @ray.remote + class Foo: + def method0(self): + return 1 + + @ray.method(num_return_vals=1) + def method1(self): + return 1 + + @ray.method(num_return_vals=2) + def method2(self): + return 1, 2 + + @ray.method(num_return_vals=3) + def method3(self): + return 1, 2, 3 + + f = Foo.remote() + + id0 = f.method0.remote() + assert ray.get(id0) == 1 + + id1 = f.method1.remote() + assert ray.get(id1) == 1 + + id2a, id2b = f.method2.remote() + assert ray.get([id2a, id2b]) == [1, 2] + + id3a, id3b, id3c = f.method3.remote() + assert ray.get([id3a, id3b, id3c]) == [1, 2, 3] + + +def test_define_actor(ray_start_regular_shared): + @ray.remote + class Test: + def __init__(self, x): + self.x = x + + def f(self, y): + return self.x + y + + t = Test.remote(2) + assert ray.get(t.f.remote(1)) == 3 + + # Make sure that calling an actor method directly raises an exception. + with pytest.raises(Exception): + t.f(1) + + +def test_actor_deletion(ray_start_regular_shared): + # Make sure that when an actor handles goes out of scope, the actor + # destructor is called. + + @ray.remote + class Actor: + def getpid(self): + return os.getpid() + + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + a = None + ray.test_utils.wait_for_pid_to_exit(pid) + + actors = [Actor.remote() for _ in range(10)] + pids = ray.get([a.getpid.remote() for a in actors]) + a = None + actors = None + [ray.test_utils.wait_for_pid_to_exit(pid) for pid in pids] + + +def test_actor_method_deletion(ray_start_regular_shared): + @ray.remote + class Actor: + def method(self): + return 1 + + # Make sure that if we create an actor and call a method on it + # immediately, the actor doesn't get killed before the method is + # called. + assert ray.get(Actor.remote().method.remote()) == 1 + + +def test_distributed_actor_handle_deletion(ray_start_regular_shared): + @ray.remote + class Actor: + def method(self): + return 1 + + def getpid(self): + return os.getpid() + + @ray.remote + def f(actor, signal): + ray.get(signal.wait.remote()) + return ray.get(actor.method.remote()) + + signal = ray.test_utils.SignalActor.remote() + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + # Pass the handle to another task that cannot run yet. + x_id = f.remote(a, signal) + # Delete the original handle. The actor should not get killed yet. + del a + + # Once the task finishes, the actor process should get killed. + ray.get(signal.send.remote()) + assert ray.get(x_id) == 1 + ray.test_utils.wait_for_pid_to_exit(pid) + + +def test_multiple_actors(ray_start_regular_shared): + @ray.remote + class Counter: + def __init__(self, value): + self.value = value + + def increase(self): + self.value += 1 + return self.value + + def reset(self): + self.value = 0 + + num_actors = 5 + num_increases = 50 + # Create multiple actors. + actors = [Counter.remote(i) for i in range(num_actors)] + results = [] + # Call each actor's method a bunch of times. + for i in range(num_actors): + results += [actors[i].increase.remote() for _ in range(num_increases)] + result_values = ray.get(results) + for i in range(num_actors): + v = result_values[(num_increases * i):(num_increases * (i + 1))] + assert v == list(range(i + 1, num_increases + i + 1)) + + # Reset the actor values. + [actor.reset.remote() for actor in actors] + + # Interweave the method calls on the different actors. + results = [] + for j in range(num_increases): + results += [actor.increase.remote() for actor in actors] + result_values = ray.get(results) + for j in range(num_increases): + v = result_values[(num_actors * j):(num_actors * (j + 1))] + assert v == num_actors * [j + 1] + + +def test_inherit_actor_from_class(ray_start_regular_shared): # Make sure we can define an actor by inheriting from a regular class. # Note that actors cannot inherit from other actors. @@ -819,7 +819,7 @@ def test_inherit_actor_from_class(ray_start_regular): @pytest.mark.skip( "This test is just used to print the latency of creating 100 actors.") -def test_actor_creation_latency(ray_start_regular): +def test_actor_creation_latency(ray_start_regular_shared): # This test is just used to test the latency of actor creation. @ray.remote class Actor: diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 833b4d1a4..eaeaa2e96 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -620,41 +620,6 @@ def test_calling_put_on_actor_handle(ray_start_regular): ray.get(g.remote()) -def test_pickling_actor_handle(ray_start_regular): - @ray.remote - class Foo: - def method(self): - pass - - f = Foo.remote() - new_f = ray.worker.pickle.loads(ray.worker.pickle.dumps(f)) - # Verify that we can call a method on the unpickled handle. TODO(rkn): - # we should also test this from a different driver. - ray.get(new_f.method.remote()) - - -def test_pickled_actor_handle_call_in_method_twice(ray_start_regular): - @ray.remote - class Actor1: - def f(self): - return 1 - - @ray.remote - class Actor2: - def __init__(self, constructor): - self.actor = constructor() - - def step(self): - ray.get(self.actor.f.remote()) - - a = Actor1.remote() - - b = Actor2.remote(lambda: a) - - ray.get(b.step.remote()) - ray.get(b.step.remote()) - - def test_detached_actor(ray_start_regular): @ray.remote class DetachedActor: @@ -836,26 +801,6 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster): assert ray.get(deatched_actor.ping.remote()) == "pong" -def test_kill(ray_start_regular): - @ray.remote - class Actor: - def hang(self): - while True: - time.sleep(1) - - actor = Actor.remote() - result = actor.hang.remote() - ready, _ = ray.wait([result], timeout=0.5) - assert len(ready) == 0 - ray.kill(actor, no_restart=False) - - with pytest.raises(ray.exceptions.RayActorError): - ray.get(result) - - with pytest.raises(ValueError): - ray.kill("not_an_actor_handle") - - # This test verifies actor creation task failure will not # hang the caller. def test_actor_creation_task_crash(ray_start_regular): @@ -952,6 +897,61 @@ def test_pending_actor_removed_by_owner(ray_start_regular): assert ray.get(f.remote()) +def test_pickling_actor_handle(ray_start_regular_shared): + @ray.remote + class Foo: + def method(self): + pass + + f = Foo.remote() + new_f = ray.worker.pickle.loads(ray.worker.pickle.dumps(f)) + # Verify that we can call a method on the unpickled handle. TODO(rkn): + # we should also test this from a different driver. + ray.get(new_f.method.remote()) + + +def test_pickled_actor_handle_call_in_method_twice(ray_start_regular_shared): + @ray.remote + class Actor1: + def f(self): + return 1 + + @ray.remote + class Actor2: + def __init__(self, constructor): + self.actor = constructor() + + def step(self): + ray.get(self.actor.f.remote()) + + a = Actor1.remote() + + b = Actor2.remote(lambda: a) + + ray.get(b.step.remote()) + ray.get(b.step.remote()) + + +def test_kill(ray_start_regular_shared): + @ray.remote + class Actor: + def hang(self): + while True: + time.sleep(1) + + actor = Actor.remote() + result = actor.hang.remote() + ready, _ = ray.wait([result], timeout=0.5) + assert len(ready) == 0 + ray.kill(actor, no_restart=False) + + with pytest.raises(ray.exceptions.RayActorError): + ray.get(result) + + with pytest.raises(ValueError): + ray.kill("not_an_actor_handle") + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 46c1ff5a0..c0ee85dc0 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -230,6 +230,34 @@ def test_fair_queueing(shutdown_only): assert len(ready) == 1000, len(ready) +def test_put_get(shutdown_only): + ray.init(num_cpus=0) + + for i in range(100): + value_before = i * 10**6 + object_ref = ray.put(value_before) + value_after = ray.get(object_ref) + assert value_before == value_after + + for i in range(100): + value_before = i * 10**6 * 1.0 + object_ref = ray.put(value_before) + value_after = ray.get(object_ref) + assert value_before == value_after + + for i in range(100): + value_before = "h" * i + object_ref = ray.put(value_before) + value_after = ray.get(object_ref) + assert value_before == value_after + + for i in range(100): + value_before = [1] * i + object_ref = ray.put(value_before) + value_after = ray.get(object_ref) + assert value_before == value_after + + def test_function_descriptor(): python_descriptor = ray._raylet.PythonFunctionDescriptor( "module_name", "function_name", "class_name", "function_hash") @@ -247,14 +275,7 @@ def test_function_descriptor(): assert d.get(python_descriptor2) == 123 -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_nested_functions(ray_start_regular): +def test_nested_functions(ray_start_shared_local_modes): # Make sure that remote functions can use other values that are defined # after the remote function but before the first function invocation. @ray.remote @@ -303,14 +324,7 @@ def test_nested_functions(ray_start_regular): assert ray.get(factorial_odd.remote(5)) == 120 -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_ray_recursive_objects(ray_start_regular): +def test_ray_recursive_objects(ray_start_shared_local_modes): class ClassA: pass @@ -336,14 +350,7 @@ def test_ray_recursive_objects(ray_start_regular): ray.put(obj) -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_reducer_override_no_reference_cycle(ray_start_regular): +def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): # bpo-39492: reducer_override used to induce a spurious reference cycle # inside the Pickler object, that could prevent all serialized objects # from being garbage-collected without explicity invoking gc.collect. @@ -379,14 +386,7 @@ def test_reducer_override_no_reference_cycle(ray_start_regular): assert new_obj() is None -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_deserialized_from_buffer_immutable(ray_start_regular): +def test_deserialized_from_buffer_immutable(ray_start_shared_local_modes): x = np.full((2, 2), 1.) o = ray.put(x) y = ray.get(o) @@ -395,14 +395,8 @@ def test_deserialized_from_buffer_immutable(ray_start_regular): y[0, 0] = 9. -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_passing_arguments_by_value_out_of_the_box(ray_start_regular): +def test_passing_arguments_by_value_out_of_the_box( + ray_start_shared_local_modes): @ray.remote def f(x): return x @@ -434,14 +428,8 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start_regular): ray.get(ray.put(Foo)) -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_putting_object_that_closes_over_object_ref(ray_start_regular): +def test_putting_object_that_closes_over_object_ref( + ray_start_shared_local_modes): # This test is here to prevent a regression of # https://github.com/ray-project/ray/issues/1317. @@ -456,42 +444,7 @@ def test_putting_object_that_closes_over_object_ref(ray_start_regular): ray.put(f) -def test_put_get(shutdown_only): - ray.init(num_cpus=0) - - for i in range(100): - value_before = i * 10**6 - object_ref = ray.put(value_before) - value_after = ray.get(object_ref) - assert value_before == value_after - - for i in range(100): - value_before = i * 10**6 * 1.0 - object_ref = ray.put(value_before) - value_after = ray.get(object_ref) - assert value_before == value_after - - for i in range(100): - value_before = "h" * i - object_ref = ray.put(value_before) - value_after = ray.get(object_ref) - assert value_before == value_after - - for i in range(100): - value_before = [1] * i - object_ref = ray.put(value_before) - value_after = ray.get(object_ref) - assert value_before == value_after - - -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_custom_serializers(ray_start_regular): +def test_custom_serializers(ray_start_shared_local_modes): class Foo: def __init__(self): self.x = 3 @@ -521,14 +474,7 @@ def test_custom_serializers(ray_start_regular): assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2") -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_keyword_args(ray_start_regular): +def test_keyword_args(ray_start_shared_local_modes): @ray.remote def keyword_fct1(a, b="hello"): return "{} {}".format(a, b) @@ -613,14 +559,7 @@ def test_keyword_args(ray_start_regular): assert ray.get(f3.remote(4)) == 4 -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_args_starkwargs(ray_start_regular): +def test_args_starkwargs(ray_start_shared_local_modes): def starkwargs(a, b, **kwargs): return a, b, kwargs @@ -648,14 +587,7 @@ def test_args_starkwargs(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_args_named_and_star(ray_start_regular): +def test_args_named_and_star(ray_start_shared_local_modes): def hello(a, x="hello", **kwargs): return a, x, kwargs @@ -689,14 +621,7 @@ def test_args_named_and_star(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_args_stars_after(ray_start_regular): +def test_args_stars_after(ray_start_shared_local_modes): def star_args_after(a="hello", b="heo", *args, **kwargs): return a, b, args, kwargs @@ -728,7 +653,7 @@ def test_args_stars_after(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) -def test_object_id_backward_compatibility(ray_start_regular): +def test_object_id_backward_compatibility(ray_start_shared_local_modes): # We've renamed Python's `ObjectID` to `ObjectRef`, and added a type # alias for backward compatibility. # This test is to make sure legacy code can still use `ObjectID`. @@ -743,7 +668,7 @@ def test_object_id_backward_compatibility(ray_start_regular): assert isinstance(object_ref, ray.ObjectRef) -def test_nonascii_in_function_body(ray_start_regular): +def test_nonascii_in_function_body(ray_start_shared_local_modes): @ray.remote def return_a_greek_char(): return "φ" @@ -752,5 +677,4 @@ def test_nonascii_in_function_body(ray_start_regular): if __name__ == "__main__": - import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_basic_2.py b/python/ray/tests/test_basic_2.py index f22939791..56fd44207 100644 --- a/python/ray/tests/test_basic_2.py +++ b/python/ray/tests/test_basic_2.py @@ -188,115 +188,6 @@ def test_redefining_remote_functions(shutdown_only): assert ray.get(ray.get(h.remote(i))) == i -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_get_multiple(ray_start_regular): - object_refs = [ray.put(i) for i in range(10)] - assert ray.get(object_refs) == list(range(10)) - - # Get a random choice of object refs with duplicates. - indices = list(np.random.choice(range(10), 5)) - indices += indices - results = ray.get([object_refs[i] for i in indices]) - assert results == indices - - -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_get_multiple_experimental(ray_start_regular): - object_refs = [ray.put(i) for i in range(10)] - - object_refs_tuple = tuple(object_refs) - assert ray.experimental.get(object_refs_tuple) == list(range(10)) - - object_refs_nparray = np.array(object_refs) - assert ray.experimental.get(object_refs_nparray) == list(range(10)) - - -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -def test_get_dict(ray_start_regular): - d = {str(i): ray.put(i) for i in range(5)} - for i in range(5, 10): - d[str(i)] = i - result = ray.experimental.get(d) - expected = {str(i): i for i in range(10)} - assert result == expected - - -def test_get_with_timeout(ray_start_regular): - signal = ray.test_utils.SignalActor.remote() - - # Check that get() returns early if object is ready. - start = time.time() - ray.get(signal.wait.remote(should_wait=False), timeout=30) - assert time.time() - start < 30 - - # Check that get() raises a TimeoutError after the timeout if the object - # is not ready yet. - result_id = signal.wait.remote() - with pytest.raises(RayTimeoutError): - ray.get(result_id, timeout=0.1) - - # Check that a subsequent get() returns early. - ray.get(signal.send.remote()) - start = time.time() - ray.get(result_id, timeout=30) - assert time.time() - start < 30 - - -@pytest.mark.parametrize( - "ray_start_regular", [{ - "local_mode": True - }, { - "local_mode": False - }], - indirect=True) -# https://github.com/ray-project/ray/issues/6329 -def test_call_actors_indirect_through_tasks(ray_start_regular): - @ray.remote - class Counter: - def __init__(self, value): - self.value = int(value) - - def increase(self, delta): - self.value += int(delta) - return self.value - - @ray.remote - def foo(object): - return ray.get(object.increase.remote(1)) - - @ray.remote - def bar(object): - return ray.get(object.increase.remote(1)) - - @ray.remote - def zoo(object): - return ray.get(object[0].increase.remote(1)) - - c = Counter.remote(0) - for _ in range(0, 100): - ray.get(foo.remote(c)) - ray.get(bar.remote(c)) - ray.get(zoo.remote([c])) - - def test_call_matrix(shutdown_only): ray.init(object_store_memory=1000 * 1024 * 1024) @@ -362,62 +253,6 @@ def test_call_matrix(shutdown_only): check(source_actor, dest_actor, is_large, out_of_band) -@pytest.mark.parametrize( - "ray_start_cluster", [{ - "num_cpus": 1, - "num_nodes": 1, - }, { - "num_cpus": 1, - "num_nodes": 2, - }], - indirect=True) -def test_call_chain(ray_start_cluster): - @ray.remote - def g(x): - return x + 1 - - x = 0 - for _ in range(100): - x = g.remote(x) - assert ray.get(x) == 100 - - -def test_inline_arg_memory_corruption(ray_start_regular): - @ray.remote - def f(): - return np.zeros(1000, dtype=np.uint8) - - @ray.remote - class Actor: - def __init__(self): - self.z = [] - - def add(self, x): - self.z.append(x) - for prev in self.z: - assert np.sum(prev) == 0, ("memory corruption detected", prev) - - a = Actor.remote() - for i in range(100): - ray.get(a.add.remote(f.remote())) - - -def test_skip_plasma(ray_start_regular): - @ray.remote - class Actor: - def __init__(self): - pass - - def f(self, x): - return x * 2 - - a = Actor.remote() - obj_ref = a.f.remote(1) - # it is not stored in plasma - assert not ray.worker.global_worker.core_worker.object_exists(obj_ref) - assert ray.get(obj_ref) == 2 - - def test_actor_call_order(shutdown_only): ray.init(num_cpus=4) @@ -441,53 +276,6 @@ def test_actor_call_order(shutdown_only): for i in range(100)]) == list(range(100)) -def test_actor_large_objects(ray_start_regular): - @ray.remote - class Actor: - def __init__(self): - pass - - def f(self): - time.sleep(1) - return np.zeros(10000000) - - a = Actor.remote() - obj_ref = a.f.remote() - assert not ray.worker.global_worker.core_worker.object_exists(obj_ref) - done, _ = ray.wait([obj_ref]) - assert len(done) == 1 - assert ray.worker.global_worker.core_worker.object_exists(obj_ref) - assert isinstance(ray.get(obj_ref), np.ndarray) - - -def test_actor_pass_by_ref(ray_start_regular): - @ray.remote - class Actor: - def __init__(self): - pass - - def f(self, x): - return x * 2 - - @ray.remote - def f(x): - return x - - @ray.remote - def error(): - sys.exit(0) - - a = Actor.remote() - assert ray.get(a.f.remote(f.remote(1))) == 2 - - fut = [a.f.remote(f.remote(i)) for i in range(100)] - assert ray.get(fut) == [i * 2 for i in range(100)] - - # propagates errors for pass by ref - with pytest.raises(Exception): - ray.get(a.f.remote(error.remote())) - - def test_actor_pass_by_ref_order_optimization(shutdown_only): ray.init(num_cpus=4) @@ -525,7 +313,217 @@ def test_actor_pass_by_ref_order_optimization(shutdown_only): assert delta < 10, "did not skip slow value" -def test_actor_recursive(ray_start_regular): +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 1, + "num_nodes": 1, + }, { + "num_cpus": 1, + "num_nodes": 2, + }], + indirect=True) +def test_call_chain(ray_start_cluster): + @ray.remote + def g(x): + return x + 1 + + x = 0 + for _ in range(100): + x = g.remote(x) + assert ray.get(x) == 100 + + +def test_internal_config_when_connecting(ray_start_cluster): + config = json.dumps({ + "object_pinning_enabled": 0, + "initial_reconstruction_timeout_milliseconds": 200 + }) + cluster = ray.cluster_utils.Cluster() + cluster.add_node( + _internal_config=config, object_store_memory=100 * 1024 * 1024) + cluster.wait_for_nodes() + + # Specifying _internal_config when connecting to a cluster is disallowed. + with pytest.raises(ValueError): + ray.init(address=cluster.address, _internal_config=config) + + # Check that the config was picked up (object pinning is disabled). + ray.init(address=cluster.address) + obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + + for _ in range(5): + ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + + # This would not raise an exception if object pinning was enabled. + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(obj_ref) + + +def test_get_multiple(ray_start_regular_shared): + object_refs = [ray.put(i) for i in range(10)] + assert ray.get(object_refs) == list(range(10)) + + # Get a random choice of object refs with duplicates. + indices = list(np.random.choice(range(10), 5)) + indices += indices + results = ray.get([object_refs[i] for i in indices]) + assert results == indices + + +def test_get_multiple_experimental(ray_start_regular_shared): + object_refs = [ray.put(i) for i in range(10)] + + object_refs_tuple = tuple(object_refs) + assert ray.experimental.get(object_refs_tuple) == list(range(10)) + + object_refs_nparray = np.array(object_refs) + assert ray.experimental.get(object_refs_nparray) == list(range(10)) + + +def test_get_dict(ray_start_regular_shared): + d = {str(i): ray.put(i) for i in range(5)} + for i in range(5, 10): + d[str(i)] = i + result = ray.experimental.get(d) + expected = {str(i): i for i in range(10)} + assert result == expected + + +def test_get_with_timeout(ray_start_regular_shared): + signal = ray.test_utils.SignalActor.remote() + + # Check that get() returns early if object is ready. + start = time.time() + ray.get(signal.wait.remote(should_wait=False), timeout=30) + assert time.time() - start < 30 + + # Check that get() raises a TimeoutError after the timeout if the object + # is not ready yet. + result_id = signal.wait.remote() + with pytest.raises(RayTimeoutError): + ray.get(result_id, timeout=0.1) + + # Check that a subsequent get() returns early. + ray.get(signal.send.remote()) + start = time.time() + ray.get(result_id, timeout=30) + assert time.time() - start < 30 + + +# https://github.com/ray-project/ray/issues/6329 +def test_call_actors_indirect_through_tasks(ray_start_regular_shared): + @ray.remote + class Counter: + def __init__(self, value): + self.value = int(value) + + def increase(self, delta): + self.value += int(delta) + return self.value + + @ray.remote + def foo(object): + return ray.get(object.increase.remote(1)) + + @ray.remote + def bar(object): + return ray.get(object.increase.remote(1)) + + @ray.remote + def zoo(object): + return ray.get(object[0].increase.remote(1)) + + c = Counter.remote(0) + for _ in range(0, 100): + ray.get(foo.remote(c)) + ray.get(bar.remote(c)) + ray.get(zoo.remote([c])) + + +def test_inline_arg_memory_corruption(ray_start_regular_shared): + @ray.remote + def f(): + return np.zeros(1000, dtype=np.uint8) + + @ray.remote + class Actor: + def __init__(self): + self.z = [] + + def add(self, x): + self.z.append(x) + for prev in self.z: + assert np.sum(prev) == 0, ("memory corruption detected", prev) + + a = Actor.remote() + for i in range(100): + ray.get(a.add.remote(f.remote())) + + +def test_skip_plasma(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self): + pass + + def f(self, x): + return x * 2 + + a = Actor.remote() + obj_ref = a.f.remote(1) + # it is not stored in plasma + assert not ray.worker.global_worker.core_worker.object_exists(obj_ref) + assert ray.get(obj_ref) == 2 + + +def test_actor_large_objects(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self): + pass + + def f(self): + time.sleep(1) + return np.zeros(10000000) + + a = Actor.remote() + obj_ref = a.f.remote() + assert not ray.worker.global_worker.core_worker.object_exists(obj_ref) + done, _ = ray.wait([obj_ref]) + assert len(done) == 1 + assert ray.worker.global_worker.core_worker.object_exists(obj_ref) + assert isinstance(ray.get(obj_ref), np.ndarray) + + +def test_actor_pass_by_ref(ray_start_regular_shared): + @ray.remote + class Actor: + def __init__(self): + pass + + def f(self, x): + return x * 2 + + @ray.remote + def f(x): + return x + + @ray.remote + def error(): + sys.exit(0) + + a = Actor.remote() + assert ray.get(a.f.remote(f.remote(1))) == 2 + + fut = [a.f.remote(f.remote(i)) for i in range(100)] + assert ray.get(fut) == [i * 2 for i in range(100)] + + # propagates errors for pass by ref + with pytest.raises(Exception): + ray.get(a.f.remote(error.remote())) + + +def test_actor_recursive(ray_start_regular_shared): @ray.remote class Actor: def __init__(self, delegate=None): @@ -548,7 +546,7 @@ def test_actor_recursive(ray_start_regular): assert result == [x * 2 for x in range(100)] -def test_actor_concurrent(ray_start_regular): +def test_actor_concurrent(ray_start_regular_shared): @ray.remote class Batcher: def __init__(self): @@ -574,7 +572,7 @@ def test_actor_concurrent(ray_start_regular): assert r1 == r2 == r3 -def test_wait(ray_start_regular): +def test_wait(ray_start_regular_shared): @ray.remote def f(delay): time.sleep(delay) @@ -621,7 +619,7 @@ def test_wait(ray_start_regular): ray.wait([1]) -def test_duplicate_args(ray_start_regular): +def test_duplicate_args(ray_start_regular_shared): @ray.remote def f(arg1, arg2, @@ -650,32 +648,6 @@ def test_duplicate_args(ray_start_regular): arg1, arg2, arg1, kwarg1=arg1, kwarg2=arg2, kwarg1_duplicate=arg1)) -def test_internal_config_when_connecting(ray_start_cluster): - config = json.dumps({ - "object_pinning_enabled": 0, - "initial_reconstruction_timeout_milliseconds": 200 - }) - cluster = ray.cluster_utils.Cluster() - cluster.add_node( - _internal_config=config, object_store_memory=100 * 1024 * 1024) - cluster.wait_for_nodes() - - # Specifying _internal_config when connecting to a cluster is disallowed. - with pytest.raises(ValueError): - ray.init(address=cluster.address, _internal_config=config) - - # Check that the config was picked up (object pinning is disabled). - ray.init(address=cluster.address) - obj_ref = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - - for _ in range(5): - ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - - # This would not raise an exception if object pinning was enabled. - with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(obj_ref) - - def test_get_correct_node_ip(): with patch("ray.worker") as worker_mock: node_mock = MagicMock()