From f1987cdc16cd196ce112368f1273406bbde1aedb Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 18 Jan 2017 20:27:40 -0800 Subject: [PATCH] Split local scheduler task queue (#211) * Split local scheduler task queue into waiting and dispatch queue * Fix memory leak * Add a new task scheduling status for when a task has been queued locally * Fix global scheduler test case and add task status doc * Documentation * Address Philipp's comments * Move tasks back to the waiting queue if their dependencies become unavailable * Update existing task table entries instead of overwriting --- python/global_scheduler/test/test.py | 33 ++- src/common/task.h | 17 +- src/photon/photon_algorithm.c | 355 ++++++++++++++++++--------- src/photon/photon_algorithm.h | 26 +- src/photon/photon_scheduler.c | 18 +- src/photon/photon_scheduler.h | 3 +- src/photon/test/photon_tests.c | 79 ++++-- 7 files changed, 356 insertions(+), 175 deletions(-) diff --git a/python/global_scheduler/test/test.py b/python/global_scheduler/test/test.py index 1b93871ed..05bcfbe94 100644 --- a/python/global_scheduler/test/test.py +++ b/python/global_scheduler/test/test.py @@ -25,8 +25,9 @@ ID_SIZE = 20 # These constants must match the scheduling state enum in task.h. TASK_STATUS_WAITING = 1 TASK_STATUS_SCHEDULED = 2 -TASK_STATUS_RUNNING = 4 -TASK_STATUS_DONE = 8 +TASK_STATUS_QUEUED = 4 +TASK_STATUS_RUNNING = 8 +TASK_STATUS_DONE = 16 # These constants are an implementation detail of ray_redis_module.c, so this # must be kept in sync with that file. @@ -161,8 +162,10 @@ class TestGlobalScheduler(unittest.TestCase): if len(task_entries) == 1: task_contents = self.redis_client.hgetall(task_entries[0]) task_status = int(task_contents[b"state"]) - self.assertTrue(task_status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED]) - if task_status == TASK_STATUS_SCHEDULED: + self.assertTrue(task_status in [TASK_STATUS_WAITING, + TASK_STATUS_SCHEDULED, + TASK_STATUS_QUEUED]) + if task_status == TASK_STATUS_QUEUED: break else: print(task_status) @@ -170,7 +173,7 @@ class TestGlobalScheduler(unittest.TestCase): num_retries -= 1 time.sleep(1) - if num_retries <= 0 and task_status != TASK_STATUS_SCHEDULED: + if num_retries <= 0 and task_status != TASK_STATUS_QUEUED: # Failed to submit and schedule a single task -- bail. self.tearDown() sys.exit(1) @@ -204,12 +207,18 @@ class TestGlobalScheduler(unittest.TestCase): if len(task_entries) == num_tasks: task_contents = [self.redis_client.hgetall(task_entries[i]) for i in range(len(task_entries))] task_statuses = [int(contents[b"state"]) for contents in task_contents] - self.assertTrue(all([status in [TASK_STATUS_WAITING, TASK_STATUS_SCHEDULED] for status in task_statuses])) - num_tasks_done = task_statuses.count(TASK_STATUS_SCHEDULED) + self.assertTrue(all([ + status in [TASK_STATUS_WAITING, + TASK_STATUS_SCHEDULED, + TASK_STATUS_QUEUED] for status in task_statuses + ])) + num_tasks_done = task_statuses.count(TASK_STATUS_QUEUED) + num_tasks_scheduled = task_statuses.count(TASK_STATUS_SCHEDULED) num_tasks_waiting = task_statuses.count(TASK_STATUS_WAITING) - print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, retries left = {}" - .format(len(task_entries), num_tasks_waiting, num_tasks_done, num_retries)) - if all([status == TASK_STATUS_SCHEDULED for status in task_statuses]): + print("tasks in Redis = {}, tasks waiting = {}, tasks scheduled = {}, tasks queued = {}, retries left = {}" + .format(len(task_entries), num_tasks_waiting, + num_tasks_scheduled, num_tasks_done, num_retries)) + if all([status == TASK_STATUS_QUEUED for status in task_statuses]): # We're done, so pass. break num_retries -= 1 @@ -231,8 +240,8 @@ class TestGlobalScheduler(unittest.TestCase): if __name__ == "__main__": if len(sys.argv) > 1: # Pop the argument so we don't mess with unittest's own argument parser. - arg = sys.argv.pop() - if arg == "valgrind": + if sys.argv[-1] == "valgrind": + arg = sys.argv.pop() USE_VALGRIND = True print("Using valgrind for tests") unittest.main(verbosity=2) diff --git a/src/common/task.h b/src/common/task.h index e8989a8b6..3939879c3 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -244,18 +244,25 @@ void print_task(task_spec *spec, UT_string *output); /** * ==== Task ==== - * Contains information about a scheduled task: The task specification, the task - * schedulign state (WAITING, SCHEDULED, RUNNING, DONE), and which local - * scheduler the task is scheduled on. + * Contains information about a scheduled task: The task specification, the + * task scheduling state (WAITING, SCHEDULED, QUEUED, RUNNING, DONE), and which + * local scheduler the task is scheduled on. */ /** The scheduling_state can be used as a flag when we are listening * for an event, for example TASK_WAITING | TASK_SCHEDULED. */ typedef enum { + /** The task is waiting to be scheduled. */ TASK_STATUS_WAITING = 1, + /** The task has been scheduled to a node, but has not been queued yet. */ TASK_STATUS_SCHEDULED = 2, - TASK_STATUS_RUNNING = 4, - TASK_STATUS_DONE = 8 + /** The task has been queued on a node, where it will wait for its + * dependencies to become ready and a worker to become available. */ + TASK_STATUS_QUEUED = 4, + /** The task is running on a worker. */ + TASK_STATUS_RUNNING = 8, + /** The task is done executing. */ + TASK_STATUS_DONE = 16 } scheduling_state; /** A task is an execution of a task specification. It has a state of execution diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 1a08122b6..471f1199d 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -13,9 +13,6 @@ typedef struct task_queue_entry { /** The task that is queued. */ task_spec *spec; - /** True if this task was assigned to this local scheduler by the global - * scheduler and false otherwise. */ - bool from_global_scheduler; struct task_queue_entry *prev; struct task_queue_entry *next; } task_queue_entry; @@ -43,8 +40,11 @@ typedef struct { /** Part of the photon state that is maintained by the scheduling algorithm. */ struct scheduling_algorithm_state { - /** An array of pointers to tasks that are waiting to be scheduled. */ - task_queue_entry *task_queue; + /** An array of pointers to tasks that are waiting for dependencies. */ + task_queue_entry *waiting_task_queue; + /** An array of pointers to tasks whose dependencies are ready but that are + * waiting to be assigned to a worker. */ + task_queue_entry *dispatch_task_queue; /** An array of worker indices corresponding to clients that are * waiting for tasks. */ UT_array *available_workers; @@ -62,7 +62,8 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) { /* Initialize an empty hash map for the cache of local available objects. */ algorithm_state->local_objects = NULL; /* Initialize the local data structures used for queuing tasks and workers. */ - algorithm_state->task_queue = NULL; + algorithm_state->waiting_task_queue = NULL; + algorithm_state->dispatch_task_queue = NULL; utarray_new(algorithm_state->available_workers, &ut_int_icd); /* Initialize the hash table of objects being fetched. */ algorithm_state->fetch_requests = NULL; @@ -72,8 +73,13 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) { void free_scheduling_algorithm_state( scheduling_algorithm_state *algorithm_state) { task_queue_entry *elt, *tmp1; - DL_FOREACH_SAFE(algorithm_state->task_queue, elt, tmp1) { - DL_DELETE(algorithm_state->task_queue, elt); + DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp1) { + DL_DELETE(algorithm_state->waiting_task_queue, elt); + free_task_spec(elt->spec); + free(elt); + } + DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp1) { + DL_DELETE(algorithm_state->dispatch_task_queue, elt); free_task_spec(elt->spec); free(elt); } @@ -96,7 +102,15 @@ void provide_scheduler_info(local_scheduler_state *state, local_scheduler_info *info) { task_queue_entry *elt; info->total_num_workers = utarray_len(state->workers); - DL_COUNT(algorithm_state->task_queue, elt, info->task_queue_length); + /* TODO(swang): Provide separate counts for tasks that are waiting for + * dependencies vs tasks that are waiting to be assigned. */ + int waiting_task_queue_length; + DL_COUNT(algorithm_state->waiting_task_queue, elt, waiting_task_queue_length); + int dispatch_task_queue_length; + DL_COUNT(algorithm_state->dispatch_task_queue, elt, + dispatch_task_queue_length); + info->task_queue_length = + waiting_task_queue_length + dispatch_task_queue_length; info->available_workers = utarray_len(algorithm_state->available_workers); } @@ -168,78 +182,168 @@ void fetch_missing_dependencies(local_scheduler_state *state, } /** - * If there is a task whose dependencies are available locally, assign it to the - * worker. This does not remove the worker from the available worker queue. + * Assign as many tasks from the dispatch queue as possible. * - * @param s The scheduler state. - * @param worker_index The index of the worker. - * @return This returns 1 if it successfully assigned a task to the worker, - * otherwise it returns 0. + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @return Void. */ -bool find_and_schedule_task_if_possible( - local_scheduler_state *state, - scheduling_algorithm_state *algorithm_state, - int worker_index) { - task_queue_entry *elt, *tmp; - bool found_task_to_schedule = false; - /* Find the first task whose dependencies are available locally. */ - DL_FOREACH_SAFE(algorithm_state->task_queue, elt, tmp) { - if (can_run(algorithm_state, elt->spec)) { - found_task_to_schedule = true; - break; - } +void dispatch_tasks(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state) { + /* Assign tasks while there are still tasks in the dispatch queue and + * available workers. */ + while ((algorithm_state->dispatch_task_queue != NULL) && + (utarray_len(algorithm_state->available_workers) > 0)) { + LOG_DEBUG("Dispatching task"); + /* Pop a task from the dispatch queue. */ + task_queue_entry *dispatched_task = algorithm_state->dispatch_task_queue; + DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task); + + /* Get the last available worker in the available worker queue. */ + int *worker_index = + (int *) utarray_back(algorithm_state->available_workers); + /* Tell the available worker to execute the task. */ + assign_task_to_worker(state, dispatched_task->spec, *worker_index); + /* Remove the available worker from the queue and free the struct. */ + utarray_pop_back(algorithm_state->available_workers); + free_task_spec(dispatched_task->spec); + free(dispatched_task); } - if (found_task_to_schedule) { - /* This task's dependencies are available locally, so assign the task to the - * worker. */ - assign_task_to_worker(state, elt->spec, worker_index, - elt->from_global_scheduler); - /* Update the task queue data structure and free the task. */ - DL_DELETE(algorithm_state->task_queue, elt); - free_task_spec(elt->spec); - free(elt); - } - return found_task_to_schedule; } -void run_task_immediately(local_scheduler_state *state, - scheduling_algorithm_state *algorithm_state, - task_spec *spec, - bool from_global_scheduler) { - /* Get the last available worker in the available worker queue. */ - int *worker_index = (int *) utarray_back(algorithm_state->available_workers); - /* Tell the available worker to execute the task. */ - assign_task_to_worker(state, spec, *worker_index, from_global_scheduler); - /* Remove the available worker from the queue and free the struct. */ - utarray_pop_back(algorithm_state->available_workers); -} - -void queue_task_locally(local_scheduler_state *state, - scheduling_algorithm_state *algorithm_state, - task_spec *spec, - bool from_global_scheduler) { +/** + * A helper function to allocate a queue entry for a task specification and + * push it onto a generic queue. + * + * @param state The state of the local scheduler. + * @param task_queue A pointer to a task queue. NOTE: Because we are using + * utlist.h, we must pass in a pointer to the queue we want to append + * to. If we passed in the queue itself and the queue was empty, this + * would append the task to a queue that we don't have a reference to. + * @param spec The task specification to queue. + * @param from_global_scheduler Whether or not the task was from a global + * scheduler. If false, the task was submitted by a worker. + * @return Void. + */ +void queue_task(local_scheduler_state *state, + task_queue_entry **task_queue, + task_spec *spec, + bool from_global_scheduler) { /* Copy the spec and add it to the task queue. The allocated spec will be * freed when it is assigned to a worker. */ task_queue_entry *elt = malloc(sizeof(task_queue_entry)); elt->spec = (task_spec *) malloc(task_spec_size(spec)); memcpy(elt->spec, spec, task_spec_size(spec)); - elt->from_global_scheduler = from_global_scheduler; - DL_APPEND(algorithm_state->task_queue, elt); - if (!from_global_scheduler && state->db != NULL) { + DL_APPEND((*task_queue), elt); + + /* The task has been added to a local scheduler queue. Write the entry in the + * task table to notify others that we have queued it. */ + if (state->db != NULL) { task *task = - alloc_task(spec, TASK_STATUS_SCHEDULED, get_db_client_id(state->db)); - task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, + alloc_task(spec, TASK_STATUS_QUEUED, get_db_client_id(state->db)); + if (from_global_scheduler) { + /* If the task is from the global scheduler, it's already been added to + * the task table, so just update the entry. */ + task_table_update(state->db, task, (retry_info *) &photon_retry, NULL, NULL); + } else { + /* Otherwise, this is the first time the task has been seen in the system + * (unless it's a resubmission of a previous task), so add the entry. */ + task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, + NULL); + } } } +/** + * Queue a task whose dependencies are missing. When the task's object + * dependencies become available, the task will be moved to the dispatch queue. + * If we have a connection to a plasma manager, begin trying to fetch the + * dependencies. + * + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @param spec The task specification to queue. + * @param from_global_scheduler Whether or not the task was from a global + * scheduler. If false, the task was submitted by a worker. + * @return Void. + */ +void queue_waiting_task(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec, + bool from_global_scheduler) { + LOG_DEBUG("Queueing task in waiting queue"); + /* Initiate fetch calls for any dependencies that are not present locally. */ + if (plasma_manager_is_connected(state->plasma_conn)) { + fetch_missing_dependencies(state, algorithm_state, spec); + } + queue_task(state, &algorithm_state->waiting_task_queue, spec, + from_global_scheduler); +} + +/** + * Queue a task whose dependencies are ready. When the task reaches the front + * of the dispatch queue and workers are available, it will be assigned. + * + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @param spec The task specification to queue. + * @param from_global_scheduler Whether or not the task was from a global + * scheduler. If false, the task was submitted by a worker. + * @return Void. + */ +void queue_dispatch_task(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec, + bool from_global_scheduler) { + LOG_DEBUG("Queueing task in dispatch queue"); + queue_task(state, &algorithm_state->dispatch_task_queue, spec, + from_global_scheduler); +} + +/** + * Add the task to the proper local scheduler queue. This assumes that the + * scheduling decision to place the task on this node has already been made, + * whether locally or by the global scheduler. + * + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @param spec The task specification to queue. + * @param from_global_scheduler Whether or not the task was from a global + * scheduler. If false, the task was submitted by a worker. + * @return Void. + */ +void queue_task_locally(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec, + bool from_global_scheduler) { + if (can_run(algorithm_state, spec)) { + /* Dependencies are ready, so push the task to the dispatch queue. */ + queue_dispatch_task(state, algorithm_state, spec, from_global_scheduler); + } else { + /* Dependencies are not ready, so push the task to the waiting queue. */ + queue_waiting_task(state, algorithm_state, spec, from_global_scheduler); + } +} + +/** + * Give a task to the global scheduler to schedule. + * + * @param state The scheduler state. + * @param algorithm_state The scheduling algorithm state. + * @param spec The task specification to schedule. + * @return Void. + */ void give_task_to_global_scheduler(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, - task_spec *spec, - bool from_global_scheduler) { + task_spec *spec) { + if (state->db == NULL || !state->global_scheduler_exists) { + /* A global scheduler is not available, so queue the task locally. */ + queue_task_locally(state, algorithm_state, spec, false); + return; + } /* Pass on the task to the global scheduler. */ DCHECK(state->global_scheduler_exists); - DCHECK(!from_global_scheduler); task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID); DCHECK(state->db != NULL); task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, @@ -254,14 +358,19 @@ void handle_task_submitted(local_scheduler_state *state, * cannot assign the task to a worker immediately, we either queue the task in * the local task queue or we pass the task to the global scheduler. For now, * we pass the task along to the global scheduler if there is one. */ - if ((utarray_len(algorithm_state->available_workers) > 0) && - can_run(algorithm_state, spec)) { - run_task_immediately(state, algorithm_state, spec, false); - } else if (state->db == NULL || !state->global_scheduler_exists) { - queue_task_locally(state, algorithm_state, spec, false); + if (can_run(algorithm_state, spec) && + (utarray_len(algorithm_state->available_workers) > 0)) { + /* Dependencies are ready and there is an available worker, so dispatch the + * task. */ + queue_dispatch_task(state, algorithm_state, spec, false); } else { - give_task_to_global_scheduler(state, algorithm_state, spec, false); + /* Give the task to the global scheduler to schedule, if it exists. */ + give_task_to_global_scheduler(state, algorithm_state, spec); } + + /* Try to dispatch tasks, since we may have added one to the queue. */ + dispatch_tasks(state, algorithm_state); + /* Update the result table, which holds mappings of object ID -> ID of the * task that created it. */ if (state->db != NULL) { @@ -282,17 +391,9 @@ void handle_task_scheduled(local_scheduler_state *state, * to the database. */ DCHECK(state->db != NULL); DCHECK(state->global_scheduler_exists); - /* Initiate fetch calls for any dependencies that are not present locally. */ - fetch_missing_dependencies(state, algorithm_state, spec); - /* If this task's dependencies are available locally, and if there is an - * available worker, then assign this task to an available worker. If we - * cannot assign the task to a worker immediately, queue the task locally. */ - if ((utarray_len(algorithm_state->available_workers) > 0) && - can_run(algorithm_state, spec)) { - run_task_immediately(state, algorithm_state, spec, true); - } else { - queue_task_locally(state, algorithm_state, spec, true); - } + /* Push the task to the appropriate queue. */ + queue_task_locally(state, algorithm_state, spec, true); + dispatch_tasks(state, algorithm_state); } void handle_worker_available(local_scheduler_state *state, @@ -301,50 +402,30 @@ void handle_worker_available(local_scheduler_state *state, worker *available_worker = (worker *) utarray_eltptr(state->workers, worker_index); CHECK(available_worker->task_in_progress == NULL); - /* Try to schedule another task to the worker. */ - int scheduled_task = - find_and_schedule_task_if_possible(state, algorithm_state, worker_index); - /* If we couldn't find a task to schedule, add the worker to the queue of - * available workers. */ - if (!scheduled_task) { - for (int *p = (int *) utarray_front(algorithm_state->available_workers); - p != NULL; - p = (int *) utarray_next(algorithm_state->available_workers, p)) { - DCHECK(*p != worker_index); - } - /* Add client_sock to a list of available workers. This struct will be freed - * when a task is assigned to this worker. */ - utarray_push_back(algorithm_state->available_workers, &worker_index); - LOG_DEBUG("Adding worker_index %d to available workers.\n", worker_index); + for (int *p = (int *) utarray_front(algorithm_state->available_workers); + p != NULL; + p = (int *) utarray_next(algorithm_state->available_workers, p)) { + DCHECK(*p != worker_index); } + /* Add worker to the list of available workers. */ + utarray_push_back(algorithm_state->available_workers, &worker_index); + LOG_DEBUG("Adding worker_index %d to available workers", worker_index); + + /* Try to dispatch tasks, since we now have available workers to assign them + * to. */ + dispatch_tasks(state, algorithm_state); } void handle_object_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, object_id object_id) { - /* TODO(rkn): When does this get freed? */ + /* Available object entries get freed if the object is removed. */ available_object *entry = (available_object *) malloc(sizeof(available_object)); entry->object_id = object_id; HASH_ADD(handle, algorithm_state->local_objects, object_id, sizeof(object_id), entry); - /* Check if we can schedule any tasks. */ - int num_tasks_scheduled = 0; - for (int *p = (int *) utarray_front(algorithm_state->available_workers); - p != NULL; - p = (int *) utarray_next(algorithm_state->available_workers, p)) { - /* Schedule a task on this worker if possible. */ - int scheduled_task = - find_and_schedule_task_if_possible(state, algorithm_state, *p); - if (!scheduled_task) { - /* There are no tasks we can schedule, so exit the loop. */ - break; - } - num_tasks_scheduled += 1; - } - utarray_erase(algorithm_state->available_workers, 0, num_tasks_scheduled); - /* If we were previously trying to fetch this object, remove the fetch request * from the hash table. */ fetch_object_request *fetch_req; @@ -355,22 +436,64 @@ void handle_object_available(local_scheduler_state *state, CHECK(event_loop_remove_timer(state->loop, fetch_req->timer) == AE_OK); free(fetch_req); } + + /* Move any tasks whose object dependencies are now ready to the dispatch + * queue. */ + /* TODO(swang): This can be optimized by keeping a lookup table from object + * ID to list of dependent tasks in the waiting queue. */ + task_queue_entry *elt, *tmp; + DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp) { + if (can_run(algorithm_state, elt->spec)) { + LOG_DEBUG("Moved task to dispatch queue"); + DL_DELETE(algorithm_state->waiting_task_queue, elt); + DL_APPEND(algorithm_state->dispatch_task_queue, elt); + } + } + + /* Try to dispatch tasks, since we may have added some from the waiting + * queue. */ + dispatch_tasks(state, algorithm_state); } -void handle_object_removed(local_scheduler_state *state, object_id object_id) { +void handle_object_removed(local_scheduler_state *state, + object_id removed_object_id) { scheduling_algorithm_state *algorithm_state = state->algorithm_state; available_object *entry; - HASH_FIND(handle, algorithm_state->local_objects, &object_id, - sizeof(object_id), entry); + HASH_FIND(handle, algorithm_state->local_objects, &removed_object_id, + sizeof(removed_object_id), entry); if (entry != NULL) { HASH_DELETE(handle, algorithm_state->local_objects, entry); free(entry); } + + /* Move dependent tasks from the dispatch queue back to the waiting queue. */ + task_queue_entry *elt, *tmp; + DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) { + task_spec *task = elt->spec; + int64_t num_args = task_num_args(task); + for (int i = 0; i < num_args; ++i) { + if (task_arg_type(task, i) == ARG_BY_REF) { + object_id arg_id = task_arg_id(task, i); + if (object_ids_equal(arg_id, removed_object_id)) { + LOG_DEBUG("Moved task from dispatch queue back to waiting queue"); + DL_DELETE(algorithm_state->dispatch_task_queue, elt); + DL_APPEND(algorithm_state->waiting_task_queue, elt); + } + } + } + } } -int num_tasks_in_queue(scheduling_algorithm_state *algorithm_state) { +int num_waiting_tasks(scheduling_algorithm_state *algorithm_state) { task_queue_entry *elt; int count; - DL_COUNT(algorithm_state->task_queue, elt, count); + DL_COUNT(algorithm_state->waiting_task_queue, elt, count); + return count; +} + +int num_dispatch_tasks(scheduling_algorithm_state *algorithm_state) { + task_queue_entry *elt; + int count; + DL_COUNT(algorithm_state->dispatch_task_queue, elt, count); return count; } diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index 49f2ef981..1e767a5bd 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -43,8 +43,17 @@ void provide_scheduler_info(local_scheduler_state *state, /** * This function will be called when a new task is submitted by a worker for - * execution. + * execution. The task will either be: + * 1. Put into the waiting queue, where it will wait for its dependencies to + * become available. + * 2. Put into the dispatch queue, where it will wait for an available worker. + * 3. Given to the global scheduler to be scheduled. * + * Currently, the local scheduler policy is to keep the task if its + * dependencies are ready and there is an available worker. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. * @param task Task that is submitted by the worker. * @return Void. */ @@ -102,12 +111,21 @@ void handle_worker_available(local_scheduler_state *state, /** The following methods are for testing purposes only. */ #ifdef PHOTON_TEST /** - * Get the number of tasks currently queued locally. + * Get the number of tasks currently waiting for object dependencies to become + * available locally. * * @param algorithm_state State maintained by the scheduling algorithm. - * @return The number of tasks queued locally. + * @return The number of tasks queued. */ -int num_tasks_in_queue(scheduling_algorithm_state *algorithm_state); +int num_waiting_tasks(scheduling_algorithm_state *algorithm_state); + +/** + * Get the number of tasks currently waiting for a worker to become available. + * + * @param algorithm_state State maintained by the scheduling algorithm. + * @return The number of tasks queued. + */ +int num_dispatch_tasks(scheduling_algorithm_state *algorithm_state); #endif #endif /* PHOTON_ALGORITHM_H */ diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 5edf076b7..8f283eafe 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -110,8 +110,7 @@ void free_local_scheduler(local_scheduler_state *state) { void assign_task_to_worker(local_scheduler_state *state, task_spec *spec, - int worker_index, - bool from_global_scheduler) { + int worker_index) { CHECK(worker_index < utarray_len(state->workers)); worker *w = (worker *) utarray_eltptr(state->workers, worker_index); if (write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), @@ -131,13 +130,8 @@ void assign_task_to_worker(local_scheduler_state *state, if (state->db != NULL) { task *task = alloc_task(spec, TASK_STATUS_RUNNING, get_db_client_id(state->db)); - if (from_global_scheduler) { - task_table_update(state->db, task, (retry_info *) &photon_retry, NULL, - NULL); - } else { - task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL, - NULL); - } + task_table_update(state->db, task, (retry_info *) &photon_retry, NULL, + NULL); /* Record which task this worker is executing. This will be freed in * process_message when the worker sends a GET_TASK message to the local * scheduler. */ @@ -178,15 +172,13 @@ void reconstruct_object_task_lookup_callback(object_id reconstruct_object_id, CHECKM(task != NULL, "No task information found for object during reconstruction"); local_scheduler_state *state = user_context; - /* If the task's scheduling state is WAITING or SCHEDULED, assume that + /* If the task's scheduling state is pending completion, assume that * reconstruction is already being taken care of and cancel this * reconstruction operation. NOTE: This codepath is not responsible for * detecting failure of the other reconstruction, or updating the * scheduling_state accordingly. */ scheduling_state task_status = task_state(task); - if (task_status == TASK_STATUS_WAITING || - task_status == TASK_STATUS_SCHEDULED || - task_status == TASK_STATUS_RUNNING) { + if (task_status != TASK_STATUS_DONE) { LOG_DEBUG("Task to reconstruct had scheduling state %d", task_status); return; } diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index b62041887..5e52d8f1f 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -35,8 +35,7 @@ void new_client_connection(event_loop *loop, */ void assign_task_to_worker(local_scheduler_state *state, task_spec *task, - int worker_index, - bool from_global_scheduler); + int worker_index); /** * This is the callback that is used to process a notification from the Plasma diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index bdd816aa9..51c4906ee 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -138,7 +138,8 @@ TEST object_reconstruction_test(void) { * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); free_task_spec(spec); - ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); destroy_photon_mock(photon); PASS(); } @@ -217,7 +218,8 @@ TEST object_reconstruction_recursive_test(void) { /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); - ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); for (int i = 0; i < NUM_TASKS; ++i) { free_task_spec(specs[i]); } @@ -278,7 +280,8 @@ TEST object_reconstruction_suppression_test(void) { /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); - ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks(photon->photon_state->algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(photon->photon_state->algorithm_state), 0); free_task_spec(object_reconstruction_suppression_spec); db_disconnect(db); destroy_photon_mock(photon); @@ -294,35 +297,65 @@ TEST object_notifications_test(void) { task_spec *spec = example_task_spec(1, 1); object_id oid = task_arg_id(spec, 0); - /* Check that the task gets queued if the task is submitted and a worker is - * available, but the input is not. Once the input is available, the task - * gets assigned. */ + /* Check that the task gets queued in the waiting queue if the task is + * submitted, but the input and workers are not available. */ handle_task_submitted(state, algorithm_state, spec); - handle_worker_available(state, algorithm_state, worker_index); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); + /* Once the input is available, the task gets moved to the dispatch queue. */ handle_object_available(state, algorithm_state, oid); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); - reset_worker(photon, worker_index); - - /* Check that the task gets queued if the task is submitted and the input is - * available, but no worker is available yet. Once a worker is available, the - * task gets assigned. */ - handle_task_submitted(state, algorithm_state, spec); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); + /* Once a worker is available, the task gets assigned. */ handle_worker_available(state, algorithm_state, worker_index); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); reset_worker(photon, worker_index); - /* If an object gets removed, check the first scenario again, where the task - * gets queued if the task is submitted and a worker is available, but the - * input is not. Once the input is made available again, the task gets - * assigned. */ + /* Check that the task gets queued in the waiting queue if the task is + * submitted and a worker is available, but the input is not. */ handle_object_removed(state, oid); handle_task_submitted(state, algorithm_state, spec); handle_worker_available(state, algorithm_state, worker_index); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); + /* Once the input is available, the task gets assigned. */ handle_object_available(state, algorithm_state, oid); - ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); + reset_worker(photon, worker_index); + + /* Check that the task gets queued in the dispatch queue if the task is + * submitted and the input is available, but no worker is available yet. */ + handle_task_submitted(state, algorithm_state, spec); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); + /* Once a worker is available, the task gets assigned. */ + handle_worker_available(state, algorithm_state, worker_index); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); + reset_worker(photon, worker_index); + + /* If an object gets removed, check the first scenario again, where the task + * gets queued in the waiting task if the task is submitted and a worker is + * available, but the input is not. */ + handle_task_submitted(state, algorithm_state, spec); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); + /* If the input is removed while a task is in the dispatch queue, the task + * gets moved back to the waiting queue. */ + handle_object_removed(state, oid); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); + /* Once the input is available, the task gets moved back to the dispatch + * queue. */ + handle_object_available(state, algorithm_state, oid); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); + /* Once a worker is available, the task gets assigned. */ + handle_worker_available(state, algorithm_state, worker_index); + ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); + ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); free_task_spec(spec); destroy_photon_mock(photon);