From b89cac976ae171d6d9b3245394e4932288fc6f11 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 28 Oct 2019 22:09:04 -0700 Subject: [PATCH] Basic direct actor call support in Python (#5991) --- python/ray/_raylet.pyx | 61 +++++++++++++++-- python/ray/actor.py | 6 +- python/ray/includes/common.pxd | 1 + python/ray/includes/libcoreworker.pxd | 1 + python/ray/includes/unique_ids.pxd | 2 + python/ray/includes/unique_ids.pxi | 3 + python/ray/ray_perf.py | 56 +++++++++++++--- python/ray/signature.py | 8 +++ python/ray/tests/test_basic.py | 67 +++++++++++++++++++ python/ray/worker.py | 44 +++++++++--- src/ray/common/buffer.h | 11 ++- src/ray/common/id.h | 9 +++ src/ray/core_worker/common.h | 2 - src/ray/core_worker/context.cc | 1 + src/ray/core_worker/core_worker.cc | 42 +++++++----- src/ray/core_worker/core_worker.h | 2 +- src/ray/core_worker/test/mock_worker.cc | 10 +-- .../transport/direct_actor_transport.cc | 10 +-- .../transport/direct_actor_transport.h | 6 +- src/ray/object_manager/object_manager.cc | 3 +- 20 files changed, 283 insertions(+), 62 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e7f30ec51..82f5b2eaa 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -455,7 +455,18 @@ cdef deserialize_args( return ray.signature.recover_args(args) -cdef _store_task_outputs(worker, return_ids, outputs): +cdef _store_task_outputs( + worker, return_ids, outputs, + c_bool return_outputs_directly, + c_vector[shared_ptr[CRayObject]] *returns): + + # Direct actor call returns are not placed in the object store directly, + # but returned to the core worker. + if return_outputs_directly: + return_buffer = [] + else: + return_buffer = None + for i in range(len(return_ids)): return_id, output = return_ids[i], outputs[i] if isinstance(output, ray.actor.ActorHandle): @@ -468,7 +479,13 @@ cdef _store_task_outputs(worker, return_ids, outputs): "from a remote function, but the corresponding " "ObjectID does not exist in the local object store.") else: - worker.put_object(output, object_id=return_id) + worker.put_object( + output, object_id=return_id, return_buffer=return_buffer) + + if return_outputs_directly: + assert len(return_ids) == len(return_buffer), \ + (return_ids, return_buffer) + push_objects_into_return_vector(return_buffer, returns) cdef execute_task( @@ -478,6 +495,7 @@ cdef execute_task( const c_vector[shared_ptr[CRayObject]] &c_args, const c_vector[CObjectID] &c_arg_reference_ids, const c_vector[CObjectID] &c_return_ids, + c_bool return_outputs_directly, c_vector[shared_ptr[CRayObject]] *returns): worker = ray.worker.global_worker @@ -566,7 +584,9 @@ cdef execute_task( # Store the outputs in the object store. with core_worker.profile_event(b"task:store_outputs"): - _store_task_outputs(worker, return_ids, outputs) + _store_task_outputs( + worker, return_ids, outputs, return_outputs_directly, + returns) except Exception as error: if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): worker.mark_actor_init_failed(error) @@ -581,7 +601,8 @@ cdef execute_task( failure_object = RayTaskError(function_name, backtrace, error.__class__) _store_task_outputs( - worker, return_ids, [failure_object] * len(return_ids)) + worker, return_ids, [failure_object] * len(return_ids), + return_outputs_directly, returns) ray.utils.push_error_to_driver( worker, ray_constants.TASK_PUSH_ERROR, @@ -621,6 +642,7 @@ cdef CRayStatus task_execution_handler( const c_vector[shared_ptr[CRayObject]] &c_args, const c_vector[CObjectID] &c_arg_reference_ids, const c_vector[CObjectID] &c_return_ids, + c_bool return_results_directly, c_vector[shared_ptr[CRayObject]] *returns) nogil: with gil: @@ -628,7 +650,8 @@ cdef CRayStatus task_execution_handler( # The call to execute_task should never raise an exception. If it # does, that indicates that there was an unexpected internal error. execute_task(task_type, ray_function, c_resources, c_args, - c_arg_reference_ids, c_return_ids, returns) + c_arg_reference_ids, c_return_ids, + return_results_directly, returns) except Exception: traceback_str = traceback.format_exc() + ( "An unexpected internal error occurred while the worker was" @@ -654,6 +677,29 @@ cdef CRayStatus check_signals() nogil: return CRayStatus.Interrupted(b"") return CRayStatus.OK() + +cdef void push_objects_into_return_vector( + py_objects, + c_vector[shared_ptr[CRayObject]] *returns): + + cdef: + shared_ptr[CBuffer] data + shared_ptr[CBuffer] metadata + shared_ptr[CRayObject] ray_object + int64_t data_size + + for serialized_object in py_objects: + data_size = serialized_object.total_bytes + data = dynamic_pointer_cast[ + CBuffer, LocalMemoryBuffer]( + make_shared[LocalMemoryBuffer](data_size)) + stream = pyarrow.FixedSizeBufferWriter( + pyarrow.py_buffer(Buffer.make(data))) + serialized_object.write_to(stream) + ray_object = make_shared[CRayObject](data, metadata) + returns.push_back(ray_object) + + cdef class CoreWorker: cdef unique_ptr[CCoreWorker] core_worker @@ -901,7 +947,8 @@ cdef class CoreWorker: args, uint64_t max_reconstructions, resources, - placement_resources): + placement_resources, + c_bool is_direct_call): cdef: CRayFunction ray_function c_vector[CTaskArg] args_vector @@ -921,7 +968,7 @@ cdef class CoreWorker: check_status(self.core_worker.get().CreateActor( ray_function, args_vector, CActorCreationOptions( - max_reconstructions, False, c_resources, + max_reconstructions, is_direct_call, c_resources, c_placement_resources, dynamic_worker_options), &c_actor_id)) diff --git a/python/ray/actor.py b/python/ray/actor.py index 203b83ffc..7e0f354e6 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -306,7 +306,8 @@ class ActorClass(object): num_gpus=None, memory=None, object_store_memory=None, - resources=None): + resources=None, + is_direct_call=None): """Create an actor. This method allows more flexibility than the remote method because @@ -323,6 +324,7 @@ class ActorClass(object): this actor when creating objects. resources: The custom resources required by the actor creation task. + is_direct_call: Use direct actor calls. Returns: A handle to the newly created actor. @@ -401,7 +403,7 @@ class ActorClass(object): actor_id = worker.core_worker.create_actor( function_descriptor.get_function_descriptor_list(), creation_args, meta.max_reconstructions, resources, - actor_placement_resources) + actor_placement_resources, is_direct_call) actor_handle = ActorHandle( actor_id, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index d302a4a57..9189850b6 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -171,6 +171,7 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil: cdef cppclass LocalMemoryBuffer(CBuffer): LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data) + LocalMemoryBuffer(size_t size) cdef extern from "ray/common/ray_object.h" nogil: cdef cppclass CRayObject "ray::RayObject": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 106792282..858914c60 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -62,6 +62,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[shared_ptr[CRayObject]] &args, const c_vector[CObjectID] &arg_reference_ids, const c_vector[CObjectID] &return_ids, + c_bool is_direct_call, c_vector[shared_ptr[CRayObject]] *returns) nogil, CRayStatus() nogil) void Disconnect() diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 764c86e10..04f2207e2 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -148,6 +148,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: c_bool is_put() + c_bool IsDirectActorType() + int64_t ObjectIndex() const CTaskID TaskId() const diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 0f98709d7..571ecedc3 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -176,6 +176,9 @@ cdef class ObjectID(BaseID): def hex(self): return decode(self.data.Hex()) + def is_direct_actor_type(self): + return self.data.IsDirectActorType() + def is_nil(self): return self.data.IsNil() diff --git a/python/ray/ray_perf.py b/python/ray/ray_perf.py index 8926dbb69..96f22d838 100644 --- a/python/ray/ray_perf.py +++ b/python/ray/ray_perf.py @@ -15,6 +15,15 @@ class Actor(object): ray.get([small_value.remote() for _ in range(n)]) +@ray.remote +class Client(object): + def __init__(self, server): + self.server = server + + def small_value_batch(self, n): + ray.get([self.server.small_value.remote() for _ in range(n)]) + + @ray.remote def small_value(): return 0 @@ -54,17 +63,17 @@ def main(): def get_small(): ray.get(value) - timeit("single core get calls", get_small) + timeit("single client get calls", get_small) def put_small(): ray.put(0) - timeit("single core put calls", put_small) + timeit("single client put calls", put_small) def put_large(): ray.put(arr) - timeit("single core put gigabytes", put_large, 8 * 0.1) + timeit("single client put gigabytes", put_large, 8 * 0.1) @ray.remote def do_put_small(): @@ -74,7 +83,7 @@ def main(): def put_multi_small(): ray.get([do_put_small.remote() for _ in range(10)]) - timeit("multi core put calls", put_multi_small, 1000) + timeit("multi client put calls", put_multi_small, 1000) @ray.remote def do_put(): @@ -84,17 +93,17 @@ def main(): def put_multi(): ray.get([do_put.remote() for _ in range(10)]) - timeit("multi core put gigabytes", put_multi, 10 * 8 * 0.1) + timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1) def small_task(): ray.get(small_value.remote()) - timeit("single core tasks sync", small_task) + timeit("single client tasks sync", small_task) def small_task_async(): ray.get([small_value.remote() for _ in range(1000)]) - timeit("single core tasks async", small_task_async, 1000) + timeit("single client tasks async", small_task_async, 1000) n = 10000 m = 4 @@ -104,21 +113,21 @@ def main(): submitted = [a.small_value_batch.remote(n) for a in actors] ray.get(submitted) - timeit("multi core tasks async", multi_task, n * m) + timeit("multi client tasks async", multi_task, n * m) a = Actor.remote() def actor_sync(): ray.get(a.small_value.remote()) - timeit("single core actor calls sync", actor_sync) + timeit("single client actor calls sync", actor_sync) a = Actor.remote() def actor_async(): ray.get([a.small_value.remote() for _ in range(1000)]) - timeit("single core actor calls async", actor_async, 1000) + timeit("single client actor calls async", actor_async, 1000) n_cpu = multiprocessing.cpu_count() // 2 a = [Actor.remote() for _ in range(n_cpu)] @@ -130,7 +139,32 @@ def main(): def actor_multi2(): ray.get([work.remote(a) for _ in range(m)]) - timeit("multi core actor calls async", actor_multi2, m * n) + timeit("multi client actor calls async", actor_multi2, m * n) + + a = Actor._remote(is_direct_call=True) + + def actor_sync_direct(): + ray.get(a.small_value.remote()) + + timeit("single client direct actor calls sync", actor_sync_direct) + + a = Actor._remote(is_direct_call=True) + + def actor_async_direct(): + ray.get([a.small_value.remote() for _ in range(1000)]) + + timeit("single client direct actor calls async", actor_async_direct, 1000) + + n = 5000 + n_cpu = multiprocessing.cpu_count() // 2 + actors = [Actor._remote(is_direct_call=True) for _ in range(n_cpu)] + clients = [Client.remote(a) for a in actors] + + def actor_multi2_direct(): + ray.get([c.small_value_batch.remote(n) for c in clients]) + + timeit("multi client direct actor calls async", actor_multi2_direct, + n * len(clients)) if __name__ == "__main__": diff --git a/python/ray/signature.py b/python/ray/signature.py index baa8028dc..b06445713 100644 --- a/python/ray/signature.py +++ b/python/ray/signature.py @@ -7,6 +7,7 @@ import funcsigs from funcsigs import Parameter import logging +import ray from ray.utils import is_cython # Logger for this module. It should be configured at the entry point @@ -134,6 +135,13 @@ def flatten_args(signature_parameters, args, kwargs): >>> flatten_args([1, 2, 3], {"a": 4}) [None, 1, None, 2, None, 3, "a", 4] """ + + for obj in args: + if isinstance(obj, ray.ObjectID) and obj.is_direct_actor_type(): + raise NotImplementedError( + "Objects produced by direct actor calls cannot be " + "passed to other tasks as arguments.") + restored = _restore_parameters(signature_parameters) reconstructed_signature = funcsigs.Signature(parameters=restored) try: diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c09942393..6be4de0ec 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1184,6 +1184,73 @@ def test_get_dict(ray_start_regular): assert result == expected +def test_direct_actor_enabled(ray_start_regular): + @ray.remote + class Actor(object): + def __init__(self): + pass + + def f(self, x): + return x * 2 + + a = Actor._remote(is_direct_call=True) + obj_id = a.f.remote(1) + # it is not stored in plasma + assert not ray.worker.global_worker.core_worker.object_exists(obj_id) + assert ray.get(obj_id) == 2 + + +def test_direct_actor_errors(ray_start_regular): + @ray.remote + class Actor(object): + def __init__(self): + pass + + def f(self, x): + return x * 2 + + @ray.remote + def f(x): + return 1 + + a = Actor._remote(is_direct_call=True) + + # cannot pass returns to other methods directly + with pytest.raises(Exception): + ray.get(f.remote(a.f.remote(2))) + + # cannot pass returns to other methods even in a list + with pytest.raises(Exception): + ray.get(f.remote([a.f.remote(2)])) + + # by ref args not implemented + with pytest.raises(ray.exceptions.RayletError): + a.f.remote(f.remote(2)) + + +def test_direct_actor_recursive(ray_start_regular): + @ray.remote + class Actor(object): + def __init__(self, delegate=None): + self.delegate = delegate + + def f(self, x): + if self.delegate: + return ray.get(self.delegate.f.remote(x)) + return x * 2 + + a = Actor._remote(is_direct_call=True) + b = Actor._remote(args=[a], is_direct_call=False) + c = Actor._remote(args=[b], is_direct_call=True) + + result = ray.get([c.f.remote(i) for i in range(100)]) + assert result == [x * 2 for x in range(100)] + + result, _ = ray.wait([c.f.remote(i) for i in range(100)], num_returns=100) + result = ray.get(result) + assert result == [x * 2 for x in range(100)] + + def test_wait(ray_start_regular): @ray.remote def f(delay): diff --git a/python/ray/worker.py b/python/ray/worker.py index fa8ed1df4..2faecfdaa 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -251,7 +251,7 @@ class Worker(object): """ self.mode = mode - def put_object(self, value, object_id=None): + def put_object(self, value, object_id=None, return_buffer=None): """Put value in the local object store with object id `objectid`. This assumes that the value for `objectid` has not yet been placed in @@ -265,6 +265,8 @@ class Worker(object): value: The value to put in the object store. object_id (object_id.ObjectID): The object ID of the value to be put. If None, one will be generated. + return_buffer: If specified, append returns to this list instead + of storing directly in the object store. Returns: object_id.ObjectID: The object ID the object was put under. @@ -284,6 +286,9 @@ class Worker(object): "call 'put' on it (or return it).") if isinstance(value, bytes): + if return_buffer is not None: + raise NotImplementedError( + "returning raw buffers from direct actor calls") # If the object is a byte array, skip serializing it and # use a special metadata to indicate it's raw binary. So # that this object can also be read by Java. @@ -293,9 +298,13 @@ class Worker(object): memcopy_threads=self.memcopy_threads) if self.use_pickle: + if return_buffer is not None: + raise NotImplementedError( + "pickle5 serialization with direct actor calls") return self._serialize_and_put_pickle5(value, object_id=object_id) else: - return self._serialize_and_put_pyarrow(value, object_id=object_id) + return self._serialize_and_put_pyarrow( + value, object_id=object_id, return_buffer=return_buffer) def _serialize_and_put_pickle5(self, value, object_id=None): """Serialize an object using pickle5 and store it in the object store. @@ -321,13 +330,18 @@ class Worker(object): object_id=object_id, memcopy_threads=self.memcopy_threads) - def _serialize_and_put_pyarrow(self, value, object_id=None): + def _serialize_and_put_pyarrow(self, + value, + object_id=None, + return_buffer=None): """Wraps `store_and_register` with cases for existence and pickling. Args: object_id (object_id.ObjectID): The object ID of the value to be put. value: The value to put in the object store. + return_buffer: If specified, append returns to this list instead + of storing directly in the object store. """ try: serialized_value = self._serialize_with_pyarrow(value) @@ -341,10 +355,13 @@ class Worker(object): "falling back to cloudpickle.".format(type(value))) serialized_value = self._serialize_with_pyarrow(value) - return self.core_worker.put_serialized_object( - serialized_value, - object_id=object_id, - memcopy_threads=self.memcopy_threads) + if return_buffer is not None: + return_buffer.append(serialized_value) + else: + return self.core_worker.put_serialized_object( + serialized_value, + object_id=object_id, + memcopy_threads=self.memcopy_threads) def _serialize_with_pyarrow(self, value, depth=100): """Store an object and attempt to register its class if needed. @@ -721,11 +738,22 @@ def _initialize_serialization(job_id, worker=global_worker): serialization_context.set_pickle(pickle.dumps, pickle.loads) pyarrow.register_torch_serialization_handlers(serialization_context) + def id_serializer(obj): + if isinstance(obj, ray.ObjectID) and obj.is_direct_actor_type(): + raise NotImplementedError( + "Objects produced by direct actor calls cannot be " + "passed to other tasks as arguments.") + return pickle.dumps(obj) + + def id_deserializer(serialized_obj): + return pickle.loads(serialized_obj) + for id_type in ray._raylet._ID_TYPES: serialization_context.register_type( id_type, "{}.{}".format(id_type.__module__, id_type.__name__), - pickle=True) + custom_serializer=id_serializer, + custom_deserializer=id_deserializer) def actor_handle_serializer(obj): return obj._serialization_helper(True) diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h index 15e928958..da9ceadab 100644 --- a/src/ray/common/buffer.h +++ b/src/ray/common/buffer.h @@ -4,6 +4,7 @@ #include #include #include "plasma/client.h" +#include "ray/common/status.h" namespace arrow { class Buffer; @@ -47,10 +48,11 @@ class LocalMemoryBuffer : public Buffer { /// \param data The data pointer to the passed-in buffer. /// \param size The size of the passed in buffer. /// \param copy_data If true, data will be copied and owned by this buffer, - /// otherwise the buffer only points to the given address. + /// otherwise the buffer only points to the given address. LocalMemoryBuffer(uint8_t *data, size_t size, bool copy_data = false) : has_data_copy_(copy_data) { if (copy_data) { + RAY_CHECK(data != nullptr); buffer_.insert(buffer_.end(), data, data + size); data_ = buffer_.data(); size_ = buffer_.size(); @@ -60,6 +62,13 @@ class LocalMemoryBuffer : public Buffer { } } + /// Construct a LocalMemoryBuffer of all zeros of the given size. + LocalMemoryBuffer(size_t size) : has_data_copy_(true) { + buffer_.resize(size, 0); + data_ = buffer_.data(); + size_ = buffer_.size(); + } + uint8_t *Data() const override { return data_; } size_t Size() const override { return size_; } diff --git a/src/ray/common/id.h b/src/ray/common/id.h index bc82b206d..637efedae 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -18,6 +18,8 @@ namespace ray { +enum class TaskTransportType { RAYLET, DIRECT_ACTOR }; + class TaskID; class WorkerID; class UniqueID; @@ -285,6 +287,13 @@ class ObjectID : public BaseID { /// \return True if this object is a return value of a task. bool IsReturnObject() const; + /// Return if this is a direct actor call object. + /// + /// \return True if this is a direct actor object return. + bool IsDirectActorType() const { + return GetTransportType() == static_cast(TaskTransportType::DIRECT_ACTOR); + } + /// Get the transport type of this object. /// /// \return The type of the transport which is used to transfer this object. diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index c063c2ab4..943dbaa53 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -81,8 +81,6 @@ class TaskArg { const std::shared_ptr value_; }; -enum class TaskTransportType { RAYLET, DIRECT_ACTOR }; - /// Options for all tasks (actor and non-actor) except for actor creation. struct TaskOptions { TaskOptions() {} diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 6d3ab7e8a..0fcce82ed 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -90,6 +90,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { if (task_spec.IsNormalTask()) { RAY_CHECK(current_job_id_.IsNil()); SetCurrentJobId(task_spec.JobId()); + current_actor_use_direct_call_ = false; } else if (task_spec.IsActorCreationTask()) { RAY_CHECK(current_job_id_.IsNil()); SetCurrentJobId(task_spec.JobId()); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9a8413b82..64df69ecf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -366,25 +366,33 @@ Status CoreWorker::Wait(const std::vector &ids, int num_objects, // Wait from both store providers with timeout set to 0. This is to avoid the case // where we might use up the entire timeout on trying to get objects from one store // provider before even trying another (which might have all of the objects available). - RAY_RETURN_NOT_OK( - plasma_store_provider_->Wait(plasma_object_ids, num_objects, /*timeout_ms=*/0, - worker_context_.GetCurrentTaskID(), &ready)); - RAY_RETURN_NOT_OK(memory_store_provider_->Wait( - memory_object_ids, std::max(0, static_cast(ready.size()) - num_objects), - /*timeout_ms=*/0, worker_context_.GetCurrentTaskID(), &ready)); + if (plasma_object_ids.size() > 0) { + RAY_RETURN_NOT_OK( + plasma_store_provider_->Wait(plasma_object_ids, num_objects, /*timeout_ms=*/0, + worker_context_.GetCurrentTaskID(), &ready)); + } + if (memory_object_ids.size() > 0) { + RAY_RETURN_NOT_OK(memory_store_provider_->Wait( + memory_object_ids, std::max(0, static_cast(ready.size()) - num_objects), + /*timeout_ms=*/0, worker_context_.GetCurrentTaskID(), &ready)); + } if (static_cast(ready.size()) < num_objects && timeout_ms != 0) { int64_t start_time = current_time_ms(); - RAY_RETURN_NOT_OK( - plasma_store_provider_->Wait(plasma_object_ids, num_objects, timeout_ms, - worker_context_.GetCurrentTaskID(), &ready)); + if (plasma_object_ids.size() > 0) { + RAY_RETURN_NOT_OK( + plasma_store_provider_->Wait(plasma_object_ids, num_objects, timeout_ms, + worker_context_.GetCurrentTaskID(), &ready)); + } if (timeout_ms > 0) { timeout_ms = std::max(0, static_cast(timeout_ms - (current_time_ms() - start_time))); } - RAY_RETURN_NOT_OK( - memory_store_provider_->Wait(memory_object_ids, num_objects, timeout_ms, - worker_context_.GetCurrentTaskID(), &ready)); + if (memory_object_ids.size() > 0) { + RAY_RETURN_NOT_OK( + memory_store_provider_->Wait(memory_object_ids, num_objects, timeout_ms, + worker_context_.GetCurrentTaskID(), &ready)); + } } for (size_t i = 0; i < ids.size(); i++) { @@ -621,16 +629,16 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return_ids.pop_back(); task_type = TaskType::ACTOR_TASK; } - status = task_execution_callback_(task_type, func, - task_spec.GetRequiredResources().GetResourceMap(), - args, arg_reference_ids, return_ids, results); + bool direct_call = worker_context_.CurrentActorUseDirectCall(); + status = task_execution_callback_( + task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args, + arg_reference_ids, return_ids, direct_call, results); SetCurrentTaskId(TaskID::Nil()); worker_context_.ResetCurrentTask(task_spec); - // TODO(edoakes): also check if not direct actor call. // TODO(edoakes): this is only used by java. - if (results->size() != 0) { + if (results->size() != 0 && !direct_call) { for (size_t i = 0; i < results->size(); i++) { ObjectID id = ObjectID::ForTaskReturn( task_spec.TaskId(), /*index=*/i + 1, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 700f7cdca..a03b72060 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -28,7 +28,7 @@ class CoreWorker { const std::unordered_map &required_resources, const std::vector> &args, const std::vector &arg_reference_ids, - const std::vector &return_ids, + const std::vector &return_ids, const bool return_results_directly, std::vector> *results)>; public: diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index c50d4187c..0d8e26541 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -20,10 +20,11 @@ class MockWorker { public: MockWorker(const std::string &store_socket, const std::string &raylet_socket, const gcs::GcsClientOptions &gcs_options) - : worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket, - JobID::FromInt(1), gcs_options, /*log_dir=*/"", - /*node_id_address=*/"127.0.0.1", - std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7)) {} + : worker_( + WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket, + JobID::FromInt(1), gcs_options, /*log_dir=*/"", + /*node_id_address=*/"127.0.0.1", + std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8)) {} void StartExecutingTasks() { worker_.StartExecutingTasks(); } @@ -33,6 +34,7 @@ class MockWorker { const std::vector> &args, const std::vector &arg_reference_ids, const std::vector &return_ids, + const bool return_results_directly, std::vector> *results) { // Note that this doesn't include dummy object id. RAY_CHECK(return_ids.size() >= 0); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 993d780d5..75b81700c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -199,12 +199,12 @@ bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) c } CoreWorkerDirectActorTaskReceiver::CoreWorkerDirectActorTaskReceiver( - WorkerContext &worker_context, boost::asio::io_service &io_service, + WorkerContext &worker_context, boost::asio::io_service &main_io_service, rpc::GrpcServer &server, const TaskHandler &task_handler) : worker_context_(worker_context), - io_service_(io_service), - task_service_(io_service, *this), - task_handler_(task_handler) { + task_service_(main_io_service, *this), + task_handler_(task_handler), + task_main_io_service_(main_io_service) { server.RegisterService(task_service_); } @@ -229,7 +229,7 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask( if (it == scheduling_queue_.end()) { auto result = scheduling_queue_.emplace( task_spec.CallerId(), - std::unique_ptr(new SchedulingQueue(io_service_))); + std::unique_ptr(new SchedulingQueue(task_main_io_service_))); it = result.first; } it->second->Add( diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 51d929e8a..d1407aac7 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -204,7 +204,7 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { std::vector> *results)>; CoreWorkerDirectActorTaskReceiver(WorkerContext &worker_context, - boost::asio::io_service &io_service, + boost::asio::io_service &main_io_service, rpc::GrpcServer &server, const TaskHandler &task_handler); @@ -221,12 +221,12 @@ class CoreWorkerDirectActorTaskReceiver : public rpc::DirectActorHandler { private: // Worker context. WorkerContext &worker_context_; - /// The IO event loop. - boost::asio::io_service &io_service_; /// The rpc service for `DirectActorService`. rpc::DirectActorGrpcService task_service_; /// The callback function to process a task. TaskHandler task_handler_; + /// The IO event loop for running tasks on. + boost::asio::io_service &task_main_io_service_; /// Queue of pending requests per actor handle. /// TODO(ekl) GC these queues once the handle is no longer active. std::unordered_map> scheduling_queue_; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 9f94f1242..bf96e5a61 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -504,7 +504,8 @@ ray::Status ObjectManager::AddWaitRequest(const UniqueID &wait_id, RAY_CHECK(timeout_ms >= 0 || timeout_ms == -1); RAY_CHECK(num_required_objects != 0); - RAY_CHECK(num_required_objects <= object_ids.size()); + RAY_CHECK(num_required_objects <= object_ids.size()) + << num_required_objects << " " << object_ids.size(); if (object_ids.size() == 0) { callback(std::vector(), std::vector()); }