Add failure tests to test_reference_counting (#7400)

This commit is contained in:
Edward Oakes
2020-03-17 10:30:21 -05:00
committed by GitHub
parent 7678418210
commit c1b0f9ccdf
10 changed files with 664 additions and 552 deletions
+2 -1
View File
@@ -533,7 +533,8 @@ cdef void async_plasma_callback(CObjectID object_id,
if event_handler is not None:
obj_id = ObjectID(object_id.Binary())
if data_size > 0 and obj_id:
# This must be asynchronous to allow objects to avoid blocking the IO thread.
# This must be asynchronous to allow objects to avoid blocking
# the IO thread.
event_handler._loop.call_soon_threadsafe(
event_handler._complete_future, obj_id)
+3 -5
View File
@@ -137,24 +137,22 @@ def wait_for_errors(error_type, num_errors, timeout=20):
def wait_for_condition(condition_predictor,
timeout_ms=1000,
timeout=1000,
retry_interval_ms=100):
"""A helper function that waits until a condition is met.
Args:
condition_predictor: A function that predicts the condition.
timeout_ms: Maximum timeout in milliseconds.
timeout: Maximum timeout in seconds.
retry_interval_ms: Retry interval in milliseconds.
Return:
Whether the condition is met within the timeout.
"""
time_elapsed = 0
start = time.time()
while time_elapsed <= timeout_ms:
while time.time() - start <= timeout:
if condition_predictor():
return True
time_elapsed = (time.time() - start) * 1000
time.sleep(retry_interval_ms / 1000.0)
return False
+16
View File
@@ -203,6 +203,22 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_reference_counting_2",
size = "medium",
srcs = ["test_reference_counting_2.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_global_gc",
size = "small",
srcs = ["test_global_gc.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_global_state",
size = "small",
+6 -3
View File
@@ -198,10 +198,13 @@ def test_actor_reconstruction_without_task(ray_start_regular):
# Kill the actor.
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# Wait until the actor is reconstructed.
assert wait_for_condition(
lambda: ray.worker.global_worker.core_worker.object_exists(obj_ids[1]),
timeout_ms=5000)
def check_reconstructed():
worker = ray.worker.global_worker
return worker.core_worker.object_exists(obj_ids[1])
assert wait_for_condition(check_reconstructed)
def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
+4 -6
View File
@@ -1142,23 +1142,21 @@ def test_fate_sharing(ray_start_cluster):
pid = ray.get(a.get_pid.remote())
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
assert wait_for_condition(
lambda: not child_resource_available(), timeout_ms=10000)
assert wait_for_condition(lambda: not child_resource_available())
# Kill the parent process.
os.kill(pid, 9)
assert wait_for_condition(child_resource_available, timeout_ms=10000)
assert wait_for_condition(child_resource_available)
# Test fate sharing if the parent node dies.
def test_node_failure(node_to_kill, use_actors):
a = Actor.options(resources={"parent": 1}).remote()
a.start_child.remote(use_actors=use_actors)
# Wait for the child to be scheduled.
assert wait_for_condition(
lambda: not child_resource_available(), timeout_ms=10000)
assert wait_for_condition(lambda: not child_resource_available())
# Kill the parent process.
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
assert wait_for_condition(child_resource_available, timeout_ms=10000)
assert wait_for_condition(child_resource_available)
return node_to_kill
test_process_failure(use_actors=True)
+133
View File
@@ -0,0 +1,133 @@
# coding: utf-8
import gc
import logging
import weakref
import numpy as np
import pytest
import ray
import ray.cluster_utils
from ray.test_utils import wait_for_condition
from ray.internal.internal_api import global_gc
logger = logging.getLogger(__name__)
def test_global_gc(shutdown_only):
cluster = ray.cluster_utils.Cluster()
for _ in range(2):
cluster.add_node(num_cpus=1, num_gpus=0)
ray.init(address=cluster.address)
class ObjectWithCyclicRef:
def __init__(self):
self.loop = self
@ray.remote(num_cpus=1)
class GarbageHolder:
def __init__(self):
gc.disable()
x = ObjectWithCyclicRef()
self.garbage = weakref.ref(x)
def has_garbage(self):
return self.garbage() is not None
try:
gc.disable()
# Local driver.
local_ref = weakref.ref(ObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
assert local_ref() is not None
assert all(ray.get([a.has_garbage.remote() for a in actors]))
# GC should be triggered for all workers, including the local driver.
global_gc()
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
assert wait_for_condition(check_refs_gced)
finally:
gc.enable()
def test_global_gc_when_full(shutdown_only):
cluster = ray.cluster_utils.Cluster()
for _ in range(2):
cluster.add_node(
num_cpus=1, num_gpus=0, object_store_memory=100 * 1024 * 1024)
ray.init(address=cluster.address)
class LargeObjectWithCyclicRef:
def __init__(self):
self.loop = self
self.large_object = ray.put(
np.zeros(40 * 1024 * 1024, dtype=np.uint8))
@ray.remote(num_cpus=1)
class GarbageHolder:
def __init__(self):
gc.disable()
x = LargeObjectWithCyclicRef()
self.garbage = weakref.ref(x)
def has_garbage(self):
return self.garbage() is not None
def return_large_array(self):
return np.zeros(80 * 1024 * 1024, dtype=np.uint8)
try:
gc.disable()
# Local driver.
local_ref = weakref.ref(LargeObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
assert local_ref() is not None
assert all(ray.get([a.has_garbage.remote() for a in actors]))
# GC should be triggered for all workers, including the local driver,
# when the driver tries to ray.put a value that doesn't fit in the
# object store. This should cause the captured ObjectIDs' numpy arrays
# to be evicted.
ray.put(np.zeros(80 * 1024 * 1024, dtype=np.uint8))
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
assert wait_for_condition(check_refs_gced)
# Local driver.
local_ref = weakref.ref(LargeObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
# GC should be triggered for all workers, including the local driver,
# when a remote task tries to put a return value that doesn't fit in
# the object store. This should cause the captured ObjectIDs' numpy
# arrays to be evicted.
ray.get(actors[0].return_large_array.remote())
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
assert wait_for_condition(check_refs_gced)
finally:
gc.enable()
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
+204 -534
View File
@@ -1,11 +1,9 @@
# coding: utf-8
import asyncio
import copy
import json
import logging
import gc
import os
import time
import weakref
import numpy as np
@@ -13,8 +11,7 @@ import pytest
import ray
import ray.cluster_utils
from ray.test_utils import SignalActor, put_object, wait_for_condition
from ray.internal.internal_api import global_gc
from ray.test_utils import SignalActor, put_object
logger = logging.getLogger(__name__)
@@ -23,6 +20,7 @@ logger = logging.getLogger(__name__)
def one_worker_100MiB(request):
config = json.dumps({
"distributed_ref_counting_enabled": 1,
"task_retry_delay_ms": 0,
"object_store_full_max_retries": 3,
"object_store_full_initial_delay_ms": 100,
})
@@ -75,121 +73,6 @@ def check_refcounts(expected, timeout=10):
time.sleep(0.1)
def test_global_gc(shutdown_only):
cluster = ray.cluster_utils.Cluster()
for _ in range(2):
cluster.add_node(num_cpus=1, num_gpus=0)
ray.init(address=cluster.address)
class ObjectWithCyclicRef:
def __init__(self):
self.loop = self
@ray.remote(num_cpus=1)
class GarbageHolder:
def __init__(self):
gc.disable()
x = ObjectWithCyclicRef()
self.garbage = weakref.ref(x)
def has_garbage(self):
return self.garbage() is not None
try:
gc.disable()
# Local driver.
local_ref = weakref.ref(ObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
assert local_ref() is not None
assert all(ray.get([a.has_garbage.remote() for a in actors]))
# GC should be triggered for all workers, including the local driver.
global_gc()
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
wait_for_condition(check_refs_gced, timeout_ms=10000)
finally:
gc.enable()
def test_global_gc_when_full(shutdown_only):
cluster = ray.cluster_utils.Cluster()
for _ in range(2):
cluster.add_node(
num_cpus=1, num_gpus=0, object_store_memory=100 * 1024 * 1024)
ray.init(address=cluster.address)
class LargeObjectWithCyclicRef:
def __init__(self):
self.loop = self
self.large_object = ray.put(
np.zeros(40 * 1024 * 1024, dtype=np.uint8))
@ray.remote(num_cpus=1)
class GarbageHolder:
def __init__(self):
gc.disable()
x = LargeObjectWithCyclicRef()
self.garbage = weakref.ref(x)
def has_garbage(self):
return self.garbage() is not None
def return_large_array(self):
return np.zeros(80 * 1024 * 1024, dtype=np.uint8)
try:
gc.disable()
# Local driver.
local_ref = weakref.ref(LargeObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
assert local_ref() is not None
assert all(ray.get([a.has_garbage.remote() for a in actors]))
# GC should be triggered for all workers, including the local driver,
# when the driver tries to ray.put a value that doesn't fit in the
# object store. This should cause the captured ObjectIDs' numpy arrays
# to be evicted.
ray.put(np.zeros(80 * 1024 * 1024, dtype=np.uint8))
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
wait_for_condition(check_refs_gced, timeout_ms=10000)
# Local driver.
local_ref = weakref.ref(LargeObjectWithCyclicRef())
# Remote workers.
actors = [GarbageHolder.remote() for _ in range(2)]
def check_refs_gced():
return (local_ref() is None and
not any(ray.get([a.has_garbage.remote() for a in actors])))
wait_for_condition(check_refs_gced, timeout_ms=10000)
# GC should be triggered for all workers, including the local driver,
# when a remote task tries to put a return value that doesn't fit in
# the object store. This should cause the captured ObjectIDs' numpy
# arrays to be evicted.
ray.get(actors[0].return_large_array.remote())
assert local_ref() is None
assert not any(ray.get([a.has_garbage.remote() for a in actors]))
finally:
gc.enable()
def test_local_refcounts(ray_start_regular):
oid1 = ray.put(None)
check_refcounts({oid1: (1, 0)})
@@ -371,420 +254,6 @@ def test_feature_flag(shutdown_only):
_fill_object_store_and_get(actor.get_large_object.remote(), succeed=False)
# Remote function takes serialized reference and doesn't hold onto it after
# finishing. Referenced object shouldn't be evicted while the task is pending
# and should be evicted after it returns.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_basic_serialized_reference(one_worker_100MiB, use_ray_put):
@ray.remote
def pending(ref, dep):
ray.get(ref[0])
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
signal = SignalActor.remote()
oid = pending.remote([array_oid], signal.wait.remote())
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the task to finish.
ray.get(signal.send.remote())
ray.get(oid)
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Call a recursive chain of tasks that pass a serialized reference to the end
# of the chain. The reference should still exist while the final task in the
# chain is running and should be removed once it finishes.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put):
@ray.remote(num_cpus=0)
class Signal:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self):
self.ready_event.set()
async def wait(self):
await self.ready_event.wait()
@ray.remote
def recursive(ref, signal, max_depth, depth=0):
ray.get(ref[0])
if depth == max_depth:
return ray.get(signal.wait.remote())
else:
return recursive.remote(ref, signal, max_depth, depth + 1)
signal = SignalActor.remote()
max_depth = 5
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
head_oid = recursive.remote([array_oid], signal, max_depth)
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
tail_oid = head_oid
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
assert ray.get(tail_oid) is None
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that a passed reference held by an actor after the method finishes
# is kept until the reference is removed from the actor. Also tests giving
# the actor a duplicate reference to the same object ID.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put):
@ray.remote
class GreedyActor(object):
def __init__(self):
pass
def set_ref1(self, ref):
self.ref1 = ref
def add_ref2(self, new_ref):
self.ref2 = new_ref
def delete_ref1(self):
self.ref1 = None
def delete_ref2(self):
self.ref2 = None
# Test that the reference held by the actor isn't evicted.
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
actor = GreedyActor.remote()
actor.set_ref1.remote([array_oid])
# Test that giving the same actor a duplicate reference works.
ray.get(actor.add_ref2.remote([array_oid]))
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Test that the remote references still pin the object.
_fill_object_store_and_get(array_oid_bytes)
# Test that removing only the first reference doesn't unpin the object.
ray.get(actor.delete_ref1.remote())
_fill_object_store_and_get(array_oid_bytes)
# Test that deleting the second reference stops it from being pinned.
ray.get(actor.delete_ref2.remote())
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that a passed reference held by an actor after a task finishes
# is kept until the reference is removed from the worker. Also tests giving
# the worker a duplicate reference to the same object ID.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put):
@ray.remote(num_cpus=0)
class Signal:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self):
self.ready_event.set()
async def wait(self):
await self.ready_event.wait()
@ray.remote
def child(dep1, dep2):
return
@ray.remote
def launch_pending_task(ref, signal):
return child.remote(ref[0], signal.wait.remote())
signal = SignalActor.remote()
# Test that the reference held by the actor isn't evicted.
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
child_return_id = ray.get(launch_pending_task.remote([array_oid], signal))
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Test that the reference prevents the object from being evicted.
_fill_object_store_and_get(array_oid_bytes)
ray.get(signal.send.remote())
ray.get(child_return_id)
del child_return_id
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that an object containing object IDs within it pins the inner IDs.
def test_basic_nested_ids(one_worker_100MiB):
inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
outer_oid = ray.put([inner_oid])
# Remove the local reference to the inner object.
inner_oid_bytes = inner_oid.binary()
del inner_oid
# Check that the outer reference pins the inner object.
_fill_object_store_and_get(inner_oid_bytes)
# Remove the outer reference and check that the inner object gets evicted.
del outer_oid
_fill_object_store_and_get(inner_oid_bytes, succeed=False)
# Test that an object containing object IDs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_recursively_nest_ids(one_worker_100MiB, use_ray_put):
@ray.remote(num_cpus=0)
class Signal:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self):
self.ready_event.set()
async def wait(self):
await self.ready_event.wait()
@ray.remote
def recursive(ref, signal, max_depth, depth=0):
unwrapped = ray.get(ref[0])
if depth == max_depth:
return ray.get(signal.wait.remote())
else:
return recursive.remote(unwrapped, signal, max_depth, depth + 1)
signal = SignalActor.remote()
max_depth = 5
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
nested_oid = array_oid
for _ in range(max_depth):
nested_oid = ray.put([nested_oid])
head_oid = recursive.remote([nested_oid], signal, max_depth)
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid, nested_oid
tail_oid = head_oid
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
ray.get(tail_oid)
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned until
# they go out of scope on the caller side.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_return_object_id(one_worker_100MiB, use_ray_put):
@ray.remote
def return_an_id():
return [
put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
]
outer_oid = return_an_id.remote()
inner_oid_binary = ray.get(outer_oid)[0].binary()
# Check that the inner ID is pinned by the outer ID.
_fill_object_store_and_get(inner_oid_binary)
# Check that taking a reference to the inner ID and removing the outer ID
# doesn't unpin the object.
inner_oid = ray.get(outer_oid)[0]
del outer_oid
_fill_object_store_and_get(inner_oid_binary)
# Check that removing the inner ID unpins the object.
del inner_oid
_fill_object_store_and_get(inner_oid_binary, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned if
# passed into another remote task by the caller.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_pass_returned_object_id(one_worker_100MiB, use_ray_put):
@ray.remote(num_cpus=0)
class Signal:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self):
self.ready_event.set()
async def wait(self):
await self.ready_event.wait()
@ray.remote
def put():
return
@ray.remote
def return_an_id():
return [
put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
]
@ray.remote
def pending(ref):
ray.get(ref[0])
return ref[0]
signal = SignalActor.remote()
outer_oid = return_an_id.remote()
pending_oid = pending.remote([outer_oid])
# Remove the local reference to the returned ID.
del outer_oid
# Check that the inner ID is pinned by the remote task ID.
_fill_object_store_and_get(pending_oid, succeed=False)
ray.get(signal.send.remote())
inner_oid = ray.get(pending_oid)
inner_oid_binary = inner_oid.binary()
_fill_object_store_and_get(inner_oid_binary)
del pending_oid
del inner_oid
_fill_object_store_and_get(inner_oid_binary, succeed=False)
# Call a recursive chain of tasks that pass a serialized reference that was
# returned by another task to the end of the chain. The reference should still
# exist while the final task in the chain is running and should be removed once
# it finishes.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put):
@ray.remote(num_cpus=0)
class Signal:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self):
self.ready_event.set()
async def wait(self):
await self.ready_event.wait()
@ray.remote
def return_an_id():
return put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
@ray.remote
def recursive(ref, signal, max_depth, depth=0):
inner_id = ray.get(ref[0])
if depth == max_depth:
ray.get(signal.wait.remote())
return inner_id
else:
return recursive.remote(ref, signal, max_depth, depth + 1)
max_depth = 5
outer_oid = return_an_id.remote()
signal = SignalActor.remote()
head_oid = recursive.remote([outer_oid], signal, max_depth)
# Remove the local reference.
outer_oid = head_oid
for _ in range(max_depth):
outer_oid = ray.get(outer_oid)
# Fill the object store.
_fill_object_store_and_get(outer_oid, succeed=False)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
# Check that the remote reference pins the object.
inner_oid = ray.get(outer_oid)
_fill_object_store_and_get(inner_oid)
inner_oid_bytes = inner_oid.binary()
# Reference should be gone, check that returned ID gets evicted.
del head_oid
del outer_oid
del inner_oid
_fill_object_store_and_get(inner_oid_bytes, succeed=False)
# Call a recursive chain of tasks. The final task in the chain returns an
# ObjectID returned by a task that it submitted. Every other task in the chain
# returns the same ObjectID by calling ray.get() on its submitted task and
# returning the result. The reference should still exist while the driver has a
# reference to the final task's ObjectID.
@pytest.mark.parametrize("use_ray_put", [False, True])
def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put):
@ray.remote
def recursive(num_tasks_left):
if num_tasks_left == 0:
return put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
final_id = ray.get(recursive.remote(num_tasks_left - 1))
ray.get(final_id)
return final_id
max_depth = 5
head_oid = recursive.remote(max_depth)
final_oid = ray.get(head_oid)
final_oid_bytes = final_oid.binary()
# Check that the driver's reference pins the object.
_fill_object_store_and_get(final_oid_bytes)
# Remove the local reference and try it again.
final_oid = ray.get(head_oid)
_fill_object_store_and_get(final_oid_bytes)
# Remove all references.
del head_oid
del final_oid
# Reference should be gone, check that returned ID gets evicted.
_fill_object_store_and_get(final_oid_bytes, succeed=False)
def test_out_of_band_serialized_object_id(one_worker_100MiB):
assert len(
ray.worker.global_worker.core_worker.get_all_reference_counts()) == 0
@@ -840,6 +309,207 @@ def test_captured_object_id(one_worker_100MiB):
_fill_object_store_and_get(oid)
# Remote function takes serialized reference and doesn't hold onto it after
# finishing. Referenced object shouldn't be evicted while the task is pending
# and should be evicted after it returns.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_basic_serialized_reference(one_worker_100MiB, use_ray_put, failure):
@ray.remote(max_retries=1)
def pending(ref, dep):
ray.get(ref[0])
if failure:
os._exit(0)
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
signal = SignalActor.remote()
oid = pending.remote([array_oid], signal.wait.remote())
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the task to finish.
ray.get(signal.send.remote())
try:
ray.get(oid)
assert not failure
except ray.exceptions.RayWorkerError:
assert failure
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Call a recursive chain of tasks that pass a serialized reference to the end
# of the chain. The reference should still exist while the final task in the
# chain is running and should be removed once it finishes.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put,
failure):
@ray.remote(max_retries=1)
def recursive(ref, signal, max_depth, depth=0):
ray.get(ref[0])
if depth == max_depth:
ray.get(signal.wait.remote())
if failure:
os._exit(0)
return
else:
return recursive.remote(ref, signal, max_depth, depth + 1)
signal = SignalActor.remote()
max_depth = 5
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
head_oid = recursive.remote([array_oid], signal, max_depth)
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
tail_oid = head_oid
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
try:
assert ray.get(tail_oid) is None
assert not failure
# TODO(edoakes): this should raise WorkerError.
except ray.exceptions.UnreconstructableError:
assert failure
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that a passed reference held by an actor after the method finishes
# is kept until the reference is removed from the actor. Also tests giving
# the actor a duplicate reference to the same object ID.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
failure):
@ray.remote
class GreedyActor(object):
def __init__(self):
pass
def set_ref1(self, ref):
self.ref1 = ref
def add_ref2(self, new_ref):
self.ref2 = new_ref
def delete_ref1(self):
self.ref1 = None
def delete_ref2(self):
self.ref2 = None
# Test that the reference held by the actor isn't evicted.
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
actor = GreedyActor.remote()
actor.set_ref1.remote([array_oid])
# Test that giving the same actor a duplicate reference works.
ray.get(actor.add_ref2.remote([array_oid]))
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Test that the remote references still pin the object.
_fill_object_store_and_get(array_oid_bytes)
# Test that removing only the first reference doesn't unpin the object.
ray.get(actor.delete_ref1.remote())
_fill_object_store_and_get(array_oid_bytes)
if failure:
# Test that the actor exiting stops the reference from being pinned.
ray.kill(actor)
# Wait for the actor to exit.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.delete_ref1.remote())
else:
# Test that deleting the second reference stops it from being pinned.
ray.get(actor.delete_ref2.remote())
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that a passed reference held by an actor after a task finishes
# is kept until the reference is removed from the worker. Also tests giving
# the worker a duplicate reference to the same object ID.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put,
failure):
@ray.remote(max_retries=1)
def child(dep1, dep2):
if failure:
os._exit(0)
return
@ray.remote
def launch_pending_task(ref, signal):
return child.remote(ref[0], signal.wait.remote())
signal = SignalActor.remote()
# Test that the reference held by the actor isn't evicted.
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
child_return_id = ray.get(launch_pending_task.remote([array_oid], signal))
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Test that the reference prevents the object from being evicted.
_fill_object_store_and_get(array_oid_bytes)
ray.get(signal.send.remote())
try:
ray.get(child_return_id)
assert not failure
except (ray.exceptions.RayWorkerError,
ray.exceptions.UnreconstructableError):
assert failure
del child_return_id
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that an object containing object IDs within it pins the inner IDs.
def test_basic_nested_ids(one_worker_100MiB):
inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
outer_oid = ray.put([inner_oid])
# Remove the local reference to the inner object.
inner_oid_bytes = inner_oid.binary()
del inner_oid
# Check that the outer reference pins the inner object.
_fill_object_store_and_get(inner_oid_bytes)
# Remove the outer reference and check that the inner object gets evicted.
del outer_oid
_fill_object_store_and_get(inner_oid_bytes, succeed=False)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,289 @@
# coding: utf-8
import json
import logging
import os
import signal
import numpy as np
import pytest
import ray
import ray.cluster_utils
from ray.test_utils import SignalActor, put_object, wait_for_condition
logger = logging.getLogger(__name__)
@pytest.fixture
def one_worker_100MiB(request):
config = json.dumps({
"distributed_ref_counting_enabled": 1,
"object_store_full_max_retries": 2,
"task_retry_delay_ms": 0,
})
yield ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_internal_config=config)
ray.shutdown()
def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
num_objects=5):
for _ in range(num_objects):
ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8))
if type(oid) is bytes:
oid = ray.ObjectID(oid)
if succeed:
ray.get(oid)
else:
if oid.is_direct_call_type():
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(oid, timeout=0.1)
else:
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(oid)
# Test that an object containing object IDs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursively_nest_ids(one_worker_100MiB, use_ray_put, failure):
@ray.remote(max_retries=1)
def recursive(ref, signal, max_depth, depth=0):
unwrapped = ray.get(ref[0])
if depth == max_depth:
ray.get(signal.wait.remote())
if failure:
os._exit(0)
return
else:
return recursive.remote(unwrapped, signal, max_depth, depth + 1)
signal = SignalActor.remote()
max_depth = 5
array_oid = put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
nested_oid = array_oid
for _ in range(max_depth):
nested_oid = ray.put([nested_oid])
head_oid = recursive.remote([nested_oid], signal, max_depth)
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid, nested_oid
tail_oid = head_oid
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
try:
ray.get(tail_oid)
assert not failure
# TODO(edoakes): this should raise WorkerError.
except ray.exceptions.UnreconstructableError:
assert failure
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned until
# they go out of scope on the caller side.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_return_object_id(one_worker_100MiB, use_ray_put, failure):
@ray.remote
def return_an_id():
return [
put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
]
@ray.remote(max_retries=1)
def exit():
os._exit(0)
outer_oid = return_an_id.remote()
inner_oid_binary = ray.get(outer_oid)[0].binary()
# Check that the inner ID is pinned by the outer ID.
_fill_object_store_and_get(inner_oid_binary)
# Check that taking a reference to the inner ID and removing the outer ID
# doesn't unpin the object.
inner_oid = ray.get(outer_oid)[0] # noqa: F841
del outer_oid
_fill_object_store_and_get(inner_oid_binary)
if failure:
# Check that the owner dying unpins the object. This should execute on
# the same worker because there is only one started and the other tasks
# have finished.
with pytest.raises(ray.exceptions.RayWorkerError):
ray.get(exit.remote())
else:
# Check that removing the inner ID unpins the object.
del inner_oid
_fill_object_store_and_get(inner_oid_binary, succeed=False)
# Test that serialized objectIDs returned from remote tasks are pinned if
# passed into another remote task by the caller.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_pass_returned_object_id(one_worker_100MiB, use_ray_put, failure):
@ray.remote
def return_an_id():
return [
put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
]
# TODO(edoakes): this fails with an ActorError with max_retries=1.
@ray.remote(max_retries=0)
def pending(ref, signal):
ray.get(signal.wait.remote())
ray.get(ref[0])
if failure:
os._exit(0)
signal = SignalActor.remote()
outer_oid = return_an_id.remote()
inner_oid_binary = ray.get(outer_oid)[0].binary()
pending_oid = pending.remote([outer_oid], signal)
# Remove the local reference to the returned ID.
del outer_oid
# Check that the inner ID is pinned by the remote task ID and finishing
# the task unpins the object.
ray.get(signal.send.remote())
try:
# Should succeed because inner_oid is pinned if no failure.
ray.get(pending_oid)
assert not failure
except ray.exceptions.RayWorkerError:
assert failure
def ref_not_exists():
worker = ray.worker.global_worker
inner_oid = ray.ObjectID(inner_oid_binary)
return not worker.core_worker.object_exists(inner_oid)
assert wait_for_condition(ref_not_exists)
# Call a recursive chain of tasks that pass a serialized reference that was
# returned by another task to the end of the chain. The reference should still
# exist while the final task in the chain is running and should be removed once
# it finishes.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put,
failure):
@ray.remote
def return_an_id():
return put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put)
@ray.remote(max_retries=1)
def recursive(ref, signal, max_depth, depth=0):
inner_id = ray.get(ref[0])
if depth == max_depth:
ray.get(signal.wait.remote())
if failure:
os._exit(0)
return inner_id
else:
return inner_id, recursive.remote(ref, signal, max_depth,
depth + 1)
max_depth = 5
outer_oid = return_an_id.remote()
signal = SignalActor.remote()
head_oid = recursive.remote([outer_oid], signal, max_depth)
# Remove the local reference.
inner_oid = None
outer_oid = head_oid
for i in range(max_depth):
inner_oid, outer_oid = ray.get(outer_oid)
# Check that the remote reference pins the object.
_fill_object_store_and_get(outer_oid, succeed=False)
# Fulfill the dependency, causing the tail task to finish.
ray.get(signal.send.remote())
try:
# Check that the remote reference pins the object.
ray.get(outer_oid)
_fill_object_store_and_get(inner_oid)
assert not failure
# TODO(edoakes): this should raise WorkerError.
except ray.exceptions.UnreconstructableError:
assert failure
inner_oid_bytes = inner_oid.binary()
del inner_oid
del head_oid
del outer_oid
# Reference should be gone, check that returned ID gets evicted.
_fill_object_store_and_get(inner_oid_bytes, succeed=False)
# Call a recursive chain of tasks. The final task in the chain returns an
# ObjectID returned by a task that it submitted. Every other task in the chain
# returns the same ObjectID by calling ray.get() on its submitted task and
# returning the result. The reference should still exist while the driver has a
# reference to the final task's ObjectID.
@pytest.mark.parametrize("use_ray_put,failure", [(False, False), (False, True),
(True, False), (True, True)])
def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put,
failure):
@ray.remote
def recursive(num_tasks_left):
if num_tasks_left == 0:
return put_object(
np.zeros(40 * 1024 * 1024, dtype=np.uint8),
use_ray_put), os.getpid()
return ray.get(recursive.remote(num_tasks_left - 1))
max_depth = 5
head_oid = recursive.remote(max_depth)
final_oid, owner_pid = ray.get(head_oid)
final_oid_bytes = final_oid.binary()
# Check that the driver's reference pins the object.
_fill_object_store_and_get(final_oid_bytes)
# Remove the local reference and try it again.
_fill_object_store_and_get(final_oid_bytes)
if failure:
os.kill(owner_pid, signal.SIGKILL)
else:
# Remove all references.
del head_oid
del final_oid
# Reference should be gone, check that returned ID gets evicted.
_fill_object_store_and_get(final_oid_bytes, succeed=False)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
+3
View File
@@ -260,3 +260,6 @@ RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
/// Duration to sleep after failing to put an object in plasma because it is full.
/// This will be exponentially increased for each retry.
RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)
/// Duration to wait between retries for failed tasks.
RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)
+4 -3
View File
@@ -202,10 +202,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
[this](const TaskSpecification &spec) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: "
<< spec.DebugString();
uint32_t delay = RayConfig::instance().task_retry_delay_ms();
RAY_LOG(ERROR) << "Will resubmit task after a " << delay
<< "ms delay: " << spec.DebugString();
absl::MutexLock lock(&mutex_);
to_resubmit_.push_back(std::make_pair(current_time_ms() + 5000, spec));
to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec));
}));
// Create an entry for the driver task in the task table. This task is