Convert actor dummy objects to task execution edges. (#1281)

* Define execution dependencies flatbuffer and add it to Redis commands

* Convert TaskSpec to TaskExecutionSpec

* Add execution dependencies to Python bindings

* Submit actor tasks through the execution dependency API instead of a dummy argument

* Fix dependency getters and clean up fetching of missing dependencies

* Follow C++ conventions

* Make TaskExecutionSpec a C++ class

* Convert local scheduler to use TaskExecutionSpec class

* Convert some pointers to references

* Finish conversion to TaskExecutionSpec class

* fix

* Fix

* Fix memory errors?

* Cast flatbuffers GetSize to size_t

* Fixes

* Add more retries in global scheduler unit test

* Fix linting and cast fbb.GetSize to size_t

* Style and doc

* Fix linting and simplify from_flatbuf.
This commit is contained in:
Stephanie Wang
2017-12-14 20:47:54 -08:00
committed by Robert Nishihara
parent cac5f47600
commit 12fdb3f53a
31 changed files with 719 additions and 431 deletions
+7 -7
View File
@@ -165,9 +165,6 @@ def make_actor_method_executor(worker, method_name, method):
def actor_method_executor(dummy_return_id, task_counter, actor,
*args):
# An actor task's dependency on the previous task is represented by
# a dummy argument. Remove this argument before invocation.
args = args[:-1]
if method_name == "__ray_checkpoint__":
# Execute the checkpoint task.
actor_checkpoint_failed, error = method(actor, *args)
@@ -616,9 +613,11 @@ def make_actor_handle_class(class_name):
ray.worker.global_worker.actors[self._ray_actor_id],
method_name)(*copy.deepcopy(args))
# Add the dummy argument that represents dependency on a preceding
# task.
args.append(dependency)
# Add the execution dependency.
if dependency is None:
execution_dependencies = []
else:
execution_dependencies = [dependency]
is_actor_checkpoint_method = (method_name == "__ray_checkpoint__")
@@ -628,7 +627,8 @@ def make_actor_handle_class(class_name):
function_id, args, actor_id=self._ray_actor_id,
actor_handle_id=self._ray_actor_handle_id,
actor_counter=self._ray_actor_counter,
is_actor_checkpoint_method=is_actor_checkpoint_method)
is_actor_checkpoint_method=is_actor_checkpoint_method,
execution_dependencies=execution_dependencies)
# Update the actor counter and cursor to reflect the most recent
# invocation.
self._ray_actor_counter += 1
+12 -9
View File
@@ -308,7 +308,7 @@ class TestGlobalStateStore(unittest.TestCase):
with self.assertRaises(redis.ResponseError):
# Should not be able to update a non-existent task.
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", 10,
"node_id")
"node_id", b"")
def testTaskTableAddAndLookup(self):
TASK_STATUS_WAITING = 1
@@ -321,7 +321,8 @@ class TestGlobalStateStore(unittest.TestCase):
p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX))
def check_task_reply(message, task_args, updated=False):
task_status, local_scheduler_id, task_spec = task_args
(task_status, local_scheduler_id, execution_dependencies_string,
task_spec) = task_args
task_reply_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(task_reply_object.State(), task_status)
self.assertEqual(task_reply_object.LocalSchedulerId(),
@@ -330,7 +331,7 @@ class TestGlobalStateStore(unittest.TestCase):
self.assertEqual(task_reply_object.Updated(), updated)
# Check that task table adds, updates, and lookups work correctly.
task_args = [TASK_STATUS_WAITING, b"node_id", b"task_spec"]
task_args = [TASK_STATUS_WAITING, b"node_id", b"", b"task_spec"]
response = self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id",
*task_args)
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
@@ -338,7 +339,7 @@ class TestGlobalStateStore(unittest.TestCase):
task_args[0] = TASK_STATUS_SCHEDULED
self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id",
*task_args[:2])
*task_args[:3])
response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id")
check_task_reply(response, task_args)
@@ -407,17 +408,19 @@ class TestGlobalStateStore(unittest.TestCase):
def check_task_subscription(self, p, scheduling_state, local_scheduler_id):
task_args = [b"task_id", scheduling_state,
local_scheduler_id.encode("ascii"), b"task_spec"]
local_scheduler_id.encode("ascii"), b"", b"task_spec"]
self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args)
# Receive the data.
message = get_next_message(p)["data"]
# Check that the notification object is correct.
notification_object = TaskReply.GetRootAsTaskReply(message, 0)
self.assertEqual(notification_object.TaskId(), b"task_id")
self.assertEqual(notification_object.State(), scheduling_state)
self.assertEqual(notification_object.TaskId(), task_args[0])
self.assertEqual(notification_object.State(), task_args[1])
self.assertEqual(notification_object.LocalSchedulerId(),
local_scheduler_id.encode("ascii"))
self.assertEqual(notification_object.TaskSpec(), b"task_spec")
task_args[2])
self.assertEqual(notification_object.ExecutionDependencies(),
task_args[3])
self.assertEqual(notification_object.TaskSpec(), task_args[4])
def testTaskTableSubscribe(self):
scheduling_state = 1
+2
View File
@@ -271,6 +271,8 @@ class GlobalState(object):
return {"State": task_table_message.State(),
"LocalSchedulerID": binary_to_hex(
task_table_message.LocalSchedulerId()),
"ExecutionDependenciesString":
task_table_message.ExecutionDependencies(),
"TaskSpec": task_spec_info}
def task_table(self, task_id=None):
+2 -2
View File
@@ -171,7 +171,7 @@ class TestGlobalScheduler(unittest.TestCase):
[random_object_id()], 0, random_task_id(),
0, local_scheduler.ObjectID(NIL_ACTOR_ID),
local_scheduler.ObjectID(NIL_ACTOR_ID),
0, 0, {"CPU": 1, "GPU": 2})
0, 0, [], {"CPU": 1, "GPU": 2})
self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2})
def test_redis_only_single_task(self):
@@ -268,7 +268,7 @@ class TestGlobalScheduler(unittest.TestCase):
self.local_scheduler_clients[0].submit(task)
# Check that there are the correct number of tasks in Redis and that
# they all get assigned to the local scheduler.
num_retries = 10
num_retries = 20
num_tasks_done = 0
while num_retries > 0:
task_entries = self.state.task_table()
+2 -1
View File
@@ -185,7 +185,8 @@ class Monitor(object):
ok = self.state._execute_command(
key, "RAY.TASK_TABLE_UPDATE",
hex_to_binary(task_id),
ray.experimental.state.TASK_STATUS_LOST, NIL_ID)
ray.experimental.state.TASK_STATUS_LOST, NIL_ID,
task["ExecutionDependenciesString"])
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
+9 -1
View File
@@ -488,7 +488,8 @@ class Worker(object):
def submit_task(self, function_id, args, actor_id=None,
actor_handle_id=None, actor_counter=0,
is_actor_checkpoint_method=False):
is_actor_checkpoint_method=False,
execution_dependencies=None):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with ID
@@ -527,6 +528,10 @@ class Worker(object):
else:
args_for_local_scheduler.append(put(arg))
# By default, there are no execution dependencies.
if execution_dependencies is None:
execution_dependencies = []
# Look up the various function properties.
function_properties = self.function_properties[
self.task_driver_id.id()][function_id.id()]
@@ -543,6 +548,7 @@ class Worker(object):
actor_handle_id,
actor_counter,
is_actor_checkpoint_method,
execution_dependencies,
function_properties.resources)
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
@@ -1885,6 +1891,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
nil_actor_counter,
False,
[],
{"CPU": 0})
global_state._execute_command(
driver_task.task_id(),
@@ -1892,6 +1899,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
driver_task.task_id().id(),
TASK_STATUS_RUNNING,
NIL_LOCAL_SCHEDULER_ID,
driver_task.execution_dependencies_string(),
ray.local_scheduler.task_to_string(driver_task))
# Set the driver's current task ID to the task ID assigned to the
# driver task.