Change TaskSpec to allow multiple object IDs per argument. (#1204)

* Implement object ID bags

* linting

* fix tests

* fix linting

* fix comments
This commit is contained in:
Philipp Moritz
2017-11-10 16:33:34 -08:00
committed by Robert Nishihara
parent 07f0532b9b
commit e798a652bc
11 changed files with 120 additions and 84 deletions
+3 -2
View File
@@ -260,8 +260,9 @@ class GlobalState(object):
args = []
for i in range(task_spec_message.ArgsLength()):
arg = task_spec_message.Args(i)
if len(arg.ObjectId()) != 0:
args.append(binary_to_object_id(arg.ObjectId()))
if arg.ObjectIdsLength() != 0:
for j in range(arg.ObjectIdsLength()):
args.append(binary_to_object_id(arg.ObjectIds(j)))
else:
args.append(pickle.loads(arg.Data()))
# TODO(atumanov): Instead of hard coding these indices, we should use
+5 -2
View File
@@ -17,8 +17,11 @@ enum ResourceIndex:int {
}
table Arg {
// Object ID for pass-by-reference arguments.
object_id: string;
// Object IDs for pass-by-reference arguments. Normally there is only one
// object ID in this list, which represents the object that is being passed.
// However, to support reducers in a MapReduce workload, we also support
// passing multiple object IDs for each argument.
object_ids: [string];
// Data for pass-by-value arguments.
data: string;
}
+6 -4
View File
@@ -315,7 +315,8 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) {
for (Py_ssize_t i = 0; i < size; ++i) {
PyObject *arg = PyList_GetItem(arguments, i);
if (PyObject_IsInstance(arg, (PyObject *) &PyObjectIDType)) {
TaskSpec_args_add_ref(g_task_builder, ((PyObjectID *) arg)->object_id);
TaskSpec_args_add_ref(g_task_builder, &((PyObjectID *) arg)->object_id,
1);
} else {
/* We do this check because we cast a signed int to an unsigned int. */
PyObject *data = PyObject_CallMethodObjArgs(pickle_module, pickle_dumps,
@@ -391,9 +392,10 @@ static PyObject *PyTask_arguments(PyObject *self) {
int64_t num_args = TaskSpec_num_args(task);
PyObject *arg_list = PyList_New((Py_ssize_t) num_args);
for (int i = 0; i < num_args; ++i) {
if (TaskSpec_arg_by_ref(task, i)) {
ObjectID object_id = TaskSpec_arg_id(task, i);
PyList_SetItem(arg_list, i, PyObjectID_make(object_id));
int count = TaskSpec_arg_id_count(task, i);
if (count > 0) {
assert(count == 1);
PyList_SetItem(arg_list, i, PyObjectID_make(TaskSpec_arg_id(task, i, 0)));
} else {
CHECK(pickle_module != NULL);
CHECK(pickle_loads != NULL);
+25 -12
View File
@@ -65,15 +65,17 @@ class TaskBuilder {
sha256_update(&ctx, (BYTE *) &function_id, sizeof(function_id));
}
void NextReferenceArgument(ObjectID object_id) {
args.push_back(CreateArg(fbb, to_flatbuf(fbb, object_id)));
sha256_update(&ctx, (BYTE *) &object_id, sizeof(object_id));
void NextReferenceArgument(ObjectID object_ids[], int num_object_ids) {
args.push_back(
CreateArg(fbb, to_flatbuf(fbb, &object_ids[0], num_object_ids)));
sha256_update(&ctx, (BYTE *) &object_ids[0],
sizeof(object_ids[0]) * num_object_ids);
}
void NextValueArgument(uint8_t *value, int64_t length) {
auto arg = fbb.CreateString((const char *) value, length);
auto empty_id = fbb.CreateString("", 0);
args.push_back(CreateArg(fbb, empty_id, arg));
auto empty_ids = fbb.CreateVectorOfStrings({});
args.push_back(CreateArg(fbb, empty_ids, arg));
sha256_update(&ctx, (BYTE *) value, length);
}
@@ -190,8 +192,10 @@ uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) {
return builder->Finish(size);
}
void TaskSpec_args_add_ref(TaskBuilder *builder, ObjectID object_id) {
builder->NextReferenceArgument(object_id);
void TaskSpec_args_add_ref(TaskBuilder *builder,
ObjectID object_ids[],
int num_object_ids) {
builder->NextReferenceArgument(&object_ids[0], num_object_ids);
}
void TaskSpec_args_add_val(TaskBuilder *builder,
@@ -285,10 +289,18 @@ int64_t TaskSpec_num_args(TaskSpec *spec) {
return message->args()->size();
}
ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index) {
int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(message->args()->Get(arg_index)->object_id());
auto ids = message->args()->Get(arg_index)->object_ids();
return ids->size();
}
ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return from_flatbuf(
message->args()->Get(arg_index)->object_ids()->Get(id_index));
}
const uint8_t *TaskSpec_arg_val(TaskSpec *spec, int64_t arg_index) {
@@ -312,7 +324,7 @@ int64_t TaskSpec_num_returns(TaskSpec *spec) {
bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return message->args()->Get(arg_index)->object_id()->size() != 0;
return message->args()->Get(arg_index)->object_ids()->size() != 0;
}
ObjectID TaskSpec_return(TaskSpec *spec, int64_t return_index) {
@@ -331,8 +343,9 @@ double TaskSpec_get_required_resource(const TaskSpec *spec,
bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
int64_t num_args = TaskSpec_num_args(spec);
for (int i = 0; i < num_args; ++i) {
if (TaskSpec_arg_by_ref(spec, i)) {
ObjectID arg_id = TaskSpec_arg_id(spec, i);
int count = TaskSpec_arg_id_count(spec, i);
for (int j = 0; j < count; j++) {
ObjectID arg_id = TaskSpec_arg_id(spec, i, j);
if (ObjectID_equal(arg_id, object_id)) {
return true;
}
+16 -3
View File
@@ -238,15 +238,25 @@ int64_t TaskSpec_num_returns(TaskSpec *spec);
*/
bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index);
/**
* Get the number of object IDs in a given argument.
*
* @param spec The task_spec in question.
* @param arg_index The index of the argument in question.
* @return The number of object IDs in this argument.
*/
int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index);
/**
* Get a particular object ID from an argument to this task. This assumes the
* argument is passed by reference, i.e. that it holds object IDs.
*
* @param spec The task_spec in question.
* @param arg_index The index of the argument in question.
* @param id_index The index of the object ID in this arg.
* @return The object ID at position id_index within that argument.
*/
ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index);
ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index);
/**
* Get a particular argument to this task. This assumes the argument is a value.
@@ -272,11 +282,14 @@ int64_t TaskSpec_arg_length(TaskSpec *spec, int64_t arg_index);
* arguments in their order of appearance.
*
* @param spec The task_spec in question.
* @param The object ID to set the argument to.
* @param object_ids The object IDs to set the argument to.
* @param num_object_ids number of IDs in this param, usually 1.
* @return Void.
*/
void TaskSpec_args_add_ref(TaskBuilder *spec, ObjectID obj_id);
void TaskSpec_args_add_ref(TaskBuilder *spec,
ObjectID object_ids[],
int num_object_ids);
/**
* Set the next task argument. Note that this API only allows you to set the
+1 -1
View File
@@ -23,7 +23,7 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args,
} else {
arg_id = arg_ids[i];
}
TaskSpec_args_add_ref(g_task_builder, arg_id);
TaskSpec_args_add_ref(g_task_builder, &arg_id, 1);
}
return TaskSpec_finish_construct(g_task_builder, task_spec_size);
}
+16 -13
View File
@@ -19,10 +19,10 @@ TEST task_test(void) {
NIL_ACTOR_ID, 0, false, func_id, 2);
UniqueID arg1 = globally_unique_id();
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "hello", 5);
UniqueID arg2 = globally_unique_id();
TaskSpec_args_add_ref(builder, arg2);
TaskSpec_args_add_ref(builder, &arg2, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "world", 5);
/* Finish constructing the spec. This constructs the task ID and the
* return IDs. */
@@ -33,10 +33,10 @@ TEST task_test(void) {
ASSERT(TaskSpec_num_args(spec) == 4);
ASSERT(TaskSpec_num_returns(spec) == 2);
ASSERT(FunctionID_equal(TaskSpec_function(spec), func_id));
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0), arg1));
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 0, 0), arg1));
ASSERT(memcmp(TaskSpec_arg_val(spec, 1), (uint8_t *) "hello",
TaskSpec_arg_length(spec, 1)) == 0);
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2), arg2));
ASSERT(ObjectID_equal(TaskSpec_arg_id(spec, 2, 0), arg2));
ASSERT(memcmp(TaskSpec_arg_val(spec, 3), (uint8_t *) "world",
TaskSpec_arg_length(spec, 3)) == 0);
@@ -56,7 +56,7 @@ TEST deterministic_ids_test(void) {
/* Construct a first task. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size1;
TaskSpec *spec1 = TaskSpec_finish_construct(builder, &size1);
@@ -64,7 +64,7 @@ TEST deterministic_ids_test(void) {
/* Construct a second identical task. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size2;
TaskSpec *spec2 = TaskSpec_finish_construct(builder, &size2);
@@ -84,7 +84,7 @@ TEST deterministic_ids_test(void) {
/* Construct a task with a different parent task ID. */
TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0,
NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size3;
TaskSpec *spec3 = TaskSpec_finish_construct(builder, &size3);
@@ -92,7 +92,7 @@ TEST deterministic_ids_test(void) {
/* Construct a task with a different parent counter. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size4;
TaskSpec *spec4 = TaskSpec_finish_construct(builder, &size4);
@@ -100,7 +100,7 @@ TEST deterministic_ids_test(void) {
/* Construct a task with a different function ID. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, globally_unique_id(), 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size5;
TaskSpec *spec5 = TaskSpec_finish_construct(builder, &size5);
@@ -108,7 +108,8 @@ TEST deterministic_ids_test(void) {
/* Construct a task with a different object ID argument. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, globally_unique_id());
ObjectID object_id = globally_unique_id();
TaskSpec_args_add_ref(builder, &object_id, 1);
TaskSpec_args_add_val(builder, arg2, 11);
int64_t size6;
TaskSpec *spec6 = TaskSpec_finish_construct(builder, &size6);
@@ -116,7 +117,7 @@ TEST deterministic_ids_test(void) {
/* Construct a task with a different value argument. */
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 3);
TaskSpec_args_add_ref(builder, arg1);
TaskSpec_args_add_ref(builder, &arg1, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11);
int64_t size7;
TaskSpec *spec7 = TaskSpec_finish_construct(builder, &size7);
@@ -161,10 +162,12 @@ TEST send_task(void) {
FunctionID func_id = globally_unique_id();
TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID,
NIL_ACTOR_ID, 0, false, func_id, 2);
TaskSpec_args_add_ref(builder, globally_unique_id());
ObjectID object_id = globally_unique_id();
TaskSpec_args_add_ref(builder, &object_id, 1);
TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5);
TaskSpec_args_add_val(builder, (uint8_t *) "World", 5);
TaskSpec_args_add_ref(builder, globally_unique_id());
object_id = globally_unique_id();
TaskSpec_args_add_ref(builder, &object_id, 1);
int64_t size;
TaskSpec *spec = TaskSpec_finish_construct(builder, &size);
int fd[2];
@@ -94,43 +94,40 @@ int64_t locally_available_data_size(const GlobalSchedulerState *state,
/* TODO(rkn): Note that if the same object ID appears as multiple arguments,
* then it will be overcounted. */
for (int64_t i = 0; i < TaskSpec_num_args(task_spec); ++i) {
if (!TaskSpec_arg_by_ref(task_spec, i)) {
/* Ignore arguments that are not object IDs since these are serialized as
* part of the task spec and so they don't require any data transfer. */
continue;
int count = TaskSpec_arg_id_count(task_spec, i);
for (int j = 0; j < count; ++j) {
ObjectID object_id = TaskSpec_arg_id(task_spec, i, j);
if (state->scheduler_object_info_table.count(object_id) == 0) {
/* If this global scheduler is not aware of this object ID, then ignore
* it. */
continue;
}
const SchedulerObjectInfo &object_size_info =
state->scheduler_object_info_table.at(object_id);
if (std::find(object_size_info.object_locations.begin(),
object_size_info.object_locations.end(), plasma_manager) ==
object_size_info.object_locations.end()) {
/* This local scheduler does not have access to this object, so don't
* count this object. */
continue;
}
/* Look at the size of the object. */
int64_t object_size = object_size_info.data_size;
if (object_size == -1) {
/* This means that this global scheduler does not know the object size
* yet, so assume that the object is one megabyte. TODO(rkn): Maybe we
* should instead use the average object size. */
object_size = 1000000;
}
/* If we get here, then this local scheduler has access to this object, so
* count the contribution of this object. */
task_data_size += object_size;
}
ObjectID object_id = TaskSpec_arg_id(task_spec, i);
if (state->scheduler_object_info_table.count(object_id) == 0) {
/* If this global scheduler is not aware of this object ID, then ignore
* it.*/
continue;
}
const SchedulerObjectInfo &object_size_info =
state->scheduler_object_info_table.at(object_id);
if (std::find(object_size_info.object_locations.begin(),
object_size_info.object_locations.end(),
plasma_manager) == object_size_info.object_locations.end()) {
/* This local scheduler does not have access to this object, so don't
* count this object. */
continue;
}
/* Look at the size of the object. */
int64_t object_size = object_size_info.data_size;
if (object_size == -1) {
/* This means that this global scheduler does not know the object size
* yet, so assume that the object is one megabyte. TODO(rkn): Maybe we
* should instead use the average object size. */
object_size = 1000000;
}
/* If we get here, then this local scheduler has access to this object, so
* count the contribution of this object. */
task_data_size += object_size;
}
return task_data_size;
+3 -2
View File
@@ -675,8 +675,9 @@ void reconstruct_task_update_callback(Task *task,
/* Recursively reconstruct the task's inputs, if necessary. */
for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) {
if (TaskSpec_arg_by_ref(spec, i)) {
ObjectID arg_id = TaskSpec_arg_id(spec, i);
int count = TaskSpec_arg_id_count(spec, i);
for (int64_t j = 0; j < count; ++j) {
ObjectID arg_id = TaskSpec_arg_id(spec, i, j);
reconstruct_object(state, arg_id);
}
}
@@ -583,8 +583,9 @@ void fetch_missing_dependencies(
int64_t num_args = TaskSpec_num_args(task);
int num_missing_dependencies = 0;
for (int64_t i = 0; i < num_args; ++i) {
if (TaskSpec_arg_by_ref(task, i)) {
ObjectID obj_id = TaskSpec_arg_id(task, i);
int count = TaskSpec_arg_id_count(task, i);
for (int j = 0; j < count; ++j) {
ObjectID obj_id = TaskSpec_arg_id(task, i, j);
if (algorithm_state->local_objects.count(obj_id) == 0) {
/* If the entry is not yet available locally, record the dependency. */
fetch_missing_dependency(state, algorithm_state, task_entry_it,
@@ -609,8 +610,9 @@ void fetch_missing_dependencies(
bool can_run(SchedulingAlgorithmState *algorithm_state, TaskSpec *task) {
int64_t num_args = TaskSpec_num_args(task);
for (int i = 0; i < num_args; ++i) {
if (TaskSpec_arg_by_ref(task, i)) {
ObjectID obj_id = TaskSpec_arg_id(task, i);
int count = TaskSpec_arg_id_count(task, i);
for (int j = 0; j < count; ++j) {
ObjectID obj_id = TaskSpec_arg_id(task, i, j);
if (algorithm_state->local_objects.count(obj_id) == 0) {
/* The object is not present locally, so this task cannot be scheduled
* right now. */
@@ -1443,8 +1445,9 @@ void handle_object_removed(LocalSchedulerState *state,
it != algorithm_state->waiting_task_queue->end(); ++it) {
int64_t num_args = TaskSpec_num_args(it->spec);
for (int64_t i = 0; i < num_args; ++i) {
if (TaskSpec_arg_by_ref(it->spec, i)) {
ObjectID arg_id = TaskSpec_arg_id(it->spec, i);
int count = TaskSpec_arg_id_count(it->spec, i);
for (int j = 0; j < count; ++j) {
ObjectID arg_id = TaskSpec_arg_id(it->spec, i, j);
if (ObjectID_equal(arg_id, removed_object_id)) {
fetch_missing_dependency(state, algorithm_state, it,
removed_object_id.to_plasma_id(), i);
@@ -463,7 +463,7 @@ TEST task_dependency_test(void) {
LocalSchedulerClient *worker = state->workers.front();
int64_t task_size;
TaskSpec *spec = example_task_spec(1, 1, &task_size);
ObjectID oid = TaskSpec_arg_id(spec, 0);
ObjectID oid = TaskSpec_arg_id(spec, 0, 0);
/* Check that the task gets queued in the waiting queue if the task is
* submitted, but the input and workers are not available. */
@@ -538,8 +538,8 @@ TEST task_multi_dependency_test(void) {
LocalSchedulerClient *worker = state->workers.front();
int64_t task_size;
TaskSpec *spec = example_task_spec(2, 1, &task_size);
ObjectID oid1 = TaskSpec_arg_id(spec, 0);
ObjectID oid2 = TaskSpec_arg_id(spec, 1);
ObjectID oid1 = TaskSpec_arg_id(spec, 0, 0);
ObjectID oid2 = TaskSpec_arg_id(spec, 1, 0);
/* Check that the task gets queued in the waiting queue if the task is
* submitted, but the inputs and workers are not available. */