mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 22:36:53 +08:00
Add python tests for serialized object ID reference counting (#7038)
This commit is contained in:
@@ -303,10 +303,8 @@ cdef void prepare_args(
|
||||
arg_data, string_to_buffer(serialized_arg.metadata))))
|
||||
else:
|
||||
args_vector.push_back(
|
||||
CTaskArg.PassByReference(
|
||||
(CObjectID.FromBinary(
|
||||
core_worker.put_serialized_cobject(
|
||||
serialized_arg)))))
|
||||
CTaskArg.PassByReference((CObjectID.FromBinary(
|
||||
core_worker.put_serialized_cobject(serialized_arg)))))
|
||||
|
||||
cdef deserialize_args(
|
||||
const c_vector[shared_ptr[CRayObject]] &c_args,
|
||||
@@ -1047,7 +1045,7 @@ cdef class CoreWorker:
|
||||
|
||||
ref_counts = {}
|
||||
while it != c_ref_counts.end():
|
||||
object_id = ObjectID(dereference(it).first.Binary())
|
||||
object_id = dereference(it).first.Hex()
|
||||
ref_counts[object_id] = {
|
||||
"local": dereference(it).second.first,
|
||||
"submitted": dereference(it).second.second}
|
||||
|
||||
@@ -16,13 +16,39 @@ import ray.test_utils
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture
def one_worker_100MiB(request):
    """Run a test against Ray started with 1 CPU and a 100 MiB object store.

    Yields whatever ``ray.init`` returns, then shuts Ray down once the test
    that requested the fixture is done.
    """
    store_bytes = 100 * 1024 * 1024
    init_result = ray.init(num_cpus=1, object_store_memory=store_bytes)
    yield init_result
    # Teardown: everything after the yield runs when the test is finished.
    ray.shutdown()
|
||||
|
||||
|
||||
def _fill_object_store_and_get(oid, succeed=True, object_MiB=40,
                               num_objects=5):
    """Put filler arrays into the object store, then try to fetch ``oid``.

    Args:
        oid: The object to fetch afterwards, either a ``ray.ObjectID`` or
            the binary form of one (``bytes``).
        succeed: If True, ``ray.get(oid)`` must return. If False, the get
            must raise (see below).
        object_MiB: Size in MiB of each filler array.
        num_objects: Number of filler arrays to put.

    Raises:
        Whatever ``pytest.raises`` raises when the expected exception is not
        thrown, and any exception from ``ray.get`` when ``succeed`` is True.
    """
    # NOTE(review): presumably the fillers are meant to push any unpinned
    # object out of the store -- confirm against the store's eviction policy.
    for _ in range(num_objects):
        ray.put(np.zeros(object_MiB * 1024 * 1024, dtype=np.uint8))

    # Callers that dropped their ObjectID pass the raw bytes instead.
    if isinstance(oid, bytes):
        oid = ray.ObjectID(oid)

    if succeed:
        ray.get(oid)
    elif oid.is_direct_call_type():
        # Direct-call objects are expected to time out rather than error.
        with pytest.raises(ray.exceptions.RayTimeoutError):
            ray.get(oid, timeout=0.1)
    else:
        with pytest.raises(ray.exceptions.UnreconstructableError):
            ray.get(oid)
|
||||
|
||||
|
||||
def _check_refcounts(expected):
    """Assert that the core worker's reference counts equal ``expected``.

    Args:
        expected: Dict mapping an object ID to a ``(local, submitted)``
            tuple of expected counts.

    Raises:
        AssertionError: If the number of tracked IDs differs, an expected ID
            is missing, or a count does not match.
    """
    actual = ray.worker.global_worker.core_worker.get_all_reference_counts()
    assert len(expected) == len(actual)
    for object_id, (local, submitted) in expected.items():
        # The reported map is looked up by the ASCII-encoded hex form of the
        # ID, not by the ObjectID itself.
        hex_id = object_id.hex().encode("ascii")
        assert hex_id in actual
        assert local == actual[hex_id]["local"]
        assert submitted == actual[hex_id]["submitted"]
|
||||
|
||||
|
||||
def check_refcounts(expected, timeout=10):
|
||||
@@ -154,9 +180,7 @@ def test_dependency_refcounts(ray_start_regular):
|
||||
check_refcounts({})
|
||||
|
||||
|
||||
def test_basic_pinning(shutdown_only):
|
||||
ray.init(object_store_memory=100 * 1024 * 1024)
|
||||
|
||||
def test_basic_pinning(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def f(array):
|
||||
return np.sum(array)
|
||||
@@ -188,9 +212,7 @@ def test_basic_pinning(shutdown_only):
|
||||
ray.get(actor.get_large_object.remote())
|
||||
|
||||
|
||||
def test_pending_task_dependency_pinning(shutdown_only):
|
||||
ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True)
|
||||
|
||||
def test_pending_task_dependency_pinning(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def pending(input1, input2):
|
||||
return
|
||||
@@ -200,13 +222,13 @@ def test_pending_task_dependency_pinning(shutdown_only):
|
||||
# the ray.get below due to the subsequent ray.puts that fill up the object
|
||||
# store.
|
||||
np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8)
|
||||
random_id = ray.ObjectID.from_random()
|
||||
oid = pending.remote(np_array, random_id)
|
||||
random_oid = ray.ObjectID.from_random()
|
||||
oid = pending.remote(np_array, random_oid)
|
||||
|
||||
for _ in range(2):
|
||||
ray.put(np_array)
|
||||
ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
|
||||
|
||||
ray.worker.global_worker.put_object(None, object_id=random_id)
|
||||
ray.worker.global_worker.put_object(None, object_id=random_oid)
|
||||
ray.get(oid)
|
||||
|
||||
|
||||
@@ -236,15 +258,343 @@ def test_feature_flag(shutdown_only):
|
||||
actor = Actor.remote()
|
||||
ray.get(actor.wait_for_actor_to_start.remote())
|
||||
|
||||
for batch in range(10):
|
||||
intermediate_result = f.remote(
|
||||
np.zeros(10 * 1024 * 1024, dtype=np.uint8))
|
||||
ray.get(intermediate_result)
|
||||
|
||||
# The ray.get below fails with only LRU eviction, as the object
|
||||
# that was ray.put by the actor should have been evicted.
|
||||
with pytest.raises(ray.exceptions.RayTimeoutError):
|
||||
ray.get(actor.get_large_object.remote(), timeout=1)
|
||||
_fill_object_store_and_get(actor.get_large_object.remote(), succeed=False)
|
||||
|
||||
|
||||
# A task receives an object ID wrapped in a list (i.e. serialized) and drops
# it when it finishes. The referenced object must stay in the store while
# the task is pending and must be evictable once the task has returned.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_basic_serialized_reference(one_worker_100MiB):
    @ray.remote
    def pending(ref, dep):
        ray.get(ref[0])

    # TODO(edoakes): currently these tests don't work with ray.put() so we
    # need to return from a task like this instead. Once that is fixed,
    # should have tests run with both codepaths.
    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    array_oid = put.remote()
    # The task takes this never-yet-created ID as a plain argument, so it
    # cannot run until the object is put below.
    blocker_oid = ray.ObjectID.from_random()
    result_oid = pending.remote([array_oid], blocker_oid)

    # Keep only the raw bytes so this driver no longer holds a reference.
    array_oid_bytes = array_oid.binary()
    del array_oid

    # The pending task's reference must keep the array available.
    _fill_object_store_and_get(array_oid_bytes)

    # Create the blocking object so the task can run to completion.
    ray.worker.global_worker.put_object(None, object_id=blocker_oid)
    ray.get(result_oid)

    # No reference is left; the array should now be evicted.
    _fill_object_store_and_get(array_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# Call a recursive chain of tasks that pass a serialized reference to the end
# of the chain. The reference should still exist while the final task in the
# chain is running and should be removed once it finishes.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_recursive_serialized_reference(one_worker_100MiB):
    @ray.remote
    def recursive(ref, dep, max_depth, depth=0):
        ray.get(ref[0])
        if depth == max_depth:
            # Tail of the chain: block until the dependency is created.
            return ray.get(dep[0])
        else:
            # Hand the same serialized reference to the next link and return
            # that task's ID as this task's value.
            return recursive.remote(ref, dep, max_depth, depth + 1)

    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    max_depth = 5
    array_oid = put.remote()
    random_oid = ray.ObjectID.from_random()
    head_oid = recursive.remote([array_oid], [random_oid], max_depth)

    # Remove the local reference.
    array_oid_bytes = array_oid.binary()
    del array_oid

    # The chain runs depth 0..max_depth, i.e. max_depth + 1 tasks, and every
    # non-tail task returns the ID of the next one. Starting from the head's
    # ID it therefore takes max_depth gets to reach the tail task's ID;
    # stopping one short would make the final ray.get below return without
    # waiting for the tail task.
    tail_oid = head_oid
    for _ in range(max_depth):
        tail_oid = ray.get(tail_oid)

    # Check that the remote reference pins the object.
    _fill_object_store_and_get(array_oid_bytes)

    # Fulfill the dependency, causing the tail task to finish.
    ray.worker.global_worker.put_object(None, object_id=random_oid)
    ray.get(tail_oid)

    # Reference should be gone, check that array gets evicted.
    _fill_object_store_and_get(array_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# An actor that stores a serialized reference it was passed must keep the
# object alive after the method returns, until the actor drops it. The actor
# is also given a second reference to the same object ID.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_actor_holding_serialized_reference(one_worker_100MiB):
    @ray.remote
    class GreedyActor(object):
        def __init__(self):
            pass

        def set_ref1(self, ref):
            self.ref1 = ref

        def add_ref2(self, new_ref):
            self.ref2 = new_ref

        def delete_ref1(self):
            self.ref1 = None

        def delete_ref2(self):
            self.ref2 = None

    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    # Hand the actor a first reference to the array.
    array_oid = put.remote()
    actor = GreedyActor.remote()
    actor.set_ref1.remote([array_oid])

    # Hand the same actor a duplicate reference and wait for that call.
    ray.get(actor.add_ref2.remote([array_oid]))

    # Keep only the raw bytes so this driver no longer holds a reference.
    array_oid_bytes = array_oid.binary()
    del array_oid

    # Both references live on the actor; the array must still be available.
    _fill_object_store_and_get(array_oid_bytes)

    # Dropping just the first reference must not release the array.
    ray.get(actor.delete_ref1.remote())
    _fill_object_store_and_get(array_oid_bytes)

    # Dropping the second reference too should let the array be evicted.
    ray.get(actor.delete_ref2.remote())
    _fill_object_store_and_get(array_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# A task unpacks a serialized reference and forwards it to a child task that
# stays pending after the parent has finished. The object must remain
# available until that child task completes.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_worker_holding_serialized_reference(one_worker_100MiB):
    @ray.remote
    def child(dep1, dep2):
        return

    @ray.remote
    def launch_pending_task(refs):
        ref, dep = refs
        return child.remote(ref, dep)

    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    # Launch the child through the parent task; the child takes both IDs as
    # plain arguments, one of which has not been created yet.
    array_oid = put.remote()
    blocker_oid = ray.ObjectID.from_random()
    child_return_id = ray.get(
        launch_pending_task.remote([array_oid, blocker_oid]))

    # Keep only the raw bytes so this driver no longer holds a reference.
    array_oid_bytes = array_oid.binary()
    del array_oid

    # The still-pending child must keep the array available.
    _fill_object_store_and_get(array_oid_bytes)

    # Create the missing argument, wait for the child, then drop its ID.
    ray.worker.global_worker.put_object(None, object_id=blocker_oid)
    ray.get(child_return_id)
    del child_return_id

    # Nothing references the array any more; it should be evicted.
    _fill_object_store_and_get(array_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# An object whose value contains object IDs must keep those inner objects
# alive for as long as the outer object is referenced.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_basic_nested_ids(one_worker_100MiB):
    inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
    outer_oid = ray.put([inner_oid])

    # Keep only the raw bytes of the inner ID and drop the ID itself.
    inner_oid_bytes = inner_oid.binary()
    del inner_oid

    # The outer object is still referenced, so the inner one must survive.
    _fill_object_store_and_get(inner_oid_bytes)

    # With the outer reference gone as well, the inner object should be
    # evicted.
    del outer_oid
    _fill_object_store_and_get(inner_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# Test that an object containing object IDs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_recursively_nest_ids(one_worker_100MiB):
    @ray.remote
    def recursive(ref, dep, max_depth, depth=0):
        # Peel one layer of nesting per task in the chain.
        unwrapped = ray.get(ref[0])
        if depth == max_depth:
            # Tail of the chain: block until the dependency is created.
            return ray.get(dep[0])
        else:
            return recursive.remote(unwrapped, dep, max_depth, depth + 1)

    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    max_depth = 5
    array_oid = put.remote()
    random_oid = ray.ObjectID.from_random()
    # Wrap the array ID in max_depth layers of single-element lists, each
    # stored as its own object.
    nested_oid = array_oid
    for _ in range(max_depth):
        nested_oid = ray.put([nested_oid])
    head_oid = recursive.remote([nested_oid], [random_oid], max_depth)

    # Remove the local reference.
    array_oid_bytes = array_oid.binary()
    del array_oid, nested_oid

    # The chain runs depth 0..max_depth, i.e. max_depth + 1 tasks, and every
    # non-tail task returns the ID of the next one. Starting from the head's
    # ID it therefore takes max_depth gets to reach the tail task's ID;
    # stopping one short would make the final ray.get below return without
    # waiting for the tail task.
    tail_oid = head_oid
    for _ in range(max_depth):
        tail_oid = ray.get(tail_oid)

    # Check that the remote reference pins the object.
    _fill_object_store_and_get(array_oid_bytes)

    # Fulfill the dependency, causing the tail task to finish.
    ray.worker.global_worker.put_object(None, object_id=random_oid)
    ray.get(tail_oid)

    # Reference should be gone, check that array gets evicted.
    _fill_object_store_and_get(array_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
# Object IDs returned (serialized inside a list) from a remote task must stay
# pinned until the caller no longer references them.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_return_object_id(one_worker_100MiB):
    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    @ray.remote
    def return_an_id():
        return [put.remote()]

    outer_oid = return_an_id.remote()
    # Record only the raw bytes; no inner ObjectID is kept in a local here.
    inner_oid_binary = ray.get(outer_oid)[0].binary()

    # Holding the outer ID alone must keep the inner object available.
    _fill_object_store_and_get(inner_oid_binary)

    # Swap the outer reference for a direct reference to the inner ID; the
    # inner object must stay available.
    inner_oid = ray.get(outer_oid)[0]
    del outer_oid
    _fill_object_store_and_get(inner_oid_binary)

    # Once the inner ID is dropped too, the object should be evicted.
    del inner_oid
    _fill_object_store_and_get(inner_oid_binary, succeed=False)
|
||||
|
||||
|
||||
# Object IDs returned (serialized inside a list) from a remote task must stay
# pinned when the caller forwards the returning object to another task.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_pass_returned_object_id(one_worker_100MiB):
    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    @ray.remote
    def return_an_id():
        return [put.remote()]

    @ray.remote
    def pending(ref, dep):
        ray.get(dep[0])
        ray.get(ref[0])

    outer_oid = return_an_id.remote()
    # Record only the raw bytes; no inner ObjectID is kept in a local here.
    inner_oid_binary = ray.get(outer_oid)[0].binary()
    blocker_oid = ray.ObjectID.from_random()
    pending_oid = pending.remote([outer_oid], [blocker_oid])

    # Drop the driver's own reference to the returned object.
    del outer_oid

    # The pending task's reference must keep the inner object available.
    _fill_object_store_and_get(inner_oid_binary)

    # Let the task finish; afterwards the inner object should be evicted.
    ray.worker.global_worker.put_object(None, object_id=blocker_oid)
    ray.get(pending_oid)
    _fill_object_store_and_get(inner_oid_binary, succeed=False)
|
||||
|
||||
|
||||
# Call a recursive chain of tasks that pass a serialized reference that was
# returned by another task to the end of the chain. The reference should still
# exist while the final task in the chain is running and should be removed
# once it finishes.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_recursively_pass_returned_object_id(one_worker_100MiB):
    @ray.remote
    def put():
        return np.zeros(40 * 1024 * 1024, dtype=np.uint8)

    @ray.remote
    def return_an_id():
        return [put.remote()]

    @ray.remote
    def recursive(ref, dep, max_depth, depth=0):
        ray.get(ref[0])
        if depth == max_depth:
            # Tail of the chain: block until the dependency is created.
            return ray.get(dep[0])
        else:
            return recursive.remote(ref, dep, max_depth, depth + 1)

    max_depth = 5
    outer_oid = return_an_id.remote()
    # Record only the raw bytes; no inner ObjectID is kept in a local here.
    inner_oid_bytes = ray.get(outer_oid)[0].binary()
    random_oid = ray.ObjectID.from_random()
    head_oid = recursive.remote([outer_oid], [random_oid], max_depth)

    # Remove the local reference.
    del outer_oid

    # The chain runs depth 0..max_depth, i.e. max_depth + 1 tasks, and every
    # non-tail task returns the ID of the next one. Starting from the head's
    # ID it therefore takes max_depth gets to reach the tail task's ID;
    # stopping one short would make the final ray.get below return without
    # waiting for the tail task.
    tail_oid = head_oid
    for _ in range(max_depth):
        tail_oid = ray.get(tail_oid)

    # Check that the remote reference pins the object.
    _fill_object_store_and_get(inner_oid_bytes)

    # Fulfill the dependency, causing the tail task to finish.
    ray.worker.global_worker.put_object(None, object_id=random_oid)
    ray.get(tail_oid)

    # Reference should be gone, check that returned ID gets evicted.
    _fill_object_store_and_get(inner_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user