From 872e68b5b004cbde78aee18325cee187387fac9c Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 4 Oct 2016 16:25:11 -0700 Subject: [PATCH 1/4] submit task to redis --- common | 2 +- photon_scheduler.c | 130 +++++++++++++++++++++++---------------------- photon_scheduler.h | 23 ++++---- 3 files changed, 81 insertions(+), 74 deletions(-) diff --git a/common b/common index 084220b0e..49ac871ef 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 084220b0e70de6bed466e97e08f4b6909133aafb +Subproject commit 49ac871ef6a6751835dcd8448f1d67ef4d6c82ad diff --git a/photon_scheduler.c b/photon_scheduler.c index bf5672bcd..e6b6cdad2 100644 --- a/photon_scheduler.c +++ b/photon_scheduler.c @@ -9,9 +9,9 @@ #include "event_loop.h" #include "io.h" #include "photon.h" +#include "photon_scheduler.h" #include "state/db.h" -#include "state/task_queue.h" -#include "task.h" +#include "state/task_log.h" #include "utarray.h" typedef struct { @@ -20,22 +20,76 @@ typedef struct { } available_worker; /* These are needed to define the UT_arrays. */ -UT_icd task_ptr_icd = {sizeof(task_spec *), NULL, NULL, NULL}; +UT_icd task_ptr_icd = {sizeof(task_instance *), NULL, NULL, NULL}; UT_icd worker_icd = {sizeof(available_worker), NULL, NULL, NULL}; -typedef struct { +struct local_scheduler_state { db_handle *db; /** This is an array of pointers to tasks that are waiting to be scheduled. */ UT_array *task_queue; /** This is an array of file descriptors corresponding to clients that are * waiting for tasks. */ UT_array *available_worker_queue; -} local_scheduler_state; +}; -void try_to_assign_task(task_spec *task, local_scheduler_state *s); -void try_to_assign_task_to_worker(int client_sock, local_scheduler_state *s); +local_scheduler_state *init_local_scheduler(event_loop *loop, + const char *redis_addr, + int redis_port) { + local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); + state->db = db_connect(redis_addr, redis_port, "photon", "", -1); + db_attach(state->db, loop); + utarray_new(state->task_queue, &task_ptr_icd); + utarray_new(state->available_worker_queue, &worker_icd); + return state; +}; -event_loop *init_local_scheduler() { return event_loop_create(); }; +void handle_submit_task(local_scheduler_state *s, task_spec *task) { + /* Assign this task to an available worker. If there are no available workers, + * then add this task to the local task queue. */ + task_iid task_iid = globally_unique_id(); + task_instance *instance = make_task_instance(task_iid, task, TASK_WAITING, NIL_ID); + if (utarray_len(s->available_worker_queue) > 0) { + /* Get the last available worker in the available worker queue. */ + available_worker *worker = + (available_worker *)utarray_back(s->available_worker_queue); + /* Tell the available worker to execute the task. */ + write_message(worker->client_sock, EXECUTE_TASK, task_size(task), + (uint8_t *)task); + utarray_pop_back(s->available_worker_queue); + /* TODO: Do we need to free the available_worker struct? */ + } else { + /* Add the task to the task queue. */ + utarray_push_back(s->task_queue, &instance); + } + /* Submit task to redis. */ + task_log_add_task(s->db, instance); + // free(instance); +} + +void handle_get_task(local_scheduler_state *s, int client_sock) { + if (utarray_len(s->task_queue) > 0) { + /* Get the last task in the task queue. */ + task_instance **back = (task_instance **)utarray_back(s->task_queue); + task_spec *task = task_instance_task_spec(*back); + /* Send a task to the worker. */ + write_message(client_sock, EXECUTE_TASK, task_size(task), (uint8_t *)task); + /* Update the task queue data structure and free the task. */ + utarray_pop_back(s->task_queue); + free(*back); + } else { + /* Check that client_sock is not already in the available workers. */ + for (available_worker *p = + (available_worker *)utarray_front(s->available_worker_queue); + p != NULL; + p = (available_worker *)utarray_next(s->available_worker_queue, p)) { + CHECK(p->client_sock != client_sock); + } + /* Add client_sock to a list of available workers. */ + available_worker worker_info = {.client_sock = client_sock}; + utarray_push_back(s->available_worker_queue, &worker_info); + LOG_INFO("Adding client_sock %d to available workers.\n", client_sock); + } +} void process_message(event_loop *loop, int client_sock, void *context, int events) { @@ -57,12 +111,12 @@ void process_message(event_loop *loop, int client_sock, void *context, // task_queue_submit_task(s->db, id, task); /* Try to assign the task to a worker locally. TODO(rkn): This should * probably go somewhere else. */ - try_to_assign_task(task, s); + handle_submit_task(s, task); } break; case TASK_DONE: { } break; case GET_TASK: { - try_to_assign_task_to_worker(client_sock, s); + handle_get_task(s, client_sock); } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); @@ -77,51 +131,6 @@ void process_message(event_loop *loop, int client_sock, void *context, free(message); } -void try_to_assign_task(task_spec *task, local_scheduler_state *s) { - /* Assign this task to an available worker. If there are no available workers, - * then add this task to the local task queue. */ - if (utarray_len(s->available_worker_queue) > 0) { - /* Get the last available worker in the available worker queue. */ - available_worker *worker = - (available_worker *)utarray_back(s->available_worker_queue); - /* Tell the available worker to execute the task. */ - write_message(worker->client_sock, EXECUTE_TASK, task_size(task), - (uint8_t *)task); - utarray_pop_back(s->available_worker_queue); - /* TODO: Do we need to free the available_worker struct? */ - } else { - /* Add the task to the task queue. */ - task_spec *task_copy = malloc(task_size(task)); - memcpy(task_copy, task, task_size(task)); - utarray_push_back(s->task_queue, &task_copy); - } -} - -void try_to_assign_task_to_worker(int client_sock, local_scheduler_state *s) { - if (utarray_len(s->task_queue) > 0) { - /* Get the last task in the task queue. */ - task_spec **task_ptr = (task_spec **)utarray_back(s->task_queue); - task_spec *task = *task_ptr; - /* Send a task to the worker. */ - write_message(client_sock, EXECUTE_TASK, task_size(task), (uint8_t *)task); - /* Update the task queue data structure and free the task. */ - utarray_pop_back(s->task_queue); - free(task); - } else { - /* Check that client_sock is not already in the available workers. */ - for (available_worker *p = - (available_worker *)utarray_front(s->available_worker_queue); - p != NULL; - p = (available_worker *)utarray_next(s->available_worker_queue, p)) { - CHECK(p->client_sock != client_sock); - } - /* Add client_sock to a list of available workers. */ - available_worker worker_info = {.client_sock = client_sock}; - utarray_push_back(s->available_worker_queue, &worker_info); - LOG_INFO("Adding client_sock %d to available workers.\n", client_sock); - } -} - void new_client_connection(event_loop *loop, int listener_sock, void *context, int events) { local_scheduler_state *s = context; @@ -133,16 +142,11 @@ void new_client_connection(event_loop *loop, int listener_sock, void *context, void start_server(const char *socket_name, const char *redis_addr, int redis_port) { int fd = bind_ipc_sock(socket_name); - local_scheduler_state state; - event_loop *loop = init_local_scheduler(); - - state.db = db_connect(redis_addr, redis_port, "photon", "", -1); - db_attach(state.db, loop); - utarray_new(state.task_queue, &task_ptr_icd); - utarray_new(state.available_worker_queue, &worker_icd); + event_loop *loop = event_loop_create(); + local_scheduler_state *state = init_local_scheduler(loop, redis_addr, redis_port); /* Run event loop. */ - event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, &state); + event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, state); event_loop_run(loop); } diff --git a/photon_scheduler.h b/photon_scheduler.h index cce91155d..bef1f42d9 100644 --- a/photon_scheduler.h +++ b/photon_scheduler.h @@ -1,15 +1,18 @@ -#ifndef PHOTON_SCHEDULER -#define PHOTON_SCHEDULER +#ifndef PHOTON_SCHEDULER_H +#define PHOTON_SCHEDULER_H + +#include "task.h" + +typedef struct local_scheduler_state local_scheduler_state; /* Establish a connection to a new client. */ -void new_client_connection(local_scheduler_state *s, int listener_sock); +void new_client_connection(event_loop *loop, int listener_sock, void *context, + int events); -/* schedule a task on a given worker. */ -void schedule_on_worker(local_scheduler_state *s, task_spec *task, - int client_id); +/* Assign a task to a worker. */ +void handle_get_task(local_scheduler_state *s, int client_sock); -/* Handle new incoming task that was scheduled by the globl scheduler on - * this local scheduler. */ -void schedule_task(local_scheduler_state *s, task_spec *task) +/* Handle incoming submit request by a worker. */ +void handle_submit_task(local_scheduler_state *s, task_spec *task); -#endif +#endif /* PHOTON_SCHEDULER_H */ From 67677c3c923f6e3e949532b642952997a951cc4d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 4 Oct 2016 17:06:52 -0700 Subject: [PATCH 2/4] update documentation and common --- common | 2 +- photon_scheduler.c | 11 ++++++----- photon_scheduler.h | 24 +++++++++++++++++++++--- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/common b/common index 49ac871ef..4329afbd5 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 49ac871ef6a6751835dcd8448f1d67ef4d6c82ad +Subproject commit 4329afbd53449412b08e64d4057a2857393c2212 diff --git a/photon_scheduler.c b/photon_scheduler.c index e6b6cdad2..7d9b5f7ef 100644 --- a/photon_scheduler.c +++ b/photon_scheduler.c @@ -32,9 +32,8 @@ struct local_scheduler_state { UT_array *available_worker_queue; }; -local_scheduler_state *init_local_scheduler(event_loop *loop, - const char *redis_addr, - int redis_port) { +local_scheduler_state * +init_local_scheduler(event_loop *loop, const char *redis_addr, int redis_port) { local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); state->db = db_connect(redis_addr, redis_port, "photon", "", -1); db_attach(state->db, loop); @@ -47,7 +46,8 @@ void handle_submit_task(local_scheduler_state *s, task_spec *task) { /* Assign this task to an available worker. If there are no available workers, * then add this task to the local task queue. */ task_iid task_iid = globally_unique_id(); - task_instance *instance = make_task_instance(task_iid, task, TASK_WAITING, NIL_ID); + task_instance *instance = + make_task_instance(task_iid, task, TASK_STATUS_WAITING, NIL_ID); if (utarray_len(s->available_worker_queue) > 0) { /* Get the last available worker in the available worker queue. */ available_worker *worker = @@ -143,7 +143,8 @@ void start_server(const char *socket_name, const char *redis_addr, int redis_port) { int fd = bind_ipc_sock(socket_name); event_loop *loop = event_loop_create(); - local_scheduler_state *state = init_local_scheduler(loop, redis_addr, redis_port); + local_scheduler_state *state = + init_local_scheduler(loop, redis_addr, redis_port); /* Run event loop. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, state); diff --git a/photon_scheduler.h b/photon_scheduler.h index bef1f42d9..54905bb88 100644 --- a/photon_scheduler.h +++ b/photon_scheduler.h @@ -5,14 +5,32 @@ typedef struct local_scheduler_state local_scheduler_state; -/* Establish a connection to a new client. */ +/** + * Establish a connection to a new client. + * + * @param loop Event loop of the local scheduler. + * @param listener_socket Socket the local scheduler is listening on for new + * client requests. + * @param context State of the local scheduler. + * @param events Flag for events that are available on the listener socket. + */ void new_client_connection(event_loop *loop, int listener_sock, void *context, int events); -/* Assign a task to a worker. */ +/** + * Assign a task to a worker. + * + * @param s State of the local scheduler. + * @param client_sock Socket by which the worker is connected. + */ void handle_get_task(local_scheduler_state *s, int client_sock); -/* Handle incoming submit request by a worker. */ +/** + * Handle incoming submit request by a worker. + * + * @param s State of the local scheduler. + * @param task Task specification of the task to be submitted. + */ void handle_submit_task(local_scheduler_state *s, task_spec *task); #endif /* PHOTON_SCHEDULER_H */ From a7a963445d50fa0c53ef771404a40da06abc3cc7 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Wed, 5 Oct 2016 13:30:10 -0700 Subject: [PATCH 3/4] fixes --- common | 2 +- photon_scheduler.h | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/common b/common index 4329afbd5..4204500d2 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 4329afbd53449412b08e64d4057a2857393c2212 +Subproject commit 4204500d23be7726e27598badb691d29d08a0ad7 diff --git a/photon_scheduler.h b/photon_scheduler.h index 54905bb88..591ffe0f5 100644 --- a/photon_scheduler.h +++ b/photon_scheduler.h @@ -10,9 +10,10 @@ typedef struct local_scheduler_state local_scheduler_state; * * @param loop Event loop of the local scheduler. * @param listener_socket Socket the local scheduler is listening on for new - * client requests. + * client requests. * @param context State of the local scheduler. * @param events Flag for events that are available on the listener socket. + * @return Void. */ void new_client_connection(event_loop *loop, int listener_sock, void *context, int events); @@ -22,6 +23,7 @@ void new_client_connection(event_loop *loop, int listener_sock, void *context, * * @param s State of the local scheduler. * @param client_sock Socket by which the worker is connected. + * @return Void. */ void handle_get_task(local_scheduler_state *s, int client_sock); @@ -30,6 +32,7 @@ void handle_get_task(local_scheduler_state *s, int client_sock); * * @param s State of the local scheduler. * @param task Task specification of the task to be submitted. + * @return Void. */ void handle_submit_task(local_scheduler_state *s, task_spec *task); From 0f97855333370f99e31391442a4ef9b97969032b Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 5 Oct 2016 14:11:02 -0700 Subject: [PATCH 4/4] More fixes. --- photon_scheduler.c | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/photon_scheduler.c b/photon_scheduler.c index 7d9b5f7ef..44022451c 100644 --- a/photon_scheduler.c +++ b/photon_scheduler.c @@ -43,30 +43,43 @@ init_local_scheduler(event_loop *loop, const char *redis_addr, int redis_port) { }; void handle_submit_task(local_scheduler_state *s, task_spec *task) { - /* Assign this task to an available worker. If there are no available workers, - * then add this task to the local task queue. */ + /* Create a unique task instance ID. This is different from the task ID and + * is used to distinguish between potentially multiple executions of the + * task. */ task_iid task_iid = globally_unique_id(); task_instance *instance = make_task_instance(task_iid, task, TASK_STATUS_WAITING, NIL_ID); - if (utarray_len(s->available_worker_queue) > 0) { + /* Assign this task to an available worker. If there are no available workers, + * then add this task to the local task queue. */ + int schedule_locally = utarray_len(s->available_worker_queue) > 0; + if (schedule_locally) { /* Get the last available worker in the available worker queue. */ available_worker *worker = (available_worker *)utarray_back(s->available_worker_queue); /* Tell the available worker to execute the task. */ write_message(worker->client_sock, EXECUTE_TASK, task_size(task), (uint8_t *)task); + /* Remove the available worker from the queue and free the struct. */ utarray_pop_back(s->available_worker_queue); - /* TODO: Do we need to free the available_worker struct? */ + free(worker); } else { - /* Add the task to the task queue. */ + /* Add the task to the task queue. This passes ownership of the task queue. + * And the task will be freed when it is assigned to a worker. */ utarray_push_back(s->task_queue, &instance); } - /* Submit task to redis. */ + /* Submit the task to redis. */ task_log_add_task(s->db, instance); - // free(instance); + if (schedule_locally) { + /* If the task was scheduled locally, we need to free it. Otherwise, + * ownership of the task is passed to the task_queue, and it will be freed + * when it is assigned to a worker. */ + free(instance); + } } void handle_get_task(local_scheduler_state *s, int client_sock) { + /* If there is an available task, assign that task to this worker. Otherwise + * add the worker to the queue of available workers. */ if (utarray_len(s->task_queue) > 0) { /* Get the last task in the task queue. */ task_instance **back = (task_instance **)utarray_back(s->task_queue); @@ -84,7 +97,8 @@ void handle_get_task(local_scheduler_state *s, int client_sock) { p = (available_worker *)utarray_next(s->available_worker_queue, p)) { CHECK(p->client_sock != client_sock); } - /* Add client_sock to a list of available workers. */ + /* Add client_sock to a list of available workers. This struct will be freed + * when a task is assigned to this worker. */ available_worker worker_info = {.client_sock = client_sock}; utarray_push_back(s->available_worker_queue, &worker_info); LOG_INFO("Adding client_sock %d to available workers.\n", client_sock); @@ -104,13 +118,6 @@ void process_message(event_loop *loop, int client_sock, void *context, case SUBMIT_TASK: { task_spec *task = (task_spec *)message; CHECK(task_size(task) == length); - /* Create a unique task instance ID. This is different from the task ID and - * is used to distinguish between potentially multiple executions of the - * task. */ - unique_id id = globally_unique_id(); - // task_queue_submit_task(s->db, id, task); - /* Try to assign the task to a worker locally. TODO(rkn): This should - * probably go somewhere else. */ handle_submit_task(s, task); } break; case TASK_DONE: {