From 9d1e750e8fa55fcf8419510ff07023e7f7ff2eb8 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 10 Nov 2016 18:13:26 -0800 Subject: [PATCH] Merge task table and task log into a single table (#30) * Merge task table and task log * Fix test in db tests * Address Robert's comments and some better error checking * Add a LOG_FATAL that exits the program --- src/common/Makefile | 12 +- src/common/common.c | 6 +- src/common/common.h | 36 +- src/common/doc/tasks.md | 13 +- src/common/lib/python/common_extension.c | 8 +- src/common/state/object_table.c | 21 ++ src/common/state/object_table.h | 52 +++ src/common/state/redis.c | 380 +++++++++++++++----- src/common/state/redis.h | 68 +++- src/common/state/table.c | 5 +- src/common/state/table.h | 19 +- src/common/state/task_log.c | 34 -- src/common/state/task_log.h | 88 ----- src/common/state/task_table.c | 52 +++ src/common/state/task_table.h | 138 +++++++- src/common/task.c | 87 +++-- src/common/task.h | 105 ++++-- src/common/test/db_tests.c | 97 +++-- src/common/test/object_table_tests.c | 188 ++++++++-- src/common/test/task_log_tests.c | 316 ----------------- src/common/test/task_table_tests.c | 430 +++++++++++++++++++++++ src/common/test/task_tests.c | 160 ++++----- src/common/test/test_common.h | 10 +- src/photon/photon_algorithm.c | 26 +- src/photon/photon_algorithm.h | 2 +- src/photon/photon_client.c | 5 +- src/photon/photon_scheduler.c | 22 +- src/plasma/plasma_client.c | 6 +- src/plasma/plasma_manager.c | 25 +- src/plasma/plasma_store.c | 9 +- 30 files changed, 1578 insertions(+), 842 deletions(-) delete mode 100644 src/common/state/task_log.c delete mode 100644 src/common/state/task_log.h create mode 100644 src/common/state/task_table.c delete mode 100644 src/common/test/task_log_tests.c create mode 100644 src/common/test/task_table_tests.c diff --git a/src/common/Makefile b/src/common/Makefile index 86cab3688..fb898ee4b 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -4,7 +4,7 @@ BUILD = build all: hiredis $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o thirdparty/sha256.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_table.o thirdparty/ae/ae.o thirdparty/sha256.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a @@ -16,8 +16,8 @@ $(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a $(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a $(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) -$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a - $(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) +$(BUILD)/task_table_tests: hiredis test/task_table_tests.c $(BUILD)/libcommon.a + $(CC) -o $@ test/task_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a $(CC) -o $@ $^ $(CFLAGS) @@ -38,7 +38,7 @@ redis: hiredis: git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make -test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE +test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE ./thirdparty/redis-3.2.3/src/redis-server & sleep 1s ./build/common_tests @@ -46,7 +46,7 @@ test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/objec ./build/io_tests ./build/task_tests ./build/redis_tests - ./build/task_log_tests + ./build/task_table_tests ./build/object_table_tests valgrind: test @@ -55,7 +55,7 @@ valgrind: test valgrind --leak-check=full --error-exitcode=1 ./build/io_tests valgrind --leak-check=full --error-exitcode=1 ./build/task_tests valgrind --leak-check=full --error-exitcode=1 ./build/redis_tests - valgrind --leak-check=full --error-exitcode=1 ./build/task_log_tests + valgrind --leak-check=full --error-exitcode=1 ./build/task_table_tests valgrind --leak-check=full --error-exitcode=1 ./build/object_table_tests FORCE: diff --git a/src/common/common.c b/src/common/common.c index fdbe21616..8780c3af6 100644 --- a/src/common/common.c +++ b/src/common/common.c @@ -25,7 +25,11 @@ unique_id globally_unique_id(void) { } bool object_ids_equal(object_id first_id, object_id second_id) { - return memcmp(&first_id, &second_id, sizeof(object_id)) == 0 ? true : false; + return UNIQUE_ID_EQ(first_id, second_id) ? true : false; +} + +bool object_id_is_nil(object_id id) { + return object_ids_equal(id, NIL_OBJECT_ID); } char *sha1_to_hex(const unsigned char *sha1, char *buffer) { diff --git a/src/common/common.h b/src/common/common.h index e60bc7fdc..80eb545d5 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -25,17 +25,21 @@ #define LOG_INFO(M, ...) \ fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__) -#define CHECKM(COND, M, ...) \ - do { \ - if (!(COND)) { \ - LOG_ERR("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \ - void *buffer[255]; \ - const int calls = backtrace(buffer, sizeof(buffer) / sizeof(void *)); \ - backtrace_symbols_fd(buffer, calls, 1); \ - exit(-1); \ - } \ +#define LOG_FATAL(M, ...) \ + do { \ + fprintf(stderr, "[FATAL] (%s:%d) " M "\n", __FILE__, __LINE__, \ + ##__VA_ARGS__); \ + void *buffer[255]; \ + const int calls = backtrace(buffer, sizeof(buffer) / sizeof(void *)); \ + backtrace_symbols_fd(buffer, calls, 1); \ + exit(-1); \ } while (0); +#define CHECKM(COND, M, ...) \ + if (!(COND)) { \ + LOG_ERR("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \ + } + #define CHECK(COND) CHECKM(COND, "") /* This should be defined if we want to check calls to DCHECK. */ @@ -56,6 +60,10 @@ #define UNIQUE_ID_SIZE 20 +#define UNIQUE_ID_EQ(id1, id2) (memcmp((id1).id, (id2).id, UNIQUE_ID_SIZE) == 0) + +#define IS_NIL_ID(id) UNIQUE_ID_EQ(id, NIL_ID) + typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id; extern const UT_icd object_id_icd; @@ -70,6 +78,8 @@ unique_id globally_unique_id(void); * UNIQUE_ID_SIZE + 1 */ char *sha1_to_hex(const unsigned char *sha1, char *buffer); +#define NIL_OBJECT_ID NIL_ID + typedef unique_id object_id; /** @@ -81,4 +91,12 @@ typedef unique_id object_id; */ bool object_ids_equal(object_id first_id, object_id second_id); +/** + * Compare a object ID to the nil ID. + * + * @param id The object ID to compare to nil. + * @return True if the object ID is equal to nil. + */ +bool object_id_is_nil(object_id id); + #endif diff --git a/src/common/doc/tasks.md b/src/common/doc/tasks.md index b56f89783..4431afae2 100644 --- a/src/common/doc/tasks.md +++ b/src/common/doc/tasks.md @@ -3,6 +3,7 @@ A *task specification* contains all information that is needed for computing the results of a task: +- The ID of the task - The function ID of the function that executes the task - The arguments (either object IDs for pass by reference or values for pass by value) @@ -11,23 +12,21 @@ or values for pass by value) From these, a task ID can be computed which is also stored in the task specification. -A *task instance* represents one execution of a task specification. +A *task* represents the execution of a task specification. It consists of: - A scheduling state (WAITING, SCHEDULED, RUNNING, DONE) - The target node where the task is scheduled or executed -- A unique task instance ID that identifies the particular execution - of the task. +- The task specification The task data structures are defined in `common/task.h`. -The *task log* is a mapping from the task instance ID to a sequence of -updates to the status of the task instance. It is updated by various parts -of the system: +The *task table* is a mapping from the task ID to the *task* information. It is +updated by various parts of the system: 1. The local scheduler writes it with status WAITING when submits a task to the global scheduler 2. The global scheduler appends an update WAITING -> SCHEDULED together with the node ID when assigning the task to a local scheduler 3. The local scheduler appends an update SCHEDULED -> RUNNING when it assigns a task to a worker 4. The local scheduler appends an update RUNNING -> DONE when the task finishes execution -The task log is defined in `common/state/task_log.h`. +The task table is defined in `common/state/task_table.h`. diff --git a/src/common/lib/python/common_extension.c b/src/common/lib/python/common_extension.c index bf994c83e..ff569915a 100644 --- a/src/common/lib/python/common_extension.c +++ b/src/common/lib/python/common_extension.c @@ -28,12 +28,12 @@ static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) { if (!PyArg_ParseTuple(args, "s#", &data, &size)) { return -1; } - if (size != UNIQUE_ID_SIZE) { + if (size != sizeof(object_id)) { PyErr_SetString(CommonError, "ObjectID: object id string needs to have length 20"); return -1; } - memcpy(&self->object_id.id[0], data, UNIQUE_ID_SIZE); + memcpy(&self->object_id.id[0], data, sizeof(object_id)); return 0; } @@ -48,7 +48,7 @@ PyObject *PyObjectID_make(object_id object_id) { static PyObject *PyObjectID_id(PyObject *self) { PyObjectID *s = (PyObjectID *) self; return PyString_FromStringAndSize((char *) &s->object_id.id[0], - UNIQUE_ID_SIZE); + sizeof(object_id)); } static PyObject *PyObjectID___reduce__(PyObjectID *self) { @@ -176,7 +176,7 @@ static PyObject *PyTask_function_id(PyObject *self) { } static PyObject *PyTask_task_id(PyObject *self) { - task_id task_id = task_task_id(((PyTask *) self)->spec); + task_id task_id = task_spec_id(((PyTask *) self)->spec); return PyObjectID_make(task_id); } diff --git a/src/common/state/object_table.c b/src/common/state/object_table.c index 58dde9abb..54214c34f 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.c @@ -36,3 +36,24 @@ void object_table_subscribe( init_table_callback(db_handle, object_id, sub_data, retry, done_callback, redis_object_table_subscribe, user_context); } + +void result_table_add(db_handle *db_handle, + object_id object_id, + task_id task_id_arg, + retry_info *retry, + result_table_done_callback done_callback, + void *user_context) { + task_id *task_id_copy = malloc(sizeof(task_id)); + memcpy(task_id_copy, task_id_arg.id, sizeof(task_id)); + init_table_callback(db_handle, object_id, task_id_copy, retry, done_callback, + redis_result_table_add, user_context); +} + +void result_table_lookup(db_handle *db_handle, + object_id object_id, + retry_info *retry, + result_table_lookup_callback done_callback, + void *user_context) { + init_table_callback(db_handle, object_id, NULL, retry, done_callback, + redis_result_table_lookup, user_context); +} diff --git a/src/common/state/object_table.h b/src/common/state/object_table.h index 112d174e2..00abb964c 100644 --- a/src/common/state/object_table.h +++ b/src/common/state/object_table.h @@ -4,6 +4,7 @@ #include "common.h" #include "table.h" #include "db.h" +#include "task.h" /* * ==== Lookup call and callback ==== @@ -123,4 +124,55 @@ typedef struct { void *subscribe_context; } object_table_subscribe_data; +/* + * ==== Result table ==== + */ + +/** + * Callback called when the add/remove operation for a result table entry + * completes. */ +typedef void (*result_table_done_callback)(object_id object_id, + void *user_context); + +/** + * Add information about a new object to the object table. This + * is immutable information like the ID of the task that + * created the object. + * + * @param db_handle Handle to object_table database. + * @param object_id ID of the object to add. + * @param task_id ID of the task that creates this object. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Context passed by the caller. + * @return Void. + */ +void result_table_add(db_handle *db_handle, + object_id object_id, + task_id task_id, + retry_info *retry, + result_table_done_callback done_callback, + void *user_context); + +/** Callback called when the result table lookup completes. */ +typedef void (*result_table_lookup_callback)(object_id object_id, + task *task, + void *user_context); + +/** + * Lookup the task that created an object in the result table. + * + * @param db_handle Handle to object_table database. + * @param object_id ID of the object to lookup. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Context passed by the caller. + * @return Void. + */ +void result_table_lookup(db_handle *db_handle, + object_id object_id, + retry_info *retry, + result_table_lookup_callback done_callback, + void *user_context); + #endif /* OBJECT_TABLE_H */ diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 27c0d1327..3047f71d7 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -11,21 +11,19 @@ #include "db.h" #include "object_table.h" #include "task.h" -#include "task_log.h" +#include "task_table.h" #include "event_loop.h" #include "redis.h" #include "io.h" -#define LOG_REDIS_ERR(context, M, ...) \ - fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \ - context->errstr, ##__VA_ARGS__) +#define LOG_REDIS_ERR(context, M, ...) \ + LOG_INFO("Redis error %d %s; %s", context->err, context->errstr, M) #define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \ do { \ CONTEXT_TYPE *_context = (context); \ if (!_context) { \ - LOG_ERR("could not allocate redis context"); \ - exit(-1); \ + LOG_FATAL("could not allocate redis context"); \ } \ if (_context->err) { \ LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \ @@ -123,6 +121,62 @@ void db_attach(db_handle *db, event_loop *loop) { redisAeAttach(loop, db->sub_context); } +/** + * An internal function to allocate a task object and parse a hashmap reply + * from Redis into the task object. If the Redis reply is malformed, an empty + * task with the given task ID is returned. + * + * @param id The ID of the task we're looking up. If the reply from Redis is + * well-formed, the reply's ID should match this ID. Else, the returned + * task will have its ID set to this ID. + * @param num_redis_replies The number of keys and values in the Redis hashmap. + * @param redis_replies A pointer to the Redis hashmap keys and values. + * @return A pointer to the parsed task. + */ +task *parse_redis_task_table_entry(task_id id, + int num_redis_replies, + redisReply **redis_replies) { + task *task_result; + if (num_redis_replies == 0) { + /* There was no information about this task. */ + return NULL; + } + /* Exit immediately if there weren't 6 fields, one for each key-value pair. + * The keys are "node", "state", and "task_spec". */ + DCHECK(num_redis_replies == 6); + /* Parse the task struct's fields. */ + scheduling_state state = 0; + node_id node = NIL_ID; + task_spec *spec = NULL; + for (int i = 0; i < num_redis_replies; i = i + 2) { + char *key = redis_replies[i]->str; + redisReply *value = redis_replies[i + 1]; + if (strcmp(key, "node") == 0) { + memcpy(&node, value->str, value->len); + } else if (strcmp(key, "state") == 0) { + int scanned = sscanf(value->str, "%d", (int *) &state); + if (scanned != 1) { + LOG_FATAL("Scheduling state for task is malformed"); + state = 0; + } + } else if (strcmp(key, "task_spec") == 0) { + spec = malloc(value->len); + memcpy(spec, value->str, value->len); + } else { + LOG_FATAL("Found unexpected %s field in task log", key); + } + } + /* Exit immediately if we couldn't parse the task spec. */ + if (spec == NULL) { + LOG_FATAL("Could not parse task spec from task log"); + } + /* Build and return the task. */ + DCHECK(task_ids_equal(task_spec_id(spec), id)); + task_result = alloc_task(spec, state, node); + free_task_spec(spec); + return task_result; +} + /* * ==== object_table callbacks ==== */ @@ -133,7 +187,7 @@ void redis_object_table_add_callback(redisAsyncContext *c, REDIS_CALLBACK_HEADER(db, callback_data, r) if (callback_data->done_callback) { - task_log_done_callback done_callback = callback_data->done_callback; + task_table_done_callback done_callback = callback_data->done_callback; done_callback(callback_data->id, callback_data->user_context); } destroy_timer_callback(db->loop, callback_data); @@ -142,10 +196,12 @@ void redis_object_table_add_callback(redisAsyncContext *c, void redis_object_table_add(table_callback_data *callback_data) { CHECK(callback_data); db_handle *db = callback_data->db_handle; - redisAsyncCommand(db->context, redis_object_table_add_callback, - (void *) callback_data->timer_id, "SADD obj:%b %d", - &callback_data->id.id[0], UNIQUE_ID_SIZE, db->client_id); - if (db->context->err) { + object_id id = callback_data->id; + int status = + redisAsyncCommand(db->context, redis_object_table_add_callback, + (void *) callback_data->timer_id, "SADD obj:%b %d", + id.id, sizeof(object_id), db->client_id); + if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_ERR(db->context, "could not add object_table entry"); } } @@ -155,14 +211,117 @@ void redis_object_table_lookup(table_callback_data *callback_data) { db_handle *db = callback_data->db_handle; /* Call redis asynchronously */ - redisAsyncCommand(db->context, redis_object_table_get_entry, - (void *) callback_data->timer_id, "SMEMBERS obj:%b", - &callback_data->id.id[0], UNIQUE_ID_SIZE); - if (db->context->err) { + object_id id = callback_data->id; + int status = redisAsyncCommand(db->context, redis_object_table_get_entry, + (void *) callback_data->timer_id, + "SMEMBERS obj:%b", id.id, sizeof(object_id)); + if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_ERR(db->context, "error in object_table lookup"); } } +void redis_result_table_add_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) + redisReply *reply = r; + CHECK(reply->type == REDIS_REPLY_STATUS || + reply->type == REDIS_REPLY_INTEGER); + if (callback_data->done_callback) { + result_table_done_callback done_callback = callback_data->done_callback; + done_callback(callback_data->id, callback_data->user_context); + } + task_id *task_id = callback_data->data; + free(task_id); + destroy_timer_callback(db->loop, callback_data); +} + +void redis_result_table_add(table_callback_data *callback_data) { + CHECK(callback_data); + db_handle *db = callback_data->db_handle; + object_id id = callback_data->id; + task_id *result_task_id = (task_id *) callback_data->data; + /* Add the result entry to the result table. */ + int status = redisAsyncCommand(db->context, redis_result_table_add_callback, + (void *) callback_data->timer_id, + "SET result:%b %b", id.id, sizeof(object_id), + (*result_task_id).id, sizeof(task_id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_ERR(db->context, "Error in result table add"); + } +} + +void redis_result_table_lookup_task_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) + redisReply *reply = r; + /* Check that we received a Redis hashmap. */ + if (reply->type != REDIS_REPLY_ARRAY) { + LOG_FATAL("Expected Redis array, received type %d %s", reply->type, + reply->str); + } + /* If the user registered a success callback, construct the task object from + * the Redis reply and call the callback. */ + result_table_lookup_callback done_callback = callback_data->done_callback; + task_id *result_task_id = callback_data->data; + if (done_callback) { + task *task_reply = parse_redis_task_table_entry( + *result_task_id, reply->elements, reply->element); + done_callback(callback_data->id, task_reply, callback_data->user_context); + free_task(task_reply); + } + free(result_task_id); + destroy_timer_callback(db->loop, callback_data); +} + +void redis_result_table_lookup_object_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) + redisReply *reply = r; + + if (reply->type == REDIS_REPLY_STRING) { + /* If we found the object, get the spec of the task that created it. */ + DCHECK(reply->len == sizeof(task_id)); + task_id *result_task_id = malloc(sizeof(task_id)); + memcpy(result_task_id, reply->str, reply->len); + callback_data->data = (void *) result_task_id; + int status = + redisAsyncCommand(db->context, redis_result_table_lookup_task_callback, + (void *) callback_data->timer_id, "HGETALL task:%b", + (*result_task_id).id, sizeof(task_id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_ERR(db->context, "Could not look up result table entry"); + } + } else if (reply->type == REDIS_REPLY_NIL) { + /* The object with the requested ID was not in the table. */ + LOG_ERR("Object's result not in table."); + result_table_lookup_callback done_callback = callback_data->done_callback; + if (done_callback) { + done_callback(callback_data->id, NULL, callback_data->user_context); + } + destroy_timer_callback(db->loop, callback_data); + return; + } else { + LOG_FATAL("expected string or nil, received type %d", reply->type); + } +} + +void redis_result_table_lookup(table_callback_data *callback_data) { + CHECK(callback_data); + db_handle *db = callback_data->db_handle; + /* First, lookup the ID of the task that created this object. */ + object_id id = callback_data->id; + int status = + redisAsyncCommand(db->context, redis_result_table_lookup_object_callback, + (void *) callback_data->timer_id, "GET result:%b", + id.id, sizeof(object_id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_ERR(db->context, "Error in result table lookup"); + } +} + /** * Get an entry from the plasma manager table in redis. * @@ -213,8 +372,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, destroy_timer_callback(callback_data->db_handle->loop, callback_data); free(managers); } else { - LOG_ERR("expected integer or string, received type %d", reply->type); - exit(-1); + LOG_FATAL("expected integer or string, received type %d", reply->type); } } @@ -249,28 +407,64 @@ void redis_object_table_subscribe(table_callback_data *callback_data) { db_handle *db = callback_data->db_handle; /* subscribe to key notification associated to object id */ - - redisAsyncCommand(db->sub_context, object_table_redis_callback, - (void *) callback_data->timer_id, - "SUBSCRIBE __keyspace@0__:%b add", - (char *) &callback_data->id.id[0], UNIQUE_ID_SIZE); - - if (db->sub_context->err) { + object_id id = callback_data->id; + int status = redisAsyncCommand(db->sub_context, object_table_redis_callback, + (void *) callback_data->timer_id, + "SUBSCRIBE __keyspace@0__:%b add", id.id, + sizeof(object_id)); + if ((status == REDIS_ERR) || db->sub_context->err) { LOG_REDIS_ERR(db->sub_context, "error in redis_object_table_subscribe_callback"); } } /* - * ==== task_log callbacks ==== + * ==== task_table callbacks ==== */ -void redis_task_log_publish(table_callback_data *callback_data) { +void redis_task_table_get_task_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) + redisReply *reply = r; + /* Check that we received a Redis hashmap. */ + if (reply->type != REDIS_REPLY_ARRAY) { + LOG_FATAL("Expected Redis array, received type %d %s", reply->type, + reply->str); + } + /* If the user registered a success callback, construct the task object from + * the Redis reply and call the callback. */ + if (callback_data->done_callback) { + task_table_get_callback done_callback = callback_data->done_callback; + task *task_reply = parse_redis_task_table_entry( + callback_data->id, reply->elements, reply->element); + done_callback(task_reply, callback_data->user_context); + free_task(task_reply); + } + destroy_timer_callback(db->loop, callback_data); +} + +void redis_task_table_get_task(table_callback_data *callback_data) { + CHECK(callback_data); db_handle *db = callback_data->db_handle; - task_instance *task_instance = callback_data->data; - task_iid task_iid = *task_instance_id(task_instance); - node_id node = *task_instance_node(task_instance); - int32_t state = *task_instance_state(task_instance); + task_id id = callback_data->id; + int status = + redisAsyncCommand(db->context, redis_task_table_get_task_callback, + (void *) callback_data->timer_id, "HGETALL task:%b", + id.id, sizeof(task_id)); + if ((status == REDIS_ERR) || db->sub_context->err) { + LOG_REDIS_ERR(db->sub_context, "Could not get task from task table"); + } +} + +void redis_task_table_publish(table_callback_data *callback_data, + bool task_added) { + db_handle *db = callback_data->db_handle; + task *task = callback_data->data; + task_id id = task_task_id(task); + node_id node = task_node(task); + scheduling_state state = task_state(task); + task_spec *spec = task_task_spec(task); LOG_DEBUG("Called log_publish callback"); @@ -294,84 +488,98 @@ void redis_task_log_publish(table_callback_data *callback_data) { } if (((bool *) callback_data->requests_info)[PUSH_INDEX] == false) { - if (*task_instance_state(task_instance) == TASK_STATUS_WAITING) { - redisAsyncCommand(db->context, redis_task_log_publish_push_callback, - (void *) callback_data->timer_id, "RPUSH tasklog:%b %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) task_instance, - task_instance_size(task_instance)); + /* If the task has already been added to the task table, only update the + * scheduling information fields. */ + int status = REDIS_OK; + if (task_added) { + status = redisAsyncCommand( + db->context, redis_task_table_publish_push_callback, + (void *) callback_data->timer_id, "HMSET task:%b state %d node %b", + (char *) id.id, sizeof(task_id), state, (char *) node.id, + sizeof(node_id)); } else { - task_update update = {.state = state, .node = node}; - redisAsyncCommand(db->context, redis_task_log_publish_push_callback, - (void *) callback_data->timer_id, "RPUSH tasklog:%b %b", - (char *) &task_iid.id[0], UNIQUE_ID_SIZE, - (char *) &update, sizeof(update)); + status = redisAsyncCommand( + db->context, redis_task_table_publish_push_callback, + (void *) callback_data->timer_id, + "HMSET task:%b state %d node %b task_spec %b", (char *) id.id, + sizeof(task_id), state, (char *) node.id, sizeof(node_id), + (char *) spec, task_spec_size(spec)); } - - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); + if ((status = REDIS_ERR) || db->context->err) { + LOG_REDIS_ERR(db->context, "error setting task in task_table_add_task"); } } if (((bool *) callback_data->requests_info)[PUBLISH_INDEX] == false) { - redisAsyncCommand(db->context, redis_task_log_publish_publish_callback, - (void *) callback_data->timer_id, - "PUBLISH task_log:%b:%d %b", (char *) &node.id[0], - UNIQUE_ID_SIZE, state, (char *) task_instance, - task_instance_size(task_instance)); + int status = redisAsyncCommand( + db->context, redis_task_table_publish_publish_callback, + (void *) callback_data->timer_id, "PUBLISH task:%b:%d %b", + (char *) node.id, sizeof(node_id), state, (char *) task, + task_size(task)); - if (db->context->err) { - LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_ERR(db->context, + "error publishing task in task_table_add_task"); } } } -void redis_task_log_publish_push_callback(redisAsyncContext *c, - void *r, - void *privdata) { - REDIS_CALLBACK_HEADER(db, callback_data, r) +void redis_task_table_add_task(table_callback_data *callback_data) { + redis_task_table_publish(callback_data, false); +} +void redis_task_table_update(table_callback_data *callback_data) { + redis_task_table_publish(callback_data, true); +} + +void redis_task_table_publish_push_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r) CHECK(callback_data->requests_info != NULL); ((bool *) callback_data->requests_info)[PUSH_INDEX] = true; if (((bool *) callback_data->requests_info)[PUBLISH_INDEX] == true) { if (callback_data->done_callback) { - task_log_done_callback done_callback = callback_data->done_callback; + task_table_done_callback done_callback = callback_data->done_callback; done_callback(callback_data->id, callback_data->user_context); } destroy_timer_callback(db->loop, callback_data); } } -void redis_task_log_publish_publish_callback(redisAsyncContext *c, - void *r, - void *privdata) { +void redis_task_table_publish_publish_callback(redisAsyncContext *c, + void *r, + void *privdata) { REDIS_CALLBACK_HEADER(db, callback_data, r) - CHECK(callback_data->requests_info != NULL); ((bool *) callback_data->requests_info)[PUBLISH_INDEX] = true; if (((bool *) callback_data->requests_info)[PUSH_INDEX] == true) { if (callback_data->done_callback) { - task_log_done_callback done_callback = callback_data->done_callback; + task_table_done_callback done_callback = callback_data->done_callback; done_callback(callback_data->id, callback_data->user_context); } destroy_timer_callback(db->loop, callback_data); } } -void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) { +void redis_task_table_subscribe_callback(redisAsyncContext *c, + void *r, + void *privdata) { REDIS_CALLBACK_HEADER(db, callback_data, r) redisReply *reply = r; CHECK(reply->type == REDIS_REPLY_ARRAY); - /* First entry is message type, second is topic, third is payload. */ - CHECK(reply->elements > 2); /* If this condition is true, we got the initial message that acknowledged the * subscription. */ - if (reply->element[2]->str == NULL) { + CHECK(reply->elements > 2); + /* First entry is message type, then possibly the regex we psubscribed to, + * then topic, then payload. */ + redisReply *payload = reply->element[reply->elements - 1]; + if (payload->str == NULL) { if (callback_data->done_callback) { - task_log_done_callback done_callback = callback_data->done_callback; + task_table_done_callback done_callback = callback_data->done_callback; done_callback(callback_data->id, callback_data->user_context); } /* Note that we do not destroy the callback data yet because the @@ -380,32 +588,36 @@ void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) { return; } /* Otherwise, parse the task and call the callback. */ - task_log_subscribe_data *data = callback_data->data; + task_table_subscribe_data *data = callback_data->data; - task_instance *instance = malloc(reply->element[2]->len); - memcpy(instance, reply->element[2]->str, reply->element[2]->len); + task *task = malloc(payload->len); + memcpy(task, payload->str, payload->len); if (data->subscribe_callback) { - data->subscribe_callback(instance, data->subscribe_context); + data->subscribe_callback(task, data->subscribe_context); } - task_instance_free(instance); + free_task(task); } -void redis_task_log_subscribe(table_callback_data *callback_data) { +void redis_task_table_subscribe(table_callback_data *callback_data) { db_handle *db = callback_data->db_handle; - task_log_subscribe_data *data = callback_data->data; - - if (memcmp(&data->node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { - redisAsyncCommand(db->sub_context, task_log_redis_callback, - (void *) callback_data->timer_id, - "PSUBSCRIBE task_log:*:%d", data->state_filter); + task_table_subscribe_data *data = callback_data->data; + int status = REDIS_OK; + if (IS_NIL_ID(data->node)) { + /* TODO(swang): Implement the state_filter by translating the bitmask into + * a Redis key-matching pattern. */ + status = + redisAsyncCommand(db->sub_context, redis_task_table_subscribe_callback, + (void *) callback_data->timer_id, + "PSUBSCRIBE task:*:%d", data->state_filter); } else { - redisAsyncCommand(db->sub_context, task_log_redis_callback, - (void *) callback_data->timer_id, - "SUBSCRIBE task_log:%b:%d", (char *) &data->node.id[0], - UNIQUE_ID_SIZE, data->state_filter); + node_id node = data->node; + status = redisAsyncCommand( + db->sub_context, redis_task_table_subscribe_callback, + (void *) callback_data->timer_id, "SUBSCRIBE task:%b:%d", + (char *) node.id, sizeof(node_id), data->state_filter); } - if (db->sub_context->err) { - LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback"); + if ((status == REDIS_ERR) || db->sub_context->err) { + LOG_REDIS_ERR(db->sub_context, "error in task_table_register_callback"); } } diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 8d29616fe..026cf30f9 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -3,7 +3,7 @@ #include "db.h" #include "object_table.h" -#include "task_log.h" +#include "task_table.h" #include "hiredis/hiredis.h" #include "hiredis/async.h" @@ -65,7 +65,7 @@ void object_table_lookup_callback(redisAsyncContext *c, void redis_object_table_lookup(table_callback_data *callback_data); /** - * Add an entry to the object table in redis. + * Add a location entry to the object table in redis. * * @param callback_data Data structure containing redis connection and timeout * information. @@ -82,50 +82,90 @@ void redis_object_table_add(table_callback_data *callback_data); */ void redis_object_table_subscribe(table_callback_data *callback_data); +/** + * Add a new object to the object table in redis. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_result_table_add(table_callback_data *callback_data); + +/** + * Lookup the object in the object table in redis. The entry in + * the object table contains metadata about the object. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_result_table_lookup(table_callback_data *callback_data); + /* * ==== Redis task table function ===== */ /** - * Add or update task log entry with new scheduling information. + * Get a task table entry, including the task spec and the task's scheduling + * information. * * @param callback_data Data structure containing redis connection and timeout * information. * @return Void. */ -void redis_task_log_publish(table_callback_data *callback_data); +void redis_task_table_get_task(table_callback_data *callback_data); /** - * Callback invoked when the replya from the task push command is received. + * Add a task table entry with a new task spec and the task's scheduling + * information. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_task_table_add_task(table_callback_data *callback_data); + +/** + * Update a task table entry with the task's scheduling information. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_task_table_update(table_callback_data *callback_data); + +/** + * Callback invoked when the reply from the task push command is received. * * @param c Redis context. * @param r Reply (not used). * @param privdata Data associated to the callback. * @return Void. */ -void redis_task_log_publish_push_callback(redisAsyncContext *c, - void *r, - void *privdata); + +void redis_task_table_publish_push_callback(redisAsyncContext *c, + void *r, + void *privdata); /** - * Callback invoked when the replya from the task publish command is received. + * Callback invoked when the reply from the task publish command is received. * * @param c Redis context. * @param r Reply (not used). * @param privdata Data associated to the callback. * @return Void. */ -void redis_task_log_publish_publish_callback(redisAsyncContext *c, - void *r, - void *privdata); +void redis_task_table_publish_publish_callback(redisAsyncContext *c, + void *r, + void *privdata); /** - * Subscribe to updates of the task log. + * Subscribe to updates of the task table. * * @param callback_data Data structure containing redis connection and timeout * information. * @return Void. */ -void redis_task_log_subscribe(table_callback_data *callback_data); +void redis_task_table_subscribe(table_callback_data *callback_data); #endif /* REDIS_H */ diff --git a/src/common/state/table.c b/src/common/state/table.c index f7337e1e5..c4e8ee021 100644 --- a/src/common/state/table.c +++ b/src/common/state/table.c @@ -67,9 +67,10 @@ int64_t table_timeout_handler(event_loop *loop, if (callback_data->retry.num_retries == 0) { /* We didn't get a response from the database after exhausting all retries; * let user know, cleanup the state, and remove the timer. */ + LOG_ERR("Table command with timer ID %ld failed", timer_id); if (callback_data->retry.fail_callback) { - callback_data->retry.fail_callback(callback_data->id, - callback_data->user_context); + callback_data->retry.fail_callback( + callback_data->id, callback_data->user_context, callback_data->data); } destroy_table_callback(callback_data); return EVENT_LOOP_TIMER_DONE; diff --git a/src/common/state/table.h b/src/common/state/table.h index f82084403..7dee332bc 100644 --- a/src/common/state/table.h +++ b/src/common/state/table.h @@ -12,8 +12,19 @@ typedef struct table_callback_data table_callback_data; typedef void *table_done_callback; /* The callback called when the database operation hasn't completed after - * the number of retries specified for the operation. */ -typedef void (*table_fail_callback)(unique_id id, void *user_context); + * the number of retries specified for the operation. + * + * @param id The unique ID that identifies this callback. Examples include an + * object ID or task ID. + * @param user_context The state context for the callback. This is equivalent + * to the user_context field in table_callback_data. + * @param user_data A data argument for the callback. This is equivalent to the + * data field in table_callback_data. The user is responsible for + * freeing user_data. + */ +typedef void (*table_fail_callback)(unique_id id, + void *user_context, + void *user_data); typedef void (*table_retry_callback)(table_callback_data *callback_data); @@ -41,7 +52,9 @@ struct table_callback_data { * before the next retry, and a pointer to the failure callback. */ retry_info retry; - /** Pointer to the data that is entered into the table. */ + /** Pointer to the data that is entered into the table. This can be used to + * pass the result of the call to the callback. The user is responsible for + * freeing data in both the fail_callback and done_callback. */ void *data; /** Pointer to the data used internally to handle multiple database requests. */ diff --git a/src/common/state/task_log.c b/src/common/state/task_log.c deleted file mode 100644 index d7887da02..000000000 --- a/src/common/state/task_log.c +++ /dev/null @@ -1,34 +0,0 @@ -#include "task_log.h" -#include "redis.h" - -#define NUM_DB_REQUESTS 2 - -void task_log_publish(db_handle *db_handle, - task_instance *task_instance, - retry_info *retry, - task_log_done_callback done_callback, - void *user_context) { - init_table_callback(db_handle, *task_instance_id(task_instance), - task_instance, retry, done_callback, - redis_task_log_publish, user_context); -} - -/* TODO(swang): A corresponding task_log_unsubscribe. */ -void task_log_subscribe(db_handle *db_handle, - node_id node, - int32_t state_filter, - task_log_subscribe_callback subscribe_callback, - void *subscribe_context, - retry_info *retry, - task_log_done_callback done_callback, - void *user_context) { - task_log_subscribe_data *sub_data = malloc(sizeof(task_log_subscribe_data)); - utarray_push_back(db_handle->callback_freelist, &sub_data); - sub_data->node = node; - sub_data->state_filter = state_filter; - sub_data->subscribe_callback = subscribe_callback; - sub_data->subscribe_context = subscribe_context; - - init_table_callback(db_handle, node, sub_data, retry, done_callback, - redis_task_log_subscribe, user_context); -} diff --git a/src/common/state/task_log.h b/src/common/state/task_log.h deleted file mode 100644 index 1176976f9..000000000 --- a/src/common/state/task_log.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef TASK_LOG_H -#define TASK_LOG_H - -#include "db.h" -#include "table.h" -#include "task.h" - -/** - * The task log is a message bus that is used for all communication between - * local and global schedulers (and also persisted to the state database). - * Here are examples of events that are recorded by the task log: - * - * 1) local scheduler writes it when submits a task to the global scheduler; - * 2) global scheduler reads it to get the task submitted by local schedulers; - * 3) global scheduler writes it when assigning the task to a local scheduler; - * 4) local scheduler reads it to get its tasks assigned by global scheduler; - * 5) local scheduler writes it when a task finishes execution; - * 6) global scheduler reads it to get the tasks that have finished; */ - -/* Callback called when the task log operation completes. */ -typedef void (*task_log_done_callback)(task_iid task_iid, void *user_context); - -/* - * ==== Publish the task log ==== - */ - -/** - * Add or update a task instance to the task log. - * - * @param db_handle Database handle. - * @param retry Information about retrying the request to the database. - * @param done_callback Function to be called when database returns result. - * @param user_context Data that will be passed to done_callback and - * fail_callback. - * @return Void. - */ -void task_log_publish(db_handle *db_handle, - task_instance *task_instance, - retry_info *retry, - task_log_done_callback done_callback, - void *user_context); - -/* - * ==== Subscribing to the task log ==== - */ - -/* Callback for subscribing to the task log. */ -typedef void (*task_log_subscribe_callback)(task_instance *task_instance, - void *user_context); - -/** - * Register callback for a certain event. - * - * @param db_handle Database handle. - * @param subscribe_callback Callback that will be called when the task log is - * updated. - * @param subscribe_context Context that will be passed into the - * subscribe_callback. - * @param node Node whose events we want to listen to. If you want to register - * to updates from all nodes, set node = NIL_ID. - * @param state_filter Flags for events we want to listen to. If you want - * to listen to all events, use state_filter = TASK_WAITING | - * TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. - * @param retry Information about retrying the request to the database. - * @param done_callback Function to be called when database returns result. - * @param user_context Data that will be passed to done_callback and - * fail_callback. - * @return Void. - */ -void task_log_subscribe(db_handle *db_handle, - node_id node, - int32_t state_filter, - task_log_subscribe_callback subscribe_callback, - void *subscribe_context, - retry_info *retry, - task_log_done_callback done_callback, - void *user_context); - -/* Data that is needed to register task log subscribe callbacks with the state - * database. */ -typedef struct { - node_id node; - int32_t state_filter; - task_log_subscribe_callback subscribe_callback; - void *subscribe_context; -} task_log_subscribe_data; - -#endif /* TASK_LOG_H */ diff --git a/src/common/state/task_table.c b/src/common/state/task_table.c new file mode 100644 index 000000000..1f62a794d --- /dev/null +++ b/src/common/state/task_table.c @@ -0,0 +1,52 @@ +#include "task_table.h" +#include "redis.h" + +#define NUM_DB_REQUESTS 2 + +void task_table_get_task(db_handle *db_handle, + task_id task_id, + retry_info *retry, + task_table_get_callback done_callback, + void *user_context) { + init_table_callback(db_handle, task_id, NULL, retry, done_callback, + redis_task_table_get_task, user_context); +} + +void task_table_add_task(db_handle *db_handle, + task *task, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context) { + init_table_callback(db_handle, task_task_id(task), task, retry, done_callback, + redis_task_table_add_task, user_context); +} + +void task_table_update(db_handle *db_handle, + task *task, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context) { + init_table_callback(db_handle, task_task_id(task), task, retry, done_callback, + redis_task_table_update, user_context); +} + +/* TODO(swang): A corresponding task_table_unsubscribe. */ +void task_table_subscribe(db_handle *db_handle, + node_id node, + scheduling_state state_filter, + task_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context) { + task_table_subscribe_data *sub_data = + malloc(sizeof(task_table_subscribe_data)); + utarray_push_back(db_handle->callback_freelist, &sub_data); + sub_data->node = node; + sub_data->state_filter = state_filter; + sub_data->subscribe_callback = subscribe_callback; + sub_data->subscribe_context = subscribe_context; + + init_table_callback(db_handle, node, sub_data, retry, done_callback, + redis_task_table_subscribe, user_context); +} diff --git a/src/common/state/task_table.h b/src/common/state/task_table.h index 3a1852522..576481eb1 100644 --- a/src/common/state/task_table.h +++ b/src/common/state/task_table.h @@ -1,20 +1,132 @@ -#ifndef TASK_TABLE_H -#define TASK_TABLE_H +#ifndef task_table_H +#define task_table_H #include "db.h" +#include "table.h" #include "task.h" -/* Add task to the task table, handle errors here. */ -status task_table_add_task(db_handle *db, task_spec *task); +/** + * The task table is a message bus that is used for all communication between + * local and global schedulers (and also persisted to the state database). + * Here are examples of events that are recorded by the task table: + * + * 1) local scheduler writes when it submits a task to the global scheduler; + * 2) global scheduler reads it to get the task submitted by local schedulers; + * 3) global scheduler writes it when assigning the task to a local scheduler; + * 4) local scheduler reads it to get its tasks assigned by global scheduler; + * 5) local scheduler writes it when a task finishes execution; + * 6) global scheduler reads it to get the tasks that have finished; */ -/* Callback for getting an entry from the task table. Task spec will be freed - * by the system after the callback */ -typedef void (*task_table_callback)(task_spec *task, void *context); +/* Callback called when a task table write operation completes. */ +typedef void (*task_table_done_callback)(task_id task_id, void *user_context); -/* Get specific task from the task table. */ -status task_table_get_task(db_handle *db, - task_id task_id, - task_table_callback callback, - void *context); +/* Callback called when a task table read operation completes. */ +typedef void (*task_table_get_callback)(task *task, void *user_context); -#endif /* TASK_TABLE_H */ +/** + * Get a task's entry from the task table. + * + * @param db_handle Database handle. + * @param task_id The ID of the task we want to look up. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Data that will be passed to done_callback and + * fail_callback. + * @return Void. + */ +void task_table_get_task(db_handle *db, + task_id task_id, + retry_info *retry, + task_table_get_callback done_callback, + void *user_context); + +/** + * Add a task entry, including task spec and scheduling information, to the + * task table. This will overwrite any task already in the task table with the + * same task ID. + * + * @param db_handle Database handle. + * @param task The task entry to add to the table. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Data that will be passed to done_callback and + * fail_callback. + * @return Void. + */ +void task_table_add_task(db_handle *db_handle, + task *task, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context); + +/* + * ==== Publish the task table ==== + */ + +/** + * Update a task's scheduling information in the task table. This assumes that + * the task spec already exists in the task table entry. + * + * @param db_handle Database handle. + * @param task The task entry to add to the table. The task spec in the entry is + * ignored. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Data that will be passed to done_callback and + * fail_callback. + * @return Void. + */ +void task_table_update(db_handle *db_handle, + task *task, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context); + +/* + * ==== Subscribing to the task table ==== + */ + +/* Callback for subscribing to the task table. */ +typedef void (*task_table_subscribe_callback)(task *task, void *user_context); + +/** + * Register a callback for a task event. An event is any update of a task in + * the task table, produced by task_table_add_task or task_table_add_task. + * Events include changes to the task's scheduling state or changes to the + * task's node location. + * + * @param db_handle Database handle. + * @param subscribe_callback Callback that will be called when the task table is + * updated. + * @param subscribe_context Context that will be passed into the + * subscribe_callback. + * @param node Node whose events we want to listen to. If you want to register + * to updates from all nodes, set node = NIL_ID. + * @param state_filter Flags for events we want to listen to. If you want + * to listen to all events, use state_filter = TASK_WAITING | + * TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * @param retry Information about retrying the request to the database. + * @param done_callback Function to be called when database returns result. + * @param user_context Data that will be passed to done_callback and + * fail_callback. + * @return Void. + */ +void task_table_subscribe(db_handle *db_handle, + node_id node, + scheduling_state state_filter, + task_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry, + task_table_done_callback done_callback, + void *user_context); + +/* Data that is needed to register task table subscribe callbacks with the state + * database. */ +typedef struct { + node_id node; + scheduling_state state_filter; + task_table_subscribe_callback subscribe_callback; + void *subscribe_context; +} task_table_subscribe_data; + +#endif /* task_table_H */ diff --git a/src/common/task.c b/src/common/task.c index 57acf5009..ea35c06f4 100644 --- a/src/common/task.c +++ b/src/common/task.c @@ -9,10 +9,6 @@ #include "common.h" #include "io.h" -const unique_id NIL_TASK_ID = {{255, 255, 255, 255, 255, 255, 255, - 255, 255, 255, 255, 255, 255, 255, - 255, 255, 255, 255, 255, 255}}; - /* TASK SPECIFICATIONS */ /* Tasks are stored in a consecutive chunk of memory, the first @@ -70,11 +66,19 @@ struct task_spec_impl { (ARGS_VALUE_SIZE)) bool task_ids_equal(task_id first_id, task_id second_id) { - return memcmp(&first_id, &second_id, sizeof(task_id)) == 0 ? true : false; + return UNIQUE_ID_EQ(first_id, second_id) ? true : false; +} + +bool task_id_is_nil(task_id id) { + return task_ids_equal(id, NIL_TASK_ID); } bool function_ids_equal(function_id first_id, function_id second_id) { - return memcmp(&first_id, &second_id, sizeof(function_id)) == 0 ? true : false; + return UNIQUE_ID_EQ(first_id, second_id) ? true : false; +} + +bool function_id_is_nil(function_id id) { + return function_ids_equal(id, NIL_FUNCTION_ID); } task_id *task_return_ptr(task_spec *spec, int64_t return_index) { @@ -99,7 +103,7 @@ task_id compute_task_id(task_spec *spec) { SHA256_CTX ctx; BYTE buff[SHA256_BLOCK_SIZE]; sha256_init(&ctx); - sha256_update(&ctx, (BYTE *) spec, task_size(spec)); + sha256_update(&ctx, (BYTE *) spec, task_spec_size(spec)); sha256_final(&ctx, buff); /* Create a task ID out of the hash. This will truncate the hash. */ task_id task_id; @@ -151,7 +155,15 @@ void finish_construct_task_spec(task_spec *spec) { } } -int64_t task_size(task_spec *spec) { +task_spec *alloc_nil_task_spec(task_id task_id) { + task_spec *spec = + start_construct_task_spec(NIL_ID, 0, NIL_FUNCTION_ID, 0, 0, 0); + finish_construct_task_spec(spec); + spec->task_id = task_id; + return spec; +} + +int64_t task_spec_size(task_spec *spec) { return TASK_SPEC_SIZE(spec->num_args, spec->num_returns, spec->args_value_size); } @@ -162,7 +174,7 @@ function_id task_function(task_spec *spec) { return spec->function_id; } -task_id task_task_id(task_spec *spec) { +task_id task_spec_id(task_spec *spec) { /* Check that the task has been constructed. */ DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID)); return spec->task_id; @@ -269,47 +281,58 @@ void print_task(task_spec *spec, UT_string *output) { /* TASK INSTANCES */ -struct task_instance_impl { - task_iid iid; - int32_t state; +struct task_impl { + scheduling_state state; node_id node; task_spec spec; }; -task_instance *make_task_instance(task_iid task_iid, - task_spec *spec, - int32_t state, - node_id node) { - int64_t size = sizeof(task_instance) - sizeof(task_spec) + task_size(spec); - task_instance *result = malloc(size); +bool node_ids_equal(node_id first_id, node_id second_id) { + return UNIQUE_ID_EQ(first_id, second_id) ? true : false; +} + +bool node_id_is_nil(node_id id) { + return node_ids_equal(id, NIL_NODE_ID); +} + +task *alloc_task(task_spec *spec, scheduling_state state, node_id node) { + int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec); + task *result = malloc(size); memset(result, 0, size); - result->iid = task_iid; result->state = state; result->node = node; - memcpy(&result->spec, spec, task_size(spec)); + memcpy(&result->spec, spec, task_spec_size(spec)); return result; } -int64_t task_instance_size(task_instance *instance) { - return sizeof(task_instance) - sizeof(task_spec) + task_size(&instance->spec); +task *alloc_nil_task(task_id task_id) { + task_spec *nil_spec = alloc_nil_task_spec(task_id); + task *nil_task = alloc_task(nil_spec, 0, NIL_ID); + free_task_spec(nil_spec); + return nil_task; } -task_iid *task_instance_id(task_instance *instance) { - return &instance->iid; +int64_t task_size(task *task_arg) { + return sizeof(task) - sizeof(task_spec) + task_spec_size(&task_arg->spec); } -int32_t *task_instance_state(task_instance *instance) { - return &instance->state; +scheduling_state task_state(task *task) { + return task->state; } -node_id *task_instance_node(task_instance *instance) { - return &instance->node; +node_id task_node(task *task) { + return task->node; } -task_spec *task_instance_task_spec(task_instance *instance) { - return &instance->spec; +task_spec *task_task_spec(task *task) { + return &task->spec; } -void task_instance_free(task_instance *instance) { - free(instance); +task_id task_task_id(task *task) { + task_spec *spec = task_task_spec(task); + return task_spec_id(spec); +} + +void free_task(task *task) { + free(task); } diff --git a/src/common/task.h b/src/common/task.h index fb2e14fbe..8d026c05f 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -1,7 +1,8 @@ #ifndef TASK_H #define TASK_H -/* This API specifies the task data structures. It is in C so we can +/** + * This API specifies the task data structures. It is in C so we can * easily construct tasks from other languages like Python. The datastructures * are also defined in such a way that memory is contiguous and all pointers * are relative, so that we can memcpy the datastructure and ship it over the @@ -13,6 +14,10 @@ #include "common.h" #include "utstring.h" +#define NIL_TASK_ID NIL_ID +#define NIL_FUNCTION_ID NIL_ID +#define NIL_NODE_ID NIL_ID + typedef unique_id function_id; /** The task ID is a deterministic hash of the function ID that the task @@ -26,7 +31,7 @@ typedef unique_id task_iid; /** The node id is an identifier for the node the task is scheduled on. */ typedef unique_id node_id; -/* +/** * ==== Task specifications ==== * Contain all the information neccessary to execute the * task (function id, arguments, return object ids). @@ -46,6 +51,14 @@ enum arg_type { ARG_BY_REF, ARG_BY_VAL }; */ bool task_ids_equal(task_id first_id, task_id second_id); +/** + * Compare a task ID to the nil ID. + * + * @param id The task ID to compare to nil. + * @return True if the task ID is equal to nil. + */ +bool task_id_is_nil(task_id id); + /** * Compare two function IDs. * @@ -55,6 +68,14 @@ bool task_ids_equal(task_id first_id, task_id second_id); */ bool function_ids_equal(function_id first_id, function_id second_id); +/** + * Compare a function ID to the nil ID. + * + * @param id The function ID to compare to nil. + * @return True if the function ID is equal to nil. + */ +bool function_id_is_nil(function_id id); + /* Construct and modify task specifications. */ /** @@ -94,7 +115,7 @@ void finish_construct_task_spec(task_spec *spec); * @param spec The task_spec in question. * @return The size of the task_spec in bytes. */ -int64_t task_size(task_spec *spec); +int64_t task_spec_size(task_spec *spec); /** * Return the function ID of the task. @@ -110,7 +131,7 @@ function_id task_function(task_spec *spec); * @param spec The task_spec in question. * @return The task ID of the task. */ -task_id task_task_id(task_spec *spec); +task_id task_spec_id(task_spec *spec); /** * Get the number of arguments to this task. @@ -216,8 +237,8 @@ void free_task_spec(task_spec *spec); */ void print_task(task_spec *spec, UT_string *output); -/* - * ==== Task instance ==== +/** + * ==== Task ==== * Contains information about a scheduled task: The task iid, * the task specification and the task status (WAITING, SCHEDULED, * RUNNING, DONE) and which node the task is scheduled on. @@ -225,50 +246,70 @@ void print_task(task_spec *spec, UT_string *output); /** The scheduling_state can be used as a flag when we are listening * for an event, for example TASK_WAITING | TASK_SCHEDULED. */ -enum scheduling_state { +typedef enum { TASK_STATUS_WAITING = 1, TASK_STATUS_SCHEDULED = 2, TASK_STATUS_RUNNING = 4, TASK_STATUS_DONE = 8 -}; +} scheduling_state; -/** A task instance is one execution of a task specification. It has a unique - * instance id, a state of execution (see scheduling_state) and a node it is - * scheduled on or running on. */ -typedef struct task_instance_impl task_instance; +/** + * Compare two node IDs. + * + * @param first_id The first node ID to compare. + * @param second_id The first node ID to compare. + * @return True if the node IDs are the same and false + * otherwise. + */ +bool node_ids_equal(node_id first_id, node_id second_id); -/* Allocate and initialize a new task instance. Must be freed with - * scheduled_task_free after use. */ -task_instance *make_task_instance(task_iid task_iid, - task_spec *task, - int32_t state, - node_id node); +/** + * Compare a node ID to the nil ID. + * + * @param id The node ID to compare to nil. + * @return True if the node ID is equal to nil. + */ +bool node_id_is_nil(node_id id); -/* Size of task instance structure in bytes. */ -int64_t task_instance_size(task_instance *instance); +/** A task is an execution of a task specification. It has a state of + * execution (see scheduling_state) and a node it is scheduled on or running + * on. */ +typedef struct task_impl task; -/* Instance ID of the task instance. */ -task_iid *task_instance_id(task_instance *instance); +/** + * Allocate a new task. Must be freed with free_task after use. + * + * @param spec The task spec for the new task. + * @param state The scheduling state for the new task. + * @param node The ID of the node that the task is scheduled on, if any. + */ +task *alloc_task(task_spec *spec, scheduling_state state, node_id node); -/* The scheduling state of the task instance. */ -int32_t *task_instance_state(task_instance *instance); +/** Size of task structure in bytes. */ +int64_t task_size(task *task); -/* Node this task instance has been assigned to or is running on. */ -node_id *task_instance_node(task_instance *instance); +/** The scheduling state of the task. */ +scheduling_state task_state(task *task); -/* Task specification of this task instance. */ -task_spec *task_instance_task_spec(task_instance *instance); +/** Node this task has been assigned to or is running on. */ +node_id task_node(task *task); -/* Free this task instance datastructure. */ -void task_instance_free(task_instance *instance); +/** Task specification of this task. */ +task_spec *task_task_spec(task *task); -/* +/** Task ID of this task. */ +task_id task_task_id(task *task); + +/** Free this task datastructure. */ +void free_task(task *task); + +/** * ==== Task update ==== * Contains the information necessary to update a task in the task log. */ typedef struct { - int32_t state; + scheduling_state state; node_id node; } task_update; diff --git a/src/common/test/db_tests.c b/src/common/test/db_tests.c index 9bf3c96db..f473a9b7a 100644 --- a/src/common/test/db_tests.c +++ b/src/common/test/db_tests.c @@ -8,7 +8,7 @@ #include "test_common.h" #include "state/db.h" #include "state/object_table.h" -#include "state/task_log.h" +#include "state/task_table.h" #include "state/redis.h" #include "task.h" @@ -54,7 +54,7 @@ void lookup_done_callback(object_id object_id, void add_done_callback(object_id object_id, void *user_context) {} /* Test if we got a timeout callback if we couldn't connect database. */ -void timeout_callback(object_id object_id, void *context) { +void timeout_callback(object_id object_id, void *context, void *user_data) { user_context *uc = (user_context *) context; CHECK(uc->test_number == TEST_NUMBER) } @@ -101,71 +101,98 @@ TEST object_table_lookup_test(void) { PASS(); } -void task_log_test_callback(task_instance *instance, void *userdata) { - task_instance *other = userdata; - CHECK(*task_instance_state(instance) == TASK_STATUS_SCHEDULED); - CHECK(task_instance_size(instance) == task_instance_size(other)); - CHECK(memcmp(instance, other, task_instance_size(instance)) == 0); +int task_table_test_callback_called = 0; +task *task_table_test_task; + +void task_table_test_fail_callback(unique_id id, + void *context, + void *user_data) { + event_loop *loop = user_data; + event_loop_stop(loop); } -TEST task_log_test(void) { +int64_t task_table_delayed_add_task(event_loop *loop, + int64_t id, + void *context) { + db_handle *db = context; + retry_info retry = { + .num_retries = NUM_RETRIES, + .timeout = TIMEOUT, + .fail_callback = task_table_test_fail_callback, + }; + task_table_add_task(db, task_table_test_task, &retry, NULL, (void *) loop); + return EVENT_LOOP_TIMER_DONE; +} + +void task_table_test_callback(task *callback_task, void *user_data) { + task_table_test_callback_called = 1; + CHECK(task_state(callback_task) == TASK_STATUS_SCHEDULED); + CHECK(task_size(callback_task) == task_size(task_table_test_task)); + CHECK(memcmp(callback_task, task_table_test_task, task_size(callback_task)) == + 0); + event_loop *loop = user_data; + event_loop_stop(loop); +} + +TEST task_table_test(void) { + task_table_test_callback_called = 0; event_loop *loop = event_loop_create(); db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); db_attach(db, loop); node_id node = globally_unique_id(); - task_spec *task = example_task(); - task_instance *instance = make_task_instance(globally_unique_id(), task, - TASK_STATUS_SCHEDULED, node); + task_spec *spec = example_task_spec(); + task_table_test_task = alloc_task(spec, TASK_STATUS_SCHEDULED, node); + free_task_spec(spec); retry_info retry = { - .num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL, + .num_retries = NUM_RETRIES, + .timeout = TIMEOUT, + .fail_callback = task_table_test_fail_callback, }; - task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback, - instance, &retry, NULL, NULL); - task_log_publish(db, instance, &retry, NULL, NULL); - event_loop_add_timer(loop, 200, (event_loop_timer_handler) timeout_handler, - NULL); + task_table_subscribe(db, node, TASK_STATUS_SCHEDULED, + task_table_test_callback, (void *) loop, &retry, NULL, + (void *) loop); + event_loop_add_timer( + loop, 200, (event_loop_timer_handler) task_table_delayed_add_task, db); event_loop_run(loop); - task_instance_free(instance); - free_task_spec(task); + free_task(task_table_test_task); db_disconnect(db); destroy_outstanding_callbacks(loop); event_loop_destroy(loop); + ASSERT(task_table_test_callback_called); PASS(); } int num_test_callback_called = 0; -void task_log_all_test_callback(task_instance *instance, void *userdata) { +void task_table_all_test_callback(task *task, void *user_data) { num_test_callback_called += 1; } -TEST task_log_all_test(void) { +TEST task_table_all_test(void) { event_loop *loop = event_loop_create(); db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); db_attach(db, loop); - task_spec *task = example_task(); + task_spec *spec = example_task_spec(); /* Schedule two tasks on different nodes. */ - task_instance *instance1 = make_task_instance( - globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); - task_instance *instance2 = make_task_instance( - globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id()); + task *task1 = alloc_task(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); + task *task2 = alloc_task(spec, TASK_STATUS_SCHEDULED, globally_unique_id()); retry_info retry = { .num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL, }; - task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, - task_log_all_test_callback, NULL, &retry, NULL, NULL); + task_table_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED, + task_table_all_test_callback, NULL, &retry, NULL, NULL); event_loop_add_timer(loop, 50, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(loop); /* TODO(pcm): Get rid of this sleep once the robust pubsub is implemented. */ - task_log_publish(db, instance1, &retry, NULL, NULL); - task_log_publish(db, instance2, &retry, NULL, NULL); + task_table_update(db, task1, &retry, NULL, NULL); + task_table_update(db, task2, &retry, NULL, NULL); event_loop_add_timer(loop, 200, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(loop); - task_instance_free(instance2); - task_instance_free(instance1); - free_task_spec(task); + free(task2); + free(task1); + free_task_spec(spec); db_disconnect(db); destroy_outstanding_callbacks(loop); event_loop_destroy(loop); @@ -198,8 +225,8 @@ TEST unique_client_id_test(void) { SUITE(db_tests) { RUN_REDIS_TEST(object_table_lookup_test); - RUN_REDIS_TEST(task_log_test); - RUN_REDIS_TEST(task_log_all_test); + RUN_REDIS_TEST(task_table_test); + RUN_REDIS_TEST(task_table_all_test); RUN_REDIS_TEST(unique_client_id_test); } diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index e0e496af9..af0ac335b 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -12,6 +12,132 @@ SUITE(object_table_tests); static event_loop *g_loop; +/* ==== Test adding and looking up metadata ==== */ + +int new_object_failed = 0; +int new_object_succeeded = 0; +object_id new_object_id; +task *new_object_task; +task_spec *new_object_task_spec; +task_id new_object_task_id; + +void new_object_fail_callback(unique_id id, + void *user_context, + void *user_data) { + new_object_failed = 1; + event_loop_stop(g_loop); +} + +/* === Test adding an object with an associated task === */ + +void new_object_done_callback(object_id object_id, + task *task, + void *user_context) { + new_object_succeeded = 1; + CHECK(object_ids_equal(object_id, new_object_id)); + CHECK(task); + CHECK(memcmp(task, new_object_task, task_size(task)) == 0); + event_loop_stop(g_loop); +} + +void new_object_lookup_callback(object_id object_id, void *user_context) { + CHECK(object_ids_equal(object_id, new_object_id)); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = new_object_fail_callback, + }; + db_handle *db = user_context; + result_table_lookup(db, new_object_id, &retry, new_object_done_callback, + NULL); +} + +void new_object_task_callback(task_id task_id, void *user_context) { + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = new_object_fail_callback, + }; + db_handle *db = user_context; + result_table_add(db, new_object_id, new_object_task_id, &retry, + new_object_lookup_callback, (void *) db); +} + +TEST new_object_test(void) { + new_object_failed = 0; + new_object_succeeded = 0; + new_object_id = globally_unique_id(); + new_object_task = example_task(); + new_object_task_spec = task_task_spec(new_object_task); + new_object_task_id = task_spec_id(new_object_task_spec); + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = new_object_fail_callback, + }; + task_table_add_task(db, new_object_task, &retry, new_object_task_callback, + db); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + free_task(new_object_task); + ASSERT(new_object_succeeded); + ASSERT(!new_object_failed); + PASS(); +} + +/* === Test adding an object without an associated task === */ + +void new_object_no_task_lookup_callback(object_id object_id, + task *task, + void *user_context) { + new_object_succeeded = 1; + CHECK(task == NULL); + event_loop_stop(g_loop); +} + +void new_object_no_task_callback(object_id object_id, void *user_context) { + CHECK(node_ids_equal(object_id, new_object_id)); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = new_object_fail_callback, + }; + db_handle *db = user_context; + result_table_lookup(db, object_id, &retry, new_object_no_task_lookup_callback, + NULL); +} + +TEST new_object_no_task_test(void) { + new_object_failed = 0; + new_object_succeeded = 0; + new_object_id = globally_unique_id(); + new_object_task_id = globally_unique_id(); + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = new_object_fail_callback, + }; + result_table_add(db, new_object_id, new_object_task_id, &retry, + new_object_no_task_callback, db); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(new_object_succeeded); + ASSERT(!new_object_failed); + PASS(); +} + /* ==== Test if operations time out correctly ==== */ /* === Test lookup timeout === */ @@ -27,9 +153,9 @@ void lookup_done_callback(object_id object_id, CHECK(0); } -void lookup_fail_callback(unique_id id, void *user_data) { +void lookup_fail_callback(unique_id id, void *user_context, void *user_data) { lookup_failed = 1; - CHECK(user_data == (void *) lookup_timeout_context); + CHECK(user_context == (void *) lookup_timeout_context); event_loop_stop(g_loop); } @@ -63,9 +189,9 @@ void add_done_callback(object_id object_id, void *user_context) { CHECK(0); } -void add_fail_callback(unique_id id, void *user_data) { +void add_fail_callback(unique_id id, void *user_context, void *user_data) { add_failed = 1; - CHECK(user_data == (void *) add_timeout_context); + CHECK(user_context == (void *) add_timeout_context); event_loop_stop(g_loop); } @@ -99,9 +225,11 @@ void subscribe_done_callback(object_id object_id, void *user_context) { CHECK(0); } -void subscribe_fail_callback(unique_id id, void *user_data) { +void subscribe_fail_callback(unique_id id, + void *user_context, + void *user_data) { subscribe_failed = 1; - CHECK(user_data == (void *) subscribe_timeout_context); + CHECK(user_context == (void *) subscribe_timeout_context); event_loop_stop(g_loop); } @@ -166,7 +294,9 @@ void lookup_retry_done_callback(object_id object_id, free(manager_vector); } -void lookup_retry_fail_callback(unique_id id, void *user_data) { +void lookup_retry_fail_callback(unique_id id, + void *user_context, + void *user_data) { /* The fail callback should not be called. */ CHECK(0); } @@ -210,7 +340,9 @@ void add_retry_done_callback(object_id object_id, void *user_context) { add_retry_succeeded = 1; } -void add_retry_fail_callback(unique_id id, void *user_data) { +void add_retry_fail_callback(unique_id id, + void *user_context, + void *user_data) { /* The fail callback should not be called. */ CHECK(0); } @@ -269,7 +401,9 @@ void subscribe_retry_done_callback(object_id object_id, void *user_context) { subscribe_retry_succeeded = 1; } -void subscribe_retry_fail_callback(unique_id id, void *user_data) { +void subscribe_retry_fail_callback(unique_id id, + void *user_context, + void *user_data) { /* The fail callback should not be called. */ CHECK(0); } @@ -312,7 +446,9 @@ TEST subscribe_retry_test(void) { const char *lookup_late_context = "lookup_late"; int lookup_late_failed = 0; -void lookup_late_fail_callback(unique_id id, void *user_context) { +void lookup_late_fail_callback(unique_id id, + void *user_context, + void *user_data) { CHECK(user_context == (void *) lookup_late_context); lookup_late_failed = 1; } @@ -357,7 +493,7 @@ TEST lookup_late_test(void) { const char *add_late_context = "add_late"; int add_late_failed = 0; -void add_late_fail_callback(unique_id id, void *user_context) { +void add_late_fail_callback(unique_id id, void *user_context, void *user_data) { CHECK(user_context == (void *) add_late_context); add_late_failed = 1; } @@ -397,7 +533,9 @@ TEST add_late_test(void) { const char *subscribe_late_context = "subscribe_late"; int subscribe_late_failed = 0; -void subscribe_late_fail_callback(unique_id id, void *user_context) { +void subscribe_late_fail_callback(unique_id id, + void *user_context, + void *user_data) { CHECK(user_context == (void *) subscribe_late_context); subscribe_late_failed = 1; } @@ -441,7 +579,9 @@ const char *subscribe_success_context = "subscribe_success"; int subscribe_success_done = 0; int subscribe_success_succeeded = 0; -void subscribe_success_fail_callback(unique_id id, void *user_context) { +void subscribe_success_fail_callback(unique_id id, + void *user_context, + void *user_data) { /* This function should never be called. */ CHECK(0); } @@ -492,16 +632,18 @@ TEST subscribe_success_test(void) { } SUITE(object_table_tests) { - RUN_TEST(lookup_timeout_test); - RUN_TEST(add_timeout_test); - RUN_TEST(subscribe_timeout_test); - RUN_TEST(lookup_retry_test); - RUN_TEST(add_retry_test); - RUN_TEST(subscribe_retry_test); - RUN_TEST(lookup_late_test); - RUN_TEST(add_late_test); - RUN_TEST(subscribe_late_test); - RUN_TEST(subscribe_success_test); + RUN_REDIS_TEST(new_object_test); + RUN_REDIS_TEST(new_object_no_task_test); + RUN_REDIS_TEST(lookup_timeout_test); + RUN_REDIS_TEST(add_timeout_test); + RUN_REDIS_TEST(subscribe_timeout_test); + RUN_REDIS_TEST(lookup_retry_test); + RUN_REDIS_TEST(add_retry_test); + RUN_REDIS_TEST(subscribe_retry_test); + RUN_REDIS_TEST(lookup_late_test); + RUN_REDIS_TEST(add_late_test); + RUN_REDIS_TEST(subscribe_late_test); + RUN_REDIS_TEST(subscribe_success_test); } GREATEST_MAIN_DEFS(); diff --git a/src/common/test/task_log_tests.c b/src/common/test/task_log_tests.c deleted file mode 100644 index 58c790768..000000000 --- a/src/common/test/task_log_tests.c +++ /dev/null @@ -1,316 +0,0 @@ -#include "greatest.h" - -#include "event_loop.h" -#include "test_common.h" -#include "common.h" -#include "state/object_table.h" -#include "state/redis.h" - -#include -#include - -SUITE(task_log_tests); - -event_loop *loop; - -/* ==== Test if operations time out correctly ==== */ - -/* === Test subscribe timeout === */ - -const char *subscribe_timeout_context = "subscribe_timeout"; -int subscribe_failed = 0; - -void subscribe_done_callback(task_iid task_iid, void *user_context) { - /* The done callback should not be called. */ - CHECK(0); -} - -void subscribe_fail_callback(unique_id id, void *user_data) { - subscribe_failed = 1; - CHECK(user_data == (void *) subscribe_timeout_context); - event_loop_stop(loop); -} - -TEST subscribe_timeout_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); - db_attach(db, loop); - retry_info retry = { - .num_retries = 5, - .timeout = 100, - .fail_callback = subscribe_fail_callback, - }; - task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_done_callback, - (void *) subscribe_timeout_context); - /* Disconnect the database to see if the subscribe times out. */ - close(db->sub_context->c.fd); - aeProcessEvents(loop, AE_TIME_EVENTS); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(subscribe_failed); - PASS(); -} - -/* === Test publish timeout === */ - -const char *publish_timeout_context = "publish_timeout"; -const int publish_test_number = 272; -int publish_failed = 0; - -void publish_done_callback(task_iid task_iid, void *user_context) { - /* The done callback should not be called. */ - CHECK(0); -} - -void publish_fail_callback(unique_id id, void *user_data) { - publish_failed = 1; - CHECK(user_data == (void *) publish_timeout_context); - event_loop_stop(loop); -} - -TEST publish_timeout_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); - db_attach(db, loop); - task_instance *task = example_task_instance(); - retry_info retry = { - .num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback, - }; - task_log_publish(db, task, &retry, publish_done_callback, - (void *) publish_timeout_context); - /* Disconnect the database to see if the publish times out. */ - close(db->context->c.fd); - aeProcessEvents(loop, AE_TIME_EVENTS); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(publish_failed); - task_instance_free(task); - PASS(); -} - -/* ==== Test if the retry is working correctly ==== */ - -int64_t reconnect_db_callback(event_loop *loop, - int64_t timer_id, - void *context) { - db_handle *db = context; - /* Reconnect to redis. */ - redisAsyncFree(db->sub_context); - db->sub_context = redisAsyncConnect("127.0.0.1", 6379); - db->sub_context->data = (void *) db; - /* Re-attach the database to the event loop (the file descriptor changed). */ - db_attach(db, loop); - return EVENT_LOOP_TIMER_DONE; -} - -int64_t terminate_event_loop_callback(event_loop *loop, - int64_t timer_id, - void *context) { - event_loop_stop(loop); - return EVENT_LOOP_TIMER_DONE; -} - -/* === Test subscribe retry === */ - -const char *subscribe_retry_context = "subscribe_retry"; -const int subscribe_retry_test_number = 273; -int subscribe_retry_succeeded = 0; - -void subscribe_retry_done_callback(object_id object_id, void *user_context) { - CHECK(user_context == (void *) subscribe_retry_context); - subscribe_retry_succeeded = 1; -} - -void subscribe_retry_fail_callback(unique_id id, void *user_data) { - /* The fail callback should not be called. */ - CHECK(0); -} - -TEST subscribe_retry_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); - db_attach(db, loop); - retry_info retry = { - .num_retries = 5, - .timeout = 100, - .fail_callback = subscribe_retry_fail_callback, - }; - task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_retry_done_callback, - (void *) subscribe_retry_context); - /* Disconnect the database to see if the subscribe times out. */ - close(db->sub_context->c.fd); - /* Install handler for reconnecting the database. */ - event_loop_add_timer(loop, 150, - (event_loop_timer_handler) reconnect_db_callback, db); - /* Install handler for terminating the event loop. */ - event_loop_add_timer(loop, 750, - (event_loop_timer_handler) terminate_event_loop_callback, - NULL); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(subscribe_retry_succeeded); - PASS(); -} - -/* === Test publish retry === */ - -const char *publish_retry_context = "publish_retry"; -int publish_retry_succeeded = 0; - -void publish_retry_done_callback(object_id object_id, void *user_context) { - CHECK(user_context == (void *) publish_retry_context); - publish_retry_succeeded = 1; -} - -void publish_retry_fail_callback(unique_id id, void *user_data) { - /* The fail callback should not be called. */ - CHECK(0); -} - -TEST publish_retry_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); - db_attach(db, loop); - task_instance *task = example_task_instance(); - retry_info retry = { - .num_retries = 5, - .timeout = 100, - .fail_callback = publish_retry_fail_callback, - }; - task_log_publish(db, task, &retry, publish_retry_done_callback, - (void *) publish_retry_context); - /* Disconnect the database to see if the publish times out. */ - close(db->sub_context->c.fd); - /* Install handler for reconnecting the database. */ - event_loop_add_timer(loop, 150, - (event_loop_timer_handler) reconnect_db_callback, db); - /* Install handler for terminating the event loop. */ - event_loop_add_timer(loop, 750, - (event_loop_timer_handler) terminate_event_loop_callback, - NULL); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(publish_retry_succeeded); - task_instance_free(task); - PASS(); -} - -/* ==== Test if late succeed is working correctly ==== */ - -/* === Test subscribe late succeed === */ - -const char *subscribe_late_context = "subscribe_late"; -int subscribe_late_failed = 0; - -void subscribe_late_fail_callback(unique_id id, void *user_context) { - CHECK(user_context == (void *) subscribe_late_context); - subscribe_late_failed = 1; -} - -void subscribe_late_done_callback(task_iid task_iid, void *user_context) { - /* This function should never be called. */ - CHECK(0); -} - -TEST subscribe_late_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); - db_attach(db, loop); - retry_info retry = { - .num_retries = 0, - .timeout = 0, - .fail_callback = subscribe_late_fail_callback, - }; - task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, - subscribe_late_done_callback, - (void *) subscribe_late_context); - /* Install handler for terminating the event loop. */ - event_loop_add_timer(loop, 750, - (event_loop_timer_handler) terminate_event_loop_callback, - NULL); - /* First process timer events to make sure the timeout is processed before - * anything else. */ - aeProcessEvents(loop, AE_TIME_EVENTS); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(subscribe_late_failed); - PASS(); -} - -/* === Test publish late succeed === */ - -const char *publish_late_context = "publish_late"; -int publish_late_failed = 0; - -void publish_late_fail_callback(unique_id id, void *user_context) { - CHECK(user_context == (void *) publish_late_context); - publish_late_failed = 1; -} - -void publish_late_done_callback(task_iid task_iik, void *user_context) { - /* This function should never be called. */ - CHECK(0); -} - -TEST publish_late_test(void) { - loop = event_loop_create(); - db_handle *db = - db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); - db_attach(db, loop); - task_instance *task = example_task_instance(); - retry_info retry = { - .num_retries = 0, - .timeout = 0, - .fail_callback = publish_late_fail_callback, - }; - task_log_publish(db, task, &retry, publish_late_done_callback, - (void *) publish_late_context); - /* Install handler for terminating the event loop. */ - event_loop_add_timer(loop, 750, - (event_loop_timer_handler) terminate_event_loop_callback, - NULL); - /* First process timer events to make sure the timeout is processed before - * anything else. */ - aeProcessEvents(loop, AE_TIME_EVENTS); - event_loop_run(loop); - db_disconnect(db); - destroy_outstanding_callbacks(loop); - event_loop_destroy(loop); - ASSERT(publish_late_failed); - task_instance_free(task); - PASS(); -} - -SUITE(task_log_tests) { - RUN_TEST(subscribe_timeout_test); - RUN_TEST(publish_timeout_test); - RUN_TEST(subscribe_retry_test); - RUN_TEST(publish_retry_test); - RUN_TEST(subscribe_late_test); - RUN_TEST(publish_late_test); -} - -GREATEST_MAIN_DEFS(); - -int main(int argc, char **argv) { - GREATEST_MAIN_BEGIN(); - RUN_SUITE(task_log_tests); - GREATEST_MAIN_END(); -} diff --git a/src/common/test/task_table_tests.c b/src/common/test/task_table_tests.c new file mode 100644 index 000000000..aa6890376 --- /dev/null +++ b/src/common/test/task_table_tests.c @@ -0,0 +1,430 @@ +#include "greatest.h" + +#include "event_loop.h" +#include "test_common.h" +#include "common.h" +#include "state/object_table.h" +#include "state/redis.h" + +#include +#include + +SUITE(task_table_tests); + +event_loop *g_loop; + +/* ==== Test operations in non-failure scenario ==== */ + +/* === A lookup of a task not in the table === */ + +task_id lookup_nil_id; +int lookup_nil_success = 0; +const char *lookup_nil_context = "lookup_nil"; + +void lookup_nil_fail_callback(unique_id id, + void *user_context, + void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +void lookup_nil_success_callback(task *task, void *context) { + lookup_nil_success = 1; + CHECK(task == NULL); + CHECK(context == (void *) lookup_nil_context); + event_loop_stop(g_loop); +} + +TEST lookup_nil_test(void) { + lookup_nil_id = globally_unique_id(); + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 1000, + .fail_callback = lookup_nil_fail_callback, + }; + task_table_get_task(db, lookup_nil_id, &retry, lookup_nil_success_callback, + (void *) lookup_nil_context); + /* Disconnect the database to see if the lookup times out. */ + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(lookup_nil_success); + PASS(); +} + +/* === A lookup of a task after it's added returns the same spec === */ + +int add_success = 0; +int lookup_success = 0; +task *add_lookup_task; +const char *add_lookup_context = "add_lookup"; + +void add_lookup_fail_callback(unique_id id, + void *user_context, + void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +void lookup_success_callback(task *task, void *context) { + lookup_success = 1; + CHECK(memcmp(task, add_lookup_task, task_size(task)) == 0); + event_loop_stop(g_loop); +} + +void add_success_callback(task_id task_id, void *context) { + add_success = 1; + CHECK(task_ids_equal(task_id, task_task_id(add_lookup_task))); + + db_handle *db = context; + retry_info retry = { + .num_retries = 5, + .timeout = 1000, + .fail_callback = add_lookup_fail_callback, + }; + task_table_get_task(db, task_id, &retry, lookup_success_callback, + (void *) add_lookup_context); +} + +TEST add_lookup_test(void) { + add_lookup_task = example_task(); + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 1000, + .fail_callback = add_lookup_fail_callback, + }; + task_table_add_task(db, add_lookup_task, &retry, add_success_callback, + (void *) db); + /* Disconnect the database to see if the lookup times out. */ + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + free(add_lookup_task); + ASSERT(add_success); + ASSERT(lookup_success); + PASS(); +} + +/* ==== Test if operations time out correctly ==== */ + +/* === Test subscribe timeout === */ + +const char *subscribe_timeout_context = "subscribe_timeout"; +int subscribe_failed = 0; + +void subscribe_done_callback(task_id task_id, void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void subscribe_fail_callback(unique_id id, + void *user_context, + void *user_data) { + subscribe_failed = 1; + CHECK(user_context == (void *) subscribe_timeout_context); + event_loop_stop(g_loop); +} + +TEST subscribe_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = subscribe_fail_callback, + }; + task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, + subscribe_done_callback, + (void *) subscribe_timeout_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_failed); + PASS(); +} + +/* === Test publish timeout === */ + +const char *publish_timeout_context = "publish_timeout"; +const int publish_test_number = 272; +int publish_failed = 0; + +void publish_done_callback(task_id task_id, void *user_context) { + /* The done callback should not be called. */ + CHECK(0); +} + +void publish_fail_callback(unique_id id, void *user_context, void *user_data) { + publish_failed = 1; + CHECK(user_context == (void *) publish_timeout_context); + event_loop_stop(g_loop); +} + +TEST publish_timeout_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234); + db_attach(db, g_loop); + task *task = example_task(); + retry_info retry = { + .num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback, + }; + task_table_update(db, task, &retry, publish_done_callback, + (void *) publish_timeout_context); + /* Disconnect the database to see if the publish times out. */ + close(db->context->c.fd); + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(publish_failed); + free_task(task); + PASS(); +} + +/* ==== Test if the retry is working correctly ==== */ + +int64_t reconnect_db_callback(event_loop *loop, + int64_t timer_id, + void *context) { + db_handle *db = context; + /* Reconnect to redis. */ + redisAsyncFree(db->sub_context); + db->sub_context = redisAsyncConnect("127.0.0.1", 6379); + db->sub_context->data = (void *) db; + /* Re-attach the database to the event loop (the file descriptor changed). */ + db_attach(db, loop); + return EVENT_LOOP_TIMER_DONE; +} + +int64_t terminate_event_loop_callback(event_loop *loop, + int64_t timer_id, + void *context) { + event_loop_stop(loop); + return EVENT_LOOP_TIMER_DONE; +} + +/* === Test subscribe retry === */ + +const char *subscribe_retry_context = "subscribe_retry"; +const int subscribe_retry_test_number = 273; +int subscribe_retry_succeeded = 0; + +void subscribe_retry_done_callback(object_id object_id, void *user_context) { + CHECK(user_context == (void *) subscribe_retry_context); + subscribe_retry_succeeded = 1; +} + +void subscribe_retry_fail_callback(unique_id id, + void *user_context, + void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST subscribe_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = subscribe_retry_fail_callback, + }; + task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, + subscribe_retry_done_callback, + (void *) subscribe_retry_context); + /* Disconnect the database to see if the subscribe times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, + (event_loop_timer_handler) reconnect_db_callback, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_retry_succeeded); + PASS(); +} + +/* === Test publish retry === */ + +const char *publish_retry_context = "publish_retry"; +int publish_retry_succeeded = 0; + +void publish_retry_done_callback(object_id object_id, void *user_context) { + CHECK(user_context == (void *) publish_retry_context); + publish_retry_succeeded = 1; +} + +void publish_retry_fail_callback(unique_id id, + void *user_context, + void *user_data) { + /* The fail callback should not be called. */ + CHECK(0); +} + +TEST publish_retry_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop); + task *task = example_task(); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = publish_retry_fail_callback, + }; + task_table_update(db, task, &retry, publish_retry_done_callback, + (void *) publish_retry_context); + /* Disconnect the database to see if the publish times out. */ + close(db->sub_context->c.fd); + /* Install handler for reconnecting the database. */ + event_loop_add_timer(g_loop, 150, + (event_loop_timer_handler) reconnect_db_callback, db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(publish_retry_succeeded); + free_task(task); + PASS(); +} + +/* ==== Test if late succeed is working correctly ==== */ + +/* === Test subscribe late succeed === */ + +const char *subscribe_late_context = "subscribe_late"; +int subscribe_late_failed = 0; + +void subscribe_late_fail_callback(unique_id id, + void *user_context, + void *user_data) { + CHECK(user_context == (void *) subscribe_late_context); + subscribe_late_failed = 1; +} + +void subscribe_late_done_callback(task_id task_id, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST subscribe_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + retry_info retry = { + .num_retries = 0, + .timeout = 0, + .fail_callback = subscribe_late_fail_callback, + }; + task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry, + subscribe_late_done_callback, + (void *) subscribe_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_late_failed); + PASS(); +} + +/* === Test publish late succeed === */ + +const char *publish_late_context = "publish_late"; +int publish_late_failed = 0; + +void publish_late_fail_callback(unique_id id, + void *user_context, + void *user_data) { + CHECK(user_context == (void *) publish_late_context); + publish_late_failed = 1; +} + +void publish_late_done_callback(task_id task_id, void *user_context) { + /* This function should never be called. */ + CHECK(0); +} + +TEST publish_late_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + task *task = example_task(); + retry_info retry = { + .num_retries = 0, + .timeout = 0, + .fail_callback = publish_late_fail_callback, + }; + task_table_update(db, task, &retry, publish_late_done_callback, + (void *) publish_late_context); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + /* First process timer events to make sure the timeout is processed before + * anything else. */ + aeProcessEvents(g_loop, AE_TIME_EVENTS); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(publish_late_failed); + free_task(task); + PASS(); +} + +SUITE(task_table_tests) { + RUN_REDIS_TEST(lookup_nil_test); + RUN_REDIS_TEST(add_lookup_test); + RUN_TEST(subscribe_timeout_test); + RUN_TEST(publish_timeout_test); + RUN_TEST(subscribe_retry_test); + RUN_TEST(publish_retry_test); + RUN_TEST(subscribe_late_test); + RUN_TEST(publish_late_test); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(task_table_tests); + GREATEST_MAIN_END(); +} diff --git a/src/common/test/task_tests.c b/src/common/test/task_tests.c index 1acff2264..301d78bab 100644 --- a/src/common/test/task_tests.c +++ b/src/common/test/task_tests.c @@ -14,33 +14,33 @@ SUITE(task_tests); TEST task_test(void) { task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); - task_spec *task = + task_spec *spec = start_construct_task_spec(parent_task_id, 0, func_id, 4, 2, 10); - ASSERT(task_num_args(task) == 4); - ASSERT(task_num_returns(task) == 2); + ASSERT(task_num_args(spec) == 4); + ASSERT(task_num_returns(spec) == 2); unique_id arg1 = globally_unique_id(); - ASSERT(task_args_add_ref(task, arg1) == 0); - ASSERT(task_args_add_val(task, (uint8_t *) "hello", 5) == 1); + ASSERT(task_args_add_ref(spec, arg1) == 0); + ASSERT(task_args_add_val(spec, (uint8_t *) "hello", 5) == 1); unique_id arg2 = globally_unique_id(); - ASSERT(task_args_add_ref(task, arg2) == 2); - ASSERT(task_args_add_val(task, (uint8_t *) "world", 5) == 3); - /* Finish constructing the task. This constructs the task ID and the task + ASSERT(task_args_add_ref(spec, arg2) == 2); + ASSERT(task_args_add_val(spec, (uint8_t *) "world", 5) == 3); + /* Finish constructing the spec. This constructs the task ID and the * return IDs. */ - finish_construct_task_spec(task); + finish_construct_task_spec(spec); - /* Check that the task was constructed as expected. */ - ASSERT(task_num_args(task) == 4); - ASSERT(task_num_returns(task) == 2); - ASSERT(function_ids_equal(task_function(task), func_id)); - ASSERT(object_ids_equal(task_arg_id(task, 0), arg1)); - ASSERT(memcmp(task_arg_val(task, 1), (uint8_t *) "hello", - task_arg_length(task, 1)) == 0); - ASSERT(object_ids_equal(task_arg_id(task, 2), arg2)); - ASSERT(memcmp(task_arg_val(task, 3), (uint8_t *) "world", - task_arg_length(task, 3)) == 0); + /* Check that the spec was constructed as expected. */ + ASSERT(task_num_args(spec) == 4); + ASSERT(task_num_returns(spec) == 2); + ASSERT(function_ids_equal(task_function(spec), func_id)); + ASSERT(object_ids_equal(task_arg_id(spec, 0), arg1)); + ASSERT(memcmp(task_arg_val(spec, 1), (uint8_t *) "hello", + task_arg_length(spec, 1)) == 0); + ASSERT(object_ids_equal(task_arg_id(spec, 2), arg2)); + ASSERT(memcmp(task_arg_val(spec, 3), (uint8_t *) "world", + task_arg_length(spec, 3)) == 0); - free_task_spec(task); + free_task_spec(spec); PASS(); } @@ -52,121 +52,121 @@ TEST deterministic_ids_test(void) { uint8_t *arg2 = (uint8_t *) "hello world"; /* Construct a first task. */ - task_spec *task1 = + task_spec *spec1 = start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11); - task_args_add_ref(task1, arg1); - task_args_add_val(task1, arg2, 11); - finish_construct_task_spec(task1); + task_args_add_ref(spec1, arg1); + task_args_add_val(spec1, arg2, 11); + finish_construct_task_spec(spec1); /* Construct a second identical task. */ - task_spec *task2 = + task_spec *spec2 = start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11); - task_args_add_ref(task2, arg1); - task_args_add_val(task2, arg2, 11); - finish_construct_task_spec(task2); + task_args_add_ref(spec2, arg1); + task_args_add_val(spec2, arg2, 11); + finish_construct_task_spec(spec2); /* Check that these tasks have the same task IDs and the same return IDs.*/ - ASSERT(task_ids_equal(task_task_id(task1), task_task_id(task2))); - ASSERT(object_ids_equal(task_return(task1, 0), task_return(task2, 0))); - ASSERT(object_ids_equal(task_return(task1, 1), task_return(task2, 1))); - ASSERT(object_ids_equal(task_return(task1, 2), task_return(task2, 2))); + ASSERT(task_ids_equal(task_spec_id(spec1), task_spec_id(spec2))); + ASSERT(object_ids_equal(task_return(spec1, 0), task_return(spec2, 0))); + ASSERT(object_ids_equal(task_return(spec1, 1), task_return(spec2, 1))); + ASSERT(object_ids_equal(task_return(spec1, 2), task_return(spec2, 2))); /* Check that the return IDs are all distinct. */ - ASSERT(!object_ids_equal(task_return(task1, 0), task_return(task2, 1))); - ASSERT(!object_ids_equal(task_return(task1, 0), task_return(task2, 2))); - ASSERT(!object_ids_equal(task_return(task1, 1), task_return(task2, 2))); + ASSERT(!object_ids_equal(task_return(spec1, 0), task_return(spec2, 1))); + ASSERT(!object_ids_equal(task_return(spec1, 0), task_return(spec2, 2))); + ASSERT(!object_ids_equal(task_return(spec1, 1), task_return(spec2, 2))); /* Create more tasks that are only mildly different. */ /* Construct a task with a different parent task ID. */ - task_spec *task3 = + task_spec *spec3 = start_construct_task_spec(globally_unique_id(), 0, func_id, 2, 3, 11); - task_args_add_ref(task3, arg1); - task_args_add_val(task3, arg2, 11); - finish_construct_task_spec(task3); + task_args_add_ref(spec3, arg1); + task_args_add_val(spec3, arg2, 11); + finish_construct_task_spec(spec3); /* Construct a task with a different parent counter. */ - task_spec *task4 = + task_spec *spec4 = start_construct_task_spec(parent_task_id, 1, func_id, 2, 3, 11); - task_args_add_ref(task4, arg1); - task_args_add_val(task4, arg2, 11); - finish_construct_task_spec(task4); + task_args_add_ref(spec4, arg1); + task_args_add_val(spec4, arg2, 11); + finish_construct_task_spec(spec4); /* Construct a task with a different function ID. */ - task_spec *task5 = start_construct_task_spec(parent_task_id, 0, + task_spec *spec5 = start_construct_task_spec(parent_task_id, 0, globally_unique_id(), 2, 3, 11); - task_args_add_ref(task5, arg1); - task_args_add_val(task5, arg2, 11); - finish_construct_task_spec(task5); + task_args_add_ref(spec5, arg1); + task_args_add_val(spec5, arg2, 11); + finish_construct_task_spec(spec5); /* Construct a task with a different object ID argument. */ - task_spec *task6 = + task_spec *spec6 = start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11); - task_args_add_ref(task6, globally_unique_id()); - task_args_add_val(task6, arg2, 11); - finish_construct_task_spec(task6); + task_args_add_ref(spec6, globally_unique_id()); + task_args_add_val(spec6, arg2, 11); + finish_construct_task_spec(spec6); /* Construct a task with a different value argument. */ - task_spec *task7 = + task_spec *spec7 = start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11); - task_args_add_ref(task7, arg1); - task_args_add_val(task7, (uint8_t *) "hello_world", 11); - finish_construct_task_spec(task7); + task_args_add_ref(spec7, arg1); + task_args_add_val(spec7, (uint8_t *) "hello_world", 11); + finish_construct_task_spec(spec7); /* Check that the task IDs are all distinct from the original. */ - ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task3))); - ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task4))); - ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task5))); - ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task6))); - ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task7))); + ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec3))); + ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec4))); + ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec5))); + ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec6))); + ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec7))); /* Check that the return object IDs are distinct from the originals. */ - task_spec *tasks[6] = {task1, task3, task4, task5, task6, task7}; + task_spec *specs[6] = {spec1, spec3, spec4, spec5, spec6, spec7}; for (int task_index1 = 0; task_index1 < 6; ++task_index1) { for (int return_index1 = 0; return_index1 < 3; ++return_index1) { for (int task_index2 = 0; task_index2 < 6; ++task_index2) { for (int return_index2 = 0; return_index2 < 3; ++return_index2) { if (task_index1 != task_index2 && return_index1 != return_index2) { ASSERT(!object_ids_equal( - task_return(tasks[task_index1], return_index1), - task_return(tasks[task_index2], return_index2))); + task_return(specs[task_index1], return_index1), + task_return(specs[task_index2], return_index2))); } } } } } - free_task_spec(task1); - free_task_spec(task2); - free_task_spec(task3); - free_task_spec(task4); - free_task_spec(task5); - free_task_spec(task6); - free_task_spec(task7); + free_task_spec(spec1); + free_task_spec(spec2); + free_task_spec(spec3); + free_task_spec(spec4); + free_task_spec(spec5); + free_task_spec(spec6); + free_task_spec(spec7); PASS(); } TEST send_task(void) { task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); - task_spec *task = + task_spec *spec = start_construct_task_spec(parent_task_id, 0, func_id, 4, 2, 10); - task_args_add_ref(task, globally_unique_id()); - task_args_add_val(task, (uint8_t *) "Hello", 5); - task_args_add_val(task, (uint8_t *) "World", 5); - task_args_add_ref(task, globally_unique_id()); - finish_construct_task_spec(task); + task_args_add_ref(spec, globally_unique_id()); + task_args_add_val(spec, (uint8_t *) "Hello", 5); + task_args_add_val(spec, (uint8_t *) "World", 5); + task_args_add_ref(spec, globally_unique_id()); + finish_construct_task_spec(spec); int fd[2]; socketpair(AF_UNIX, SOCK_STREAM, 0, fd); - write_message(fd[0], SUBMIT_TASK, task_size(task), (uint8_t *) task); + write_message(fd[0], SUBMIT_TASK, task_spec_size(spec), (uint8_t *) spec); int64_t type; int64_t length; uint8_t *message; read_message(fd[1], &type, &length, &message); task_spec *result = (task_spec *) message; ASSERT(type == SUBMIT_TASK); - ASSERT(memcmp(task, result, task_size(task)) == 0); - ASSERT(memcmp(task, result, task_size(result)) == 0); - free(task); + ASSERT(memcmp(spec, result, task_spec_size(spec)) == 0); + ASSERT(memcmp(spec, result, task_spec_size(result)) == 0); + free(spec); free(result); PASS(); } diff --git a/src/common/test/test_common.h b/src/common/test/test_common.h index fbcc3f35f..996af7619 100644 --- a/src/common/test/test_common.h +++ b/src/common/test/test_common.h @@ -5,7 +5,7 @@ #include "task.h" -task_spec *example_task(void) { +task_spec *example_task_spec(void) { task_id parent_task_id = globally_unique_id(); function_id func_id = globally_unique_id(); task_spec *task = @@ -16,11 +16,9 @@ task_spec *example_task(void) { return task; } -task_instance *example_task_instance(void) { - task_iid iid = globally_unique_id(); - task_spec *spec = example_task(); - task_instance *instance = - make_task_instance(iid, spec, TASK_STATUS_WAITING, NIL_ID); +task *example_task(void) { + task_spec *spec = example_task_spec(); + task *instance = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID); free_task_spec(spec); return instance; } diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index d7a52bbfa..512450fff 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -4,12 +4,12 @@ #include "utarray.h" #include "utlist.h" -#include "state/task_log.h" +#include "state/task_table.h" #include "photon.h" #include "photon_scheduler.h" typedef struct task_queue_entry { - task_instance *task; + task *task; struct task_queue_entry *prev; struct task_queue_entry *next; } task_queue_entry; @@ -102,7 +102,7 @@ int find_and_schedule_task_if_possible(scheduler_info *info, int found_task_to_schedule = 0; /* Find the first task whose dependencies are available locally. */ DL_FOREACH_SAFE(state->task_queue, elt, tmp) { - spec = task_instance_task_spec(elt->task); + spec = task_task_spec(elt->task); if (can_run(state, spec)) { found_task_to_schedule = 1; break; @@ -122,43 +122,43 @@ int find_and_schedule_task_if_possible(scheduler_info *info, void handle_task_submitted(scheduler_info *info, scheduler_state *s, - task_spec *task) { + task_spec *spec) { /* Create a unique task instance ID. This is different from the task ID and * is used to distinguish between potentially multiple executions of the * task. */ - task_iid task_iid = globally_unique_id(); - task_instance *instance = - make_task_instance(task_iid, task, TASK_STATUS_WAITING, NIL_ID); + task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID); /* If this task's dependencies are available locally, and if there is an * available worker, then assign this task to an available worker. Otherwise, * add this task to the local task queue. */ int schedule_locally = - (utarray_len(s->available_workers) > 0) && can_run(s, task); + (utarray_len(s->available_workers) > 0) && can_run(s, spec); if (schedule_locally) { /* Get the last available worker in the available worker queue. */ int *worker_index = (int *) utarray_back(s->available_workers); /* Tell the available worker to execute the task. */ - assign_task_to_worker(info, task, *worker_index); + assign_task_to_worker(info, spec, *worker_index); /* Remove the available worker from the queue and free the struct. */ utarray_pop_back(s->available_workers); } else { /* Add the task to the task queue. This passes ownership of the task queue. * And the task will be freed when it is assigned to a worker. */ task_queue_entry *elt = malloc(sizeof(task_queue_entry)); - elt->task = instance; + elt->task = task; DL_APPEND(s->task_queue, elt); } /* Submit the task to redis. */ - /* TODO(swang): We should set these values in a config file somewhere. */ + /* TODO(swang): We should set retry values in a config file somewhere. */ retry_info retry = { .num_retries = 0, .timeout = 0, .fail_callback = NULL, }; - task_log_publish(info->db, instance, &retry, NULL, NULL); + /* TODO(swang): This should be task_table_update if the task is already in the + * log. */ + task_table_add_task(info->db, task, &retry, NULL, NULL); if (schedule_locally) { /* If the task was scheduled locally, we need to free it. Otherwise, * ownership of the task is passed to the task_queue, and it will be freed * when it is assigned to a worker. */ - free(instance); + free_task(task); } } diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index 714de8869..ff2f023e4 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -42,7 +42,7 @@ void free_scheduler_state(scheduler_state *state); */ void handle_task_submitted(scheduler_info *info, scheduler_state *state, - task_spec *task); + task_spec *spec); /** * This function will be called when a task is assigned by the global scheduler diff --git a/src/photon/photon_client.c b/src/photon/photon_client.c index 1bf87f491..2d20892cb 100644 --- a/src/photon/photon_client.c +++ b/src/photon/photon_client.c @@ -11,7 +11,8 @@ photon_conn *photon_connect(const char *photon_socket) { } void photon_submit(photon_conn *conn, task_spec *task) { - write_message(conn->conn, SUBMIT_TASK, task_size(task), (uint8_t *)task); + write_message(conn->conn, SUBMIT_TASK, task_spec_size(task), + (uint8_t *) task); } task_spec *photon_get_task(photon_conn *conn) { @@ -24,7 +25,7 @@ task_spec *photon_get_task(photon_conn *conn) { read_message(conn->conn, &type, &length, &message); CHECK(type == EXECUTE_TASK); task_spec *task = (task_spec *)message; - CHECK(length == task_size(task)); + CHECK(length == task_spec_size(task)); return task; } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 5c4e855b2..25c54e8a3 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -14,11 +14,12 @@ #include "photon_scheduler.h" #include "plasma_client.h" #include "state/db.h" -#include "state/task_log.h" +#include "state/task_table.h" +#include "state/object_table.h" #include "utarray.h" #include "uthash.h" -UT_icd task_ptr_icd = {sizeof(task_instance *), NULL, NULL, NULL}; +UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL}; UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL}; /** Association between the socket fd of a worker and its worker_index. */ @@ -95,11 +96,11 @@ void free_local_scheduler(local_scheduler_state *s) { } void assign_task_to_worker(scheduler_info *info, - task_spec *task, + task_spec *spec, int worker_index) { CHECK(worker_index < utarray_len(info->workers)); worker *w = (worker *) utarray_eltptr(info->workers, worker_index); - write_message(w->sock, EXECUTE_TASK, task_size(task), (uint8_t *) task); + write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec); } void process_plasma_notification(event_loop *loop, @@ -212,17 +213,14 @@ int main(int argc, char *argv[]) { plasma_socket_name = optarg; break; default: - LOG_ERR("unknown option %c", c); - exit(-1); + LOG_FATAL("unknown option %c", c); } } if (!scheduler_socket_name) { - LOG_ERR("please specify socket for incoming connections with -s switch"); - exit(-1); + LOG_FATAL("please specify socket for incoming connections with -s switch"); } if (!plasma_socket_name) { - LOG_ERR("please specify socket for connecting to Plasma with -p switch"); - exit(-1); + LOG_FATAL("please specify socket for connecting to Plasma with -p switch"); } /* Parse the Redis address into an IP address and a port. */ char redis_addr[16] = {0}; @@ -230,8 +228,8 @@ int main(int argc, char *argv[]) { if (!redis_addr_port || sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) != 2) { - LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch"); - exit(-1); + LOG_FATAL( + "need to specify redis address like 127.0.0.1:6379 with -r switch"); } start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port), plasma_socket_name); diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 5ce42f3f5..695ae2c60 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -127,8 +127,7 @@ uint8_t *lookup_or_mmap(plasma_connection *conn, uint8_t *result = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (result == MAP_FAILED) { - LOG_ERR("mmap failed"); - exit(-1); + LOG_FATAL("mmap failed"); } close(fd); entry = malloc(sizeof(client_mmap_table_entry)); @@ -369,8 +368,7 @@ int socket_connect_retry(const char *socket_name, } /* If we could not connect to the socket, exit. */ if (fd == -1) { - LOG_ERR("could not connect to socket %s", socket_name); - exit(-1); + LOG_FATAL("could not connect to socket %s", socket_name); } return fd; } diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index c5c5894bc..d86b16d56 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -328,8 +328,9 @@ void write_object_chunk(client_connection *conn, plasma_request_buffer *buf) { if (r > 0) { LOG_ERR("partial write on fd %d", conn->fd); } else { - LOG_ERR("write error"); - exit(-1); + /* TODO(swang): This should not be a fatal error, since connections can + * close at any time. */ + LOG_FATAL("write error"); } } else { conn->cursor += r; @@ -381,7 +382,7 @@ void send_queued_request(event_loop *loop, write_object_chunk(conn, buf); break; default: - LOG_ERR("Buffered request has unknown type."); + LOG_FATAL("Buffered request has unknown type."); } /* We are done sending this request. */ @@ -859,8 +860,7 @@ void process_message(event_loop *loop, free(conn); } break; default: - LOG_ERR("invalid request %" PRId64, type); - exit(-1); + LOG_FATAL("invalid request %" PRId64, type); } free(req); @@ -975,33 +975,28 @@ int main(int argc, char *argv[]) { db_host = optarg; break; default: - LOG_ERR("unknown option %c", c); - exit(-1); + LOG_FATAL("unknown option %c", c); } } if (!store_socket_name) { - LOG_ERR( + LOG_FATAL( "please specify socket for connecting to the plasma store with -s " "switch"); - exit(-1); } if (!manager_socket_name) { - LOG_ERR( + LOG_FATAL( "please specify socket name of the manager's local socket with -m " "switch"); - exit(-1); } if (!master_addr) { - LOG_ERR( + LOG_FATAL( "please specify ip address of the current host in the format " "123.456.789.10 with -h switch"); - exit(-1); } if (port == -1) { - LOG_ERR( + LOG_FATAL( "please specify port the plasma manager shall listen to in the" "format 12345 with -p switch"); - exit(-1); } char db_addr[16]; int db_port; diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 8aae73a72..f3fa6830a 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -42,8 +42,7 @@ void dlfree(void *); void plasma_send_reply(int fd, plasma_reply *reply) { int reply_count = sizeof(plasma_reply); if (write(fd, reply, reply_count) != reply_count) { - LOG_ERR("write error, fd = %d", fd); - exit(-1); + LOG_FATAL("write error, fd = %d", fd); } } @@ -557,12 +556,10 @@ int main(int argc, char *argv[]) { } } if (!socket_name) { - LOG_ERR("please specify socket for incoming connections with -s switch"); - exit(-1); + LOG_FATAL("please specify socket for incoming connections with -s switch"); } if (system_memory == -1) { - LOG_ERR("please specify the amount of system memory with -m switch"); - exit(-1); + LOG_FATAL("please specify the amount of system memory with -m switch"); } LOG_DEBUG("starting server listening on %s", socket_name); start_server(socket_name, system_memory);