[Asyncio] Increase recursion limit manually (#7142)

This commit is contained in:
Simon Mo
2020-02-12 14:15:36 -08:00
committed by GitHub
parent f41a9b9813
commit 0e94e1dc2a
2 changed files with 52 additions and 29 deletions
+15
View File
@@ -3,6 +3,8 @@
# cython: embedsignature = True
# cython: language_level = 3
from cpython.pystate cimport PyThreadState_Get
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
@@ -24,6 +26,19 @@ from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
)
cdef extern from "Python.h":
# Note(simon): This is used to configure asyncio actor stack size.
# Cython made PyThreadState an opaque types. Saying that if the user wants
# specific attributes, they can be declared manually.
# You can find the cpython definition in Include/cpython/pystate.h#L59
ctypedef struct CPyThreadState "PyThreadState":
int recursion_depth
# From Include/ceveal.h#67
int Py_GetRecursionLimit()
void Py_SetRecursionLimit(int)
cdef class Buffer:
cdef:
shared_ptr[CBuffer] buffer
+37 -29
View File
@@ -14,6 +14,7 @@ import threading
import time
import logging
import os
import pickle
import sys
from libc.stdint cimport (
@@ -76,6 +77,8 @@ from ray.includes.task cimport CTaskSpec
from ray.includes.ray_config cimport RayConfig
import ray
from ray.async_compat import (sync_to_async,
AsyncGetResponse, AsyncMonitorState)
import ray.experimental.signal as ray_signal
import ray.memory_monitor as memory_monitor
import ray.ray_constants as ray_constants
@@ -117,17 +120,6 @@ include "includes/libcoreworker.pxi"
logger = logging.getLogger(__name__)
MEMCOPY_THREADS = 12
PY3 = cpython.PY_MAJOR_VERSION >= 3
if PY3:
import pickle
else:
import cPickle as pickle
if PY3:
from ray.async_compat import (sync_to_async,
AsyncGetResponse, AsyncMonitorState)
def set_internal_config(dict options):
@@ -207,6 +199,21 @@ def compute_task_id(ObjectID object_id):
return TaskID(object_id.native().TaskId().Binary())
cdef increase_recursion_limit():
"""Double the recusion limit if current depth is close to the limit"""
cdef:
CPyThreadState * s = <CPyThreadState *> PyThreadState_Get()
int current_depth = s.recursion_depth
int current_limit = Py_GetRecursionLimit()
int new_limit = current_limit * 2
if current_limit - current_depth < 500:
Py_SetRecursionLimit(new_limit)
logger.debug("Increasing Python recursion limit to {} "
"current recursion depth is {}.".format(
new_limit, current_depth))
@cython.auto_pickle(False)
cdef class Language:
cdef CLanguage lang
@@ -297,8 +304,9 @@ cdef void prepare_args(
else:
args_vector.push_back(
CTaskArg.PassByReference(
(CObjectID.FromBinary(core_worker.put_serialized_cobject(
serialized_arg)))))
(CObjectID.FromBinary(
core_worker.put_serialized_cobject(
serialized_arg)))))
cdef deserialize_args(
const c_vector[shared_ptr[CRayObject]] &c_args,
@@ -385,13 +393,18 @@ cdef execute_task(
c_resources.find(b"object_store_memory")).second)))
def function_executor(*arguments, **kwarguments):
# function_executor is a generator to make sure python decrement
# stack counter on context switch for async mode. If it is not
# a generator, python will count the stacks of executor as part
# of the recursion limit, resulting in much lower concurrency.
function = execution_info.function
if PY3 and core_worker.current_actor_is_asyncio():
if core_worker.current_actor_is_asyncio():
# Increase recursion limit if necessary. In asyncio mode,
# we have many parallel callstacks (represented in fibers)
# that's suspended for execution. Python interpreter will
# mistakenly count each callstack towards recusion limit.
# We don't need to worry about stackoverflow here because
# the max number of callstacks is limited in direct actor
# transport with max_concurrency flag.
increase_recursion_limit()
if inspect.iscoroutinefunction(function.method):
async_function = function
else:
@@ -412,14 +425,13 @@ cdef execute_task(
monitor_state.unregister_coroutine(coroutine)
future.add_done_callback(callback)
with nogil:
(core_worker.core_worker.get()
.YieldCurrentFiber(fiber_event))
yield future.result()
return future.result()
yield function(actor, *arguments, **kwarguments)
return function(actor, *arguments, **kwarguments)
with core_worker.profile_event(b"task", extra_data=extra_data):
try:
@@ -431,22 +443,17 @@ cdef execute_task(
with core_worker.profile_event(b"task:deserialize_arguments"):
args, kwargs = deserialize_args(c_args, c_arg_reference_ids)
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
actor = worker.actors[core_worker.get_actor_id()]
class_name = actor.__class__.__name__
actor_title = "{}({}, {})".format(
class_name, repr(args), repr(kwargs))
core_worker.set_actor_title(actor_title.encode("utf-8"))
# Execute the task.
with ray.worker._changeproctitle(title, next_title):
with core_worker.profile_event(b"task:execute"):
task_exception = True
outputs = function_executor(*args, **kwargs)
# The function_executor is a generator in actor mode.
if inspect.isgenerator(outputs):
outputs = next(outputs)
task_exception = False
if c_return_ids.size() == 1:
outputs = (outputs,)
@@ -680,11 +687,12 @@ cdef class CoreWorker:
def put_serialized_object(self, serialized_object,
ObjectID object_id=None,
c_bool pin_object=True):
return ObjectID(self.put_serialized_cobject(serialized_object, object_id, pin_object))
return ObjectID(self.put_serialized_cobject(
serialized_object, object_id, pin_object))
def put_serialized_cobject(self, serialized_object,
ObjectID object_id=None,
c_bool pin_object=True):
ObjectID object_id=None,
c_bool pin_object=True):
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data