diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 82c252a51..320e695a8 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -53,7 +53,7 @@ cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfileEvent "ray::worker::ProfileEvent": void SetExtraData(const c_string &extra_data) -cdef extern from "ray/core_worker/transport/direct_actor_transport.h" nogil: +cdef extern from "ray/core_worker/fiber.h" nogil: cdef cppclass CFiberEvent "ray::FiberEvent": CFiberEvent() void Wait() diff --git a/src/ray/core_worker/fiber.h b/src/ray/core_worker/fiber.h new file mode 100644 index 000000000..b8b6acdd8 --- /dev/null +++ b/src/ray/core_worker/fiber.h @@ -0,0 +1,132 @@ +#ifndef RAY_CORE_WORKER_FIBER_H +#define RAY_CORE_WORKER_FIBER_H + +#include +#include +namespace ray { + +/// Used by async actor mode. The fiber event will be used +/// from python to switch control among different coroutines. +/// Taken from boost::fiber examples +/// https://github.com/boostorg/fiber/blob/7be4f860e733a92d2fa80a848dd110df009a20e1/examples/wait_stuff.cpp#L115-L142 +class FiberEvent { + public: + // Block the fiber until the event is notified. + void Wait() { + std::unique_lock lock(mutex_); + cond_.wait(lock, [this]() { return ready_; }); + } + + // Notify the event and unblock all waiters. + void Notify() { + { + std::unique_lock lock(mutex_); + ready_ = true; + } + cond_.notify_one(); + } + + private: + boost::fibers::condition_variable cond_; + boost::fibers::mutex mutex_; + bool ready_ = false; +}; + +/// Used by async actor mode. The FiberRateLimiter is a barrier that +/// allows at most num fibers running at once. It implements the +/// semaphore data structure. +class FiberRateLimiter { + public: + FiberRateLimiter(int num) : num_(num) {} + + // Enter the semaphore. Wait for the value to be > 0 and decrement the value. 
+ void Acquire() { + std::unique_lock lock(mutex_); + cond_.wait(lock, [this]() { return num_ > 0; }); + num_ -= 1; + } + + // Exit the semaphore. Increment the value and notify another waiter. + void Release() { + { + std::unique_lock lock(mutex_); + num_ += 1; + } + // NOTE(simon): This does not guarantee to wake up the first queued fiber. + // This could be a problem for certain workloads because there is no guarantee + // on task ordering. + cond_.notify_one(); + } + + private: + boost::fibers::condition_variable cond_; + boost::fibers::mutex mutex_; + int num_ = 1; +}; + +using FiberChannel = boost::fibers::unbuffered_channel>; + +class FiberState { + public: + FiberState(int max_concurrency) : rate_limiter_(max_concurrency) { + fiber_runner_thread_ = + std::thread( + [&]() { + while (!channel_.is_closed()) { + std::function func; + auto op_status = channel_.pop(func); + if (op_status == boost::fibers::channel_op_status::success) { + boost::fibers::fiber(boost::fibers::launch::dispatch, func).detach(); + } else if (op_status == boost::fibers::channel_op_status::closed) { + // The channel was closed. We will just exit the loop and finish + // cleanup. + break; + } else { + RAY_LOG(ERROR) + << "Async actor fiber channel returned unexpected error code, " + << "shutting down the worker thread. Please submit a github issue " + << "at https://github.com/ray-project/ray"; + return; + } + } + // The event here is used to make sure fiber_runner_thread_ never + // terminates. Because shutdown_worker_event_ is not notified until shutdown, + // fiber_runner_thread_ will immediately start working on any ready fibers. 
+ shutdown_worker_event_.Wait(); + }); + } + + void EnqueueFiber(std::function &&callback) { + auto op_status = channel_.push([this, callback]() { + rate_limiter_.Acquire(); + callback(); + rate_limiter_.Release(); + }); + RAY_CHECK(op_status == boost::fibers::channel_op_status::success); + } + + ~FiberState() { + channel_.close(); + shutdown_worker_event_.Notify(); + if (fiber_runner_thread_.joinable()) { + fiber_runner_thread_.join(); + } + } + + private: + /// The fiber channel used to send tasks between the submitter thread + /// (main direct_actor_transport thread) and the fiber_runner_thread_ (defined below) + FiberChannel channel_; + /// The fiber semaphore used to limit the number of concurrent fibers + /// running at once. + FiberRateLimiter rate_limiter_; + /// The fiber event used to block fiber_runner_thread_ from shutdown. + /// is_asyncio_ must be true. + FiberEvent shutdown_worker_event_; + /// The thread that runs all asyncio fibers. is_asyncio_ must be true. + std::thread fiber_runner_thread_; +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_FIBER_H \ No newline at end of file diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 7c9b85cf0..436bff16c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -189,22 +189,7 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { void CoreWorkerDirectTaskReceiver::SetActorAsAsync(int max_concurrency) { if (!is_asyncio_) { RAY_LOG(DEBUG) << "Setting direct actor as async, creating new fiber thread."; - - // The main thread will be used the creating new fibers. - // The fiber_runner_thread_ will run all fibers. - // boost::fibers::algo::shared_work allows two threads to transparently - // share all the fibers. 
- boost::fibers::use_scheduling_algorithm(); - - fiber_runner_thread_ = std::thread([&]() { - boost::fibers::use_scheduling_algorithm(); - - // The event here is used to make sure fiber_runner_thread_ never terminates. - // Because fiber_shutdown_event_ is never notified, fiber_runner_thread_ will - // immediately start working on any ready fibers. - fiber_shutdown_event_.Wait(); - }); - fiber_rate_limiter_.reset(new FiberRateLimiter(max_concurrency)); + fiber_state_.reset(new FiberState(max_concurrency)); max_concurrency_ = max_concurrency; is_asyncio_ = true; } @@ -349,7 +334,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( auto result = scheduling_queue_.emplace( task_spec.CallerId(), std::unique_ptr(new SchedulingQueue( - task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_rate_limiter_))); + task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_state_))); it = result.first; } it->second->Add(request.sequence_number(), request.client_processed_up_to(), diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index bb6f1d39a..99da8e5d2 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -2,7 +2,6 @@ #define RAY_CORE_WORKER_DIRECT_ACTOR_TRANSPORT_H #include -#include #include #include #include @@ -16,6 +15,7 @@ #include "ray/common/id.h" #include "ray/common/ray_object.h" #include "ray/core_worker/context.h" +#include "ray/core_worker/fiber.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/task_manager.h" #include "ray/core_worker/transport/dependency_resolver.h" @@ -234,65 +234,6 @@ class BoundedExecutor { boost::asio::thread_pool pool_; }; -/// Used by async actor mode. The fiber event will be used -/// from python to switch control among different coroutines. 
-/// Taken from boost::fiber examples -/// https://github.com/boostorg/fiber/blob/7be4f860e733a92d2fa80a848dd110df009a20e1/examples/wait_stuff.cpp#L115-L142 -class FiberEvent { - public: - // Block the fiber until the event is notified. - void Wait() { - std::unique_lock lock(mutex_); - cond_.wait(lock, [this]() { return ready_; }); - } - - // Notify the event and unblock all waiters. - void Notify() { - { - std::unique_lock lock(mutex_); - ready_ = true; - } - cond_.notify_one(); - } - - private: - boost::fibers::condition_variable cond_; - boost::fibers::mutex mutex_; - bool ready_ = false; -}; - -/// Used by async actor mode. The FiberRateLimiter is a barrier that -/// allows at most num fibers running at once. It implements the -/// semaphore data structure. -class FiberRateLimiter { - public: - FiberRateLimiter(int num) : num_(num) {} - - // Enter the semaphore. Wait fo the value to be > 0 and decrement the value. - void Acquire() { - std::unique_lock lock(mutex_); - cond_.wait(lock, [this]() { return num_ > 0; }); - num_ -= 1; - } - - // Exit the semaphore. Increment the value and notify other waiter. - void Release() { - { - std::unique_lock lock(mutex_); - num_ += 1; - } - // TODO(simon): This not does guarantee to wake up the first queued fiber. - // This could be a problem for certain workloads because there is no guarantee - // on task ordering . - cond_.notify_one(); - } - - private: - boost::fibers::condition_variable cond_; - boost::fibers::mutex mutex_; - int num_ = 1; -}; - /// Used to ensure serial order of task execution per actor handle. /// See direct_actor.proto for a description of the ordering protocol. 
class SchedulingQueue { @@ -300,7 +241,7 @@ class SchedulingQueue { SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter, std::shared_ptr pool = nullptr, bool use_asyncio = false, - std::shared_ptr fiber_rate_limiter = nullptr, + std::shared_ptr fiber_state = nullptr, int64_t reorder_wait_seconds = kMaxReorderWaitSeconds) : wait_timer_(main_io_service), waiter_(waiter), @@ -308,7 +249,7 @@ class SchedulingQueue { main_thread_id_(boost::this_thread::get_id()), pool_(pool), use_asyncio_(use_asyncio), - fiber_rate_limiter_(fiber_rate_limiter) {} + fiber_state_(fiber_state) {} void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, @@ -358,12 +299,7 @@ class SchedulingQueue { auto request = head->second; if (use_asyncio_) { - boost::fibers::fiber([request, this]() mutable { - fiber_rate_limiter_->Acquire(); - request.Accept(); - fiber_rate_limiter_->Release(); - }) - .detach(); + fiber_state_->EnqueueFiber([request]() mutable { request.Accept(); }); } else if (pool_ != nullptr) { pool_->PostBlocking([request]() mutable { request.Accept(); }); } else { @@ -421,10 +357,9 @@ class SchedulingQueue { /// Whether we should enqueue requests into asyncio pool. Setting this to true /// will instantiate all tasks as fibers that can be yielded. bool use_asyncio_; - /// If use_asyncio_ is true, fiber_rate_limiter_ limits the max number of async - /// tasks running at once. - std::shared_ptr fiber_rate_limiter_; - + /// If use_asyncio_ is true, fiber_state_ contains the running state required + /// to enable continuation and work together with python asyncio. + std::shared_ptr fiber_state_; friend class SchedulingQueueTest; }; @@ -446,14 +381,6 @@ class CoreWorkerDirectTaskReceiver { exit_handler_(exit_handler), task_main_io_service_(main_io_service) {} - ~CoreWorkerDirectTaskReceiver() { - fiber_shutdown_event_.Notify(); - // Only join the fiber thread if it was spawned in the first place. 
- if (fiber_runner_thread_.joinable()) { - fiber_runner_thread_.join(); - } - } - /// Initialize this receiver. This must be called prior to use. void Init(rpc::ClientFactoryFn client_factory, rpc::Address rpc_address); @@ -511,14 +438,9 @@ class CoreWorkerDirectTaskReceiver { /// Whether this actor use asyncio for concurrency. /// TODO(simon) group all asyncio related fields into a separate struct. bool is_asyncio_ = false; - /// The thread that runs all asyncio fibers. is_asyncio_ must be true. - std::thread fiber_runner_thread_; - /// The fiber event used to block fiber_runner_thread_ from shutdown. - /// is_asyncio_ must be true. - FiberEvent fiber_shutdown_event_; - /// The fiber semaphore used to limit the number of concurrent fibers - /// running at once. - std::shared_ptr fiber_rate_limiter_; + /// If is_asyncio_ is true, fiber_state_ contains the running state required + /// to enable continuation and work together with python asyncio. + std::shared_ptr fiber_state_; }; } // namespace ray