diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 46e459232..d72f8cfa3 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -3,6 +3,8 @@ # cython: embedsignature = True # cython: language_level = 3 +from cpython.pystate cimport PyThreadState_Get + from libcpp cimport bool as c_bool from libcpp.string cimport string as c_string from libcpp.vector cimport vector as c_vector @@ -24,6 +26,19 @@ from ray.includes.function_descriptor cimport ( CFunctionDescriptor, ) +cdef extern from "Python.h": + # Note(simon): This is used to configure asyncio actor stack size. + # Cython made PyThreadState an opaque types. Saying that if the user wants + # specific attributes, they can be declared manually. + + # You can find the cpython definition in Include/cpython/pystate.h#L59 + ctypedef struct CPyThreadState "PyThreadState": + int recursion_depth + + # From Include/ceveal.h#67 + int Py_GetRecursionLimit() + void Py_SetRecursionLimit(int) + cdef class Buffer: cdef: shared_ptr[CBuffer] buffer diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 513aa903e..27b284971 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -14,6 +14,7 @@ import threading import time import logging import os +import pickle import sys from libc.stdint cimport ( @@ -76,6 +77,8 @@ from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig import ray +from ray.async_compat import (sync_to_async, + AsyncGetResponse, AsyncMonitorState) import ray.experimental.signal as ray_signal import ray.memory_monitor as memory_monitor import ray.ray_constants as ray_constants @@ -117,17 +120,6 @@ include "includes/libcoreworker.pxi" logger = logging.getLogger(__name__) MEMCOPY_THREADS = 12 -PY3 = cpython.PY_MAJOR_VERSION >= 3 - - -if PY3: - import pickle -else: - import cPickle as pickle - -if PY3: - from ray.async_compat import (sync_to_async, - AsyncGetResponse, AsyncMonitorState) def set_internal_config(dict options): @@ -207,6 +199,21 @@ def compute_task_id(ObjectID object_id): return TaskID(object_id.native().TaskId().Binary()) +cdef increase_recursion_limit(): + """Double the recusion limit if current depth is close to the limit""" + cdef: + CPyThreadState * s = PyThreadState_Get() + int current_depth = s.recursion_depth + int current_limit = Py_GetRecursionLimit() + int new_limit = current_limit * 2 + + if current_limit - current_depth < 500: + Py_SetRecursionLimit(new_limit) + logger.debug("Increasing Python recursion limit to {} " + "current recursion depth is {}.".format( + new_limit, current_depth)) + + @cython.auto_pickle(False) cdef class Language: cdef CLanguage lang @@ -297,8 +304,9 @@ cdef void prepare_args( else: args_vector.push_back( CTaskArg.PassByReference( - (CObjectID.FromBinary(core_worker.put_serialized_cobject( - serialized_arg))))) + (CObjectID.FromBinary( + core_worker.put_serialized_cobject( + serialized_arg))))) cdef deserialize_args( const c_vector[shared_ptr[CRayObject]] &c_args, @@ -385,13 +393,18 @@ cdef execute_task( c_resources.find(b"object_store_memory")).second))) def function_executor(*arguments, **kwarguments): - # function_executor is a generator to make sure python decrement - # stack counter on context switch for async mode. If it is not - # a generator, python will count the stacks of executor as part - # of the recursion limit, resulting in much lower concurrency. function = execution_info.function - if PY3 and core_worker.current_actor_is_asyncio(): + if core_worker.current_actor_is_asyncio(): + # Increase recursion limit if necessary. In asyncio mode, + # we have many parallel callstacks (represented in fibers) + # that's suspended for execution. Python interpreter will + # mistakenly count each callstack towards recusion limit. + # We don't need to worry about stackoverflow here because + # the max number of callstacks is limited in direct actor + # transport with max_concurrency flag. + increase_recursion_limit() + if inspect.iscoroutinefunction(function.method): async_function = function else: @@ -412,14 +425,13 @@ cdef execute_task( monitor_state.unregister_coroutine(coroutine) future.add_done_callback(callback) - with nogil: (core_worker.core_worker.get() .YieldCurrentFiber(fiber_event)) - yield future.result() + return future.result() - yield function(actor, *arguments, **kwarguments) + return function(actor, *arguments, **kwarguments) with core_worker.profile_event(b"task", extra_data=extra_data): try: @@ -431,22 +443,17 @@ cdef execute_task( with core_worker.profile_event(b"task:deserialize_arguments"): args, kwargs = deserialize_args(c_args, c_arg_reference_ids) - if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): actor = worker.actors[core_worker.get_actor_id()] class_name = actor.__class__.__name__ actor_title = "{}({}, {})".format( class_name, repr(args), repr(kwargs)) core_worker.set_actor_title(actor_title.encode("utf-8")) - # Execute the task. with ray.worker._changeproctitle(title, next_title): with core_worker.profile_event(b"task:execute"): task_exception = True outputs = function_executor(*args, **kwargs) - # The function_executor is a generator in actor mode. - if inspect.isgenerator(outputs): - outputs = next(outputs) task_exception = False if c_return_ids.size() == 1: outputs = (outputs,) @@ -680,11 +687,12 @@ cdef class CoreWorker: def put_serialized_object(self, serialized_object, ObjectID object_id=None, c_bool pin_object=True): - return ObjectID(self.put_serialized_cobject(serialized_object, object_id, pin_object)) + return ObjectID(self.put_serialized_cobject( + serialized_object, object_id, pin_object)) def put_serialized_cobject(self, serialized_object, - ObjectID object_id=None, - c_bool pin_object=True): + ObjectID object_id=None, + c_bool pin_object=True): cdef: CObjectID c_object_id shared_ptr[CBuffer] data