Enable function_descriptor in backend to replace the function_id (#3028)

This commit is contained in:
Yuhong Guo
2018-12-19 07:53:59 +08:00
committed by Robert Nishihara
parent 3822b20319
commit fb33fa9097
20 changed files with 557 additions and 282 deletions
+64 -73
View File
@@ -36,7 +36,7 @@ import ray.plasma
import ray.ray_constants as ray_constants
from ray import import_thread
from ray import profiling
from ray.function_manager import FunctionActorManager
from ray.function_manager import (FunctionActorManager, FunctionDescriptor)
from ray.utils import (
check_oversized_pickle,
is_cython,
@@ -54,7 +54,6 @@ ERROR_KEY_PREFIX = b"Error:"
# This must match the definition of NIL_ACTOR_ID in task.h.
NIL_ID = ray_constants.ID_SIZE * b"\xff"
NIL_LOCAL_SCHEDULER_ID = NIL_ID
NIL_FUNCTION_ID = NIL_ID
NIL_ACTOR_ID = NIL_ID
NIL_ACTOR_HANDLE_ID = NIL_ID
NIL_CLIENT_ID = ray_constants.ID_SIZE * b"\xff"
@@ -161,10 +160,6 @@ class Worker(object):
self.make_actor = None
self.actors = {}
self.actor_task_counter = 0
# A set of all of the actor class keys that have been imported by the
# import thread. It is safe to convert this worker into an actor of
# these types.
self.imported_actor_classes = set()
# The number of threads Plasma should use when putting an object in the
# object store.
self.memcopy_threads = 12
@@ -518,7 +513,7 @@ class Worker(object):
return final_results
def submit_task(self,
function_id,
function_descriptor,
args,
actor_id=None,
actor_handle_id=None,
@@ -531,15 +526,16 @@ class Worker(object):
num_return_vals=None,
resources=None,
placement_resources=None,
driver_id=None):
driver_id=None,
language=ray.gcs_utils.Language.PYTHON):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with ID
function_id with arguments args. Retrieve object IDs for the outputs of
the function from the scheduler and immediately return them.
Tell the scheduler to schedule the execution of the function with
function_descriptor with arguments args. Retrieve object IDs for the
outputs of the function from the scheduler and immediately return them.
Args:
function_id: The ID of the function to execute.
function_descriptor: The function descriptor to execute.
args: The arguments to pass into the function. Arguments can be
object IDs or they can be values. If they are values, they must
be serializable objects.
@@ -623,13 +619,15 @@ class Worker(object):
# The parent task must be set for the submitted task.
assert not self.current_task_id.is_nil()
# Submit the task to local scheduler.
function_descriptor_list = (
function_descriptor.get_function_descriptor_list())
task = ray.raylet.Task(
driver_id, ray.ObjectID(function_id.id()),
args_for_local_scheduler, num_return_vals,
self.current_task_id, task_index, actor_creation_id,
actor_creation_dummy_object_id, max_actor_reconstructions,
actor_id, actor_handle_id, actor_counter,
execution_dependencies, resources, placement_resources)
driver_id, function_descriptor_list, args_for_local_scheduler,
num_return_vals, self.current_task_id, task_index,
actor_creation_id, actor_creation_dummy_object_id,
max_actor_reconstructions, actor_id, actor_handle_id,
actor_counter, execution_dependencies, resources,
placement_resources)
self.raylet_client.submit_task(task)
return task.returns()
@@ -778,10 +776,12 @@ class Worker(object):
self.task_driver_id = task.driver_id()
self.current_task_id = task.task_id()
function_id = task.function_id()
function_descriptor = FunctionDescriptor.from_bytes_list(
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if task.actor_id().id() != NIL_ACTOR_ID:
if (task.actor_id().id() != NIL_ACTOR_ID
or task.actor_creation_id().id() != NIL_ACTOR_ID):
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
@@ -796,33 +796,36 @@ class Worker(object):
function_name, args)
except RayTaskError as e:
self._handle_process_task_failure(
function_id, function_name, return_object_ids, e,
function_descriptor, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
return
except Exception as e:
self._handle_process_task_failure(
function_id, function_name, return_object_ids, e,
function_descriptor, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
return
# Execute the task.
try:
with profiling.profile("task:execute", worker=self):
if task.actor_id().id() == NIL_ACTOR_ID:
if (task.actor_id().id() == NIL_ACTOR_ID
and task.actor_creation_id().id() == NIL_ACTOR_ID):
outputs = function_executor(*arguments)
else:
outputs = function_executor(
dummy_return_id, self.actors[task.actor_id().id()],
*arguments)
if task.actor_id().id() != NIL_ACTOR_ID:
key = task.actor_id().id()
else:
key = task.actor_creation_id().id()
outputs = function_executor(dummy_return_id,
self.actors[key], *arguments)
except Exception as e:
# Determine whether the exception occurred during a task, not an
# actor method.
task_exception = task.actor_id().id() == NIL_ACTOR_ID
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(function_id, function_name,
return_object_ids, e,
traceback_str)
self._handle_process_task_failure(
function_descriptor, return_object_ids, e, traceback_str)
return
# Store the outputs in the local object store.
@@ -837,11 +840,13 @@ class Worker(object):
self._store_outputs_in_object_store(return_object_ids, outputs)
except Exception as e:
self._handle_process_task_failure(
function_id, function_name, return_object_ids, e,
function_descriptor, return_object_ids, e,
ray.utils.format_error_message(traceback.format_exc()))
def _handle_process_task_failure(self, function_id, function_name,
def _handle_process_task_failure(self, function_descriptor,
return_object_ids, error, backtrace):
function_name = function_descriptor.function_name
function_id = function_descriptor.function_id
failure_object = RayTaskError(function_name, backtrace)
failure_objects = [
failure_object for _ in range(len(return_object_ids))
@@ -855,53 +860,34 @@ class Worker(object):
driver_id=self.task_driver_id.id(),
data={
"function_id": function_id.id(),
"function_name": function_name
"function_name": function_name,
"module_name": function_descriptor.module_name,
"class_name": function_descriptor.class_name
})
# Mark the actor init as failed
if self.actor_id != NIL_ACTOR_ID and function_name == "__init__":
self.mark_actor_init_failed(error)
def _become_actor(self, task):
    """Convert this worker into the actor described by an actor creation task.

    Args:
        task: The actor creation task.
    """
    # The worker must not already be an actor.
    assert self.actor_id == NIL_ACTOR_ID
    creation_args = task.arguments()
    # The creation task is expected to carry exactly one argument, which is
    # used below as the actor class ID.
    assert len(creation_args) == 1
    self.actor_id = task.actor_creation_id().id()
    actor_class_key = b"ActorClass:" + creation_args[0]

    # Poll until the actor class key shows up in the set of imported actor
    # classes. TODO(rkn): It shouldn't be possible to end up in an infinite
    # loop here, but we should push an error to the driver if too much time
    # is spent here.
    while True:
        if actor_class_key in self.imported_actor_classes:
            break
        time.sleep(0.001)

    with self.lock:
        self.function_actor_manager.fetch_and_register_actor(actor_class_key)
def _wait_for_and_process_task(self, task):
"""Wait for a task to be ready and process the task.
Args:
task: The task to execute.
"""
function_id = task.function_id()
function_descriptor = FunctionDescriptor.from_bytes_list(
task.function_descriptor_list())
driver_id = task.driver_id().id()
# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if (task.actor_creation_id() != ray.ObjectID(NIL_ACTOR_ID)):
self._become_actor(task)
return
assert self.actor_id == NIL_ACTOR_ID
self.actor_id = task.actor_creation_id().id()
self.function_actor_manager.load_actor(driver_id,
function_descriptor)
execution_info = self.function_actor_manager.get_execution_info(
driver_id, function_id)
driver_id, function_descriptor)
# Execute the task.
# TODO(rkn): Consider acquiring this lock with a timeout and pushing a
@@ -915,8 +901,14 @@ class Worker(object):
"task_id": task.task_id().hex()
}
if task.actor_id().id() == NIL_ACTOR_ID:
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
if (task.actor_creation_id() == ray.ObjectID(NIL_ACTOR_ID)):
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else:
actor = self.actors[task.actor_creation_id().id()]
title = "ray_{}:{}()".format(actor.__class__.__name__,
function_name)
next_title = "ray_{}".format(actor.__class__.__name__)
else:
actor = self.actors[task.actor_id().id()]
title = "ray_{}:{}()".format(actor.__class__.__name__,
@@ -934,10 +926,10 @@ class Worker(object):
# Increase the task execution counter.
self.function_actor_manager.increase_task_counter(
driver_id, function_id.id())
driver_id, function_descriptor)
reached_max_executions = (self.function_actor_manager.get_task_counter(
driver_id, function_id.id()) == execution_info.max_calls)
driver_id, function_descriptor) == execution_info.max_calls)
if reached_max_executions:
self.raylet_client.disconnect()
sys.exit(0)
@@ -2096,15 +2088,14 @@ def connect(info,
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.raylet.Task(worker.task_driver_id,
ray.ObjectID(NIL_FUNCTION_ID), [], 0,
worker.current_task_id,
worker.task_index,
ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID), 0,
ray.ObjectID(NIL_ACTOR_ID),
ray.ObjectID(NIL_ACTOR_ID),
nil_actor_counter, [], {"CPU": 0}, {})
function_descriptor = FunctionDescriptor.for_driver_task()
driver_task = ray.raylet.Task(
worker.task_driver_id,
function_descriptor.get_function_descriptor_list(), [], 0,
worker.current_task_id, worker.task_index,
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), 0,
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
nil_actor_counter, [], {"CPU": 0}, {})
# Add the driver task to the task table.
global_state._execute_command(driver_task.task_id(), "RAY.TABLE_ADD",