diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 27b284971..49bef9442 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -303,10 +303,8 @@ cdef void prepare_args( arg_data, string_to_buffer(serialized_arg.metadata)))) else: args_vector.push_back( - CTaskArg.PassByReference( - (CObjectID.FromBinary( - core_worker.put_serialized_cobject( - serialized_arg))))) + CTaskArg.PassByReference((CObjectID.FromBinary( + core_worker.put_serialized_cobject(serialized_arg))))) cdef deserialize_args( const c_vector[shared_ptr[CRayObject]] &c_args, @@ -1047,7 +1045,7 @@ cdef class CoreWorker: ref_counts = {} while it != c_ref_counts.end(): - object_id = ObjectID(dereference(it).first.Binary()) + object_id = dereference(it).first.Hex() ref_counts[object_id] = { "local": dereference(it).second.first, "submitted": dereference(it).second.second} diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index f900a1e39..c815a9458 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -16,13 +16,39 @@ import ray.test_utils logger = logging.getLogger(__name__) +@pytest.fixture +def one_worker_100MiB(request): + yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024) + ray.shutdown() + + +def _fill_object_store_and_get(oid, succeed=True, object_MiB=40, + num_objects=5): + for _ in range(num_objects): + ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8)) + + if type(oid) is bytes: + oid = ray.ObjectID(oid) + + if succeed: + ray.get(oid) + else: + if oid.is_direct_call_type(): + with pytest.raises(ray.exceptions.RayTimeoutError): + ray.get(oid, timeout=0.1) + else: + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(oid) + + def _check_refcounts(expected): actual = ray.worker.global_worker.core_worker.get_all_reference_counts() assert len(expected) == len(actual) for object_id, (local, submitted) in expected.items(): - assert object_id in actual - assert local == actual[object_id]["local"] - assert submitted == actual[object_id]["submitted"] + hex_id = object_id.hex().encode("ascii") + assert hex_id in actual + assert local == actual[hex_id]["local"] + assert submitted == actual[hex_id]["submitted"] def check_refcounts(expected, timeout=10): @@ -154,9 +180,7 @@ def test_dependency_refcounts(ray_start_regular): check_refcounts({}) -def test_basic_pinning(shutdown_only): - ray.init(object_store_memory=100 * 1024 * 1024) - +def test_basic_pinning(one_worker_100MiB): @ray.remote def f(array): return np.sum(array) @@ -188,9 +212,7 @@ def test_basic_pinning(shutdown_only): ray.get(actor.get_large_object.remote()) -def test_pending_task_dependency_pinning(shutdown_only): - ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True) - +def test_pending_task_dependency_pinning(one_worker_100MiB): @ray.remote def pending(input1, input2): return @@ -200,13 +222,13 @@ def test_pending_task_dependency_pinning(shutdown_only): # the ray.get below due to the subsequent ray.puts that fill up the object # store. np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8) - random_id = ray.ObjectID.from_random() - oid = pending.remote(np_array, random_id) + random_oid = ray.ObjectID.from_random() + oid = pending.remote(np_array, random_oid) for _ in range(2): - ray.put(np_array) + ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) - ray.worker.global_worker.put_object(None, object_id=random_id) + ray.worker.global_worker.put_object(None, object_id=random_oid) ray.get(oid) @@ -236,15 +258,343 @@ def test_feature_flag(shutdown_only): actor = Actor.remote() ray.get(actor.wait_for_actor_to_start.remote()) - for batch in range(10): - intermediate_result = f.remote( - np.zeros(10 * 1024 * 1024, dtype=np.uint8)) - ray.get(intermediate_result) - # The ray.get below fails with only LRU eviction, as the object # that was ray.put by the actor should have been evicted. - with pytest.raises(ray.exceptions.RayTimeoutError): - ray.get(actor.get_large_object.remote(), timeout=1) + _fill_object_store_and_get(actor.get_large_object.remote(), succeed=False) + + +# Remote function takes serialized reference and doesn't hold onto it after +# finishing. Referenced object shouldn't be evicted while the task is pending +# and should be evicted after it returns. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_basic_serialized_reference(one_worker_100MiB): + @ray.remote + def pending(ref, dep): + ray.get(ref[0]) + + # TODO(edoakes): currently these tests don't work with ray.put() so we need + # to return from a task like this instead. Once that is fixed, should have + # tests run with both codepaths. + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + array_oid = put.remote() + random_oid = ray.ObjectID.from_random() + oid = pending.remote([array_oid], random_oid) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the task to finish. + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(oid) + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Call a recursive chain of tasks that pass a serialized reference to the end +# of the chain. The reference should still exist while the final task in the +# chain is running and should be removed once it finishes. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_recursive_serialized_reference(one_worker_100MiB): + @ray.remote + def recursive(ref, dep, max_depth, depth=0): + ray.get(ref[0]) + if depth == max_depth: + return ray.get(dep[0]) + else: + return recursive.remote(ref, dep, max_depth, depth + 1) + + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + max_depth = 5 + array_oid = put.remote() + random_oid = ray.ObjectID.from_random() + head_oid = recursive.remote([array_oid], [random_oid], max_depth) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + tail_oid = head_oid + for _ in range(max_depth - 1): + tail_oid = ray.get(tail_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the tail task to finish. + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(tail_oid) + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that a passed reference held by an actor after the method finishes +# is kept until the reference is removed from the actor. Also tests giving +# the actor a duplicate reference to the same object ID. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_actor_holding_serialized_reference(one_worker_100MiB): + @ray.remote + class GreedyActor(object): + def __init__(self): + pass + + def set_ref1(self, ref): + self.ref1 = ref + + def add_ref2(self, new_ref): + self.ref2 = new_ref + + def delete_ref1(self): + self.ref1 = None + + def delete_ref2(self): + self.ref2 = None + + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + # Test that the reference held by the actor isn't evicted. + array_oid = put.remote() + actor = GreedyActor.remote() + actor.set_ref1.remote([array_oid]) + + # Test that giving the same actor a duplicate reference works. + ray.get(actor.add_ref2.remote([array_oid])) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Test that the remote references still pin the object. + _fill_object_store_and_get(array_oid_bytes) + + # Test that removing only the first reference doesn't unpin the object. + ray.get(actor.delete_ref1.remote()) + _fill_object_store_and_get(array_oid_bytes) + + # Test that deleting the second reference stops it from being pinned. + ray.get(actor.delete_ref2.remote()) + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that a passed reference held by an actor after a task finishes +# is kept until the reference is removed from the worker. Also tests giving +# the worker a duplicate reference to the same object ID. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_worker_holding_serialized_reference(one_worker_100MiB): + @ray.remote + def child(dep1, dep2): + return + + @ray.remote + def launch_pending_task(refs): + ref, dep = refs + return child.remote(ref, dep) + + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + # Test that the reference held by the actor isn't evicted. + array_oid = put.remote() + random_oid = ray.ObjectID.from_random() + child_return_id = ray.get( + launch_pending_task.remote([array_oid, random_oid])) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid + + # Test that the reference prevents the object from being evicted. + _fill_object_store_and_get(array_oid_bytes) + + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(child_return_id) + del child_return_id + + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that an object containing object IDs within it pins the inner IDs. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_basic_nested_ids(one_worker_100MiB): + inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + outer_oid = ray.put([inner_oid]) + + # Remove the local reference to the inner object. + inner_oid_bytes = inner_oid.binary() + del inner_oid + + # Check that the outer reference pins the inner object. + _fill_object_store_and_get(inner_oid_bytes) + + # Remove the outer reference and check that the inner object gets evicted. + del outer_oid + _fill_object_store_and_get(inner_oid_bytes, succeed=False) + + +# Test that an object containing object IDs within it pins the inner IDs +# recursively and for submitted tasks. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_recursively_nest_ids(one_worker_100MiB): + @ray.remote + def recursive(ref, dep, max_depth, depth=0): + unwrapped = ray.get(ref[0]) + if depth == max_depth: + return ray.get(dep[0]) + else: + return recursive.remote(unwrapped, dep, max_depth, depth + 1) + + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + max_depth = 5 + array_oid = put.remote() + random_oid = ray.ObjectID.from_random() + nested_oid = array_oid + for _ in range(max_depth): + nested_oid = ray.put([nested_oid]) + head_oid = recursive.remote([nested_oid], [random_oid], max_depth) + + # Remove the local reference. + array_oid_bytes = array_oid.binary() + del array_oid, nested_oid + + tail_oid = head_oid + for _ in range(max_depth - 1): + tail_oid = ray.get(tail_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(array_oid_bytes) + + # Fulfill the dependency, causing the tail task to finish. + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(tail_oid) + + # Reference should be gone, check that array gets evicted. + _fill_object_store_and_get(array_oid_bytes, succeed=False) + + +# Test that serialized objectIDs returned from remote tasks are pinned until +# they go out of scope on the caller side. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_return_object_id(one_worker_100MiB): + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + @ray.remote + def return_an_id(): + return [put.remote()] + + outer_oid = return_an_id.remote() + inner_oid_binary = ray.get(outer_oid)[0].binary() + + # Check that the inner ID is pinned by the outer ID. + _fill_object_store_and_get(inner_oid_binary) + + # Check that taking a reference to the inner ID and removing the outer ID + # doesn't unpin the object. + inner_oid = ray.get(outer_oid)[0] + del outer_oid + _fill_object_store_and_get(inner_oid_binary) + + # Check that removing the inner ID unpins the object. + del inner_oid + _fill_object_store_and_get(inner_oid_binary, succeed=False) + + +# Test that serialized objectIDs returned from remote tasks are pinned if +# passed into another remote task by the caller. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_pass_returned_object_id(one_worker_100MiB): + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + @ray.remote + def return_an_id(): + return [put.remote()] + + @ray.remote + def pending(ref, dep): + ray.get(dep[0]) + ray.get(ref[0]) + + outer_oid = return_an_id.remote() + inner_oid_binary = ray.get(outer_oid)[0].binary() + random_oid = ray.ObjectID.from_random() + pending_oid = pending.remote([outer_oid], [random_oid]) + + # Remove the local reference to the returned ID. + del outer_oid + + # Check that the inner ID is pinned by the remote task ID. + _fill_object_store_and_get(inner_oid_binary) + + # Check that the task finishing unpins the object. + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(pending_oid) + _fill_object_store_and_get(inner_oid_binary, succeed=False) + + +# Call a recursive chain of tasks that pass a serialized reference that was +# returned by another task to the end of the chain. The reference should still +# exist while the final task in the chain is running and should be removed once +# it finishes. +@pytest.mark.skip("Serialized ObjectID reference counting not implemented.") +def test_recursively_pass_returned_object_id(one_worker_100MiB): + @ray.remote + def put(): + return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + + @ray.remote + def return_an_id(): + return [put.remote()] + + @ray.remote + def recursive(ref, dep, max_depth, depth=0): + ray.get(ref[0]) + if depth == max_depth: + return ray.get(dep[0]) + else: + return recursive.remote(ref, dep, max_depth, depth + 1) + + max_depth = 5 + outer_oid = return_an_id.remote() + inner_oid_bytes = ray.get(outer_oid)[0].binary() + random_oid = ray.ObjectID.from_random() + head_oid = recursive.remote([outer_oid], [random_oid], max_depth) + + # Remove the local reference. + del outer_oid + + tail_oid = head_oid + for _ in range(max_depth - 1): + tail_oid = ray.get(tail_oid) + + # Check that the remote reference pins the object. + _fill_object_store_and_get(inner_oid_bytes) + + # Fulfill the dependency, causing the tail task to finish. + ray.worker.global_worker.put_object(None, object_id=random_oid) + ray.get(tail_oid) + + # Reference should be gone, check that returned ID gets evicted. + _fill_object_store_and_get(inner_oid_bytes, succeed=False) if __name__ == "__main__":