diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index 6c7dfc6be..d2aa978ed 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -25,6 +25,7 @@ #define OBJECT_LOCATION_PREFIX "OL:" #define OBJECT_NOTIFICATION_PREFIX "ON:" #define TASK_PREFIX "TT:" +#define OBJECT_BCAST "BCAST" #define OBJECT_CHANNEL_PREFIX "OC:" @@ -228,11 +229,26 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx, bool PublishObjectNotification(RedisModuleCtx *ctx, RedisModuleString *client_id, RedisModuleString *object_id, + RedisModuleString *data_size, RedisModuleKey *key) { /* Create a string formatted as " MANAGERS * ..." */ RedisModuleString *manager_list = RedisModule_CreateStringFromString(ctx, object_id); + long long data_size_value; + if (RedisModule_StringToLongLong(data_size, &data_size_value) != + REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "data_size must be integer"); + } + + /* Add a space to the payload for human readability. */ + RedisModule_StringAppendBuffer(ctx, manager_list, " ", strlen(" ")); + + /* Append binary data size for this object. */ + RedisModule_StringAppendBuffer(ctx, manager_list, + (const char *) &data_size_value, + sizeof(data_size_value)); + RedisModule_StringAppendBuffer(ctx, manager_list, " MANAGERS", strlen(" MANAGERS")); @@ -328,6 +344,16 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx, /* Sets are not implemented yet, so we use ZSETs instead. */ RedisModule_ZsetAdd(table_key, 0.0, manager, NULL); + RedisModuleString *bcast_client_str = + RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST)); + bool success = PublishObjectNotification(ctx, bcast_client_str, object_id, + data_size, table_key); + if (!success) { + /* The publish failed somehow. */ + return RedisModule_ReplyWithError(ctx, "PUBLISH BCAST unsuccessful"); + } + RedisModule_FreeString(ctx, bcast_client_str); + /* Get the zset of clients that requested a notification about the * availability of this object. */ RedisModuleKey *object_notification_key = @@ -344,14 +370,15 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx, /* Iterate over the list of clients that requested notifiations about the * availability of this object, and publish notifications to their object * notification channels. */ + do { RedisModuleString *client_id = RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL); /* TODO(rkn): Some computation could be saved by batching the string * constructions in the multiple calls to PublishObjectNotification * together. */ - bool success = - PublishObjectNotification(ctx, client_id, object_id, table_key); + bool success = PublishObjectNotification(ctx, client_id, object_id, + data_size, table_key); if (!success) { /* The publish failed somehow. */ RedisModule_CloseKey(object_notification_key); @@ -420,7 +447,23 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModule_CloseKey(object_notification_key); } else { /* Publish a notification to the client's object notification channel. */ - bool success = PublishObjectNotification(ctx, client_id, object_id, key); + /* Extract the data_size first. */ + RedisModuleKey *object_info_key; + object_info_key = + OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id, REDISMODULE_READ); + int keytype = RedisModule_KeyType(key); + if (keytype == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(object_info_key); + RedisModule_CloseKey(key); + return RedisModule_ReplyWithError(ctx, "requested object not found"); + } + RedisModuleString *existing_data_size; + RedisModule_HashGet(object_info_key, REDISMODULE_HASH_CFIELDS, + "data_size", &existing_data_size, NULL); + RedisModule_CloseKey(object_info_key); /* No longer needed. */ + + bool success = PublishObjectNotification(ctx, client_id, object_id, + existing_data_size, key); if (!success) { /* The publish failed somehow. */ RedisModule_CloseKey(key); diff --git a/src/common/redis_module/runtest.py b/src/common/redis_module/runtest.py index 10aae24b5..a6c075e41 100644 --- a/src/common/redis_module/runtest.py +++ b/src/common/redis_module/runtest.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import random import subprocess +import sys import time import unittest import redis @@ -24,6 +25,20 @@ OBJECT_SUBSCRIBE_PREFIX = "OS:" TASK_PREFIX = "TT:" OBJECT_CHANNEL_PREFIX = "OC:" +def integerToAsciiHex(num, numbytes): + retstr = b"" + # Support 32 and 64 bit architecture. + assert(numbytes == 4 or numbytes == 8) + for i in range(numbytes): + curbyte = num & 0xff + if sys.version_info >= (3, 0): + retstr += bytes([curbyte]) + else: + retstr += chr(curbyte) + num = num >> 8 + + return retstr + class TestGlobalStateStore(unittest.TestCase): def setUp(self): @@ -37,6 +52,7 @@ class TestGlobalStateStore(unittest.TestCase): def tearDown(self): self.redis_process.kill() + def testInvalidObjectTableAdd(self): # Check that Redis returns an error when RAY.OBJECT_TABLE_ADD is called with # the wrong arguments. @@ -86,28 +102,33 @@ class TestGlobalStateStore(unittest.TestCase): self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2", "manager_id1") def testObjectTableSubscribeToNotifications(self): + data_size = 0xf1f0 p = self.redis.pubsub() # Subscribe to an object ID. p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX)) - self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2") + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", data_size, "hash1", "manager_id2") # Receive the acknowledgement message. self.assertEqual(p.get_message()["data"], 1) # Request a notification and receive the data. self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1") - self.assertEqual(p.get_message()["data"], b"object_id1 MANAGERS manager_id2") + self.assertEqual(p.get_message()["data"], b"object_id1 %s MANAGERS manager_id2"\ + %integerToAsciiHex(data_size, 8)) # Request a notification for an object that isn't there. Then add the object # and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should # trigger notifications. self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id2", "object_id3") - self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id1") - self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id2") - self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id3") - self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1") - self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", 1, "hash1", "manager_id3") - self.assertEqual(p.get_message()["data"], b"object_id2 MANAGERS manager_id3") + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id1") + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id2") + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id3") + self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1"\ + %integerToAsciiHex(data_size, 8)) + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1", "manager_id3") + self.assertEqual(p.get_message()["data"], b"object_id2 %s MANAGERS manager_id3"\ + %integerToAsciiHex(data_size, 8)) # Request notifications for object_id3 again. self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3") - self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1 manager_id2 manager_id3") + self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\ + %integerToAsciiHex(data_size, 8)) def testResultTableAddAndLookup(self): response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1") diff --git a/src/common/state/object_table.c b/src/common/state/object_table.c index 56fffb834..2ad5d15ff 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.c @@ -30,6 +30,7 @@ void object_table_add(db_handle *db_handle, void object_table_subscribe_to_notifications( db_handle *db_handle, + bool subscribe_all, object_table_object_available_callback object_available_callback, void *subscribe_context, retry_info *retry, @@ -40,6 +41,7 @@ void object_table_subscribe_to_notifications( malloc(sizeof(object_table_subscribe_data)); sub_data->object_available_callback = object_available_callback; sub_data->subscribe_context = subscribe_context; + sub_data->subscribe_all = subscribe_all; init_table_callback( db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, done_callback, diff --git a/src/common/state/object_table.h b/src/common/state/object_table.h index 32515c496..7a705d5c4 100644 --- a/src/common/state/object_table.h +++ b/src/common/state/object_table.h @@ -19,6 +19,14 @@ typedef void (*object_table_lookup_done_callback)( OWNER const char *manager_vector[], void *user_context); +/* Callback called when object object_id is available. */ +typedef void (*object_table_object_available_callback)( + object_id object_id, + int64_t data_size, + int manager_count, + OWNER const char *manager_vector[], + void *user_context); + /** * Return the list of nodes storing object_id in their plasma stores. * @@ -97,10 +105,6 @@ void object_table_remove(db_handle *db, * ==== Subscribe to be announced when new object available ==== */ -/* Callback called when object object_id is available. */ -typedef object_table_lookup_done_callback - object_table_object_available_callback; - /** * Set up a client-specific channel for receiving notifications about available * objects from the object table. The callback will be called once per @@ -120,6 +124,7 @@ typedef object_table_lookup_done_callback */ void object_table_subscribe_to_notifications( db_handle *db_handle, + bool subscribe_all, object_table_object_available_callback object_available_callback, void *subscribe_context, retry_info *retry, @@ -153,6 +158,7 @@ typedef struct { /** Data that is needed to register new object available callbacks with the * state database. */ typedef struct { + bool subscribe_all; object_table_object_available_callback object_available_callback; void *subscribe_context; } object_table_subscribe_data; diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 996027298..41b2fc0dd 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -548,13 +548,16 @@ object_id parse_subscribe_to_notifications_payload( db_handle *db, char *payload, int length, + int64_t *data_size, int *manager_count, const char ***manager_vector) { - int num_managers = (length - sizeof(object_id) - 1 - strlen("MANAGERS")) / + long long data_size_value = 0; + int num_managers = (length - sizeof(object_id) - 1 - sizeof(data_size_value) - + 1 - strlen("MANAGERS")) / (1 + sizeof(db_client_id)); CHECK(length == - sizeof(object_id) + 1 + strlen("MANAGERS") + - num_managers * (1 + sizeof(db_client_id))); + sizeof(object_id) + 1 + sizeof(data_size_value) + 1 + + strlen("MANAGERS") + num_managers * (1 + sizeof(db_client_id))); CHECK(num_managers > 0); object_id obj_id; /* Track our current offset in the payload. */ @@ -562,7 +565,14 @@ object_id parse_subscribe_to_notifications_payload( /* Parse the object ID. */ memcpy(&obj_id.id, &payload[offset], sizeof(obj_id.id)); offset += sizeof(obj_id.id); - /* The next part of the payload is the string " MANAGERS". */ + /* The next part of the payload is a space. */ + char *space_str = " "; + CHECK(memcmp(&payload[offset], space_str, strlen(space_str)) == 0); + offset += strlen(space_str); + /* The next part of the payload is binary data_size. */ + memcpy(&data_size_value, &payload[offset], sizeof(data_size_value)); + offset += sizeof(data_size_value); + /* The next part of the payload is the string " MANAGERS" with leading ' '. */ char *managers_str = " MANAGERS"; CHECK(memcmp(&payload[offset], managers_str, strlen(managers_str)) == 0); offset += strlen(managers_str); @@ -583,6 +593,7 @@ object_id parse_subscribe_to_notifications_payload( /* Return the manager array and the object ID. */ *manager_count = num_managers; *manager_vector = managers; + *data_size = data_size_value; return obj_id; } @@ -613,16 +624,17 @@ void object_table_redis_subscribe_to_notifications_callback( if (strcmp(message_type->str, "message") == 0) { /* Handle an object notification. */ + int64_t data_size = 0; int manager_count; const char **manager_vector; object_id obj_id = parse_subscribe_to_notifications_payload( - db, reply->element[2]->str, reply->element[2]->len, &manager_count, - &manager_vector); + db, reply->element[2]->str, reply->element[2]->len, &data_size, + &manager_count, &manager_vector); /* Call the subscribe callback. */ object_table_subscribe_data *data = callback_data->data; if (data->object_available_callback) { - data->object_available_callback(obj_id, manager_count, manager_vector, - data->subscribe_context); + data->object_available_callback(obj_id, data_size, manager_count, + manager_vector, data->subscribe_context); } free(manager_vector); } else if (strcmp(message_type->str, "subscribe") == 0) { @@ -650,13 +662,26 @@ void redis_object_table_subscribe_to_notifications( /* The object channel prefix must match the value defined in * src/common/redismodule/ray_redis_module.c. */ const char *object_channel_prefix = "OC:"; + const char *object_channel_bcast = "BCAST"; + int status = REDIS_OK; /* Subscribe to notifications from the object table. This uses the client ID * as the channel name so this channel is specific to this client. TODO(rkn): * The channel name should probably be the client ID with some prefix. */ - int status = redisAsyncCommand( - db->sub_context, object_table_redis_subscribe_to_notifications_callback, - (void *) callback_data->timer_id, "SUBSCRIBE %s%b", object_channel_prefix, - db->client.id, sizeof(db->client.id)); + CHECKM(callback_data->data != NULL, + "Object table subscribe data passed as NULL."); + if (((object_table_subscribe_data *) (callback_data->data))->subscribe_all) { + /* Subscribe to the object broadcast channel. */ + status = redisAsyncCommand( + db->sub_context, object_table_redis_subscribe_to_notifications_callback, + (void *) callback_data->timer_id, "SUBSCRIBE %s%s", + object_channel_prefix, object_channel_bcast); + } else { + status = redisAsyncCommand( + db->sub_context, object_table_redis_subscribe_to_notifications_callback, + (void *) callback_data->timer_id, "SUBSCRIBE %s%b", + object_channel_prefix, db->client.id, sizeof(db->client.id)); + } + if ((status == REDIS_ERR) || db->sub_context->err) { LOG_REDIS_DEBUG(db->sub_context, "error in redis_object_table_subscribe_to_notifications"); diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index 05fb5c4a5..c47a51489 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -219,6 +219,7 @@ TEST add_timeout_test(void) { int subscribe_failed = 0; void subscribe_done_callback(object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *user_context) { @@ -243,8 +244,8 @@ TEST subscribe_timeout_test(void) { .timeout = 100, .fail_callback = subscribe_fail_callback, }; - object_table_subscribe_to_notifications(db, subscribe_done_callback, NULL, - &retry, NULL, NULL); + object_table_subscribe_to_notifications(db, false, subscribe_done_callback, + NULL, &retry, NULL, NULL); /* Disconnect the database to see if the lookup times out. */ close(db->sub_context->c.fd); event_loop_run(g_loop); @@ -472,7 +473,7 @@ TEST subscribe_retry_test(void) { .timeout = 100, .fail_callback = subscribe_retry_fail_callback, }; - object_table_subscribe_to_notifications(db, NULL, NULL, &retry, + object_table_subscribe_to_notifications(db, false, NULL, NULL, &retry, subscribe_retry_done_callback, (void *) subscribe_retry_context); /* Disconnect the database to let the subscribe times out the first time. */ @@ -612,7 +613,7 @@ TEST subscribe_late_test(void) { .timeout = 0, .fail_callback = subscribe_late_fail_callback, }; - object_table_subscribe_to_notifications(db, NULL, NULL, &retry, + object_table_subscribe_to_notifications(db, false, NULL, NULL, &retry, subscribe_late_done_callback, (void *) subscribe_late_context); /* Install handler for terminating the event loop. */ @@ -657,6 +658,7 @@ void subscribe_success_done_callback(object_id object_id, } void subscribe_success_object_available_callback(object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *user_context) { @@ -679,7 +681,7 @@ TEST subscribe_success_test(void) { .fail_callback = subscribe_success_fail_callback, }; object_table_subscribe_to_notifications( - db, subscribe_success_object_available_callback, + db, false, subscribe_success_object_available_callback, (void *) subscribe_success_context, &retry, subscribe_success_done_callback, (void *) db); @@ -702,16 +704,24 @@ TEST subscribe_success_test(void) { } /* Test if subscribe succeeds if the object is already present. */ +typedef struct { + const char *teststr; + int64_t data_size; +} subscribe_object_present_context_t; -const char *subscribe_object_present_context = "subscribe_object_present"; +const char *subscribe_object_present_str = "subscribe_object_present"; int subscribe_object_present_succeeded = 0; void subscribe_object_present_object_available_callback( object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *user_context) { - CHECK(user_context == (void *) subscribe_object_present_context); + subscribe_object_present_context_t *ctx = + (subscribe_object_present_context_t *) user_context; + CHECK(ctx->data_size == data_size); + CHECK(strcmp(subscribe_object_present_str, ctx->teststr) == 0); subscribe_object_present_succeeded = 1; CHECK(manager_count == 1); } @@ -722,6 +732,10 @@ void fatal_fail_callback(unique_id id, void *user_context, void *user_data) { } TEST subscribe_object_present_test(void) { + int64_t data_size = 0xF1F0; + subscribe_object_present_context_t myctx = {subscribe_object_present_str, + data_size}; + g_loop = event_loop_create(); db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); @@ -730,10 +744,11 @@ TEST subscribe_object_present_test(void) { retry_info retry = { .num_retries = 0, .timeout = 100, .fail_callback = fatal_fail_callback, }; - object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); + object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry, + NULL, NULL); object_table_subscribe_to_notifications( - db, subscribe_object_present_object_available_callback, - (void *) subscribe_object_present_context, &retry, NULL, (void *) db); + db, false, subscribe_object_present_object_available_callback, + (void *) &myctx, &retry, NULL, (void *) db); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, (event_loop_timer_handler) terminate_event_loop_callback, @@ -764,6 +779,7 @@ const char *subscribe_object_not_present_context = void subscribe_object_not_present_object_available_callback( object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *user_context) { @@ -781,7 +797,7 @@ TEST subscribe_object_not_present_test(void) { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; object_table_subscribe_to_notifications( - db, subscribe_object_not_present_object_available_callback, + db, false, subscribe_object_not_present_object_available_callback, (void *) subscribe_object_not_present_context, &retry, NULL, (void *) db); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, @@ -813,16 +829,26 @@ int subscribe_object_available_later_succeeded = 0; void subscribe_object_available_later_object_available_callback( object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *user_context) { - CHECK(user_context == (void *) subscribe_object_available_later_context); + subscribe_object_present_context_t *myctx = + (subscribe_object_present_context_t *) user_context; + CHECK(myctx->data_size == data_size); + CHECK(strcmp(myctx->teststr, subscribe_object_available_later_context) == 0); /* Make sure the callback is only called once. */ subscribe_object_available_later_succeeded += 1; CHECK(manager_count == 1); } TEST subscribe_object_available_later_test(void) { + int64_t data_size = 0xF1F0; + subscribe_object_present_context_t *myctx = + malloc(sizeof(subscribe_object_present_context_t)); + myctx->teststr = subscribe_object_available_later_context; + myctx->data_size = data_size; + g_loop = event_loop_create(); db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); @@ -832,9 +858,8 @@ TEST subscribe_object_available_later_test(void) { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; object_table_subscribe_to_notifications( - db, subscribe_object_available_later_object_available_callback, - (void *) subscribe_object_available_later_context, &retry, NULL, - (void *) db); + db, false, subscribe_object_available_later_object_available_callback, + (void *) myctx, &retry, NULL, (void *) db); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, (event_loop_timer_handler) terminate_event_loop_callback, @@ -852,7 +877,8 @@ TEST subscribe_object_available_later_test(void) { event_loop_run(g_loop); ASSERT_EQ(subscribe_object_available_later_succeeded, 0); - object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); + object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry, + NULL, NULL); /* Install handler for terminating the event loop. */ event_loop_add_timer(g_loop, 750, (event_loop_timer_handler) terminate_event_loop_callback, @@ -864,6 +890,57 @@ TEST subscribe_object_available_later_test(void) { destroy_outstanding_callbacks(g_loop); event_loop_destroy(g_loop); ASSERT_EQ(subscribe_object_available_later_succeeded, 1); + /* Reset the global variable before exiting this unit test. */ + subscribe_object_available_later_succeeded = 0; + free(myctx); + PASS(); +} + +TEST subscribe_object_available_subscribe_all(void) { + int64_t data_size = 0xF1F0; + subscribe_object_present_context_t myctx = { + subscribe_object_available_later_context, data_size}; + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop, false); + unique_id id = globally_unique_id(); + retry_info retry = { + .num_retries = 0, .timeout = 100, .fail_callback = NULL, + }; + object_table_subscribe_to_notifications( + db, true, subscribe_object_available_later_object_available_callback, + (void *) &myctx, &retry, NULL, (void *) db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + /* Run the event loop to do the subscribe. */ + event_loop_run(g_loop); + + /* At this point we don't expect any object notifications received. */ + ASSERT_EQ(subscribe_object_available_later_succeeded, 0); + object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry, + NULL, NULL); + /* Install handler to terminate event loop after 750ms. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + /* Run the event loop to do the object table add. */ + event_loop_run(g_loop); + /* At this point we assume that object table add completed. */ + + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + /* Assert that the object table add completed and notification callback fired. + */ + printf("subscribe_all object info test: callback fired: %d times\n", + subscribe_object_available_later_succeeded); + fflush(stdout); + ASSERT_EQ(subscribe_object_available_later_succeeded, 1); + /* Reset the global variable before exiting this unit test. */ + subscribe_object_available_later_succeeded = 0; PASS(); } @@ -954,6 +1031,7 @@ SUITE(object_table_tests) { RUN_REDIS_TEST(subscribe_object_present_test); RUN_REDIS_TEST(subscribe_object_not_present_test); RUN_REDIS_TEST(subscribe_object_available_later_test); + RUN_REDIS_TEST(subscribe_object_available_subscribe_all); // RUN_REDIS_TEST(subscribe_object_info_success_test); } diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index e520d38f4..9082a7343 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -963,6 +963,7 @@ void object_present_callback(object_id object_id, /* This callback is used by both fetch and wait. Therefore, it may have to * handle outstanding fetch and wait requests. */ void object_table_subscribe_callback(object_id object_id, + int64_t data_size, int manager_count, const char *manager_vector[], void *context) { @@ -1378,7 +1379,7 @@ void start_server(const char *store_socket_name, handle_new_client, g_manager_state); /* Set up a client-specific channel to receive notifications from the object * table. */ - object_table_subscribe_to_notifications(g_manager_state->db, + object_table_subscribe_to_notifications(g_manager_state->db, false, object_table_subscribe_callback, g_manager_state, NULL, NULL, NULL); /* Run the event loop. */