From 12fdb3f53aff04d18246b007ed4c478a8c2b7dba Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 14 Dec 2017 20:47:54 -0800 Subject: [PATCH] Convert actor dummy objects to task execution edges. (#1281) * Define execution dependencies flatbuffer and add to Redis commands * Convert TaskSpec to TaskExecutionSpec * Add execution dependencies to Python bindings * Submitting actor tasks uses execution dependency API instead of dummy argument * Fix dependency getters and some cleanup for fetching missing dependencies * C++ convention * Make TaskExecutionSpec a C++ class * Convert local scheduler to use TaskExecutionSpec class * Convert some pointers to references * Finish conversion to TaskExecutionSpec class * fix * Fix * Fix memory errors? * Cast flatbuffers GetSize to size_t * Fixes * add more retries in global scheduler unit test * fix linting and cast fbb.GetSize to size_t * Style and doc * Fix linting and simplify from_flatbuf. --- python/ray/actor.py | 14 +- python/ray/common/redis_module/runtest.py | 21 +- python/ray/experimental/state.py | 2 + python/ray/global_scheduler/test/test.py | 4 +- python/ray/monitor.py | 3 +- python/ray/worker.py | 10 +- src/common/common_protocol.cc | 21 ++ src/common/common_protocol.h | 18 + src/common/format/common.fbs | 10 + src/common/lib/python/common_extension.cc | 51 ++- src/common/lib/python/common_extension.h | 3 + src/common/redis_module/ray_redis_module.cc | 38 +- src/common/state/redis.cc | 56 ++- src/common/task.cc | 177 +++++++-- src/common/task.h | 93 ++++- src/common/test/db_tests.cc | 21 +- src/common/test/example_task.h | 55 ++- src/common/test/object_table_tests.cc | 2 +- src/common/test/task_table_tests.cc | 2 +- src/global_scheduler/global_scheduler.cc | 2 +- .../global_scheduler_algorithm.cc | 2 +- .../format/local_scheduler.fbs | 5 + src/local_scheduler/local_scheduler.cc | 64 ++-- src/local_scheduler/local_scheduler.h | 3 +- .../local_scheduler_algorithm.cc | 340 ++++++++---------- .../local_scheduler_algorithm.h | 12 +- src/local_scheduler/local_scheduler_client.cc | 15 +- src/local_scheduler/local_scheduler_client.h | 5 +- .../local_scheduler_extension.cc | 5 +- .../test/local_scheduler_tests.cc | 94 +++-- src/plasma/plasma_manager.cc | 2 +- 31 files changed, 719 insertions(+), 431 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index 5e66ea413..d058c4cc2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -165,9 +165,6 @@ def make_actor_method_executor(worker, method_name, method): def actor_method_executor(dummy_return_id, task_counter, actor, *args): - # An actor task's dependency on the previous task is represented by - # a dummy argument. Remove this argument before invocation. - args = args[:-1] if method_name == "__ray_checkpoint__": # Execute the checkpoint task. actor_checkpoint_failed, error = method(actor, *args) @@ -616,9 +613,11 @@ def make_actor_handle_class(class_name): ray.worker.global_worker.actors[self._ray_actor_id], method_name)(*copy.deepcopy(args)) - # Add the dummy argument that represents dependency on a preceding - # task. - args.append(dependency) + # Add the execution dependency. + if dependency is None: + execution_dependencies = [] + else: + execution_dependencies = [dependency] is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") @@ -628,7 +627,8 @@ def make_actor_handle_class(class_name): function_id, args, actor_id=self._ray_actor_id, actor_handle_id=self._ray_actor_handle_id, actor_counter=self._ray_actor_counter, - is_actor_checkpoint_method=is_actor_checkpoint_method) + is_actor_checkpoint_method=is_actor_checkpoint_method, + execution_dependencies=execution_dependencies) # Update the actor counter and cursor to reflect the most recent # invocation. self._ray_actor_counter += 1 diff --git a/python/ray/common/redis_module/runtest.py b/python/ray/common/redis_module/runtest.py index d826b2056..f1f1dd4a4 100644 --- a/python/ray/common/redis_module/runtest.py +++ b/python/ray/common/redis_module/runtest.py @@ -308,7 +308,7 @@ class TestGlobalStateStore(unittest.TestCase): with self.assertRaises(redis.ResponseError): # Should not be able to update a non-existent task. self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", 10, - "node_id") + "node_id", b"") def testTaskTableAddAndLookup(self): TASK_STATUS_WAITING = 1 @@ -321,7 +321,8 @@ class TestGlobalStateStore(unittest.TestCase): p.psubscribe("{prefix}*:*".format(prefix=TASK_PREFIX)) def check_task_reply(message, task_args, updated=False): - task_status, local_scheduler_id, task_spec = task_args + (task_status, local_scheduler_id, execution_dependencies_string, + task_spec) = task_args task_reply_object = TaskReply.GetRootAsTaskReply(message, 0) self.assertEqual(task_reply_object.State(), task_status) self.assertEqual(task_reply_object.LocalSchedulerId(), @@ -330,7 +331,7 @@ class TestGlobalStateStore(unittest.TestCase): self.assertEqual(task_reply_object.Updated(), updated) # Check that task table adds, updates, and lookups work correctly. - task_args = [TASK_STATUS_WAITING, b"node_id", b"task_spec"] + task_args = [TASK_STATUS_WAITING, b"node_id", b"", b"task_spec"] response = self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id", *task_args) response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id") @@ -338,7 +339,7 @@ class TestGlobalStateStore(unittest.TestCase): task_args[0] = TASK_STATUS_SCHEDULED self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", - *task_args[:2]) + *task_args[:3]) response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id") check_task_reply(response, task_args) @@ -407,17 +408,19 @@ class TestGlobalStateStore(unittest.TestCase): def check_task_subscription(self, p, scheduling_state, local_scheduler_id): task_args = [b"task_id", scheduling_state, - local_scheduler_id.encode("ascii"), b"task_spec"] + local_scheduler_id.encode("ascii"), b"", b"task_spec"] self.redis.execute_command("RAY.TASK_TABLE_ADD", *task_args) # Receive the data. message = get_next_message(p)["data"] # Check that the notification object is correct. notification_object = TaskReply.GetRootAsTaskReply(message, 0) - self.assertEqual(notification_object.TaskId(), b"task_id") - self.assertEqual(notification_object.State(), scheduling_state) + self.assertEqual(notification_object.TaskId(), task_args[0]) + self.assertEqual(notification_object.State(), task_args[1]) self.assertEqual(notification_object.LocalSchedulerId(), - local_scheduler_id.encode("ascii")) - self.assertEqual(notification_object.TaskSpec(), b"task_spec") + task_args[2]) + self.assertEqual(notification_object.ExecutionDependencies(), + task_args[3]) + self.assertEqual(notification_object.TaskSpec(), task_args[4]) def testTaskTableSubscribe(self): scheduling_state = 1 diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 8370c021c..c331a230e 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -271,6 +271,8 @@ class GlobalState(object): return {"State": task_table_message.State(), "LocalSchedulerID": binary_to_hex( task_table_message.LocalSchedulerId()), + "ExecutionDependenciesString": + task_table_message.ExecutionDependencies(), "TaskSpec": task_spec_info} def task_table(self, task_id=None): diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 1a6737bcf..115f6dcd7 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -171,7 +171,7 @@ class TestGlobalScheduler(unittest.TestCase): [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), local_scheduler.ObjectID(NIL_ACTOR_ID), - 0, 0, {"CPU": 1, "GPU": 2}) + 0, 0, [], {"CPU": 1, "GPU": 2}) self.assertEqual(task2.required_resources(), {"CPU": 1, "GPU": 2}) def test_redis_only_single_task(self): @@ -268,7 +268,7 @@ class TestGlobalScheduler(unittest.TestCase): self.local_scheduler_clients[0].submit(task) # Check that there are the correct number of tasks in Redis and that # they all get assigned to the local scheduler. - num_retries = 10 + num_retries = 20 num_tasks_done = 0 while num_retries > 0: task_entries = self.state.task_table() diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 42143ab89..c906e720d 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -185,7 +185,8 @@ class Monitor(object): ok = self.state._execute_command( key, "RAY.TASK_TABLE_UPDATE", hex_to_binary(task_id), - ray.experimental.state.TASK_STATUS_LOST, NIL_ID) + ray.experimental.state.TASK_STATUS_LOST, NIL_ID, + task["ExecutionDependenciesString"]) if ok != b"OK": log.warn("Failed to update lost task for dead scheduler.") num_tasks_updated += 1 diff --git a/python/ray/worker.py b/python/ray/worker.py index 25619a260..168f88500 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -488,7 +488,8 @@ class Worker(object): def submit_task(self, function_id, args, actor_id=None, actor_handle_id=None, actor_counter=0, - is_actor_checkpoint_method=False): + is_actor_checkpoint_method=False, + execution_dependencies=None): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with ID @@ -527,6 +528,10 @@ class Worker(object): else: args_for_local_scheduler.append(put(arg)) + # By default, there are no execution dependencies. + if execution_dependencies is None: + execution_dependencies = [] + # Look up the various function properties. function_properties = self.function_properties[ self.task_driver_id.id()][function_id.id()] @@ -543,6 +548,7 @@ class Worker(object): actor_handle_id, actor_counter, is_actor_checkpoint_method, + execution_dependencies, function_properties.resources) # Increment the worker's task index to track how many tasks have # been submitted by the current task so far. @@ -1885,6 +1891,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, False, + [], {"CPU": 0}) global_state._execute_command( driver_task.task_id(), @@ -1892,6 +1899,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, driver_task.task_id().id(), TASK_STATUS_RUNNING, NIL_LOCAL_SCHEDULER_ID, + driver_task.execution_dependencies_string(), ray.local_scheduler.task_to_string(driver_task)) # Set the driver's current task ID to the task ID assigned to the # driver task. diff --git a/src/common/common_protocol.cc b/src/common/common_protocol.cc index 038047ec0..d27efa2ce 100644 --- a/src/common/common_protocol.cc +++ b/src/common/common_protocol.cc @@ -13,6 +13,16 @@ ObjectID from_flatbuf(const flatbuffers::String &string) { return object_id; } +const std::vector from_flatbuf( + const flatbuffers::Vector> + &vector) { + std::vector object_ids; + for (int64_t i = 0; i < vector.Length(); i++) { + object_ids.push_back(from_flatbuf(*vector.Get(i))); + } + return object_ids; +} + flatbuffers::Offset< flatbuffers::Vector>> to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, @@ -25,6 +35,17 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, return fbb.CreateVector(results); } +flatbuffers::Offset< + flatbuffers::Vector>> +to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + const std::vector &object_ids) { + std::vector> results; + for (auto object_id : object_ids) { + results.push_back(to_flatbuf(fbb, object_id)); + } + return fbb.CreateVector(results); +} + std::string string_from_flatbuf(const flatbuffers::String &string) { return std::string(string.data(), string.size()); } diff --git a/src/common/common_protocol.h b/src/common/common_protocol.h index ad46b9066..3ad8ffd5a 100644 --- a/src/common/common_protocol.h +++ b/src/common/common_protocol.h @@ -24,6 +24,14 @@ flatbuffers::Offset to_flatbuf( /// @return The object ID. ObjectID from_flatbuf(const flatbuffers::String &string); +/// Convert a flatbuffer vector of strings to a vector of object IDs. +/// +/// @param vector The flatbuffer vector. +/// @return The vector of object IDs. +const std::vector from_flatbuf( + const flatbuffers::Vector> + &vector); + /// Convert an array of object IDs to a flatbuffer vector of strings. /// /// @param fbb Reference to the flatbuffer builder. @@ -36,6 +44,16 @@ to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, ObjectID object_ids[], int64_t num_objects); +/// Convert a vector of object IDs to a flatbuffer vector of strings. +/// +/// @param fbb Reference to the flatbuffer builder. +/// @param object_ids Vector of object IDs. +/// @return Flatbuffer vector of strings. +flatbuffers::Offset< + flatbuffers::Vector>> +to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, + const std::vector &object_ids); + /// Convert a flatbuffer string to a std::string. /// /// @param fbb Reference to the flatbuffer builder. diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 85969d386..6884aa0f1 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -70,6 +70,14 @@ table ObjectInfo { root_type TaskInfo; +table TaskExecutionDependencies { + // A list of object IDs representing this task's dependencies at execution + // time. + execution_dependencies: [string]; +} + +root_type TaskExecutionDependencies; + table SubscribeToNotificationsReply { // The object ID of the object that the notification is about. object_id: string; @@ -89,6 +97,8 @@ table TaskReply { state: long; // A local scheduler ID. local_scheduler_id: string; + // A string of bytes representing the task's TaskExecutionDependencies. + execution_dependencies: string; // A string of bytes representing the task specification. task_spec: string; // A boolean representing whether the update was successful. This field diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index c9077f1e5..7806f4df0 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -4,6 +4,7 @@ #include "common.h" #include "common_extension.h" +#include "common_protocol.h" #include "task.h" #include @@ -104,6 +105,8 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) { result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); result->size = size; result->spec = TaskSpec_copy((TaskSpec *) data, size); + /* The created task does not include any execution dependencies. */ + result->execution_dependencies = new std::vector(); /* TODO(pcm): Use flatbuffers validation here. */ return (PyObject *) result; } @@ -288,14 +291,18 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { TaskID parent_task_id; /* The number of tasks that the parent task has called prior to this one. */ int parent_counter; + /* Arguments of the task that are execution-dependent. These must be + * PyObjectIDs). */ + PyObject *execution_arguments = NULL; /* Dictionary of resource requirements for this task. */ PyObject *resource_map = NULL; - if (!PyArg_ParseTuple( - args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID, &driver_id, - &PyObjectToUniqueID, &function_id, &arguments, &num_returns, - &PyObjectToUniqueID, &parent_task_id, &parent_counter, - &PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, &actor_handle_id, - &actor_counter, &is_actor_checkpoint_method_object, &resource_map)) { + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOOO", &PyObjectToUniqueID, + &driver_id, &PyObjectToUniqueID, &function_id, + &arguments, &num_returns, &PyObjectToUniqueID, + &parent_task_id, &parent_counter, &PyObjectToUniqueID, + &actor_id, &PyObjectToUniqueID, &actor_handle_id, + &actor_counter, &is_actor_checkpoint_method_object, + &execution_arguments, &resource_map)) { return -1; } @@ -371,6 +378,23 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { /* Compute the task ID and the return object IDs. */ self->spec = TaskSpec_finish_construct(g_task_builder, &self->size); + + /* Set the task's execution dependencies. */ + self->execution_dependencies = new std::vector(); + if (execution_arguments != NULL) { + size = PyList_Size(execution_arguments); + for (Py_ssize_t i = 0; i < size; ++i) { + PyObject *execution_arg = PyList_GetItem(execution_arguments, i); + if (!PyObject_IsInstance(execution_arg, (PyObject *) &PyObjectIDType)) { + PyErr_SetString(PyExc_TypeError, + "Execution arguments must be an ObjectID."); + return -1; + } + self->execution_dependencies->push_back( + ((PyObjectID *) execution_arg)->object_id); + } + } + return 0; } @@ -378,6 +402,7 @@ static void PyTask_dealloc(PyTask *self) { if (self->spec != NULL) { TaskSpec_free(self->spec); } + delete self->execution_dependencies; Py_TYPE(self)->tp_free((PyObject *) self); } @@ -471,6 +496,15 @@ static PyObject *PyTask_returns(PyObject *self) { return return_id_list; } +static PyObject *PyTask_execution_dependencies_string(PyTask *self) { + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies = CreateTaskExecutionDependencies( + fbb, to_flatbuf(fbb, *self->execution_dependencies)); + fbb.Finish(execution_dependencies); + return PyBytes_FromStringAndSize((char *) fbb.GetBufferPointer(), + fbb.GetSize()); +} + static PyMethodDef PyTask_methods[] = { {"function_id", (PyCFunction) PyTask_function_id, METH_NOARGS, "Return the function ID for this task."}, @@ -492,6 +526,9 @@ static PyMethodDef PyTask_methods[] = { "Return the resource vector of the task."}, {"returns", (PyCFunction) PyTask_returns, METH_NOARGS, "Return the object IDs for the return values of the task."}, + {"execution_dependencies_string", + (PyCFunction) PyTask_execution_dependencies_string, METH_NOARGS, + "Return the execution dependencies for the task as a string."}, {NULL} /* Sentinel */ }; @@ -543,6 +580,8 @@ PyObject *PyTask_make(TaskSpec *task_spec, int64_t task_size) { result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType); result->spec = task_spec; result->size = task_size; + /* The created task does not include any execution dependencies. */ + result->execution_dependencies = new std::vector(); return (PyObject *) result; } diff --git a/src/common/lib/python/common_extension.h b/src/common/lib/python/common_extension.h index 72070fb0b..b3e8e4f89 100644 --- a/src/common/lib/python/common_extension.h +++ b/src/common/lib/python/common_extension.h @@ -1,6 +1,8 @@ #ifndef COMMON_EXTENSION_H #define COMMON_EXTENSION_H +#include + #include #include "marshal.h" #include "structmember.h" @@ -22,6 +24,7 @@ typedef struct { PyObject_HEAD int64_t size; TaskSpec *spec; + std::vector *execution_dependencies; } PyTask; // clang-format on diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index a6827e0e0..a0d424ce2 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -760,11 +760,14 @@ int ReplyWithTask(RedisModuleCtx *ctx, /* If the key exists, look up the fields and return them in an array. */ RedisModuleString *state = NULL; RedisModuleString *local_scheduler_id = NULL; + RedisModuleString *execution_dependencies = NULL; RedisModuleString *task_spec = NULL; RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "state", &state, - "local_scheduler_id", &local_scheduler_id, "TaskSpec", - &task_spec, NULL); - if (state == NULL || local_scheduler_id == NULL || task_spec == NULL) { + "local_scheduler_id", &local_scheduler_id, + "execution_dependencies", &execution_dependencies, + "TaskSpec", &task_spec, NULL); + if (state == NULL || local_scheduler_id == NULL || + execution_dependencies == NULL || task_spec == NULL) { /* We must have either all fields or no fields. */ RedisModule_CloseKey(key); return RedisModule_ReplyWithError( @@ -777,6 +780,7 @@ int ReplyWithTask(RedisModuleCtx *ctx, RedisModule_CloseKey(key); RedisModule_FreeString(ctx, state); RedisModule_FreeString(ctx, local_scheduler_id); + RedisModule_FreeString(ctx, execution_dependencies); RedisModule_FreeString(ctx, task_spec); return RedisModule_ReplyWithError(ctx, "Found invalid scheduling state."); } @@ -785,6 +789,7 @@ int ReplyWithTask(RedisModuleCtx *ctx, auto message = CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, task_id), state_integer, RedisStringToFlatbuf(fbb, local_scheduler_id), + RedisStringToFlatbuf(fbb, execution_dependencies), RedisStringToFlatbuf(fbb, task_spec), updated); fbb.Finish(message); @@ -794,6 +799,7 @@ int ReplyWithTask(RedisModuleCtx *ctx, RedisModule_FreeString(ctx, state); RedisModule_FreeString(ctx, local_scheduler_id); + RedisModule_FreeString(ctx, execution_dependencies); RedisModule_FreeString(ctx, task_spec); } else { /* If the key does not exist, return nil. */ @@ -904,6 +910,7 @@ int TaskTableWrite(RedisModuleCtx *ctx, RedisModuleString *task_id, RedisModuleString *state, RedisModuleString *local_scheduler_id, + RedisModuleString *execution_dependencies, RedisModuleString *task_spec) { /* Extract the scheduling state. */ long long state_value; @@ -917,7 +924,8 @@ int TaskTableWrite(RedisModuleCtx *ctx, OpenPrefixedKey(ctx, TASK_PREFIX, task_id, REDISMODULE_WRITE); if (task_spec == NULL) { RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "state", state, - "local_scheduler_id", local_scheduler_id, NULL); + "local_scheduler_id", local_scheduler_id, + "execution_dependencies", execution_dependencies, NULL); RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "TaskSpec", &existing_task_spec, NULL); if (existing_task_spec == NULL) { @@ -927,8 +935,9 @@ int TaskTableWrite(RedisModuleCtx *ctx, } } else { RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "state", state, - "local_scheduler_id", local_scheduler_id, "TaskSpec", - task_spec, NULL); + "local_scheduler_id", local_scheduler_id, + "execution_dependencies", execution_dependencies, + "TaskSpec", task_spec, NULL); } RedisModule_CloseKey(key); @@ -953,6 +962,7 @@ int TaskTableWrite(RedisModuleCtx *ctx, auto message = CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, task_id), state_value, RedisStringToFlatbuf(fbb, local_scheduler_id), + RedisStringToFlatbuf(fbb, execution_dependencies), RedisStringToFlatbuf(fbb, task_spec_to_use)); fbb.Finish(message); @@ -996,13 +1006,16 @@ int TaskTableWrite(RedisModuleCtx *ctx, * * This is called from a client with the command: * - * RAY.TASK_TABLE_ADD + * RAY.TASK_TABLE_ADD + * * * @param task_id A string that is the ID of the task. * @param state A string that is the current scheduling state (a * scheduling_state enum instance). * @param local_scheduler_id A string that is the ray client ID of the * associated local scheduler, if any. + * @param execution_dependencies A string that is the list of execution + * dependencies. * @param task_spec A string that is the specification of the task, which can * be cast to a `task_spec`. * @return OK if the operation was successful. @@ -1010,11 +1023,11 @@ int TaskTableWrite(RedisModuleCtx *ctx, int TaskTableAddTask_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 5) { + if (argc != 6) { return RedisModule_WrongArity(ctx); } - return TaskTableWrite(ctx, argv[1], argv[2], argv[3], argv[4]); + return TaskTableWrite(ctx, argv[1], argv[2], argv[3], argv[4], argv[5]); } /** @@ -1024,22 +1037,25 @@ int TaskTableAddTask_RedisCommand(RedisModuleCtx *ctx, * This is called from a client with the command: * * RAY.TASK_TABLE_UPDATE + * * * @param task_id A string that is the ID of the task. * @param state A string that is the current scheduling state (a * scheduling_state enum instance). * @param ray_client_id A string that is the ray client ID of the associated * local scheduler, if any. + * @param execution_dependencies A string that is the list of execution + * dependencies. * @return OK if the operation was successful. */ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 4) { + if (argc != 5) { return RedisModule_WrongArity(ctx); } - return TaskTableWrite(ctx, argv[1], argv[2], argv[3], NULL); + return TaskTableWrite(ctx, argv[1], argv[2], argv[3], argv[4], NULL); } /** diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index ba4f06bf2..e9a05888d 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -531,8 +531,13 @@ Task *parse_and_construct_task_from_redis_reply(redisReply *reply) { auto message = flatbuffers::GetRoot(reply->str); TaskSpec *spec = (TaskSpec *) message->task_spec()->data(); int64_t task_spec_size = message->task_spec()->size(); - task = Task_alloc(spec, task_spec_size, message->state(), - from_flatbuf(*message->local_scheduler_id())); + auto execution_dependencies = + flatbuffers::GetRoot( + message->execution_dependencies()->data()); + task = Task_alloc( + spec, task_spec_size, message->state(), + from_flatbuf(*message->local_scheduler_id()), + from_flatbuf(*execution_dependencies->execution_dependencies())); } else { LOG_FATAL("Unexpected reply type %d", reply->type); } @@ -859,7 +864,9 @@ void redis_task_table_get_task_callback(redisAsyncContext *c, done_callback(task, callback_data->user_context); } /* Free the task if it is not NULL. */ - Task_free(task); + if (task != NULL) { + Task_free(task); + } /* Clean up the timer and callback. */ destroy_timer_callback(db->loop, callback_data); @@ -917,18 +924,27 @@ void redis_task_table_add_task_callback(redisAsyncContext *c, void redis_task_table_add_task(TableCallbackData *callback_data) { DBHandle *db = callback_data->db_handle; Task *task = (Task *) callback_data->data->Get(); + CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task."); + TaskID task_id = Task_task_id(task); DBClientID local_scheduler_id = Task_local_scheduler(task); redisAsyncContext *context = get_redis_context(db, task_id); int state = Task_state(task); - TaskSpec *spec = Task_task_spec(task); - CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task."); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + TaskSpec *spec = execution_spec->Spec(); + + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies = CreateTaskExecutionDependencies( + fbb, to_flatbuf(fbb, execution_spec->ExecutionDependencies())); + fbb.Finish(execution_dependencies); + int status = redisAsyncCommand( context, redis_task_table_add_task_callback, - (void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b", + (void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b %b", task_id.id, sizeof(task_id.id), state, local_scheduler_id.id, - sizeof(local_scheduler_id.id), spec, Task_task_spec_size(task)); + sizeof(local_scheduler_id.id), fbb.GetBufferPointer(), + (size_t) fbb.GetSize(), spec, execution_spec->SpecSize()); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_task_table_add_task"); } @@ -972,17 +988,25 @@ void redis_task_table_update_callback(redisAsyncContext *c, void redis_task_table_update(TableCallbackData *callback_data) { DBHandle *db = callback_data->db_handle; Task *task = (Task *) callback_data->data->Get(); + CHECKM(task != NULL, "NULL task passed to redis_task_table_update."); + TaskID task_id = Task_task_id(task); redisAsyncContext *context = get_redis_context(db, task_id); DBClientID local_scheduler_id = Task_local_scheduler(task); int state = Task_state(task); - CHECKM(task != NULL, "NULL task passed to redis_task_table_update."); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies = CreateTaskExecutionDependencies( + fbb, to_flatbuf(fbb, execution_spec->ExecutionDependencies())); + fbb.Finish(execution_dependencies); + int status = redisAsyncCommand( context, redis_task_table_update_callback, - (void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b", + (void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b %b", task_id.id, sizeof(task_id.id), state, local_scheduler_id.id, - sizeof(local_scheduler_id.id)); + sizeof(local_scheduler_id.id), fbb.GetBufferPointer(), + (size_t) fbb.GetSize()); if ((status == REDIS_ERR) || context->err) { LOG_REDIS_DEBUG(context, "error in redis_task_table_update"); } @@ -1081,11 +1105,17 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c, /* Extract the local scheduler ID. */ DBClientID local_scheduler_id = from_flatbuf(*message->local_scheduler_id()); + /* Extract the execution dependencies. */ + auto execution_dependencies = + flatbuffers::GetRoot( + message->execution_dependencies()->data()); /* Extract the task spec. */ TaskSpec *spec = (TaskSpec *) message->task_spec()->data(); int64_t task_spec_size = message->task_spec()->size(); /* Create a task. */ - Task *task = Task_alloc(spec, task_spec_size, state, local_scheduler_id); + Task *task = Task_alloc( + spec, task_spec_size, state, local_scheduler_id, + from_flatbuf(*execution_dependencies->execution_dependencies())); /* Call the subscribe callback if there is one. */ TaskTableSubscribeData *data = @@ -1382,7 +1412,7 @@ void redis_local_scheduler_table_disconnect(DBHandle *db) { redisReply *reply = (redisReply *) redisCommand( db->sync_context, "PUBLISH local_schedulers %b", fbb.GetBufferPointer(), - fbb.GetSize()); + (size_t) fbb.GetSize()); CHECK(reply->type != REDIS_REPLY_ERROR); CHECK(reply->type == REDIS_REPLY_INTEGER); LOG_DEBUG("%" PRId64 " subscribers received this publish.\n", reply->integer); @@ -1467,7 +1497,7 @@ void redis_driver_table_send_driver_death(TableCallbackData *callback_data) { int status = redisAsyncCommand( db->context, redis_driver_table_send_driver_death_callback, (void *) callback_data->timer_id, "PUBLISH driver_deaths %b", - fbb.GetBufferPointer(), fbb.GetSize()); + fbb.GetBufferPointer(), (size_t) fbb.GetSize()); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in redis_driver_table_send_driver_death"); diff --git a/src/common/task.cc b/src/common/task.cc index d00f3722c..235a71b14 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -282,6 +282,17 @@ int64_t TaskSpec_num_args(TaskSpec *spec) { return message->args()->size(); } +int64_t TaskSpec_num_args_by_ref(TaskSpec *spec) { + int64_t num_args = TaskSpec_num_args(spec); + int64_t num_args_by_ref = 0; + for (int64_t i = 0; i < num_args; i++) { + if (TaskSpec_arg_by_ref(spec, i)) { + num_args_by_ref++; + } + } + return num_args_by_ref; +} + int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); @@ -348,20 +359,6 @@ const std::unordered_map TaskSpec_get_required_resources( return map_from_flatbuf(*message->required_resources()); } -bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) { - int64_t num_args = TaskSpec_num_args(spec); - for (int i = 0; i < num_args; ++i) { - int count = TaskSpec_arg_id_count(spec, i); - for (int j = 0; j < count; j++) { - ObjectID arg_id = TaskSpec_arg_id(spec, i, j); - if (ObjectID_equal(arg_id, object_id)) { - return true; - } - } - } - return false; -} - TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) { TaskSpec *copy = (TaskSpec *) malloc(task_spec_size); memcpy(copy, spec, task_spec_size); @@ -372,32 +369,147 @@ void TaskSpec_free(TaskSpec *spec) { free(spec); } +TaskExecutionSpec::TaskExecutionSpec( + const std::vector &execution_dependencies, + TaskSpec *spec, + int64_t task_spec_size) { + execution_dependencies_ = execution_dependencies; + task_spec_size_ = task_spec_size; + TaskSpec *spec_copy = new TaskSpec[task_spec_size_]; + memcpy(spec_copy, spec, task_spec_size); + spec_ = std::unique_ptr(spec_copy); +} + +TaskExecutionSpec::TaskExecutionSpec(TaskExecutionSpec *other) { + execution_dependencies_ = other->execution_dependencies_; + task_spec_size_ = other->task_spec_size_; + TaskSpec *spec_copy = new TaskSpec[task_spec_size_]; + memcpy(spec_copy, other->spec_.get(), task_spec_size_); + spec_ = std::unique_ptr(spec_copy); +} + +std::vector TaskExecutionSpec::ExecutionDependencies() { + return execution_dependencies_; +} + +int64_t TaskExecutionSpec::SpecSize() { + return task_spec_size_; +} + +TaskSpec *TaskExecutionSpec::Spec() { + return spec_.get(); +} + +int64_t TaskExecutionSpec::NumDependencies() { + TaskSpec *spec = Spec(); + int64_t num_dependencies = TaskSpec_num_args(spec); + num_dependencies += execution_dependencies_.size(); + return num_dependencies; +} + +int TaskExecutionSpec::DependencyIdCount(int64_t dependency_index) { + TaskSpec *spec = Spec(); + /* The first dependencies are the arguments of the task itself, followed by + * the execution dependencies. Find the total number of task arguments so + * that we can index into the correct list. */ + int64_t num_args = TaskSpec_num_args(spec); + if (dependency_index < num_args) { + /* Index into the task arguments. */ + return TaskSpec_arg_id_count(spec, dependency_index); + } else { + /* Index into the execution dependencies. */ + dependency_index -= num_args; + CHECK((size_t) dependency_index < execution_dependencies_.size()); + /* All elements in the execution dependency list have exactly one ID. */ + return 1; + } +} + +ObjectID TaskExecutionSpec::DependencyId(int64_t dependency_index, + int64_t id_index) { + TaskSpec *spec = Spec(); + /* The first dependencies are the arguments of the task itself, followed by + * the execution dependencies. Find the total number of task arguments so + * that we can index into the correct list. */ + int64_t num_args = TaskSpec_num_args(spec); + if (dependency_index < num_args) { + /* Index into the task arguments. */ + return TaskSpec_arg_id(spec, dependency_index, id_index); + } else { + /* Index into the execution dependencies. */ + dependency_index -= num_args; + CHECK((size_t) dependency_index < execution_dependencies_.size()); + return execution_dependencies_[dependency_index]; + } +} + +bool TaskExecutionSpec::DependsOn(ObjectID object_id) { + // Iterate through the task arguments to see if it contains object_id. + TaskSpec *spec = Spec(); + int64_t num_args = TaskSpec_num_args(spec); + for (int i = 0; i < num_args; ++i) { + int count = TaskSpec_arg_id_count(spec, i); + for (int j = 0; j < count; j++) { + ObjectID arg_id = TaskSpec_arg_id(spec, i, j); + if (ObjectID_equal(arg_id, object_id)) { + return true; + } + } + } + // Iterate through the execution dependencies to see if it contains object_id. + for (auto dependency_id : execution_dependencies_) { + if (ObjectID_equal(dependency_id, object_id)) { + return true; + } + } + // The requested object ID was not a task argument or an execution dependency. + // This task is not dependent on it. + return false; +} + +bool TaskExecutionSpec::IsStaticDependency(int64_t dependency_index) { + TaskSpec *spec = Spec(); + /* The first dependencies are the arguments of the task itself, followed by + * the execution dependencies. If the requested dependency index is a task + * argument, then it is a task dependency. */ + int64_t num_args = TaskSpec_num_args(spec); + return (dependency_index < num_args); +} + /* TASK INSTANCES */ Task *Task_alloc(TaskSpec *spec, int64_t task_spec_size, int state, - DBClientID local_scheduler_id) { - int64_t size = sizeof(Task) - sizeof(TaskSpec) + task_spec_size; - Task *result = (Task *) malloc(size); - memset(result, 0, size); + DBClientID local_scheduler_id, + const std::vector &execution_dependencies) { + Task *result = new Task(); + auto execution_spec = + new TaskExecutionSpec(execution_dependencies, spec, task_spec_size); + result->execution_spec = std::unique_ptr(execution_spec); + result->state = state; + result->local_scheduler_id = local_scheduler_id; + return result; +} + +Task *Task_alloc(TaskExecutionSpec &execution_spec, + int state, + DBClientID local_scheduler_id) { + Task *result = new Task(); + result->execution_spec = std::unique_ptr( + new TaskExecutionSpec(&execution_spec)); result->state = state; result->local_scheduler_id = local_scheduler_id; - result->task_spec_size = task_spec_size; - memcpy(&result->spec, spec, task_spec_size); return result; } Task *Task_copy(Task *other) { - int64_t size = Task_size(other); - Task *copy = (Task *) malloc(size); - CHECK(copy != NULL); - memcpy(copy, other, size); - return copy; + return Task_alloc(*Task_task_execution_spec(other), other->state, + other->local_scheduler_id); } int64_t Task_size(Task *task_arg) { - return sizeof(Task) - sizeof(TaskSpec) + task_arg->task_spec_size; + return sizeof(Task) - sizeof(TaskSpec) + task_arg->execution_spec->SpecSize(); } int Task_state(Task *task) { @@ -416,19 +528,16 @@ void Task_set_local_scheduler(Task *task, DBClientID local_scheduler_id) { task->local_scheduler_id = local_scheduler_id; } -TaskSpec *Task_task_spec(Task *task) { - return &task->spec; -} - -int64_t Task_task_spec_size(Task *task) { - return task->task_spec_size; +TaskExecutionSpec *Task_task_execution_spec(Task *task) { + return task->execution_spec.get(); } TaskID Task_task_id(Task *task) { - TaskSpec *spec = Task_task_spec(task); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + TaskSpec *spec = execution_spec->Spec(); return TaskSpec_task_id(spec); } void Task_free(Task *task) { - free(task); + delete task; } diff --git a/src/common/task.h b/src/common/task.h index 7dd9d6370..1773a22f5 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -13,6 +13,73 @@ typedef uint8_t TaskSpec; +class TaskExecutionSpec { + public: + TaskExecutionSpec(const std::vector &execution_dependencies, + TaskSpec *spec, + int64_t task_spec_size); + TaskExecutionSpec(TaskExecutionSpec *execution_spec); + + /// Get the task's execution dependencies. + /// + /// @return A vector of object IDs representing this task's execution + /// dependencies. + std::vector ExecutionDependencies(); + + /// Get the task spec size. + /// + /// @return The size of the immutable task spec. + int64_t SpecSize(); + + /// Get the task spec. + /// + /// @return A pointer to the immutable task spec. + TaskSpec *Spec(); + + /// Get the number of dependencies. This comprises the immutable task + /// arguments and the mutable execution dependencies. + /// + /// @return The number of dependencies. + int64_t NumDependencies(); + + /// Get the number of object IDs at the given dependency index. + /// + /// @param dependency_index The dependency index whose object IDs to count. + /// @return The number of object IDs at the given dependency_index. + int DependencyIdCount(int64_t dependency_index); + + /// Get the object ID of a given dependency index. + /// + /// @param dependency_index The index at which we should look up the object + /// ID. + /// @param id_index The index of the object ID. + ObjectID DependencyId(int64_t dependency_index, int64_t id_index); + + /// Compute whether the task is dependent on an object ID. + /// + /// @param object_id The object ID that the task may be dependent on. + /// @return bool This returns true if the task is dependent on the given + /// object ID and false otherwise. + bool DependsOn(ObjectID object_id); + + /// Returns whether the given dependency index is a static dependency (an + /// argument of the immutable task). + /// + /// @param dependency_index The requested dependency index. + /// @return bool This returns true if the requested dependency index is + /// immutable (an argument of the task). + bool IsStaticDependency(int64_t dependency_index); + + private: + /** A list of object IDs representing this task's dependencies at execution + * time. */ + std::vector execution_dependencies_; + /** The size of the task specification for this task. */ + int64_t task_spec_size_; + /** The task specification for this task. */ + std::unique_ptr spec_; +}; + class TaskBuilder; #define NIL_TASK_ID NIL_ID @@ -346,16 +413,6 @@ double TaskSpec_get_required_resource(const TaskSpec *spec, const std::unordered_map TaskSpec_get_required_resources( const TaskSpec *spec); -/** - * Compute whether the task is dependent on an object ID. - * - * @param spec Task specification. - * @param object_id The object ID that the task may be dependent on. - * @return bool This returns true if the task is dependent on the given object - * ID and false otherwise. - */ -bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id); - /** * Compute the object id associated to a put call. * @@ -426,10 +483,8 @@ struct Task { int state; /** The ID of the local scheduler involved. */ DBClientID local_scheduler_id; - /** The size of the task specification for this task. */ - int64_t task_spec_size; - /** The task specification for this task. */ - TaskSpec spec; + /** The execution specification for this task. */ + std::unique_ptr execution_spec; }; /** @@ -442,6 +497,11 @@ struct Task { */ Task *Task_alloc(TaskSpec *spec, int64_t task_spec_size, + int state, + DBClientID local_scheduler_id, + const std::vector &execution_dependencies); + +Task *Task_alloc(TaskExecutionSpec &execution_spec, int state, DBClientID local_scheduler_id); @@ -468,10 +528,7 @@ DBClientID Task_local_scheduler(Task *task); /** Set the local scheduler ID for this task. */ void Task_set_local_scheduler(Task *task, DBClientID local_scheduler_id); -/** Task specification of this task. */ -TaskSpec *Task_task_spec(Task *task); - -int64_t Task_task_spec_size(Task *task); +TaskExecutionSpec *Task_task_execution_spec(Task *task); /** Task ID of this task. */ TaskID Task_task_id(Task *task); diff --git a/src/common/test/db_tests.cc b/src/common/test/db_tests.cc index 5a4437af9..5ed016404 100644 --- a/src/common/test/db_tests.cc +++ b/src/common/test/db_tests.cc @@ -138,8 +138,7 @@ void task_table_test_callback(Task *callback_task, void *user_data) { task_table_test_callback_called = 1; CHECK(Task_state(callback_task) == TASK_STATUS_SCHEDULED); CHECK(Task_size(callback_task) == Task_size(task_table_test_task)); - CHECK(memcmp(callback_task, task_table_test_task, Task_size(callback_task)) == - 0); + CHECK(Task_equals(callback_task, task_table_test_task)); event_loop *loop = (event_loop *) user_data; event_loop_stop(loop); } @@ -151,11 +150,9 @@ TEST task_table_test(void) { "127.0.0.1", std::vector()); db_attach(db, loop, false); DBClientID local_scheduler_id = globally_unique_id(); - int64_t task_spec_size; - TaskSpec *spec = example_task_spec(1, 1, &task_spec_size); - task_table_test_task = Task_alloc(spec, task_spec_size, TASK_STATUS_SCHEDULED, - local_scheduler_id); - TaskSpec_free(spec); + TaskExecutionSpec spec = example_task_execution_spec(1, 1); + task_table_test_task = + Task_alloc(spec, TASK_STATUS_SCHEDULED, local_scheduler_id); RetryInfo retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, @@ -186,13 +183,10 @@ TEST task_table_all_test(void) { DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "local_scheduler", "127.0.0.1", std::vector()); db_attach(db, loop, false); - int64_t task_spec_size; - TaskSpec *spec = example_task_spec(1, 1, &task_spec_size); + TaskExecutionSpec spec = example_task_execution_spec(1, 1); /* Schedule two tasks on different local local schedulers. */ - Task *task1 = Task_alloc(spec, task_spec_size, TASK_STATUS_SCHEDULED, - globally_unique_id()); - Task *task2 = Task_alloc(spec, task_spec_size, TASK_STATUS_SCHEDULED, - globally_unique_id()); + Task *task1 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); + Task *task2 = Task_alloc(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); RetryInfo retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL, }; @@ -207,7 +201,6 @@ TEST task_table_all_test(void) { event_loop_add_timer(loop, 200, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(loop); - TaskSpec_free(spec); db_disconnect(db); destroy_outstanding_callbacks(loop); event_loop_destroy(loop); diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index ba26da7f5..8d44cd205 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -7,10 +7,10 @@ extern TaskBuilder *g_task_builder; const int64_t arg_value_size = 1000; -static inline TaskSpec *example_task_spec_with_args(int64_t num_args, - int64_t num_returns, - ObjectID arg_ids[], - int64_t *task_spec_size) { +static inline TaskExecutionSpec example_task_execution_spec_with_args( + int64_t num_args, + int64_t num_returns, + ObjectID arg_ids[]) { TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0, @@ -25,36 +25,53 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args, } TaskSpec_args_add_ref(g_task_builder, &arg_id, 1); } - return TaskSpec_finish_construct(g_task_builder, task_spec_size); + int64_t task_spec_size; + TaskSpec *spec = TaskSpec_finish_construct(g_task_builder, &task_spec_size); + std::vector execution_dependencies; + auto execution_spec = + TaskExecutionSpec(execution_dependencies, spec, task_spec_size); + TaskSpec_free(spec); + return execution_spec; } -static inline TaskSpec *example_task_spec(int64_t num_args, - int64_t num_returns, - int64_t *task_spec_size) { - return example_task_spec_with_args(num_args, num_returns, NULL, - task_spec_size); +static inline TaskExecutionSpec example_task_execution_spec( + int64_t num_args, + int64_t num_returns) { + return example_task_execution_spec_with_args(num_args, num_returns, NULL); } static inline Task *example_task_with_args(int64_t num_args, int64_t num_returns, int task_state, ObjectID arg_ids[]) { - int64_t task_spec_size; - TaskSpec *spec = example_task_spec_with_args(num_args, num_returns, arg_ids, - &task_spec_size); - Task *instance = Task_alloc(spec, task_spec_size, task_state, NIL_ID); - TaskSpec_free(spec); + TaskExecutionSpec spec = + example_task_execution_spec_with_args(num_args, num_returns, arg_ids); + Task *instance = Task_alloc(spec, task_state, NIL_ID); return instance; } static inline Task *example_task(int64_t num_args, int64_t num_returns, int task_state) { - int64_t task_spec_size; - TaskSpec *spec = example_task_spec(num_args, num_returns, &task_spec_size); - Task *instance = Task_alloc(spec, task_spec_size, task_state, NIL_ID); - TaskSpec_free(spec); + TaskExecutionSpec spec = example_task_execution_spec(num_args, num_returns); + Task *instance = Task_alloc(spec, task_state, NIL_ID); return instance; } +static inline bool Task_equals(Task *task1, Task *task2) { + if (task1->state != task2->state) { + return false; + } + if (!DBClientID_equal(task1->local_scheduler_id, task2->local_scheduler_id)) { + return false; + } + auto execution_spec1 = Task_task_execution_spec(task1); + auto execution_spec2 = Task_task_execution_spec(task2); + if (execution_spec1->SpecSize() != execution_spec2->SpecSize()) { + return false; + } + return memcmp(execution_spec1->Spec(), execution_spec2->Spec(), + execution_spec1->SpecSize()) == 0; +} + #endif /* EXAMPLE_TASK_H */ diff --git a/src/common/test/object_table_tests.cc b/src/common/test/object_table_tests.cc index d9c768197..584ba9475 100644 --- a/src/common/test/object_table_tests.cc +++ b/src/common/test/object_table_tests.cc @@ -80,7 +80,7 @@ TEST new_object_test(void) { new_object_succeeded = 0; new_object_id = globally_unique_id(); new_object_task = example_task(1, 1, TASK_STATUS_WAITING); - new_object_task_spec = Task_task_spec(new_object_task); + new_object_task_spec = Task_task_execution_spec(new_object_task)->Spec(); new_object_task_id = TaskSpec_task_id(new_object_task_spec); g_loop = event_loop_create(); DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager", diff --git a/src/common/test/task_table_tests.cc b/src/common/test/task_table_tests.cc index c51120526..1b9eccd37 100644 --- a/src/common/test/task_table_tests.cc +++ b/src/common/test/task_table_tests.cc @@ -75,7 +75,7 @@ void add_lookup_fail_callback(UniqueID id, void lookup_success_callback(Task *task, void *context) { lookup_success = 1; - CHECK(memcmp(task, add_lookup_task, Task_size(task)) == 0); + CHECK(Task_equals(task, add_lookup_task)); event_loop_stop(g_loop); } diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index e3256bbd9..989d09188 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -65,7 +65,7 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state, Task *task, DBClientID local_scheduler_id) { char id_string[ID_STRING_SIZE]; - TaskSpec *spec = Task_task_spec(task); + TaskSpec *spec = Task_task_execution_spec(task)->Spec(); LOG_DEBUG("assigning task to local_scheduler_id = %s", ObjectID_to_string(local_scheduler_id, id_string, ID_STRING_SIZE)); Task_set_state(task, TASK_STATUS_SCHEDULED); diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc index 83c588c77..03d978bd9 100644 --- a/src/global_scheduler/global_scheduler_algorithm.cc +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -117,7 +117,7 @@ double calculate_cost_pending(const GlobalSchedulerState *state, bool handle_task_waiting(GlobalSchedulerState *state, GlobalSchedulerPolicyState *policy_state, Task *task) { - TaskSpec *task_spec = Task_task_spec(task); + TaskSpec *task_spec = Task_task_execution_spec(task)->Spec(); CHECKM(task_spec != NULL, "task wait handler encounted a task with NULL spec"); diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index 18dd65e5b..04ea001a2 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -36,6 +36,11 @@ enum MessageType:int { PutObject } +table SubmitTaskRequest { + execution_dependencies: [string]; + task_spec: string; +} + // This message is sent from a worker to a local scheduler. table GetTaskRequest { // Whether the previously assigned task was a checkpoint task that failed. diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 8c8c5b9ff..d5ceae82f 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -121,7 +121,7 @@ void kill_worker(LocalSchedulerState *state, /* If this worker is still running a task and we aren't cleaning up, push an * error message to the driver responsible for the task. */ if (worker->task_in_progress != NULL && !cleanup && !suppress_warning) { - TaskSpec *spec = Task_task_spec(worker->task_in_progress); + TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); TaskID task_id = TaskSpec_task_id(spec); push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX, sizeof(task_id), task_id.id); @@ -519,9 +519,10 @@ bool is_driver_alive(LocalSchedulerState *state, WorkerID driver_id) { } void assign_task_to_worker(LocalSchedulerState *state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, LocalSchedulerClient *worker) { + int64_t task_spec_size = execution_spec.SpecSize(); + TaskSpec *spec = execution_spec.Spec(); // Acquire the necessary resources for running this task. const std::unordered_map required_resources = TaskSpec_get_required_resources(spec); @@ -560,7 +561,7 @@ void assign_task_to_worker(LocalSchedulerState *state, } } - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_RUNNING, + Task *task = Task_alloc(execution_spec, TASK_STATUS_RUNNING, state->db ? get_db_client_id(state->db) : NIL_ID); /* Record which task this worker is executing. This will be freed in * process_message when the worker sends a GetTask message to the local @@ -578,7 +579,7 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker, bool actor_checkpoint_failed) { if (worker->task_in_progress != NULL) { - TaskSpec *spec = Task_task_spec(worker->task_in_progress); + TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); /* Return dynamic resources back for the task in progress. */ CHECK(worker->resources_in_use["CPU"] == TaskSpec_get_required_resource(spec, "CPU")); @@ -663,22 +664,21 @@ void reconstruct_task_update_callback(Task *task, /* Otherwise, the test-and-set succeeded, so resubmit the task for execution * to ensure that reconstruction will happen. */ - TaskSpec *spec = Task_task_spec(task); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + TaskSpec *spec = execution_spec->Spec(); if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { - handle_task_submitted(state, state->algorithm_state, Task_task_spec(task), - Task_task_spec_size(task)); + handle_task_submitted(state, state->algorithm_state, *execution_spec); } else { - handle_actor_task_submitted(state, state->algorithm_state, - Task_task_spec(task), - Task_task_spec_size(task)); + handle_actor_task_submitted(state, state->algorithm_state, *execution_spec); } /* Recursively reconstruct the task's inputs, if necessary. */ - for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { - int count = TaskSpec_arg_id_count(spec, i); + int64_t num_dependencies = execution_spec->NumDependencies(); + for (int64_t i = 0; i < num_dependencies; ++i) { + int count = execution_spec->DependencyIdCount(i); for (int64_t j = 0; j < count; ++j) { - ObjectID arg_id = TaskSpec_arg_id(spec, i, j); - reconstruct_object(state, arg_id); + ObjectID dependency_id = execution_spec->DependencyId(i, j); + reconstruct_object(state, dependency_id); } } } @@ -706,7 +706,7 @@ void reconstruct_put_task_update_callback(Task *task, /* (1) The task is still executing on a live node. The object created * by `ray.put` was not able to be reconstructed, and the workload will * likely hang. Push an error to the appropriate driver. */ - TaskSpec *spec = Task_task_spec(task); + TaskSpec *spec = Task_task_execution_spec(task)->Spec(); FunctionID function = TaskSpec_function(spec); push_error(state->db, TaskSpec_driver_id(spec), PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), @@ -716,7 +716,7 @@ void reconstruct_put_task_update_callback(Task *task, /* (1) The task is still executing and it is the driver task. We cannot * restart the driver task, so the workload will hang. Push an error to * the appropriate driver. */ - TaskSpec *spec = Task_task_spec(task); + TaskSpec *spec = Task_task_execution_spec(task)->Spec(); FunctionID function = TaskSpec_function(spec); push_error(state->db, TaskSpec_driver_id(spec), PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), function.id); @@ -951,7 +951,8 @@ void handle_driver_removed_callback(WorkerID driver_id, void *user_context) { kill_worker(state, *it, false, true); } } else if (task != NULL) { - if (WorkerID_equal(TaskSpec_driver_id(Task_task_spec(task)), driver_id)) { + TaskSpec *spec = Task_task_execution_spec(task)->Spec(); + if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { LOG_DEBUG("Killing a worker executing a task for a removed driver."); kill_worker(state, *it, false, true); } @@ -989,14 +990,19 @@ void process_message(event_loop *loop, LocalSchedulerState *state = worker->local_scheduler_state; int64_t type; - int64_t length = read_vector(client_sock, &type, state->input_buffer); + read_vector(client_sock, &type, state->input_buffer); uint8_t *input = state->input_buffer.data(); LOG_DEBUG("New event of type %" PRId64, type); switch (type) { case MessageType_SubmitTask: { - TaskSpec *spec = (TaskSpec *) input; + auto message = flatbuffers::GetRoot(input); + TaskExecutionSpec execution_spec = + TaskExecutionSpec(from_flatbuf(*message->execution_dependencies()), + (TaskSpec *) message->task_spec()->data(), + message->task_spec()->size()); + TaskSpec *spec = execution_spec.Spec(); /* Update the result table, which holds mappings of object ID -> ID of the * task that created it. */ if (state->db != NULL) { @@ -1010,11 +1016,11 @@ void process_message(event_loop *loop, /* Handle the task submission. */ if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { - handle_task_submitted(state, state->algorithm_state, spec, length); + handle_task_submitted(state, state->algorithm_state, execution_spec); } else { - handle_actor_task_submitted(state, state->algorithm_state, spec, length); + handle_actor_task_submitted(state, state->algorithm_state, + execution_spec); } - } break; case MessageType_TaskDone: { } break; @@ -1097,7 +1103,8 @@ void process_message(event_loop *loop, * maximum number of resources. This could be fixed by having blocked * workers explicitly yield and wait to be given back resources before * continuing execution. */ - TaskSpec *spec = Task_task_spec(worker->task_in_progress); + TaskSpec *spec = + Task_task_execution_spec(worker->task_in_progress)->Spec(); std::unordered_map cpu_resources; cpu_resources["CPU"] = TaskSpec_get_required_resource(spec, "CPU"); acquire_resources(state, worker, cpu_resources); @@ -1182,7 +1189,8 @@ void signal_handler(int signal) { void handle_task_scheduled_callback(Task *original_task, void *subscribe_context) { LocalSchedulerState *state = (LocalSchedulerState *) subscribe_context; - TaskSpec *spec = Task_task_spec(original_task); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(original_task); + TaskSpec *spec = execution_spec->Spec(); /* If the driver for this task has been removed, then don't bother telling the * scheduling algorithm. */ @@ -1194,13 +1202,11 @@ void handle_task_scheduled_callback(Task *original_task, if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { /* This task does not involve an actor. Handle it normally. */ - handle_task_scheduled(state, state->algorithm_state, spec, - Task_task_spec_size(original_task)); + handle_task_scheduled(state, state->algorithm_state, *execution_spec); } else { /* This task involves an actor. Call the scheduling algorithm's actor * handler. */ - handle_actor_task_scheduled(state, state->algorithm_state, spec, - Task_task_spec_size(original_task)); + handle_actor_task_scheduled(state, state->algorithm_state, *execution_spec); } } diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 0bb634630..82de03878 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -37,8 +37,7 @@ bool is_driver_alive(WorkerID driver_id); * @return Void. */ void assign_task_to_worker(LocalSchedulerState *state, - TaskSpec *task, - int64_t task_spec_size, + TaskExecutionSpec &task, LocalSchedulerClient *worker); /* diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 1c00b83c2..c6fbf2bc5 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -17,21 +17,13 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id); void give_task_to_global_scheduler(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size); + TaskExecutionSpec &execution_spec); void give_task_to_local_scheduler(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, DBClientID local_scheduler_id); -struct TaskQueueEntry { - /** The task that is queued. */ - TaskSpec *spec; - int64_t task_spec_size; -}; - /** A data structure used to track which objects are available locally and * which objects are being actively fetched. Objects of this type are used for * both the scheduling algorithm state's local_objects and remote_objects @@ -42,7 +34,7 @@ struct ObjectEntry { * the tasks in the waiting queue. Each element actually stores a reference * to the corresponding task's queue entry in waiting queue, for fast * deletion when all of the task's dependencies become available. */ - std::vector::iterator> dependent_tasks; + std::vector::iterator> dependent_tasks; /** Whether or not to request a transfer of this object. This should be set * to true for all objects except for actor dummy objects, where the object * must be generated by executing the task locally. */ @@ -75,7 +67,7 @@ typedef struct { bool loaded; /** A queue of tasks to be executed on this actor. The tasks will be sorted by * the order of their actor counters. */ - std::list *task_queue; + std::list *task_queue; /** The worker that the actor is running on. */ LocalSchedulerClient *worker; /** True if the worker is available and false otherwise. */ @@ -86,10 +78,10 @@ typedef struct { * algorithm. */ struct SchedulingAlgorithmState { /** An array of pointers to tasks that are waiting for dependencies. */ - std::list *waiting_task_queue; + std::list *waiting_task_queue; /** An array of pointers to tasks whose dependencies are ready but that are * waiting to be assigned to a worker. */ - std::list *dispatch_task_queue; + std::list *dispatch_task_queue; /** This is a hash table from actor ID to information about that actor. In * particular, a queue of tasks that are waiting to execute on that actor. * This is only used for actors that exist locally. */ @@ -104,7 +96,7 @@ struct SchedulingAlgorithmState { * assign them to the correct local scheduler yet. Whenever a notification * about a new local scheduler arrives, we will resubmit all of these tasks * locally. */ - std::vector cached_submitted_actor_tasks; + std::vector cached_submitted_actor_tasks; /** An array of pointers to workers in the worker pool. These are workers * that have registered a PID with us and that are now waiting to be * assigned a task to execute. */ @@ -129,38 +121,19 @@ struct SchedulingAlgorithmState { std::unordered_map remote_objects; }; -TaskQueueEntry TaskQueueEntry_init(TaskSpec *spec, int64_t task_spec_size) { - TaskQueueEntry elt; - elt.spec = TaskSpec_copy(spec, task_spec_size); - elt.task_spec_size = task_spec_size; - return elt; -} - -void TaskQueueEntry_free(TaskQueueEntry *entry) { - TaskSpec_free(entry->spec); -} - SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) { SchedulingAlgorithmState *algorithm_state = new SchedulingAlgorithmState(); /* Initialize the local data structures used for queuing tasks and workers. */ - algorithm_state->waiting_task_queue = new std::list(); - algorithm_state->dispatch_task_queue = new std::list(); + algorithm_state->waiting_task_queue = new std::list(); + algorithm_state->dispatch_task_queue = new std::list(); return algorithm_state; } void SchedulingAlgorithmState_free(SchedulingAlgorithmState *algorithm_state) { /* Free all of the tasks in the waiting queue. */ - for (auto &task : *algorithm_state->waiting_task_queue) { - TaskQueueEntry_free(&task); - } - algorithm_state->waiting_task_queue->clear(); delete algorithm_state->waiting_task_queue; /* Free all the tasks in the dispatch queue. */ - for (auto &task : *algorithm_state->dispatch_task_queue) { - TaskQueueEntry_free(&task); - } - algorithm_state->dispatch_task_queue->clear(); delete algorithm_state->dispatch_task_queue; /* Remove all of the remaining actors. */ while (algorithm_state->local_actor_infos.size() != 0) { @@ -168,12 +141,6 @@ void SchedulingAlgorithmState_free(SchedulingAlgorithmState *algorithm_state) { ActorID actor_id = it->first; remove_actor(algorithm_state, actor_id); } - /* Free the list of cached actor task specs and the task specs themselves. */ - for (size_t i = 0; i < algorithm_state->cached_submitted_actor_tasks.size(); - ++i) { - TaskQueueEntry task = algorithm_state->cached_submitted_actor_tasks[i]; - TaskQueueEntry_free(&task); - } /* Free the algorithm state. */ delete algorithm_state; } @@ -254,7 +221,7 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, entry.task_counters[NIL_ACTOR_ID] = 0; entry.assigned_task_counter = -1; entry.assigned_task_handle_id = NIL_ACTOR_ID; - entry.task_queue = new std::list(); + entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; entry.loaded = false; @@ -283,10 +250,6 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { } ARROW_UNUSED(id_string); - /* Free all remaining tasks in the actor queue. */ - for (auto &task : *entry.task_queue) { - TaskQueueEntry_free(&task); - } entry.task_queue->clear(); delete entry.task_queue; /* Remove the entry from the hash table. */ @@ -337,8 +300,9 @@ bool dispatch_actor_task(LocalSchedulerState *state, /* Check whether we can execute the first task in the queue. */ auto task = entry.task_queue->begin(); - int64_t next_task_counter = TaskSpec_actor_counter(task->spec); - ActorID next_task_handle_id = TaskSpec_actor_handle_id(task->spec); + TaskSpec *spec = task->Spec(); + int64_t next_task_counter = TaskSpec_actor_counter(spec); + ActorID next_task_handle_id = TaskSpec_actor_handle_id(spec); if (entry.loaded) { /* Once the actor has loaded, we can only execute tasks in order of * task_counter. */ @@ -350,25 +314,22 @@ bool dispatch_actor_task(LocalSchedulerState *state, * matches task_counter (the first task), or a checkpoint task. */ if (next_task_counter != entry.task_counters[next_task_handle_id]) { /* No other task should be first in the queue. */ - CHECK(TaskSpec_is_actor_checkpoint_method(task->spec)); + CHECK(TaskSpec_is_actor_checkpoint_method(spec)); } } /* If there are not enough resources available, we cannot assign the task. */ - CHECK(0 == TaskSpec_get_required_resource(task->spec, "GPU")); - if (!check_dynamic_resources(state, - TaskSpec_get_required_resources(task->spec))) { + CHECK(0 == TaskSpec_get_required_resource(spec, "GPU")); + if (!check_dynamic_resources(state, TaskSpec_get_required_resources(spec))) { return false; } /* Assign the first task in the task queue to the worker and mark the worker * as unavailable. */ - assign_task_to_worker(state, task->spec, task->task_spec_size, entry.worker); + assign_task_to_worker(state, *task, entry.worker); entry.assigned_task_counter = next_task_counter; entry.assigned_task_handle_id = next_task_handle_id; entry.worker_available = false; - /* Free the task queue entry. */ - TaskQueueEntry_free(&(*task)); /* Remove the task from the actor's task queue. */ entry.task_queue->erase(task); @@ -400,8 +361,8 @@ void handle_actor_worker_connect(LocalSchedulerState *state, * Finishes a killed task by inserting dummy objects for each of its returns. */ void finish_killed_task(LocalSchedulerState *state, - TaskSpec *spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { + TaskSpec *spec = execution_spec.Spec(); int64_t num_returns = TaskSpec_num_returns(spec); for (int i = 0; i < num_returns; i++) { ObjectID object_id = TaskSpec_return(spec, i); @@ -418,7 +379,7 @@ void finish_killed_task(LocalSchedulerState *state, } /* Mark the task as done. */ if (state->db != NULL) { - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_DONE, + Task *task = Task_alloc(execution_spec, TASK_STATUS_DONE, get_db_client_id(state->db)); task_table_update(state->db, task, NULL, NULL, NULL); } @@ -437,15 +398,16 @@ void finish_killed_task(LocalSchedulerState *state, */ void insert_actor_task_queue(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskQueueEntry task_entry) { + TaskExecutionSpec task_entry) { + TaskSpec *spec = task_entry.Spec(); /* Get the local actor entry for this actor. */ - ActorID actor_id = TaskSpec_actor_id(task_entry.spec); - ActorID task_handle_id = TaskSpec_actor_handle_id(task_entry.spec); - int64_t task_counter = TaskSpec_actor_counter(task_entry.spec); + ActorID actor_id = TaskSpec_actor_id(spec); + ActorID task_handle_id = TaskSpec_actor_handle_id(spec); + int64_t task_counter = TaskSpec_actor_counter(spec); /* Fail the task immediately; it's destined for a dead actor. */ if (state->removed_actors.find(actor_id) != state->removed_actors.end()) { - finish_killed_task(state, task_entry.spec, task_entry.task_spec_size); + finish_killed_task(state, task_entry); return; } @@ -480,23 +442,25 @@ void insert_actor_task_queue(LocalSchedulerState *state, * the submitted task's and the same handle ID. */ auto it = entry.task_queue->begin(); for (; it != entry.task_queue->end(); it++) { + TaskSpec *pending_task_spec = it->Spec(); /* Skip tasks submitted by a different handle. */ - if (!ActorID_equal(task_handle_id, TaskSpec_actor_handle_id(it->spec))) { + if (!ActorID_equal(task_handle_id, + TaskSpec_actor_handle_id(pending_task_spec))) { continue; } /* A duplicate task submitted by the same handle. */ - if (task_counter == TaskSpec_actor_counter(it->spec)) { + if (task_counter == TaskSpec_actor_counter(pending_task_spec)) { LOG_INFO( "A task was resubmitted, so we are ignoring it. This should only " "happen during reconstruction."); return; } /* We found a task with the same handle ID and a greater task counter. */ - if (task_counter < TaskSpec_actor_counter(it->spec)) { + if (task_counter < TaskSpec_actor_counter(pending_task_spec)) { break; } } - entry.task_queue->insert(it, task_entry); + entry.task_queue->insert(it, std::move(task_entry)); /* Record the fact that this actor has a task waiting to execute. */ algorithm_state->actors_with_pending_tasks.insert(actor_id); @@ -517,15 +481,15 @@ void insert_actor_task_queue(LocalSchedulerState *state, */ void queue_actor_task(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, bool from_global_scheduler) { + TaskSpec *spec = execution_spec.Spec(); ActorID actor_id = TaskSpec_actor_id(spec); DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); /* Update the task table. */ if (state->db != NULL) { - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_QUEUED, + Task *task = Task_alloc(execution_spec, TASK_STATUS_QUEUED, get_db_client_id(state->db)); if (from_global_scheduler) { /* If the task is from the global scheduler, it's already been added to @@ -542,8 +506,8 @@ void queue_actor_task(LocalSchedulerState *state, // Create a new task queue entry. This must come after the above block because // insert_actor_task_queue may call task_table_update internally, which must // come after the prior call to task_table_add_task. - TaskQueueEntry elt = TaskQueueEntry_init(spec, task_spec_size); - insert_actor_task_queue(state, algorithm_state, elt); + TaskExecutionSpec copy = TaskExecutionSpec(&execution_spec); + insert_actor_task_queue(state, algorithm_state, std::move(copy)); } /** @@ -555,14 +519,18 @@ void queue_actor_task(LocalSchedulerState *state, * @param algorithm_state The scheduling algorithm state. * @param task_entry_it A reference to the task entry in the waiting queue. * @param obj_id The ID of the object that the task is dependent on. - * @param arg_index The object's index in the dependent task's arguments. + * @param request_transfer Whether to request a transfer of this object from + * other plasma managers. This should be set to false for execution + * dependencies, which should be fulfilled by executing the + * corresponding task locally. * @returns Void. */ -void fetch_missing_dependency(LocalSchedulerState *state, - SchedulingAlgorithmState *algorithm_state, - std::list::iterator task_entry_it, - plasma::ObjectID obj_id, - int64_t arg_index) { +void fetch_missing_dependency( + LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + std::list::iterator task_entry_it, + plasma::ObjectID obj_id, + bool request_transfer) { if (algorithm_state->remote_objects.count(obj_id) == 0) { /* We weren't actively fetching this object. Try the fetch once * immediately. */ @@ -585,15 +553,7 @@ void fetch_missing_dependency(LocalSchedulerState *state, * the object becomes available locally. It will get freed if the object is * subsequently removed locally. */ ObjectEntry entry; - /* If the task is for an actor, and the missing object is a dummy object, - * then we must generate it locally by executing the corresponding task. - * All other objects may be requested from another plasma manager. */ - if (TaskSpec_is_actor_task(task_entry_it->spec) && - TaskSpec_arg_is_actor_dummy_object(task_entry_it->spec, arg_index)) { - entry.request_transfer = false; - } else { - entry.request_transfer = true; - } + entry.request_transfer = request_transfer; algorithm_state->remote_objects[obj_id] = entry; } algorithm_state->remote_objects[obj_id].dependent_tasks.push_back( @@ -613,18 +573,20 @@ void fetch_missing_dependency(LocalSchedulerState *state, void fetch_missing_dependencies( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - std::list::iterator task_entry_it) { - TaskSpec *task = task_entry_it->spec; - int64_t num_args = TaskSpec_num_args(task); + std::list::iterator task_entry_it) { + int64_t num_dependencies = task_entry_it->NumDependencies(); int num_missing_dependencies = 0; - for (int64_t i = 0; i < num_args; ++i) { - int count = TaskSpec_arg_id_count(task, i); + for (int64_t i = 0; i < num_dependencies; ++i) { + int count = task_entry_it->DependencyIdCount(i); for (int j = 0; j < count; ++j) { - ObjectID obj_id = TaskSpec_arg_id(task, i, j); + ObjectID obj_id = task_entry_it->DependencyId(i, j); + /* If the entry is not yet available locally, record the dependency. */ if (algorithm_state->local_objects.count(obj_id) == 0) { - /* If the entry is not yet available locally, record the dependency. */ + /* Do not request a transfer from other plasma managers if this is an + * execution dependency. */ + bool request_transfer = task_entry_it->IsStaticDependency(i); fetch_missing_dependency(state, algorithm_state, task_entry_it, - obj_id.to_plasma_id(), i); + obj_id.to_plasma_id(), request_transfer); ++num_missing_dependencies; } } @@ -642,12 +604,13 @@ void fetch_missing_dependencies( * task are present in the local object store, otherwise it returns * false. */ -bool can_run(SchedulingAlgorithmState *algorithm_state, TaskSpec *task) { - int64_t num_args = TaskSpec_num_args(task); - for (int i = 0; i < num_args; ++i) { - int count = TaskSpec_arg_id_count(task, i); +bool can_run(SchedulingAlgorithmState *algorithm_state, + TaskExecutionSpec &task) { + int64_t num_dependencies = task.NumDependencies(); + for (int i = 0; i < num_dependencies; ++i) { + int count = task.DependencyIdCount(i); for (int j = 0; j < count; ++j) { - ObjectID obj_id = TaskSpec_arg_id(task, i, j); + ObjectID obj_id = task.DependencyId(i, j); if (algorithm_state->local_objects.count(obj_id) == 0) { /* The object is not present locally, so this task cannot be scheduled * right now. */ @@ -797,7 +760,7 @@ void dispatch_tasks(LocalSchedulerState *state, /* Assign as many tasks as we can, while there are workers available. */ for (auto it = algorithm_state->dispatch_task_queue->begin(); it != algorithm_state->dispatch_task_queue->end();) { - TaskQueueEntry task = *it; + TaskSpec *spec = it->Spec(); /* If there is a task to assign, but there are no more available workers in * the worker pool, then exit. Ensure that there will be an available * worker during a future invocation of dispatch_tasks. */ @@ -817,7 +780,7 @@ void dispatch_tasks(LocalSchedulerState *state, /* Skip to the next task if this task cannot currently be satisfied. */ if (!check_dynamic_resources(state, - TaskSpec_get_required_resources(task.spec))) { + TaskSpec_get_required_resources(spec))) { /* This task could not be satisfied -- proceed to the next task. */ ++it; continue; @@ -828,14 +791,12 @@ void dispatch_tasks(LocalSchedulerState *state, /* Get the last available worker in the available worker queue. */ LocalSchedulerClient *worker = algorithm_state->available_workers.back(); /* Tell the available worker to execute the task. */ - assign_task_to_worker(state, task.spec, task.task_spec_size, worker); + assign_task_to_worker(state, *it, worker); /* Remove the worker from the available queue, and add it to the executing * workers. */ algorithm_state->available_workers.pop_back(); algorithm_state->executing_workers.push_back(worker); - print_resource_info(state, task.spec); - /* Free the task queue entry. */ - TaskQueueEntry_free(&task); + print_resource_info(state, spec); /* Dequeue the task. */ it = algorithm_state->dispatch_task_queue->erase(it); } /* End for each task in the dispatch queue. */ @@ -883,24 +844,16 @@ void dispatch_all_tasks(LocalSchedulerState *state, * scheduler. If false, the task was submitted by a worker. * @return A reference to the entry in the queue that was pushed. */ -std::list::iterator queue_task( +std::list::iterator queue_task( LocalSchedulerState *state, - std::list *task_queue, - TaskQueueEntry *task_entry, + std::list *task_queue, + TaskExecutionSpec &task_entry, bool from_global_scheduler) { - /* Copy the spec and add it to the task queue. The allocated spec will be - * freed when it is assigned to a worker. */ - task_queue->push_back(*task_entry); - /* Since we just queued the task, we can get a reference to it by going to - * the last element in the queue. */ - auto it = task_queue->end(); - --it; - /* The task has been added to a local scheduler queue. Write the entry in the * task table to notify others that we have queued it. */ if (state->db != NULL) { - Task *task = Task_alloc(task_entry->spec, task_entry->task_spec_size, - TASK_STATUS_QUEUED, get_db_client_id(state->db)); + Task *task = + Task_alloc(task_entry, TASK_STATUS_QUEUED, get_db_client_id(state->db)); if (from_global_scheduler) { /* If the task is from the global scheduler, it's already been added to * the task table, so just update the entry. */ @@ -912,6 +865,15 @@ std::list::iterator queue_task( } } + /* Copy the spec and add it to the task queue. The allocated spec will be + * freed when it is assigned to a worker. */ + TaskExecutionSpec copy = TaskExecutionSpec(&task_entry); + task_queue->push_back(std::move(copy)); + /* Since we just queued the task, we can get a reference to it by going to + * the last element in the queue. */ + auto it = task_queue->end(); + --it; + return it; } @@ -930,13 +892,11 @@ std::list::iterator queue_task( */ void queue_waiting_task(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, bool from_global_scheduler) { LOG_DEBUG("Queueing task in waiting queue"); - TaskQueueEntry task_entry = TaskQueueEntry_init(spec, task_spec_size); - auto it = queue_task(state, algorithm_state->waiting_task_queue, &task_entry, - from_global_scheduler); + auto it = queue_task(state, algorithm_state->waiting_task_queue, + execution_spec, from_global_scheduler); fetch_missing_dependencies(state, algorithm_state, it); } @@ -953,16 +913,15 @@ void queue_waiting_task(LocalSchedulerState *state, */ void queue_dispatch_task(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, bool from_global_scheduler) { LOG_DEBUG("Queueing task in dispatch queue"); - TaskQueueEntry task_entry = TaskQueueEntry_init(spec, task_spec_size); + TaskSpec *spec = execution_spec.Spec(); if (TaskSpec_is_actor_task(spec)) { - queue_actor_task(state, algorithm_state, spec, task_spec_size, + queue_actor_task(state, algorithm_state, execution_spec, from_global_scheduler); } else { - queue_task(state, algorithm_state->dispatch_task_queue, &task_entry, + queue_task(state, algorithm_state->dispatch_task_queue, execution_spec, from_global_scheduler); } } @@ -981,16 +940,15 @@ void queue_dispatch_task(LocalSchedulerState *state, */ void queue_task_locally(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, bool from_global_scheduler) { - if (can_run(algorithm_state, spec)) { + if (can_run(algorithm_state, execution_spec)) { /* Dependencies are ready, so push the task to the dispatch queue. */ - queue_dispatch_task(state, algorithm_state, spec, task_spec_size, + queue_dispatch_task(state, algorithm_state, execution_spec, from_global_scheduler); } else { /* Dependencies are not ready, so push the task to the waiting queue. */ - queue_waiting_task(state, algorithm_state, spec, task_spec_size, + queue_waiting_task(state, algorithm_state, execution_spec, from_global_scheduler); } } @@ -1002,14 +960,15 @@ void give_task_to_local_scheduler_retry(UniqueID id, Task *task = (Task *) user_data; CHECK(Task_state(task) == TASK_STATUS_SCHEDULED); - TaskSpec *spec = Task_task_spec(task); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + TaskSpec *spec = execution_spec->Spec(); CHECK(TaskSpec_is_actor_task(spec)); ActorID actor_id = TaskSpec_actor_id(spec); CHECK(state->actor_mapping.count(actor_id) == 1); give_task_to_local_scheduler( - state, state->algorithm_state, spec, Task_task_spec_size(task), + state, state->algorithm_state, *execution_spec, state->actor_mapping[actor_id].local_scheduler_id); } @@ -1025,8 +984,7 @@ void give_task_to_local_scheduler_retry(UniqueID id, */ void give_task_to_local_scheduler(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, + TaskExecutionSpec &execution_spec, DBClientID local_scheduler_id) { if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) { LOG_WARN("Local scheduler is trying to assign a task to itself."); @@ -1034,8 +992,8 @@ void give_task_to_local_scheduler(LocalSchedulerState *state, CHECK(state->db != NULL); /* Assign the task to the relevant local scheduler. */ DCHECK(state->config.global_scheduler_exists); - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_SCHEDULED, - local_scheduler_id); + Task *task = + Task_alloc(execution_spec, TASK_STATUS_SCHEDULED, local_scheduler_id); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. .timeout = 0, // This value is unused. @@ -1051,11 +1009,11 @@ void give_task_to_global_scheduler_retry(UniqueID id, Task *task = (Task *) user_data; CHECK(Task_state(task) == TASK_STATUS_WAITING); - TaskSpec *spec = Task_task_spec(task); + TaskExecutionSpec *execution_spec = Task_task_execution_spec(task); + TaskSpec *spec = execution_spec->Spec(); CHECK(!TaskSpec_is_actor_task(spec)); - give_task_to_global_scheduler(state, state->algorithm_state, spec, - Task_task_spec_size(task)); + give_task_to_global_scheduler(state, state->algorithm_state, *execution_spec); } /** @@ -1068,16 +1026,15 @@ void give_task_to_global_scheduler_retry(UniqueID id, */ void give_task_to_global_scheduler(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { if (state->db == NULL || !state->config.global_scheduler_exists) { /* A global scheduler is not available, so queue the task locally. */ - queue_task_locally(state, algorithm_state, spec, task_spec_size, false); + queue_task_locally(state, algorithm_state, execution_spec, false); return; } /* Pass on the task to the global scheduler. */ DCHECK(state->config.global_scheduler_exists); - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_WAITING, NIL_ID); + Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING, NIL_ID); DCHECK(state->db != NULL); auto retryInfo = RetryInfo{ .num_retries = 0, // This value is unused. @@ -1103,8 +1060,8 @@ bool resource_constraints_satisfied(LocalSchedulerState *state, void handle_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { + TaskSpec *spec = execution_spec.Spec(); /* TODO(atumanov): if static is satisfied and local objects ready, but dynamic * resource is currently unavailable, then consider queueing task locally and * recheck dynamic next time. */ @@ -1115,11 +1072,11 @@ void handle_task_submitted(LocalSchedulerState *state, * the global scheduler if there is one. */ if (resource_constraints_satisfied(state, spec) && (algorithm_state->available_workers.size() > 0) && - can_run(algorithm_state, spec)) { - queue_dispatch_task(state, algorithm_state, spec, task_spec_size, false); + can_run(algorithm_state, execution_spec)) { + queue_dispatch_task(state, algorithm_state, execution_spec, false); } else { /* Give the task to the global scheduler to schedule, if it exists. */ - give_task_to_global_scheduler(state, algorithm_state, spec, task_spec_size); + give_task_to_global_scheduler(state, algorithm_state, execution_spec); } /* Try to dispatch tasks, since we may have added one to the queue. */ @@ -1128,8 +1085,8 @@ void handle_task_submitted(LocalSchedulerState *state, void handle_actor_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *task_spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { + TaskSpec *task_spec = execution_spec.Spec(); CHECK(TaskSpec_is_actor_task(task_spec)); ActorID actor_id = TaskSpec_actor_id(task_spec); @@ -1139,8 +1096,9 @@ void handle_actor_task_submitted(LocalSchedulerState *state, * will be resubmitted (internally by the local scheduler) whenever a new * actor notification arrives. NOTE(swang): These tasks have not yet been * added to the task table. */ - TaskQueueEntry task_entry = TaskQueueEntry_init(task_spec, task_spec_size); - algorithm_state->cached_submitted_actor_tasks.push_back(task_entry); + TaskExecutionSpec task_entry = TaskExecutionSpec(&execution_spec); + algorithm_state->cached_submitted_actor_tasks.push_back( + std::move(task_entry)); return; } @@ -1148,8 +1106,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state, get_db_client_id(state->db))) { /* This local scheduler is responsible for the actor, so handle the task * locally. */ - queue_task_locally(state, algorithm_state, task_spec, task_spec_size, - false); + queue_task_locally(state, algorithm_state, execution_spec, false); /* Attempt to dispatch tasks to this actor. */ dispatch_actor_task(state, algorithm_state, actor_id); } else { @@ -1157,7 +1114,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state, * scheduler that is responsible for this actor and assign the task directly * to that local scheduler. */ give_task_to_local_scheduler( - state, algorithm_state, task_spec, task_spec_size, + state, algorithm_state, execution_spec, state->actor_mapping[actor_id].local_scheduler_id); } } @@ -1171,11 +1128,10 @@ void handle_actor_creation_notification( algorithm_state->cached_submitted_actor_tasks.size(); for (int i = 0; i < num_cached_actor_tasks; ++i) { - TaskQueueEntry task = algorithm_state->cached_submitted_actor_tasks[i]; + TaskExecutionSpec &task = algorithm_state->cached_submitted_actor_tasks[i]; /* Note that handle_actor_task_submitted may append the spec to the end of * the cached_submitted_actor_tasks array. */ - handle_actor_task_submitted(state, algorithm_state, task.spec, - task.task_spec_size); + handle_actor_task_submitted(state, algorithm_state, task); } /* Remove all the tasks that were resubmitted. This does not erase the tasks * that were newly appended to the cached_submitted_actor_tasks array. */ @@ -1186,22 +1142,21 @@ void handle_actor_creation_notification( void handle_task_scheduled(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { /* This callback handles tasks that were assigned to this local scheduler by * the global scheduler, so we can safely assert that there is a connection to * the database. */ DCHECK(state->db != NULL); DCHECK(state->config.global_scheduler_exists); /* Push the task to the appropriate queue. */ - queue_task_locally(state, algorithm_state, spec, task_spec_size, true); + queue_task_locally(state, algorithm_state, execution_spec, true); dispatch_tasks(state, algorithm_state); } void handle_actor_task_scheduled(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size) { + TaskExecutionSpec &execution_spec) { + TaskSpec *spec = execution_spec.Spec(); /* This callback handles tasks that were assigned to this local scheduler by * the global scheduler or by other workers, so we can safely assert that * there is a connection to the database. */ @@ -1225,7 +1180,7 @@ void handle_actor_task_scheduled(LocalSchedulerState *state, "corresponding actor_map_entry is not present. This should be rare."); } /* Push the task to the appropriate queue. */ - queue_task_locally(state, algorithm_state, spec, task_spec_size, true); + queue_task_locally(state, algorithm_state, execution_spec, true); dispatch_actor_task(state, algorithm_state, actor_id); } @@ -1301,8 +1256,8 @@ void handle_actor_worker_disconnect(LocalSchedulerState *state, } if (worker->task_in_progress != NULL) { - TaskSpec *spec = Task_task_spec(worker->task_in_progress); - finish_killed_task(state, spec, worker->task_in_progress->task_spec_size); + finish_killed_task(state, + *Task_task_execution_spec(worker->task_in_progress)); } state->removed_actors.insert(worker->actor_id); @@ -1311,7 +1266,7 @@ void handle_actor_worker_disconnect(LocalSchedulerState *state, LocalActorInfo &entry = algorithm_state->local_actor_infos.find(worker->actor_id)->second; for (auto &task : *entry.task_queue) { - finish_killed_task(state, task.spec, task.task_spec_size); + finish_killed_task(state, task); } } @@ -1417,11 +1372,11 @@ void handle_object_available(LocalSchedulerState *state, /* Out of the tasks that were dependent on this object, if they are now * ready to run, move them to the dispatch queue. */ for (auto &it : entry.dependent_tasks) { - if (can_run(algorithm_state, it->spec)) { - if (TaskSpec_is_actor_task(it->spec)) { - insert_actor_task_queue(state, algorithm_state, *it); + if (can_run(algorithm_state, *it)) { + if (TaskSpec_is_actor_task(it->Spec())) { + insert_actor_task_queue(state, algorithm_state, std::move(*it)); } else { - algorithm_state->dispatch_task_queue->push_back(*it); + algorithm_state->dispatch_task_queue->push_back(std::move(*it)); } /* Remove the entry with a matching TaskSpec pointer from the waiting * queue, but do not free the task spec. */ @@ -1453,11 +1408,10 @@ void handle_object_removed(LocalSchedulerState *state, * these tasks from the dispatch queue and push them to the waiting queue. */ for (auto it = algorithm_state->dispatch_task_queue->begin(); it != algorithm_state->dispatch_task_queue->end();) { - TaskQueueEntry task = *it; - if (TaskSpec_is_dependent_on(task.spec, removed_object_id)) { + if (it->DependsOn(removed_object_id)) { /* This task was dependent on the removed object. */ LOG_DEBUG("Moved task from dispatch queue back to waiting queue"); - algorithm_state->waiting_task_queue->push_back(task); + algorithm_state->waiting_task_queue->push_back(std::move(*it)); /* Remove the task from the dispatch queue, but do not free the task * spec. */ it = algorithm_state->dispatch_task_queue->erase(it); @@ -1473,10 +1427,10 @@ void handle_object_removed(LocalSchedulerState *state, auto actor_info = algorithm_state->local_actor_infos[*it]; for (auto queue_it = actor_info.task_queue->begin(); queue_it != actor_info.task_queue->end();) { - if (TaskSpec_is_dependent_on(queue_it->spec, removed_object_id)) { + if (queue_it->DependsOn(removed_object_id)) { /* This task was dependent on the removed object. */ LOG_DEBUG("Moved task from actor dispatch queue back to waiting queue"); - algorithm_state->waiting_task_queue->push_back(*queue_it); + algorithm_state->waiting_task_queue->push_back(std::move(*queue_it)); /* Remove the task from the dispatch queue, but do not free the task * spec. */ queue_it = actor_info.task_queue->erase(queue_it); @@ -1496,14 +1450,18 @@ void handle_object_removed(LocalSchedulerState *state, * those that were just moved from the dispatch queue. */ for (auto it = algorithm_state->waiting_task_queue->begin(); it != algorithm_state->waiting_task_queue->end(); ++it) { - int64_t num_args = TaskSpec_num_args(it->spec); - for (int64_t i = 0; i < num_args; ++i) { - int count = TaskSpec_arg_id_count(it->spec, i); + int64_t num_dependencies = it->NumDependencies(); + for (int64_t i = 0; i < num_dependencies; ++i) { + int count = it->DependencyIdCount(i); for (int j = 0; j < count; ++j) { - ObjectID arg_id = TaskSpec_arg_id(it->spec, i, j); - if (ObjectID_equal(arg_id, removed_object_id)) { + ObjectID dependency_id = it->DependencyId(i, j); + if (ObjectID_equal(dependency_id, removed_object_id)) { + /* Do not request a transfer from other plasma managers if this is an + * execution dependency. */ + bool request_transfer = it->IsStaticDependency(i); fetch_missing_dependency(state, algorithm_state, it, - removed_object_id.to_plasma_id(), i); + removed_object_id.to_plasma_id(), + request_transfer); } } } @@ -1524,7 +1482,7 @@ void handle_driver_removed(LocalSchedulerState *state, while (task_it_it != it->second.dependent_tasks.end()) { /* If the dependent task was a task for the removed driver, remove it from * this vector. */ - TaskSpec *spec = (*task_it_it)->spec; + TaskSpec *spec = (*task_it_it)->Spec(); if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { task_it_it = it->second.dependent_tasks.erase(task_it_it); } else { @@ -1543,7 +1501,8 @@ void handle_driver_removed(LocalSchedulerState *state, /* Remove this driver's tasks from the waiting task queue. */ auto it = algorithm_state->waiting_task_queue->begin(); while (it != algorithm_state->waiting_task_queue->end()) { - if (WorkerID_equal(TaskSpec_driver_id(it->spec), driver_id)) { + TaskSpec *spec = it->Spec(); + if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { it = algorithm_state->waiting_task_queue->erase(it); } else { it++; @@ -1553,7 +1512,8 @@ void handle_driver_removed(LocalSchedulerState *state, /* Remove this driver's tasks from the dispatch task queue. */ it = algorithm_state->dispatch_task_queue->begin(); while (it != algorithm_state->dispatch_task_queue->end()) { - if (WorkerID_equal(TaskSpec_driver_id(it->spec), driver_id)) { + TaskSpec *spec = it->Spec(); + if (WorkerID_equal(TaskSpec_driver_id(spec), driver_id)) { it = algorithm_state->dispatch_task_queue->erase(it); } else { it++; diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 67f97f17d..81accd385 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -53,8 +53,7 @@ void provide_scheduler_info(LocalSchedulerState *state, */ void handle_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size); + TaskExecutionSpec &execution_spec); /** * This version of handle_task_submitted is used when the task being submitted @@ -67,8 +66,7 @@ void handle_task_submitted(LocalSchedulerState *state, */ void handle_actor_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size); + TaskExecutionSpec &execution_spec); /** * This function will be called when the local scheduler receives a notification @@ -98,8 +96,7 @@ void handle_actor_creation_notification( */ void handle_task_scheduled(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size); + TaskExecutionSpec &execution_spec); /** * This function will be called when an actor task is assigned by the global @@ -113,8 +110,7 @@ void handle_task_scheduled(LocalSchedulerState *state, */ void handle_actor_task_scheduled(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size); + TaskExecutionSpec &execution_spec); /** * This function is called if a new object becomes available in the local diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index ea34a5bfd..32fa40614 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -88,10 +88,17 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn, } void local_scheduler_submit(LocalSchedulerConnection *conn, - TaskSpec *task, - int64_t task_size) { - write_message(conn->conn, MessageType_SubmitTask, task_size, - (uint8_t *) task); + TaskExecutionSpec &execution_spec) { + flatbuffers::FlatBufferBuilder fbb; + auto execution_dependencies = + to_flatbuf(fbb, execution_spec.ExecutionDependencies()); + auto task_spec = fbb.CreateString((char *) execution_spec.Spec(), + execution_spec.SpecSize()); + auto message = + CreateSubmitTaskRequest(fbb, execution_dependencies, task_spec); + fbb.Finish(message); + write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(), + fbb.GetBufferPointer()); } TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 00ce0812f..58be90cb4 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -48,12 +48,11 @@ void LocalSchedulerConnection_free(LocalSchedulerConnection *conn); * Submit a task to the local scheduler. * * @param conn The connection information. - * @param task The address of the task to submit. + * @param execution_spec The execution spec for the task to submit. * @return Void. */ void local_scheduler_submit(LocalSchedulerConnection *conn, - TaskSpec *task, - int64_t task_size); + TaskExecutionSpec &execution_spec); /** * Notify the local scheduler that this client is disconnecting gracefully. This diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index b2e51a783..ff138609c 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -53,9 +53,12 @@ static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { if (!PyArg_ParseTuple(args, "O", &py_task)) { return NULL; } + PyTask *task = (PyTask *) py_task; + TaskExecutionSpec execution_spec = + TaskExecutionSpec(*task->execution_dependencies, task->spec, task->size); local_scheduler_submit( ((PyLocalSchedulerClient *) self)->local_scheduler_connection, - ((PyTask *) py_task)->spec, ((PyTask *) py_task)->size); + execution_spec); Py_RETURN_NONE; } diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 29b3f4fe4..6e532c3c6 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -175,8 +175,9 @@ TEST object_reconstruction_test(void) { LocalSchedulerConnection *worker = local_scheduler->conns[0]; /* Create a task with zero dependencies and one return value. */ - int64_t task_size; - TaskSpec *spec = example_task_spec(0, 1, &task_size); + TaskExecutionSpec execution_spec = example_task_execution_spec(0, 1); + TaskSpec *spec = execution_spec.Spec(); + int64_t task_size = execution_spec.SpecSize(); ObjectID return_id = TaskSpec_return(spec, 0); /* Add an empty object table entry for the object we want to reconstruct, to @@ -207,7 +208,7 @@ TEST object_reconstruction_test(void) { /* Make sure we receive the task twice. First from the initial submission, * and second from the reconstruct request. */ int64_t task_assigned_size; - local_scheduler_submit(worker, spec, task_size); + local_scheduler_submit(worker, execution_spec); TaskSpec *task_assigned = local_scheduler_get_task(worker, &task_assigned_size, true); ASSERT_EQ(memcmp(task_assigned, spec, task_size), 0); @@ -220,7 +221,6 @@ TEST object_reconstruction_test(void) { /* Clean up. */ free(reconstruct_task); free(task_assigned); - TaskSpec_free(spec); LocalSchedulerMock_free(local_scheduler); exit(0); } else { @@ -232,7 +232,7 @@ TEST object_reconstruction_test(void) { /* Set the task's status to TASK_STATUS_DONE to prevent the race condition * that would suppress object reconstruction. */ Task *task = Task_alloc( - spec, task_size, TASK_STATUS_DONE, + execution_spec, TASK_STATUS_DONE, get_db_client_id(local_scheduler->local_scheduler_state->db)); task_table_add_task(local_scheduler->local_scheduler_state->db, task, NULL, NULL, NULL); @@ -245,7 +245,6 @@ TEST object_reconstruction_test(void) { /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); - TaskSpec_free(spec); ASSERT_EQ(num_waiting_tasks( local_scheduler->local_scheduler_state->algorithm_state), 0); @@ -268,15 +267,14 @@ TEST object_reconstruction_recursive_test(void) { /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ const int NUM_TASKS = 10; - TaskSpec *specs[NUM_TASKS]; - int64_t task_sizes[NUM_TASKS]; - specs[0] = example_task_spec(0, 1, &task_sizes[0]); + std::vector specs; + specs.push_back(example_task_execution_spec(0, 1)); for (int i = 1; i < NUM_TASKS; ++i) { - ObjectID arg_id = TaskSpec_return(specs[i - 1], 0); + ObjectID arg_id = TaskSpec_return(specs[i - 1].Spec(), 0); handle_object_available( local_scheduler->local_scheduler_state, local_scheduler->local_scheduler_state->algorithm_state, arg_id); - specs[i] = example_task_spec_with_args(1, 1, &arg_id, &task_sizes[i]); + specs.push_back(example_task_execution_spec_with_args(1, 1, &arg_id)); } /* Add an empty object table entry for each object we want to reconstruct, to @@ -293,7 +291,7 @@ TEST object_reconstruction_recursive_test(void) { ASSERT(db_shards_addresses.size() == 1); context = redisConnect(db_shards_addresses[0].c_str(), db_shards_ports[0]); for (int i = 0; i < NUM_TASKS; ++i) { - ObjectID return_id = TaskSpec_return(specs[i], 0); + ObjectID return_id = TaskSpec_return(specs[i].Spec(), 0); redisReply *reply = (redisReply *) redisCommand( context, "RAY.OBJECT_TABLE_ADD %b %ld %b %s", return_id.id, sizeof(return_id.id), 1, NIL_DIGEST, (size_t) DIGEST_SIZE, client_id); @@ -309,15 +307,15 @@ TEST object_reconstruction_recursive_test(void) { if (pid == 0) { /* Submit the tasks, and make sure each one gets assigned to a worker. */ for (int i = 0; i < NUM_TASKS; ++i) { - local_scheduler_submit(worker, specs[i], task_sizes[i]); + local_scheduler_submit(worker, specs[i]); } /* Make sure we receive each task from the initial submission. */ for (int i = 0; i < NUM_TASKS; ++i) { int64_t task_size; TaskSpec *task_assigned = local_scheduler_get_task(worker, &task_size, true); - ASSERT_EQ(memcmp(task_assigned, specs[i], task_sizes[i]), 0); - ASSERT_EQ(task_size, task_sizes[i]); + ASSERT_EQ(memcmp(task_assigned, specs[i].Spec(), specs[i].SpecSize()), 0); + ASSERT_EQ(task_size, specs[i].SpecSize()); free(task_assigned); } /* Check that the workers receive all tasks in the final return object's @@ -326,20 +324,15 @@ TEST object_reconstruction_recursive_test(void) { int64_t task_assigned_size; TaskSpec *task_assigned = local_scheduler_get_task(worker, &task_assigned_size, true); - bool found = false; - for (int j = 0; j < NUM_TASKS; ++j) { - if (specs[j] == NULL) { - continue; - } - if (memcmp(task_assigned, specs[j], task_assigned_size) == 0) { - found = true; - TaskSpec_free(specs[j]); - specs[j] = NULL; + for (auto it = specs.begin(); it != specs.end(); it++) { + if (memcmp(task_assigned, it->Spec(), task_assigned_size) == 0) { + specs.erase(it); + break; } } free(task_assigned); - ASSERT(found); } + ASSERT(specs.size() == 0); LocalSchedulerMock_free(local_scheduler); exit(0); } else { @@ -351,13 +344,13 @@ TEST object_reconstruction_recursive_test(void) { /* Set the final task's status to TASK_STATUS_DONE to prevent the race * condition that would suppress object reconstruction. */ Task *last_task = Task_alloc( - specs[NUM_TASKS - 1], task_sizes[NUM_TASKS - 1], TASK_STATUS_DONE, + specs[NUM_TASKS - 1], TASK_STATUS_DONE, get_db_client_id(local_scheduler->local_scheduler_state->db)); task_table_add_task(local_scheduler->local_scheduler_state->db, last_task, NULL, NULL, NULL); /* Trigger reconstruction for the last object, and run the event loop * again. */ - ObjectID return_id = TaskSpec_return(specs[NUM_TASKS - 1], 0); + ObjectID return_id = TaskSpec_return(specs[NUM_TASKS - 1].Spec(), 0); local_scheduler_reconstruct_object(worker, return_id); event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); @@ -371,9 +364,7 @@ TEST object_reconstruction_recursive_test(void) { ASSERT_EQ(num_dispatch_tasks( local_scheduler->local_scheduler_state->algorithm_state), 0); - for (int i = 0; i < NUM_TASKS; ++i) { - TaskSpec_free(specs[i]); - } + specs.clear(); LocalSchedulerMock_free(local_scheduler); PASS(); } @@ -383,8 +374,7 @@ TEST object_reconstruction_recursive_test(void) { * Test that object reconstruction gets suppressed when there is a location * listed for the object in the object table. */ -TaskSpec *object_reconstruction_suppression_spec; -int64_t object_reconstruction_suppression_size; +TaskExecutionSpec *object_reconstruction_suppression_spec; void object_reconstruction_suppression_callback(ObjectID object_id, bool success, @@ -392,18 +382,17 @@ void object_reconstruction_suppression_callback(ObjectID object_id, CHECK(success); /* Submit the task after adding the object to the object table. */ LocalSchedulerConnection *worker = (LocalSchedulerConnection *) user_context; - local_scheduler_submit(worker, object_reconstruction_suppression_spec, - object_reconstruction_suppression_size); + local_scheduler_submit(worker, *object_reconstruction_suppression_spec); } TEST object_reconstruction_suppression_test(void) { LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(0, 1); LocalSchedulerConnection *worker = local_scheduler->conns[0]; - object_reconstruction_suppression_spec = - example_task_spec(0, 1, &object_reconstruction_suppression_size); + TaskExecutionSpec execution_spec = example_task_execution_spec(0, 1); + object_reconstruction_suppression_spec = &execution_spec; ObjectID return_id = - TaskSpec_return(object_reconstruction_suppression_spec, 0); + TaskSpec_return(object_reconstruction_suppression_spec->Spec(), 0); pid_t pid = fork(); if (pid == 0) { /* Make sure we receive the task once. This will block until the @@ -411,15 +400,15 @@ TEST object_reconstruction_suppression_test(void) { int64_t task_assigned_size; TaskSpec *task_assigned = local_scheduler_get_task(worker, &task_assigned_size, true); - ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec, - object_reconstruction_suppression_size), - 0); + ASSERT_EQ( + memcmp(task_assigned, object_reconstruction_suppression_spec->Spec(), + object_reconstruction_suppression_spec->SpecSize()), + 0); /* Trigger a reconstruction. We will check that no tasks get queued as a * result of this line in the event loop process. */ local_scheduler_reconstruct_object(worker, return_id); /* Clean up. */ free(task_assigned); - TaskSpec_free(object_reconstruction_suppression_spec); LocalSchedulerMock_free(local_scheduler); exit(0); } else { @@ -448,7 +437,6 @@ TEST object_reconstruction_suppression_test(void) { ASSERT_EQ(num_dispatch_tasks( local_scheduler->local_scheduler_state->algorithm_state), 0); - TaskSpec_free(object_reconstruction_suppression_spec); db_disconnect(db); LocalSchedulerMock_free(local_scheduler); PASS(); @@ -461,13 +449,13 @@ TEST task_dependency_test(void) { SchedulingAlgorithmState *algorithm_state = state->algorithm_state; /* Get the first worker. */ LocalSchedulerClient *worker = state->workers.front(); - int64_t task_size; - TaskSpec *spec = example_task_spec(1, 1, &task_size); + TaskExecutionSpec execution_spec = example_task_execution_spec(1, 1); + TaskSpec *spec = execution_spec.Spec(); ObjectID oid = TaskSpec_arg_id(spec, 0, 0); /* Check that the task gets queued in the waiting queue if the task is * submitted, but the input and workers are not available. */ - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); /* Once the input is available, the task gets moved to the dispatch queue. */ @@ -483,7 +471,7 @@ TEST task_dependency_test(void) { /* Check that the task gets queued in the waiting queue if the task is * submitted and a worker is available, but the input is not. */ handle_object_removed(state, oid); - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); handle_worker_available(state, algorithm_state, worker); ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); @@ -495,7 +483,7 @@ TEST task_dependency_test(void) { /* Check that the task gets queued in the dispatch queue if the task is * submitted and the input is available, but no worker is available yet. */ - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); /* Once a worker is available, the task gets assigned. */ @@ -507,7 +495,7 @@ TEST task_dependency_test(void) { /* If an object gets removed, check the first scenario again, where the task * gets queued in the waiting task if the task is submitted and a worker is * available, but the input is not. */ - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); /* If the input is removed while a task is in the dispatch queue, the task @@ -525,7 +513,6 @@ TEST task_dependency_test(void) { ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); - TaskSpec_free(spec); LocalSchedulerMock_free(local_scheduler); PASS(); } @@ -536,14 +523,14 @@ TEST task_multi_dependency_test(void) { SchedulingAlgorithmState *algorithm_state = state->algorithm_state; /* Get the first worker. */ LocalSchedulerClient *worker = state->workers.front(); - int64_t task_size; - TaskSpec *spec = example_task_spec(2, 1, &task_size); + TaskExecutionSpec execution_spec = example_task_execution_spec(2, 1); + TaskSpec *spec = execution_spec.Spec(); ObjectID oid1 = TaskSpec_arg_id(spec, 0, 0); ObjectID oid2 = TaskSpec_arg_id(spec, 1, 0); /* Check that the task gets queued in the waiting queue if the task is * submitted, but the inputs and workers are not available. */ - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); /* Check that the task stays in the waiting queue if only one input becomes @@ -563,7 +550,7 @@ TEST task_multi_dependency_test(void) { /* Check that the task gets queued in the dispatch queue if the task is * submitted and the inputs are available, but no worker is available yet. */ - handle_task_submitted(state, algorithm_state, spec, task_size); + handle_task_submitted(state, algorithm_state, execution_spec); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); /* If any input is removed while a task is in the dispatch queue, the task @@ -599,7 +586,6 @@ TEST task_multi_dependency_test(void) { ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); reset_worker(local_scheduler, worker); - TaskSpec_free(spec); LocalSchedulerMock_free(local_scheduler); PASS(); } diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index c774a9541..f5ad1260b 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -1251,7 +1251,7 @@ void log_object_hash_mismatch_error_task_callback(Task *task, void *user_context) { CHECK(task != NULL); PlasmaManagerState *state = (PlasmaManagerState *) user_context; - TaskSpec *spec = Task_task_spec(task); + TaskSpec *spec = Task_task_execution_spec(task)->Spec(); FunctionID function = TaskSpec_function(spec); /* Push the error to the Python driver that caused the nondeterministic task * to be submitted. */