Object table subscribe with new semantics (#62)

* new plasma subscribe implementation

* object table subscribe with test

* clang-format

* fix

* fix test

* fix tests

* fix clang-format

* add check

* final clang-format

* final fixes

* fix clang-format
This commit is contained in:
Philipp Moritz
2016-11-27 21:26:23 -08:00
committed by Robert Nishihara
parent bc1d7db926
commit c7073d623b
2 changed files with 197 additions and 16 deletions
+60 -16
View File
@@ -56,6 +56,10 @@ db_handle *db_connect(const char *address,
redisContext *context = redisConnect(address, port);
CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d",
address, port);
/* Enable keyspace events. */
reply = redisCommand(context, "CONFIG SET notify-keyspace-events AKE");
CHECK(reply != NULL);
freeReplyObject(reply);
/* Add new client using optimistic locking. */
db_client_id client = globally_unique_id();
while (true) {
@@ -388,9 +392,38 @@ void redis_object_table_get_entry(redisAsyncContext *c,
}
}
void object_table_redis_callback(redisAsyncContext *c,
void *r,
void *privdata) {
void redis_object_table_subscribe_lookup(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = r;
if (reply->type == REDIS_REPLY_ARRAY) {
if (reply->elements > 0) {
CHECK(reply->element[0]->len == UNIQUE_ID_SIZE);
/* Check that the reply corresponds to the right object ID. */
CHECK(strncmp(reply->element[0]->str, callback_data->id.id,
UNIQUE_ID_SIZE));
object_table_subscribe_data *data = callback_data->data;
if (data->object_available_callback) {
data->object_available_callback(callback_data->id,
data->subscribe_context);
}
}
} else {
LOG_FATAL("expected integer or string, received type %d", reply->type);
}
if (callback_data->done_callback) {
object_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
event_loop_remove_timer(db->loop, callback_data->timer_id);
}
void object_table_redis_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = r;
@@ -399,19 +432,30 @@ void object_table_redis_callback(redisAsyncContext *c,
CHECK(reply->elements > 2);
/* If this condition is true, we got the initial message that acknowledged the
* subscription. */
if (strncmp(reply->element[1]->str, "add", 3) != 0) {
if (callback_data->done_callback) {
object_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
bool is_add =
reply->element[1]->str && strcmp(reply->element[1]->str, "sadd") == 0;
if (is_add) {
/* Do a lookup to see if the key has been in redis before we started the
* subscription. */
int status =
redisAsyncCommand(db->context, redis_object_table_subscribe_lookup,
(void *) callback_data->timer_id, "SMEMBERS obj:%b",
callback_data->id.id, sizeof(callback_data->id.id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERROR(db->context,
"error in redis_object_table_subscribe_callback");
}
event_loop_remove_timer(db->loop, callback_data->timer_id);
return;
}
/* Otherwise, parse the task and call the callback. */
object_table_subscribe_data *data = callback_data->data;
if (data->object_available_callback) {
data->object_available_callback(callback_data->id, data->subscribe_context);
/* If the subscription is issued, parse the task and call the callback. */
if (strcmp(reply->element[0]->str, "message") == 0) {
object_table_subscribe_data *data = callback_data->data;
if (data->object_available_callback) {
data->object_available_callback(callback_data->id,
data->subscribe_context);
}
}
}
@@ -420,10 +464,10 @@ void redis_object_table_subscribe(table_callback_data *callback_data) {
/* subscribe to key notification associated to object id */
object_id id = callback_data->id;
int status = redisAsyncCommand(db->sub_context, object_table_redis_callback,
(void *) callback_data->timer_id,
"SUBSCRIBE __keyspace@0__:%b add", id.id,
sizeof(id.id));
int status = redisAsyncCommand(
db->sub_context, object_table_redis_subscribe_callback,
(void *) callback_data->timer_id, "SUBSCRIBE __keyspace@0__:obj:%b sadd",
id.id, sizeof(id.id));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context,
"error in redis_object_table_subscribe_callback");
+137
View File
@@ -387,9 +387,12 @@ int64_t reconnect_sub_context_callback(event_loop *loop,
db_handle *db = context;
/* Reconnect to redis. This is not reconnecting the pub/sub channel. */
redisAsyncFree(db->sub_context);
redisAsyncFree(db->context);
redisFree(db->sync_context);
db->sub_context = redisAsyncConnect("127.0.0.1", 6379);
db->sub_context->data = (void *) db;
db->context = redisAsyncConnect("127.0.0.1", 6379);
db->context->data = (void *) db;
db->sync_context = redisConnect("127.0.0.1", 6379);
/* Re-attach the database to the event loop (the file descriptor changed). */
db_attach(db, loop);
@@ -631,6 +634,137 @@ TEST subscribe_success_test(void) {
PASS();
}
/* Test if subscribe succeeds if the object is already present. */
const char *subscribe_object_present_context = "subscribe_object_present";
int subscribe_object_present_succeeded = 0;
void subscribe_object_present_object_available_callback(object_id object_id,
void *user_context) {
CHECK(user_context == (void *) subscribe_object_present_context);
subscribe_object_present_succeeded = 1;
}
TEST subscribe_object_present_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop);
unique_id id = globally_unique_id();
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_add(db, id, &retry, NULL, NULL);
object_table_subscribe(
db, id, subscribe_object_present_object_available_callback,
(void *) subscribe_object_present_context, &retry, NULL, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_object_present_succeeded == 1);
PASS();
}
/* Test if subscribe is not called if object is not present. */
const char *subscribe_object_not_present_context =
"subscribe_object_not_present";
int subscribe_object_not_present_succeeded = 0;
void subscribe_object_not_present_object_available_callback(
object_id object_id,
void *user_context) {
CHECK(user_context == (void *) subscribe_object_not_present_context);
subscribe_object_not_present_succeeded = 1;
}
TEST subscribe_object_not_present_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop);
unique_id id = globally_unique_id();
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_subscribe(
db, id, subscribe_object_not_present_object_available_callback,
(void *) subscribe_object_not_present_context, &retry, NULL, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_object_not_present_succeeded == 0);
PASS();
}
/* Test if subscribe is called if object becomes available later. */
const char *subscribe_object_available_later_context =
"subscribe_object_available_later";
int subscribe_object_available_later_succeeded = 0;
void subscribe_object_available_later_object_available_callback(
object_id object_id,
void *user_context) {
CHECK(user_context == (void *) subscribe_object_available_later_context);
/* Make sure the callback is only called once. */
subscribe_object_available_later_succeeded += 1;
}
int64_t add_object_callback(event_loop *loop, int64_t timer_id, void *context) {
db_handle *db = (db_handle *) context;
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_add(db, NIL_ID, &retry, NULL, NULL);
/* Reset the timer to this large value, so it doesn't trigger again. */
return 10000;
}
TEST subscribe_object_available_later_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop);
unique_id id = NIL_ID;
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_subscribe(
db, id, subscribe_object_available_later_object_available_callback,
(void *) subscribe_object_available_later_context, &retry, NULL,
(void *) db);
event_loop_add_timer(g_loop, 300,
(event_loop_timer_handler) add_object_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_object_available_later_succeeded == 1);
PASS();
}
SUITE(object_table_tests) {
RUN_REDIS_TEST(new_object_test);
RUN_REDIS_TEST(new_object_no_task_test);
@@ -644,6 +778,9 @@ SUITE(object_table_tests) {
RUN_REDIS_TEST(add_late_test);
RUN_REDIS_TEST(subscribe_late_test);
RUN_REDIS_TEST(subscribe_success_test);
RUN_REDIS_TEST(subscribe_object_present_test);
RUN_REDIS_TEST(subscribe_object_not_present_test);
RUN_REDIS_TEST(subscribe_object_available_later_test);
}
GREATEST_MAIN_DEFS();