diff --git a/.travis.yml b/.travis.yml index df3fd9a23..6702f3fda 100644 --- a/.travis.yml +++ b/.travis.yml @@ -147,7 +147,7 @@ script: # ray tests - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then RAY_FORCE_DIRECT=0 python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then RAY_FORCE_DIRECT=0 python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi + - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi - ./ci/keep_alive bazel test --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/... diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 78aa0e5c0..86d78dee9 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -592,6 +592,10 @@ cdef execute_task( c_resources.find(b"object_store_memory")).second))) def function_executor(*arguments, **kwarguments): + # function_executor is a generator to make sure python decrement + # stack counter on context switch for async mode. If it is not + # a generator, python will count the stacks of executor as part + # of the recursion limit, resulting in much lower concurrency. function = execution_info.function if PY3 and core_worker.current_actor_is_asyncio(): @@ -614,9 +618,9 @@ cdef execute_task( (core_worker.core_worker.get() .YieldCurrentFiber(fiber_event)) - return future.result() + yield future.result() - return function(actor, *arguments, **kwarguments) + yield function(actor, *arguments, **kwarguments) with core_worker.profile_event(b"task", extra_data=extra_data): try: @@ -634,6 +638,9 @@ cdef execute_task( with core_worker.profile_event(b"task:execute"): task_exception = True outputs = function_executor(*args, **kwargs) + # The function_executor is a generator in actor mode. + if inspect.isgenerator(outputs): + outputs = next(outputs) task_exception = False if c_return_ids.size() == 1: outputs = (outputs,) diff --git a/python/ray/actor.py b/python/ray/actor.py index 49b928f58..0cd2f6057 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -377,7 +377,7 @@ class ActorClass: is_direct_call: Use direct actor calls. max_concurrency: The max number of concurrent calls to allow for this actor. This only works with direct actor calls. The max - concurrency defaults to 1 for threaded execution, and 100 for + concurrency defaults to 1 for threaded execution, and 1000 for asyncio execution. Note that the execution order is not guaranteed when max_concurrency > 1. name: The globally unique name for the actor. @@ -397,7 +397,7 @@ class ActorClass: is_direct_call = ray_constants.direct_call_enabled() if max_concurrency is None: if is_asyncio: - max_concurrency = 100 + max_concurrency = 1000 else: max_concurrency = 1 diff --git a/python/ray/tests/py3_test.py b/python/ray/tests/py3_test.py index 3a7cd864f..978776c6b 100644 --- a/python/ray/tests/py3_test.py +++ b/python/ray/tests/py3_test.py @@ -6,6 +6,7 @@ from __future__ import print_function import asyncio import threading import pytest +import sys import ray import ray.cluster_utils @@ -172,6 +173,33 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared): assert history == answer +def test_asyncio_actor_high_concurrency(ray_start_regular_shared): + # This tests actor can handle concurrency above recursionlimit. + + @ray.remote + class AsyncConcurrencyBatcher: + def __init__(self, batch_size): + self.batch = [] + self.event = asyncio.Event() + self.batch_size = batch_size + + async def add(self, x): + self.batch.append(x) + if len(self.batch) >= self.batch_size: + self.event.set() + else: + await self.event.wait() + return sorted(self.batch) + + batch_size = sys.getrecursionlimit() * 4 + actor = AsyncConcurrencyBatcher.options( + is_asyncio=True, max_concurrency=batch_size * 2, + is_direct_call=True).remote(batch_size) + result = ray.get([actor.add.remote(i) for i in range(batch_size)]) + assert result[0] == list(range(batch_size)) + assert result[-1] == list(range(batch_size)) + + @pytest.mark.asyncio async def test_asyncio_get(ray_start_regular_shared, event_loop): loop = event_loop diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index e1a6ba335..0da279185 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -186,7 +186,7 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) { } } -void CoreWorkerDirectTaskReceiver::SetActorAsAsync() { +void CoreWorkerDirectTaskReceiver::SetActorAsAsync(int max_concurrency) { if (!is_asyncio_) { RAY_LOG(DEBUG) << "Setting direct actor as async, creating new fiber thread."; @@ -204,7 +204,8 @@ void CoreWorkerDirectTaskReceiver::SetActorAsAsync() { // immediately start working on any ready fibers. fiber_shutdown_event_.Wait(); }); - fiber_rate_limiter_.reset(new FiberRateLimiter(max_concurrency_)); + fiber_rate_limiter_.reset(new FiberRateLimiter(max_concurrency)); + max_concurrency_ = max_concurrency; is_asyncio_ = true; } }; @@ -220,9 +221,13 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( nullptr, nullptr); return; } - SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency()); + + // Only call SetMaxActorConcurrency to configure threadpool size when the + // actor is not async actor. Async actor is single threaded. if (worker_context_.CurrentActorIsAsync()) { - SetActorAsAsync(); + SetActorAsAsync(worker_context_.CurrentActorMaxConcurrency()); + } else { + SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency()); } std::vector dependencies; diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index d1839942c..56e0d1401 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -285,7 +285,7 @@ class FiberRateLimiter { private: boost::fibers::condition_variable cond_; boost::fibers::mutex mutex_; - int num_; + int num_ = 1; }; /// Used to ensure serial order of task execution per actor handle. @@ -472,7 +472,8 @@ class CoreWorkerDirectTaskReceiver { /// Set the max concurrency at runtime. It cannot be changed once set. void SetMaxActorConcurrency(int max_concurrency); - void SetActorAsAsync(); + /// Set the max concurrency and start async actor context. + void SetActorAsAsync(int max_concurrency); private: // Worker context.