diff --git a/common.c b/common.c index 53e32fe13..d9fe1e951 100644 --- a/common.c +++ b/common.c @@ -6,6 +6,9 @@ #include #include +const unique_id NIL_ID = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}}; + unique_id globally_unique_id(void) { /* Use /dev/urandom for "real" randomness. */ int fd; @@ -31,63 +34,3 @@ char *sha1_to_hex(const unsigned char *sha1, char *buffer) { return buffer; } - -const signed char hexval_table[256] = { - -1, -1, -1, -1, -1, -1, -1, -1, /* 00-07 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 08-0f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 10-17 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 18-1f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 20-27 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 28-2f */ - +0, +1, +2, +3, +4, +5, +6, +7, /* 30-37 */ - +8, +9, -1, -1, -1, -1, -1, -1, /* 38-3f */ - -1, 10, 11, 12, 13, 14, 15, -1, /* 40-47 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 48-4f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 50-57 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 58-5f */ - -1, 10, 11, 12, 13, 14, 15, -1, /* 60-67 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 68-67 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 70-77 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 78-7f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 80-87 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 88-8f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 90-97 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* 98-9f */ - -1, -1, -1, -1, -1, -1, -1, -1, /* a0-a7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* a8-af */ - -1, -1, -1, -1, -1, -1, -1, -1, /* b0-b7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* b8-bf */ - -1, -1, -1, -1, -1, -1, -1, -1, /* c0-c7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* c8-cf */ - -1, -1, -1, -1, -1, -1, -1, -1, /* d0-d7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* d8-df */ - -1, -1, -1, -1, -1, -1, -1, -1, /* e0-e7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* e8-ef */ - -1, -1, -1, -1, -1, -1, -1, -1, /* f0-f7 */ - -1, -1, -1, -1, -1, -1, -1, -1, /* f8-ff */ -}; - -static inline unsigned int hexval(unsigned char c) { - return hexval_table[c]; -} - -/* - * Convert two consecutive hexadecimal digits into a char. Return a - * negative value on error. Don't run over the end of short strings. - */ -static inline int hex2chr(const char *s) { - int val = hexval(s[0]); - return (val < 0) ? val : (val << 4) | hexval(s[1]); -} - -int hex_to_sha1(const char *hex, unsigned char *sha1) { - int i; - for (i = 0; i < UNIQUE_ID_SIZE; i++) { - int val = hex2chr(hex); - if (val < 0) - return -1; - *sha1++ = val; - hex += 2; - } - return 0; -} diff --git a/common.h b/common.h index 5444739bc..047644cc8 100644 --- a/common.h +++ b/common.h @@ -38,6 +38,8 @@ typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id; +extern const unique_id NIL_ID; + /* Generate a globally unique ID. */ unique_id globally_unique_id(void); @@ -46,11 +48,6 @@ unique_id globally_unique_id(void); * UNIQUE_ID_SIZE + 1 */ char *sha1_to_hex(const unsigned char *sha1, char *buffer); -/* Convert a hexdecimal string of length 40 to a 20 byte sha1 hash. This - * function assumes that sha1 points to an already allocated char array of size - * UNIQUE_ID_SIZE. */ -int hex_to_sha1(const char *hex, unsigned char *sha1); - typedef unique_id object_id; #endif diff --git a/state/redis.c b/state/redis.c index aa2011d9a..db008cb5d 100644 --- a/state/redis.c +++ b/state/redis.c @@ -9,7 +9,7 @@ #include "common.h" #include "db.h" #include "object_table.h" -#include "task_queue.h" +#include "task_log.h" #include "event_loop.h" #include "redis.h" #include "io.h" @@ -65,22 +65,28 @@ db_handle *db_connect(const char *address, db->client_type = strdup(client_type); db->client_id = num_clients; - db->reading = 0; - db->writing = 0; db->service_cache = NULL; db->sync_context = context; + utarray_new(db->callback_freelist, &ut_ptr_icd); /* Establish async connection */ db->context = redisAsyncConnect(address, port); CHECK_REDIS_CONNECT(redisAsyncContext, db->context, "could not connect to redis %s:%d", address, port); db->context->data = (void *) db; + /* Establish async connection for subscription */ + db->sub_context = redisAsyncConnect(address, port); + CHECK_REDIS_CONNECT(redisAsyncContext, db->sub_context, + "could not connect to redis %s:%d", address, port); + db->sub_context->data = (void *) db; + return db; } void db_disconnect(db_handle *db) { redisFree(db->sync_context); redisAsyncFree(db->context); + redisAsyncFree(db->sub_context); service_cache_entry *e, *tmp; HASH_ITER(hh, db->service_cache, e, tmp) { free(e->addr); @@ -88,18 +94,22 @@ void db_disconnect(db_handle *db) { free(e); } free(db->client_type); + void **p = NULL; + while ((p = (void **) utarray_next(db->callback_freelist, p))) { + free(*p); + } + utarray_free(db->callback_freelist); free(db); } void db_attach(db_handle *db, event_loop *loop) { redisAeAttach(loop, db->context); + redisAeAttach(loop, db->sub_context); } void object_table_add(db_handle *db, unique_id object_id) { - static char hex_object_id[2 * UNIQUE_ID_SIZE + 1]; - sha1_to_hex(&object_id.id[0], &hex_object_id[0]); - redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d", - &hex_object_id[0], db->client_id); + redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%b %d", &object_id.id[0], + UNIQUE_ID_SIZE, db->client_id); if (db->context->err) { LOG_REDIS_ERR(db->context, "could not add object_table entry"); } @@ -148,30 +158,75 @@ void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { void object_table_lookup(db_handle *db, object_id object_id, lookup_callback callback) { - static char hex_object_id[2 * UNIQUE_ID_SIZE + 1]; - sha1_to_hex(&object_id.id[0], &hex_object_id[0]); lookup_callback_data *cb_data = malloc(sizeof(lookup_callback_data)); cb_data->callback = callback; cb_data->object_id = object_id; redisAsyncCommand(db->context, object_table_get_entry, cb_data, - "SMEMBERS obj:%s", &hex_object_id[0]); + "SMEMBERS obj:%b", &object_id.id[0], UNIQUE_ID_SIZE); if (db->context->err) { LOG_REDIS_ERR(db->context, "error in object_table lookup"); } } -void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task) { - /* For converting an id to hex, which has double the number - * of bytes compared to the id (+ 1 byte for '\0'). */ - static char hex[2 * UNIQUE_ID_SIZE + 1]; - UT_string *command; - utstring_new(command); - sha1_to_hex(&task_iid.id[0], &hex[0]); - utstring_printf(command, "HMSET queue:%s ", &hex[0]); - print_task(task, command); - redisAsyncCommand(db->context, NULL, NULL, utstring_body(command)); +void task_log_add_task(db_handle *db, task_instance *task_instance) { + task_iid task_iid = *task_instance_id(task_instance); + redisAsyncCommand(db->context, NULL, NULL, "HMSET tasklog:%b 0 %b", + (char *) &task_iid.id[0], UNIQUE_ID_SIZE, + (char *) task_instance, task_instance_size(task_instance)); if (db->context->err) { - LOG_REDIS_ERR(db->context, "error in task_queue submit_task"); + LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task"); + } + node_id node = *task_instance_node(task_instance); + int32_t state = *task_instance_state(task_instance); + redisAsyncCommand(db->context, NULL, NULL, "PUBLISH task_log:%b:%d %b", + (char *) &node.id[0], UNIQUE_ID_SIZE, state, + (char *) task_instance, task_instance_size(task_instance)); + if (db->context->err) { + LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task"); + } +} + +void task_log_redis_callback(redisAsyncContext *c, + void *reply, + void *privdata) { + redisReply *r = reply; + if (reply == NULL) + return; + CHECK(r->type == REDIS_REPLY_ARRAY); + /* First entry is message type, second is topic, third is payload. */ + CHECK(r->elements > 2); + /* If this condition is true, we got the initial message that acknowledged the + * subscription. */ + if (r->element[2]->str == NULL) { + return; + } + /* Otherwise, parse the task and call the callback. */ + CHECK(privdata); + task_log_callback_data *callback_data = privdata; + task_instance *instance = malloc(r->element[2]->len); + memcpy(instance, r->element[2]->str, r->element[2]->len); + callback_data->callback(instance, callback_data->userdata); + task_instance_free(instance); +} +void task_log_register_callback(db_handle *db, + task_log_callback callback, + node_id node, + int32_t state, + void *userdata) { + task_log_callback_data *callback_data = + malloc(sizeof(task_log_callback_data)); + utarray_push_back(db->callback_freelist, &callback_data); + callback_data->callback = callback; + callback_data->userdata = userdata; + if (memcmp(&node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) { + redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, + "PSUBSCRIBE task_log:*:%d", state); + } else { + redisAsyncCommand(db->sub_context, task_log_redis_callback, callback_data, + "SUBSCRIBE task_log:%b:%d", (char *) &node.id[0], + UNIQUE_ID_SIZE, state); + } + if (db->sub_context->err) { + LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback"); } - utstring_free(command); } diff --git a/state/redis.h b/state/redis.h index 23ebb2ba6..51479f8f0 100644 --- a/state/redis.h +++ b/state/redis.h @@ -3,10 +3,12 @@ #include "db.h" #include "object_table.h" +#include "task_log.h" #include "hiredis/hiredis.h" #include "hiredis/async.h" #include "uthash.h" +#include "utarray.h" typedef struct { /* Unique ID for this service. */ @@ -17,6 +19,13 @@ typedef struct { UT_hash_handle hh; } service_cache_entry; +typedef struct { + /* The callback that will be called. */ + task_log_callback callback; + /* Userdata associated with the callback. */ + void *userdata; +} task_log_callback_data; + struct db_handle_impl { /* String that identifies this client type. */ char *client_type; @@ -24,8 +33,10 @@ struct db_handle_impl { int64_t client_id; /* Redis context for this global state store connection. */ redisAsyncContext *context; - /* Which events are we processing (read, write)? */ - int reading, writing; + /* Redis context for "subscribe" communication. + * Yes, we need a separate one for that, see + * https://github.com/redis/hiredis/issues/55 */ + redisAsyncContext *sub_context; /* The event loop this global state store connection is part of. */ event_loop *loop; /* Index of the database connection in the event loop */ @@ -35,6 +46,8 @@ struct db_handle_impl { /* Redis context for synchronous connections. * Should only be used very rarely, it is not asynchronous. */ redisContext *sync_context; + /* Data structure for callbacks that needs to be freed. */ + UT_array *callback_freelist; }; typedef struct { diff --git a/state/task_log.h b/state/task_log.h new file mode 100644 index 000000000..acf5dbcd0 --- /dev/null +++ b/state/task_log.h @@ -0,0 +1,41 @@ +#ifndef TASK_LOG_H +#define TASK_LOG_H + +#include "db.h" +#include "task.h" + +/* The task log is a message bus that is used for all communication between + * local and global schedulers (and also persisted to the state database). + * Here are examples of events that are recorded by the task log: + * + * 1) local scheduler writes it when submits a task to the global scheduler; + * 2) global scheduler reads it to get the task submitted by local schedulers; + * 3) global scheduler writes it when assigning the task to a local scheduler; + * 4) local scheduler reads it to get its tasks assigned by global scheduler; + * 5) local scheduler writes it when a task finishes execution; + * 6) global scheduler reads it to get the tasks that have finished; */ + +/* Callback for subscribing to the task log. */ +typedef void (*task_log_callback)(task_instance *task_instance, void *userdata); + +/* Initially add a task instance to the task log. */ +void task_log_add_task(db_handle *db, task_instance *task_instance); + +/* Update task instance in the task log. */ +void task_log_update_task(db_handle *db, + task_iid task_iid, + int32_t state, + node_id node); + +/* Register callback for a certain event. The node specifies the node whose + * events we want to listen to. If you want to listen to all events for this + * node, use state_filter = + * TASK_WAITING | TASK_SCHEDULED | TASK_RUNNING | TASK_DONE. + * If you want to register to updates from all nodes, set node = NIL_ID. */ +void task_log_register_callback(db_handle *db, + task_log_callback callback, + node_id node, + int32_t state_filter, + void *userdata); + +#endif /* TASK_LOG_H */ diff --git a/state/task_queue.h b/state/task_queue.h deleted file mode 100644 index 92968707e..000000000 --- a/state/task_queue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef TASK_QUEUE_H -#define TASK_QUEUE_H - -#include "db.h" -#include "task.h" - -/* The task ID is a deterministic hash of the function ID that - * the task executes and the argument IDs or argument values */ -typedef unique_id task_id; - -/* The task instance ID is a globally unique ID generated which - * identifies this particular execution of the task */ -typedef unique_id task_iid; - -/* The node id is an identifier for the node the task is - * scheduled on */ -typedef unique_id node_id; - -/* Callback for subscribing to the task queue. The only argument this - * callback gets is the task_id of the. */ -typedef void (*task_queue_callback)(task_iid *task_iid, task_spec *task); - -/* Submit task to the global scheduler. */ -void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task); - -/* Submit task to a local scheduler based on the decision made by the global - * scheduler. */ -void task_queue_schedule_task(db_handle *db, task_iid task_iid, node_id node); - -/* Subscribe to task queue. */ -void task_queue_register_callback(db_handle *db, task_queue_callback callback); - -#endif diff --git a/state/task_table.h b/state/task_table.h index 71e879c2c..3a1852522 100644 --- a/state/task_table.h +++ b/state/task_table.h @@ -5,9 +5,16 @@ #include "task.h" /* Add task to the task table, handle errors here. */ -status task_table_add_task(db_handle *db, task_iid task_iid, task_spec *task); +status task_table_add_task(db_handle *db, task_spec *task); + +/* Callback for getting an entry from the task table. Task spec will be freed + * by the system after the callback */ +typedef void (*task_table_callback)(task_spec *task, void *context); /* Get specific task from the task table. */ -status task_table_get_task(db_handle *db, task_iid task_iid, task_spec *task); +status task_table_get_task(db_handle *db, + task_id task_id, + task_table_callback callback, + void *context); #endif /* TASK_TABLE_H */ diff --git a/task.c b/task.c index 30fde8d31..0086eb337 100644 --- a/task.c +++ b/task.c @@ -8,6 +8,8 @@ #include "common.h" #include "io.h" +/* TASK SPECIFICATIONS */ + /* Tasks are stored in a consecutive chunk of memory, the first * sizeof(task_spec) bytes are arranged according to the struct * task_spec. Then there is an array of task_args of length @@ -168,65 +170,49 @@ void print_task(task_spec *spec, UT_string *output) { } } -UT_icd unique_id_icd = {sizeof(unique_id), NULL, NULL, NULL}; +/* TASK INSTANCES */ -task_spec *parse_task(char *task_string, int64_t task_length) { - /* We make one pass through task_string to store all the argument ids - * in "args" and all the return ids in "returns". */ - UT_array *args; - utarray_new(args, &unique_id_icd); - UT_array *returns; - utarray_new(returns, &unique_id_icd); - function_id function_id; - char *cursor = strtok(task_string, " "); - int index = 0; - while (cursor != NULL) { - /* This will be equal to "args" or "returns" depending on whether we - * are processing an argument id or a return id. */ - UT_array *target = NULL; - if (strncmp("fun", cursor, 3) == 0) { - /* Parse function id. */ - CHECK(cursor + 2 * UNIQUE_ID_SIZE + 1 <= task_string + task_length); - cursor = strtok(NULL, " "); - hex_to_sha1(cursor, &function_id.id[0]); - cursor = strtok(NULL, " "); - CHECK(cursor); - continue; - } else if (strncmp("id:", cursor, 3) == 0) { - /* Parse pass by reference argument. */ - sscanf(cursor, "id:%d", &index); - target = args; - } else if (strncmp("val:", cursor, 4) == 0) { - /* Parse pass by value argument. */ - sscanf(cursor, "val:%d", &index); - CHECK(0); /* Not implemented yet */ - } else if (strncmp("ret:", cursor, 4) == 0) { - /* Parse return object reference. */ - sscanf(cursor, "ret:%d", &index); - target = returns; - } - cursor = strtok(NULL, " "); - CHECK(cursor); - if (index >= utarray_len(target)) { - utarray_resize(target, index + 1); - } - object_id *id = (object_id *) utarray_eltptr(target, index); - hex_to_sha1(cursor, &id->id[0]); - cursor = strtok(NULL, " "); - } - /* TODO(pcm): Implement pass by value. */ - /* Now assemble the task specification. */ - task_spec *spec = - alloc_task_spec(function_id, utarray_len(args), utarray_len(returns), 0); - for (int i = 0; i < utarray_len(args); ++i) { - object_id *id = (object_id *) utarray_eltptr(args, i); - task_args_add_ref(spec, *id); - } - for (int i = 0; i < utarray_len(returns); ++i) { - object_id *id = (object_id *) utarray_eltptr(returns, i); - *task_return(spec, i) = *id; - } - utarray_free(args); - utarray_free(returns); - return spec; +struct task_instance_impl { + task_iid iid; + int32_t state; + node_id node; + task_spec spec; +}; + +task_instance *make_task_instance(task_iid task_iid, + task_spec *spec, + int32_t state, + node_id node) { + int64_t size = sizeof(task_instance) - sizeof(task_spec) + task_size(spec); + task_instance *result = malloc(size); + memset(result, 0, size); + result->iid = task_iid; + result->state = state; + result->node = node; + memcpy(&result->spec, spec, task_size(spec)); + return result; +} + +int64_t task_instance_size(task_instance *instance) { + return sizeof(task_instance) - sizeof(task_spec) + task_size(&instance->spec); +} + +task_iid *task_instance_id(task_instance *instance) { + return &instance->iid; +} + +int32_t *task_instance_state(task_instance *instance) { + return &instance->state; +} + +node_id *task_instance_node(task_instance *instance) { + return &instance->node; +} + +task_spec *task_instance_task_spec(task_instance *instance) { + return &instance->spec; +} + +void task_instance_free(task_instance *instance) { + free(instance); } diff --git a/task.h b/task.h index 96c97b80d..9267edb65 100644 --- a/task.h +++ b/task.h @@ -1,7 +1,7 @@ #ifndef TASK_H #define TASK_H -/* This API specifies the task data structure. It is in C so we can +/* This API specifies the task data structures. It is in C so we can * easily construct tasks from other languages like Python. The datastructures * are also defined in such a way that memory is contiguous and all pointers * are relative, so that we can memcpy the datastructure and ship it over the @@ -15,6 +15,24 @@ typedef unique_id function_id; typedef unique_id object_id; +/* The task ID is a deterministic hash of the function ID that + * the task executes and the argument IDs or argument values */ +typedef unique_id task_id; + +/* The task instance ID is a globally unique ID generated which + * identifies this particular execution of the task */ +typedef unique_id task_iid; + +/* The node id is an identifier for the node the task is + * scheduled on */ +typedef unique_id node_id; + +/* + * TASK SPECIFICATIONS: Contain all the information neccessary + * to execute the task (function id, arguments, return object ids). + * + */ + typedef struct task_spec_impl task_spec; /* If argument is passed by value or reference. */ @@ -65,7 +83,51 @@ task_spec *read_task(int fd); /* Print task as a humanly readable string. */ void print_task(task_spec *spec, UT_string *output); -/* Parse task as printed by print_task. */ -task_spec *parse_task(char *task_string, int64_t task_length); +/* + * SCHEDULED TASK: Contains information about a scheduled task: + * the task iid, the task specification and the task status + * (WAITING, SCHEDULED, RUNNING, DONE) and which node the + * task is scheduled on. + * + */ + +/* The scheduling_state can be used as a flag when we are listening + * for an event, for example TASK_WAITING | TASK_SCHEDULED. */ +enum scheduling_state { + TASK_WAITING = 1, + TASK_SCHEDULED = 2, + TASK_RUNNING = 4, + TASK_DONE = 8 +}; + +/* A task instance is one execution of a task specification. + * It has a unique instance id, a state of execution (see scheduling_state) + * and a node it is scheduled on or running on. */ +typedef struct task_instance_impl task_instance; + +/* Allocate and initialize a new task instance. Must be freed with + * scheduled_task_free after use. */ +task_instance *make_task_instance(task_iid task_iid, + task_spec *task, + int32_t state, + node_id node); + +/* Size of task instance structure in bytes. */ +int64_t task_instance_size(task_instance *instance); + +/* Instance ID of the task instance. */ +task_iid *task_instance_id(task_instance *instance); + +/* The scheduling state of the task instance. */ +int32_t *task_instance_state(task_instance *instance); + +/* Node this task instance has been assigned to or is running on. */ +node_id *task_instance_node(task_instance *instance); + +/* Task specification of this task instance. */ +task_spec *task_instance_task_spec(task_instance *instance); + +/* Free this task instance datastructure. */ +void task_instance_free(task_instance *instance); #endif diff --git a/test/common_tests.c b/test/common_tests.c index 3673c335d..47b643039 100644 --- a/test/common_tests.c +++ b/test/common_tests.c @@ -6,11 +6,8 @@ SUITE(common_tests); TEST sha1_test(void) { static char hex[2 * UNIQUE_ID_SIZE + 1]; - static unsigned char id[UNIQUE_ID_SIZE]; unique_id uid = globally_unique_id(); sha1_to_hex(&uid.id[0], &hex[0]); - hex_to_sha1(&hex[0], &id[0]); - ASSERT(memcmp(&uid.id[0], &id[0], 20) == 0); PASS(); } diff --git a/test/db_tests.c b/test/db_tests.c index 99fad7e18..96f16b528 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -6,7 +6,7 @@ #include "test/example_task.h" #include "state/db.h" #include "state/object_table.h" -#include "state/task_queue.h" +#include "state/task_log.h" #include "state/redis.h" #include "task.h" @@ -72,27 +72,70 @@ TEST object_table_lookup_test(void) { PASS(); } -TEST task_queue_test(void) { +void task_log_test_callback(task_instance *instance, void *userdata) { + task_instance *other = userdata; + CHECK(*task_instance_state(instance) == TASK_SCHEDULED); + CHECK(task_instance_size(instance) == task_instance_size(other)); + CHECK(memcmp(instance, other, task_instance_size(instance)) == 0); +} + +TEST task_log_test(void) { event_loop *loop = event_loop_create(); db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); db_attach(db, loop); - + node_id node = globally_unique_id(); task_spec *task = example_task(); - task_queue_submit_task(db, globally_unique_id(), task); + task_instance *instance = + make_task_instance(globally_unique_id(), task, TASK_SCHEDULED, node); + task_log_register_callback(db, task_log_test_callback, node, TASK_SCHEDULED, + instance); + task_log_add_task(db, instance); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); - + task_instance_free(instance); free_task_spec(task); db_disconnect(db); event_loop_destroy(loop); PASS(); } +int num_test_callback_called = 0; + +void task_log_all_test_callback(task_instance *instance, void *userdata) { + num_test_callback_called += 1; +} + +TEST task_log_all_test(void) { + event_loop *loop = event_loop_create(); + db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); + db_attach(db, loop); + task_spec *task = example_task(); + /* Schedule two tasks on different nodes. */ + task_instance *instance1 = make_task_instance( + globally_unique_id(), task, TASK_SCHEDULED, globally_unique_id()); + task_instance *instance2 = make_task_instance( + globally_unique_id(), task, TASK_SCHEDULED, globally_unique_id()); + task_log_register_callback(db, task_log_all_test_callback, NIL_ID, + TASK_SCHEDULED, NULL); + task_log_add_task(db, instance1); + task_log_add_task(db, instance2); + event_loop_add_timer(loop, 100, timeout_handler, NULL); + event_loop_run(loop); + task_instance_free(instance2); + task_instance_free(instance1); + free_task_spec(task); + db_disconnect(db); + event_loop_destroy(loop); + ASSERT(num_test_callback_called == 2); + PASS(); +} + SUITE(db_tests) { redisContext *context = redisConnect("127.0.0.1", 6379); freeReplyObject(redisCommand(context, "FLUSHALL")); RUN_REDIS_TEST(context, object_table_lookup_test); - RUN_REDIS_TEST(context, task_queue_test); + RUN_REDIS_TEST(context, task_log_test); + RUN_REDIS_TEST(context, task_log_all_test); redisFree(context); } diff --git a/test/task_tests.c b/test/task_tests.c index fcb714737..f72a0e2c2 100644 --- a/test/task_tests.c +++ b/test/task_tests.c @@ -63,28 +63,9 @@ TEST send_task(void) { PASS(); } -TEST print_and_parse_task(void) { - task_spec *task = example_task(); - - UT_string *output; - utstring_new(output); - print_task(task, output); - task_spec *result = parse_task(utstring_body(output), utstring_len(output)); - utstring_free(output); - - ASSERT_EQ(task_size(task), task_size(result)); - ASSERT(memcmp(task, result, task_size(task)) == 0); - - free_task_spec(task); - free_task_spec(result); - - PASS(); -} - SUITE(task_tests) { RUN_TEST(task_test); RUN_TEST(send_task); - RUN_TEST(print_and_parse_task); } GREATEST_MAIN_DEFS();