diff --git a/Makefile b/Makefile index 7c494a74e..ef5de50a7 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ all: $(BUILD)/photon_scheduler $(BUILD)/photon_client.so $(BUILD)/photon_client.so: photon_client.h photon_client.c common $(CC) $(CFLAGS) photon_client.c common/build/libcommon.a -fPIC -shared -o $(BUILD)/photon_client.so -$(BUILD)/photon_scheduler: photon.h photon.c common - $(CC) $(CFLAGS) -o $@ photon.c common/build/libcommon.a common/thirdparty/hiredis/libhiredis.a -Icommon/thirdparty -Icommon/ +$(BUILD)/photon_scheduler: photon.h photon_scheduler.c common + $(CC) $(CFLAGS) -o $@ photon_scheduler.c common/build/libcommon.a common/thirdparty/hiredis/libhiredis.a -Icommon/thirdparty -Icommon/ common: FORCE git submodule update --init --recursive diff --git a/lib/python/photon.py b/lib/python/photon.py index b0fc263a5..36f06ff2a 100644 --- a/lib/python/photon.py +++ b/lib/python/photon.py @@ -6,9 +6,57 @@ photon_client_library = ctypes.cdll.LoadLibrary(photon_client_library_path) photon_client_library.alloc_task_spec.restype = ctypes.c_void_p photon_client_library.photon_connect.restype = ctypes.c_void_p photon_client_library.photon_submit.restype = None +photon_client_library.photon_get_task.restype = ctypes.c_void_p ID = ctypes.c_ubyte * 20 +buffer_from_read_write_memory = ctypes.pythonapi.PyBuffer_FromReadWriteMemory +buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] +buffer_from_read_write_memory.restype = ctypes.py_object + +buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory +buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] +buffer_from_memory.restype = ctypes.py_object + +photon_client_library.task_function.restype = ctypes.c_void_p +photon_client_library.task_num_args.restype = ctypes.c_int64 +photon_client_library.task_num_returns.restype = ctypes.c_int64 +photon_client_library.task_arg_type.restype = ctypes.c_int8 +photon_client_library.task_arg_id.restype = ctypes.c_void_p +photon_client_library.task_arg_val.restype = ctypes.c_void_p +photon_client_library.task_arg_length.restype = ctypes.c_void_p +photon_client_library.task_return.restype = ctypes.c_void_p + + +class TaskInfo(object): + def __init__(self, function_id, args, return_ids): + self.function_id = function_id + self.args = args + self.return_ids = return_ids + +def extract_task(c_task): + function_id = buffer_from_memory(photon_client_library.task_function(c_task), 20)[:] + num_args = photon_client_library.task_num_args(c_task) + num_returns = photon_client_library.task_num_returns(c_task) + arg_vals_and_ids = [] + for i in range(num_args): + arg_type = photon_client_library.task_arg_type(c_task, i) + if arg_type == 0: + arg_id = buffer_from_memory(photon_client_library.task_arg_id(c_task, i), 20) + arg_vals_and_ids.append((arg_type, arg_id)) + elif arg_type == 1: + arg_val = photon_client_library.task_arg_val(c_task, i)[:] + arg_length = photon_client_library.task_arg_length(c_task, i) + arg_value = buffer_from_memory(arg_val, arg_length)[:] + arg_vals_and_ids.append((arg_type, arg_value)) + else: + raise Exception("arg_type must be 0 or 1") + return_ids = [] + for i in range(num_returns): + ret_id = buffer_from_memory(photon_client_library.task_return(c_task, i), 20) + return_ids.append(ret_id[:]) + return TaskInfo(function_id, arg_vals_and_ids, return_ids) + class UniqueID(ctypes.Structure): _fields_ = [("unique_id", ID)] @@ -19,11 +67,18 @@ def make_id(string): return UniqueID(unique_id=ID(*unique_id)) class Task(object): - def __init__(self, function_id, args): + def __init__(self, function_id, args, return_ids): function_id = make_id(function_id) self.task_spec = ctypes.c_void_p(photon_client_library.alloc_task_spec(function_id, len(args), 1, 0)) for arg in args: - photon_client_library.task_args_add_ref(self.task_spec, arg) + photon_client_library.task_args_add_ref(self.task_spec, make_id(arg)) + + # Add return IDs. This may not be the appropriate place for this. + num_returns = photon_client_library.task_num_returns(self.task_spec) + for i in range(num_returns): + ret_id = buffer_from_read_write_memory(photon_client_library.task_return(self.task_spec, i), 20) + for j in range(20): + ret_id[j] = return_ids[i][j] def __del__(self): photon_client_library.free_task_spec(self.task_spec) @@ -33,6 +88,12 @@ class PhotonClient(object): def __init__(self, socket_name): self.photon_conn = ctypes.c_void_p(photon_client_library.photon_connect(socket_name)) - def submit(self, function_id, args): - task = Task(function_id, args) + def submit(self, function_id, args, return_ids): + task = Task(function_id, args, return_ids) photon_client_library.photon_submit(self.photon_conn, task.task_spec) + + def get_task(self): + c_task = ctypes.c_void_p(photon_client_library.photon_get_task(self.photon_conn)) + task = c_task # TODO Extract the actual task. EXTRACT...(c_task) + # photon_client_library.free_task_spec(c_task) + return extract_task(task) diff --git a/photon.c b/photon.c deleted file mode 100644 index 8c84c2b95..000000000 --- a/photon.c +++ /dev/null @@ -1,109 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "common.h" -#include "event_loop.h" -#include "io.h" -#include "photon.h" -#include "state/db.h" -#include "state/task_queue.h" -#include "task.h" -#include "utarray.h" - -typedef struct { - db_handle *db; - UT_array *task_queue; -} local_scheduler_state; - -event_loop *init_local_scheduler() { return event_loop_create(); }; - -void process_message(event_loop *loop, int client_sock, void *context, - int events) { - local_scheduler_state *s = context; - - uint8_t *message; - int64_t type; - int64_t length; - read_message(client_sock, &type, &length, &message); - - switch (type) { - case SUBMIT_TASK: { - task_spec *task = (task_spec *)message; - CHECK(task_size(task) == length); - unique_id id = globally_unique_id(); - task_queue_submit_task(s->db, id, task); - } break; - case TASK_DONE: { - } break; - case DISCONNECT_CLIENT: { - LOG_INFO("Disconnecting client on fd %d", client_sock); - event_loop_remove_file(loop, client_sock); - } break; - case LOG_MESSAGE: { - } break; - default: - /* This code should be unreachable. */ - CHECK(0); - } - free(message); -} - -void new_client_connection(event_loop *loop, int listener_sock, void *context, - int events) { - local_scheduler_state *s = context; - int new_socket = accept_client(listener_sock); - event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, s); - LOG_INFO("new connection with fd %d", new_socket); -} - -void start_server(const char *socket_name, const char *redis_addr, - int redis_port) { - int fd = bind_ipc_sock(socket_name); - local_scheduler_state state; - event_loop *loop = init_local_scheduler(); - - state.db = db_connect(redis_addr, redis_port, "photon", "", -1); - db_attach(state.db, loop); - - /* Run event loop. */ - event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, &state); - event_loop_run(loop); -} - -int main(int argc, char *argv[]) { - /* Path of the listening socket of the local scheduler. */ - char *scheduler_socket_name = NULL; - /* IP address and port of redis. */ - char *redis_addr_port = NULL; - int c; - while ((c = getopt(argc, argv, "s:r:")) != -1) { - switch (c) { - case 's': - scheduler_socket_name = optarg; - break; - case 'r': - redis_addr_port = optarg; - break; - default: - LOG_ERR("unknown option %c", c); - exit(-1); - } - } - if (!scheduler_socket_name) { - LOG_ERR("please specify socket for incoming connections with -s switch"); - exit(-1); - } - char redis_addr[16] = {0}; - char redis_port[6] = {0}; - if (!redis_addr_port || - sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != - 2) { - LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch"); - exit(-1); - } - start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port)); -} diff --git a/photon.h b/photon.h index 6a213c4a5..a59e5566f 100644 --- a/photon.h +++ b/photon.h @@ -4,11 +4,11 @@ enum photon_message_type { /** Notify the local scheduler that a task has finished. */ TASK_DONE = 64, -}; - -struct photon_conn_impl { - /* File descriptor of the Unix domain socket that connects to photon. */ - int conn; + /** Get a new task from the local scheduler. */ + GET_TASK, + /** This is sent from the local scheduler to a worker to tell the worker to + * execute a task. */ + EXECUTE_TASK, }; #endif diff --git a/photon_client.c b/photon_client.c index a33b25631..1bf87f491 100644 --- a/photon_client.c +++ b/photon_client.c @@ -14,6 +14,20 @@ void photon_submit(photon_conn *conn, task_spec *task) { write_message(conn->conn, SUBMIT_TASK, task_size(task), (uint8_t *)task); } +task_spec *photon_get_task(photon_conn *conn) { + write_message(conn->conn, GET_TASK, 0, NULL); + int64_t type; + int64_t length; + uint8_t *message; + /* Receive a task from the local scheduler. This will block until the local + * scheduler gives this client a task. */ + read_message(conn->conn, &type, &length, &message); + CHECK(type == EXECUTE_TASK); + task_spec *task = (task_spec *)message; + CHECK(length == task_size(task)); + return task; +} + void photon_task_done(photon_conn *conn) { write_message(conn->conn, TASK_DONE, 0, NULL); } diff --git a/photon_client.h b/photon_client.h index 3163d8b7d..76b09455c 100644 --- a/photon_client.h +++ b/photon_client.h @@ -4,24 +4,63 @@ #include "common/task.h" #include "photon.h" -typedef struct photon_conn_impl photon_conn; +typedef struct { + /* File descriptor of the Unix domain socket that connects to photon. */ + int conn; +} photon_conn; -/* Connect to the local scheduler. */ +/** + * Connect to the local scheduler. + * + * @param photon_socket The name of the socket to use to connect to the local + scheduler. + * @return The connection information. + */ photon_conn *photon_connect(const char *photon_socket); -/* Submit a task to the local scheduler. */ +/** + * Submit a task to the local scheduler. + * + * @param conn The connection information. + * @param task The address of the task to submit. + * @return Void. + */ void photon_submit(photon_conn *conn, task_spec *task); -/* Get next task for this client. */ +/** + * Get next task for this client. This will block until the scheduler assigns + * a task to this worker. This allocates and returns a task, and so the task + * must be freed by the caller. + * + * @todo When does this actually get freed? + * + * @param conn The connection information. + * @return The address of the assigned task. + */ task_spec *photon_get_task(photon_conn *conn); -/* Tell the local scheduler that the client has finished executing a task. */ +/** + * Tell the local scheduler that the client has finished executing a task. + * + * @param conn The connection information. + * @return Void. + */ void photon_task_done(photon_conn *conn); -/* Disconnect from the local scheduler. */ +/** + * Disconnect from the local scheduler. + * + * @param conn The connection information. + * @return Void. + */ void photon_disconnect(photon_conn *conn); -/* Send a log message to the local scheduler. */ +/** + * Send a log message to the local scheduler. + * + * @param conn The connection information. + * @return Void. + */ void photon_log_message(photon_conn *conn); #endif diff --git a/photon_scheduler.c b/photon_scheduler.c new file mode 100644 index 000000000..bf5672bcd --- /dev/null +++ b/photon_scheduler.c @@ -0,0 +1,181 @@ +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "event_loop.h" +#include "io.h" +#include "photon.h" +#include "state/db.h" +#include "state/task_queue.h" +#include "task.h" +#include "utarray.h" + +typedef struct { + /** The file descriptor used to communicate with the worker. */ + int client_sock; +} available_worker; + +/* These are needed to define the UT_arrays. */ +UT_icd task_ptr_icd = {sizeof(task_spec *), NULL, NULL, NULL}; +UT_icd worker_icd = {sizeof(available_worker), NULL, NULL, NULL}; + +typedef struct { + db_handle *db; + /** This is an array of pointers to tasks that are waiting to be scheduled. */ + UT_array *task_queue; + /** This is an array of file descriptors corresponding to clients that are + * waiting for tasks. */ + UT_array *available_worker_queue; +} local_scheduler_state; + +void try_to_assign_task(task_spec *task, local_scheduler_state *s); +void try_to_assign_task_to_worker(int client_sock, local_scheduler_state *s); + +event_loop *init_local_scheduler() { return event_loop_create(); }; + +void process_message(event_loop *loop, int client_sock, void *context, + int events) { + local_scheduler_state *s = context; + + uint8_t *message; + int64_t type; + int64_t length; + read_message(client_sock, &type, &length, &message); + + switch (type) { + case SUBMIT_TASK: { + task_spec *task = (task_spec *)message; + CHECK(task_size(task) == length); + /* Create a unique task instance ID. This is different from the task ID and + * is used to distinguish between potentially multiple executions of the + * task. */ + unique_id id = globally_unique_id(); + // task_queue_submit_task(s->db, id, task); + /* Try to assign the task to a worker locally. TODO(rkn): This should + * probably go somewhere else. */ + try_to_assign_task(task, s); + } break; + case TASK_DONE: { + } break; + case GET_TASK: { + try_to_assign_task_to_worker(client_sock, s); + } break; + case DISCONNECT_CLIENT: { + LOG_INFO("Disconnecting client on fd %d", client_sock); + event_loop_remove_file(loop, client_sock); + } break; + case LOG_MESSAGE: { + } break; + default: + /* This code should be unreachable. */ + CHECK(0); + } + free(message); +} + +void try_to_assign_task(task_spec *task, local_scheduler_state *s) { + /* Assign this task to an available worker. If there are no available workers, + * then add this task to the local task queue. */ + if (utarray_len(s->available_worker_queue) > 0) { + /* Get the last available worker in the available worker queue. */ + available_worker *worker = + (available_worker *)utarray_back(s->available_worker_queue); + /* Tell the available worker to execute the task. */ + write_message(worker->client_sock, EXECUTE_TASK, task_size(task), + (uint8_t *)task); + utarray_pop_back(s->available_worker_queue); + /* TODO: Do we need to free the available_worker struct? */ + } else { + /* Add the task to the task queue. */ + task_spec *task_copy = malloc(task_size(task)); + memcpy(task_copy, task, task_size(task)); + utarray_push_back(s->task_queue, &task_copy); + } +} + +void try_to_assign_task_to_worker(int client_sock, local_scheduler_state *s) { + if (utarray_len(s->task_queue) > 0) { + /* Get the last task in the task queue. */ + task_spec **task_ptr = (task_spec **)utarray_back(s->task_queue); + task_spec *task = *task_ptr; + /* Send a task to the worker. */ + write_message(client_sock, EXECUTE_TASK, task_size(task), (uint8_t *)task); + /* Update the task queue data structure and free the task. */ + utarray_pop_back(s->task_queue); + free(task); + } else { + /* Check that client_sock is not already in the available workers. */ + for (available_worker *p = + (available_worker *)utarray_front(s->available_worker_queue); + p != NULL; + p = (available_worker *)utarray_next(s->available_worker_queue, p)) { + CHECK(p->client_sock != client_sock); + } + /* Add client_sock to a list of available workers. */ + available_worker worker_info = {.client_sock = client_sock}; + utarray_push_back(s->available_worker_queue, &worker_info); + LOG_INFO("Adding client_sock %d to available workers.\n", client_sock); + } +} + +void new_client_connection(event_loop *loop, int listener_sock, void *context, + int events) { + local_scheduler_state *s = context; + int new_socket = accept_client(listener_sock); + event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, s); + LOG_INFO("new connection with fd %d", new_socket); +} + +void start_server(const char *socket_name, const char *redis_addr, + int redis_port) { + int fd = bind_ipc_sock(socket_name); + local_scheduler_state state; + event_loop *loop = init_local_scheduler(); + + state.db = db_connect(redis_addr, redis_port, "photon", "", -1); + db_attach(state.db, loop); + utarray_new(state.task_queue, &task_ptr_icd); + utarray_new(state.available_worker_queue, &worker_icd); + + /* Run event loop. */ + event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, &state); + event_loop_run(loop); +} + +int main(int argc, char *argv[]) { + /* Path of the listening socket of the local scheduler. */ + char *scheduler_socket_name = NULL; + /* IP address and port of redis. */ + char *redis_addr_port = NULL; + int c; + while ((c = getopt(argc, argv, "s:r:")) != -1) { + switch (c) { + case 's': + scheduler_socket_name = optarg; + break; + case 'r': + redis_addr_port = optarg; + break; + default: + LOG_ERR("unknown option %c", c); + exit(-1); + } + } + if (!scheduler_socket_name) { + LOG_ERR("please specify socket for incoming connections with -s switch"); + exit(-1); + } + char redis_addr[16] = {0}; + char redis_port[6] = {0}; + if (!redis_addr_port || + sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != + 2) { + LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch"); + exit(-1); + } + start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port)); +} diff --git a/test/test.py b/test/test.py index 2307f2276..0517c40a5 100644 --- a/test/test.py +++ b/test/test.py @@ -30,8 +30,12 @@ class TestPhotonClient(unittest.TestCase): self.p2.kill() def test_create(self): - l = [photon.make_id(20 * "a"), photon.make_id(20 * "b"), photon.make_id(20 * "c")] - self.photon_client.submit(20 * "a", l) + l = [20 * "a", 20 * "b", 20 * "c"] + r = [20 * "e", 20 * "f"] + # Submit a task. + self.photon_client.submit(20 * "d", l, r) + # Get the task. + task = self.photon_client.get_task() if __name__ == "__main__": unittest.main(verbosity=2)