mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 09:24:28 +08:00
passing object info information with redis module (#138)
* adding object broadcast channel; published on each object table add * publishing data size to the bcast channel * bug fix: objectkey * update object tests to test for data size: C + py * remove debug * clang format * Minor changes. * Fix error. * merging with Robert's comments * clang format for the object table test upgrade
This commit is contained in:
committed by
Robert Nishihara
parent
269f37e26f
commit
cb3e6cde9e
@@ -25,6 +25,7 @@
|
||||
#define OBJECT_LOCATION_PREFIX "OL:"
|
||||
#define OBJECT_NOTIFICATION_PREFIX "ON:"
|
||||
#define TASK_PREFIX "TT:"
|
||||
#define OBJECT_BCAST "BCAST"
|
||||
|
||||
#define OBJECT_CHANNEL_PREFIX "OC:"
|
||||
|
||||
@@ -228,11 +229,26 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
||||
bool PublishObjectNotification(RedisModuleCtx *ctx,
|
||||
RedisModuleString *client_id,
|
||||
RedisModuleString *object_id,
|
||||
RedisModuleString *data_size,
|
||||
RedisModuleKey *key) {
|
||||
/* Create a string formatted as "<object id> MANAGERS <manager id1>
|
||||
* <manager id2> ..." */
|
||||
RedisModuleString *manager_list =
|
||||
RedisModule_CreateStringFromString(ctx, object_id);
|
||||
long long data_size_value;
|
||||
if (RedisModule_StringToLongLong(data_size, &data_size_value) !=
|
||||
REDISMODULE_OK) {
|
||||
return RedisModule_ReplyWithError(ctx, "data_size must be integer");
|
||||
}
|
||||
|
||||
/* Add a space to the payload for human readability. */
|
||||
RedisModule_StringAppendBuffer(ctx, manager_list, " ", strlen(" "));
|
||||
|
||||
/* Append binary data size for this object. */
|
||||
RedisModule_StringAppendBuffer(ctx, manager_list,
|
||||
(const char *) &data_size_value,
|
||||
sizeof(data_size_value));
|
||||
|
||||
RedisModule_StringAppendBuffer(ctx, manager_list, " MANAGERS",
|
||||
strlen(" MANAGERS"));
|
||||
|
||||
@@ -328,6 +344,16 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
/* Sets are not implemented yet, so we use ZSETs instead. */
|
||||
RedisModule_ZsetAdd(table_key, 0.0, manager, NULL);
|
||||
|
||||
RedisModuleString *bcast_client_str =
|
||||
RedisModule_CreateString(ctx, OBJECT_BCAST, strlen(OBJECT_BCAST));
|
||||
bool success = PublishObjectNotification(ctx, bcast_client_str, object_id,
|
||||
data_size, table_key);
|
||||
if (!success) {
|
||||
/* The publish failed somehow. */
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH BCAST unsuccessful");
|
||||
}
|
||||
RedisModule_FreeString(ctx, bcast_client_str);
|
||||
|
||||
/* Get the zset of clients that requested a notification about the
|
||||
* availability of this object. */
|
||||
RedisModuleKey *object_notification_key =
|
||||
@@ -344,14 +370,15 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
/* Iterate over the list of clients that requested notifiations about the
|
||||
* availability of this object, and publish notifications to their object
|
||||
* notification channels. */
|
||||
|
||||
do {
|
||||
RedisModuleString *client_id =
|
||||
RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL);
|
||||
/* TODO(rkn): Some computation could be saved by batching the string
|
||||
* constructions in the multiple calls to PublishObjectNotification
|
||||
* together. */
|
||||
bool success =
|
||||
PublishObjectNotification(ctx, client_id, object_id, table_key);
|
||||
bool success = PublishObjectNotification(ctx, client_id, object_id,
|
||||
data_size, table_key);
|
||||
if (!success) {
|
||||
/* The publish failed somehow. */
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
@@ -420,7 +447,23 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
} else {
|
||||
/* Publish a notification to the client's object notification channel. */
|
||||
bool success = PublishObjectNotification(ctx, client_id, object_id, key);
|
||||
/* Extract the data_size first. */
|
||||
RedisModuleKey *object_info_key;
|
||||
object_info_key =
|
||||
OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id, REDISMODULE_READ);
|
||||
int keytype = RedisModule_KeyType(key);
|
||||
if (keytype == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(object_info_key);
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(ctx, "requested object not found");
|
||||
}
|
||||
RedisModuleString *existing_data_size;
|
||||
RedisModule_HashGet(object_info_key, REDISMODULE_HASH_CFIELDS,
|
||||
"data_size", &existing_data_size, NULL);
|
||||
RedisModule_CloseKey(object_info_key); /* No longer needed. */
|
||||
|
||||
bool success = PublishObjectNotification(ctx, client_id, object_id,
|
||||
existing_data_size, key);
|
||||
if (!success) {
|
||||
/* The publish failed somehow. */
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
@@ -5,6 +5,7 @@ from __future__ import print_function
|
||||
import os
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
import redis
|
||||
@@ -24,6 +25,20 @@ OBJECT_SUBSCRIBE_PREFIX = "OS:"
|
||||
TASK_PREFIX = "TT:"
|
||||
OBJECT_CHANNEL_PREFIX = "OC:"
|
||||
|
||||
def integerToAsciiHex(num, numbytes):
|
||||
retstr = b""
|
||||
# Support 32 and 64 bit architecture.
|
||||
assert(numbytes == 4 or numbytes == 8)
|
||||
for i in range(numbytes):
|
||||
curbyte = num & 0xff
|
||||
if sys.version_info >= (3, 0):
|
||||
retstr += bytes([curbyte])
|
||||
else:
|
||||
retstr += chr(curbyte)
|
||||
num = num >> 8
|
||||
|
||||
return retstr
|
||||
|
||||
class TestGlobalStateStore(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@@ -37,6 +52,7 @@ class TestGlobalStateStore(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
self.redis_process.kill()
|
||||
|
||||
|
||||
def testInvalidObjectTableAdd(self):
|
||||
# Check that Redis returns an error when RAY.OBJECT_TABLE_ADD is called with
|
||||
# the wrong arguments.
|
||||
@@ -86,28 +102,33 @@ class TestGlobalStateStore(unittest.TestCase):
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2", "manager_id1")
|
||||
|
||||
def testObjectTableSubscribeToNotifications(self):
|
||||
data_size = 0xf1f0
|
||||
p = self.redis.pubsub()
|
||||
# Subscribe to an object ID.
|
||||
p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", data_size, "hash1", "manager_id2")
|
||||
# Receive the acknowledgement message.
|
||||
self.assertEqual(p.get_message()["data"], 1)
|
||||
# Request a notification and receive the data.
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id1 MANAGERS manager_id2")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id1 %s MANAGERS manager_id2"\
|
||||
%integerToAsciiHex(data_size, 8))
|
||||
# Request a notification for an object that isn't there. Then add the object
|
||||
# and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should
|
||||
# trigger notifications.
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id2", "object_id3")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id1")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id2")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", 1, "hash1", "manager_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id2 MANAGERS manager_id3")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id1")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id2")
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", data_size, "hash1", "manager_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1"\
|
||||
%integerToAsciiHex(data_size, 8))
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", data_size, "hash1", "manager_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id2 %s MANAGERS manager_id3"\
|
||||
%integerToAsciiHex(data_size, 8))
|
||||
# Request notifications for object_id3 again.
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1 manager_id2 manager_id3")
|
||||
self.assertEqual(p.get_message()["data"], b"object_id3 %s MANAGERS manager_id1 manager_id2 manager_id3"\
|
||||
%integerToAsciiHex(data_size, 8))
|
||||
|
||||
def testResultTableAddAndLookup(self):
|
||||
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
|
||||
|
||||
@@ -30,6 +30,7 @@ void object_table_add(db_handle *db_handle,
|
||||
|
||||
void object_table_subscribe_to_notifications(
|
||||
db_handle *db_handle,
|
||||
bool subscribe_all,
|
||||
object_table_object_available_callback object_available_callback,
|
||||
void *subscribe_context,
|
||||
retry_info *retry,
|
||||
@@ -40,6 +41,7 @@ void object_table_subscribe_to_notifications(
|
||||
malloc(sizeof(object_table_subscribe_data));
|
||||
sub_data->object_available_callback = object_available_callback;
|
||||
sub_data->subscribe_context = subscribe_context;
|
||||
sub_data->subscribe_all = subscribe_all;
|
||||
|
||||
init_table_callback(
|
||||
db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, done_callback,
|
||||
|
||||
@@ -19,6 +19,14 @@ typedef void (*object_table_lookup_done_callback)(
|
||||
OWNER const char *manager_vector[],
|
||||
void *user_context);
|
||||
|
||||
/* Callback called when object object_id is available. */
|
||||
typedef void (*object_table_object_available_callback)(
|
||||
object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
OWNER const char *manager_vector[],
|
||||
void *user_context);
|
||||
|
||||
/**
|
||||
* Return the list of nodes storing object_id in their plasma stores.
|
||||
*
|
||||
@@ -97,10 +105,6 @@ void object_table_remove(db_handle *db,
|
||||
* ==== Subscribe to be announced when new object available ====
|
||||
*/
|
||||
|
||||
/* Callback called when object object_id is available. */
|
||||
typedef object_table_lookup_done_callback
|
||||
object_table_object_available_callback;
|
||||
|
||||
/**
|
||||
* Set up a client-specific channel for receiving notifications about available
|
||||
* objects from the object table. The callback will be called once per
|
||||
@@ -120,6 +124,7 @@ typedef object_table_lookup_done_callback
|
||||
*/
|
||||
void object_table_subscribe_to_notifications(
|
||||
db_handle *db_handle,
|
||||
bool subscribe_all,
|
||||
object_table_object_available_callback object_available_callback,
|
||||
void *subscribe_context,
|
||||
retry_info *retry,
|
||||
@@ -153,6 +158,7 @@ typedef struct {
|
||||
/** Data that is needed to register new object available callbacks with the
|
||||
* state database. */
|
||||
typedef struct {
|
||||
bool subscribe_all;
|
||||
object_table_object_available_callback object_available_callback;
|
||||
void *subscribe_context;
|
||||
} object_table_subscribe_data;
|
||||
|
||||
+37
-12
@@ -548,13 +548,16 @@ object_id parse_subscribe_to_notifications_payload(
|
||||
db_handle *db,
|
||||
char *payload,
|
||||
int length,
|
||||
int64_t *data_size,
|
||||
int *manager_count,
|
||||
const char ***manager_vector) {
|
||||
int num_managers = (length - sizeof(object_id) - 1 - strlen("MANAGERS")) /
|
||||
long long data_size_value = 0;
|
||||
int num_managers = (length - sizeof(object_id) - 1 - sizeof(data_size_value) -
|
||||
1 - strlen("MANAGERS")) /
|
||||
(1 + sizeof(db_client_id));
|
||||
CHECK(length ==
|
||||
sizeof(object_id) + 1 + strlen("MANAGERS") +
|
||||
num_managers * (1 + sizeof(db_client_id)));
|
||||
sizeof(object_id) + 1 + sizeof(data_size_value) + 1 +
|
||||
strlen("MANAGERS") + num_managers * (1 + sizeof(db_client_id)));
|
||||
CHECK(num_managers > 0);
|
||||
object_id obj_id;
|
||||
/* Track our current offset in the payload. */
|
||||
@@ -562,7 +565,14 @@ object_id parse_subscribe_to_notifications_payload(
|
||||
/* Parse the object ID. */
|
||||
memcpy(&obj_id.id, &payload[offset], sizeof(obj_id.id));
|
||||
offset += sizeof(obj_id.id);
|
||||
/* The next part of the payload is the string " MANAGERS". */
|
||||
/* The next part of the payload is a space. */
|
||||
char *space_str = " ";
|
||||
CHECK(memcmp(&payload[offset], space_str, strlen(space_str)) == 0);
|
||||
offset += strlen(space_str);
|
||||
/* The next part of the payload is binary data_size. */
|
||||
memcpy(&data_size_value, &payload[offset], sizeof(data_size_value));
|
||||
offset += sizeof(data_size_value);
|
||||
/* The next part of the payload is the string " MANAGERS" with leading ' '. */
|
||||
char *managers_str = " MANAGERS";
|
||||
CHECK(memcmp(&payload[offset], managers_str, strlen(managers_str)) == 0);
|
||||
offset += strlen(managers_str);
|
||||
@@ -583,6 +593,7 @@ object_id parse_subscribe_to_notifications_payload(
|
||||
/* Return the manager array and the object ID. */
|
||||
*manager_count = num_managers;
|
||||
*manager_vector = managers;
|
||||
*data_size = data_size_value;
|
||||
return obj_id;
|
||||
}
|
||||
|
||||
@@ -613,16 +624,17 @@ void object_table_redis_subscribe_to_notifications_callback(
|
||||
|
||||
if (strcmp(message_type->str, "message") == 0) {
|
||||
/* Handle an object notification. */
|
||||
int64_t data_size = 0;
|
||||
int manager_count;
|
||||
const char **manager_vector;
|
||||
object_id obj_id = parse_subscribe_to_notifications_payload(
|
||||
db, reply->element[2]->str, reply->element[2]->len, &manager_count,
|
||||
&manager_vector);
|
||||
db, reply->element[2]->str, reply->element[2]->len, &data_size,
|
||||
&manager_count, &manager_vector);
|
||||
/* Call the subscribe callback. */
|
||||
object_table_subscribe_data *data = callback_data->data;
|
||||
if (data->object_available_callback) {
|
||||
data->object_available_callback(obj_id, manager_count, manager_vector,
|
||||
data->subscribe_context);
|
||||
data->object_available_callback(obj_id, data_size, manager_count,
|
||||
manager_vector, data->subscribe_context);
|
||||
}
|
||||
free(manager_vector);
|
||||
} else if (strcmp(message_type->str, "subscribe") == 0) {
|
||||
@@ -650,13 +662,26 @@ void redis_object_table_subscribe_to_notifications(
|
||||
/* The object channel prefix must match the value defined in
|
||||
* src/common/redismodule/ray_redis_module.c. */
|
||||
const char *object_channel_prefix = "OC:";
|
||||
const char *object_channel_bcast = "BCAST";
|
||||
int status = REDIS_OK;
|
||||
/* Subscribe to notifications from the object table. This uses the client ID
|
||||
* as the channel name so this channel is specific to this client. TODO(rkn):
|
||||
* The channel name should probably be the client ID with some prefix. */
|
||||
int status = redisAsyncCommand(
|
||||
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
|
||||
(void *) callback_data->timer_id, "SUBSCRIBE %s%b", object_channel_prefix,
|
||||
db->client.id, sizeof(db->client.id));
|
||||
CHECKM(callback_data->data != NULL,
|
||||
"Object table subscribe data passed as NULL.");
|
||||
if (((object_table_subscribe_data *) (callback_data->data))->subscribe_all) {
|
||||
/* Subscribe to the object broadcast channel. */
|
||||
status = redisAsyncCommand(
|
||||
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
|
||||
(void *) callback_data->timer_id, "SUBSCRIBE %s%s",
|
||||
object_channel_prefix, object_channel_bcast);
|
||||
} else {
|
||||
status = redisAsyncCommand(
|
||||
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
|
||||
(void *) callback_data->timer_id, "SUBSCRIBE %s%b",
|
||||
object_channel_prefix, db->client.id, sizeof(db->client.id));
|
||||
}
|
||||
|
||||
if ((status == REDIS_ERR) || db->sub_context->err) {
|
||||
LOG_REDIS_DEBUG(db->sub_context,
|
||||
"error in redis_object_table_subscribe_to_notifications");
|
||||
|
||||
@@ -219,6 +219,7 @@ TEST add_timeout_test(void) {
|
||||
int subscribe_failed = 0;
|
||||
|
||||
void subscribe_done_callback(object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *user_context) {
|
||||
@@ -243,8 +244,8 @@ TEST subscribe_timeout_test(void) {
|
||||
.timeout = 100,
|
||||
.fail_callback = subscribe_fail_callback,
|
||||
};
|
||||
object_table_subscribe_to_notifications(db, subscribe_done_callback, NULL,
|
||||
&retry, NULL, NULL);
|
||||
object_table_subscribe_to_notifications(db, false, subscribe_done_callback,
|
||||
NULL, &retry, NULL, NULL);
|
||||
/* Disconnect the database to see if the lookup times out. */
|
||||
close(db->sub_context->c.fd);
|
||||
event_loop_run(g_loop);
|
||||
@@ -472,7 +473,7 @@ TEST subscribe_retry_test(void) {
|
||||
.timeout = 100,
|
||||
.fail_callback = subscribe_retry_fail_callback,
|
||||
};
|
||||
object_table_subscribe_to_notifications(db, NULL, NULL, &retry,
|
||||
object_table_subscribe_to_notifications(db, false, NULL, NULL, &retry,
|
||||
subscribe_retry_done_callback,
|
||||
(void *) subscribe_retry_context);
|
||||
/* Disconnect the database to let the subscribe times out the first time. */
|
||||
@@ -612,7 +613,7 @@ TEST subscribe_late_test(void) {
|
||||
.timeout = 0,
|
||||
.fail_callback = subscribe_late_fail_callback,
|
||||
};
|
||||
object_table_subscribe_to_notifications(db, NULL, NULL, &retry,
|
||||
object_table_subscribe_to_notifications(db, false, NULL, NULL, &retry,
|
||||
subscribe_late_done_callback,
|
||||
(void *) subscribe_late_context);
|
||||
/* Install handler for terminating the event loop. */
|
||||
@@ -657,6 +658,7 @@ void subscribe_success_done_callback(object_id object_id,
|
||||
}
|
||||
|
||||
void subscribe_success_object_available_callback(object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *user_context) {
|
||||
@@ -679,7 +681,7 @@ TEST subscribe_success_test(void) {
|
||||
.fail_callback = subscribe_success_fail_callback,
|
||||
};
|
||||
object_table_subscribe_to_notifications(
|
||||
db, subscribe_success_object_available_callback,
|
||||
db, false, subscribe_success_object_available_callback,
|
||||
(void *) subscribe_success_context, &retry,
|
||||
subscribe_success_done_callback, (void *) db);
|
||||
|
||||
@@ -702,16 +704,24 @@ TEST subscribe_success_test(void) {
|
||||
}
|
||||
|
||||
/* Test if subscribe succeeds if the object is already present. */
|
||||
typedef struct {
|
||||
const char *teststr;
|
||||
int64_t data_size;
|
||||
} subscribe_object_present_context_t;
|
||||
|
||||
const char *subscribe_object_present_context = "subscribe_object_present";
|
||||
const char *subscribe_object_present_str = "subscribe_object_present";
|
||||
int subscribe_object_present_succeeded = 0;
|
||||
|
||||
void subscribe_object_present_object_available_callback(
|
||||
object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *user_context) {
|
||||
CHECK(user_context == (void *) subscribe_object_present_context);
|
||||
subscribe_object_present_context_t *ctx =
|
||||
(subscribe_object_present_context_t *) user_context;
|
||||
CHECK(ctx->data_size == data_size);
|
||||
CHECK(strcmp(subscribe_object_present_str, ctx->teststr) == 0);
|
||||
subscribe_object_present_succeeded = 1;
|
||||
CHECK(manager_count == 1);
|
||||
}
|
||||
@@ -722,6 +732,10 @@ void fatal_fail_callback(unique_id id, void *user_context, void *user_data) {
|
||||
}
|
||||
|
||||
TEST subscribe_object_present_test(void) {
|
||||
int64_t data_size = 0xF1F0;
|
||||
subscribe_object_present_context_t myctx = {subscribe_object_present_str,
|
||||
data_size};
|
||||
|
||||
g_loop = event_loop_create();
|
||||
db_handle *db =
|
||||
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
|
||||
@@ -730,10 +744,11 @@ TEST subscribe_object_present_test(void) {
|
||||
retry_info retry = {
|
||||
.num_retries = 0, .timeout = 100, .fail_callback = fatal_fail_callback,
|
||||
};
|
||||
object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
|
||||
object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry,
|
||||
NULL, NULL);
|
||||
object_table_subscribe_to_notifications(
|
||||
db, subscribe_object_present_object_available_callback,
|
||||
(void *) subscribe_object_present_context, &retry, NULL, (void *) db);
|
||||
db, false, subscribe_object_present_object_available_callback,
|
||||
(void *) &myctx, &retry, NULL, (void *) db);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
@@ -764,6 +779,7 @@ const char *subscribe_object_not_present_context =
|
||||
|
||||
void subscribe_object_not_present_object_available_callback(
|
||||
object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *user_context) {
|
||||
@@ -781,7 +797,7 @@ TEST subscribe_object_not_present_test(void) {
|
||||
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
|
||||
};
|
||||
object_table_subscribe_to_notifications(
|
||||
db, subscribe_object_not_present_object_available_callback,
|
||||
db, false, subscribe_object_not_present_object_available_callback,
|
||||
(void *) subscribe_object_not_present_context, &retry, NULL, (void *) db);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
@@ -813,16 +829,26 @@ int subscribe_object_available_later_succeeded = 0;
|
||||
|
||||
void subscribe_object_available_later_object_available_callback(
|
||||
object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *user_context) {
|
||||
CHECK(user_context == (void *) subscribe_object_available_later_context);
|
||||
subscribe_object_present_context_t *myctx =
|
||||
(subscribe_object_present_context_t *) user_context;
|
||||
CHECK(myctx->data_size == data_size);
|
||||
CHECK(strcmp(myctx->teststr, subscribe_object_available_later_context) == 0);
|
||||
/* Make sure the callback is only called once. */
|
||||
subscribe_object_available_later_succeeded += 1;
|
||||
CHECK(manager_count == 1);
|
||||
}
|
||||
|
||||
TEST subscribe_object_available_later_test(void) {
|
||||
int64_t data_size = 0xF1F0;
|
||||
subscribe_object_present_context_t *myctx =
|
||||
malloc(sizeof(subscribe_object_present_context_t));
|
||||
myctx->teststr = subscribe_object_available_later_context;
|
||||
myctx->data_size = data_size;
|
||||
|
||||
g_loop = event_loop_create();
|
||||
db_handle *db =
|
||||
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
|
||||
@@ -832,9 +858,8 @@ TEST subscribe_object_available_later_test(void) {
|
||||
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
|
||||
};
|
||||
object_table_subscribe_to_notifications(
|
||||
db, subscribe_object_available_later_object_available_callback,
|
||||
(void *) subscribe_object_available_later_context, &retry, NULL,
|
||||
(void *) db);
|
||||
db, false, subscribe_object_available_later_object_available_callback,
|
||||
(void *) myctx, &retry, NULL, (void *) db);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
@@ -852,7 +877,8 @@ TEST subscribe_object_available_later_test(void) {
|
||||
event_loop_run(g_loop);
|
||||
|
||||
ASSERT_EQ(subscribe_object_available_later_succeeded, 0);
|
||||
object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
|
||||
object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry,
|
||||
NULL, NULL);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
@@ -864,6 +890,57 @@ TEST subscribe_object_available_later_test(void) {
|
||||
destroy_outstanding_callbacks(g_loop);
|
||||
event_loop_destroy(g_loop);
|
||||
ASSERT_EQ(subscribe_object_available_later_succeeded, 1);
|
||||
/* Reset the global variable before exiting this unit test. */
|
||||
subscribe_object_available_later_succeeded = 0;
|
||||
free(myctx);
|
||||
PASS();
|
||||
}
|
||||
|
||||
TEST subscribe_object_available_subscribe_all(void) {
|
||||
int64_t data_size = 0xF1F0;
|
||||
subscribe_object_present_context_t myctx = {
|
||||
subscribe_object_available_later_context, data_size};
|
||||
g_loop = event_loop_create();
|
||||
db_handle *db =
|
||||
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
|
||||
db_attach(db, g_loop, false);
|
||||
unique_id id = globally_unique_id();
|
||||
retry_info retry = {
|
||||
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
|
||||
};
|
||||
object_table_subscribe_to_notifications(
|
||||
db, true, subscribe_object_available_later_object_available_callback,
|
||||
(void *) &myctx, &retry, NULL, (void *) db);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
NULL);
|
||||
/* Run the event loop to do the subscribe. */
|
||||
event_loop_run(g_loop);
|
||||
|
||||
/* At this point we don't expect any object notifications received. */
|
||||
ASSERT_EQ(subscribe_object_available_later_succeeded, 0);
|
||||
object_table_add(db, id, data_size, (unsigned char *) NIL_DIGEST, &retry,
|
||||
NULL, NULL);
|
||||
/* Install handler to terminate event loop after 750ms. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
NULL);
|
||||
/* Run the event loop to do the object table add. */
|
||||
event_loop_run(g_loop);
|
||||
/* At this point we assume that object table add completed. */
|
||||
|
||||
db_disconnect(db);
|
||||
destroy_outstanding_callbacks(g_loop);
|
||||
event_loop_destroy(g_loop);
|
||||
/* Assert that the object table add completed and notification callback fired.
|
||||
*/
|
||||
printf("subscribe_all object info test: callback fired: %d times\n",
|
||||
subscribe_object_available_later_succeeded);
|
||||
fflush(stdout);
|
||||
ASSERT_EQ(subscribe_object_available_later_succeeded, 1);
|
||||
/* Reset the global variable before exiting this unit test. */
|
||||
subscribe_object_available_later_succeeded = 0;
|
||||
PASS();
|
||||
}
|
||||
|
||||
@@ -954,6 +1031,7 @@ SUITE(object_table_tests) {
|
||||
RUN_REDIS_TEST(subscribe_object_present_test);
|
||||
RUN_REDIS_TEST(subscribe_object_not_present_test);
|
||||
RUN_REDIS_TEST(subscribe_object_available_later_test);
|
||||
RUN_REDIS_TEST(subscribe_object_available_subscribe_all);
|
||||
// RUN_REDIS_TEST(subscribe_object_info_success_test);
|
||||
}
|
||||
|
||||
|
||||
@@ -963,6 +963,7 @@ void object_present_callback(object_id object_id,
|
||||
/* This callback is used by both fetch and wait. Therefore, it may have to
|
||||
* handle outstanding fetch and wait requests. */
|
||||
void object_table_subscribe_callback(object_id object_id,
|
||||
int64_t data_size,
|
||||
int manager_count,
|
||||
const char *manager_vector[],
|
||||
void *context) {
|
||||
@@ -1378,7 +1379,7 @@ void start_server(const char *store_socket_name,
|
||||
handle_new_client, g_manager_state);
|
||||
/* Set up a client-specific channel to receive notifications from the object
|
||||
* table. */
|
||||
object_table_subscribe_to_notifications(g_manager_state->db,
|
||||
object_table_subscribe_to_notifications(g_manager_state->db, false,
|
||||
object_table_subscribe_callback,
|
||||
g_manager_state, NULL, NULL, NULL);
|
||||
/* Run the event loop. */
|
||||
|
||||
Reference in New Issue
Block a user