diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index f107f4c7a..c5a6b66ca 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -1,3 +1,4 @@ +import asyncio import json import fnmatch import os @@ -210,3 +211,27 @@ def generate_internal_config_map(**kwargs): "_internal_config": internal_config, } return ray_kwargs + + +@ray.remote(num_cpus=0) +class SignalActor: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self, should_wait=True): + if should_wait: + await self.ready_event.wait() + + +class RemoteSignal: + def __init__(self): + self.signal_actor = SignalActor.remote() + + def send(self): + ray.get(self.signal_actor.send.remote()) + + def wait(self): + ray.get(self.signal_actor.wait.remote()) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 1de71bc1c..45d6ae9e6 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -8,10 +8,8 @@ import pickle import re import string import sys -import tempfile import threading import time -import uuid import weakref import numpy as np @@ -1314,35 +1312,21 @@ def test_get_dict(ray_start_regular): def test_get_with_timeout(ray_start_regular): - def random_path(): - return os.path.join(tempfile.gettempdir(), uuid.uuid4().hex) - - def touch(path): - with open(path, "w"): - pass - - @ray.remote - def wait_for_file(path): - if path: - while True: - if os.path.exists(path): - break - time.sleep(0.1) + signal = ray.test_utils.SignalActor.remote() # Check that get() returns early if object is ready. start = time.time() - ray.get(wait_for_file.remote(None), timeout=30) + ray.get(signal.wait.remote(should_wait=False), timeout=30) assert time.time() - start < 30 # Check that get() raises a TimeoutError after the timeout if the object # is not ready yet. - path = random_path() - result_id = wait_for_file.remote(path) + result_id = signal.wait.remote() with pytest.raises(RayTimeoutError): ray.get(result_id, timeout=0.1) # Check that a subsequent get() returns early. - touch(path) + ray.get(signal.send.remote()) start = time.time() ray.get(result_id, timeout=30) assert time.time() - start < 30 diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 6a75eed39..fd521c79f 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -5,7 +5,6 @@ import sys import tempfile import threading import time -import uuid import numpy as np import pytest @@ -19,6 +18,7 @@ from ray.test_utils import ( wait_for_condition, wait_for_errors, RayTestTimeoutException, + SignalActor, ) @@ -82,19 +82,6 @@ def test_failed_task(ray_start_regular): def test_get_throws_quickly_when_found_exception(ray_start_regular): - def random_path(): - return os.path.join(tempfile.gettempdir(), uuid.uuid4().hex) - - def touch(path): - with open(path, "w"): - pass - - def wait_for_file(path): - while True: - if os.path.exists(path): - break - time.sleep(0.1) - # We use an actor instead of functions here. If we use functions, it's # very likely that two normal tasks are submitted before the first worker # is registered to Raylet. Since `maximum_startup_concurrency` is 1, @@ -110,25 +97,27 @@ def test_get_throws_quickly_when_found_exception(ray_start_regular): def bad_func2(self): os._exit(0) - def slow_func(self, path): - wait_for_file(path) + def slow_func(self, signal): + ray.get(signal.wait.remote()) def expect_exception(objects, exception): with pytest.raises(ray.exceptions.RayError) as err: ray.get(objects) assert err.type is exception - f = random_path() + signal1 = SignalActor.remote() actor = Actor.options(max_concurrency=2).remote() - expect_exception([actor.bad_func1.remote(), - actor.slow_func.remote(f)], ray.exceptions.RayTaskError) - touch(f) + expect_exception( + [actor.bad_func1.remote(), + actor.slow_func.remote(signal1)], ray.exceptions.RayTaskError) + ray.get(signal1.send.remote()) - f = random_path() - actor = Actor.options(max_concurrency=2).remote() - expect_exception([actor.bad_func2.remote(), - actor.slow_func.remote(f)], ray.exceptions.RayActorError) - touch(f) + signal2 = SignalActor.remote() + actor = Actor.options(is_direct_call=True, max_concurrency=2).remote() + expect_exception( + [actor.bad_func2.remote(), + actor.slow_func.remote(signal2)], ray.exceptions.RayActorError) + ray.get(signal2.send.remote()) def test_fail_importing_remote_function(ray_start_2_cpus): @@ -380,7 +369,7 @@ def test_actor_worker_dying(ray_start_regular): pass a = Actor.remote() - [obj], _ = ray.wait([a.kill.remote()], timeout=5.0) + [obj], _ = ray.wait([a.kill.remote()], timeout=5) with pytest.raises(ray.exceptions.RayActorError): ray.get(obj) with pytest.raises(ray.exceptions.RayTaskError): @@ -872,11 +861,11 @@ def test_connect_with_disconnected_node(shutdown_only): # This node is killed by SIGKILL, ray_monitor will mark it to dead. dead_node = cluster.add_node(num_cpus=0, _internal_config=config) cluster.remove_node(dead_node, allow_graceful=False) - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1, timeout=2) + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1) # This node is killed by SIGKILL, ray_monitor will mark it to dead. dead_node = cluster.add_node(num_cpus=0, _internal_config=config) cluster.remove_node(dead_node, allow_graceful=False) - wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=2) + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2) # This node is killed by SIGTERM, ray_monitor will not mark it again. removing_node = cluster.add_node(num_cpus=0, _internal_config=config) cluster.remove_node(removing_node, allow_graceful=True) diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index 389d58231..da8647197 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -8,6 +8,7 @@ from collections import defaultdict import queue import ray +from ray.test_utils import SignalActor from ray.util.multiprocessing import Pool, TimeoutError @@ -137,17 +138,18 @@ def test_initializer(shutdown_only): def test_close(pool_4_processes): - def f(object_id): - return ray.get(object_id) + def f(signal): + ray.get(signal.wait.remote()) + return "hello" - object_id = ray.ObjectID.from_random() - result = pool_4_processes.map_async(f, [object_id for _ in range(4)]) + signal = SignalActor.remote() + result = pool_4_processes.map_async(f, [signal for _ in range(4)]) assert not result.ready() pool_4_processes.close() assert not result.ready() - # Fulfill the object_id, causing the head of line tasks to finish. - ray.worker.global_worker.put_object("hello", object_id=object_id) + # Signal the head of line tasks to finish. + ray.get(signal.send.remote()) pool_4_processes.join() # close() shouldn't interrupt pending tasks, so check that they succeeded. @@ -158,11 +160,11 @@ def test_close(pool_4_processes): def test_terminate(pool_4_processes): - def f(object_id): - return ray.get(object_id) + def f(signal): + return ray.get(signal.wait.remote()) - object_id = ray.ObjectID.from_random() - result = pool_4_processes.map_async(f, [object_id for _ in range(4)]) + signal = SignalActor.remote() + result = pool_4_processes.map_async(f, [signal for _ in range(4)]) assert not result.ready() pool_4_processes.terminate() @@ -218,32 +220,32 @@ def test_apply_async(pool): pool.apply_async(f, (1, 2), {"kwarg1": 3}).get() # Won't return until the input ObjectID is fulfilled. - def ten_over(input): - return 10 / ray.get(input[0]) + def ten_over(args): + signal, val = args + ray.get(signal.wait.remote()) + return 10 / val - # Generate a random ObjectID that will be fulfilled later. - object_id = ray.ObjectID.from_random() - result = pool.apply_async(ten_over, ([object_id], )) + signal = SignalActor.remote() + result = pool.apply_async(ten_over, ([signal, 10], )) result.wait(timeout=0.01) assert not result.ready() with pytest.raises(TimeoutError): result.get(timeout=0.01) # Fulfill the ObjectID. - ray.worker.global_worker.put_object(10, object_id=object_id) + ray.get(signal.send.remote()) result.wait(timeout=10) assert result.ready() assert result.successful() assert result.get() == 1 - # Generate a random ObjectID that will be fulfilled later. - object_id = ray.ObjectID.from_random() - result = pool.apply_async(ten_over, ([object_id], )) + signal = SignalActor.remote() + result = pool.apply_async(ten_over, ([signal, 0], )) with pytest.raises(ValueError, match="not ready"): result.successful() # Fulfill the ObjectID with 0, causing the task to fail (divide by zero). - ray.worker.global_worker.put_object(0, object_id=object_id) + ray.get(signal.send.remote()) result.wait(timeout=10) assert result.ready() assert not result.successful() @@ -276,21 +278,20 @@ def test_map(pool_4_processes): def test_map_async(pool_4_processes): def f(args): - index = args[0] - ray.get(args[1]) + index, signal = args + ray.get(signal.wait.remote()) return index, os.getpid() - # Generate a random ObjectID that will be fulfilled later. - object_id = ray.ObjectID.from_random() + signal = SignalActor.remote() async_result = pool_4_processes.map_async( - f, [(i, object_id) for i in range(1000)]) + f, [(i, signal) for i in range(1000)]) assert not async_result.ready() with pytest.raises(TimeoutError): async_result.get(timeout=0.01) async_result.wait(timeout=0.01) - # Fulfill the object ID, finishing the tasks. - ray.worker.global_worker.put_object(0, object_id=object_id) + # Send the signal to finish the tasks. + ray.get(signal.send.remote()) async_result.wait(timeout=10) assert async_result.ready() assert async_result.successful() @@ -440,23 +441,21 @@ def test_imap_unordered(pool_4_processes): def test_imap_timeout(pool_4_processes): def f(args): + index, wait_index, signal = args time.sleep(0.1 * random.random()) - index = args[0] - wait_index = args[1] - object_id = args[2] if index == wait_index: - ray.get(object_id) + ray.get(signal.wait.remote()) return index wait_index = 23 - object_id = ray.ObjectID.from_random() + signal = SignalActor.remote() result_iter = pool_4_processes.imap( - f, [(index, wait_index, object_id) for index in range(100)]) + f, [(index, wait_index, signal) for index in range(100)]) for i in range(100): if i == wait_index: with pytest.raises(TimeoutError): result = result_iter.next(timeout=0.1) - ray.worker.global_worker.put_object(None, object_id=object_id) + ray.get(signal.send.remote()) result = result_iter.next() assert result == i @@ -465,16 +464,15 @@ def test_imap_timeout(pool_4_processes): result_iter.next() wait_index = 23 - object_id = ray.ObjectID.from_random() + signal = SignalActor.remote() result_iter = pool_4_processes.imap_unordered( - f, [(index, wait_index, object_id) for index in range(100)], - chunksize=11) + f, [(index, wait_index, signal) for index in range(100)], chunksize=11) in_order = [] for i in range(100): try: result = result_iter.next(timeout=1) except TimeoutError: - ray.worker.global_worker.put_object(None, object_id=object_id) + ray.get(signal.send.remote()) result = result_iter.next() in_order.append(result == i) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 054d0f505..361b054ce 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -2,11 +2,8 @@ import copy import json import logging -import os import gc -import tempfile import time -import uuid import weakref import numpy as np @@ -15,7 +12,7 @@ import pytest import ray import ray.cluster_utils -import ray.test_utils +from ray.test_utils import SignalActor from ray.internal.internal_api import global_gc logger = logging.getLogger(__name__) @@ -192,45 +189,27 @@ def test_local_refcounts(ray_start_regular): def test_dependency_refcounts(ray_start_regular): - # Return a large object that will be spilled to plasma. - def large_object(): - return np.zeros(10 * 1024 * 1024, dtype=np.uint8) - - # TODO: Clean up tmpfiles? - def random_path(): - return os.path.join(tempfile.gettempdir(), uuid.uuid4().hex) - - def touch(path): - with open(path, "w"): - pass - - def wait_for_file(path): - while True: - if os.path.exists(path): - break - time.sleep(0.1) - @ray.remote - def one_dep(dep, path=None, fail=False): - if path is not None: - wait_for_file(path) + def one_dep(dep, signal=None, fail=False): + if signal is not None: + ray.get(signal.wait.remote()) if fail: raise Exception("failed on purpose") @ray.remote - def one_dep_large(dep, path=None): - if path is not None: - wait_for_file(path) - # This should be spilled to plasma. - return large_object() + def one_dep_large(dep, signal=None): + if signal is not None: + ray.get(signal.wait.remote()) + # This will be spilled to plasma. + return np.zeros(10 * 1024 * 1024, dtype=np.uint8) # Test that regular plasma dependency refcounts are decremented once the # task finishes. - f = random_path() - large_dep = ray.put(large_object()) - result = one_dep.remote(large_dep, path=f) + signal = SignalActor.remote() + large_dep = ray.put(np.zeros(10 * 1024 * 1024, dtype=np.uint8)) + result = one_dep.remote(large_dep, signal=signal) check_refcounts({large_dep: (1, 1), result: (1, 0)}) - touch(f) + ray.get(signal.send.remote()) # Reference count should be removed once the task finishes. check_refcounts({large_dep: (1, 0), result: (1, 0)}) del large_dep, result @@ -238,29 +217,29 @@ def test_dependency_refcounts(ray_start_regular): # Test that inlined dependency refcounts are decremented once they are # inlined. - f = random_path() - dep = one_dep.remote(None, path=f) + signal = SignalActor.remote() + dep = one_dep.remote(None, signal=signal) check_refcounts({dep: (1, 0)}) result = one_dep.remote(dep) check_refcounts({dep: (1, 1), result: (1, 0)}) - touch(f) + ray.get(signal.send.remote()) # Reference count should be removed as soon as the dependency is inlined. - check_refcounts({dep: (1, 0), result: (1, 0)}, timeout=1) + check_refcounts({dep: (1, 0), result: (1, 0)}) del dep, result check_refcounts({}) # Test that spilled plasma dependency refcounts are decremented once # the task finishes. - f1, f2 = random_path(), random_path() - dep = one_dep_large.remote(None, path=f1) + signal1, signal2 = SignalActor.remote(), SignalActor.remote() + dep = one_dep_large.remote(None, signal=signal1) check_refcounts({dep: (1, 0)}) - result = one_dep.remote(dep, path=f2) + result = one_dep.remote(dep, signal=signal2) check_refcounts({dep: (1, 1), result: (1, 0)}) - touch(f1) - ray.get(dep, timeout=5.0) + ray.get(signal1.send.remote()) + ray.get(dep, timeout=10) # Reference count should remain because the dependency is in plasma. check_refcounts({dep: (1, 1), result: (1, 0)}) - touch(f2) + ray.get(signal2.send.remote()) # Reference count should be removed because the task finished. check_refcounts({dep: (1, 0), result: (1, 0)}) del dep, result @@ -268,11 +247,11 @@ def test_dependency_refcounts(ray_start_regular): # Test that regular plasma dependency refcounts are decremented if a task # fails. - f = random_path() - large_dep = ray.put(large_object()) - result = one_dep.remote(large_dep, path=f, fail=True) + signal = SignalActor.remote() + large_dep = ray.put(np.zeros(10 * 1024 * 1024, dtype=np.uint8)) + result = one_dep.remote(large_dep, signal=signal, fail=True) check_refcounts({large_dep: (1, 1), result: (1, 0)}) - touch(f) + ray.get(signal.send.remote()) # Reference count should be removed once the task finishes. check_refcounts({large_dep: (1, 0), result: (1, 0)}) del large_dep, result @@ -280,16 +259,16 @@ def test_dependency_refcounts(ray_start_regular): # Test that spilled plasma dependency refcounts are decremented if a task # fails. - f1, f2 = random_path(), random_path() - dep = one_dep_large.remote(None, path=f1) + signal1, signal2 = SignalActor.remote(), SignalActor.remote() + dep = one_dep_large.remote(None, signal=signal1) check_refcounts({dep: (1, 0)}) - result = one_dep.remote(dep, path=f2, fail=True) + result = one_dep.remote(dep, signal=signal2, fail=True) check_refcounts({dep: (1, 1), result: (1, 0)}) - touch(f1) - ray.get(dep, timeout=5.0) + ray.get(signal1.send.remote()) + ray.get(dep, timeout=10) # Reference count should remain because the dependency is in plasma. check_refcounts({dep: (1, 1), result: (1, 0)}) - touch(f2) + ray.get(signal2.send.remote()) # Reference count should be removed because the task finished. check_refcounts({dep: (1, 0), result: (1, 0)}) del dep, result @@ -338,13 +317,13 @@ def test_pending_task_dependency_pinning(one_worker_100MiB): # the ray.get below due to the subsequent ray.puts that fill up the object # store. np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8) - random_oid = ray.ObjectID.from_random() - oid = pending.remote(np_array, random_oid) + signal = SignalActor.remote() + oid = pending.remote(np_array, signal.wait.remote()) for _ in range(2): ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(oid) @@ -395,8 +374,8 @@ def test_basic_serialized_reference(one_worker_100MiB): return np.zeros(40 * 1024 * 1024, dtype=np.uint8) array_oid = put.remote() - random_oid = ray.ObjectID.from_random() - oid = pending.remote([array_oid], random_oid) + signal = SignalActor.remote() + oid = pending.remote([array_oid], signal.wait.remote()) # Remove the local reference. array_oid_bytes = array_oid.binary() @@ -406,7 +385,7 @@ def test_basic_serialized_reference(one_worker_100MiB): _fill_object_store_and_get(array_oid_bytes) # Fulfill the dependency, causing the task to finish. - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(oid) # Reference should be gone, check that array gets evicted. @@ -418,21 +397,22 @@ def test_basic_serialized_reference(one_worker_100MiB): # chain is running and should be removed once it finishes. def test_recursive_serialized_reference(one_worker_100MiB): @ray.remote - def recursive(ref, dep, max_depth, depth=0): + def recursive(ref, signal, max_depth, depth=0): ray.get(ref[0]) if depth == max_depth: - return ray.get(dep[0]) + return ray.get(signal.wait.remote()) else: - return recursive.remote(ref, dep, max_depth, depth + 1) + return recursive.remote(ref, signal, max_depth, depth + 1) @ray.remote def put(): return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + signal = SignalActor.remote() + max_depth = 5 array_oid = put.remote() - random_oid = ray.ObjectID.from_random() - head_oid = recursive.remote([array_oid], [random_oid], max_depth) + head_oid = recursive.remote([array_oid], signal, max_depth) # Remove the local reference. array_oid_bytes = array_oid.binary() @@ -446,7 +426,7 @@ def test_recursive_serialized_reference(one_worker_100MiB): _fill_object_store_and_get(array_oid_bytes) # Fulfill the dependency, causing the tail task to finish. - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) assert ray.get(tail_oid) is None # Reference should be gone, check that array gets evicted. @@ -511,19 +491,18 @@ def test_worker_holding_serialized_reference(one_worker_100MiB): return @ray.remote - def launch_pending_task(refs): - ref, dep = refs - return child.remote(ref, dep) + def launch_pending_task(ref, signal): + return child.remote(ref[0], signal.wait.remote()) @ray.remote def put(): return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + signal = SignalActor.remote() + # Test that the reference held by the actor isn't evicted. array_oid = put.remote() - random_oid = ray.ObjectID.from_random() - child_return_id = ray.get( - launch_pending_task.remote([array_oid, random_oid])) + child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) # Remove the local reference. array_oid_bytes = array_oid.binary() @@ -532,7 +511,7 @@ def test_worker_holding_serialized_reference(one_worker_100MiB): # Test that the reference prevents the object from being evicted. _fill_object_store_and_get(array_oid_bytes) - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(child_return_id) del child_return_id @@ -560,24 +539,25 @@ def test_basic_nested_ids(one_worker_100MiB): # recursively and for submitted tasks. def test_recursively_nest_ids(one_worker_100MiB): @ray.remote - def recursive(ref, dep, max_depth, depth=0): + def recursive(ref, signal, max_depth, depth=0): unwrapped = ray.get(ref[0]) if depth == max_depth: - return ray.get(dep[0]) + return ray.get(signal.wait.remote()) else: - return recursive.remote(unwrapped, dep, max_depth, depth + 1) + return recursive.remote(unwrapped, signal, max_depth, depth + 1) @ray.remote def put(): return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + signal = SignalActor.remote() + max_depth = 5 array_oid = put.remote() - random_oid = ray.ObjectID.from_random() nested_oid = array_oid for _ in range(max_depth): nested_oid = ray.put([nested_oid]) - head_oid = recursive.remote([nested_oid], [random_oid], max_depth) + head_oid = recursive.remote([nested_oid], signal, max_depth) # Remove the local reference. array_oid_bytes = array_oid.binary() @@ -591,7 +571,7 @@ def test_recursively_nest_ids(one_worker_100MiB): _fill_object_store_and_get(array_oid_bytes) # Fulfill the dependency, causing the tail task to finish. - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(tail_oid) # Reference should be gone, check that array gets evicted. @@ -638,14 +618,14 @@ def test_pass_returned_object_id(one_worker_100MiB): return [put.remote()] @ray.remote - def pending(ref, dep): - ray.get(dep[0]) + def pending(ref, signal): + ray.get(signal.wait.remote()) ray.get(ref[0]) + signal = SignalActor.remote() outer_oid = return_an_id.remote() inner_oid_binary = ray.get(outer_oid)[0].binary() - random_oid = ray.ObjectID.from_random() - pending_oid = pending.remote([outer_oid], [random_oid]) + pending_oid = pending.remote([outer_oid], signal) # Remove the local reference to the returned ID. del outer_oid @@ -654,7 +634,7 @@ def test_pass_returned_object_id(one_worker_100MiB): _fill_object_store_and_get(inner_oid_binary) # Check that the task finishing unpins the object. - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(pending_oid) _fill_object_store_and_get(inner_oid_binary, succeed=False) @@ -673,18 +653,18 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB): return [put.remote()] @ray.remote - def recursive(ref, dep, max_depth, depth=0): + def recursive(ref, signal, max_depth, depth=0): ray.get(ref[0]) if depth == max_depth: - return ray.get(dep[0]) + return ray.get(signal.wait.remote()) else: - return recursive.remote(ref, dep, max_depth, depth + 1) + return recursive.remote(ref, signal, max_depth, depth + 1) max_depth = 5 outer_oid = return_an_id.remote() inner_oid_bytes = ray.get(outer_oid)[0].binary() - random_oid = ray.ObjectID.from_random() - head_oid = recursive.remote([outer_oid], [random_oid], max_depth) + signal = SignalActor.remote() + head_oid = recursive.remote([outer_oid], signal, max_depth) # Remove the local reference. del outer_oid @@ -697,7 +677,7 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB): _fill_object_store_and_get(inner_oid_bytes) # Fulfill the dependency, causing the tail task to finish. - ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(signal.send.remote()) ray.get(tail_oid) # Reference should be gone, check that returned ID gets evicted.