mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 11:01:06 +08:00
[core] Refactor distributed ref counting to remove owner task ID (#9049)
* Add intended worker ID to GetObjectStatus, tests
* Remove TaskID owner_id
* lint
* Update message
* lint
* Fix build
This commit is contained in:
@@ -1112,24 +1112,20 @@ cdef class CoreWorker:
|
||||
def serialize_and_promote_object_id(self, ObjectID object_id):
|
||||
cdef:
|
||||
CObjectID c_object_id = object_id.native()
|
||||
CTaskID c_owner_id = CTaskID.Nil()
|
||||
CAddress c_owner_address = CAddress()
|
||||
CCoreWorkerProcess.GetCoreWorker().PromoteToPlasmaAndGetOwnershipInfo(
|
||||
c_object_id, &c_owner_id, &c_owner_address)
|
||||
c_object_id, &c_owner_address)
|
||||
return (object_id,
|
||||
TaskID(c_owner_id.Binary()),
|
||||
c_owner_address.SerializeAsString())
|
||||
|
||||
def deserialize_and_register_object_id(
|
||||
self, const c_string &object_id_binary, ObjectID outer_object_id,
|
||||
const c_string &owner_id_binary,
|
||||
const c_string &serialized_owner_address):
|
||||
cdef:
|
||||
CObjectID c_object_id = CObjectID.FromBinary(object_id_binary)
|
||||
CObjectID c_outer_object_id = (outer_object_id.native() if
|
||||
outer_object_id else
|
||||
CObjectID.Nil())
|
||||
CTaskID c_owner_id = CTaskID.FromBinary(owner_id_binary)
|
||||
CAddress c_owner_address = CAddress()
|
||||
|
||||
c_owner_address.ParseFromString(serialized_owner_address)
|
||||
@@ -1137,7 +1133,6 @@ cdef class CoreWorker:
|
||||
.RegisterOwnershipInfoAndResolveFuture(
|
||||
c_object_id,
|
||||
c_outer_object_id,
|
||||
c_owner_id,
|
||||
c_owner_address))
|
||||
|
||||
cdef store_task_outputs(
|
||||
|
||||
@@ -128,12 +128,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
void RemoveLocalReference(const CObjectID &object_id)
|
||||
void PromoteObjectToPlasma(const CObjectID &object_id)
|
||||
void PromoteToPlasmaAndGetOwnershipInfo(const CObjectID &object_id,
|
||||
CTaskID *owner_id,
|
||||
CAddress *owner_address)
|
||||
void RegisterOwnershipInfoAndResolveFuture(
|
||||
const CObjectID &object_id,
|
||||
const CObjectID &outer_object_id,
|
||||
const CTaskID &owner_id,
|
||||
const CAddress &owner_address)
|
||||
|
||||
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
|
||||
|
||||
@@ -125,14 +125,13 @@ class SerializationContext:
|
||||
self.add_contained_object_id(obj)
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
obj, owner_id, owner_address = (
|
||||
obj, owner_address = (
|
||||
worker.core_worker.serialize_and_promote_object_id(obj))
|
||||
obj = id_serializer(obj)
|
||||
owner_id = id_serializer(owner_id) if owner_id else owner_id
|
||||
return obj, owner_id, owner_address
|
||||
return obj, owner_address
|
||||
|
||||
def object_id_deserializer(serialized_obj):
|
||||
obj_id, owner_id, owner_address = serialized_obj
|
||||
obj_id, owner_address = serialized_obj
|
||||
# NOTE(swang): Must deserialize the object first before asking
|
||||
# the core worker to resolve the value. This is to make sure
|
||||
# that the ref count for the ObjectID is greater than 0 by the
|
||||
@@ -142,7 +141,7 @@ class SerializationContext:
|
||||
# to 'self' here instead, but this function is itself pickled
|
||||
# somewhere, which causes an error.
|
||||
context = ray.worker.global_worker.get_serialization_context()
|
||||
if owner_id:
|
||||
if owner_address:
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
# UniqueIDs are serialized as
|
||||
@@ -153,7 +152,7 @@ class SerializationContext:
|
||||
if outer_id is None:
|
||||
outer_id = ray.ObjectID.nil()
|
||||
worker.core_worker.deserialize_and_register_object_id(
|
||||
obj_id[1][0], outer_id, owner_id[1][0], owner_address)
|
||||
obj_id[1][0], outer_id, owner_address)
|
||||
return deserialized_object_id
|
||||
|
||||
for id_type in ray._raylet._ID_TYPES:
|
||||
|
||||
@@ -23,6 +23,7 @@ def one_worker_100MiB(request):
|
||||
config = json.dumps({
|
||||
"object_store_full_max_retries": 2,
|
||||
"task_retry_delay_ms": 0,
|
||||
"initial_reconstruction_timeout_milliseconds": 1000,
|
||||
})
|
||||
yield ray.init(
|
||||
num_cpus=1,
|
||||
@@ -282,6 +283,59 @@ def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put,
|
||||
_fill_object_store_and_get(final_oid_bytes, succeed=False)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("failure", [False, True])
def test_borrowed_id_failure(one_worker_100MiB, failure):
    """Check that a borrower does not hang on a ref handed over by an actor.

    The driver puts a 40 MiB object and sends it, wrapped in a list, to a
    ``Parent`` actor, which stores the ref and forwards the list to a
    ``Borrower`` actor. With ``failure=True`` the parent then exits with
    status -1, and the driver expects ``RayActorError`` from that call.
    Afterwards the driver drops its own ref and asks the borrower to
    resolve the ref it stored.

    Args:
        one_worker_100MiB: Ray fixture; the name suggests a single worker
            with a 100 MiB object store (fixture body not fully visible
            here -- confirm).
        failure: Whether the parent actor exits after passing the ref on.
    """

    @ray.remote
    class Parent:
        def __init__(self):
            pass

        def pass_ref(self, ref, borrower):
            # Imported locally so this method does not depend on a
            # module-level `import sys`; the only one visible in this file
            # sits under the `__main__` guard, which pytest never runs.
            import sys

            self.ref = ref[0]
            ray.get(borrower.receive_ref.remote(ref))
            if failure:
                # Terminate the actor process right after handing the ref
                # to the borrower.
                sys.exit(-1)

        def ping(self):
            return

    @ray.remote
    class Borrower:
        def __init__(self):
            self.ref = None

        def receive_ref(self, ref):
            # `ref` arrives as a one-element list; keep the inner ref.
            self.ref = ref[0]

        def resolve_ref(self):
            assert self.ref is not None
            if failure:
                with pytest.raises(ray.exceptions.UnreconstructableError):
                    ray.get(self.ref)
            else:
                ray.get(self.ref)

        def ping(self):
            return

    parent = Parent.remote()
    borrower = Borrower.remote()
    # Wait until the borrower answers a call before using it.
    ray.get(borrower.ping.remote())

    obj = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
    # NOTE(review): the ref is wrapped in a list presumably so that it is
    # passed as a reference instead of being resolved to its value --
    # confirm against Ray's argument-passing rules.
    if failure:
        with pytest.raises(ray.exceptions.RayActorError):
            ray.get(parent.pass_ref.remote([obj], borrower))
    else:
        ray.get(parent.pass_ref.remote([obj], borrower))
    obj_bytes = obj.binary()
    # Drop the driver's own reference; only the actors hold the ref now.
    del obj

    # Helper defined elsewhere in this file; the object is expected to be
    # retrievable only in the non-failure case.
    _fill_object_store_and_get(obj_bytes, succeed=not failure)
    # The borrower should not hang when trying to get the object's value.
    ray.get(borrower.resolve_ref.remote())
|
||||
|
||||
|
||||
if __name__ == "__main__":
    import sys

    # Run this file's tests verbosely and hand pytest's exit status back
    # to the calling shell.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)
|
||||
|
||||
Reference in New Issue
Block a user