From 29ba6bfc649e3d0aa1bfa3f34602f16c0cb7bcef Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 21 Nov 2019 11:56:46 -0800 Subject: [PATCH] Basic Async Actor Call (#6183) * Start trying to figure out where to put fibers * Pass is_async flag from python to context * Just running things in fiber works * Yield implemented, need some debugging to make it work * It worked! * Remove debug prints * Lint * Revert the clang-format * Remove unnecessary log * Remove unncessary import * Add attribution * Address comment * Add test * Missed a merge conflict * Make test pass and compile * Address comment * Rename async -> asyncio * Move async test to py3 only * Fix ignore path --- .travis.yml | 4 +- BUILD.bazel | 5 +- python/ray/_raylet.pyx | 50 +++++++++++++++-- python/ray/actor.py | 13 ++++- python/ray/includes/common.pxd | 2 +- python/ray/includes/libcoreworker.pxd | 8 +++ .../tests/{py3_args_test.py => py3_test.py} | 30 +++++++++++ python/ray/tests/test_actor.py | 1 + src/ray/common/task/task_spec.cc | 7 +++ src/ray/common/task/task_spec.h | 2 + src/ray/common/task/task_util.h | 4 +- src/ray/core_worker/common.h | 7 ++- src/ray/core_worker/context.cc | 3 ++ src/ray/core_worker/context.h | 3 ++ src/ray/core_worker/core_worker.cc | 19 +++++-- src/ray/core_worker/core_worker.h | 8 +++ src/ray/core_worker/test/core_worker_test.cc | 17 ++++-- .../transport/direct_actor_transport.cc | 31 ++++++++++- .../transport/direct_actor_transport.h | 54 ++++++++++++++++++- src/ray/protobuf/common.proto | 2 + 20 files changed, 242 insertions(+), 28 deletions(-) rename python/ray/tests/{py3_args_test.py => py3_test.py} (77%) diff --git a/.travis.yml b/.travis.yml index 5e8fc29a0..4776b4a20 100644 --- a/.travis.yml +++ b/.travis.yml @@ -185,8 +185,8 @@ script: # ray tests # Python3.5+ only. Otherwise we will get `SyntaxError` regardless of how we set the tester. - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_args_test.py; fi - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=10 --timeout=300 python/ray/tests --ignore=python/ray/tests/perf_integration_tests --ignore=python/ray/tests/py3_args_test.py; fi + - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -c 'import sys;exit(sys.version_info>=(3,5))' || python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi + - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=10 --timeout=300 python/ray/tests --ignore=python/ray/tests/perf_integration_tests --ignore=python/ray/tests/py3_test.py; fi deploy: - provider: s3 diff --git a/BUILD.bazel b/BUILD.bazel index ef4507940..f7748fdf5 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1,12 +1,12 @@ # Bazel build # C/C++ documentation: https://docs.bazel.build/versions/master/be/c-cpp.html +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_proto_library", "cc_test") load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") load("@com_github_grpc_grpc//bazel:cython_library.bzl", "pyx_library") -load("@rules_proto_grpc//python:defs.bzl", "python_proto_compile") load("@rules_proto_grpc//python:defs.bzl", "python_grpc_compile") load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") -load("@//bazel:ray.bzl", "flatbuffer_py_library") COPTS = ["-DRAY_USE_GLOG"] + select({ "@bazel_tools//src/conditions:windows": [ @@ -358,6 +358,7 @@ cc_library( ":raylet_lib", ":worker_rpc", ":gcs", + "@boost//:fiber", ], ) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 842dca315..6af5e737a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -5,8 +5,14 @@ from cpython.exc cimport PyErr_CheckSignals +try: + import asyncio +except ImportError: + # Python2 doesn't have asyncio + asyncio = None import numpy import gc +import inspect import threading import time import logging @@ -71,6 +77,7 @@ from ray.includes.libcoreworker cimport ( CCoreWorker, CTaskOptions, ResourceMappingType, + CFiberEvent ) from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig @@ -120,6 +127,7 @@ include "includes/libcoreworker.pxi" logger = logging.getLogger(__name__) MEMCOPY_THREADS = 12 +PY3 = cpython.PY_MAJOR_VERSION >= 3 if cpython.PY_MAJOR_VERSION >= 3: @@ -494,6 +502,7 @@ cdef execute_task( CoreWorker core_worker = worker.core_worker JobID job_id = core_worker.get_current_job_id() CTaskID task_id = core_worker.core_worker.get().GetCurrentTaskId() + CFiberEvent fiber_event # Automatically restrict the GPUs available to this task. ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) @@ -547,7 +556,24 @@ cdef execute_task( c_resources.find(b"object_store_memory")).second))) def function_executor(*arguments, **kwarguments): - return execution_info.function(actor, *arguments, **kwarguments) + function = execution_info.function + result_or_coroutine = function(actor, *arguments, **kwarguments) + + if PY3 and inspect.iscoroutine(result_or_coroutine): + coroutine = result_or_coroutine + loop = core_worker.create_or_get_event_loop() + + future = asyncio.run_coroutine_threadsafe(coroutine, loop) + future.add_done_callback( + lambda future: fiber_event.Notify()) + + with nogil: + (core_worker.core_worker.get() + .YieldCurrentFiber(fiber_event)) + + return future.result() + + return result_or_coroutine with core_worker.profile_event(b"task", extra_data=extra_data): try: @@ -702,7 +728,10 @@ cdef write_serialized_object( cdef class CoreWorker: - cdef unique_ptr[CCoreWorker] core_worker + cdef: + unique_ptr[CCoreWorker] core_worker + object async_thread + object async_event_loop def __cinit__(self, is_driver, store_socket, raylet_socket, JobID job_id, GcsClientOptions gcs_options, log_dir, @@ -901,7 +930,8 @@ cdef class CoreWorker: placement_resources, c_bool is_direct_call, int32_t max_concurrency, - c_bool is_detached): + c_bool is_detached, + c_bool is_asyncio): cdef: CRayFunction ray_function c_vector[CTaskArg] args_vector @@ -923,7 +953,7 @@ cdef class CoreWorker: CActorCreationOptions( max_reconstructions, is_direct_call, max_concurrency, c_resources, c_placement_resources, - dynamic_worker_options, is_detached), + dynamic_worker_options, is_detached, is_asyncio), &c_actor_id)) return ActorID(c_actor_id.Binary()) @@ -1060,3 +1090,15 @@ cdef class CoreWorker: else: write_serialized_object( serialized_object, returns[0][i].get().GetData()) + + def create_or_get_event_loop(self): + if self.async_event_loop is None: + self.async_event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.async_event_loop) + if self.async_thread is None: + self.async_thread = threading.Thread( + target=lambda: self.async_event_loop.run_forever() + ) + self.async_thread.start() + + return self.async_event_loop diff --git a/python/ray/actor.py b/python/ray/actor.py index 4d0c6dd0a..743624a70 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -358,7 +358,8 @@ class ActorClass(object): is_direct_call=None, max_concurrency=None, name=None, - detached=False): + detached=False, + is_asyncio=False): """Create an actor. This method allows more flexibility than the remote method because @@ -381,6 +382,8 @@ class ActorClass(object): name: The globally unique name for the actor. detached: Whether the actor should be kept alive after driver exits. + is_asyncio: Turn on async actor calls. This only works with direct + actor calls. Returns: A handle to the newly created actor. @@ -400,6 +403,12 @@ class ActorClass(object): if max_concurrency < 1: raise ValueError("max_concurrency must be >= 1") + if is_asyncio and not is_direct_call: + raise ValueError( + "Setting is_asyncio requires is_direct_call=True.") + if is_asyncio and max_concurrency != 1: + raise ValueError("Setting is_asyncio requires max_concurrency=1.") + worker = ray.worker.get_global_worker() if worker.mode is None: raise Exception("Actors cannot be created before ray.init() " @@ -487,7 +496,7 @@ class ActorClass(object): function_descriptor.get_function_descriptor_list(), creation_args, meta.max_reconstructions, resources, actor_placement_resources, is_direct_call, max_concurrency, - detached) + detached, is_asyncio) actor_handle = ActorHandle( actor_id, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index afdb8c43a..76e34bbc2 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -217,7 +217,7 @@ cdef extern from "ray/core_worker/common.h" nogil: const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, const c_vector[c_string] &dynamic_worker_options, - c_bool is_detached) + c_bool is_detached, c_bool is_asyncio) cdef extern from "ray/gcs/gcs_client_interface.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9edcdcad9..0030afb7a 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -48,6 +48,12 @@ cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfileEvent "ray::worker::ProfileEvent": void SetExtraData(const c_string &extra_data) +cdef extern from "ray/core_worker/transport/direct_actor_transport.h" nogil: + cdef cppclass CFiberEvent "ray::FiberEvent": + CFiberEvent() + void Wait() + void Notify() + cdef extern from "ray/core_worker/core_worker.h" nogil: cdef cppclass CCoreWorker "ray::CoreWorker": CCoreWorker(const CWorkerType worker_type, const CLanguage language, @@ -125,3 +131,5 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Delete(const c_vector[CObjectID] &object_ids, c_bool local_only, c_bool delete_creating_tasks) c_string MemoryUsageString() + + void YieldCurrentFiber(CFiberEvent &coroutine_done) diff --git a/python/ray/tests/py3_args_test.py b/python/ray/tests/py3_test.py similarity index 77% rename from python/ray/tests/py3_args_test.py rename to python/ray/tests/py3_test.py index e67074211..1c174a3b3 100644 --- a/python/ray/tests/py3_args_test.py +++ b/python/ray/tests/py3_test.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import asyncio import pytest import ray @@ -94,3 +95,32 @@ def test_args_intertwined(ray_start_regular): local_method = local_actor.cls_args_intertwined test_function(local_method, actor_method) ray.get(remote_test_function.remote(local_method, actor_method)) + +def test_asyncio_actor(ray_start_regular): + @ray.remote + class AsyncBatcher(object): + def __init__(self): + self.batch = [] + # The event currently need to be created from the same thread. + # We currently run async coroutines from a different thread. + self.event = None + + async def add(self, x): + if self.event is None: + self.event = asyncio.Event() + self.batch.append(x) + if len(self.batch) >= 3: + self.event.set() + else: + await self.event.wait() + return sorted(self.batch) + + a = AsyncBatcher.options(is_direct_call=True, is_asyncio=True).remote() + x1 = a.add.remote(1) + x2 = a.add.remote(2) + x3 = a.add.remote(3) + r1 = ray.get(x1) + r2 = ray.get(x2) + r3 = ray.get(x3) + assert r1 == [1, 2, 3] + assert r1 == r2 == r3 diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index c16019a89..9d55133bb 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -2852,3 +2852,4 @@ ray.get(actor.ping.remote()) run_string_as_driver(driver_script) detached_actor = ray.experimental.get_actor(actor_name) assert ray.get(detached_actor.ping.remote()) == "pong" + diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 2738aa5b3..57f9ff31e 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -199,6 +199,11 @@ int TaskSpecification::MaxActorConcurrency() const { return message_->actor_creation_task_spec().max_concurrency(); } +bool TaskSpecification::IsAsyncioActor() const { + RAY_CHECK(IsActorCreationTask()); + return message_->actor_creation_task_spec().is_asyncio(); +} + bool TaskSpecification::IsDetachedActor() const { RAY_CHECK(IsActorCreationTask()); return message_->actor_creation_task_spec().is_detached(); @@ -229,6 +234,8 @@ std::string TaskSpecification::DebugString() const { stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() << ", max_reconstructions=" << MaxActorReconstructions() << ", is_direct_call=" << IsDirectCall() + << ", max_concurrency=" << MaxActorConcurrency() + << ", is_asyncio_actor=" << IsAsyncioActor() << ", is_detached=" << IsDetachedActor() << "}"; } else if (IsActorTask()) { // Print actor task spec. diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 5c2b993a9..553bfc082 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -150,6 +150,8 @@ class TaskSpecification : public MessageWrapper { int MaxActorConcurrency() const; + bool IsAsyncioActor() const; + bool IsDetachedActor() const; ObjectID ActorDummyObject() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 31f894029..c383a9ad5 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -95,7 +95,8 @@ class TaskSpecBuilder { TaskSpecBuilder &SetActorCreationTaskSpec( const ActorID &actor_id, uint64_t max_reconstructions = 0, const std::vector &dynamic_worker_options = {}, - bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false) { + bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false, + bool is_asyncio = false) { message_->set_type(TaskType::ACTOR_CREATION_TASK); auto actor_creation_spec = message_->mutable_actor_creation_task_spec(); actor_creation_spec->set_actor_id(actor_id.Binary()); @@ -105,6 +106,7 @@ class TaskSpecBuilder { } actor_creation_spec->set_is_direct_call(is_direct_call); actor_creation_spec->set_max_concurrency(max_concurrency); + actor_creation_spec->set_is_asyncio(is_asyncio); actor_creation_spec->set_is_detached(is_detached); return *this; } diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 60fc373f9..fe86decc3 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -105,14 +105,15 @@ struct ActorCreationOptions { const std::unordered_map &resources, const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options, - bool is_detached) + bool is_detached, bool is_asyncio) : max_reconstructions(max_reconstructions), is_direct_call(is_direct_call), max_concurrency(max_concurrency), resources(resources), placement_resources(placement_resources), dynamic_worker_options(dynamic_worker_options), - is_detached(is_detached){}; + is_detached(is_detached), + is_asyncio(is_asyncio){}; /// Maximum number of times that the actor should be reconstructed when it dies /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. @@ -132,6 +133,8 @@ struct ActorCreationOptions { /// Whether to keep the actor persistent after driver exit. If true, this will set /// the worker to not be destroyed after the driver shutdown. const bool is_detached = false; + /// Whether to use async mode of direct actor call. is_direct_call must be true. + const bool is_asyncio = false; }; } // namespace ray diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 0eba2ebd8..d8f745530 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -98,6 +98,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = task_spec.IsDirectCall(); current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); + current_actor_is_asyncio_ = task_spec.IsAsyncioActor(); } else if (task_spec.IsActorTask()) { RAY_CHECK(current_job_id_ == task_spec.JobId()); RAY_CHECK(current_actor_id_ == task_spec.ActorId()); @@ -135,6 +136,8 @@ int WorkerContext::CurrentActorMaxConcurrency() const { return current_actor_max_concurrency_; } +bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; } + WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) { if (thread_context_ == nullptr) { thread_context_ = std::unique_ptr(new WorkerThreadContext()); diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 08f79a9fd..b9cb059b8 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -48,6 +48,8 @@ class WorkerContext { int CurrentActorMaxConcurrency() const; + bool CurrentActorIsAsync() const; + int GetNextTaskIndex(); int GetNextPutIndex(); @@ -60,6 +62,7 @@ class WorkerContext { bool current_actor_is_direct_call_ = false; bool current_task_is_direct_call_ = false; int current_actor_max_concurrency_ = 1; + bool current_actor_is_asyncio_ = false; /// The id of the (main) thread that constructed this worker context. boost::thread::id main_thread_id_; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8201afc45..bab5e5fbc 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1,9 +1,12 @@ #include +#include "boost/fiber/all.hpp" + #include "ray/common/ray_config.h" #include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" +#include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" namespace { @@ -584,11 +587,11 @@ Status CoreWorker::CreateActor(const RayFunction &function, rpc_address_, function, args, 1, actor_creation_options.resources, actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids); - builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, - actor_creation_options.dynamic_worker_options, - actor_creation_options.is_direct_call, - actor_creation_options.max_concurrency, - actor_creation_options.is_detached); + builder.SetActorCreationTaskSpec( + actor_id, actor_creation_options.max_reconstructions, + actor_creation_options.dynamic_worker_options, + actor_creation_options.is_direct_call, actor_creation_options.max_concurrency, + actor_creation_options.is_detached, actor_creation_options.is_asyncio); std::unique_ptr actor_handle(new ActorHandle( actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), @@ -900,4 +903,10 @@ void CoreWorker::HandleDirectActorCallArgWaitComplete( }); } +void CoreWorker::YieldCurrentFiber(FiberEvent &event) { + RAY_CHECK(worker_context_.CurrentActorIsAsync()); + boost::this_fiber::yield(); + event.Wait(); +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c0299499d..172567f1d 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -354,6 +354,14 @@ class CoreWorker { rpc::DirectActorCallArgWaitCompleteReply *reply, rpc::SendReplyCallback send_reply_callback); + /// + /// Public methods related to async actor call. This should only be used when + /// the actor is (1) direct actor and (2) using asyncio mode. + /// + + /// Block current fiber until event is triggered. + void YieldCurrentFiber(FiberEvent &event); + private: /// Run the io_service_ event loop. This should be called in a background thread. void RunIOService(); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 8fde21c06..37b3962bc 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -56,9 +56,10 @@ ActorID CreateActorHelper(CoreWorker &worker, std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); - ActorCreationOptions actor_options{max_reconstructions, is_direct_call, - /*max_concurrency*/ 1, resources, resources, {}, - /*is_detached*/ false}; + ActorCreationOptions actor_options{ + max_reconstructions, is_direct_call, + /*max_concurrency*/ 1, resources, resources, {}, + /*is_detached*/ false, /*is_asyncio*/ false}; // Create an actor. ActorID actor_id; @@ -487,8 +488,14 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); std::unordered_map resources; - ActorCreationOptions actor_options{0, /*is_direct_call*/ true, 1, resources, resources, - {}, /*is_detached*/ false}; + ActorCreationOptions actor_options{0, + /*is_direct_call*/ true, + 1, + resources, + resources, + {}, + /*is_detached*/ false, + /*is_asyncio*/ false}; const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id, ObjectID::FromRandom(), function.GetLanguage(), true, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 4e19b0559..d1085f0b7 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -1,5 +1,7 @@ -#include "ray/core_worker/transport/direct_actor_transport.h" +#include + #include "ray/common/task/task.h" +#include "ray/core_worker/transport/direct_actor_transport.h" using ray::rpc::ActorTableData; @@ -237,6 +239,28 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { } } +void CoreWorkerDirectTaskReceiver::SetActorAsAsync() { + if (!is_asyncio_) { + RAY_LOG(DEBUG) << "Setting direct actor as async, creating new fiber thread."; + + // The main thread will be used the creating new fibers. + // The fiber_runner_thread_ will run all fibers. + // boost::fibers::algo::shared_work allows two threads to transparently + // share all the fibers. + boost::fibers::use_scheduling_algorithm(); + + fiber_runner_thread_ = std::thread([&]() { + boost::fibers::use_scheduling_algorithm(); + + // The event here is used to make sure fiber_runner_thread_ never terminates. + // Because fiber_shutdown_event_ is never notified, fiber_runner_thread_ will + // immediately start working on any ready fibers. + fiber_shutdown_event_.Wait(); + }); + is_asyncio_ = true; + } +}; + void CoreWorkerDirectTaskReceiver::HandlePushTask( const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -249,6 +273,9 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( return; } SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency()); + if (worker_context_.CurrentActorIsAsync()) { + SetActorAsAsync(); + } // TODO(ekl) resolving object dependencies is expensive and requires an IPC to // the raylet, which is a central bottleneck. In the future, we should inline @@ -265,7 +292,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( if (it == scheduling_queue_.end()) { auto result = scheduling_queue_.emplace( task_spec.CallerId(), std::unique_ptr(new SchedulingQueue( - task_main_io_service_, *waiter_, pool_))); + task_main_io_service_, *waiter_, pool_, is_asyncio_))); it = result.first; } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index a1f52da1d..4311d004d 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -2,6 +2,7 @@ #define RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H #include +#include #include #include #include @@ -237,18 +238,47 @@ class BoundedExecutor { boost::asio::thread_pool pool_; }; +/// Used by async actor mode. The fiber event will be used +/// from python to switch control among different coroutines. +/// Taken from boost::fiber examples +/// https://github.com/boostorg/fiber/blob/7be4f860e733a92d2fa80a848dd110df009a20e1/examples/wait_stuff.cpp#L115-L142 +class FiberEvent { + public: + // Block the fiber until the event is notified. + void Wait() { + std::unique_lock lock(mutex_); + cond_.wait(lock, [this]() { return ready_; }); + } + + // Notify the event and unblock all waiters. + void Notify() { + { + std::unique_lock lock(mutex_); + ready_ = true; + } + cond_.notify_one(); + } + + private: + boost::fibers::condition_variable cond_; + boost::fibers::mutex mutex_; + bool ready_ = false; +}; + /// Used to ensure serial order of task execution per actor handle. /// See direct_actor.proto for a description of the ordering protocol. class SchedulingQueue { public: SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter, std::shared_ptr pool = nullptr, + bool use_asyncio = false, int64_t reorder_wait_seconds = kMaxReorderWaitSeconds) : wait_timer_(main_io_service), waiter_(waiter), reorder_wait_seconds_(reorder_wait_seconds), main_thread_id_(boost::this_thread::get_id()), - pool_(pool) {} + pool_(pool), + use_asyncio_(use_asyncio) {} void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, @@ -295,7 +325,10 @@ class SchedulingQueue { pending_tasks_.begin()->second.CanExecute()) { auto head = pending_tasks_.begin(); auto request = head->second; - if (pool_ != nullptr) { + + if (use_asyncio_) { + boost::fibers::fiber([request]() mutable { request.Accept(); }).detach(); + } else if (pool_ != nullptr) { pool_->PostBlocking([request]() mutable { request.Accept(); }); } else { request.Accept(); @@ -349,6 +382,9 @@ class SchedulingQueue { DependencyWaiter &waiter_; /// If concurrent calls are allowed, holds the pool for executing these tasks. std::shared_ptr pool_; + /// Whether we should enqueue requests into asyncio pool. Setting this to true + /// will instantiate all tasks as fibers that can be yielded. + bool use_asyncio_; friend class SchedulingQueueTest; }; @@ -364,6 +400,11 @@ class CoreWorkerDirectTaskReceiver { const TaskHandler &task_handler, const std::function &exit_handler); + ~CoreWorkerDirectTaskReceiver() { + fiber_shutdown_event_.Notify(); + fiber_runner_thread_.join(); + } + /// Initialize this receiver. This must be called prior to use. void Init(RayletClient &client); @@ -388,6 +429,8 @@ class CoreWorkerDirectTaskReceiver { /// Set the max concurrency at runtime. It cannot be changed once set. void SetMaxActorConcurrency(int max_concurrency); + void SetActorAsAsync(); + private: // Worker context. WorkerContext &worker_context_; @@ -408,6 +451,13 @@ class CoreWorkerDirectTaskReceiver { bool exiting_ = false; /// If concurrent calls are allowed, holds the pool for executing these tasks. std::shared_ptr pool_; + /// Whether this actor use asyncio for concurrency. + bool is_asyncio_ = false; + /// The thread that runs all asyncio fibers. is_asyncio_ must be true. + std::thread fiber_runner_thread_; + /// The fiber event used to block fiber_runner_thread_ from shutdown. + /// is_asyncio_ must be true. + FiberEvent fiber_shutdown_event_; }; } // namespace ray diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 7b68c88f8..ee52bb811 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -110,6 +110,8 @@ message ActorCreationTaskSpec { int32 max_concurrency = 6; // Whether the actor is persistent bool is_detached = 7; + // Whether the actor use async actor calls + bool is_asyncio = 8; } // Task spec of an actor task.