Reduce actor submission python overhead (#5949)

This commit is contained in:
Philipp Moritz
2019-10-23 00:11:32 -07:00
committed by GitHub
parent 875c84ed63
commit 09d05bb3fa
15 changed files with 57 additions and 69 deletions
+3 -3
View File
@@ -895,7 +895,7 @@ cdef class CoreWorker:
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
with profiling.profile("submit_task"):
with self.profile_event("submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
@@ -954,7 +954,7 @@ cdef class CoreWorker:
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
with profiling.profile("submit_task"):
with self.profile_event("submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(num_return_vals, c_resources)
ray_function = CRayFunction(
@@ -991,7 +991,7 @@ cdef class CoreWorker:
return resources_dict
def profile_event(self, event_type, dict extra_data):
def profile_event(self, event_type, object extra_data=None):
cdef:
c_string c_event_type = event_type.encode("ascii")
+16 -15
View File
@@ -16,7 +16,7 @@ import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
from ray import ActorID, ActorClassID, profiling
from ray import ActorID, ActorClassID
logger = logging.getLogger(__name__)
@@ -468,6 +468,12 @@ class ActorHandle(object):
self._ray_class_name = class_name
self._ray_actor_method_cpus = actor_method_cpus
self._ray_session_and_job = session_and_job
self._ray_function_descriptor_lists = {
method_name: FunctionDescriptor(
self._ray_module_name, method_name,
self._ray_class_name).get_function_descriptor_list()
for method_name in self._ray_method_signatures.keys()
}
def _actor_method_call(self,
method_name,
@@ -500,20 +506,15 @@ class ActorHandle(object):
kwargs = kwargs or {}
list_args = signature.flatten_args(function_signature, args, kwargs)
function_descriptor = FunctionDescriptor(
self._ray_module_name, method_name, self._ray_class_name)
with profiling.profile("submit_task"):
if worker.mode == ray.LOCAL_MODE:
function = getattr(worker.actors[self._actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
function, function_descriptor, args, kwargs,
num_return_vals)
else:
object_ids = worker.core_worker.submit_actor_task(
self._ray_actor_id,
function_descriptor.get_function_descriptor_list(),
list_args, num_return_vals,
{"CPU": self._ray_actor_method_cpus})
if worker.mode == ray.LOCAL_MODE:
function = getattr(worker.actors[self._actor_id], method_name)
object_ids = worker.local_mode_manager.execute(
function, method_name, args, kwargs, num_return_vals)
else:
object_ids = worker.core_worker.submit_actor_task(
self._ray_actor_id,
self._ray_function_descriptor_lists[method_name], list_args,
num_return_vals, {"CPU": self._ray_actor_method_cpus})
if len(object_ids) == 1:
object_ids = object_ids[0]
+4 -4
View File
@@ -9,10 +9,10 @@ cdef class ProfileEvent:
"""Cython wrapper class of C++ `ray::worker::ProfileEvent`."""
cdef:
unique_ptr[CProfileEvent] inner
dict extra_data
object extra_data
@staticmethod
cdef make(unique_ptr[CProfileEvent] event, dict extra_data):
cdef make(unique_ptr[CProfileEvent] event, object extra_data):
cdef ProfileEvent self = ProfileEvent.__new__(ProfileEvent)
self.inner = move(event)
self.extra_data = extra_data
@@ -25,7 +25,7 @@ cdef class ProfileEvent:
pass
def __exit__(self, type, value, tb):
extra_data = {}
extra_data = None
if type is not None:
extra_data = {
"type": str(type),
@@ -35,7 +35,7 @@ cdef class ProfileEvent:
elif self.extra_data is not None:
extra_data = self.extra_data
self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii"))
self.inner.get().SetExtraData(json.dumps(extra_data).encode("ascii") if extra_data else b"{}")
# Deleting the CProfileEvent will add it to a queue to be pushed to
# the driver.
+2 -4
View File
@@ -29,8 +29,7 @@ class LocalModeManager(object):
def __init__(self):
"""Initialize a LocalModeManager."""
def execute(self, function, function_descriptor, args, kwargs,
num_return_vals):
def execute(self, function, function_name, args, kwargs, num_return_vals):
"""Synchronously executes a "remote" function or actor method.
Stores results directly in the generated and returned
@@ -40,7 +39,7 @@ class LocalModeManager(object):
Args:
function: The function to execute.
function_descriptor: Metadata about the function.
function_name: Name of the function to execute.
args: Arguments to the function. These will not be modified by
the function execution.
kwargs: Keyword arguments to the function.
@@ -61,7 +60,6 @@ class LocalModeManager(object):
for object_id, result in zip(object_ids, results):
object_id.value = result
except Exception as e:
function_name = function_descriptor.function_name
backtrace = format_error_message(traceback.format_exc())
task_error = RayTaskError(function_name, backtrace, e.__class__)
for object_id in object_ids: