Implement object table notification subscriptions and switch to using Redis modules for object table. (#134)

* Implement RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS.

* Call object_table_request_notifications from plasma manager.

* Use Redis modules for object table.

* Cleaning up code.

* More checks.

* Formatting.

* Make object table tests pass.

* Formatting.

* Add prefix to the object notification channel name.

* Formatting.

* Fixes.

* Increase time in redismodule test.
This commit is contained in:
Robert Nishihara
2016-12-18 18:19:02 -08:00
committed by Philipp Moritz
parent c89bf4e5bc
commit 269f37e26f
10 changed files with 673 additions and 469 deletions
+1
View File
@@ -1,4 +1,5 @@
BasedOnStyle: Chromium
ColumnLimit: 80
DerivePointerAlignment: false
IndentCaseLabels: false
PointerAlignment: Right
+155 -39
View File
@@ -1,5 +1,6 @@
#include "redismodule.h"
#include <stdbool.h>
#include <string.h>
/**
@@ -22,9 +23,11 @@
#define DB_CLIENT_PREFIX "CL:"
#define OBJECT_INFO_PREFIX "OI:"
#define OBJECT_LOCATION_PREFIX "OL:"
#define OBJECT_SUBSCRIBE_PREFIX "OS:"
#define OBJECT_NOTIFICATION_PREFIX "ON:"
#define TASK_PREFIX "TT:"
#define OBJECT_CHANNEL_PREFIX "OC:"
#define CHECK_ERROR(STATUS, MESSAGE) \
if ((STATUS) == REDISMODULE_ERR) { \
return RedisModule_ReplyWithError(ctx, (MESSAGE)); \
@@ -211,6 +214,56 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
return REDISMODULE_OK;
}
/**
* Publish a notification to a client's object notification channel if at least
* one manager is listed as having the object in the object table.
*
* @param ctx The Redis context.
* @param client_id The ID of the client that is being notified.
* @param object_id The object ID of interest.
* @param key The opened key for the entry in the object table corresponding to
* the object ID of interest.
* @return True if the publish was successful and false otherwise.
*/
bool PublishObjectNotification(RedisModuleCtx *ctx,
RedisModuleString *client_id,
RedisModuleString *object_id,
RedisModuleKey *key) {
/* Create a string formatted as "<object id> MANAGERS <manager id1>
* <manager id2> ..." */
RedisModuleString *manager_list =
RedisModule_CreateStringFromString(ctx, object_id);
RedisModule_StringAppendBuffer(ctx, manager_list, " MANAGERS",
strlen(" MANAGERS"));
CHECK_ERROR(
RedisModule_ZsetFirstInScoreRange(key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
/* Loop over the managers in the object table for this object ID. */
do {
RedisModuleString *curr = RedisModule_ZsetRangeCurrentElement(key, NULL);
RedisModule_StringAppendBuffer(ctx, manager_list, " ", 1);
size_t size;
const char *val = RedisModule_StringPtrLen(curr, &size);
RedisModule_StringAppendBuffer(ctx, manager_list, val, size);
} while (RedisModule_ZsetRangeNext(key));
/* Publish the notification to the clients notification channel.
* TODO(rkn): These notifications could be batched together. */
RedisModuleString *channel_name =
CreatePrefixedString(ctx, OBJECT_CHANNEL_PREFIX, client_id);
RedisModuleCallReply *reply;
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, manager_list);
RedisModule_FreeString(ctx, channel_name);
RedisModule_FreeString(ctx, manager_list);
if (reply == NULL) {
return false;
}
return true;
}
/**
* Add a new entry to the object table or update an existing one.
*
@@ -275,47 +328,110 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
/* Sets are not implemented yet, so we use ZSETs instead. */
RedisModule_ZsetAdd(table_key, 0.0, manager, NULL);
/* Build the PUBLISH topic and message for object table subscribers. The
* topic is a string in the format "OBJECT_LOCATION_PREFIX:<object ID>". The
* message is a string in the format: "<manager ID> <manager ID> ... <manager
* ID>". */
RedisModuleString *publish_topic =
CreatePrefixedString(ctx, OBJECT_LOCATION_PREFIX, object_id);
const char *MANAGERS = "MANAGERS";
RedisModuleString *publish =
RedisModule_CreateString(ctx, MANAGERS, strlen(MANAGERS));
CHECK_ERROR(RedisModule_ZsetFirstInScoreRange(
table_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
do {
RedisModuleString *curr =
RedisModule_ZsetRangeCurrentElement(table_key, NULL);
RedisModule_StringAppendBuffer(ctx, publish, " ", 1);
size_t size;
const char *val = RedisModule_StringPtrLen(curr, &size);
RedisModule_StringAppendBuffer(ctx, publish, val, size);
} while (RedisModule_ZsetRangeNext(table_key));
RedisModuleCallReply *reply =
RedisModule_Call(ctx, "PUBLISH", "ss", publish_topic, publish);
RedisModule_FreeString(ctx, publish);
RedisModule_FreeString(ctx, publish_topic);
RedisModule_CloseKey(table_key);
if (reply == NULL) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
/* Get the zset of clients that requested a notification about the
* availability of this object. */
RedisModuleKey *object_notification_key =
OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id,
REDISMODULE_READ | REDISMODULE_WRITE);
/* If the zset exists, initialize the key to iterate over the zset. */
int object_notification_keytype =
RedisModule_KeyType(object_notification_key);
if (object_notification_keytype != REDISMODULE_KEYTYPE_EMPTY) {
CHECK_ERROR(RedisModule_ZsetFirstInScoreRange(
object_notification_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1),
"Unable to initialize zset iterator");
/* Iterate over the list of clients that requested notifiations about the
* availability of this object, and publish notifications to their object
* notification channels. */
do {
RedisModuleString *client_id =
RedisModule_ZsetRangeCurrentElement(object_notification_key, NULL);
/* TODO(rkn): Some computation could be saved by batching the string
* constructions in the multiple calls to PublishObjectNotification
* together. */
bool success =
PublishObjectNotification(ctx, client_id, object_id, table_key);
if (!success) {
/* The publish failed somehow. */
RedisModule_CloseKey(object_notification_key);
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}
} while (RedisModule_ZsetRangeNext(object_notification_key));
/* Now that the clients have been notified, remove the zset of clients
* waiting for notifications. */
CHECK_ERROR(RedisModule_DeleteKey(object_notification_key),
"Unable to delete zset key.");
RedisModule_CloseKey(object_notification_key);
}
RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
}
int ObjectTableSubscribe_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
/**
* Request notifications about the presence of some object IDs. This command
* takes a list of object IDs. There will be an immediate reply acknowledging
* the call and containing a list of all the object IDs that are already
* present in the object table along with vectors of the plasma managers that
* contain each object. For each object ID that is not already present in the
* object table, there will be a separate subsequent reply that returns the list
* of manager vectors conaining the object ID, and this will be called as soon
* as the object is added to the object table.
*
* This is called from a client with the command:
*
* RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS <client id> <object id1>
* <object id2> ...
*
* @param client_id The ID of the client that is requesting the notifications.
* @param object_id(n) The ID of the nth object ID that is passed to this
* command. This command can take any number of object IDs.
* @return OK if the operation was successful.
*/
int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc < 3) {
return RedisModule_WrongArity(ctx);
}
/* The first argument is the client ID. The other arguments are object IDs. */
RedisModuleString *client_id = argv[1];
/* Loop over the object ID arguments to this command. */
for (int i = 2; i < argc; ++i) {
RedisModuleString *object_id = argv[i];
RedisModuleKey *key = OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX,
object_id, REDISMODULE_READ);
int keytype = RedisModule_KeyType(key);
if (keytype == REDISMODULE_KEYTYPE_EMPTY ||
RedisModule_ValueLength(key) == 0) {
/* This object ID is currently not present, so make a note that this
* client should be notified when this object ID becomes available. */
RedisModuleKey *object_notification_key =
OpenPrefixedKey(ctx, OBJECT_NOTIFICATION_PREFIX, object_id,
REDISMODULE_READ | REDISMODULE_WRITE);
/* Add this client to the list of clients that will be notified when this
* object becomes available. */
CHECK_ERROR(
RedisModule_ZsetAdd(object_notification_key, 0.0, client_id, NULL),
"ZsetAdd failed.");
RedisModule_CloseKey(object_notification_key);
} else {
/* Publish a notification to the client's object notification channel. */
bool success = PublishObjectNotification(ctx, client_id, object_id, key);
if (!success) {
/* The publish failed somehow. */
RedisModule_CloseKey(key);
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}
}
/* Clean up. */
RedisModule_CloseKey(key);
}
RedisModule_ReplyWithSimpleString(ctx, "OK");
return REDISMODULE_OK;
}
@@ -649,9 +765,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "ray.object_table_subscribe",
ObjectTableSubscribe_RedisCommand, "pubsub", 0,
0, 0) == REDISMODULE_ERR) {
if (RedisModule_CreateCommand(ctx, "ray.object_table_request_notifications",
ObjectTableRequestNotifications_RedisCommand,
"write pubsub", 0, 0, 0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
+21 -6
View File
@@ -22,6 +22,7 @@ OBJECT_INFO_PREFIX = "OI:"
OBJECT_LOCATION_PREFIX = "OL:"
OBJECT_SUBSCRIBE_PREFIX = "OS:"
TASK_PREFIX = "TT:"
OBJECT_CHANNEL_PREFIX = "OC:"
class TestGlobalStateStore(unittest.TestCase):
@@ -30,7 +31,7 @@ class TestGlobalStateStore(unittest.TestCase):
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port),
"--loadmodule", module_path])
time.sleep(0.5)
time.sleep(1.5)
self.redis = redis.StrictRedis(host="localhost", port=redis_port, db=0)
def tearDown(self):
@@ -84,15 +85,29 @@ class TestGlobalStateStore(unittest.TestCase):
with self.assertRaises(redis.ResponseError):
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2", "manager_id1")
def testObjectTableSubscribe(self):
def testObjectTableSubscribeToNotifications(self):
p = self.redis.pubsub()
# Subscribe to an object ID.
p.psubscribe("{0}*".format(OBJECT_LOCATION_PREFIX))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1")
p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2")
# Receive the acknowledgement message.
self.assertEqual(p.get_message()["data"], 1)
# Receive the actual data.
self.assertEqual(p.get_message()["data"], b"MANAGERS manager_id1")
# Request a notification and receive the data.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id1")
self.assertEqual(p.get_message()["data"], b"object_id1 MANAGERS manager_id2")
# Request a notification for an object that isn't there. Then add the object
# and receive the data. Only the first call to RAY.OBJECT_TABLE_ADD should
# trigger notifications.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id2", "object_id3")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id2")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "hash1", "manager_id3")
self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1")
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id2", 1, "hash1", "manager_id3")
self.assertEqual(p.get_message()["data"], b"object_id2 MANAGERS manager_id3")
# Request notifications for object_id3 again.
self.redis.execute_command("RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS", "manager_id1", "object_id3")
self.assertEqual(p.get_message()["data"], b"object_id3 MANAGERS manager_id1 manager_id2 manager_id3")
def testResultTableAddAndLookup(self):
response = self.redis.execute_command("RAY.RESULT_TABLE_LOOKUP", "object_id1")
+23 -8
View File
@@ -14,23 +14,22 @@ void object_table_lookup(db_handle *db_handle,
void object_table_add(db_handle *db_handle,
object_id object_id,
int64_t data_size,
int64_t object_size,
unsigned char digest[],
retry_info *retry,
object_table_done_callback done_callback,
void *user_context) {
CHECK(db_handle != NULL);
object_info *info = malloc(sizeof(object_info));
info->data_size = data_size;
object_table_add_data *info = malloc(sizeof(object_table_add_data));
info->object_size = object_size;
memcpy(&info->digest[0], digest, DIGEST_SIZE);
init_table_callback(db_handle, object_id, __func__, info, retry,
done_callback, redis_object_table_add, user_context);
}
void object_table_subscribe(
void object_table_subscribe_to_notifications(
db_handle *db_handle,
object_id object_id,
object_table_object_available_callback object_available_callback,
void *subscribe_context,
retry_info *retry,
@@ -42,9 +41,25 @@ void object_table_subscribe(
sub_data->object_available_callback = object_available_callback;
sub_data->subscribe_context = subscribe_context;
init_table_callback(db_handle, object_id, __func__, sub_data, retry,
done_callback, redis_object_table_subscribe,
user_context);
init_table_callback(
db_handle, NIL_OBJECT_ID, __func__, sub_data, retry, done_callback,
redis_object_table_subscribe_to_notifications, user_context);
}
void object_table_request_notifications(db_handle *db_handle,
int num_object_ids,
object_id object_ids[],
retry_info *retry) {
CHECK(db_handle != NULL);
CHECK(num_object_ids > 0);
object_table_request_notifications_data *data =
malloc(sizeof(object_table_request_notifications_data) +
num_object_ids * sizeof(object_id));
data->num_object_ids = num_object_ids;
memcpy(data->object_ids, object_ids, num_object_ids * sizeof(object_id));
init_table_callback(db_handle, NIL_OBJECT_ID, __func__, data, retry, NULL,
redis_object_table_request_notifications, NULL);
}
void object_info_subscribe(db_handle *db_handle,
+43 -13
View File
@@ -57,12 +57,18 @@ typedef void (*object_table_done_callback)(object_id object_id,
*/
void object_table_add(db_handle *db_handle,
object_id object_id,
int64_t data_size,
int64_t object_size,
unsigned char digest[],
retry_info *retry,
object_table_done_callback done_callback,
void *user_context);
/** Data that is needed to add new objects to the object table. */
typedef struct {
int64_t object_size;
unsigned char digest[DIGEST_SIZE];
} object_table_add_data;
/*
* ==== Remove object call and callback ====
*/
@@ -96,32 +102,56 @@ typedef object_table_lookup_done_callback
object_table_object_available_callback;
/**
* Subcribing to new object available function.
* Set up a client-specific channel for receiving notifications about available
* objects from the object table. The callback will be called once per
* notification received on this channel.
*
* @param db_handle Handle to db.
* @param object_id Object unique identifier.
* @param object_available_callback callback to be called when new object
* @param object_available_callback Callback to be called when new object
* becomes available.
* @param subscribe_context caller context which will be passed back in the
* @param subscribe_context Caller context which will be passed to the
* object_available_callback.
* @param retry Information about retrying the request to the database.
* @param done_callback Callback to be called when subscription is installed.
* @param user_context User context to be passed into the done and fail
* callbacks.
* This is only used for the tests.
* @param user_context User context to be passed into the done callback. This is
* only used for the tests.
* @return Void.
*/
void object_table_subscribe(
db_handle *db,
object_id object_id,
void object_table_subscribe_to_notifications(
db_handle *db_handle,
object_table_object_available_callback object_available_callback,
void *subscribe_context,
retry_info *retry,
object_table_lookup_done_callback done_callback,
void *user_context);
/* Data that is needed to register new object available callbacks with the state
* database. */
/**
* Request notifications about the availability of some objects from the object
* table. The notifications will be published to this client's object
* notification channel, which was set up by the method
* object_table_subscribe_to_notifications.
*
* @param db_handle Handle to db.
* @param object_ids The object IDs to receive notifications about.
* @param retry Information about retrying the request to the database.
* @return Void.
*/
void object_table_request_notifications(db_handle *db,
int num_object_ids,
object_id object_ids[],
retry_info *retry);
/** Data that is needed to run object_request_notifications requests. */
typedef struct {
/** The number of object IDs. */
int num_object_ids;
/** This field is used to store a variable number of object IDs. */
object_id object_ids[0];
} object_table_request_notifications_data;
/** Data that is needed to register new object available callbacks with the
* state database. */
typedef struct {
object_table_object_available_callback object_available_callback;
void *subscribe_context;
+241 -255
View File
@@ -295,138 +295,65 @@ task *parse_redis_task_table_entry(task_id id,
* ==== object_table callbacks ====
*/
enum {
OBJECT_TABLE_ADD_CHECK_HASH_INDEX = 0,
OBJECT_TABLE_ADD_GET_HASH_INDEX,
OBJECT_TABLE_ADD_REGISTER_MANAGER_INDEX,
OBJECT_TABLE_ADD_SET_SIZE_INDEX,
OBJECT_TABLE_ADD_PUBLISH_INDEX,
OBJECT_TABLE_ADD_MAX
};
void redis_object_table_add_callback(redisAsyncContext *c,
void *r,
void *privdata) {
LOG_DEBUG("Calling object table add callback");
REDIS_MULTI_CALLBACK_HEADER(db, callback_data, r, requests_info);
REDIS_CALLBACK_HEADER(db, callback_data, r);
/* Do some minimal checking. */
redisReply *reply = r;
object_id id = callback_data->id;
object_info *info = callback_data->data;
/* Check that we're at a valid command index. */
int request_index = requests_info->request_index;
LOG_DEBUG("Object table add request index is %d", request_index);
CHECK(request_index <= OBJECT_TABLE_ADD_MAX);
/* If we're on a valid command index, execute the current command and
* register a callback that will execute the next command by incrementing the
* request_index. */
int status = REDIS_OK;
++requests_info->request_index;
if (request_index == OBJECT_TABLE_ADD_CHECK_HASH_INDEX) {
/* Atomically set the object hash and get the previous value to compare to
* our hash, if a previous value existed. */
requests_info->is_redis_reply = true;
status = redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) requests_info, "SETNX objhash:%b %b",
id.id, sizeof(object_id), info->digest,
(size_t) DIGEST_SIZE);
} else if (request_index == OBJECT_TABLE_ADD_GET_HASH_INDEX) {
/* If there was an object hash in the table previously, check that it's
* equal to ours. */
CHECKM(reply->type == REDIS_REPLY_INTEGER,
"Expected Redis integer, received type %d %s", reply->type,
reply->str);
CHECKM(reply->integer == 0 || reply->integer == 1,
"Expected 0 or 1 from REDIS, received %lld", reply->integer);
if (reply->integer == 1) {
requests_info->is_redis_reply = false;
redis_object_table_add_callback(c, reply, (void *) requests_info);
} else {
requests_info->is_redis_reply = true;
status = redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) requests_info, "GET objhash:%b",
id.id, sizeof(object_id));
}
} else if (request_index == OBJECT_TABLE_ADD_REGISTER_MANAGER_INDEX) {
if (requests_info->is_redis_reply) {
CHECKM(reply->type == REDIS_REPLY_STRING,
"Expected Redis string, received type %d %s", reply->type,
reply->str);
DCHECK(reply->len == DIGEST_SIZE);
if (memcmp(info->digest, reply->str, reply->len) != 0) {
/* If our object hash doesn't match the one recorded in the table,
* report the error back to the user and exit immediately. */
LOG_FATAL(
"Found objects with different value but same object ID, most "
"likely because a nondeterministic task was executed twice, either "
"for reconstruction or for speculation.");
}
}
/* Add ourselves to the object's locations. */
requests_info->is_redis_reply = true;
status = redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) requests_info, "SADD obj:%b %b", id.id,
sizeof(id.id), (char *) db->client.id,
sizeof(db->client.id));
} else if (request_index == OBJECT_TABLE_ADD_SET_SIZE_INDEX) {
requests_info->is_redis_reply = true;
status = redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) requests_info, "HMSET obj:%b size %d",
(char *) id.id, sizeof(id.id), info->data_size);
} else if (request_index == OBJECT_TABLE_ADD_PUBLISH_INDEX) {
requests_info->is_redis_reply = true;
status = redisAsyncCommand(db->context, redis_object_table_add_callback,
(void *) requests_info, "PUBLISH obj:info %b:%d",
id.id, sizeof(id.id), info->data_size);
} else {
/* We finished executing all the Redis commands for this attempt at the
* table operation. */
free(requests_info);
/* If the transaction failed, exit and let the table operation's timout
* handler handle it. */
if (reply->type == REDIS_REPLY_NIL) {
return;
}
/* Else, call the done callback and clean up the table state. */
if (callback_data->done_callback) {
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
destroy_timer_callback(db->loop, callback_data);
if (strcmp(reply->str, "hash mismatch") == 0) {
/* If our object hash doesn't match the one recorded in the table, report
* the error back to the user and exit immediately. */
LOG_FATAL(
"Found objects with different value but same object ID, most likely "
"because a nondeterministic task was executed twice, either for "
"reconstruction or for speculation.");
}
/* If there was an error executing the current command, this attempt was a
* failure, so clean up the request info. */
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context, "could not add object_table entry");
free(requests_info);
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(strcmp(reply->str, "OK") == 0);
/* Call the done callback if there is one. */
if (callback_data->done_callback != NULL) {
task_table_done_callback done_callback = callback_data->done_callback;
done_callback(callback_data->id, callback_data->user_context);
}
/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
}
void redis_object_table_add(table_callback_data *callback_data) {
CHECK(callback_data);
LOG_DEBUG("Calling object table add");
redis_requests_info *requests_info = malloc(sizeof(redis_requests_info));
requests_info->timer_id = callback_data->timer_id;
requests_info->request_index = OBJECT_TABLE_ADD_CHECK_HASH_INDEX;
requests_info->is_redis_reply = false;
db_handle *db = callback_data->db_handle;
redis_object_table_add_callback(db->context, NULL, (void *) requests_info);
object_table_add_data *info = callback_data->data;
object_id obj_id = callback_data->id;
int64_t object_size = info->object_size;
unsigned char *digest = info->digest;
int status = redisAsyncCommand(
db->context, redis_object_table_add_callback,
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_ADD %b %ld %b %b",
obj_id.id, sizeof(obj_id.id), object_size, digest, (size_t) DIGEST_SIZE,
db->client.id, sizeof(db->client.id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context, "error in redis_object_table_add");
}
}
void redis_object_table_lookup(table_callback_data *callback_data) {
CHECK(callback_data);
db_handle *db = callback_data->db_handle;
/* Call redis asynchronously */
object_id id = callback_data->id;
object_table_get_entry_info *context =
malloc(sizeof(object_table_get_entry_info));
context->timer_id = callback_data->timer_id;
context->object_id = id;
int status = redisAsyncCommand(db->context, redis_object_table_get_entry,
(void *) context, "SMEMBERS obj:%b", id.id,
sizeof(id.id));
object_id obj_id = callback_data->id;
// object_table_get_entry_info *context =
// malloc(sizeof(object_table_get_entry_info));
// context->timer_id = callback_data->timer_id;
// context->object_id = id;
int status = redisAsyncCommand(
db->context, redis_object_table_lookup_callback,
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_LOOKUP %b", obj_id.id,
sizeof(obj_id.id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context, "error in object_table lookup");
}
@@ -561,168 +488,227 @@ void redis_get_cached_db_client(db_handle *db,
*manager = entry->addr;
}
void redis_object_table_get_entry(redisAsyncContext *c,
void *r,
void *privdata) {
/* TODO(swang): This is a hack to pass the callback the original object ID
* argument. Remove once we're ready to integrate the Redis module
* implementation. */
object_table_get_entry_info *context = privdata;
privdata = (void *) context->timer_id;
object_id id = context->object_id;
free(context);
void redis_object_table_lookup_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
redisReply *reply = r;
LOG_DEBUG("Object table get entry callback");
db_client_id *managers = malloc(reply->elements * sizeof(db_client_id));
object_id obj_id = callback_data->id;
LOG_DEBUG("Object table lookup callback");
CHECK(reply->type == REDIS_REPLY_ARRAY);
int64_t manager_count = reply->elements;
if (reply->type == REDIS_REPLY_ARRAY) {
const char **manager_vector = NULL;
if (manager_count > 0) {
manager_vector = malloc(manager_count * sizeof(char *));
}
for (int j = 0; j < reply->elements; ++j) {
CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id));
redis_get_cached_db_client(db, managers[j], manager_vector + j);
}
object_table_lookup_done_callback done_callback =
callback_data->done_callback;
if (done_callback) {
done_callback(id, manager_count, manager_vector,
callback_data->user_context);
}
if (callback_data->data != NULL) {
/* This callback was called from a subscribe call. */
object_table_subscribe_data *sub_data = callback_data->data;
object_table_object_available_callback sub_callback =
sub_data->object_available_callback;
if (manager_count > 0) {
/* TODO(swang): For global scheduler subscriptions, we should be
* calling this even when the manager_count is 0. */
if (sub_callback) {
sub_callback(id, manager_count, manager_vector,
sub_data->subscribe_context);
}
}
/* For the subscribe, don't delete the callback, only the timer. */
event_loop_remove_timer(callback_data->db_handle->loop,
callback_data->timer_id);
} else {
/* This callback was called from a publish call. */
/* For the lookup, remove timer and callback handler. */
destroy_timer_callback(callback_data->db_handle->loop, callback_data);
}
if (manager_count > 0) {
free(manager_vector);
}
} else {
LOG_FATAL("expected integer or string, received type %d", reply->type);
db_client_id *managers = NULL;
const char **manager_vector = NULL;
if (manager_count > 0) {
managers = malloc(reply->elements * sizeof(db_client_id));
manager_vector = malloc(manager_count * sizeof(char *));
}
for (int j = 0; j < reply->elements; ++j) {
CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id));
redis_get_cached_db_client(db, managers[j], manager_vector + j);
}
object_table_lookup_done_callback done_callback =
callback_data->done_callback;
if (done_callback) {
done_callback(obj_id, manager_count, manager_vector,
callback_data->user_context);
}
/* Clean up timer and callback. */
destroy_timer_callback(callback_data->db_handle->loop, callback_data);
if (manager_count > 0) {
free(managers);
free(manager_vector);
}
free(managers);
LOG_DEBUG("Object table get entry finishing");
}
void object_table_redis_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
/**
* This will parse a payload string published on the object notification
* channel. The string must have the format:
*
* <object id> MANAGERS <manager id1> <manager id2> ...
*
* where there may be any positive number of manager IDs.
*
* @param db The db handle.
* @param payload The payload string.
* @param length The length of the string.
* @param manager_count This method will write the number of managers at this
* address.
* @param manager_vector This method will allocate an array of pointers to
* manager addresses and write the address of the array at this address.
* The caller is responsible for freeing this array.
* @return The object ID that the notification is about.
*/
object_id parse_subscribe_to_notifications_payload(
db_handle *db,
char *payload,
int length,
int *manager_count,
const char ***manager_vector) {
int num_managers = (length - sizeof(object_id) - 1 - strlen("MANAGERS")) /
(1 + sizeof(db_client_id));
CHECK(length ==
sizeof(object_id) + 1 + strlen("MANAGERS") +
num_managers * (1 + sizeof(db_client_id)));
CHECK(num_managers > 0);
object_id obj_id;
/* Track our current offset in the payload. */
int offset = 0;
/* Parse the object ID. */
memcpy(&obj_id.id, &payload[offset], sizeof(obj_id.id));
offset += sizeof(obj_id.id);
/* The next part of the payload is the string " MANAGERS". */
char *managers_str = " MANAGERS";
CHECK(memcmp(&payload[offset], managers_str, strlen(managers_str)) == 0);
offset += strlen(managers_str);
/* Parse the managers. */
const char **managers = malloc(num_managers * sizeof(char *));
for (int i = 0; i < num_managers; ++i) {
/* First there is a space. */
CHECK(memcmp(&payload[offset], " ", strlen(" ")) == 0);
offset += strlen(" ");
/* Get the manager ID. */
db_client_id manager_id;
memcpy(&manager_id.id, &payload[offset], sizeof(manager_id.id));
offset += sizeof(manager_id.id);
/* Write the address of the corresponding manager to the returned array. */
redis_get_cached_db_client(db, manager_id, &managers[i]);
}
CHECK(offset == length);
/* Return the manager array and the object ID. */
*manager_count = num_managers;
*manager_vector = managers;
return obj_id;
}
void object_table_redis_subscribe_to_notifications_callback(
redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
/* Replies to the SUBSCRIBE command have 3 elements. There are two
* possibilities. Either the reply is the initial acknowledgment of the
* subscribe command, or it is a message. If it is the initial acknowledgment,
* then
* - reply->element[0]->str is "subscribe"
* - reply->element[1]->str is the name of the channel
* - reply->emement[2]->str is null.
* If it is an actual message, then
* - reply->element[0]->str is "message"
* - reply->element[1]->str is the name of the channel
* - reply->emement[2]->str is the contents of the message.
*/
redisReply *reply = r;
CHECK(reply->type == REDIS_REPLY_ARRAY);
CHECK(reply->elements > 2);
/* Parse the message. First entry is message type, either "subscribe",
* "psubscribe", "message" or "pmessage". If the message type was "pmessage",
* there is an additional next message that is the pattern that the client
* PSUBSCRIBEd to. The next message is the topic published to. The final
* message is always the payload. */
object_id id = NIL_ID;
CHECK(reply->elements == 3);
redisReply *message_type = reply->element[0];
LOG_DEBUG("Object table subscribe callback, message %s", message_type->str);
LOG_DEBUG("Object table subscribe to notifications callback, message %s",
message_type->str);
if (strcmp(message_type->str, "message") == 0) {
/* A SUBSCRIBE notification. */
DCHECK(!IS_NIL_ID(callback_data->id));
DCHECK(reply->elements == 3);
/* Take the object ID from the original table operation call. */
id = callback_data->id;
} else if (strcmp(message_type->str, "pmessage") == 0) {
/* A PSUBSCRIBE notification. */
DCHECK(IS_NIL_ID(callback_data->id));
DCHECK(reply->elements == 4);
/* Parse the object ID from the keyspace. */
redisReply *keyspace = reply->element[2];
size_t prefix_length = strlen("__keyspace@0__:obj:");
DCHECK(keyspace->len == prefix_length + sizeof(object_id));
memcpy(&id, keyspace->str + prefix_length, sizeof(object_id));
/* Handle an object notification. */
int manager_count;
const char **manager_vector;
object_id obj_id = parse_subscribe_to_notifications_payload(
db, reply->element[2]->str, reply->element[2]->len, &manager_count,
&manager_vector);
/* Call the subscribe callback. */
object_table_subscribe_data *data = callback_data->data;
if (data->object_available_callback) {
data->object_available_callback(obj_id, manager_count, manager_vector,
data->subscribe_context);
}
free(manager_vector);
} else if (strcmp(message_type->str, "subscribe") == 0) {
/* The reply for the initial SUBSCRIBE. */
DCHECK(reply->elements == 3);
/* Take the object ID from the original table operation call. */
id = callback_data->id;
} else if (strcmp(message_type->str, "psubscribe") == 0) {
/* The reply for the initial PSUBSCRIBE. */
DCHECK(reply->elements == 3);
/* If the initial PSUBSCRIBE was successful, call the done callback with a
* NIL object ID to notify the client, and clean up the timer. */
object_table_lookup_done_callback done_callback =
callback_data->done_callback;
if (done_callback) {
/* The reply for the initial SUBSCRIBE command. */
/* Call the done callback if there is one. This code path should only be
* used in the tests. */
if (callback_data->done_callback != NULL) {
object_table_lookup_done_callback done_callback =
callback_data->done_callback;
done_callback(NIL_ID, 0, NULL, callback_data->user_context);
}
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
* destroy the callback data. */
event_loop_remove_timer(callback_data->db_handle->loop,
callback_data->timer_id);
callback_data->done_callback = NULL;
/* For PSUBSCRIBEs, always return before doing the lookup for the data,
* since we don't know what key to lookup yet. */
return;
} else {
LOG_FATAL("Unexpected reply type from object table subscribe");
}
/* Do a lookup for the actual data. */
CHECK(!IS_NIL_ID(id));
object_table_get_entry_info *context =
malloc(sizeof(object_table_get_entry_info));
context->timer_id = callback_data->timer_id;
context->object_id = id;
int status =
redisAsyncCommand(db->context, redis_object_table_get_entry,
(void *) context, "SMEMBERS obj:%b", id.id, sizeof(id));
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_ERROR(db->context,
"error in redis_object_table_subscribe_callback");
LOG_FATAL(
"Unexpected reply type from object table subscribe to notifications.");
}
}
void redis_object_table_subscribe(table_callback_data *callback_data) {
void redis_object_table_subscribe_to_notifications(
table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
/* subscribe to key notification associated to object id */
object_id id = callback_data->id;
int status = REDIS_OK;
if (IS_NIL_ID(id)) {
/* Subscribe to all object events. */
status = redisAsyncCommand(
db->sub_context, object_table_redis_subscribe_callback,
(void *) callback_data->timer_id, "PSUBSCRIBE __keyspace@0__:obj:*");
} else {
/* Subscribe to the specified object id. */
status = redisAsyncCommand(
db->sub_context, object_table_redis_subscribe_callback,
(void *) callback_data->timer_id, "SUBSCRIBE __keyspace@0__:obj:%b",
id.id, sizeof(id.id));
}
/* The object channel prefix must match the value defined in
* src/common/redismodule/ray_redis_module.c. */
const char *object_channel_prefix = "OC:";
/* Subscribe to notifications from the object table. This uses the client ID
* as the channel name so this channel is specific to this client. TODO(rkn):
* The channel name should probably be the client ID with some prefix. */
int status = redisAsyncCommand(
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
(void *) callback_data->timer_id, "SUBSCRIBE %s%b", object_channel_prefix,
db->client.id, sizeof(db->client.id));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context,
"error in redis_object_table_subscribe_callback");
"error in redis_object_table_subscribe_to_notifications");
}
}
void redis_object_table_request_notifications_callback(redisAsyncContext *c,
void *r,
void *privdata) {
REDIS_CALLBACK_HEADER(db, callback_data, r);
/* Do some minimal checking. */
redisReply *reply = r;
CHECK(strcmp(reply->str, "OK") == 0);
CHECK(callback_data->done_callback == NULL);
/* Clean up the timer and callback. */
destroy_timer_callback(db->loop, callback_data);
}
void redis_object_table_request_notifications(
table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
object_table_request_notifications_data *request_data = callback_data->data;
int num_object_ids = request_data->num_object_ids;
object_id *object_ids = request_data->object_ids;
/* Create the arguments for the Redis command. */
int num_args = 1 + 1 + num_object_ids;
const char **argv = malloc(sizeof(char *) * num_args);
size_t *argvlen = malloc(sizeof(size_t) * num_args);
/* Set the command name argument. */
argv[0] = "RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS";
argvlen[0] = strlen(argv[0]);
/* Set the client ID argument. */
argv[1] = (char *) db->client.id;
argvlen[1] = sizeof(db->client.id);
/* Set the object ID arguments. */
for (int i = 0; i < num_object_ids; ++i) {
argv[2 + i] = (char *) object_ids[i].id;
argvlen[2 + i] = sizeof(object_ids[i].id);
}
int status = redisAsyncCommandArgv(
db->context, redis_object_table_request_notifications_callback,
(void *) callback_data->timer_id, num_args, argv, argvlen);
free(argv);
free(argvlen);
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context,
"error in redis_object_table_subscribe_to_notifications");
}
}
@@ -813,7 +799,7 @@ void redis_task_table_publish(table_callback_data *callback_data,
sizeof(id.id), state, (char *) node.id, sizeof(node.id),
(char *) spec, task_spec_size(spec));
}
if ((status = REDIS_ERR) || db->context->err) {
if ((status == REDIS_ERR) || db->context->err) {
LOG_REDIS_DEBUG(db->context, "error setting task in task_table_add_task");
}
}
+27 -3
View File
@@ -82,13 +82,25 @@ void redis_object_table_lookup(table_callback_data *callback_data);
void redis_object_table_add(table_callback_data *callback_data);
/**
* Subscribe to learn when a new object becomes available.
* Create a client-specific channel for receiving notifications from the object
* table.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_object_table_subscribe(table_callback_data *callback_data);
void redis_object_table_subscribe_to_notifications(
table_callback_data *callback_data);
/**
* Request notifications about when certain objects become available.
*
* @param callback_data Data structure containing redis connection and timeout
* information.
* @return Void.
*/
void redis_object_table_request_notifications(
table_callback_data *callback_data);
/**
* Add a new object to the object table in redis.
@@ -109,6 +121,19 @@ void redis_result_table_add(table_callback_data *callback_data);
*/
void redis_result_table_lookup(table_callback_data *callback_data);
/**
* Callback invoked when the reply from the object table lookup command is
* received.
*
* @param c Redis context.
* @param r Reply.
* @param privdata Data associated to the callback.
* @return Void.
*/
void redis_object_table_lookup_callback(redisAsyncContext *c,
void *r,
void *privdata);
/*
* ==== Redis task table function =====
*/
@@ -150,7 +175,6 @@ void redis_task_table_update(table_callback_data *callback_data);
* @param privdata Data associated to the callback.
* @return Void.
*/
void redis_task_table_publish_push_callback(redisAsyncContext *c,
void *r,
void *privdata);
+15
View File
@@ -3,6 +3,17 @@
#include <inttypes.h>
#include "redis.h"
void default_table_failure_callback(object_id id,
void *user_context,
void *user_data) {
CHECKM(0, "default_table_failure_callback was called.");
}
static const retry_info default_retry = {
.num_retries = 0,
.timeout = 1000,
.fail_callback = default_table_failure_callback};
table_callback_data *init_table_callback(db_handle *db_handle,
unique_id id,
const char *label,
@@ -13,6 +24,10 @@ table_callback_data *init_table_callback(db_handle *db_handle,
void *user_context) {
CHECK(db_handle);
CHECK(db_handle->loop);
/* If no retry info is provided, use the default retry info. */
if (retry == NULL) {
retry = (retry_info *) &default_retry;
}
CHECK(retry);
/* Allocate and initialize callback data structure for object table */
table_callback_data *callback_data = malloc(sizeof(table_callback_data));
+74 -113
View File
@@ -216,7 +216,6 @@ TEST add_timeout_test(void) {
/* === Test subscribe timeout === */
const char *subscribe_timeout_context = "subscribe_timeout";
int subscribe_failed = 0;
void subscribe_done_callback(object_id object_id,
@@ -231,7 +230,6 @@ void subscribe_fail_callback(unique_id id,
void *user_context,
void *user_data) {
subscribe_failed = 1;
CHECK(user_context == (void *) subscribe_timeout_context);
event_loop_stop(g_loop);
}
@@ -245,9 +243,8 @@ TEST subscribe_timeout_test(void) {
.timeout = 100,
.fail_callback = subscribe_fail_callback,
};
object_table_subscribe(db, NIL_ID, NULL, NULL, &retry,
subscribe_done_callback,
(void *) subscribe_timeout_context);
object_table_subscribe_to_notifications(db, subscribe_done_callback, NULL,
&retry, NULL, NULL);
/* Disconnect the database to see if the lookup times out. */
close(db->sub_context->c.fd);
event_loop_run(g_loop);
@@ -475,9 +472,9 @@ TEST subscribe_retry_test(void) {
.timeout = 100,
.fail_callback = subscribe_retry_fail_callback,
};
object_table_subscribe(db, NIL_ID, NULL, NULL, &retry,
subscribe_retry_done_callback,
(void *) subscribe_retry_context);
object_table_subscribe_to_notifications(db, NULL, NULL, &retry,
subscribe_retry_done_callback,
(void *) subscribe_retry_context);
/* Disconnect the database to let the subscribe times out the first time. */
close(db->sub_context->c.fd);
/* Install handler for reconnecting the database. */
@@ -615,9 +612,9 @@ TEST subscribe_late_test(void) {
.timeout = 0,
.fail_callback = subscribe_late_fail_callback,
};
object_table_subscribe(db, NIL_ID, NULL, NULL, &retry,
subscribe_late_done_callback,
(void *) subscribe_late_context);
object_table_subscribe_to_notifications(db, NULL, NULL, &retry,
subscribe_late_done_callback,
(void *) subscribe_late_context);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
@@ -651,11 +648,10 @@ void subscribe_success_done_callback(object_id object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
CHECK(object_ids_equal(object_id, subscribe_id));
retry_info retry = {
.num_retries = 0, .timeout = 750, .fail_callback = NULL,
};
object_table_add((db_handle *) user_context, object_id, 0,
object_table_add((db_handle *) user_context, subscribe_id, 0,
(unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
subscribe_success_done = 1;
}
@@ -682,10 +678,13 @@ TEST subscribe_success_test(void) {
.timeout = 100,
.fail_callback = subscribe_success_fail_callback,
};
object_table_subscribe(db, subscribe_id,
subscribe_success_object_available_callback,
(void *) subscribe_success_context, &retry,
subscribe_success_done_callback, (void *) db);
object_table_subscribe_to_notifications(
db, subscribe_success_object_available_callback,
(void *) subscribe_success_context, &retry,
subscribe_success_done_callback, (void *) db);
object_id object_ids[1] = {subscribe_id};
object_table_request_notifications(db, 1, object_ids, &retry);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
@@ -702,68 +701,6 @@ TEST subscribe_success_test(void) {
PASS();
}
/* === Test psubscribe object available succeed === */
const char *psubscribe_success_context = "psubscribe_success";
int psubscribe_success_done = 0;
int psubscribe_success_succeeded = 0;
object_id psubscribe_id;
void psubscribe_success_done_callback(object_id callback_object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
CHECK(IS_NIL_ID(callback_object_id));
retry_info retry = {
.num_retries = 0, .timeout = 750, .fail_callback = NULL,
};
object_table_add((db_handle *) user_context, psubscribe_id, 0,
(unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
psubscribe_success_done = 1;
}
void psubscribe_success_object_available_callback(object_id object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
CHECK(user_context == (void *) psubscribe_success_context);
CHECK(object_ids_equal(object_id, psubscribe_id));
CHECK(manager_count == 1);
psubscribe_success_succeeded = 1;
}
TEST psubscribe_success_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop, false);
psubscribe_id = globally_unique_id();
retry_info retry = {
.num_retries = 0,
.timeout = 100,
.fail_callback = subscribe_success_fail_callback,
};
object_table_subscribe(db, NIL_ID,
psubscribe_success_object_available_callback,
(void *) psubscribe_success_context, &retry,
psubscribe_success_done_callback, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(psubscribe_success_done);
ASSERT(psubscribe_success_succeeded);
PASS();
}
/* Test if subscribe succeeds if the object is already present. */
const char *subscribe_object_present_context = "subscribe_object_present";
@@ -779,6 +716,11 @@ void subscribe_object_present_object_available_callback(
CHECK(manager_count == 1);
}
void fatal_fail_callback(unique_id id, void *user_context, void *user_data) {
/* This function should never be called. */
CHECK(0);
}
TEST subscribe_object_present_test(void) {
g_loop = event_loop_create();
db_handle *db =
@@ -786,19 +728,28 @@ TEST subscribe_object_present_test(void) {
db_attach(db, g_loop, false);
unique_id id = globally_unique_id();
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
.num_retries = 0, .timeout = 100, .fail_callback = fatal_fail_callback,
};
object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
object_table_subscribe(
db, id, subscribe_object_present_object_available_callback,
object_table_subscribe_to_notifications(
db, subscribe_object_present_object_available_callback,
(void *) subscribe_object_present_context, &retry, NULL, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to create do the add and subscribe. */
event_loop_run(g_loop);
object_id object_ids[1] = {id};
object_table_request_notifications(db, 1, object_ids, &retry);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the request notifications. */
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
@@ -810,15 +761,14 @@ TEST subscribe_object_present_test(void) {
const char *subscribe_object_not_present_context =
"subscribe_object_not_present";
int subscribe_object_not_present_succeeded = 0;
void subscribe_object_not_present_object_available_callback(
object_id object_id,
int manager_count,
const char *manager_vector[],
void *user_context) {
CHECK(user_context == (void *) subscribe_object_not_present_context);
subscribe_object_not_present_succeeded = 1;
/* This should not be called. */
CHECK(0);
}
TEST subscribe_object_not_present_test(void) {
@@ -830,20 +780,28 @@ TEST subscribe_object_not_present_test(void) {
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_subscribe(
db, id, subscribe_object_not_present_object_available_callback,
object_table_subscribe_to_notifications(
db, subscribe_object_not_present_object_available_callback,
(void *) subscribe_object_not_present_context, &retry, NULL, (void *) db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the subscribe. */
event_loop_run(g_loop);
object_id object_ids[1] = {id};
object_table_request_notifications(db, 1, object_ids, &retry);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the request notifications. */
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
ASSERT(subscribe_object_not_present_succeeded == 0);
PASS();
}
@@ -864,40 +822,44 @@ void subscribe_object_available_later_object_available_callback(
CHECK(manager_count == 1);
}
int64_t add_object_callback(event_loop *loop, int64_t timer_id, void *context) {
db_handle *db = (db_handle *) context;
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_id id = globally_unique_id();
object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
/* Reset the timer to this large value, so it doesn't trigger again. */
return 10000;
}
TEST subscribe_object_available_later_test(void) {
g_loop = event_loop_create();
db_handle *db =
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236);
db_attach(db, g_loop, false);
unique_id id = NIL_ID;
unique_id id = globally_unique_id();
retry_info retry = {
.num_retries = 0, .timeout = 100, .fail_callback = NULL,
};
object_table_subscribe(
db, id, subscribe_object_available_later_object_available_callback,
object_table_subscribe_to_notifications(
db, subscribe_object_available_later_object_available_callback,
(void *) subscribe_object_available_later_context, &retry, NULL,
(void *) db);
event_loop_add_timer(g_loop, 300,
(event_loop_timer_handler) add_object_callback, db);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the subscribe. */
event_loop_run(g_loop);
object_id object_ids[1] = {id};
object_table_request_notifications(db, 1, object_ids, &retry);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the request notifications. */
event_loop_run(g_loop);
ASSERT_EQ(subscribe_object_available_later_succeeded, 0);
object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL);
/* Install handler for terminating the event loop. */
event_loop_add_timer(g_loop, 750,
(event_loop_timer_handler) terminate_event_loop_callback,
NULL);
/* Run the event loop to do the object table add. */
event_loop_run(g_loop);
db_disconnect(db);
destroy_outstanding_callbacks(g_loop);
event_loop_destroy(g_loop);
@@ -989,11 +951,10 @@ SUITE(object_table_tests) {
RUN_REDIS_TEST(add_late_test);
RUN_REDIS_TEST(subscribe_late_test);
RUN_REDIS_TEST(subscribe_success_test);
RUN_REDIS_TEST(psubscribe_success_test);
RUN_REDIS_TEST(subscribe_object_present_test);
RUN_REDIS_TEST(subscribe_object_not_present_test);
RUN_REDIS_TEST(subscribe_object_available_later_test);
RUN_REDIS_TEST(subscribe_object_info_success_test);
// RUN_REDIS_TEST(subscribe_object_info_success_test);
}
GREATEST_MAIN_DEFS();
+73 -32
View File
@@ -718,6 +718,9 @@ void process_transfer_request(event_loop *loop,
* forever if we don't end up sealing this object. */
/* The corresponding call to plasma_release will happen in
* write_object_chunk. */
int has_obj;
plasma_contains(conn->manager_state->plasma_conn, object_id, &has_obj);
DCHECK(has_obj);
plasma_get(conn->manager_state->plasma_conn, object_id, &data_size, &data,
&metadata_size, &metadata);
assert(metadata == data + data_size);
@@ -822,6 +825,17 @@ void request_transfer_from(plasma_manager_state *manager_state,
client_connection *manager_conn =
get_manager_connection(manager_state, addr, port);
/* Check that this manager isn't trying to request an object from itself.
* TODO(rkn): Later this should not be fatal. */
uint8_t temp_addr[4];
sscanf(addr, "%hhu.%hhu.%hhu.%hhu", &temp_addr[0], &temp_addr[1],
&temp_addr[2], &temp_addr[3]);
if (memcmp(temp_addr, manager_state->addr, 4) == 0 &&
port == manager_state->port) {
LOG_FATAL("This manager is attempting to request a transfer from itself.");
}
plasma_request_buffer *transfer_request =
malloc(sizeof(plasma_request_buffer));
transfer_request->type = PLASMA_TRANSFER;
@@ -968,6 +982,12 @@ void process_fetch_requests(client_connection *client_conn,
int num_object_ids,
object_request object_requests[]) {
plasma_manager_state *manager_state = client_conn->manager_state;
int num_object_ids_to_request = 0;
/* This is allocating more space than necessary, but we do not know the exact
* number of object IDs to request notifications for yet. */
object_id *object_ids_to_request = malloc(num_object_ids * sizeof(object_id));
for (int i = 0; i < num_object_ids; ++i) {
object_id obj_id = object_requests[i].object_id;
@@ -989,21 +1009,26 @@ void process_fetch_requests(client_connection *client_conn,
entry = create_fetch_request(manager_state, obj_id);
HASH_ADD(hh, manager_state->fetch_requests, object_id,
sizeof(entry->object_id), entry);
/* Get a list of Plasma Managers that have this object from the object
* table. If the list of Plasma Managers is non-empty, the callback should
* initiate a transfer. */
/* TODO(rkn): Make sure this also handles the case where the list is
* initially empty. */
/* Add this object ID to the list of object IDs to request notifications for
* from the object table. */
object_ids_to_request[num_object_ids_to_request] = obj_id;
num_object_ids_to_request += 1;
}
if (num_object_ids_to_request > 0) {
/* Request notifications from the object table when these object IDs become
* available. The notifications will call the callback that was passed to
* object_table_subscribe_to_notifications, which will initiate a transfer
* of the object to this plasma manager. */
retry_info retry;
memset(&retry, 0, sizeof(retry));
retry.num_retries = 0;
retry.timeout = MANAGER_TIMEOUT;
retry.fail_callback = fatal_table_callback;
object_table_subscribe(manager_state->db, obj_id,
object_table_subscribe_callback, manager_state,
&retry, NULL, NULL);
object_table_request_notifications(manager_state->db,
num_object_ids_to_request,
object_ids_to_request, &retry);
}
free(object_ids_to_request);
}
int wait_timeout_handler(event_loop *loop, timer_id id, void *context) {
@@ -1036,6 +1061,12 @@ void process_wait_request(client_connection *client_conn,
wait_req->num_objects_to_wait_for = num_ready_objects;
wait_req->num_satisfied = 0;
int num_object_ids_to_request = 0;
/* This is allocating more space than necessary, but we do not know the exact
* number of object IDs to request notifications for yet. */
object_id *object_ids_to_request =
malloc(num_object_requests * sizeof(object_id));
for (int i = 0; i < num_object_requests; ++i) {
object_id obj_id = object_requests[i].object_id;
@@ -1055,23 +1086,10 @@ void process_wait_request(client_connection *client_conn,
/* TODO(rkn): If desired, we could issue a fetch command here to retrieve
* the object. */
} else if (wait_req->object_requests[i].type == PLASMA_QUERY_ANYWHERE) {
/* Subscribe to a notification for when the object is available somewhere
* in the system. */
retry_info retry;
memset(&retry, 0, sizeof(retry));
retry.num_retries = 0;
/* TODO(rkn): This timeout is excessive. However, the number of calls to
* object_table_subscribe here is also excessive. The issue may be the
* number of timers added to the manager event loop. Under heavy usage,
* this will trigger the fatal failure callback. The solution is probably
* to use Redis modules to write a special purpose command so that we only
* need to do a single call to Redis here (and hence create only a single
* timer). */
retry.timeout = 100000;
retry.fail_callback = fatal_table_callback;
object_table_subscribe(manager_state->db, obj_id,
object_table_subscribe_callback, manager_state,
&retry, NULL, NULL);
/* Add this object ID to the list of object IDs to request notifications
* for from the object table. */
object_ids_to_request[num_object_ids_to_request] = obj_id;
num_object_ids_to_request += 1;
} else {
/* This code should be unreachable. */
CHECK(0);
@@ -1082,12 +1100,27 @@ void process_wait_request(client_connection *client_conn,
* client. */
if (wait_req->num_satisfied >= wait_req->num_objects_to_wait_for) {
return_from_wait(manager_state, wait_req);
return;
}
} else {
if (num_object_ids_to_request > 0) {
/* Request notifications from the object table when these object IDs
* become available. The notifications will call the callback that was
* passed to object_table_subscribe_to_notifications, which will update
* the wait request. */
retry_info retry;
memset(&retry, 0, sizeof(retry));
retry.num_retries = 0;
retry.timeout = MANAGER_TIMEOUT;
retry.fail_callback = fatal_table_callback;
object_table_request_notifications(manager_state->db,
num_object_ids_to_request,
object_ids_to_request, &retry);
}
/* Set a timer that will cause the wait request to return to the client. */
wait_req->timer = event_loop_add_timer(manager_state->loop, timeout_ms,
wait_timeout_handler, wait_req);
/* Set a timer that will cause the wait request to return to the client. */
wait_req->timer = event_loop_add_timer(manager_state->loop, timeout_ms,
wait_timeout_handler, wait_req);
}
free(object_ids_to_request);
}
/**
@@ -1209,7 +1242,8 @@ void process_object_notification(event_loop *loop,
if (state->db) {
/* TODO(swang): Log the error if we fail to add the object, and possibly
* retry later? */
object_table_add(state->db, obj_id, object_info.data_size,
object_table_add(state->db, obj_id,
object_info.data_size + object_info.metadata_size,
object_info.digest, &retry, NULL, NULL);
}
@@ -1240,6 +1274,7 @@ void process_message(event_loop *loop,
switch (type) {
case PLASMA_TRANSFER:
LOG_DEBUG("Processing plasma transfer request.");
DCHECK(req->num_object_ids == 1);
process_transfer_request(loop, req->object_requests[0].object_id, req->addr,
req->port, conn);
@@ -1341,6 +1376,12 @@ void start_server(const char *store_socket_name,
handle_new_client, g_manager_state);
event_loop_add_file(g_manager_state->loop, remote_sock, EVENT_LOOP_READ,
handle_new_client, g_manager_state);
/* Set up a client-specific channel to receive notifications from the object
* table. */
object_table_subscribe_to_notifications(g_manager_state->db,
object_table_subscribe_callback,
g_manager_state, NULL, NULL, NULL);
/* Run the event loop. */
event_loop_run(g_manager_state->loop);
}