Fix async actor recursion limitation (#6672)

* Do not start threadpool when using async

* Turn function_executor into a generator

* Add new test for high concurrency and bump the default

* Set direct call
This commit is contained in:
Simon Mo
2020-01-02 19:45:13 -06:00
committed by GitHub
parent 39a3459886
commit 9fe90cdafc
6 changed files with 52 additions and 11 deletions
+1 -1
View File
@@ -147,7 +147,7 @@ script:
# ray tests
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then RAY_FORCE_DIRECT=0 python -m pytest -v --durations=5 --timeout=300 python/ray/experimental/test/async_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then RAY_FORCE_DIRECT=0 python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then python -m pytest -v --durations=5 --timeout=300 python/ray/tests/py3_test.py; fi
- ./ci/keep_alive bazel test --spawn_strategy=local --flaky_test_attempts=3 --nocache_test_results --test_verbose_timeout_warnings --progress_report_interval=100 --show_progress_rate_limit=100 --show_timestamps --test_output=errors --test_tag_filters=-jenkins_only python/ray/...
+9 -2
View File
@@ -592,6 +592,10 @@ cdef execute_task(
c_resources.find(b"object_store_memory")).second)))
def function_executor(*arguments, **kwarguments):
# function_executor is a generator to make sure python decrement
# stack counter on context switch for async mode. If it is not
# a generator, python will count the stacks of executor as part
# of the recursion limit, resulting in much lower concurrency.
function = execution_info.function
if PY3 and core_worker.current_actor_is_asyncio():
@@ -614,9 +618,9 @@ cdef execute_task(
(core_worker.core_worker.get()
.YieldCurrentFiber(fiber_event))
return future.result()
yield future.result()
return function(actor, *arguments, **kwarguments)
yield function(actor, *arguments, **kwarguments)
with core_worker.profile_event(b"task", extra_data=extra_data):
try:
@@ -634,6 +638,9 @@ cdef execute_task(
with core_worker.profile_event(b"task:execute"):
task_exception = True
outputs = function_executor(*args, **kwargs)
# The function_executor is a generator in actor mode.
if inspect.isgenerator(outputs):
outputs = next(outputs)
task_exception = False
if c_return_ids.size() == 1:
outputs = (outputs,)
+2 -2
View File
@@ -377,7 +377,7 @@ class ActorClass:
is_direct_call: Use direct actor calls.
max_concurrency: The max number of concurrent calls to allow for
this actor. This only works with direct actor calls. The max
concurrency defaults to 1 for threaded execution, and 100 for
concurrency defaults to 1 for threaded execution, and 1000 for
asyncio execution. Note that the execution order is not
guaranteed when max_concurrency > 1.
name: The globally unique name for the actor.
@@ -397,7 +397,7 @@ class ActorClass:
is_direct_call = ray_constants.direct_call_enabled()
if max_concurrency is None:
if is_asyncio:
max_concurrency = 100
max_concurrency = 1000
else:
max_concurrency = 1
+28
View File
@@ -6,6 +6,7 @@ from __future__ import print_function
import asyncio
import threading
import pytest
import sys
import ray
import ray.cluster_utils
@@ -172,6 +173,33 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared):
assert history == answer
def test_asyncio_actor_high_concurrency(ray_start_regular_shared):
# This tests actor can handle concurrency above recursionlimit.
@ray.remote
class AsyncConcurrencyBatcher:
def __init__(self, batch_size):
self.batch = []
self.event = asyncio.Event()
self.batch_size = batch_size
async def add(self, x):
self.batch.append(x)
if len(self.batch) >= self.batch_size:
self.event.set()
else:
await self.event.wait()
return sorted(self.batch)
batch_size = sys.getrecursionlimit() * 4
actor = AsyncConcurrencyBatcher.options(
is_asyncio=True, max_concurrency=batch_size * 2,
is_direct_call=True).remote(batch_size)
result = ray.get([actor.add.remote(i) for i in range(batch_size)])
assert result[0] == list(range(batch_size))
assert result[-1] == list(range(batch_size))
@pytest.mark.asyncio
async def test_asyncio_get(ray_start_regular_shared, event_loop):
loop = event_loop
@@ -186,7 +186,7 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) {
}
}
void CoreWorkerDirectTaskReceiver::SetActorAsAsync() {
void CoreWorkerDirectTaskReceiver::SetActorAsAsync(int max_concurrency) {
if (!is_asyncio_) {
RAY_LOG(DEBUG) << "Setting direct actor as async, creating new fiber thread.";
@@ -204,7 +204,8 @@ void CoreWorkerDirectTaskReceiver::SetActorAsAsync() {
// immediately start working on any ready fibers.
fiber_shutdown_event_.Wait();
});
fiber_rate_limiter_.reset(new FiberRateLimiter(max_concurrency_));
fiber_rate_limiter_.reset(new FiberRateLimiter(max_concurrency));
max_concurrency_ = max_concurrency;
is_asyncio_ = true;
}
};
@@ -220,9 +221,13 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
nullptr, nullptr);
return;
}
SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency());
// Only call SetMaxActorConcurrency to configure threadpool size when the
// actor is not async actor. Async actor is single threaded.
if (worker_context_.CurrentActorIsAsync()) {
SetActorAsAsync();
SetActorAsAsync(worker_context_.CurrentActorMaxConcurrency());
} else {
SetMaxActorConcurrency(worker_context_.CurrentActorMaxConcurrency());
}
std::vector<ObjectID> dependencies;
@@ -285,7 +285,7 @@ class FiberRateLimiter {
private:
boost::fibers::condition_variable cond_;
boost::fibers::mutex mutex_;
int num_;
int num_ = 1;
};
/// Used to ensure serial order of task execution per actor handle.
@@ -472,7 +472,8 @@ class CoreWorkerDirectTaskReceiver {
/// Set the max concurrency at runtime. It cannot be changed once set.
void SetMaxActorConcurrency(int max_concurrency);
void SetActorAsAsync();
/// Set the max concurrency and start async actor context.
void SetActorAsAsync(int max_concurrency);
private:
// Worker context.