mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 16:13:54 +08:00
Allow large returns from direct actor calls (#6088)
This commit is contained in:
@@ -52,7 +52,7 @@ cdef extern from "ray/common/task/task_spec.h" nogil:
|
||||
c_bool ArgByRef(uint64_t arg_index) const
|
||||
int ArgIdCount(uint64_t arg_index) const
|
||||
CObjectID ArgId(uint64_t arg_index, uint64_t id_index) const
|
||||
CObjectID ReturnId(uint64_t return_index) const
|
||||
CObjectID ReturnIdForPlasma(uint64_t return_index) const
|
||||
const uint8_t *ArgData(uint64_t arg_index) const
|
||||
size_t ArgDataSize(uint64_t arg_index) const
|
||||
const uint8_t *ArgMetadata(uint64_t arg_index) const
|
||||
|
||||
@@ -108,7 +108,7 @@ cdef class TaskSpec:
|
||||
return_id_list = []
|
||||
for i in range(self.task_spec.get().NumReturns()):
|
||||
return_id_list.append(
|
||||
ObjectID(self.task_spec.get().ReturnId(i).Binary()))
|
||||
ObjectID(self.task_spec.get().ReturnIdForPlasma(i).Binary()))
|
||||
return return_id_list
|
||||
|
||||
def required_resources(self):
|
||||
|
||||
@@ -1206,6 +1206,25 @@ def test_direct_actor_enabled(ray_start_regular):
|
||||
assert ray.get(obj_id) == 2
|
||||
|
||||
|
||||
def test_direct_actor_large_objects(ray_start_regular):
|
||||
@ray.remote
|
||||
class Actor(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def f(self):
|
||||
time.sleep(1)
|
||||
return np.zeros(10000000)
|
||||
|
||||
a = Actor._remote(is_direct_call=True)
|
||||
obj_id = a.f.remote()
|
||||
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
|
||||
done, _ = ray.wait([obj_id])
|
||||
assert len(done) == 1
|
||||
assert ray.worker.global_worker.core_worker.object_exists(obj_id)
|
||||
assert isinstance(ray.get(obj_id), np.ndarray)
|
||||
|
||||
|
||||
def test_direct_actor_errors(ray_start_regular):
|
||||
@ray.remote
|
||||
class Actor(object):
|
||||
|
||||
@@ -463,6 +463,8 @@ class Worker(object):
|
||||
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
|
||||
return UnreconstructableError(ray.ObjectID(object_id.binary()))
|
||||
else:
|
||||
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
|
||||
"Tried to get object that has been promoted to plasma."
|
||||
assert False, "Unrecognized error type " + str(error_type)
|
||||
elif data:
|
||||
# If data is not empty, deserialize the object.
|
||||
|
||||
Reference in New Issue
Block a user