From c1b0f9ccdfa495991d672ee2369d0d374f3d7076 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 17 Mar 2020 10:30:21 -0500 Subject: [PATCH] Add failure tests to test_reference_counting (#7400) --- python/ray/_raylet.pyx | 3 +- python/ray/test_utils.py | 8 +- python/ray/tests/BUILD | 16 + python/ray/tests/test_actor_failures.py | 9 +- python/ray/tests/test_failure.py | 10 +- python/ray/tests/test_global_gc.py | 133 ++++ python/ray/tests/test_reference_counting.py | 738 +++++------------- python/ray/tests/test_reference_counting_2.py | 289 +++++++ src/ray/common/ray_config_def.h | 3 + src/ray/core_worker/core_worker.cc | 7 +- 10 files changed, 664 insertions(+), 552 deletions(-) create mode 100644 python/ray/tests/test_global_gc.py create mode 100644 python/ray/tests/test_reference_counting_2.py diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1caa49707..3124bf685 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -533,7 +533,8 @@ cdef void async_plasma_callback(CObjectID object_id, if event_handler is not None: obj_id = ObjectID(object_id.Binary()) if data_size > 0 and obj_id: - # This must be asynchronous to allow objects to avoid blocking the IO thread. + # This must be asynchronous to allow objects to avoid blocking + # the IO thread. event_handler._loop.call_soon_threadsafe( event_handler._complete_future, obj_id) diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 9fb3c53bd..19cd0afe9 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -137,24 +137,22 @@ def wait_for_errors(error_type, num_errors, timeout=20): def wait_for_condition(condition_predictor, - timeout_ms=1000, + timeout=1000, retry_interval_ms=100): """A helper function that waits until a condition is met. Args: condition_predictor: A function that predicts the condition. - timeout_ms: Maximum timeout in milliseconds. + timeout: Maximum timeout in seconds. retry_interval_ms: Retry interval in milliseconds. Return: Whether the condition is met within the timeout. """ - time_elapsed = 0 start = time.time() - while time_elapsed <= timeout_ms: + while time.time() - start <= timeout: if condition_predictor(): return True - time_elapsed = (time.time() - start) * 1000 time.sleep(retry_interval_ms / 1000.0) return False diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 84696a919..5fd0aeb6b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -203,6 +203,22 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_reference_counting_2", + size = "medium", + srcs = ["test_reference_counting_2.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + +py_test( + name = "test_global_gc", + size = "small", + srcs = ["test_global_gc.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_global_state", size = "small", diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index f9ef76644..46bc30e54 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -198,10 +198,13 @@ def test_actor_reconstruction_without_task(ray_start_regular): # Kill the actor. pid = ray.get(actor.get_pid.remote()) os.kill(pid, signal.SIGKILL) + # Wait until the actor is reconstructed. - assert wait_for_condition( - lambda: ray.worker.global_worker.core_worker.object_exists(obj_ids[1]), - timeout_ms=5000) + def check_reconstructed(): + worker = ray.worker.global_worker + return worker.core_worker.object_exists(obj_ids[1]) + + assert wait_for_condition(check_reconstructed) def test_actor_reconstruction_on_node_failure(ray_start_cluster_head): diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 344af40d8..171a36c1d 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1142,23 +1142,21 @@ def test_fate_sharing(ray_start_cluster): pid = ray.get(a.get_pid.remote()) a.start_child.remote(use_actors=use_actors) # Wait for the child to be scheduled. - assert wait_for_condition( - lambda: not child_resource_available(), timeout_ms=10000) + assert wait_for_condition(lambda: not child_resource_available()) # Kill the parent process. os.kill(pid, 9) - assert wait_for_condition(child_resource_available, timeout_ms=10000) + assert wait_for_condition(child_resource_available) # Test fate sharing if the parent node dies. def test_node_failure(node_to_kill, use_actors): a = Actor.options(resources={"parent": 1}).remote() a.start_child.remote(use_actors=use_actors) # Wait for the child to be scheduled. - assert wait_for_condition( - lambda: not child_resource_available(), timeout_ms=10000) + assert wait_for_condition(lambda: not child_resource_available()) # Kill the parent process. cluster.remove_node(node_to_kill, allow_graceful=False) node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1}) - assert wait_for_condition(child_resource_available, timeout_ms=10000) + assert wait_for_condition(child_resource_available) return node_to_kill test_process_failure(use_actors=True) diff --git a/python/ray/tests/test_global_gc.py b/python/ray/tests/test_global_gc.py new file mode 100644 index 000000000..d922436b3 --- /dev/null +++ b/python/ray/tests/test_global_gc.py @@ -0,0 +1,133 @@ +# coding: utf-8 +import gc +import logging +import weakref + +import numpy as np + +import pytest + +import ray +import ray.cluster_utils +from ray.test_utils import wait_for_condition +from ray.internal.internal_api import global_gc + +logger = logging.getLogger(__name__) + + +def test_global_gc(shutdown_only): + cluster = ray.cluster_utils.Cluster() + for _ in range(2): + cluster.add_node(num_cpus=1, num_gpus=0) + ray.init(address=cluster.address) + + class ObjectWithCyclicRef: + def __init__(self): + self.loop = self + + @ray.remote(num_cpus=1) + class GarbageHolder: + def __init__(self): + gc.disable() + x = ObjectWithCyclicRef() + self.garbage = weakref.ref(x) + + def has_garbage(self): + return self.garbage() is not None + + try: + gc.disable() + + # Local driver. + local_ref = weakref.ref(ObjectWithCyclicRef()) + + # Remote workers. + actors = [GarbageHolder.remote() for _ in range(2)] + assert local_ref() is not None + assert all(ray.get([a.has_garbage.remote() for a in actors])) + + # GC should be triggered for all workers, including the local driver. + global_gc() + + def check_refs_gced(): + return (local_ref() is None and + not any(ray.get([a.has_garbage.remote() for a in actors]))) + + assert wait_for_condition(check_refs_gced) + finally: + gc.enable() + + +def test_global_gc_when_full(shutdown_only): + cluster = ray.cluster_utils.Cluster() + for _ in range(2): + cluster.add_node( + num_cpus=1, num_gpus=0, object_store_memory=100 * 1024 * 1024) + ray.init(address=cluster.address) + + class LargeObjectWithCyclicRef: + def __init__(self): + self.loop = self + self.large_object = ray.put( + np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + + @ray.remote(num_cpus=1) + class GarbageHolder: + def __init__(self): + gc.disable() + x = LargeObjectWithCyclicRef() + self.garbage = weakref.ref(x) + + def has_garbage(self): + return self.garbage() is not None + + def return_large_array(self): + return np.zeros(80 * 1024 * 1024, dtype=np.uint8) + + try: + gc.disable() + + # Local driver. + local_ref = weakref.ref(LargeObjectWithCyclicRef()) + + # Remote workers. + actors = [GarbageHolder.remote() for _ in range(2)] + assert local_ref() is not None + assert all(ray.get([a.has_garbage.remote() for a in actors])) + + # GC should be triggered for all workers, including the local driver, + # when the driver tries to ray.put a value that doesn't fit in the + # object store. This should cause the captured ObjectIDs' numpy arrays + # to be evicted. + ray.put(np.zeros(80 * 1024 * 1024, dtype=np.uint8)) + + def check_refs_gced(): + return (local_ref() is None and + not any(ray.get([a.has_garbage.remote() for a in actors]))) + + assert wait_for_condition(check_refs_gced) + + # Local driver. + local_ref = weakref.ref(LargeObjectWithCyclicRef()) + + # Remote workers. + actors = [GarbageHolder.remote() for _ in range(2)] + + # GC should be triggered for all workers, including the local driver, + # when a remote task tries to put a return value that doesn't fit in + # the object store. This should cause the captured ObjectIDs' numpy + # arrays to be evicted. + ray.get(actors[0].return_large_array.remote()) + + def check_refs_gced(): + return (local_ref() is None and + not any(ray.get([a.has_garbage.remote() for a in actors]))) + + assert wait_for_condition(check_refs_gced) + finally: + gc.enable() + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 6d73e8a99..1f24654f6 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -1,11 +1,9 @@ # coding: utf-8 -import asyncio import copy import json import logging -import gc +import os import time -import weakref import numpy as np @@ -13,8 +11,7 @@ import pytest import ray import ray.cluster_utils -from ray.test_utils import SignalActor, put_object, wait_for_condition -from ray.internal.internal_api import global_gc +from ray.test_utils import SignalActor, put_object logger = logging.getLogger(__name__) @@ -23,6 +20,7 @@ logger = logging.getLogger(__name__) def one_worker_100MiB(request): config = json.dumps({ "distributed_ref_counting_enabled": 1, + "task_retry_delay_ms": 0, "object_store_full_max_retries": 3, "object_store_full_initial_delay_ms": 100, }) @@ -75,121 +73,6 @@ def check_refcounts(expected, timeout=10): time.sleep(0.1) -def test_global_gc(shutdown_only): - cluster = ray.cluster_utils.Cluster() - for _ in range(2): - cluster.add_node(num_cpus=1, num_gpus=0) - ray.init(address=cluster.address) - - class ObjectWithCyclicRef: - def __init__(self): - self.loop = self - - @ray.remote(num_cpus=1) - class GarbageHolder: - def __init__(self): - gc.disable() - x = ObjectWithCyclicRef() - self.garbage = weakref.ref(x) - - def has_garbage(self): - return self.garbage() is not None - - try: - gc.disable() - - # Local driver. - local_ref = weakref.ref(ObjectWithCyclicRef()) - - # Remote workers. - actors = [GarbageHolder.remote() for _ in range(2)] - assert local_ref() is not None - assert all(ray.get([a.has_garbage.remote() for a in actors])) - - # GC should be triggered for all workers, including the local driver. - global_gc() - - def check_refs_gced(): - return (local_ref() is None and - not any(ray.get([a.has_garbage.remote() for a in actors]))) - - wait_for_condition(check_refs_gced, timeout_ms=10000) - finally: - gc.enable() - - -def test_global_gc_when_full(shutdown_only): - cluster = ray.cluster_utils.Cluster() - for _ in range(2): - cluster.add_node( - num_cpus=1, num_gpus=0, object_store_memory=100 * 1024 * 1024) - ray.init(address=cluster.address) - - class LargeObjectWithCyclicRef: - def __init__(self): - self.loop = self - self.large_object = ray.put( - np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - - @ray.remote(num_cpus=1) - class GarbageHolder: - def __init__(self): - gc.disable() - x = LargeObjectWithCyclicRef() - self.garbage = weakref.ref(x) - - def has_garbage(self): - return self.garbage() is not None - - def return_large_array(self): - return np.zeros(80 * 1024 * 1024, dtype=np.uint8) - - try: - gc.disable() - - # Local driver. - local_ref = weakref.ref(LargeObjectWithCyclicRef()) - - # Remote workers. - actors = [GarbageHolder.remote() for _ in range(2)] - assert local_ref() is not None - assert all(ray.get([a.has_garbage.remote() for a in actors])) - - # GC should be triggered for all workers, including the local driver, - # when the driver tries to ray.put a value that doesn't fit in the - # object store. This should cause the captured ObjectIDs' numpy arrays - # to be evicted. - ray.put(np.zeros(80 * 1024 * 1024, dtype=np.uint8)) - - def check_refs_gced(): - return (local_ref() is None and - not any(ray.get([a.has_garbage.remote() for a in actors]))) - - wait_for_condition(check_refs_gced, timeout_ms=10000) - - # Local driver. - local_ref = weakref.ref(LargeObjectWithCyclicRef()) - - # Remote workers. - actors = [GarbageHolder.remote() for _ in range(2)] - - def check_refs_gced(): - return (local_ref() is None and - not any(ray.get([a.has_garbage.remote() for a in actors]))) - - wait_for_condition(check_refs_gced, timeout_ms=10000) - - # GC should be triggered for all workers, including the local driver, - # when a remote task tries to put a return value that doesn't fit in - # the object store. This should cause the captured ObjectIDs' numpy - # arrays to be evicted. - ray.get(actors[0].return_large_array.remote()) - assert local_ref() is None - assert not any(ray.get([a.has_garbage.remote() for a in actors])) - finally: - gc.enable() - - def test_local_refcounts(ray_start_regular): oid1 = ray.put(None) check_refcounts({oid1: (1, 0)}) @@ -371,420 +254,6 @@ def test_feature_flag(shutdown_only): _fill_object_store_and_get(actor.get_large_object.remote(), succeed=False) -# Remote function takes serialized reference and doesn't hold onto it after -# finishing. Referenced object shouldn't be evicted while the task is pending -# and should be evicted after it returns. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_basic_serialized_reference(one_worker_100MiB, use_ray_put): - @ray.remote - def pending(ref, dep): - ray.get(ref[0]) - - array_oid = put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - signal = SignalActor.remote() - oid = pending.remote([array_oid], signal.wait.remote()) - - # Remove the local reference. - array_oid_bytes = array_oid.binary() - del array_oid - - # Check that the remote reference pins the object. - _fill_object_store_and_get(array_oid_bytes) - - # Fulfill the dependency, causing the task to finish. - ray.get(signal.send.remote()) - ray.get(oid) - - # Reference should be gone, check that array gets evicted. - _fill_object_store_and_get(array_oid_bytes, succeed=False) - - -# Call a recursive chain of tasks that pass a serialized reference to the end -# of the chain. The reference should still exist while the final task in the -# chain is running and should be removed once it finishes. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put): - @ray.remote(num_cpus=0) - class Signal: - def __init__(self): - self.ready_event = asyncio.Event() - - def send(self): - self.ready_event.set() - - async def wait(self): - await self.ready_event.wait() - - @ray.remote - def recursive(ref, signal, max_depth, depth=0): - ray.get(ref[0]) - if depth == max_depth: - return ray.get(signal.wait.remote()) - else: - return recursive.remote(ref, signal, max_depth, depth + 1) - - signal = SignalActor.remote() - - max_depth = 5 - array_oid = put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - head_oid = recursive.remote([array_oid], signal, max_depth) - - # Remove the local reference. - array_oid_bytes = array_oid.binary() - del array_oid - - tail_oid = head_oid - for _ in range(max_depth): - tail_oid = ray.get(tail_oid) - - # Check that the remote reference pins the object. - _fill_object_store_and_get(array_oid_bytes) - - # Fulfill the dependency, causing the tail task to finish. - ray.get(signal.send.remote()) - assert ray.get(tail_oid) is None - - # Reference should be gone, check that array gets evicted. - _fill_object_store_and_get(array_oid_bytes, succeed=False) - - -# Test that a passed reference held by an actor after the method finishes -# is kept until the reference is removed from the actor. Also tests giving -# the actor a duplicate reference to the same object ID. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put): - @ray.remote - class GreedyActor(object): - def __init__(self): - pass - - def set_ref1(self, ref): - self.ref1 = ref - - def add_ref2(self, new_ref): - self.ref2 = new_ref - - def delete_ref1(self): - self.ref1 = None - - def delete_ref2(self): - self.ref2 = None - - # Test that the reference held by the actor isn't evicted. - array_oid = put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - actor = GreedyActor.remote() - actor.set_ref1.remote([array_oid]) - - # Test that giving the same actor a duplicate reference works. - ray.get(actor.add_ref2.remote([array_oid])) - - # Remove the local reference. - array_oid_bytes = array_oid.binary() - del array_oid - - # Test that the remote references still pin the object. - _fill_object_store_and_get(array_oid_bytes) - - # Test that removing only the first reference doesn't unpin the object. - ray.get(actor.delete_ref1.remote()) - _fill_object_store_and_get(array_oid_bytes) - - # Test that deleting the second reference stops it from being pinned. - ray.get(actor.delete_ref2.remote()) - _fill_object_store_and_get(array_oid_bytes, succeed=False) - - -# Test that a passed reference held by an actor after a task finishes -# is kept until the reference is removed from the worker. Also tests giving -# the worker a duplicate reference to the same object ID. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put): - @ray.remote(num_cpus=0) - class Signal: - def __init__(self): - self.ready_event = asyncio.Event() - - def send(self): - self.ready_event.set() - - async def wait(self): - await self.ready_event.wait() - - @ray.remote - def child(dep1, dep2): - return - - @ray.remote - def launch_pending_task(ref, signal): - return child.remote(ref[0], signal.wait.remote()) - - signal = SignalActor.remote() - - # Test that the reference held by the actor isn't evicted. - array_oid = put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) - - # Remove the local reference. - array_oid_bytes = array_oid.binary() - del array_oid - - # Test that the reference prevents the object from being evicted. - _fill_object_store_and_get(array_oid_bytes) - - ray.get(signal.send.remote()) - ray.get(child_return_id) - del child_return_id - - _fill_object_store_and_get(array_oid_bytes, succeed=False) - - -# Test that an object containing object IDs within it pins the inner IDs. -def test_basic_nested_ids(one_worker_100MiB): - inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - outer_oid = ray.put([inner_oid]) - - # Remove the local reference to the inner object. - inner_oid_bytes = inner_oid.binary() - del inner_oid - - # Check that the outer reference pins the inner object. - _fill_object_store_and_get(inner_oid_bytes) - - # Remove the outer reference and check that the inner object gets evicted. - del outer_oid - _fill_object_store_and_get(inner_oid_bytes, succeed=False) - - -# Test that an object containing object IDs within it pins the inner IDs -# recursively and for submitted tasks. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_recursively_nest_ids(one_worker_100MiB, use_ray_put): - @ray.remote(num_cpus=0) - class Signal: - def __init__(self): - self.ready_event = asyncio.Event() - - def send(self): - self.ready_event.set() - - async def wait(self): - await self.ready_event.wait() - - @ray.remote - def recursive(ref, signal, max_depth, depth=0): - unwrapped = ray.get(ref[0]) - if depth == max_depth: - return ray.get(signal.wait.remote()) - else: - return recursive.remote(unwrapped, signal, max_depth, depth + 1) - - signal = SignalActor.remote() - - max_depth = 5 - array_oid = put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - nested_oid = array_oid - for _ in range(max_depth): - nested_oid = ray.put([nested_oid]) - head_oid = recursive.remote([nested_oid], signal, max_depth) - - # Remove the local reference. - array_oid_bytes = array_oid.binary() - del array_oid, nested_oid - - tail_oid = head_oid - for _ in range(max_depth): - tail_oid = ray.get(tail_oid) - - # Check that the remote reference pins the object. - _fill_object_store_and_get(array_oid_bytes) - - # Fulfill the dependency, causing the tail task to finish. - ray.get(signal.send.remote()) - ray.get(tail_oid) - - # Reference should be gone, check that array gets evicted. - _fill_object_store_and_get(array_oid_bytes, succeed=False) - - -# Test that serialized objectIDs returned from remote tasks are pinned until -# they go out of scope on the caller side. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_return_object_id(one_worker_100MiB, use_ray_put): - @ray.remote - def return_an_id(): - return [ - put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - ] - - outer_oid = return_an_id.remote() - inner_oid_binary = ray.get(outer_oid)[0].binary() - - # Check that the inner ID is pinned by the outer ID. - _fill_object_store_and_get(inner_oid_binary) - - # Check that taking a reference to the inner ID and removing the outer ID - # doesn't unpin the object. - inner_oid = ray.get(outer_oid)[0] - del outer_oid - _fill_object_store_and_get(inner_oid_binary) - - # Check that removing the inner ID unpins the object. - del inner_oid - _fill_object_store_and_get(inner_oid_binary, succeed=False) - - -# Test that serialized objectIDs returned from remote tasks are pinned if -# passed into another remote task by the caller. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_pass_returned_object_id(one_worker_100MiB, use_ray_put): - @ray.remote(num_cpus=0) - class Signal: - def __init__(self): - self.ready_event = asyncio.Event() - - def send(self): - self.ready_event.set() - - async def wait(self): - await self.ready_event.wait() - - @ray.remote - def put(): - return - - @ray.remote - def return_an_id(): - return [ - put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - ] - - @ray.remote - def pending(ref): - ray.get(ref[0]) - return ref[0] - - signal = SignalActor.remote() - outer_oid = return_an_id.remote() - pending_oid = pending.remote([outer_oid]) - - # Remove the local reference to the returned ID. - del outer_oid - - # Check that the inner ID is pinned by the remote task ID. - _fill_object_store_and_get(pending_oid, succeed=False) - ray.get(signal.send.remote()) - inner_oid = ray.get(pending_oid) - inner_oid_binary = inner_oid.binary() - _fill_object_store_and_get(inner_oid_binary) - - del pending_oid - del inner_oid - _fill_object_store_and_get(inner_oid_binary, succeed=False) - - -# Call a recursive chain of tasks that pass a serialized reference that was -# returned by another task to the end of the chain. The reference should still -# exist while the final task in the chain is running and should be removed once -# it finishes. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put): - @ray.remote(num_cpus=0) - class Signal: - def __init__(self): - self.ready_event = asyncio.Event() - - def send(self): - self.ready_event.set() - - async def wait(self): - await self.ready_event.wait() - - @ray.remote - def return_an_id(): - return put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - - @ray.remote - def recursive(ref, signal, max_depth, depth=0): - inner_id = ray.get(ref[0]) - if depth == max_depth: - ray.get(signal.wait.remote()) - return inner_id - else: - return recursive.remote(ref, signal, max_depth, depth + 1) - - max_depth = 5 - outer_oid = return_an_id.remote() - signal = SignalActor.remote() - head_oid = recursive.remote([outer_oid], signal, max_depth) - - # Remove the local reference. - - outer_oid = head_oid - for _ in range(max_depth): - outer_oid = ray.get(outer_oid) - - # Fill the object store. - _fill_object_store_and_get(outer_oid, succeed=False) - - # Fulfill the dependency, causing the tail task to finish. - ray.get(signal.send.remote()) - - # Check that the remote reference pins the object. - inner_oid = ray.get(outer_oid) - _fill_object_store_and_get(inner_oid) - inner_oid_bytes = inner_oid.binary() - - # Reference should be gone, check that returned ID gets evicted. - del head_oid - del outer_oid - del inner_oid - _fill_object_store_and_get(inner_oid_bytes, succeed=False) - - -# Call a recursive chain of tasks. The final task in the chain returns an -# ObjectID returned by a task that it submitted. Every other task in the chain -# returns the same ObjectID by calling ray.get() on its submitted task and -# returning the result. The reference should still exist while the driver has a -# reference to the final task's ObjectID. -@pytest.mark.parametrize("use_ray_put", [False, True]) -def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put): - @ray.remote - def recursive(num_tasks_left): - if num_tasks_left == 0: - return put_object( - np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) - - final_id = ray.get(recursive.remote(num_tasks_left - 1)) - ray.get(final_id) - return final_id - - max_depth = 5 - head_oid = recursive.remote(max_depth) - final_oid = ray.get(head_oid) - final_oid_bytes = final_oid.binary() - - # Check that the driver's reference pins the object. - _fill_object_store_and_get(final_oid_bytes) - - # Remove the local reference and try it again. - final_oid = ray.get(head_oid) - _fill_object_store_and_get(final_oid_bytes) - - # Remove all references. - del head_oid - del final_oid - # Reference should be gone, check that returned ID gets evicted. - _fill_object_store_and_get(final_oid_bytes, succeed=False) - - def test_out_of_band_serialized_object_id(one_worker_100MiB): assert len( ray.worker.global_worker.core_worker.get_all_reference_counts()) == 0 @@ -840,6 +309,207 @@ def test_captured_object_id(one_worker_100MiB): _fill_object_store_and_get(oid) +# Remote function takes serialized reference and doesn't hold onto it after +# finishing. Referenced object shouldn't be evicted while the task is pending +# and should be evicted after it returns. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_basic_serialized_reference(one_worker_100MiB, use_ray_put, failure): + @ray.remote(max_retries=1) + def pending(ref, dep): + ray.get(ref[0]) + if failure: + os._exit(0) + + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + signal = SignalActor.remote() + oid = pending.remote([array_oid], signal.wait.remote()) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the task to finish. + ray.get(signal.send.remote()) + try: + ray.get(oid) + assert not failure + except ray.exceptions.RayWorkerError: + assert failure + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Call a recursive chain of tasks that pass a serialized reference to the end +# of the chain. The reference should still exist while the final task in the +# chain is running and should be removed once it finishes. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put, + failure): + @ray.remote(max_retries=1) + def recursive(ref, signal, max_depth, depth=0): + ray.get(ref[0]) + if depth == max_depth: + ray.get(signal.wait.remote()) + if failure: + os._exit(0) + return + else: + return recursive.remote(ref, signal, max_depth, depth + 1) + + signal = SignalActor.remote() + + max_depth = 5 + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + head_oid = recursive.remote([array_oid], signal, max_depth) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + tail_oid = head_oid + for _ in range(max_depth): + tail_oid = ray.get(tail_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the tail task to finish. + ray.get(signal.send.remote()) + try: + assert ray.get(tail_oid) is None + assert not failure + # TODO(edoakes): this should raise WorkerError. + except ray.exceptions.UnreconstructableError: + assert failure + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that a passed reference held by an actor after the method finishes +# is kept until the reference is removed from the actor. Also tests giving +# the actor a duplicate reference to the same object ID. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put, + failure): + @ray.remote + class GreedyActor(object): + def __init__(self): + pass + + def set_ref1(self, ref): + self.ref1 = ref + + def add_ref2(self, new_ref): + self.ref2 = new_ref + + def delete_ref1(self): + self.ref1 = None + + def delete_ref2(self): + self.ref2 = None + + # Test that the reference held by the actor isn't evicted. + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + actor = GreedyActor.remote() + actor.set_ref1.remote([array_oid]) + + # Test that giving the same actor a duplicate reference works. + ray.get(actor.add_ref2.remote([array_oid])) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Test that the remote references still pin the object. + _fill_object_store_and_get(array_oid_bytes) + + # Test that removing only the first reference doesn't unpin the object. + ray.get(actor.delete_ref1.remote()) + _fill_object_store_and_get(array_oid_bytes) + + if failure: + # Test that the actor exiting stops the reference from being pinned. + ray.kill(actor) + # Wait for the actor to exit. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.delete_ref1.remote()) + else: + # Test that deleting the second reference stops it from being pinned. + ray.get(actor.delete_ref2.remote()) + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that a passed reference held by an actor after a task finishes +# is kept until the reference is removed from the worker. Also tests giving +# the worker a duplicate reference to the same object ID. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put, + failure): + @ray.remote(max_retries=1) + def child(dep1, dep2): + if failure: + os._exit(0) + return + + @ray.remote + def launch_pending_task(ref, signal): + return child.remote(ref[0], signal.wait.remote()) + + signal = SignalActor.remote() + + # Test that the reference held by the actor isn't evicted. + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Test that the reference prevents the object from being evicted. + _fill_object_store_and_get(array_oid_bytes) + + ray.get(signal.send.remote()) + try: + ray.get(child_return_id) + assert not failure + except (ray.exceptions.RayWorkerError, + ray.exceptions.UnreconstructableError): + assert failure + del child_return_id + + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that an object containing object IDs within it pins the inner IDs. +def test_basic_nested_ids(one_worker_100MiB): + inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + outer_oid = ray.put([inner_oid]) + + # Remove the local reference to the inner object. + inner_oid_bytes = inner_oid.binary() + del inner_oid + + # Check that the outer reference pins the inner object. + _fill_object_store_and_get(inner_oid_bytes) + + # Remove the outer reference and check that the inner object gets evicted. + del outer_oid + _fill_object_store_and_get(inner_oid_bytes, succeed=False) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py new file mode 100644 index 000000000..49895386c --- /dev/null +++ b/python/ray/tests/test_reference_counting_2.py @@ -0,0 +1,289 @@ +# coding: utf-8 +import json +import logging +import os +import signal + +import numpy as np + +import pytest + +import ray +import ray.cluster_utils +from ray.test_utils import SignalActor, put_object, wait_for_condition + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def one_worker_100MiB(request): + config = json.dumps({ + "distributed_ref_counting_enabled": 1, + "object_store_full_max_retries": 2, + "task_retry_delay_ms": 0, + }) + yield ray.init( + num_cpus=1, + object_store_memory=100 * 1024 * 1024, + _internal_config=config) + ray.shutdown() + + +def _fill_object_store_and_get(oid, succeed=True, object_MiB=40, + num_objects=5): + for _ in range(num_objects): + ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8)) + + if type(oid) is bytes: + oid = ray.ObjectID(oid) + + if succeed: + ray.get(oid) + else: + if oid.is_direct_call_type(): + with pytest.raises(ray.exceptions.RayTimeoutError): + ray.get(oid, timeout=0.1) + else: + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(oid) + + +# Test that an object containing object IDs within it pins the inner IDs +# recursively and for submitted tasks. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_recursively_nest_ids(one_worker_100MiB, use_ray_put, failure): + @ray.remote(max_retries=1) + def recursive(ref, signal, max_depth, depth=0): + unwrapped = ray.get(ref[0]) + if depth == max_depth: + ray.get(signal.wait.remote()) + if failure: + os._exit(0) + return + else: + return recursive.remote(unwrapped, signal, max_depth, depth + 1) + + signal = SignalActor.remote() + + max_depth = 5 + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + nested_oid = array_oid + for _ in range(max_depth): + nested_oid = ray.put([nested_oid]) + head_oid = recursive.remote([nested_oid], signal, max_depth) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid, nested_oid + + tail_oid = head_oid + for _ in range(max_depth): + tail_oid = ray.get(tail_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the tail task to finish. + ray.get(signal.send.remote()) + try: + ray.get(tail_oid) + assert not failure + # TODO(edoakes): this should raise WorkerError. + except ray.exceptions.UnreconstructableError: + assert failure + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that serialized objectIDs returned from remote tasks are pinned until +# they go out of scope on the caller side. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_return_object_id(one_worker_100MiB, use_ray_put, failure): + @ray.remote + def return_an_id(): + return [ + put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + ] + + @ray.remote(max_retries=1) + def exit(): + os._exit(0) + + outer_oid = return_an_id.remote() + inner_oid_binary = ray.get(outer_oid)[0].binary() + + # Check that the inner ID is pinned by the outer ID. + _fill_object_store_and_get(inner_oid_binary) + + # Check that taking a reference to the inner ID and removing the outer ID + # doesn't unpin the object. + inner_oid = ray.get(outer_oid)[0] # noqa: F841 + del outer_oid + _fill_object_store_and_get(inner_oid_binary) + + if failure: + # Check that the owner dying unpins the object. This should execute on + # the same worker because there is only one started and the other tasks + # have finished. + with pytest.raises(ray.exceptions.RayWorkerError): + ray.get(exit.remote()) + else: + # Check that removing the inner ID unpins the object. + del inner_oid + _fill_object_store_and_get(inner_oid_binary, succeed=False) + + +# Test that serialized objectIDs returned from remote tasks are pinned if +# passed into another remote task by the caller. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_pass_returned_object_id(one_worker_100MiB, use_ray_put, failure): + @ray.remote + def return_an_id(): + return [ + put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + ] + + # TODO(edoakes): this fails with an ActorError with max_retries=1. + @ray.remote(max_retries=0) + def pending(ref, signal): + ray.get(signal.wait.remote()) + ray.get(ref[0]) + if failure: + os._exit(0) + + signal = SignalActor.remote() + outer_oid = return_an_id.remote() + inner_oid_binary = ray.get(outer_oid)[0].binary() + pending_oid = pending.remote([outer_oid], signal) + + # Remove the local reference to the returned ID. + del outer_oid + + # Check that the inner ID is pinned by the remote task ID and finishing + # the task unpins the object. + ray.get(signal.send.remote()) + try: + # Should succeed because inner_oid is pinned if no failure. + ray.get(pending_oid) + assert not failure + except ray.exceptions.RayWorkerError: + assert failure + + def ref_not_exists(): + worker = ray.worker.global_worker + inner_oid = ray.ObjectID(inner_oid_binary) + return not worker.core_worker.object_exists(inner_oid) + + assert wait_for_condition(ref_not_exists) + + +# Call a recursive chain of tasks that pass a serialized reference that was +# returned by another task to the end of the chain. The reference should still +# exist while the final task in the chain is running and should be removed once +# it finishes. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put, + failure): + @ray.remote + def return_an_id(): + return put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + + @ray.remote(max_retries=1) + def recursive(ref, signal, max_depth, depth=0): + inner_id = ray.get(ref[0]) + if depth == max_depth: + ray.get(signal.wait.remote()) + if failure: + os._exit(0) + return inner_id + else: + return inner_id, recursive.remote(ref, signal, max_depth, + depth + 1) + + max_depth = 5 + outer_oid = return_an_id.remote() + signal = SignalActor.remote() + head_oid = recursive.remote([outer_oid], signal, max_depth) + + # Remove the local reference. + inner_oid = None + outer_oid = head_oid + for i in range(max_depth): + inner_oid, outer_oid = ray.get(outer_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(outer_oid, succeed=False) + + # Fulfill the dependency, causing the tail task to finish. + ray.get(signal.send.remote()) + + try: + # Check that the remote reference pins the object. + ray.get(outer_oid) + _fill_object_store_and_get(inner_oid) + assert not failure + # TODO(edoakes): this should raise WorkerError. + except ray.exceptions.UnreconstructableError: + assert failure + + inner_oid_bytes = inner_oid.binary() + del inner_oid + del head_oid + del outer_oid + + # Reference should be gone, check that returned ID gets evicted. + _fill_object_store_and_get(inner_oid_bytes, succeed=False) + + +# Call a recursive chain of tasks. The final task in the chain returns an +# ObjectID returned by a task that it submitted. Every other task in the chain +# returns the same ObjectID by calling ray.get() on its submitted task and +# returning the result. The reference should still exist while the driver has a +# reference to the final task's ObjectID. +@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True), + (True, False), (True, True)]) +def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put, + failure): + @ray.remote + def recursive(num_tasks_left): + if num_tasks_left == 0: + return put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), + use_ray_put), os.getpid() + + return ray.get(recursive.remote(num_tasks_left - 1)) + + max_depth = 5 + head_oid = recursive.remote(max_depth) + final_oid, owner_pid = ray.get(head_oid) + final_oid_bytes = final_oid.binary() + + # Check that the driver's reference pins the object. + _fill_object_store_and_get(final_oid_bytes) + + # Remove the local reference and try it again. + _fill_object_store_and_get(final_oid_bytes) + + if failure: + os.kill(owner_pid, signal.SIGKILL) + else: + # Remove all references. + del head_oid + del final_oid + + # Reference should be gone, check that returned ID gets evicted. + _fill_object_store_and_get(final_oid_bytes, succeed=False) + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f6a9ddc00..7834cf686 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -260,3 +260,6 @@ RAY_CONFIG(int32_t, object_store_full_max_retries, 5) /// Duration to sleep after failing to put an object in plasma because it is full. /// This will be exponentially increased for each retry. RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000) + +/// Duration to wait between retries for failed tasks. +RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e8df34089..6ada5b66a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -202,10 +202,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, [this](const TaskSpecification &spec) { // Retry after a delay to emulate the existing Raylet reconstruction // behaviour. TODO(ekl) backoff exponentially. - RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " - << spec.DebugString(); + uint32_t delay = RayConfig::instance().task_retry_delay_ms(); + RAY_LOG(ERROR) << "Will resubmit task after a " << delay + << "ms delay: " << spec.DebugString(); absl::MutexLock lock(&mutex_); - to_resubmit_.push_back(std::make_pair(current_time_ms() + 5000, spec)); + to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec)); })); // Create an entry for the driver task in the task table. This task is