diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 108ee0a68..d1c6d5234 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -260,8 +260,9 @@ class GlobalState(object): args = [] for i in range(task_spec_message.ArgsLength()): arg = task_spec_message.Args(i) - if len(arg.ObjectId()) != 0: - args.append(binary_to_object_id(arg.ObjectId())) + if arg.ObjectIdsLength() != 0: + for j in range(arg.ObjectIdsLength()): + args.append(binary_to_object_id(arg.ObjectIds(j))) else: args.append(pickle.loads(arg.Data())) # TODO(atumanov): Instead of hard coding these indices, we should use diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 58ed7c356..389d83e63 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -17,8 +17,11 @@ enum ResourceIndex:int { } table Arg { - // Object ID for pass-by-reference arguments. - object_id: string; + // Object ID for pass-by-reference arguments. Normally there is only one + // object ID in this list which represents the object that is being passed. + // However to support reducers in a MapReduce workload, we also support + // passing multiple object IDs for each argument. + object_ids: [string]; // Data for pass-by-value arguments. data: string; } diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 327361ba2..a592109ab 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -315,7 +315,8 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); if (PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) { - TaskSpec_args_add_ref(g_task_builder, ((PyObjectID *) arg)->object_id); + TaskSpec_args_add_ref(g_task_builder, &((PyObjectID *) arg)->object_id, + 1); } else { /* We do this check because we cast a signed int to an unsigned int. */ PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps, @@ -391,9 +392,10 @@ static PyObject *PyTask_arguments(PyObject *self) { int64_t num_args = TaskSpec_num_args(task); PyObject *arg_list = PyList_New((Py_ssize_t) num_args); for (int i = 0; i < num_args; ++i) { - if (TaskSpec_arg_by_ref(task, i)) { - ObjectID object_id = TaskSpec_arg_id(task, i); - PyList_SetItem(arg_list, i, PyObjectID_make(object_id)); + int count = TaskSpec_arg_id_count(task, i); + if (count > 0) { + assert(count == 1); + PyList_SetItem(arg_list, i, PyObjectID_make(TaskSpec_arg_id(task, i, 0))); } else { CHECK(pickle_module != NULL); CHECK(pickle_loads != NULL); diff --git a/src/common/task.cc b/src/common/task.cc index 044875fba..baa4c139f 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -65,15 +65,17 @@ class TaskBuilder { sha256_update(&ctx, (BYTE *) &function_id, sizeof(function_id)); } - void NextReferenceArgument(ObjectID object_id) { - args.push_back(CreateArg(fbb, to_flatbuf(fbb, object_id))); - sha256_update(&ctx, (BYTE *) &object_id, sizeof(object_id)); + void NextReferenceArgument(ObjectID object_ids[], int num_object_ids) { + args.push_back( + CreateArg(fbb, to_flatbuf(fbb, &object_ids[0], num_object_ids))); + sha256_update(&ctx, (BYTE *) &object_ids[0], + sizeof(object_ids[0]) * num_object_ids); } void NextValueArgument(uint8_t *value, int64_t length) { auto arg = fbb.CreateString((const char *) value, length); - auto empty_id = fbb.CreateString("", 0); - args.push_back(CreateArg(fbb, empty_id, arg)); + auto empty_ids = fbb.CreateVectorOfStrings({}); + args.push_back(CreateArg(fbb, empty_ids, arg)); sha256_update(&ctx, (BYTE *) value, length); } @@ -190,8 +192,10 @@ uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { return builder->Finish(size); } -void TaskSpec_args_add_ref(TaskBuilder *builder, ObjectID object_id) { - builder->NextReferenceArgument(object_id); +void TaskSpec_args_add_ref(TaskBuilder *builder, + ObjectID object_ids[], + int num_object_ids) { + builder->NextReferenceArgument(&object_ids[0], num_object_ids); } void TaskSpec_args_add_val(TaskBuilder *builder, @@ -285,10 +289,18 @@ int64_t TaskSpec_num_args(TaskSpec *spec) { return message->args()->size(); } -ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index) { +int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return from_flatbuf(message->args()->Get(arg_index)->object_id()); + auto ids = message->args()->Get(arg_index)->object_ids(); + return ids->size(); +} + +ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index) { + CHECK(spec); + auto message = flatbuffers::GetRoot(spec); + return from_flatbuf( + message->args()->Get(arg_index)->object_ids()->Get(id_index)); } const uint8_t *TaskSpec_arg_val(TaskSpec *spec, int64_t arg_index) { @@ -312,7 +324,7 @@ int64_t TaskSpec_num_returns(TaskSpec *spec) { bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return message->args()->Get(arg_index)->object_id()->size() != 0; + return message->args()->Get(arg_index)->object_ids()->size() != 0; } ObjectID TaskSpec_return(TaskSpec *spec, int64_t return_index) { @@ -331,8 +343,9 @@ double TaskSpec_get_required_resource(const TaskSpec *spec, bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) { int64_t num_args = TaskSpec_num_args(spec); for (int i = 0; i < num_args; ++i) { - if (TaskSpec_arg_by_ref(spec, i)) { - ObjectID arg_id = TaskSpec_arg_id(spec, i); + int count = TaskSpec_arg_id_count(spec, i); + for (int j = 0; j < count; j++) { + ObjectID arg_id = TaskSpec_arg_id(spec, i, j); if (ObjectID_equal(arg_id, object_id)) { return true; } diff --git a/src/common/task.h b/src/common/task.h index 588f5c92e..c466536eb 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -238,15 +238,25 @@ int64_t TaskSpec_num_returns(TaskSpec *spec); */ bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index); +/** + * Get number of object IDs in a given argument + * + * @param spec The task_spec in question. + * @param arg_index The index of the argument in question. + * @return number of object IDs in this argument + */ +int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index); + /** * Get a particular argument to this task. This assumes the argument is an * object ID. * * @param spec The task_spec in question. * @param arg_index The index of the argument in question. + * @param id_index The index of the object ID in this arg. * @return The argument at that index. */ -ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index); +ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index); /** * Get a particular argument to this task. This assumes the argument is a value. @@ -272,11 +282,14 @@ int64_t TaskSpec_arg_length(TaskSpec *spec, int64_t arg_index); * arguments in their order of appearance. * * @param spec The task_spec in question. - * @param The object ID to set the argument to. + * @param object_ids The object IDs to set the argument to. + * @param num_object_ids number of IDs in this param, usually 1. * @return The number of task arguments that have been set before this one. This * is only used for testing. */ -void TaskSpec_args_add_ref(TaskBuilder *spec, ObjectID obj_id); +void TaskSpec_args_add_ref(TaskBuilder *spec, + ObjectID object_ids[], + int num_object_ids); /** * Set the next task argument. Note that this API only allows you to set the diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index fce0697b2..ba26da7f5 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -23,7 +23,7 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args, } else { arg_id = arg_ids[i]; } - TaskSpec_args_add_ref(g_task_builder, arg_id); + TaskSpec_args_add_ref(g_task_builder, &arg_id, 1); } return TaskSpec_finish_construct(g_task_builder, task_spec_size); } diff --git a/src/common/test/task_tests.cc b/src/common/test/task_tests.cc index f22bf28d0..303ec6392 100644 --- a/src/common/test/task_tests.cc +++ b/src/common/test/task_tests.cc @@ -19,10 +19,10 @@ TEST task_test(void) { NIL_ACTOR_ID, 0, false, func_id, 2); UniqueID arg1 = globally_unique_id(); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, (uint8_t *) "hello", 5); UniqueID arg2 = globally_unique_id(); - TaskSpec_args_add_ref(builder, arg2); + TaskSpec_args_add_ref(builder, &arg2, 1); TaskSpec_args_add_val(builder, (uint8_t *) "world", 5); /* Finish constructing the spec. This constructs the task ID and the * return IDs. */ @@ -33,10 +33,10 @@ TEST task_test(void) { ASSERT(TaskSpec_num_args(spec) == 4); ASSERT(TaskSpec_num_returns(spec) == 2); ASSERT(FunctionID_equal(TaskSpec_function(spec), func_id)); - ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0), arg1)); + ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0, 0), arg1)); ASSERT(memcmp(TaskSpec_arg_val(spec, 1), (uint8_t *) "hello", TaskSpec_arg_length(spec, 1)) == 0); - ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2), arg2)); + ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2, 0), arg2)); ASSERT(memcmp(TaskSpec_arg_val(spec, 3), (uint8_t *) "world", TaskSpec_arg_length(spec, 3)) == 0); @@ -56,7 +56,7 @@ TEST deterministic_ids_test(void) { /* Construct a first task. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size1; TaskSpec *spec1 = TaskSpec_finish_construct(builder, &size1); @@ -64,7 +64,7 @@ TEST deterministic_ids_test(void) { /* Construct a second identical task. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size2; TaskSpec *spec2 = TaskSpec_finish_construct(builder, &size2); @@ -84,7 +84,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent task ID. */ TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size3; TaskSpec *spec3 = TaskSpec_finish_construct(builder, &size3); @@ -92,7 +92,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent counter. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size4; TaskSpec *spec4 = TaskSpec_finish_construct(builder, &size4); @@ -100,7 +100,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different function ID. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, globally_unique_id(), 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size5; TaskSpec *spec5 = TaskSpec_finish_construct(builder, &size5); @@ -108,7 +108,8 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different object ID argument. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, globally_unique_id()); + ObjectID object_id = globally_unique_id(); + TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size6; TaskSpec *spec6 = TaskSpec_finish_construct(builder, &size6); @@ -116,7 +117,7 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different value argument. */ TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); - TaskSpec_args_add_ref(builder, arg1); + TaskSpec_args_add_ref(builder, &arg1, 1); TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11); int64_t size7; TaskSpec *spec7 = TaskSpec_finish_construct(builder, &size7); @@ -161,10 +162,12 @@ TEST send_task(void) { FunctionID func_id = globally_unique_id(); TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 2); - TaskSpec_args_add_ref(builder, globally_unique_id()); + ObjectID object_id = globally_unique_id(); + TaskSpec_args_add_ref(builder, &object_id, 1); TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5); TaskSpec_args_add_val(builder, (uint8_t *) "World", 5); - TaskSpec_args_add_ref(builder, globally_unique_id()); + object_id = globally_unique_id(); + TaskSpec_args_add_ref(builder, &object_id, 1); int64_t size; TaskSpec *spec = TaskSpec_finish_construct(builder, &size); int fd[2]; diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc index 79959d714..cb6d3caa3 100644 --- a/src/global_scheduler/global_scheduler_algorithm.cc +++ b/src/global_scheduler/global_scheduler_algorithm.cc @@ -94,43 +94,40 @@ int64_t locally_available_data_size(const GlobalSchedulerState *state, /* TODO(rkn): Note that if the same object ID appears as multiple arguments, * then it will be overcounted. */ for (int64_t i = 0; i < TaskSpec_num_args(task_spec); ++i) { - if (!TaskSpec_arg_by_ref(task_spec, i)) { - /* Ignore arguments that are not object IDs since these are serialized as - * part of the task spec and so they don't require any data transfer. */ - continue; + int count = TaskSpec_arg_id_count(task_spec, i); + for (int j = 0; j < count; ++j) { + ObjectID object_id = TaskSpec_arg_id(task_spec, i, j); + + if (state->scheduler_object_info_table.count(object_id) == 0) { + /* If this global scheduler is not aware of this object ID, then ignore + * it. */ + continue; + } + + const SchedulerObjectInfo &object_size_info = + state->scheduler_object_info_table.at(object_id); + + if (std::find(object_size_info.object_locations.begin(), + object_size_info.object_locations.end(), plasma_manager) == + object_size_info.object_locations.end()) { + /* This local scheduler does not have access to this object, so don't + * count this object. */ + continue; + } + + /* Look at the size of the object. */ + int64_t object_size = object_size_info.data_size; + if (object_size == -1) { + /* This means that this global scheduler does not know the object size + * yet, so assume that the object is one megabyte. TODO(rkn): Maybe we + * should instead use the average object size. */ + object_size = 1000000; + } + + /* If we get here, then this local scheduler has access to this object, so + * count the contribution of this object. */ + task_data_size += object_size; } - - ObjectID object_id = TaskSpec_arg_id(task_spec, i); - - if (state->scheduler_object_info_table.count(object_id) == 0) { - /* If this global scheduler is not aware of this object ID, then ignore - * it.*/ - continue; - } - - const SchedulerObjectInfo &object_size_info = - state->scheduler_object_info_table.at(object_id); - - if (std::find(object_size_info.object_locations.begin(), - object_size_info.object_locations.end(), - plasma_manager) == object_size_info.object_locations.end()) { - /* This local scheduler does not have access to this object, so don't - * count this object. */ - continue; - } - - /* Look at the size of the object. */ - int64_t object_size = object_size_info.data_size; - if (object_size == -1) { - /* This means that this global scheduler does not know the object size - * yet, so assume that the object is one megabyte. TODO(rkn): Maybe we - * should instead use the average object size. */ - object_size = 1000000; - } - - /* If we get here, then this local scheduler has access to this object, so - * count the contribution of this object. */ - task_data_size += object_size; } return task_data_size; diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index a29c7a3a6..3da8da1fc 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -675,8 +675,9 @@ void reconstruct_task_update_callback(Task *task, /* Recursively reconstruct the task's inputs, if necessary. */ for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { - if (TaskSpec_arg_by_ref(spec, i)) { - ObjectID arg_id = TaskSpec_arg_id(spec, i); + int count = TaskSpec_arg_id_count(spec, i); + for (int64_t j = 0; j < count; ++j) { + ObjectID arg_id = TaskSpec_arg_id(spec, i, j); reconstruct_object(state, arg_id); } } diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index c9784932a..e1dded718 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -583,8 +583,9 @@ void fetch_missing_dependencies( int64_t num_args = TaskSpec_num_args(task); int num_missing_dependencies = 0; for (int64_t i = 0; i < num_args; ++i) { - if (TaskSpec_arg_by_ref(task, i)) { - ObjectID obj_id = TaskSpec_arg_id(task, i); + int count = TaskSpec_arg_id_count(task, i); + for (int j = 0; j < count; ++j) { + ObjectID obj_id = TaskSpec_arg_id(task, i, j); if (algorithm_state->local_objects.count(obj_id) == 0) { /* If the entry is not yet available locally, record the dependency. */ fetch_missing_dependency(state, algorithm_state, task_entry_it, @@ -609,8 +610,9 @@ void fetch_missing_dependencies( bool can_run(SchedulingAlgorithmState *algorithm_state, TaskSpec *task) { int64_t num_args = TaskSpec_num_args(task); for (int i = 0; i < num_args; ++i) { - if (TaskSpec_arg_by_ref(task, i)) { - ObjectID obj_id = TaskSpec_arg_id(task, i); + int count = TaskSpec_arg_id_count(task, i); + for (int j = 0; j < count; ++j) { + ObjectID obj_id = TaskSpec_arg_id(task, i, j); if (algorithm_state->local_objects.count(obj_id) == 0) { /* The object is not present locally, so this task cannot be scheduled * right now. */ @@ -1443,8 +1445,9 @@ void handle_object_removed(LocalSchedulerState *state, it != algorithm_state->waiting_task_queue->end(); ++it) { int64_t num_args = TaskSpec_num_args(it->spec); for (int64_t i = 0; i < num_args; ++i) { - if (TaskSpec_arg_by_ref(it->spec, i)) { - ObjectID arg_id = TaskSpec_arg_id(it->spec, i); + int count = TaskSpec_arg_id_count(it->spec, i); + for (int j = 0; j < count; ++j) { + ObjectID arg_id = TaskSpec_arg_id(it->spec, i, j); if (ObjectID_equal(arg_id, removed_object_id)) { fetch_missing_dependency(state, algorithm_state, it, removed_object_id.to_plasma_id(), i); diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 3b17b8646..23d2dd4c4 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -463,7 +463,7 @@ TEST task_dependency_test(void) { LocalSchedulerClient *worker = state->workers.front(); int64_t task_size; TaskSpec *spec = example_task_spec(1, 1, &task_size); - ObjectID oid = TaskSpec_arg_id(spec, 0); + ObjectID oid = TaskSpec_arg_id(spec, 0, 0); /* Check that the task gets queued in the waiting queue if the task is * submitted, but the input and workers are not available. */ @@ -538,8 +538,8 @@ TEST task_multi_dependency_test(void) { LocalSchedulerClient *worker = state->workers.front(); int64_t task_size; TaskSpec *spec = example_task_spec(2, 1, &task_size); - ObjectID oid1 = TaskSpec_arg_id(spec, 0); - ObjectID oid2 = TaskSpec_arg_id(spec, 1); + ObjectID oid1 = TaskSpec_arg_id(spec, 0, 0); + ObjectID oid2 = TaskSpec_arg_id(spec, 1, 0); /* Check that the task gets queued in the waiting queue if the task is * submitted, but the inputs and workers are not available. */