Merge task table and task log into a single table (#30)

* Merge task table and task log

* Fix test in db tests

* Address Robert's comments and some better error checking

* Add a LOG_FATAL that exits the program
This commit is contained in:
Stephanie Wang
2016-11-10 18:13:26 -08:00
committed by Philipp Moritz
parent 194bdb1d96
commit 9d1e750e8f
30 changed files with 1578 additions and 842 deletions
+6 -6
View File
@@ -4,7 +4,7 @@ BUILD = build
all: hiredis $(BUILD)/libcommon.a
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_log.o thirdparty/ae/ae.o thirdparty/sha256.o
$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o state/redis.o state/table.o state/object_table.o state/task_table.o thirdparty/ae/ae.o thirdparty/sha256.o
ar rcs $@ $^
$(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a
@@ -16,8 +16,8 @@ $(BUILD)/db_tests: hiredis test/db_tests.c $(BUILD)/libcommon.a
$(BUILD)/object_table_tests: hiredis test/object_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/object_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
$(BUILD)/task_log_tests: hiredis test/task_log_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_log_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
$(BUILD)/task_table_tests: hiredis test/task_table_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ test/task_table_tests.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS)
$(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a
$(CC) -o $@ $^ $(CFLAGS)
@@ -38,7 +38,7 @@ redis:
hiredis:
git submodule update --init --recursive -- "thirdparty/hiredis" ; cd thirdparty/hiredis ; make
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
./thirdparty/redis-3.2.3/src/redis-server &
sleep 1s
./build/common_tests
@@ -46,7 +46,7 @@ test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/objec
./build/io_tests
./build/task_tests
./build/redis_tests
./build/task_log_tests
./build/task_table_tests
./build/object_table_tests
valgrind: test
@@ -55,7 +55,7 @@ valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/io_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_tests
valgrind --leak-check=full --error-exitcode=1 ./build/redis_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_log_tests
valgrind --leak-check=full --error-exitcode=1 ./build/task_table_tests
valgrind --leak-check=full --error-exitcode=1 ./build/object_table_tests
FORCE:
+5 -1
View File
@@ -25,7 +25,11 @@ unique_id globally_unique_id(void) {
}
bool object_ids_equal(object_id first_id, object_id second_id) {
return memcmp(&first_id, &second_id, sizeof(object_id)) == 0 ? true : false;
return UNIQUE_ID_EQ(first_id, second_id) ? true : false;
}
bool object_id_is_nil(object_id id) {
return object_ids_equal(id, NIL_OBJECT_ID);
}
char *sha1_to_hex(const unsigned char *sha1, char *buffer) {
+27 -9
View File
@@ -25,17 +25,21 @@
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#define CHECKM(COND, M, ...) \
do { \
if (!(COND)) { \
LOG_ERR("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \
void *buffer[255]; \
const int calls = backtrace(buffer, sizeof(buffer) / sizeof(void *)); \
backtrace_symbols_fd(buffer, calls, 1); \
exit(-1); \
} \
#define LOG_FATAL(M, ...) \
do { \
fprintf(stderr, "[FATAL] (%s:%d) " M "\n", __FILE__, __LINE__, \
##__VA_ARGS__); \
void *buffer[255]; \
const int calls = backtrace(buffer, sizeof(buffer) / sizeof(void *)); \
backtrace_symbols_fd(buffer, calls, 1); \
exit(-1); \
} while (0);
#define CHECKM(COND, M, ...) \
if (!(COND)) { \
LOG_ERR("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \
}
#define CHECK(COND) CHECKM(COND, "")
/* This should be defined if we want to check calls to DCHECK. */
@@ -56,6 +60,10 @@
#define UNIQUE_ID_SIZE 20
#define UNIQUE_ID_EQ(id1, id2) (memcmp((id1).id, (id2).id, UNIQUE_ID_SIZE) == 0)
#define IS_NIL_ID(id) UNIQUE_ID_EQ(id, NIL_ID)
typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;
extern const UT_icd object_id_icd;
@@ -70,6 +78,8 @@ unique_id globally_unique_id(void);
* UNIQUE_ID_SIZE + 1 */
char *sha1_to_hex(const unsigned char *sha1, char *buffer);
#define NIL_OBJECT_ID NIL_ID
typedef unique_id object_id;
/**
@@ -81,4 +91,12 @@ typedef unique_id object_id;
*/
bool object_ids_equal(object_id first_id, object_id second_id);
/**
* Compare a object ID to the nil ID.
*
* @param id The object ID to compare to nil.
* @return True if the object ID is equal to nil.
*/
bool object_id_is_nil(object_id id);
#endif
+6 -7
View File
@@ -3,6 +3,7 @@
A *task specification* contains all information that is needed for computing
the results of a task:
- The ID of the task
- The function ID of the function that executes the task
- The arguments (either object IDs for pass by reference
or values for pass by value)
@@ -11,23 +12,21 @@ or values for pass by value)
From these, a task ID can be computed which is also stored in the task
specification.
A *task instance* represents one execution of a task specification.
A *task* represents the execution of a task specification.
It consists of:
- A scheduling state (WAITING, SCHEDULED, RUNNING, DONE)
- The target node where the task is scheduled or executed
- A unique task instance ID that identifies the particular execution
of the task.
- The task specification
The task data structures are defined in `common/task.h`.
The *task log* is a mapping from the task instance ID to a sequence of
updates to the status of the task instance. It is updated by various parts
of the system:
The *task table* is a mapping from the task ID to the *task* information. It is
updated by various parts of the system:
1. The local scheduler writes it with status WAITING when submits a task to the global scheduler
2. The global scheduler appends an update WAITING -> SCHEDULED together with the node ID when assigning the task to a local scheduler
3. The local scheduler appends an update SCHEDULED -> RUNNING when it assigns a task to a worker
4. The local scheduler appends an update RUNNING -> DONE when the task finishes execution
The task log is defined in `common/state/task_log.h`.
The task table is defined in `common/state/task_table.h`.
+4 -4
View File
@@ -28,12 +28,12 @@ static int PyObjectID_init(PyObjectID *self, PyObject *args, PyObject *kwds) {
if (!PyArg_ParseTuple(args, "s#", &data, &size)) {
return -1;
}
if (size != UNIQUE_ID_SIZE) {
if (size != sizeof(object_id)) {
PyErr_SetString(CommonError,
"ObjectID: object id string needs to have length 20");
return -1;
}
memcpy(&self->object_id.id[0], data, UNIQUE_ID_SIZE);
memcpy(&self->object_id.id[0], data, sizeof(object_id));
return 0;
}
@@ -48,7 +48,7 @@ PyObject *PyObjectID_make(object_id object_id) {
static PyObject *PyObjectID_id(PyObject *self) {
PyObjectID *s = (PyObjectID *) self;
return PyString_FromStringAndSize((char *) &s->object_id.id[0],
UNIQUE_ID_SIZE);
sizeof(object_id));
}
static PyObject *PyObjectID___reduce__(PyObjectID *self) {
@@ -176,7 +176,7 @@ static PyObject *PyTask_function_id(PyObject *self) {
}
static PyObject *PyTask_task_id(PyObject *self) {
task_id task_id = task_task_id(((PyTask *) self)->spec);
task_id task_id = task_spec_id(((PyTask *) self)->spec);
return PyObjectID_make(task_id);
}
+21
View File
@@ -36,3 +36,24 @@ void object_table_subscribe(
init_table_callback(db_handle, object_id, sub_data, retry, done_callback,
redis_object_table_subscribe, user_context);
}
void result_table_add(db_handle *db_handle,
object_id object_id,
task_id task_id_arg,
retry_info *retry,
result_table_done_callback done_callback,
void *user_context) {
task_id *task_id_copy = malloc(sizeof(task_id));
memcpy(task_id_copy, task_id_arg.id, sizeof(task_id));
init_table_callback(db_handle, object_id, task_id_copy, retry, done_callback,
redis_result_table_add, user_context);
}
void result_table_lookup(db_handle *db_handle,
object_id object_id,
retry_info *retry,
result_table_lookup_callback done_callback,
void *user_context) {
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
redis_result_table_lookup, user_context);
}
+52
View File
@@ -4,6 +4,7 @@
#include "common.h"
#include "table.h"
#include "db.h"
#include "task.h"
/*
* ==== Lookup call and callback ====
@@ -123,4 +124,55 @@ typedef struct {
void *subscribe_context;
} object_table_subscribe_data;
/*
* ==== Result table ====
*/
/**
* Callback called when the add/remove operation for a result table entry
* completes. */
typedef void (*result_table_done_callback)(object_id object_id,
void *user_context);
/**
* Add information about a new object to the object table. This
* is immutable information like the ID of the task that
* created the object.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object to add.
* @param task_id ID of the task that creates this object.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Context passed by the caller.
* @return Void.
*/
void result_table_add(db_handle *db_handle,
object_id object_id,
task_id task_id,
retry_info *retry,
result_table_done_callback done_callback,
void *user_context);
/** Callback called when the result table lookup completes. */
typedef void (*result_table_lookup_callback)(object_id object_id,
task *task,
void *user_context);
/**
* Lookup the task that created an object in the result table.
*
* @param db_handle Handle to object_table database.
* @param object_id ID of the object to lookup.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Context passed by the caller.
* @return Void.
*/
void result_table_lookup(db_handle *db_handle,
object_id object_id,
retry_info *retry,
result_table_lookup_callback done_callback,
void *user_context);
#endif /* OBJECT_TABLE_H */
+296 -84
View File
@@ -11,21 +11,19 @@
#include "db.h"
#include "object_table.h"
#include "task.h"
#include "task_log.h"
#include "task_table.h"
#include "event_loop.h"
#include "redis.h"
#include "io.h"
#define LOG_REDIS_ERR(context, M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \
context->errstr, ##__VA_ARGS__)
#define LOG_REDIS_ERR(context, M, ...) \
LOG_INFO("Redis error %d %s; %s", context->err, context->errstr, M)
#define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
do { \
CONTEXT_TYPE *_context = (context); \
if (!_context) { \
LOG_ERR("could not allocate redis context"); \
exit(-1); \
LOG_FATAL("could not allocate redis context"); \
} \
if (_context->err) { \
LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \
@@ -123,6 +121,62 @@ void db_attach(db_handle *db, event_loop *loop) {
redisAeAttach(loop, db->sub_context);
}
/**
* An internal function to allocate a task object and parse a hashmap reply
* from Redis into the task object. If the Redis reply is malformed, an empty
* task with the given task ID is returned.
*
* @param id The ID of the task we're looking up. If the reply from Redis is
* well-formed, the reply's ID should match this ID. Else, the returned
* task will have its ID set to this ID.
* @param num_redis_replies The number of keys and values in the Redis hashmap.
* @param redis_replies A pointer to the Redis hashmap keys and values.
* @return A pointer to the parsed task.
*/
task *parse_redis_task_table_entry(task_id id,
int num_redis_replies,
redisReply **redis_replies) {
task *task_result;
if (num_redis_replies == 0) {
/* There was no information about this task. */
return NULL;
}
/* Exit immediately if there weren't 6 fields, one for each key-value pair.
* The keys are "node", "state", and "task_spec". */
DCHECK(num_redis_replies == 6);
/* Parse the task struct's fields. */
scheduling_state state = 0;
node_id node = NIL_ID;
task_spec *spec = NULL;
for (int i = 0; i < num_redis_replies; i = i + 2) {
char *key = redis_replies[i]->str;
redisReply *value = redis_replies[i + 1];
if (strcmp(key, "node") == 0) {
memcpy(&node, value->str, value->len);
} else if (strcmp(key, "state") == 0) {
int scanned = sscanf(value->str, "%d", (int *) &state);
if (scanned != 1) {
LOG_FATAL("Scheduling state for task is malformed");
state = 0;
}
} else if (strcmp(key, "task_spec") == 0) {
spec = malloc(value->len);
memcpy(spec, value->str, value->len);
} else {
LOG_FATAL("Found unexpected %s field in task log", key);
}
}
/* Exit immediately if we couldn't parse the task spec. */
if (spec == NULL) {
LOG_FATAL("Could not parse task spec from task log");
}
/* Build and return the task. */
DCHECK(task_ids_equal(task_spec_id(spec), id));
task_result = alloc_task(spec, state, node);
free_task_spec(spec);
return task_result;
}
/*
* ==== object_table callbacks ====
*/
@@ -133,7 +187,7 @@ void redis_object_table_add_callback(redisAsyncContext *c,
REDIS_CALLBACK_HEADER(db, callback_data, r)
if (callback_data->done_callback) {
task_log_done_callback done_callback = callback_data->done_callback;
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
destroy_timer_callback(db->loop, callback_data);
@@ -142,10 +196,12 @@ void redis_object_table_add_callback(redisAsyncContext *c,
void redis_object_table_add(table_callback_data *callback_data) {
CHECK(callback_data);
db_handle *db = callback_data->db_handle;
redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) callback_data->timer_id, "SADD obj:%b %d",
&callback_data->id.id[0], UNIQUE_ID_SIZE, db->client_id);
if (db->context->err) {
object_id id = callback_data->id;
int status =
redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) callback_data->timer_id, "SADD obj:%b %d",
id.id, sizeof(object_id), db->client_id);
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "could not add object_table entry");
}
}
@@ -155,14 +211,117 @@ void redis_object_table_lookup(table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
/* Call redis asynchronously */
redisAsyncCommand(db->context, redis_object_table_get_entry,
(void *) callback_data->timer_id, "SMEMBERS obj:%b",
&callback_data->id.id[0], UNIQUE_ID_SIZE);
if (db->context->err) {
object_id id = callback_data->id;
int status = redisAsyncCommand(db->context, redis_object_table_get_entry,
(void *) callback_data->timer_id,
"SMEMBERS obj:%b", id.id, sizeof(object_id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "error in object_table lookup");
}
}
void redis_result_table_add_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
redisReply *reply = r;
CHECK(reply->type == REDIS_REPLY_STATUS ||
reply->type == REDIS_REPLY_INTEGER);
if (callback_data->done_callback) {
result_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
task_id *task_id = callback_data->data;
free(task_id);
destroy_timer_callback(db->loop, callback_data);
}
void redis_result_table_add(table_callback_data *callback_data) {
CHECK(callback_data);
db_handle *db = callback_data->db_handle;
object_id id = callback_data->id;
task_id *result_task_id = (task_id *) callback_data->data;
/* Add the result entry to the result table. */
int status = redisAsyncCommand(db->context, redis_result_table_add_callback,
(void *) callback_data->timer_id,
"SET result:%b %b", id.id, sizeof(object_id),
(*result_task_id).id, sizeof(task_id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "Error in result table add");
}
}
void redis_result_table_lookup_task_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
redisReply *reply = r;
/* Check that we received a Redis hashmap. */
if (reply->type != REDIS_REPLY_ARRAY) {
LOG_FATAL("Expected Redis array, received type %d %s", reply->type,
reply->str);
}
/* If the user registered a success callback, construct the task object from
* the Redis reply and call the callback. */
result_table_lookup_callback done_callback = callback_data->done_callback;
task_id *result_task_id = callback_data->data;
if (done_callback) {
task *task_reply = parse_redis_task_table_entry(
*result_task_id, reply->elements, reply->element);
done_callback(callback_data->id, task_reply, callback_data->user_context);
free_task(task_reply);
}
free(result_task_id);
destroy_timer_callback(db->loop, callback_data);
}
void redis_result_table_lookup_object_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
redisReply *reply = r;
if (reply->type == REDIS_REPLY_STRING) {
/* If we found the object, get the spec of the task that created it. */
DCHECK(reply->len == sizeof(task_id));
task_id *result_task_id = malloc(sizeof(task_id));
memcpy(result_task_id, reply->str, reply->len);
callback_data->data = (void *) result_task_id;
int status =
redisAsyncCommand(db->context, redis_result_table_lookup_task_callback,
(void *) callback_data->timer_id, "HGETALL task:%b",
(*result_task_id).id, sizeof(task_id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "Could not look up result table entry");
}
} else if (reply->type == REDIS_REPLY_NIL) {
/* The object with the requested ID was not in the table. */
LOG_ERR("Object's result not in table.");
result_table_lookup_callback done_callback = callback_data->done_callback;
if (done_callback) {
done_callback(callback_data->id, NULL, callback_data->user_context);
}
destroy_timer_callback(db->loop, callback_data);
return;
} else {
LOG_FATAL("expected string or nil, received type %d", reply->type);
}
}
void redis_result_table_lookup(table_callback_data *callback_data) {
CHECK(callback_data);
db_handle *db = callback_data->db_handle;
/* First, lookup the ID of the task that created this object. */
object_id id = callback_data->id;
int status =
redisAsyncCommand(db->context, redis_result_table_lookup_object_callback,
(void *) callback_data->timer_id, "GET result:%b",
id.id, sizeof(object_id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "Error in result table lookup");
}
}
/**
* Get an entry from the plasma manager table in redis.
*
@@ -213,8 +372,7 @@ void redis_object_table_get_entry(redisAsyncContext *c,
destroy_timer_callback(callback_data->db_handle->loop, callback_data);
free(managers);
} else {
LOG_ERR("expected integer or string, received type %d", reply->type);
exit(-1);
LOG_FATAL("expected integer or string, received type %d", reply->type);
}
}
@@ -249,28 +407,64 @@ void redis_object_table_subscribe(table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
/* subscribe to key notification associated to object id */
redisAsyncCommand(db->sub_context, object_table_redis_callback,
(void *) callback_data->timer_id,
"SUBSCRIBE __keyspace@0__:%b add",
(char *) &callback_data->id.id[0], UNIQUE_ID_SIZE);
if (db->sub_context->err) {
object_id id = callback_data->id;
int status = redisAsyncCommand(db->sub_context, object_table_redis_callback,
(void *) callback_data->timer_id,
"SUBSCRIBE __keyspace@0__:%b add", id.id,
sizeof(object_id));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_ERR(db->sub_context,
"error in redis_object_table_subscribe_callback");
}
}
/*
* ==== task_log callbacks ====
* ==== task_table callbacks ====
*/
void redis_task_log_publish(table_callback_data *callback_data) {
void redis_task_table_get_task_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
redisReply *reply = r;
/* Check that we received a Redis hashmap. */
if (reply->type != REDIS_REPLY_ARRAY) {
LOG_FATAL("Expected Redis array, received type %d %s", reply->type,
reply->str);
}
/* If the user registered a success callback, construct the task object from
* the Redis reply and call the callback. */
if (callback_data->done_callback) {
task_table_get_callback done_callback = callback_data->done_callback;
task *task_reply = parse_redis_task_table_entry(
callback_data->id, reply->elements, reply->element);
done_callback(task_reply, callback_data->user_context);
free_task(task_reply);
}
destroy_timer_callback(db->loop, callback_data);
}
void redis_task_table_get_task(table_callback_data *callback_data) {
CHECK(callback_data);
db_handle *db = callback_data->db_handle;
task_instance *task_instance = callback_data->data;
task_iid task_iid = *task_instance_id(task_instance);
node_id node = *task_instance_node(task_instance);
int32_t state = *task_instance_state(task_instance);
task_id id = callback_data->id;
int status =
redisAsyncCommand(db->context, redis_task_table_get_task_callback,
(void *) callback_data->timer_id, "HGETALL task:%b",
id.id, sizeof(task_id));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_ERR(db->sub_context, "Could not get task from task table");
}
}
void redis_task_table_publish(table_callback_data *callback_data,
bool task_added) {
db_handle *db = callback_data->db_handle;
task *task = callback_data->data;
task_id id = task_task_id(task);
node_id node = task_node(task);
scheduling_state state = task_state(task);
task_spec *spec = task_task_spec(task);
LOG_DEBUG("Called log_publish callback");
@@ -294,84 +488,98 @@ void redis_task_log_publish(table_callback_data *callback_data) {
}
if (((bool *) callback_data->requests_info)[PUSH_INDEX] == false) {
if (*task_instance_state(task_instance) == TASK_STATUS_WAITING) {
redisAsyncCommand(db->context, redis_task_log_publish_push_callback,
(void *) callback_data->timer_id, "RPUSH tasklog:%b %b",
(char *) &task_iid.id[0], UNIQUE_ID_SIZE,
(char *) task_instance,
task_instance_size(task_instance));
/* If the task has already been added to the task table, only update the
* scheduling information fields. */
int status = REDIS_OK;
if (task_added) {
status = redisAsyncCommand(
db->context, redis_task_table_publish_push_callback,
(void *) callback_data->timer_id, "HMSET task:%b state %d node %b",
(char *) id.id, sizeof(task_id), state, (char *) node.id,
sizeof(node_id));
} else {
task_update update = {.state = state, .node = node};
redisAsyncCommand(db->context, redis_task_log_publish_push_callback,
(void *) callback_data->timer_id, "RPUSH tasklog:%b %b",
(char *) &task_iid.id[0], UNIQUE_ID_SIZE,
(char *) &update, sizeof(update));
status = redisAsyncCommand(
db->context, redis_task_table_publish_push_callback,
(void *) callback_data->timer_id,
"HMSET task:%b state %d node %b task_spec %b", (char *) id.id,
sizeof(task_id), state, (char *) node.id, sizeof(node_id),
(char *) spec, task_spec_size(spec));
}
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error setting task in task_log_add_task");
if ((status = REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context, "error setting task in task_table_add_task");
}
}
if (((bool *) callback_data->requests_info)[PUBLISH_INDEX] == false) {
redisAsyncCommand(db->context, redis_task_log_publish_publish_callback,
(void *) callback_data->timer_id,
"PUBLISH task_log:%b:%d %b", (char *) &node.id[0],
UNIQUE_ID_SIZE, state, (char *) task_instance,
task_instance_size(task_instance));
int status = redisAsyncCommand(
db->context, redis_task_table_publish_publish_callback,
(void *) callback_data->timer_id, "PUBLISH task:%b:%d %b",
(char *) node.id, sizeof(node_id), state, (char *) task,
task_size(task));
if (db->context->err) {
LOG_REDIS_ERR(db->context, "error publishing task in task_log_add_task");
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERR(db->context,
"error publishing task in task_table_add_task");
}
}
}
void redis_task_log_publish_push_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
void redis_task_table_add_task(table_callback_data *callback_data) {
redis_task_table_publish(callback_data, false);
}
void redis_task_table_update(table_callback_data *callback_data) {
redis_task_table_publish(callback_data, true);
}
void redis_task_table_publish_push_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
CHECK(callback_data->requests_info != NULL);
((bool *) callback_data->requests_info)[PUSH_INDEX] = true;
if (((bool *) callback_data->requests_info)[PUBLISH_INDEX] == true) {
if (callback_data->done_callback) {
task_log_done_callback done_callback = callback_data->done_callback;
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
destroy_timer_callback(db->loop, callback_data);
}
}
void redis_task_log_publish_publish_callback(redisAsyncContext *c,
void *r,
void *privdata) {
void redis_task_table_publish_publish_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
CHECK(callback_data->requests_info != NULL);
((bool *) callback_data->requests_info)[PUBLISH_INDEX] = true;
if (((bool *) callback_data->requests_info)[PUSH_INDEX] == true) {
if (callback_data->done_callback) {
task_log_done_callback done_callback = callback_data->done_callback;
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
destroy_timer_callback(db->loop, callback_data);
}
}
void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) {
void redis_task_table_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r)
redisReply *reply = r;
CHECK(reply->type == REDIS_REPLY_ARRAY);
/* First entry is message type, second is topic, third is payload. */
CHECK(reply->elements > 2);
/* If this condition is true, we got the initial message that acknowledged the
* subscription. */
if (reply->element[2]->str == NULL) {
CHECK(reply->elements > 2);
/* First entry is message type, then possibly the regex we psubscribed to,
* then topic, then payload. */
redisReply *payload = reply->element[reply->elements - 1];
if (payload->str == NULL) {
if (callback_data->done_callback) {
task_log_done_callback done_callback = callback_data->done_callback;
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
/* Note that we do not destroy the callback data yet because the
@@ -380,32 +588,36 @@ void task_log_redis_callback(redisAsyncContext *c, void *r, void *privdata) {
return;
}
/* Otherwise, parse the task and call the callback. */
task_log_subscribe_data *data = callback_data->data;
task_table_subscribe_data *data = callback_data->data;
task_instance *instance = malloc(reply->element[2]->len);
memcpy(instance, reply->element[2]->str, reply->element[2]->len);
task *task = malloc(payload->len);
memcpy(task, payload->str, payload->len);
if (data->subscribe_callback) {
data->subscribe_callback(instance, data->subscribe_context);
data->subscribe_callback(task, data->subscribe_context);
}
task_instance_free(instance);
free_task(task);
}
void redis_task_log_subscribe(table_callback_data *callback_data) {
void redis_task_table_subscribe(table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
task_log_subscribe_data *data = callback_data->data;
if (memcmp(&data->node.id[0], &NIL_ID.id[0], UNIQUE_ID_SIZE) == 0) {
redisAsyncCommand(db->sub_context, task_log_redis_callback,
(void *) callback_data->timer_id,
"PSUBSCRIBE task_log:*:%d", data->state_filter);
task_table_subscribe_data *data = callback_data->data;
int status = REDIS_OK;
if (IS_NIL_ID(data->node)) {
/* TODO(swang): Implement the state_filter by translating the bitmask into
* a Redis key-matching pattern. */
status =
redisAsyncCommand(db->sub_context, redis_task_table_subscribe_callback,
(void *) callback_data->timer_id,
"PSUBSCRIBE task:*:%d", data->state_filter);
} else {
redisAsyncCommand(db->sub_context, task_log_redis_callback,
(void *) callback_data->timer_id,
"SUBSCRIBE task_log:%b:%d", (char *) &data->node.id[0],
UNIQUE_ID_SIZE, data->state_filter);
node_id node = data->node;
status = redisAsyncCommand(
db->sub_context, redis_task_table_subscribe_callback,
(void *) callback_data->timer_id, "SUBSCRIBE task:%b:%d",
(char *) node.id, sizeof(node_id), data->state_filter);
}
if (db->sub_context->err) {
LOG_REDIS_ERR(db->sub_context, "error in task_log_register_callback");
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_ERR(db->sub_context, "error in task_table_register_callback");
}
}
+54 -14
View File
@@ -3,7 +3,7 @@
#include "db.h"
#include "object_table.h"
#include "task_log.h"
#include "task_table.h"
#include "hiredis/hiredis.h"
#include "hiredis/async.h"
@@ -65,7 +65,7 @@ void object_table_lookup_callback(redisAsyncContext *c,
void redis_object_table_lookup(table_callback_data *callback_data);
/**
* Add an entry to the object table in redis.
* Add a location entry to the object table in redis.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
@@ -82,50 +82,90 @@ void redis_object_table_add(table_callback_data *callback_data);
*/
void redis_object_table_subscribe(table_callback_data *callback_data);
/**
* Add a new object to the object table in redis.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_result_table_add(table_callback_data *callback_data);
/**
* Lookup the object in the object table in redis. The entry in
* the object table contains metadata about the object.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_result_table_lookup(table_callback_data *callback_data);
/*
* ==== Redis task table function =====
*/
/**
* Add or update task log entry with new scheduling information.
* Get a task table entry, including the task spec and the task's scheduling
* information.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_task_log_publish(table_callback_data *callback_data);
void redis_task_table_get_task(table_callback_data *callback_data);
/**
* Callback invoked when the replya from the task push command is received.
* Add a task table entry with a new task spec and the task's scheduling
* information.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_task_table_add_task(table_callback_data *callback_data);
/**
* Update a task table entry with the task's scheduling information.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_task_table_update(table_callback_data *callback_data);
/**
* Callback invoked when the reply from the task push command is received.
*
* @param c Redis context.
* @param r Reply (not used).
* @param privdata Data associated to the callback.
* @return Void.
*/
void redis_task_log_publish_push_callback(redisAsyncContext *c,
void *r,
void *privdata);
void redis_task_table_publish_push_callback(redisAsyncContext *c,
void *r,
void *privdata);
/**
* Callback invoked when the replya from the task publish command is received.
* Callback invoked when the reply from the task publish command is received.
*
* @param c Redis context.
* @param r Reply (not used).
* @param privdata Data associated to the callback.
* @return Void.
*/
void redis_task_log_publish_publish_callback(redisAsyncContext *c,
void *r,
void *privdata);
void redis_task_table_publish_publish_callback(redisAsyncContext *c,
void *r,
void *privdata);
/**
* Subscribe to updates of the task log.
* Subscribe to updates of the task table.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_task_log_subscribe(table_callback_data *callback_data);
void redis_task_table_subscribe(table_callback_data *callback_data);
#endif /* REDIS_H */
+3 -2
View File
@@ -67,9 +67,10 @@ int64_t table_timeout_handler(event_loop *loop,
if (callback_data->retry.num_retries == 0) {
/* We didn't get a response from the database after exhausting all retries;
* let user know, cleanup the state, and remove the timer. */
LOG_ERR("Table command with timer ID %ld failed", timer_id);
if (callback_data->retry.fail_callback) {
callback_data->retry.fail_callback(callback_data->id,
callback_data->user_context);
callback_data->retry.fail_callback(
callback_data->id, callback_data->user_context, callback_data->data);
}
destroy_table_callback(callback_data);
return EVENT_LOOP_TIMER_DONE;
+16 -3
View File
@@ -12,8 +12,19 @@ typedef struct table_callback_data table_callback_data;
typedef void *table_done_callback;
/* The callback called when the database operation hasn't completed after
* the number of retries specified for the operation. */
typedef void (*table_fail_callback)(unique_id id, void *user_context);
* the number of retries specified for the operation.
*
* @param id The unique ID that identifies this callback. Examples include an
* object ID or task ID.
* @param user_context The state context for the callback. This is equivalent
* to the user_context field in table_callback_data.
* @param user_data A data argument for the callback. This is equivalent to the
* data field in table_callback_data. The user is responsible for
* freeing user_data.
*/
typedef void (*table_fail_callback)(unique_id id,
void *user_context,
void *user_data);
typedef void (*table_retry_callback)(table_callback_data *callback_data);
@@ -41,7 +52,9 @@ struct table_callback_data {
* before the next retry, and a pointer to the failure callback.
*/
retry_info retry;
/** Pointer to the data that is entered into the table. */
/** Pointer to the data that is entered into the table. This can be used to
* pass the result of the call to the callback. The user is responsible for
* freeing data in both the fail_callback and done_callback. */
void *data;
/** Pointer to the data used internally to handle multiple database requests.
*/
-34
View File
@@ -1,34 +0,0 @@
#include "task_log.h"
#include "redis.h"
#define NUM_DB_REQUESTS 2
void task_log_publish(db_handle *db_handle,
task_instance *task_instance,
retry_info *retry,
task_log_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, *task_instance_id(task_instance),
task_instance, retry, done_callback,
redis_task_log_publish, user_context);
}
/* TODO(swang): A corresponding task_log_unsubscribe. */
void task_log_subscribe(db_handle *db_handle,
node_id node,
int32_t state_filter,
task_log_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry,
task_log_done_callback done_callback,
void *user_context) {
task_log_subscribe_data *sub_data = malloc(sizeof(task_log_subscribe_data));
utarray_push_back(db_handle->callback_freelist, &sub_data);
sub_data->node = node;
sub_data->state_filter = state_filter;
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, node, sub_data, retry, done_callback,
redis_task_log_subscribe, user_context);
}
-88
View File
@@ -1,88 +0,0 @@
#ifndef TASK_LOG_H
#define TASK_LOG_H
#include "db.h"
#include "table.h"
#include "task.h"
/**
* The task log is a message bus that is used for all communication between
* local and global schedulers (and also persisted to the state database).
* Here are examples of events that are recorded by the task log:
*
* 1) local scheduler writes it when submits a task to the global scheduler;
* 2) global scheduler reads it to get the task submitted by local schedulers;
* 3) global scheduler writes it when assigning the task to a local scheduler;
* 4) local scheduler reads it to get its tasks assigned by global scheduler;
* 5) local scheduler writes it when a task finishes execution;
* 6) global scheduler reads it to get the tasks that have finished; */
/* Callback called when the task log operation completes. */
typedef void (*task_log_done_callback)(task_iid task_iid, void *user_context);
/*
* ==== Publish the task log ====
*/
/**
* Add or update a task instance to the task log.
*
* @param db_handle Database handle.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_log_publish(db_handle *db_handle,
task_instance *task_instance,
retry_info *retry,
task_log_done_callback done_callback,
void *user_context);
/*
* ==== Subscribing to the task log ====
*/
/* Callback for subscribing to the task log. */
typedef void (*task_log_subscribe_callback)(task_instance *task_instance,
void *user_context);
/**
* Register callback for a certain event.
*
* @param db_handle Database handle.
* @param subscribe_callback Callback that will be called when the task log is
* updated.
* @param subscribe_context Context that will be passed into the
* subscribe_callback.
* @param node Node whose events we want to listen to. If you want to register
* to updates from all nodes, set node = NIL_ID.
* @param state_filter Flags for events we want to listen to. If you want
* to listen to all events, use state_filter = TASK_WAITING |
* TASK_SCHEDULED | TASK_RUNNING | TASK_DONE.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_log_subscribe(db_handle *db_handle,
node_id node,
int32_t state_filter,
task_log_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry,
task_log_done_callback done_callback,
void *user_context);
/* Data that is needed to register task log subscribe callbacks with the state
* database. */
typedef struct {
node_id node;
int32_t state_filter;
task_log_subscribe_callback subscribe_callback;
void *subscribe_context;
} task_log_subscribe_data;
#endif /* TASK_LOG_H */
+52
View File
@@ -0,0 +1,52 @@
#include "task_table.h"
#include "redis.h"
#define NUM_DB_REQUESTS 2
void task_table_get_task(db_handle *db_handle,
task_id task_id,
retry_info *retry,
task_table_get_callback done_callback,
void *user_context) {
init_table_callback(db_handle, task_id, NULL, retry, done_callback,
redis_task_table_get_task, user_context);
}
void task_table_add_task(db_handle *db_handle,
task *task,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, task_task_id(task), task, retry, done_callback,
redis_task_table_add_task, user_context);
}
void task_table_update(db_handle *db_handle,
task *task,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context) {
init_table_callback(db_handle, task_task_id(task), task, retry, done_callback,
redis_task_table_update, user_context);
}
/* TODO(swang): A corresponding task_table_unsubscribe. */
void task_table_subscribe(db_handle *db_handle,
node_id node,
scheduling_state state_filter,
task_table_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context) {
task_table_subscribe_data *sub_data =
malloc(sizeof(task_table_subscribe_data));
utarray_push_back(db_handle->callback_freelist, &sub_data);
sub_data->node = node;
sub_data->state_filter = state_filter;
sub_data->subscribe_callback = subscribe_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, node, sub_data, retry, done_callback,
redis_task_table_subscribe, user_context);
}
+125 -13
View File
@@ -1,20 +1,132 @@
#ifndef TASK_TABLE_H
#define TASK_TABLE_H
#ifndef task_table_H
#define task_table_H
#include "db.h"
#include "table.h"
#include "task.h"
/* Add task to the task table, handle errors here. */
status task_table_add_task(db_handle *db, task_spec *task);
/**
* The task table is a message bus that is used for all communication between
* local and global schedulers (and also persisted to the state database).
* Here are examples of events that are recorded by the task table:
*
* 1) local scheduler writes when it submits a task to the global scheduler;
* 2) global scheduler reads it to get the task submitted by local schedulers;
* 3) global scheduler writes it when assigning the task to a local scheduler;
* 4) local scheduler reads it to get its tasks assigned by global scheduler;
* 5) local scheduler writes it when a task finishes execution;
* 6) global scheduler reads it to get the tasks that have finished; */
/* Callback for getting an entry from the task table. Task spec will be freed
* by the system after the callback */
typedef void (*task_table_callback)(task_spec *task, void *context);
/* Callback called when a task table write operation completes. */
typedef void (*task_table_done_callback)(task_id task_id, void *user_context);
/* Get specific task from the task table. */
status task_table_get_task(db_handle *db,
task_id task_id,
task_table_callback callback,
void *context);
/* Callback called when a task table read operation completes. */
typedef void (*task_table_get_callback)(task *task, void *user_context);
#endif /* TASK_TABLE_H */
/**
* Get a task's entry from the task table.
*
* @param db_handle Database handle.
* @param task_id The ID of the task we want to look up.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_table_get_task(db_handle *db,
task_id task_id,
retry_info *retry,
task_table_get_callback done_callback,
void *user_context);
/**
* Add a task entry, including task spec and scheduling information, to the
* task table. This will overwrite any task already in the task table with the
* same task ID.
*
* @param db_handle Database handle.
* @param task The task entry to add to the table.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_table_add_task(db_handle *db_handle,
task *task,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context);
/*
* ==== Publish the task table ====
*/
/**
* Update a task's scheduling information in the task table. This assumes that
* the task spec already exists in the task table entry.
*
* @param db_handle Database handle.
* @param task The task entry to add to the table. The task spec in the entry is
* ignored.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_table_update(db_handle *db_handle,
task *task,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context);
/*
* ==== Subscribing to the task table ====
*/
/* Callback for subscribing to the task table. */
typedef void (*task_table_subscribe_callback)(task *task, void *user_context);
/**
* Register a callback for a task event. An event is any update of a task in
* the task table, produced by task_table_add_task or task_table_add_task.
* Events include changes to the task's scheduling state or changes to the
* task's node location.
*
* @param db_handle Database handle.
* @param subscribe_callback Callback that will be called when the task table is
* updated.
* @param subscribe_context Context that will be passed into the
* subscribe_callback.
* @param node Node whose events we want to listen to. If you want to register
* to updates from all nodes, set node = NIL_ID.
* @param state_filter Flags for events we want to listen to. If you want
* to listen to all events, use state_filter = TASK_WAITING |
* TASK_SCHEDULED | TASK_RUNNING | TASK_DONE.
* @param retry Information about retrying the request to the database.
* @param done_callback Function to be called when database returns result.
* @param user_context Data that will be passed to done_callback and
* fail_callback.
* @return Void.
*/
void task_table_subscribe(db_handle *db_handle,
node_id node,
scheduling_state state_filter,
task_table_subscribe_callback subscribe_callback,
void *subscribe_context,
retry_info *retry,
task_table_done_callback done_callback,
void *user_context);
/* Data that is needed to register task table subscribe callbacks with the state
* database. */
typedef struct {
node_id node;
scheduling_state state_filter;
task_table_subscribe_callback subscribe_callback;
void *subscribe_context;
} task_table_subscribe_data;
#endif /* task_table_H */
+55 -32
View File
@@ -9,10 +9,6 @@
#include "common.h"
#include "io.h"
const unique_id NIL_TASK_ID = {{255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255}};
/* TASK SPECIFICATIONS */
/* Tasks are stored in a consecutive chunk of memory, the first
@@ -70,11 +66,19 @@ struct task_spec_impl {
(ARGS_VALUE_SIZE))
bool task_ids_equal(task_id first_id, task_id second_id) {
return memcmp(&first_id, &second_id, sizeof(task_id)) == 0 ? true : false;
return UNIQUE_ID_EQ(first_id, second_id) ? true : false;
}
bool task_id_is_nil(task_id id) {
return task_ids_equal(id, NIL_TASK_ID);
}
bool function_ids_equal(function_id first_id, function_id second_id) {
return memcmp(&first_id, &second_id, sizeof(function_id)) == 0 ? true : false;
return UNIQUE_ID_EQ(first_id, second_id) ? true : false;
}
bool function_id_is_nil(function_id id) {
return function_ids_equal(id, NIL_FUNCTION_ID);
}
task_id *task_return_ptr(task_spec *spec, int64_t return_index) {
@@ -99,7 +103,7 @@ task_id compute_task_id(task_spec *spec) {
SHA256_CTX ctx;
BYTE buff[SHA256_BLOCK_SIZE];
sha256_init(&ctx);
sha256_update(&ctx, (BYTE *) spec, task_size(spec));
sha256_update(&ctx, (BYTE *) spec, task_spec_size(spec));
sha256_final(&ctx, buff);
/* Create a task ID out of the hash. This will truncate the hash. */
task_id task_id;
@@ -151,7 +155,15 @@ void finish_construct_task_spec(task_spec *spec) {
}
}
int64_t task_size(task_spec *spec) {
task_spec *alloc_nil_task_spec(task_id task_id) {
task_spec *spec =
start_construct_task_spec(NIL_ID, 0, NIL_FUNCTION_ID, 0, 0, 0);
finish_construct_task_spec(spec);
spec->task_id = task_id;
return spec;
}
int64_t task_spec_size(task_spec *spec) {
return TASK_SPEC_SIZE(spec->num_args, spec->num_returns,
spec->args_value_size);
}
@@ -162,7 +174,7 @@ function_id task_function(task_spec *spec) {
return spec->function_id;
}
task_id task_task_id(task_spec *spec) {
task_id task_spec_id(task_spec *spec) {
/* Check that the task has been constructed. */
DCHECK(!task_ids_equal(spec->task_id, NIL_TASK_ID));
return spec->task_id;
@@ -269,47 +281,58 @@ void print_task(task_spec *spec, UT_string *output) {
/* TASK INSTANCES */
struct task_instance_impl {
task_iid iid;
int32_t state;
struct task_impl {
scheduling_state state;
node_id node;
task_spec spec;
};
task_instance *make_task_instance(task_iid task_iid,
task_spec *spec,
int32_t state,
node_id node) {
int64_t size = sizeof(task_instance) - sizeof(task_spec) + task_size(spec);
task_instance *result = malloc(size);
bool node_ids_equal(node_id first_id, node_id second_id) {
return UNIQUE_ID_EQ(first_id, second_id) ? true : false;
}
bool node_id_is_nil(node_id id) {
return node_ids_equal(id, NIL_NODE_ID);
}
task *alloc_task(task_spec *spec, scheduling_state state, node_id node) {
int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec);
task *result = malloc(size);
memset(result, 0, size);
result->iid = task_iid;
result->state = state;
result->node = node;
memcpy(&result->spec, spec, task_size(spec));
memcpy(&result->spec, spec, task_spec_size(spec));
return result;
}
int64_t task_instance_size(task_instance *instance) {
return sizeof(task_instance) - sizeof(task_spec) + task_size(&instance->spec);
task *alloc_nil_task(task_id task_id) {
task_spec *nil_spec = alloc_nil_task_spec(task_id);
task *nil_task = alloc_task(nil_spec, 0, NIL_ID);
free_task_spec(nil_spec);
return nil_task;
}
task_iid *task_instance_id(task_instance *instance) {
return &instance->iid;
int64_t task_size(task *task_arg) {
return sizeof(task) - sizeof(task_spec) + task_spec_size(&task_arg->spec);
}
int32_t *task_instance_state(task_instance *instance) {
return &instance->state;
scheduling_state task_state(task *task) {
return task->state;
}
node_id *task_instance_node(task_instance *instance) {
return &instance->node;
node_id task_node(task *task) {
return task->node;
}
task_spec *task_instance_task_spec(task_instance *instance) {
return &instance->spec;
task_spec *task_task_spec(task *task) {
return &task->spec;
}
void task_instance_free(task_instance *instance) {
free(instance);
task_id task_task_id(task *task) {
task_spec *spec = task_task_spec(task);
return task_spec_id(spec);
}
void free_task(task *task) {
free(task);
}
+73 -32
View File
@@ -1,7 +1,8 @@
#ifndef TASK_H
#define TASK_H
/* This API specifies the task data structures. It is in C so we can
/**
* This API specifies the task data structures. It is in C so we can
* easily construct tasks from other languages like Python. The datastructures
* are also defined in such a way that memory is contiguous and all pointers
* are relative, so that we can memcpy the datastructure and ship it over the
@@ -13,6 +14,10 @@
#include "common.h"
#include "utstring.h"
#define NIL_TASK_ID NIL_ID
#define NIL_FUNCTION_ID NIL_ID
#define NIL_NODE_ID NIL_ID
typedef unique_id function_id;
/** The task ID is a deterministic hash of the function ID that the task
@@ -26,7 +31,7 @@ typedef unique_id task_iid;
/** The node id is an identifier for the node the task is scheduled on. */
typedef unique_id node_id;
/*
/**
* ==== Task specifications ====
* Contain all the information neccessary to execute the
* task (function id, arguments, return object ids).
@@ -46,6 +51,14 @@ enum arg_type { ARG_BY_REF, ARG_BY_VAL };
*/
bool task_ids_equal(task_id first_id, task_id second_id);
/**
* Compare a task ID to the nil ID.
*
* @param id The task ID to compare to nil.
* @return True if the task ID is equal to nil.
*/
bool task_id_is_nil(task_id id);
/**
* Compare two function IDs.
*
@@ -55,6 +68,14 @@ bool task_ids_equal(task_id first_id, task_id second_id);
*/
bool function_ids_equal(function_id first_id, function_id second_id);
/**
* Compare a function ID to the nil ID.
*
* @param id The function ID to compare to nil.
* @return True if the function ID is equal to nil.
*/
bool function_id_is_nil(function_id id);
/* Construct and modify task specifications. */
/**
@@ -94,7 +115,7 @@ void finish_construct_task_spec(task_spec *spec);
* @param spec The task_spec in question.
* @return The size of the task_spec in bytes.
*/
int64_t task_size(task_spec *spec);
int64_t task_spec_size(task_spec *spec);
/**
* Return the function ID of the task.
@@ -110,7 +131,7 @@ function_id task_function(task_spec *spec);
* @param spec The task_spec in question.
* @return The task ID of the task.
*/
task_id task_task_id(task_spec *spec);
task_id task_spec_id(task_spec *spec);
/**
* Get the number of arguments to this task.
@@ -216,8 +237,8 @@ void free_task_spec(task_spec *spec);
*/
void print_task(task_spec *spec, UT_string *output);
/*
* ==== Task instance ====
/**
* ==== Task ====
* Contains information about a scheduled task: The task iid,
* the task specification and the task status (WAITING, SCHEDULED,
* RUNNING, DONE) and which node the task is scheduled on.
@@ -225,50 +246,70 @@ void print_task(task_spec *spec, UT_string *output);
/** The scheduling_state can be used as a flag when we are listening
* for an event, for example TASK_WAITING | TASK_SCHEDULED. */
enum scheduling_state {
typedef enum {
TASK_STATUS_WAITING = 1,
TASK_STATUS_SCHEDULED = 2,
TASK_STATUS_RUNNING = 4,
TASK_STATUS_DONE = 8
};
} scheduling_state;
/** A task instance is one execution of a task specification. It has a unique
* instance id, a state of execution (see scheduling_state) and a node it is
* scheduled on or running on. */
typedef struct task_instance_impl task_instance;
/**
* Compare two node IDs.
*
* @param first_id The first node ID to compare.
* @param second_id The first node ID to compare.
* @return True if the node IDs are the same and false
* otherwise.
*/
bool node_ids_equal(node_id first_id, node_id second_id);
/* Allocate and initialize a new task instance. Must be freed with
* scheduled_task_free after use. */
task_instance *make_task_instance(task_iid task_iid,
task_spec *task,
int32_t state,
node_id node);
/**
* Compare a node ID to the nil ID.
*
* @param id The node ID to compare to nil.
* @return True if the node ID is equal to nil.
*/
bool node_id_is_nil(node_id id);
/* Size of task instance structure in bytes. */
int64_t task_instance_size(task_instance *instance);
/** A task is an execution of a task specification. It has a state of
* execution (see scheduling_state) and a node it is scheduled on or running
* on. */
typedef struct task_impl task;
/* Instance ID of the task instance. */
task_iid *task_instance_id(task_instance *instance);
/**
* Allocate a new task. Must be freed with free_task after use.
*
* @param spec The task spec for the new task.
* @param state The scheduling state for the new task.
* @param node The ID of the node that the task is scheduled on, if any.
*/
task *alloc_task(task_spec *spec, scheduling_state state, node_id node);
/* The scheduling state of the task instance. */
int32_t *task_instance_state(task_instance *instance);
/** Size of task structure in bytes. */
int64_t task_size(task *task);
/* Node this task instance has been assigned to or is running on. */
node_id *task_instance_node(task_instance *instance);
/** The scheduling state of the task. */
scheduling_state task_state(task *task);
/* Task specification of this task instance. */
task_spec *task_instance_task_spec(task_instance *instance);
/** Node this task has been assigned to or is running on. */
node_id task_node(task *task);
/* Free this task instance datastructure. */
void task_instance_free(task_instance *instance);
/** Task specification of this task. */
task_spec *task_task_spec(task *task);
/*
/** Task ID of this task. */
task_id task_task_id(task *task);
/** Free this task datastructure. */
void free_task(task *task);
/**
* ==== Task update ====
* Contains the information necessary to update a task in the task log.
*/
typedef struct {
int32_t state;
scheduling_state state;
node_id node;
} task_update;
+62 -35
View File
@@ -8,7 +8,7 @@
#include "test_common.h"
#include "state/db.h"
#include "state/object_table.h"
#include "state/task_log.h"
#include "state/task_table.h"
#include "state/redis.h"
#include "task.h"
@@ -54,7 +54,7 @@ void lookup_done_callback(object_id object_id,
void add_done_callback(object_id object_id, void *user_context) {}
/* Test if we got a timeout callback if we couldn't connect database. */
void timeout_callback(object_id object_id, void *context) {
void timeout_callback(object_id object_id, void *context, void *user_data) {
user_context *uc = (user_context *) context;
CHECK(uc->test_number == TEST_NUMBER)
}
@@ -101,71 +101,98 @@ TEST object_table_lookup_test(void) {
PASS();
}
void task_log_test_callback(task_instance *instance, void *userdata) {
task_instance *other = userdata;
CHECK(*task_instance_state(instance) == TASK_STATUS_SCHEDULED);
CHECK(task_instance_size(instance) == task_instance_size(other));
CHECK(memcmp(instance, other, task_instance_size(instance)) == 0);
int task_table_test_callback_called = 0;
task *task_table_test_task;
void task_table_test_fail_callback(unique_id id,
void *context,
void *user_data) {
event_loop *loop = user_data;
event_loop_stop(loop);
}
TEST task_log_test(void) {
int64_t task_table_delayed_add_task(event_loop *loop,
int64_t id,
void *context) {
db_handle *db = context;
retry_info retry = {
.num_retries = NUM_RETRIES,
.timeout = TIMEOUT,
.fail_callback = task_table_test_fail_callback,
};
task_table_add_task(db, task_table_test_task, &retry, NULL, (void *) loop);
return EVENT_LOOP_TIMER_DONE;
}
void task_table_test_callback(task *callback_task, void *user_data) {
task_table_test_callback_called = 1;
CHECK(task_state(callback_task) == TASK_STATUS_SCHEDULED);
CHECK(task_size(callback_task) == task_size(task_table_test_task));
CHECK(memcmp(callback_task, task_table_test_task, task_size(callback_task)) ==
0);
event_loop *loop = user_data;
event_loop_stop(loop);
}
TEST task_table_test(void) {
task_table_test_callback_called = 0;
event_loop *loop = event_loop_create();
db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1);
db_attach(db, loop);
node_id node = globally_unique_id();
task_spec *task = example_task();
task_instance *instance = make_task_instance(globally_unique_id(), task,
TASK_STATUS_SCHEDULED, node);
task_spec *spec = example_task_spec();
task_table_test_task = alloc_task(spec, TASK_STATUS_SCHEDULED, node);
free_task_spec(spec);
retry_info retry = {
.num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL,
.num_retries = NUM_RETRIES,
.timeout = TIMEOUT,
.fail_callback = task_table_test_fail_callback,
};
task_log_subscribe(db, node, TASK_STATUS_SCHEDULED, task_log_test_callback,
instance, &retry, NULL, NULL);
task_log_publish(db, instance, &retry, NULL, NULL);
event_loop_add_timer(loop, 200, (event_loop_timer_handler) timeout_handler,
NULL);
task_table_subscribe(db, node, TASK_STATUS_SCHEDULED,
task_table_test_callback, (void *) loop, &retry, NULL,
(void *) loop);
event_loop_add_timer(
loop, 200, (event_loop_timer_handler) task_table_delayed_add_task, db);
event_loop_run(loop);
task_instance_free(instance);
free_task_spec(task);
free_task(task_table_test_task);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(task_table_test_callback_called);
PASS();
}
int num_test_callback_called = 0;
void task_log_all_test_callback(task_instance *instance, void *userdata) {
void task_table_all_test_callback(task *task, void *user_data) {
num_test_callback_called += 1;
}
TEST task_log_all_test(void) {
TEST task_table_all_test(void) {
event_loop *loop = event_loop_create();
db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1);
db_attach(db, loop);
task_spec *task = example_task();
task_spec *spec = example_task_spec();
/* Schedule two tasks on different nodes. */
task_instance *instance1 = make_task_instance(
globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id());
task_instance *instance2 = make_task_instance(
globally_unique_id(), task, TASK_STATUS_SCHEDULED, globally_unique_id());
task *task1 = alloc_task(spec, TASK_STATUS_SCHEDULED, globally_unique_id());
task *task2 = alloc_task(spec, TASK_STATUS_SCHEDULED, globally_unique_id());
retry_info retry = {
.num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL,
};
task_log_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED,
task_log_all_test_callback, NULL, &retry, NULL, NULL);
task_table_subscribe(db, NIL_ID, TASK_STATUS_SCHEDULED,
task_table_all_test_callback, NULL, &retry, NULL, NULL);
event_loop_add_timer(loop, 50, (event_loop_timer_handler) timeout_handler,
NULL);
event_loop_run(loop);
/* TODO(pcm): Get rid of this sleep once the robust pubsub is implemented. */
task_log_publish(db, instance1, &retry, NULL, NULL);
task_log_publish(db, instance2, &retry, NULL, NULL);
task_table_update(db, task1, &retry, NULL, NULL);
task_table_update(db, task2, &retry, NULL, NULL);
event_loop_add_timer(loop, 200, (event_loop_timer_handler) timeout_handler,
NULL);
event_loop_run(loop);
task_instance_free(instance2);
task_instance_free(instance1);
free_task_spec(task);
free(task2);
free(task1);
free_task_spec(spec);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
@@ -198,8 +225,8 @@ TEST unique_client_id_test(void) {
SUITE(db_tests) {
RUN_REDIS_TEST(object_table_lookup_test);
RUN_REDIS_TEST(task_log_test);
RUN_REDIS_TEST(task_log_all_test);
RUN_REDIS_TEST(task_table_test);
RUN_REDIS_TEST(task_table_all_test);
RUN_REDIS_TEST(unique_client_id_test);
}
+165 -23
View File
@@ -12,6 +12,132 @@ SUITE(object_table_tests);
static event_loop *g_loop;
/* ==== Test adding and looking up metadata ==== */
int new_object_failed = 0;
int new_object_succeeded = 0;
object_id new_object_id;
task *new_object_task;
task_spec *new_object_task_spec;
task_id new_object_task_id;
void new_object_fail_callback(unique_id id,
void *user_context,
void *user_data) {
new_object_failed = 1;
event_loop_stop(g_loop);
}
/* === Test adding an object with an associated task === */
void new_object_done_callback(object_id object_id,
task *task,
void *user_context) {
new_object_succeeded = 1;
CHECK(object_ids_equal(object_id, new_object_id));
CHECK(task);
CHECK(memcmp(task, new_object_task, task_size(task)) == 0);
event_loop_stop(g_loop);
}
void new_object_lookup_callback(object_id object_id, void *user_context) {
CHECK(object_ids_equal(object_id, new_object_id));
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
db_handle *db = user_context;
result_table_lookup(db, new_object_id, &retry, new_object_done_callback,
NULL);
}
void new_object_task_callback(task_id task_id, void *user_context) {
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
db_handle *db = user_context;
result_table_add(db, new_object_id, new_object_task_id, &retry,
new_object_lookup_callback, (void *) db);
}
TEST new_object_test(void) {
new_object_failed = 0;
new_object_succeeded = 0;
new_object_id = globally_unique_id();
new_object_task = example_task();
new_object_task_spec = task_task_spec(new_object_task);
new_object_task_id = task_spec_id(new_object_task_spec);
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
task_table_add_task(db, new_object_task, &retry, new_object_task_callback,
db);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
free_task(new_object_task);
ASSERT(new_object_succeeded);
ASSERT(!new_object_failed);
PASS();
}
/* === Test adding an object without an associated task === */
void new_object_no_task_lookup_callback(object_id object_id,
task *task,
void *user_context) {
new_object_succeeded = 1;
CHECK(task == NULL);
event_loop_stop(g_loop);
}
void new_object_no_task_callback(object_id object_id, void *user_context) {
CHECK(node_ids_equal(object_id, new_object_id));
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
db_handle *db = user_context;
result_table_lookup(db, object_id, &retry, new_object_no_task_lookup_callback,
NULL);
}
TEST new_object_no_task_test(void) {
new_object_failed = 0;
new_object_succeeded = 0;
new_object_id = globally_unique_id();
new_object_task_id = globally_unique_id();
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
result_table_add(db, new_object_id, new_object_task_id, &retry,
new_object_no_task_callback, db);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(new_object_succeeded);
ASSERT(!new_object_failed);
PASS();
}
/* ==== Test if operations time out correctly ==== */
/* === Test lookup timeout === */
@@ -27,9 +153,9 @@ void lookup_done_callback(object_id object_id,
CHECK(0);
}
void lookup_fail_callback(unique_id id, void *user_data) {
void lookup_fail_callback(unique_id id, void *user_context, void *user_data) {
lookup_failed = 1;
CHECK(user_data == (void *) lookup_timeout_context);
CHECK(user_context == (void *) lookup_timeout_context);
event_loop_stop(g_loop);
}
@@ -63,9 +189,9 @@ void add_done_callback(object_id object_id, void *user_context) {
CHECK(0);
}
void add_fail_callback(unique_id id, void *user_data) {
void add_fail_callback(unique_id id, void *user_context, void *user_data) {
add_failed = 1;
CHECK(user_data == (void *) add_timeout_context);
CHECK(user_context == (void *) add_timeout_context);
event_loop_stop(g_loop);
}
@@ -99,9 +225,11 @@ void subscribe_done_callback(object_id object_id, void *user_context) {
CHECK(0);
}
void subscribe_fail_callback(unique_id id, void *user_data) {
void subscribe_fail_callback(unique_id id,
void *user_context,
void *user_data) {
subscribe_failed = 1;
CHECK(user_data == (void *) subscribe_timeout_context);
CHECK(user_context == (void *) subscribe_timeout_context);
event_loop_stop(g_loop);
}
@@ -166,7 +294,9 @@ void lookup_retry_done_callback(object_id object_id,
free(manager_vector);
}
void lookup_retry_fail_callback(unique_id id, void *user_data) {
void lookup_retry_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
@@ -210,7 +340,9 @@ void add_retry_done_callback(object_id object_id, void *user_context) {
add_retry_succeeded = 1;
}
void add_retry_fail_callback(unique_id id, void *user_data) {
void add_retry_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
@@ -269,7 +401,9 @@ void subscribe_retry_done_callback(object_id object_id, void *user_context) {
subscribe_retry_succeeded = 1;
}
void subscribe_retry_fail_callback(unique_id id, void *user_data) {
void subscribe_retry_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
@@ -312,7 +446,9 @@ TEST subscribe_retry_test(void) {
const char *lookup_late_context = "lookup_late";
int lookup_late_failed = 0;
void lookup_late_fail_callback(unique_id id, void *user_context) {
void lookup_late_fail_callback(unique_id id,
void *user_context,
void *user_data) {
CHECK(user_context == (void *) lookup_late_context);
lookup_late_failed = 1;
}
@@ -357,7 +493,7 @@ TEST lookup_late_test(void) {
const char *add_late_context = "add_late";
int add_late_failed = 0;
void add_late_fail_callback(unique_id id, void *user_context) {
void add_late_fail_callback(unique_id id, void *user_context, void *user_data) {
CHECK(user_context == (void *) add_late_context);
add_late_failed = 1;
}
@@ -397,7 +533,9 @@ TEST add_late_test(void) {
const char *subscribe_late_context = "subscribe_late";
int subscribe_late_failed = 0;
void subscribe_late_fail_callback(unique_id id, void *user_context) {
void subscribe_late_fail_callback(unique_id id,
void *user_context,
void *user_data) {
CHECK(user_context == (void *) subscribe_late_context);
subscribe_late_failed = 1;
}
@@ -441,7 +579,9 @@ const char *subscribe_success_context = "subscribe_success";
int subscribe_success_done = 0;
int subscribe_success_succeeded = 0;
void subscribe_success_fail_callback(unique_id id, void *user_context) {
void subscribe_success_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* This function should never be called. */
CHECK(0);
}
@@ -492,16 +632,18 @@ TEST subscribe_success_test(void) {
}
SUITE(object_table_tests) {
RUN_TEST(lookup_timeout_test);
RUN_TEST(add_timeout_test);
RUN_TEST(subscribe_timeout_test);
RUN_TEST(lookup_retry_test);
RUN_TEST(add_retry_test);
RUN_TEST(subscribe_retry_test);
RUN_TEST(lookup_late_test);
RUN_TEST(add_late_test);
RUN_TEST(subscribe_late_test);
RUN_TEST(subscribe_success_test);
RUN_REDIS_TEST(new_object_test);
RUN_REDIS_TEST(new_object_no_task_test);
RUN_REDIS_TEST(lookup_timeout_test);
RUN_REDIS_TEST(add_timeout_test);
RUN_REDIS_TEST(subscribe_timeout_test);
RUN_REDIS_TEST(lookup_retry_test);
RUN_REDIS_TEST(add_retry_test);
RUN_REDIS_TEST(subscribe_retry_test);
RUN_REDIS_TEST(lookup_late_test);
RUN_REDIS_TEST(add_late_test);
RUN_REDIS_TEST(subscribe_late_test);
RUN_REDIS_TEST(subscribe_success_test);
}
GREATEST_MAIN_DEFS();
-316
View File
@@ -1,316 +0,0 @@
#include "greatest.h"
#include "event_loop.h"
#include "test_common.h"
#include "common.h"
#include "state/object_table.h"
#include "state/redis.h"
#include <unistd.h>
#include <ae.h>
SUITE(task_log_tests);
event_loop *loop;
/* ==== Test if operations time out correctly ==== */
/* === Test subscribe timeout === */
const char *subscribe_timeout_context = "subscribe_timeout";
int subscribe_failed = 0;
void subscribe_done_callback(task_iid task_iid, void *user_context) {
/* The done callback should not be called. */
CHECK(0);
}
void subscribe_fail_callback(unique_id id, void *user_data) {
subscribe_failed = 1;
CHECK(user_data == (void *) subscribe_timeout_context);
event_loop_stop(loop);
}
TEST subscribe_timeout_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = subscribe_fail_callback,
};
task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_done_callback,
(void *) subscribe_timeout_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->sub_context->c.fd);
aeProcessEvents(loop, AE_TIME_EVENTS);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(subscribe_failed);
PASS();
}
/* === Test publish timeout === */
const char *publish_timeout_context = "publish_timeout";
const int publish_test_number = 272;
int publish_failed = 0;
void publish_done_callback(task_iid task_iid, void *user_context) {
/* The done callback should not be called. */
CHECK(0);
}
void publish_fail_callback(unique_id id, void *user_data) {
publish_failed = 1;
CHECK(user_data == (void *) publish_timeout_context);
event_loop_stop(loop);
}
TEST publish_timeout_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, loop);
task_instance *task = example_task_instance();
retry_info retry = {
.num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback,
};
task_log_publish(db, task, &retry, publish_done_callback,
(void *) publish_timeout_context);
/* Disconnect the database to see if the publish times out. */
close(db->context->c.fd);
aeProcessEvents(loop, AE_TIME_EVENTS);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(publish_failed);
task_instance_free(task);
PASS();
}
/* ==== Test if the retry is working correctly ==== */
int64_t reconnect_db_callback(event_loop *loop,
int64_t timer_id,
void *context) {
db_handle *db = context;
/* Reconnect to redis. */
redisAsyncFree(db->sub_context);
db->sub_context = redisAsyncConnect("127.0.0.1", 6379);
db->sub_context->data = (void *) db;
/* Re-attach the database to the event loop (the file descriptor changed). */
db_attach(db, loop);
return EVENT_LOOP_TIMER_DONE;
}
int64_t terminate_event_loop_callback(event_loop *loop,
int64_t timer_id,
void *context) {
event_loop_stop(loop);
return EVENT_LOOP_TIMER_DONE;
}
/* === Test subscribe retry === */
const char *subscribe_retry_context = "subscribe_retry";
const int subscribe_retry_test_number = 273;
int subscribe_retry_succeeded = 0;
void subscribe_retry_done_callback(object_id object_id, void *user_context) {
CHECK(user_context == (void *) subscribe_retry_context);
subscribe_retry_succeeded = 1;
}
void subscribe_retry_fail_callback(unique_id id, void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
TEST subscribe_retry_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235);
db_attach(db, loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = subscribe_retry_fail_callback,
};
task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_retry_done_callback,
(void *) subscribe_retry_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->sub_context->c.fd);
/* Install handler for reconnecting the database. */
event_loop_add_timer(loop, 150,
(event_loop_timer_handler) reconnect_db_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(subscribe_retry_succeeded);
PASS();
}
/* === Test publish retry === */
const char *publish_retry_context = "publish_retry";
int publish_retry_succeeded = 0;
void publish_retry_done_callback(object_id object_id, void *user_context) {
CHECK(user_context == (void *) publish_retry_context);
publish_retry_succeeded = 1;
}
void publish_retry_fail_callback(unique_id id, void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
TEST publish_retry_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235);
db_attach(db, loop);
task_instance *task = example_task_instance();
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = publish_retry_fail_callback,
};
task_log_publish(db, task, &retry, publish_retry_done_callback,
(void *) publish_retry_context);
/* Disconnect the database to see if the publish times out. */
close(db->sub_context->c.fd);
/* Install handler for reconnecting the database. */
event_loop_add_timer(loop, 150,
(event_loop_timer_handler) reconnect_db_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(publish_retry_succeeded);
task_instance_free(task);
PASS();
}
/* ==== Test if late succeed is working correctly ==== */
/* === Test subscribe late succeed === */
const char *subscribe_late_context = "subscribe_late";
int subscribe_late_failed = 0;
void subscribe_late_fail_callback(unique_id id, void *user_context) {
CHECK(user_context == (void *) subscribe_late_context);
subscribe_late_failed = 1;
}
void subscribe_late_done_callback(task_iid task_iid, void *user_context) {
/* This function should never be called. */
CHECK(0);
}
TEST subscribe_late_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, loop);
retry_info retry = {
.num_retries = 0,
.timeout = 0,
.fail_callback = subscribe_late_fail_callback,
};
task_log_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_late_done_callback,
(void *) subscribe_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* First process timer events to make sure the timeout is processed before
* anything else. */
aeProcessEvents(loop, AE_TIME_EVENTS);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(subscribe_late_failed);
PASS();
}
/* === Test publish late succeed === */
const char *publish_late_context = "publish_late";
int publish_late_failed = 0;
void publish_late_fail_callback(unique_id id, void *user_context) {
CHECK(user_context == (void *) publish_late_context);
publish_late_failed = 1;
}
void publish_late_done_callback(task_iid task_iik, void *user_context) {
/* This function should never be called. */
CHECK(0);
}
TEST publish_late_test(void) {
loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, loop);
task_instance *task = example_task_instance();
retry_info retry = {
.num_retries = 0,
.timeout = 0,
.fail_callback = publish_late_fail_callback,
};
task_log_publish(db, task, &retry, publish_late_done_callback,
(void *) publish_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* First process timer events to make sure the timeout is processed before
* anything else. */
aeProcessEvents(loop, AE_TIME_EVENTS);
event_loop_run(loop);
db_disconnect(db);
destroy_outstanding_callbacks(loop);
event_loop_destroy(loop);
ASSERT(publish_late_failed);
task_instance_free(task);
PASS();
}
SUITE(task_log_tests) {
RUN_TEST(subscribe_timeout_test);
RUN_TEST(publish_timeout_test);
RUN_TEST(subscribe_retry_test);
RUN_TEST(publish_retry_test);
RUN_TEST(subscribe_late_test);
RUN_TEST(publish_late_test);
}
GREATEST_MAIN_DEFS();
int main(int argc, char **argv) {
GREATEST_MAIN_BEGIN();
RUN_SUITE(task_log_tests);
GREATEST_MAIN_END();
}
+430
View File
@@ -0,0 +1,430 @@
#include "greatest.h"
#include "event_loop.h"
#include "test_common.h"
#include "common.h"
#include "state/object_table.h"
#include "state/redis.h"
#include <unistd.h>
#include <ae.h>
SUITE(task_table_tests);
event_loop *g_loop;
/* ==== Test operations in non-failure scenario ==== */
/* === A lookup of a task not in the table === */
task_id lookup_nil_id;
int lookup_nil_success = 0;
const char *lookup_nil_context = "lookup_nil";
void lookup_nil_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
void lookup_nil_success_callback(task *task, void *context) {
lookup_nil_success = 1;
CHECK(task == NULL);
CHECK(context == (void *) lookup_nil_context);
event_loop_stop(g_loop);
}
TEST lookup_nil_test(void) {
lookup_nil_id = globally_unique_id();
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 1000,
.fail_callback = lookup_nil_fail_callback,
};
task_table_get_task(db, lookup_nil_id, &retry, lookup_nil_success_callback,
(void *) lookup_nil_context);
/* Disconnect the database to see if the lookup times out. */
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(lookup_nil_success);
PASS();
}
/* === A lookup of a task after it's added returns the same spec === */
int add_success = 0;
int lookup_success = 0;
task *add_lookup_task;
const char *add_lookup_context = "add_lookup";
void add_lookup_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
void lookup_success_callback(task *task, void *context) {
lookup_success = 1;
CHECK(memcmp(task, add_lookup_task, task_size(task)) == 0);
event_loop_stop(g_loop);
}
void add_success_callback(task_id task_id, void *context) {
add_success = 1;
CHECK(task_ids_equal(task_id, task_task_id(add_lookup_task)));
db_handle *db = context;
retry_info retry = {
.num_retries = 5,
.timeout = 1000,
.fail_callback = add_lookup_fail_callback,
};
task_table_get_task(db, task_id, &retry, lookup_success_callback,
(void *) add_lookup_context);
}
TEST add_lookup_test(void) {
add_lookup_task = example_task();
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 1000,
.fail_callback = add_lookup_fail_callback,
};
task_table_add_task(db, add_lookup_task, &retry, add_success_callback,
(void *) db);
/* Disconnect the database to see if the lookup times out. */
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
free(add_lookup_task);
ASSERT(add_success);
ASSERT(lookup_success);
PASS();
}
/* ==== Test if operations time out correctly ==== */
/* === Test subscribe timeout === */
const char *subscribe_timeout_context = "subscribe_timeout";
int subscribe_failed = 0;
void subscribe_done_callback(task_id task_id, void *user_context) {
/* The done callback should not be called. */
CHECK(0);
}
void subscribe_fail_callback(unique_id id,
void *user_context,
void *user_data) {
subscribe_failed = 1;
CHECK(user_context == (void *) subscribe_timeout_context);
event_loop_stop(g_loop);
}
TEST subscribe_timeout_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = subscribe_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_done_callback,
(void *) subscribe_timeout_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->sub_context->c.fd);
aeProcessEvents(g_loop, AE_TIME_EVENTS);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_failed);
PASS();
}
/* === Test publish timeout === */
const char *publish_timeout_context = "publish_timeout";
const int publish_test_number = 272;
int publish_failed = 0;
void publish_done_callback(task_id task_id, void *user_context) {
/* The done callback should not be called. */
CHECK(0);
}
void publish_fail_callback(unique_id id, void *user_context, void *user_data) {
publish_failed = 1;
CHECK(user_context == (void *) publish_timeout_context);
event_loop_stop(g_loop);
}
TEST publish_timeout_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 1234);
db_attach(db, g_loop);
task *task = example_task();
retry_info retry = {
.num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback,
};
task_table_update(db, task, &retry, publish_done_callback,
(void *) publish_timeout_context);
/* Disconnect the database to see if the publish times out. */
close(db->context->c.fd);
aeProcessEvents(g_loop, AE_TIME_EVENTS);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(publish_failed);
free_task(task);
PASS();
}
/* ==== Test if the retry is working correctly ==== */
int64_t reconnect_db_callback(event_loop *loop,
int64_t timer_id,
void *context) {
db_handle *db = context;
/* Reconnect to redis. */
redisAsyncFree(db->sub_context);
db->sub_context = redisAsyncConnect("127.0.0.1", 6379);
db->sub_context->data = (void *) db;
/* Re-attach the database to the event loop (the file descriptor changed). */
db_attach(db, loop);
return EVENT_LOOP_TIMER_DONE;
}
int64_t terminate_event_loop_callback(event_loop *loop,
int64_t timer_id,
void *context) {
event_loop_stop(loop);
return EVENT_LOOP_TIMER_DONE;
}
/* === Test subscribe retry === */
const char *subscribe_retry_context = "subscribe_retry";
const int subscribe_retry_test_number = 273;
int subscribe_retry_succeeded = 0;
void subscribe_retry_done_callback(object_id object_id, void *user_context) {
CHECK(user_context == (void *) subscribe_retry_context);
subscribe_retry_succeeded = 1;
}
void subscribe_retry_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
TEST subscribe_retry_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = subscribe_retry_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_retry_done_callback,
(void *) subscribe_retry_context);
/* Disconnect the database to see if the subscribe times out. */
close(db->sub_context->c.fd);
/* Install handler for reconnecting the database. */
event_loop_add_timer(g_loop, 150,
(event_loop_timer_handler) reconnect_db_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_retry_succeeded);
PASS();
}
/* === Test publish retry === */
const char *publish_retry_context = "publish_retry";
int publish_retry_succeeded = 0;
void publish_retry_done_callback(object_id object_id, void *user_context) {
CHECK(user_context == (void *) publish_retry_context);
publish_retry_succeeded = 1;
}
void publish_retry_fail_callback(unique_id id,
void *user_context,
void *user_data) {
/* The fail callback should not be called. */
CHECK(0);
}
TEST publish_retry_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235);
db_attach(db, g_loop);
task *task = example_task();
retry_info retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = publish_retry_fail_callback,
};
task_table_update(db, task, &retry, publish_retry_done_callback,
(void *) publish_retry_context);
/* Disconnect the database to see if the publish times out. */
close(db->sub_context->c.fd);
/* Install handler for reconnecting the database. */
event_loop_add_timer(g_loop, 150,
(event_loop_timer_handler) reconnect_db_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(publish_retry_succeeded);
free_task(task);
PASS();
}
/* ==== Test if late succeed is working correctly ==== */
/* === Test subscribe late succeed === */
const char *subscribe_late_context = "subscribe_late";
int subscribe_late_failed = 0;
void subscribe_late_fail_callback(unique_id id,
void *user_context,
void *user_data) {
CHECK(user_context == (void *) subscribe_late_context);
subscribe_late_failed = 1;
}
void subscribe_late_done_callback(task_id task_id, void *user_context) {
/* This function should never be called. */
CHECK(0);
}
TEST subscribe_late_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop);
retry_info retry = {
.num_retries = 0,
.timeout = 0,
.fail_callback = subscribe_late_fail_callback,
};
task_table_subscribe(db, NIL_ID, TASK_STATUS_WAITING, NULL, NULL, &retry,
subscribe_late_done_callback,
(void *) subscribe_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* First process timer events to make sure the timeout is processed before
* anything else. */
aeProcessEvents(g_loop, AE_TIME_EVENTS);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_late_failed);
PASS();
}
/* === Test publish late succeed === */
const char *publish_late_context = "publish_late";
int publish_late_failed = 0;
void publish_late_fail_callback(unique_id id,
void *user_context,
void *user_data) {
CHECK(user_context == (void *) publish_late_context);
publish_late_failed = 1;
}
void publish_late_done_callback(task_id task_id, void *user_context) {
/* This function should never be called. */
CHECK(0);
}
TEST publish_late_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop);
task *task = example_task();
retry_info retry = {
.num_retries = 0,
.timeout = 0,
.fail_callback = publish_late_fail_callback,
};
task_table_update(db, task, &retry, publish_late_done_callback,
(void *) publish_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* First process timer events to make sure the timeout is processed before
* anything else. */
aeProcessEvents(g_loop, AE_TIME_EVENTS);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(publish_late_failed);
free_task(task);
PASS();
}
SUITE(task_table_tests) {
RUN_REDIS_TEST(lookup_nil_test);
RUN_REDIS_TEST(add_lookup_test);
RUN_TEST(subscribe_timeout_test);
RUN_TEST(publish_timeout_test);
RUN_TEST(subscribe_retry_test);
RUN_TEST(publish_retry_test);
RUN_TEST(subscribe_late_test);
RUN_TEST(publish_late_test);
}
GREATEST_MAIN_DEFS();
int main(int argc, char **argv) {
GREATEST_MAIN_BEGIN();
RUN_SUITE(task_table_tests);
GREATEST_MAIN_END();
}
+80 -80
View File
@@ -14,33 +14,33 @@ SUITE(task_tests);
TEST task_test(void) {
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *task =
task_spec *spec =
start_construct_task_spec(parent_task_id, 0, func_id, 4, 2, 10);
ASSERT(task_num_args(task) == 4);
ASSERT(task_num_returns(task) == 2);
ASSERT(task_num_args(spec) == 4);
ASSERT(task_num_returns(spec) == 2);
unique_id arg1 = globally_unique_id();
ASSERT(task_args_add_ref(task, arg1) == 0);
ASSERT(task_args_add_val(task, (uint8_t *) "hello", 5) == 1);
ASSERT(task_args_add_ref(spec, arg1) == 0);
ASSERT(task_args_add_val(spec, (uint8_t *) "hello", 5) == 1);
unique_id arg2 = globally_unique_id();
ASSERT(task_args_add_ref(task, arg2) == 2);
ASSERT(task_args_add_val(task, (uint8_t *) "world", 5) == 3);
/* Finish constructing the task. This constructs the task ID and the task
ASSERT(task_args_add_ref(spec, arg2) == 2);
ASSERT(task_args_add_val(spec, (uint8_t *) "world", 5) == 3);
/* Finish constructing the spec. This constructs the task ID and the
* return IDs. */
finish_construct_task_spec(task);
finish_construct_task_spec(spec);
/* Check that the task was constructed as expected. */
ASSERT(task_num_args(task) == 4);
ASSERT(task_num_returns(task) == 2);
ASSERT(function_ids_equal(task_function(task), func_id));
ASSERT(object_ids_equal(task_arg_id(task, 0), arg1));
ASSERT(memcmp(task_arg_val(task, 1), (uint8_t *) "hello",
task_arg_length(task, 1)) == 0);
ASSERT(object_ids_equal(task_arg_id(task, 2), arg2));
ASSERT(memcmp(task_arg_val(task, 3), (uint8_t *) "world",
task_arg_length(task, 3)) == 0);
/* Check that the spec was constructed as expected. */
ASSERT(task_num_args(spec) == 4);
ASSERT(task_num_returns(spec) == 2);
ASSERT(function_ids_equal(task_function(spec), func_id));
ASSERT(object_ids_equal(task_arg_id(spec, 0), arg1));
ASSERT(memcmp(task_arg_val(spec, 1), (uint8_t *) "hello",
task_arg_length(spec, 1)) == 0);
ASSERT(object_ids_equal(task_arg_id(spec, 2), arg2));
ASSERT(memcmp(task_arg_val(spec, 3), (uint8_t *) "world",
task_arg_length(spec, 3)) == 0);
free_task_spec(task);
free_task_spec(spec);
PASS();
}
@@ -52,121 +52,121 @@ TEST deterministic_ids_test(void) {
uint8_t *arg2 = (uint8_t *) "hello world";
/* Construct a first task. */
task_spec *task1 =
task_spec *spec1 =
start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11);
task_args_add_ref(task1, arg1);
task_args_add_val(task1, arg2, 11);
finish_construct_task_spec(task1);
task_args_add_ref(spec1, arg1);
task_args_add_val(spec1, arg2, 11);
finish_construct_task_spec(spec1);
/* Construct a second identical task. */
task_spec *task2 =
task_spec *spec2 =
start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11);
task_args_add_ref(task2, arg1);
task_args_add_val(task2, arg2, 11);
finish_construct_task_spec(task2);
task_args_add_ref(spec2, arg1);
task_args_add_val(spec2, arg2, 11);
finish_construct_task_spec(spec2);
/* Check that these tasks have the same task IDs and the same return IDs.*/
ASSERT(task_ids_equal(task_task_id(task1), task_task_id(task2)));
ASSERT(object_ids_equal(task_return(task1, 0), task_return(task2, 0)));
ASSERT(object_ids_equal(task_return(task1, 1), task_return(task2, 1)));
ASSERT(object_ids_equal(task_return(task1, 2), task_return(task2, 2)));
ASSERT(task_ids_equal(task_spec_id(spec1), task_spec_id(spec2)));
ASSERT(object_ids_equal(task_return(spec1, 0), task_return(spec2, 0)));
ASSERT(object_ids_equal(task_return(spec1, 1), task_return(spec2, 1)));
ASSERT(object_ids_equal(task_return(spec1, 2), task_return(spec2, 2)));
/* Check that the return IDs are all distinct. */
ASSERT(!object_ids_equal(task_return(task1, 0), task_return(task2, 1)));
ASSERT(!object_ids_equal(task_return(task1, 0), task_return(task2, 2)));
ASSERT(!object_ids_equal(task_return(task1, 1), task_return(task2, 2)));
ASSERT(!object_ids_equal(task_return(spec1, 0), task_return(spec2, 1)));
ASSERT(!object_ids_equal(task_return(spec1, 0), task_return(spec2, 2)));
ASSERT(!object_ids_equal(task_return(spec1, 1), task_return(spec2, 2)));
/* Create more tasks that are only mildly different. */
/* Construct a task with a different parent task ID. */
task_spec *task3 =
task_spec *spec3 =
start_construct_task_spec(globally_unique_id(), 0, func_id, 2, 3, 11);
task_args_add_ref(task3, arg1);
task_args_add_val(task3, arg2, 11);
finish_construct_task_spec(task3);
task_args_add_ref(spec3, arg1);
task_args_add_val(spec3, arg2, 11);
finish_construct_task_spec(spec3);
/* Construct a task with a different parent counter. */
task_spec *task4 =
task_spec *spec4 =
start_construct_task_spec(parent_task_id, 1, func_id, 2, 3, 11);
task_args_add_ref(task4, arg1);
task_args_add_val(task4, arg2, 11);
finish_construct_task_spec(task4);
task_args_add_ref(spec4, arg1);
task_args_add_val(spec4, arg2, 11);
finish_construct_task_spec(spec4);
/* Construct a task with a different function ID. */
task_spec *task5 = start_construct_task_spec(parent_task_id, 0,
task_spec *spec5 = start_construct_task_spec(parent_task_id, 0,
globally_unique_id(), 2, 3, 11);
task_args_add_ref(task5, arg1);
task_args_add_val(task5, arg2, 11);
finish_construct_task_spec(task5);
task_args_add_ref(spec5, arg1);
task_args_add_val(spec5, arg2, 11);
finish_construct_task_spec(spec5);
/* Construct a task with a different object ID argument. */
task_spec *task6 =
task_spec *spec6 =
start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11);
task_args_add_ref(task6, globally_unique_id());
task_args_add_val(task6, arg2, 11);
finish_construct_task_spec(task6);
task_args_add_ref(spec6, globally_unique_id());
task_args_add_val(spec6, arg2, 11);
finish_construct_task_spec(spec6);
/* Construct a task with a different value argument. */
task_spec *task7 =
task_spec *spec7 =
start_construct_task_spec(parent_task_id, 0, func_id, 2, 3, 11);
task_args_add_ref(task7, arg1);
task_args_add_val(task7, (uint8_t *) "hello_world", 11);
finish_construct_task_spec(task7);
task_args_add_ref(spec7, arg1);
task_args_add_val(spec7, (uint8_t *) "hello_world", 11);
finish_construct_task_spec(spec7);
/* Check that the task IDs are all distinct from the original. */
ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task3)));
ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task4)));
ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task5)));
ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task6)));
ASSERT(!task_ids_equal(task_task_id(task1), task_task_id(task7)));
ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec3)));
ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec4)));
ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec5)));
ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec6)));
ASSERT(!task_ids_equal(task_spec_id(spec1), task_spec_id(spec7)));
/* Check that the return object IDs are distinct from the originals. */
task_spec *tasks[6] = {task1, task3, task4, task5, task6, task7};
task_spec *specs[6] = {spec1, spec3, spec4, spec5, spec6, spec7};
for (int task_index1 = 0; task_index1 < 6; ++task_index1) {
for (int return_index1 = 0; return_index1 < 3; ++return_index1) {
for (int task_index2 = 0; task_index2 < 6; ++task_index2) {
for (int return_index2 = 0; return_index2 < 3; ++return_index2) {
if (task_index1 != task_index2 && return_index1 != return_index2) {
ASSERT(!object_ids_equal(
task_return(tasks[task_index1], return_index1),
task_return(tasks[task_index2], return_index2)));
task_return(specs[task_index1], return_index1),
task_return(specs[task_index2], return_index2)));
}
}
}
}
}
free_task_spec(task1);
free_task_spec(task2);
free_task_spec(task3);
free_task_spec(task4);
free_task_spec(task5);
free_task_spec(task6);
free_task_spec(task7);
free_task_spec(spec1);
free_task_spec(spec2);
free_task_spec(spec3);
free_task_spec(spec4);
free_task_spec(spec5);
free_task_spec(spec6);
free_task_spec(spec7);
PASS();
}
TEST send_task(void) {
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *task =
task_spec *spec =
start_construct_task_spec(parent_task_id, 0, func_id, 4, 2, 10);
task_args_add_ref(task, globally_unique_id());
task_args_add_val(task, (uint8_t *) "Hello", 5);
task_args_add_val(task, (uint8_t *) "World", 5);
task_args_add_ref(task, globally_unique_id());
finish_construct_task_spec(task);
task_args_add_ref(spec, globally_unique_id());
task_args_add_val(spec, (uint8_t *) "Hello", 5);
task_args_add_val(spec, (uint8_t *) "World", 5);
task_args_add_ref(spec, globally_unique_id());
finish_construct_task_spec(spec);
int fd[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
write_message(fd[0], SUBMIT_TASK, task_size(task), (uint8_t *) task);
write_message(fd[0], SUBMIT_TASK, task_spec_size(spec), (uint8_t *) spec);
int64_t type;
int64_t length;
uint8_t *message;
read_message(fd[1], &type, &length, &message);
task_spec *result = (task_spec *) message;
ASSERT(type == SUBMIT_TASK);
ASSERT(memcmp(task, result, task_size(task)) == 0);
ASSERT(memcmp(task, result, task_size(result)) == 0);
free(task);
ASSERT(memcmp(spec, result, task_spec_size(spec)) == 0);
ASSERT(memcmp(spec, result, task_spec_size(result)) == 0);
free(spec);
free(result);
PASS();
}
+4 -6
View File
@@ -5,7 +5,7 @@
#include "task.h"
task_spec *example_task(void) {
task_spec *example_task_spec(void) {
task_id parent_task_id = globally_unique_id();
function_id func_id = globally_unique_id();
task_spec *task =
@@ -16,11 +16,9 @@ task_spec *example_task(void) {
return task;
}
task_instance *example_task_instance(void) {
task_iid iid = globally_unique_id();
task_spec *spec = example_task();
task_instance *instance =
make_task_instance(iid, spec, TASK_STATUS_WAITING, NIL_ID);
task *example_task(void) {
task_spec *spec = example_task_spec();
task *instance = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID);
free_task_spec(spec);
return instance;
}
+13 -13
View File
@@ -4,12 +4,12 @@
#include "utarray.h"
#include "utlist.h"
#include "state/task_log.h"
#include "state/task_table.h"
#include "photon.h"
#include "photon_scheduler.h"
typedef struct task_queue_entry {
task_instance *task;
task *task;
struct task_queue_entry *prev;
struct task_queue_entry *next;
} task_queue_entry;
@@ -102,7 +102,7 @@ int find_and_schedule_task_if_possible(scheduler_info *info,
int found_task_to_schedule = 0;
/* Find the first task whose dependencies are available locally. */
DL_FOREACH_SAFE(state->task_queue, elt, tmp) {
spec = task_instance_task_spec(elt->task);
spec = task_task_spec(elt->task);
if (can_run(state, spec)) {
found_task_to_schedule = 1;
break;
@@ -122,43 +122,43 @@ int find_and_schedule_task_if_possible(scheduler_info *info,
void handle_task_submitted(scheduler_info *info,
scheduler_state *s,
task_spec *task) {
task_spec *spec) {
/* Create a unique task instance ID. This is different from the task ID and
* is used to distinguish between potentially multiple executions of the
* task. */
task_iid task_iid = globally_unique_id();
task_instance *instance =
make_task_instance(task_iid, task, TASK_STATUS_WAITING, NIL_ID);
task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID);
/* If this task's dependencies are available locally, and if there is an
* available worker, then assign this task to an available worker. Otherwise,
* add this task to the local task queue. */
int schedule_locally =
(utarray_len(s->available_workers) > 0) && can_run(s, task);
(utarray_len(s->available_workers) > 0) && can_run(s, spec);
if (schedule_locally) {
/* Get the last available worker in the available worker queue. */
int *worker_index = (int *) utarray_back(s->available_workers);
/* Tell the available worker to execute the task. */
assign_task_to_worker(info, task, *worker_index);
assign_task_to_worker(info, spec, *worker_index);
/* Remove the available worker from the queue and free the struct. */
utarray_pop_back(s->available_workers);
} else {
/* Add the task to the task queue. This passes ownership of the task queue.
* And the task will be freed when it is assigned to a worker. */
task_queue_entry *elt = malloc(sizeof(task_queue_entry));
elt->task = instance;
elt->task = task;
DL_APPEND(s->task_queue, elt);
}
/* Submit the task to redis. */
/* TODO(swang): We should set these values in a config file somewhere. */
/* TODO(swang): We should set retry values in a config file somewhere. */
retry_info retry = {
.num_retries = 0, .timeout = 0, .fail_callback = NULL,
};
task_log_publish(info->db, instance, &retry, NULL, NULL);
/* TODO(swang): This should be task_table_update if the task is already in the
* log. */
task_table_add_task(info->db, task, &retry, NULL, NULL);
if (schedule_locally) {
/* If the task was scheduled locally, we need to free it. Otherwise,
* ownership of the task is passed to the task_queue, and it will be freed
* when it is assigned to a worker. */
free(instance);
free_task(task);
}
}
+1 -1
View File
@@ -42,7 +42,7 @@ void free_scheduler_state(scheduler_state *state);
*/
void handle_task_submitted(scheduler_info *info,
scheduler_state *state,
task_spec *task);
task_spec *spec);
/**
* This function will be called when a task is assigned by the global scheduler
+3 -2
View File
@@ -11,7 +11,8 @@ photon_conn *photon_connect(const char *photon_socket) {
}
void photon_submit(photon_conn *conn, task_spec *task) {
write_message(conn->conn, SUBMIT_TASK, task_size(task), (uint8_t *)task);
write_message(conn->conn, SUBMIT_TASK, task_spec_size(task),
(uint8_t *) task);
}
task_spec *photon_get_task(photon_conn *conn) {
@@ -24,7 +25,7 @@ task_spec *photon_get_task(photon_conn *conn) {
read_message(conn->conn, &type, &length, &message);
CHECK(type == EXECUTE_TASK);
task_spec *task = (task_spec *)message;
CHECK(length == task_size(task));
CHECK(length == task_spec_size(task));
return task;
}
+10 -12
View File
@@ -14,11 +14,12 @@
#include "photon_scheduler.h"
#include "plasma_client.h"
#include "state/db.h"
#include "state/task_log.h"
#include "state/task_table.h"
#include "state/object_table.h"
#include "utarray.h"
#include "uthash.h"
UT_icd task_ptr_icd = {sizeof(task_instance *), NULL, NULL, NULL};
UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL};
UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL};
/** Association between the socket fd of a worker and its worker_index. */
@@ -95,11 +96,11 @@ void free_local_scheduler(local_scheduler_state *s) {
}
void assign_task_to_worker(scheduler_info *info,
task_spec *task,
task_spec *spec,
int worker_index) {
CHECK(worker_index < utarray_len(info->workers));
worker *w = (worker *) utarray_eltptr(info->workers, worker_index);
write_message(w->sock, EXECUTE_TASK, task_size(task), (uint8_t *) task);
write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec);
}
void process_plasma_notification(event_loop *loop,
@@ -212,17 +213,14 @@ int main(int argc, char *argv[]) {
plasma_socket_name = optarg;
break;
default:
LOG_ERR("unknown option %c", c);
exit(-1);
LOG_FATAL("unknown option %c", c);
}
}
if (!scheduler_socket_name) {
LOG_ERR("please specify socket for incoming connections with -s switch");
exit(-1);
LOG_FATAL("please specify socket for incoming connections with -s switch");
}
if (!plasma_socket_name) {
LOG_ERR("please specify socket for connecting to Plasma with -p switch");
exit(-1);
LOG_FATAL("please specify socket for connecting to Plasma with -p switch");
}
/* Parse the Redis address into an IP address and a port. */
char redis_addr[16] = {0};
@@ -230,8 +228,8 @@ int main(int argc, char *argv[]) {
if (!redis_addr_port ||
sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port) !=
2) {
LOG_ERR("need to specify redis address like 127.0.0.1:6379 with -r switch");
exit(-1);
LOG_FATAL(
"need to specify redis address like 127.0.0.1:6379 with -r switch");
}
start_server(scheduler_socket_name, &redis_addr[0], atoi(redis_port),
plasma_socket_name);
+2 -4
View File
@@ -127,8 +127,7 @@ uint8_t *lookup_or_mmap(plasma_connection *conn,
uint8_t *result =
mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (result == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
LOG_FATAL("mmap failed");
}
close(fd);
entry = malloc(sizeof(client_mmap_table_entry));
@@ -369,8 +368,7 @@ int socket_connect_retry(const char *socket_name,
}
/* If we could not connect to the socket, exit. */
if (fd == -1) {
LOG_ERR("could not connect to socket %s", socket_name);
exit(-1);
LOG_FATAL("could not connect to socket %s", socket_name);
}
return fd;
}
+10 -15
View File
@@ -328,8 +328,9 @@ void write_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
if (r > 0) {
LOG_ERR("partial write on fd %d", conn->fd);
} else {
LOG_ERR("write error");
exit(-1);
/* TODO(swang): This should not be a fatal error, since connections can
* close at any time. */
LOG_FATAL("write error");
}
} else {
conn->cursor += r;
@@ -381,7 +382,7 @@ void send_queued_request(event_loop *loop,
write_object_chunk(conn, buf);
break;
default:
LOG_ERR("Buffered request has unknown type.");
LOG_FATAL("Buffered request has unknown type.");
}
/* We are done sending this request. */
@@ -859,8 +860,7 @@ void process_message(event_loop *loop,
free(conn);
} break;
default:
LOG_ERR("invalid request %" PRId64, type);
exit(-1);
LOG_FATAL("invalid request %" PRId64, type);
}
free(req);
@@ -975,33 +975,28 @@ int main(int argc, char *argv[]) {
db_host = optarg;
break;
default:
LOG_ERR("unknown option %c", c);
exit(-1);
LOG_FATAL("unknown option %c", c);
}
}
if (!store_socket_name) {
LOG_ERR(
LOG_FATAL(
"please specify socket for connecting to the plasma store with -s "
"switch");
exit(-1);
}
if (!manager_socket_name) {
LOG_ERR(
LOG_FATAL(
"please specify socket name of the manager's local socket with -m "
"switch");
exit(-1);
}
if (!master_addr) {
LOG_ERR(
LOG_FATAL(
"please specify ip address of the current host in the format "
"123.456.789.10 with -h switch");
exit(-1);
}
if (port == -1) {
LOG_ERR(
LOG_FATAL(
"please specify port the plasma manager shall listen to in the"
"format 12345 with -p switch");
exit(-1);
}
char db_addr[16];
int db_port;
+3 -6
View File
@@ -42,8 +42,7 @@ void dlfree(void *);
void plasma_send_reply(int fd, plasma_reply *reply) {
int reply_count = sizeof(plasma_reply);
if (write(fd, reply, reply_count) != reply_count) {
LOG_ERR("write error, fd = %d", fd);
exit(-1);
LOG_FATAL("write error, fd = %d", fd);
}
}
@@ -557,12 +556,10 @@ int main(int argc, char *argv[]) {
}
}
if (!socket_name) {
LOG_ERR("please specify socket for incoming connections with -s switch");
exit(-1);
LOG_FATAL("please specify socket for incoming connections with -s switch");
}
if (system_memory == -1) {
LOG_ERR("please specify the amount of system memory with -m switch");
exit(-1);
LOG_FATAL("please specify the amount of system memory with -m switch");
}
LOG_DEBUG("starting server listening on %s", socket_name);
start_server(socket_name, system_memory);