Basic direct actor call support in Python (#5991)

This commit is contained in:
Eric Liang
2019-10-28 22:09:04 -07:00
committed by GitHub
parent 4c4342c165
commit b89cac976a
20 changed files with 283 additions and 62 deletions
+54 -7
View File
@@ -455,7 +455,18 @@ cdef deserialize_args(
return ray.signature.recover_args(args)
cdef _store_task_outputs(worker, return_ids, outputs):
cdef _store_task_outputs(
worker, return_ids, outputs,
c_bool return_outputs_directly,
c_vector[shared_ptr[CRayObject]] *returns):
# Direct actor call returns are not placed in the object store directly,
# but returned to the core worker.
if return_outputs_directly:
return_buffer = []
else:
return_buffer = None
for i in range(len(return_ids)):
return_id, output = return_ids[i], outputs[i]
if isinstance(output, ray.actor.ActorHandle):
@@ -468,7 +479,13 @@ cdef _store_task_outputs(worker, return_ids, outputs):
"from a remote function, but the corresponding "
"ObjectID does not exist in the local object store.")
else:
worker.put_object(output, object_id=return_id)
worker.put_object(
output, object_id=return_id, return_buffer=return_buffer)
if return_outputs_directly:
assert len(return_ids) == len(return_buffer), \
(return_ids, return_buffer)
push_objects_into_return_vector(return_buffer, returns)
cdef execute_task(
@@ -478,6 +495,7 @@ cdef execute_task(
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
const c_vector[CObjectID] &c_return_ids,
c_bool return_outputs_directly,
c_vector[shared_ptr[CRayObject]] *returns):
worker = ray.worker.global_worker
@@ -566,7 +584,9 @@ cdef execute_task(
# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
_store_task_outputs(worker, return_ids, outputs)
_store_task_outputs(
worker, return_ids, outputs, return_outputs_directly,
returns)
except Exception as error:
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
worker.mark_actor_init_failed(error)
@@ -581,7 +601,8 @@ cdef execute_task(
failure_object = RayTaskError(function_name, backtrace,
error.__class__)
_store_task_outputs(
worker, return_ids, [failure_object] * len(return_ids))
worker, return_ids, [failure_object] * len(return_ids),
return_outputs_directly, returns)
ray.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
@@ -621,6 +642,7 @@ cdef CRayStatus task_execution_handler(
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
const c_vector[CObjectID] &c_return_ids,
c_bool return_results_directly,
c_vector[shared_ptr[CRayObject]] *returns) nogil:
with gil:
@@ -628,7 +650,8 @@ cdef CRayStatus task_execution_handler(
# The call to execute_task should never raise an exception. If it
# does, that indicates that there was an unexpected internal error.
execute_task(task_type, ray_function, c_resources, c_args,
c_arg_reference_ids, c_return_ids, returns)
c_arg_reference_ids, c_return_ids,
return_results_directly, returns)
except Exception:
traceback_str = traceback.format_exc() + (
"An unexpected internal error occurred while the worker was"
@@ -654,6 +677,29 @@ cdef CRayStatus check_signals() nogil:
return CRayStatus.Interrupted(b"")
return CRayStatus.OK()
cdef void push_objects_into_return_vector(
py_objects,
c_vector[shared_ptr[CRayObject]] *returns):
cdef:
shared_ptr[CBuffer] data
shared_ptr[CBuffer] metadata
shared_ptr[CRayObject] ray_object
int64_t data_size
for serialized_object in py_objects:
data_size = serialized_object.total_bytes
data = dynamic_pointer_cast[
CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](data_size))
stream = pyarrow.FixedSizeBufferWriter(
pyarrow.py_buffer(Buffer.make(data)))
serialized_object.write_to(stream)
ray_object = make_shared[CRayObject](data, metadata)
returns.push_back(ray_object)
cdef class CoreWorker:
cdef unique_ptr[CCoreWorker] core_worker
@@ -901,7 +947,8 @@ cdef class CoreWorker:
args,
uint64_t max_reconstructions,
resources,
placement_resources):
placement_resources,
c_bool is_direct_call):
cdef:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
@@ -921,7 +968,7 @@ cdef class CoreWorker:
check_status(self.core_worker.get().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, False, c_resources,
max_reconstructions, is_direct_call, c_resources,
c_placement_resources, dynamic_worker_options),
&c_actor_id))
+4 -2
View File
@@ -306,7 +306,8 @@ class ActorClass(object):
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None):
resources=None,
is_direct_call=None):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -323,6 +324,7 @@ class ActorClass(object):
this actor when creating objects.
resources: The custom resources required by the actor creation
task.
is_direct_call: Use direct actor calls.
Returns:
A handle to the newly created actor.
@@ -401,7 +403,7 @@ class ActorClass(object):
actor_id = worker.core_worker.create_actor(
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources)
actor_placement_resources, is_direct_call)
actor_handle = ActorHandle(
actor_id,
+1
View File
@@ -171,6 +171,7 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil:
cdef cppclass LocalMemoryBuffer(CBuffer):
LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data)
LocalMemoryBuffer(size_t size)
cdef extern from "ray/common/ray_object.h" nogil:
cdef cppclass CRayObject "ray::RayObject":
+1
View File
@@ -62,6 +62,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[shared_ptr[CRayObject]] &args,
const c_vector[CObjectID] &arg_reference_ids,
const c_vector[CObjectID] &return_ids,
c_bool is_direct_call,
c_vector[shared_ptr[CRayObject]] *returns) nogil,
CRayStatus() nogil)
void Disconnect()
+2
View File
@@ -148,6 +148,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_bool is_put()
c_bool IsDirectActorType()
int64_t ObjectIndex() const
CTaskID TaskId() const
+3
View File
@@ -176,6 +176,9 @@ cdef class ObjectID(BaseID):
def hex(self):
return decode(self.data.Hex())
def is_direct_actor_type(self):
return self.data.IsDirectActorType()
def is_nil(self):
return self.data.IsNil()
+45 -11
View File
@@ -15,6 +15,15 @@ class Actor(object):
ray.get([small_value.remote() for _ in range(n)])
@ray.remote
class Client(object):
def __init__(self, server):
self.server = server
def small_value_batch(self, n):
ray.get([self.server.small_value.remote() for _ in range(n)])
@ray.remote
def small_value():
return 0
@@ -54,17 +63,17 @@ def main():
def get_small():
ray.get(value)
timeit("single core get calls", get_small)
timeit("single client get calls", get_small)
def put_small():
ray.put(0)
timeit("single core put calls", put_small)
timeit("single client put calls", put_small)
def put_large():
ray.put(arr)
timeit("single core put gigabytes", put_large, 8 * 0.1)
timeit("single client put gigabytes", put_large, 8 * 0.1)
@ray.remote
def do_put_small():
@@ -74,7 +83,7 @@ def main():
def put_multi_small():
ray.get([do_put_small.remote() for _ in range(10)])
timeit("multi core put calls", put_multi_small, 1000)
timeit("multi client put calls", put_multi_small, 1000)
@ray.remote
def do_put():
@@ -84,17 +93,17 @@ def main():
def put_multi():
ray.get([do_put.remote() for _ in range(10)])
timeit("multi core put gigabytes", put_multi, 10 * 8 * 0.1)
timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1)
def small_task():
ray.get(small_value.remote())
timeit("single core tasks sync", small_task)
timeit("single client tasks sync", small_task)
def small_task_async():
ray.get([small_value.remote() for _ in range(1000)])
timeit("single core tasks async", small_task_async, 1000)
timeit("single client tasks async", small_task_async, 1000)
n = 10000
m = 4
@@ -104,21 +113,21 @@ def main():
submitted = [a.small_value_batch.remote(n) for a in actors]
ray.get(submitted)
timeit("multi core tasks async", multi_task, n * m)
timeit("multi client tasks async", multi_task, n * m)
a = Actor.remote()
def actor_sync():
ray.get(a.small_value.remote())
timeit("single core actor calls sync", actor_sync)
timeit("single client actor calls sync", actor_sync)
a = Actor.remote()
def actor_async():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("single core actor calls async", actor_async, 1000)
timeit("single client actor calls async", actor_async, 1000)
n_cpu = multiprocessing.cpu_count() // 2
a = [Actor.remote() for _ in range(n_cpu)]
@@ -130,7 +139,32 @@ def main():
def actor_multi2():
ray.get([work.remote(a) for _ in range(m)])
timeit("multi core actor calls async", actor_multi2, m * n)
timeit("multi client actor calls async", actor_multi2, m * n)
a = Actor._remote(is_direct_call=True)
def actor_sync_direct():
ray.get(a.small_value.remote())
timeit("single client direct actor calls sync", actor_sync_direct)
a = Actor._remote(is_direct_call=True)
def actor_async_direct():
ray.get([a.small_value.remote() for _ in range(1000)])
timeit("single client direct actor calls async", actor_async_direct, 1000)
n = 5000
n_cpu = multiprocessing.cpu_count() // 2
actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)]
clients = [Client.remote(a) for a in actors]
def actor_multi2_direct():
ray.get([c.small_value_batch.remote(n) for c in clients])
timeit("multi client direct actor calls async", actor_multi2_direct,
n * len(clients))
if __name__ == "__main__":
+8
View File
@@ -7,6 +7,7 @@ import funcsigs
from funcsigs import Parameter
import logging
import ray
from ray.utils import is_cython
# Logger for this module. It should be configured at the entry point
@@ -134,6 +135,13 @@ def flatten_args(signature_parameters, args, kwargs):
>>> flatten_args([1, 2, 3], {"a": 4})
[None, 1, None, 2, None, 3, "a", 4]
"""
for obj in args:
if isinstance(obj, ray.ObjectID) and obj.is_direct_actor_type():
raise NotImplementedError(
"Objects produced by direct actor calls cannot be "
"passed to other tasks as arguments.")
restored = _restore_parameters(signature_parameters)
reconstructed_signature = funcsigs.Signature(parameters=restored)
try:
+67
View File
@@ -1184,6 +1184,73 @@ def test_get_dict(ray_start_regular):
assert result == expected
def test_direct_actor_enabled(ray_start_regular):
@ray.remote
class Actor(object):
def __init__(self):
pass
def f(self, x):
return x * 2
a = Actor._remote(is_direct_call=True)
obj_id = a.f.remote(1)
# it is not stored in plasma
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
assert ray.get(obj_id) == 2
def test_direct_actor_errors(ray_start_regular):
@ray.remote
class Actor(object):
def __init__(self):
pass
def f(self, x):
return x * 2
@ray.remote
def f(x):
return 1
a = Actor._remote(is_direct_call=True)
# cannot pass returns to other methods directly
with pytest.raises(Exception):
ray.get(f.remote(a.f.remote(2)))
# cannot pass returns to other methods even in a list
with pytest.raises(Exception):
ray.get(f.remote([a.f.remote(2)]))
# by ref args not implemented
with pytest.raises(ray.exceptions.RayletError):
a.f.remote(f.remote(2))
def test_direct_actor_recursive(ray_start_regular):
@ray.remote
class Actor(object):
def __init__(self, delegate=None):
self.delegate = delegate
def f(self, x):
if self.delegate:
return ray.get(self.delegate.f.remote(x))
return x * 2
a = Actor._remote(is_direct_call=True)
b = Actor._remote(args=[a], is_direct_call=False)
c = Actor._remote(args=[b], is_direct_call=True)
result = ray.get([c.f.remote(i) for i in range(100)])
assert result == [x * 2 for x in range(100)]
result, _ = ray.wait([c.f.remote(i) for i in range(100)], num_returns=100)
result = ray.get(result)
assert result == [x * 2 for x in range(100)]
def test_wait(ray_start_regular):
@ray.remote
def f(delay):
+36 -8
View File
@@ -251,7 +251,7 @@ class Worker(object):
"""
self.mode = mode
def put_object(self, value, object_id=None):
def put_object(self, value, object_id=None, return_buffer=None):
"""Put value in the local object store with object id `objectid`.
This assumes that the value for `objectid` has not yet been placed in
@@ -265,6 +265,8 @@ class Worker(object):
value: The value to put in the object store.
object_id (object_id.ObjectID): The object ID of the value to be
put. If None, one will be generated.
return_buffer: If specified, append returns to this list instead
of storing directly in the object store.
Returns:
object_id.ObjectID: The object ID the object was put under.
@@ -284,6 +286,9 @@ class Worker(object):
"call 'put' on it (or return it).")
if isinstance(value, bytes):
if return_buffer is not None:
raise NotImplementedError(
"returning raw buffers from direct actor calls")
# If the object is a byte array, skip serializing it and
# use a special metadata to indicate it's raw binary. So
# that this object can also be read by Java.
@@ -293,9 +298,13 @@ class Worker(object):
memcopy_threads=self.memcopy_threads)
if self.use_pickle:
if return_buffer is not None:
raise NotImplementedError(
"pickle5 serialization with direct actor calls")
return self._serialize_and_put_pickle5(value, object_id=object_id)
else:
return self._serialize_and_put_pyarrow(value, object_id=object_id)
return self._serialize_and_put_pyarrow(
value, object_id=object_id, return_buffer=return_buffer)
def _serialize_and_put_pickle5(self, value, object_id=None):
"""Serialize an object using pickle5 and store it in the object store.
@@ -321,13 +330,18 @@ class Worker(object):
object_id=object_id,
memcopy_threads=self.memcopy_threads)
def _serialize_and_put_pyarrow(self, value, object_id=None):
def _serialize_and_put_pyarrow(self,
value,
object_id=None,
return_buffer=None):
"""Wraps `store_and_register` with cases for existence and pickling.
Args:
object_id (object_id.ObjectID): The object ID of the value to be
put.
value: The value to put in the object store.
return_buffer: If specified, append returns to this list instead
of storing directly in the object store.
"""
try:
serialized_value = self._serialize_with_pyarrow(value)
@@ -341,10 +355,13 @@ class Worker(object):
"falling back to cloudpickle.".format(type(value)))
serialized_value = self._serialize_with_pyarrow(value)
return self.core_worker.put_serialized_object(
serialized_value,
object_id=object_id,
memcopy_threads=self.memcopy_threads)
if return_buffer is not None:
return_buffer.append(serialized_value)
else:
return self.core_worker.put_serialized_object(
serialized_value,
object_id=object_id,
memcopy_threads=self.memcopy_threads)
def _serialize_with_pyarrow(self, value, depth=100):
"""Store an object and attempt to register its class if needed.
@@ -721,11 +738,22 @@ def _initialize_serialization(job_id, worker=global_worker):
serialization_context.set_pickle(pickle.dumps, pickle.loads)
pyarrow.register_torch_serialization_handlers(serialization_context)
def id_serializer(obj):
if isinstance(obj, ray.ObjectID) and obj.is_direct_actor_type():
raise NotImplementedError(
"Objects produced by direct actor calls cannot be "
"passed to other tasks as arguments.")
return pickle.dumps(obj)
def id_deserializer(serialized_obj):
return pickle.loads(serialized_obj)
for id_type in ray._raylet._ID_TYPES:
serialization_context.register_type(
id_type,
"{}.{}".format(id_type.__module__, id_type.__name__),
pickle=True)
custom_serializer=id_serializer,
custom_deserializer=id_deserializer)
def actor_handle_serializer(obj):
return obj._serialization_helper(True)