diff --git a/src/common/state/redis.c b/src/common/state/redis.c index bf4662b90..47822f86f 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -56,6 +56,10 @@ db_handle *db_connect(const char *address, redisContext *context = redisConnect(address, port); CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d", address, port); + /* Enable keyspace events. */ + reply = redisCommand(context, "CONFIG SET notify-keyspace-events AKE"); + CHECK(reply != NULL); + freeReplyObject(reply); /* Add new client using optimistic locking. */ db_client_id client = globally_unique_id(); while (true) { @@ -388,9 +392,38 @@ void redis_object_table_get_entry(redisAsyncContext *c, } } -void object_table_redis_callback(redisAsyncContext *c, - void *r, - void *privdata) { +void redis_object_table_subscribe_lookup(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + redisReply *reply = r; + + if (reply->type == REDIS_REPLY_ARRAY) { + if (reply->elements > 0) { + CHECK(reply->element[0]->len == UNIQUE_ID_SIZE); + /* Check that the reply corresponds to the right object ID. */ + CHECK(strncmp(reply->element[0]->str, callback_data->id.id, + UNIQUE_ID_SIZE)); + object_table_subscribe_data *data = callback_data->data; + if (data->object_available_callback) { + data->object_available_callback(callback_data->id, + data->subscribe_context); + } + } + } else { + LOG_FATAL("expected integer or string, received type %d", reply->type); + } + + if (callback_data->done_callback) { + object_table_done_callback done_callback = callback_data->done_callback; + done_callback(callback_data->id, callback_data->user_context); + } + event_loop_remove_timer(db->loop, callback_data->timer_id); +} + +void object_table_redis_subscribe_callback(redisAsyncContext *c, + void *r, + void *privdata) { REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; @@ -399,19 +432,30 @@ void object_table_redis_callback(redisAsyncContext *c, CHECK(reply->elements > 2); /* If this condition is true, we got the initial message that acknowledged the * subscription. */ - if (strncmp(reply->element[1]->str, "add", 3) != 0) { - if (callback_data->done_callback) { - object_table_done_callback done_callback = callback_data->done_callback; - done_callback(callback_data->id, callback_data->user_context); + bool is_add = + reply->element[1]->str && strcmp(reply->element[1]->str, "sadd") == 0; + if (is_add) { + /* Do a lookup to see if the key has been in redis before we started the + * subscription. */ + int status = + redisAsyncCommand(db->context, redis_object_table_subscribe_lookup, + (void *) callback_data->timer_id, "SMEMBERS obj:%b", + callback_data->id.id, sizeof(callback_data->id.id)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_ERROR(db->context, + "error in redis_object_table_subscribe_callback"); } - event_loop_remove_timer(db->loop, callback_data->timer_id); return; } - /* Otherwise, parse the task and call the callback. */ - object_table_subscribe_data *data = callback_data->data; - if (data->object_available_callback) { - data->object_available_callback(callback_data->id, data->subscribe_context); + /* If the subscription is issued, parse the task and call the callback. */ + if (strcmp(reply->element[0]->str, "message") == 0) { + object_table_subscribe_data *data = callback_data->data; + + if (data->object_available_callback) { + data->object_available_callback(callback_data->id, + data->subscribe_context); + } } } @@ -420,10 +464,10 @@ void redis_object_table_subscribe(table_callback_data *callback_data) { /* subscribe to key notification associated to object id */ object_id id = callback_data->id; - int status = redisAsyncCommand(db->sub_context, object_table_redis_callback, - (void *) callback_data->timer_id, - "SUBSCRIBE __keyspace@0__:%b add", id.id, - sizeof(id.id)); + int status = redisAsyncCommand( + db->sub_context, object_table_redis_subscribe_callback, + (void *) callback_data->timer_id, "SUBSCRIBE __keyspace@0__:obj:%b sadd", + id.id, sizeof(id.id)); if ((status == REDIS_ERR) || db->sub_context->err) { LOG_REDIS_DEBUG(db->sub_context, "error in redis_object_table_subscribe_callback"); diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index af0ac335b..dbf579dbd 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -387,9 +387,12 @@ int64_t reconnect_sub_context_callback(event_loop *loop, db_handle *db = context; /* Reconnect to redis. This is not reconnecting the pub/sub channel. */ redisAsyncFree(db->sub_context); + redisAsyncFree(db->context); redisFree(db->sync_context); db->sub_context = redisAsyncConnect("127.0.0.1", 6379); db->sub_context->data = (void *) db; + db->context = redisAsyncConnect("127.0.0.1", 6379); + db->context->data = (void *) db; db->sync_context = redisConnect("127.0.0.1", 6379); /* Re-attach the database to the event loop (the file descriptor changed). */ db_attach(db, loop); @@ -631,6 +634,137 @@ TEST subscribe_success_test(void) { PASS(); } +/* Test if subscribe succeeds if the object is already present. */ + +const char *subscribe_object_present_context = "subscribe_object_present"; +int subscribe_object_present_succeeded = 0; + +void subscribe_object_present_object_available_callback(object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_object_present_context); + subscribe_object_present_succeeded = 1; +} + +TEST subscribe_object_present_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + unique_id id = globally_unique_id(); + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + object_table_add(db, id, &retry, NULL, NULL); + object_table_subscribe( + db, id, subscribe_object_present_object_available_callback, + (void *) subscribe_object_present_context, &retry, NULL, (void *) db); + + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_object_present_succeeded == 1); + PASS(); +} + +/* Test if subscribe is not called if object is not present. */ + +const char *subscribe_object_not_present_context = + "subscribe_object_not_present"; +int subscribe_object_not_present_succeeded = 0; + +void subscribe_object_not_present_object_available_callback( + object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_object_not_present_context); + subscribe_object_not_present_succeeded = 1; +} + +TEST subscribe_object_not_present_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + unique_id id = globally_unique_id(); + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + object_table_subscribe( + db, id, subscribe_object_not_present_object_available_callback, + (void *) subscribe_object_not_present_context, &retry, NULL, (void *) db); + + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_object_not_present_succeeded == 0); + PASS(); +} + +/* Test if subscribe is called if object becomes available later. */ + +const char *subscribe_object_available_later_context = + "subscribe_object_available_later"; +int subscribe_object_available_later_succeeded = 0; + +void subscribe_object_available_later_object_available_callback( + object_id object_id, + void *user_context) { + CHECK(user_context == (void *) subscribe_object_available_later_context); + /* Make sure the callback is only called once. */ + subscribe_object_available_later_succeeded += 1; +} + +int64_t add_object_callback(event_loop *loop, int64_t timer_id, void *context) { + db_handle *db = (db_handle *) context; + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + object_table_add(db, NIL_ID, &retry, NULL, NULL); + /* Reset the timer to this large value, so it doesn't trigger again. */ + return 10000; +} + +TEST subscribe_object_available_later_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop); + unique_id id = NIL_ID; + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + object_table_subscribe( + db, id, subscribe_object_available_later_object_available_callback, + (void *) subscribe_object_available_later_context, &retry, NULL, + (void *) db); + + event_loop_add_timer(g_loop, 300, + (event_loop_timer_handler) add_object_callback, db); + + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(subscribe_object_available_later_succeeded == 1); + PASS(); +} + SUITE(object_table_tests) { RUN_REDIS_TEST(new_object_test); RUN_REDIS_TEST(new_object_no_task_test); @@ -644,6 +778,9 @@ SUITE(object_table_tests) { RUN_REDIS_TEST(add_late_test); RUN_REDIS_TEST(subscribe_late_test); RUN_REDIS_TEST(subscribe_success_test); + RUN_REDIS_TEST(subscribe_object_present_test); + RUN_REDIS_TEST(subscribe_object_not_present_test); + RUN_REDIS_TEST(subscribe_object_available_later_test); } GREATEST_MAIN_DEFS();