mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 22:51:19 +08:00
Object table remove (#139)
* Object table remove redis module * Test case for object table remove redis module * Client code for object_table_remove * Delete object notifications in plasma * Test for object deletion notifications * Fix subscribe deletion test * Address Robert's comments * free hash table entry
This commit is contained in:
committed by
Philipp Moritz
parent
cb3e6cde9e
commit
d729f9b7ea
@@ -15,6 +15,7 @@ typedef struct {
|
||||
int64_t create_time;
|
||||
int64_t construct_duration;
|
||||
unsigned char digest[DIGEST_SIZE];
|
||||
bool is_deletion;
|
||||
} object_info;
|
||||
|
||||
#endif
|
||||
|
||||
@@ -191,7 +191,8 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
||||
OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX, argv[1], REDISMODULE_READ);
|
||||
|
||||
int keytype = RedisModule_KeyType(key);
|
||||
if (keytype == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
if (keytype == REDISMODULE_KEYTYPE_EMPTY ||
|
||||
RedisModule_ValueLength(key) == 0) {
|
||||
return RedisModule_ReplyWithArray(ctx, 0);
|
||||
}
|
||||
|
||||
@@ -392,6 +393,49 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
}
|
||||
|
||||
RedisModule_CloseKey(table_key);
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a manager from a location entry in the object table.
|
||||
*
|
||||
* This is called from a client with the command:
|
||||
*
|
||||
* RAY.OBJECT_TABLE_REMOVE <object id> <manager id>
|
||||
*
|
||||
* @param object_id A string representing the object ID.
|
||||
* @param manager A string which represents the manager ID of the plasma manager
|
||||
* to remove.
|
||||
* @return OK if the operation was successful or an error with string
|
||||
* "object not found" if the entry for the object_id doesn't exist. The
|
||||
* operation is counted as a success if the manager was already not in
|
||||
* the entry.
|
||||
*/
|
||||
int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
if (argc != 3) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
RedisModuleString *object_id = argv[1];
|
||||
RedisModuleString *manager = argv[2];
|
||||
|
||||
/* Remove the location from the object location table. */
|
||||
RedisModuleKey *table_key;
|
||||
table_key = OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX, object_id,
|
||||
REDISMODULE_READ | REDISMODULE_WRITE);
|
||||
int keytype = RedisModule_KeyType(table_key);
|
||||
if (keytype == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(table_key);
|
||||
return RedisModule_ReplyWithError(ctx, "object not found");
|
||||
}
|
||||
|
||||
RedisModule_ZsetRem(table_key, manager, NULL);
|
||||
RedisModule_CloseKey(table_key);
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
@@ -808,6 +852,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "ray.object_table_remove",
|
||||
ObjectTableRemove_RedisCommand, "write", 0, 0,
|
||||
0) == REDISMODULE_ERR) {
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "ray.object_table_request_notifications",
|
||||
ObjectTableRequestNotifications_RedisCommand,
|
||||
"write pubsub", 0, 0, 0) == REDISMODULE_ERR) {
|
||||
|
||||
@@ -101,6 +101,40 @@ class TestGlobalStateStore(unittest.TestCase):
|
||||
with self.assertRaises(redis.ResponseError):
|
||||
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2", "manager_id1")
|
||||
|
||||
def testObjectTableAddAndRemove(self):
  """Exercise RAY.OBJECT_TABLE_REMOVE together with ADD and LOOKUP.

  Covers removal from a missing entry (error), removal of an unknown
  manager (no-op), repeated removal of the same manager, and removal of
  the last manager followed by removal from the now-empty entry.
  """
  object_id = "object_id1"

  def add(manager_id):
    self.redis.execute_command("RAY.OBJECT_TABLE_ADD", object_id, 1, "hash1", manager_id)

  def remove(manager_id):
    self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", object_id, manager_id)

  def locations():
    return set(self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", object_id))

  # Removing from an object ID that was never added must raise an error.
  with self.assertRaises(redis.ResponseError):
    remove("manager_id1")
  # Looking up an object ID that was never added yields no managers.
  self.assertEqual(locations(), set())

  # Register two managers and confirm both are reported.
  add("manager_id1")
  add("manager_id2")
  self.assertEqual(locations(), {b"manager_id1", b"manager_id2"})

  # Removing a manager that is not in the entry leaves the set untouched.
  remove("manager_id3")
  self.assertEqual(locations(), {b"manager_id1", b"manager_id2"})

  # Removing an existing manager takes effect once; repeating it is a no-op.
  remove("manager_id1")
  self.assertEqual(locations(), {b"manager_id2"})
  remove("manager_id1")
  self.assertEqual(locations(), {b"manager_id2"})

  # Removing the last manager leaves an empty set.
  remove("manager_id2")
  self.assertEqual(locations(), set())

  # Removing from the empty set still succeeds and the set stays empty.
  remove("manager_id3")
  self.assertEqual(locations(), set())
|
||||
|
||||
def testObjectTableSubscribeToNotifications(self):
|
||||
data_size = 0xf1f0
|
||||
p = self.redis.pubsub()
|
||||
|
||||
@@ -28,6 +28,23 @@ void object_table_add(db_handle *db_handle,
|
||||
done_callback, redis_object_table_add, user_context);
|
||||
}
|
||||
|
||||
void object_table_remove(db_handle *db_handle,
|
||||
object_id object_id,
|
||||
db_client_id *client_id,
|
||||
retry_info *retry,
|
||||
object_table_done_callback done_callback,
|
||||
void *user_context) {
|
||||
CHECK(db_handle != NULL);
|
||||
/* Copy the client ID, if one was provided. */
|
||||
db_client_id *client_id_copy = NULL;
|
||||
if (client_id != NULL) {
|
||||
client_id_copy = malloc(sizeof(db_client_id));
|
||||
*client_id_copy = *client_id;
|
||||
}
|
||||
init_table_callback(db_handle, object_id, __func__, client_id_copy, retry,
|
||||
done_callback, redis_object_table_remove, user_context);
|
||||
}
|
||||
|
||||
void object_table_subscribe_to_notifications(
|
||||
db_handle *db_handle,
|
||||
bool subscribe_all,
|
||||
|
||||
@@ -86,20 +86,20 @@ typedef struct {
|
||||
*
|
||||
* @param db_handle Handle to db.
|
||||
* @param object_id Object unique identifier.
|
||||
* @param client_id A pointer to the database client ID to remove. If this is
|
||||
* set to NULL, then the client ID associated with db_handle will be
|
||||
* removed.
|
||||
* @param retry Information about retrying the request to the database.
|
||||
* @param done_callback Callback to be called when lookup completes.
|
||||
* @param user_context User context to be passed in the callbacks.
|
||||
* @return Void.
|
||||
*/
|
||||
/*
|
||||
void object_table_remove(db_handle *db,
|
||||
void object_table_remove(db_handle *db_handle,
|
||||
object_id object_id,
|
||||
lookup_callback callback,
|
||||
void *context);
|
||||
db_client_id *client_id,
|
||||
retry_info *retry,
|
||||
object_table_done_callback done_callback,
|
||||
void *user_context);
|
||||
*/
|
||||
|
||||
/*
|
||||
* ==== Subscribe to be announced when new object available ====
|
||||
|
||||
@@ -314,7 +314,7 @@ void redis_object_table_add_callback(redisAsyncContext *c,
|
||||
CHECK(strcmp(reply->str, "OK") == 0);
|
||||
/* Call the done callback if there is one. */
|
||||
if (callback_data->done_callback != NULL) {
|
||||
task_table_done_callback done_callback = callback_data->done_callback;
|
||||
object_table_done_callback done_callback = callback_data->done_callback;
|
||||
done_callback(callback_data->id, callback_data->user_context);
|
||||
}
|
||||
/* Clean up the timer and callback. */
|
||||
@@ -340,6 +340,49 @@ void redis_object_table_add(table_callback_data *callback_data) {
|
||||
}
|
||||
}
|
||||
|
||||
void redis_object_table_remove_callback(redisAsyncContext *c,
                                        void *r,
                                        void *privdata) {
  /* Reply handler for the RAY.OBJECT_TABLE_REMOVE command issued by
   * redis_object_table_remove. The header macro provides the `db` and
   * `callback_data` variables used below. */
  REDIS_CALLBACK_HEADER(db, callback_data, r);

  /* Do some minimal checking. Both replies we expect (a status or an error)
   * carry a string; verify that before handing it to strcmp so that an
   * unexpected reply type fails a CHECK instead of dereferencing NULL. */
  redisReply *reply = r;
  CHECK(reply->str != NULL);
  if (strcmp(reply->str, "object not found") == 0) {
    /* If our object entry was not in the table, it's probably a race
     * condition with an object_table_add. */
    /* NOTE(review): this path returns without invoking the done callback or
     * destroying the timer callback; presumably the pending retry timer is
     * meant to re-issue the request -- confirm. */
    return;
  }
  CHECK(reply->type != REDIS_REPLY_ERROR);
  CHECK(strcmp(reply->str, "OK") == 0);
  /* Call the done callback if there is one. */
  if (callback_data->done_callback != NULL) {
    object_table_done_callback done_callback = callback_data->done_callback;
    done_callback(callback_data->id, callback_data->user_context);
  }
  /* Clean up the timer and callback. */
  destroy_timer_callback(db->loop, callback_data);
}
|
||||
|
||||
void redis_object_table_remove(table_callback_data *callback_data) {
  /* Issue the asynchronous RAY.OBJECT_TABLE_REMOVE command for the object
   * stored in callback_data->id. The reply is delivered to
   * redis_object_table_remove_callback with the timer ID as private data. */
  CHECK(callback_data);
  db_handle *db = callback_data->db_handle;

  object_id obj_id = callback_data->id;
  /* If the caller provided a manager ID to delete, use it. Otherwise, use our
   * own client ID as the ID to delete. */
  db_client_id *client_id = callback_data->data;
  if (client_id == NULL) {
    client_id = &db->client;
  }
  /* Both IDs are sent as length-delimited binary arguments (%b). */
  int status = redisAsyncCommand(
      db->context, redis_object_table_remove_callback,
      (void *) callback_data->timer_id, "RAY.OBJECT_TABLE_REMOVE %b %b",
      obj_id.id, sizeof(obj_id.id), client_id->id, sizeof(client_id->id));

  /* A failure to enqueue the command is only logged here. */
  if ((status == REDIS_ERR) || db->context->err) {
    LOG_REDIS_DEBUG(db->context, "error in redis_object_table_remove");
  }
}
|
||||
|
||||
void redis_object_table_lookup(table_callback_data *callback_data) {
|
||||
CHECK(callback_data);
|
||||
db_handle *db = callback_data->db_handle;
|
||||
|
||||
@@ -81,6 +81,15 @@ void redis_object_table_lookup(table_callback_data *callback_data);
|
||||
*/
|
||||
void redis_object_table_add(table_callback_data *callback_data);
|
||||
|
||||
/**
|
||||
* Remove a location entry from the object table in redis.
|
||||
*
|
||||
* @param callback_data Data structure containing redis connection and timeout
|
||||
* information.
|
||||
* @return Void.
|
||||
*/
|
||||
void redis_object_table_remove(table_callback_data *callback_data);
|
||||
|
||||
/**
|
||||
* Create a client-specific channel for receiving notifications from the object
|
||||
* table.
|
||||
|
||||
@@ -425,6 +425,63 @@ TEST add_lookup_test(void) {
|
||||
PASS();
|
||||
}
|
||||
|
||||
/* === Test add, remove, then lookup === */
|
||||
void add_remove_lookup_done_callback(object_id object_id,
                                     int manager_count,
                                     const char *manager_vector[],
                                     void *context) {
  /* Final step of the add/remove/lookup test: the lookup must be delivered
   * with the expected context and must report no managers, since the only
   * location was removed before the lookup was issued. */
  CHECK(context == (void *) lookup_retry_context);
  CHECK(manager_count == 0);
  /* Record success for the ASSERT in add_remove_lookup_test. */
  lookup_retry_succeeded = 1;
}
|
||||
|
||||
void add_remove_lookup_callback(object_id object_id, void *user_context) {
  /* Runs once the removal has completed: look the object up again so that
   * add_remove_lookup_done_callback can verify the location is gone.
   * user_context carries the database handle. */
  retry_info lookup_retry = {
      .num_retries = 5,
      .timeout = 100,
      .fail_callback = lookup_retry_fail_callback,
  };
  db_handle *handle = user_context;
  object_table_lookup(handle, NIL_ID, &lookup_retry,
                      add_remove_lookup_done_callback,
                      (void *) lookup_retry_context);
}
|
||||
|
||||
void add_remove_callback(object_id object_id, void *user_context) {
  /* Runs once the add has completed: remove our own location again (a NULL
   * client ID is passed) and continue with add_remove_lookup_callback.
   * user_context carries the database handle and is forwarded unchanged. */
  retry_info remove_retry = {
      .num_retries = 5,
      .timeout = 100,
      .fail_callback = lookup_retry_fail_callback,
  };
  db_handle *handle = user_context;
  object_table_remove(handle, NIL_ID, NULL, &remove_retry,
                      add_remove_lookup_callback, (void *) handle);
}
|
||||
|
||||
TEST add_remove_lookup_test(void) {
|
||||
g_loop = event_loop_create();
|
||||
lookup_retry_succeeded = 0;
|
||||
db_handle *db =
|
||||
db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235);
|
||||
db_attach(db, g_loop, true);
|
||||
retry_info retry = {
|
||||
.num_retries = 5,
|
||||
.timeout = 100,
|
||||
.fail_callback = lookup_retry_fail_callback,
|
||||
};
|
||||
object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry,
|
||||
add_remove_callback, (void *) db);
|
||||
/* Install handler for terminating the event loop. */
|
||||
event_loop_add_timer(g_loop, 750,
|
||||
(event_loop_timer_handler) terminate_event_loop_callback,
|
||||
NULL);
|
||||
event_loop_run(g_loop);
|
||||
db_disconnect(db);
|
||||
destroy_outstanding_callbacks(g_loop);
|
||||
event_loop_destroy(g_loop);
|
||||
ASSERT(lookup_retry_succeeded);
|
||||
PASS();
|
||||
}
|
||||
|
||||
/* === Test subscribe retry === */
|
||||
|
||||
const char *subscribe_retry_context = "subscribe_retry";
|
||||
@@ -1023,6 +1080,7 @@ SUITE(object_table_tests) {
|
||||
RUN_REDIS_TEST(lookup_retry_test);
|
||||
RUN_REDIS_TEST(add_retry_test);
|
||||
RUN_REDIS_TEST(add_lookup_test);
|
||||
RUN_REDIS_TEST(add_remove_lookup_test);
|
||||
RUN_REDIS_TEST(subscribe_retry_test);
|
||||
RUN_REDIS_TEST(lookup_late_test);
|
||||
RUN_REDIS_TEST(add_late_test);
|
||||
|
||||
@@ -337,6 +337,16 @@ void handle_object_available(local_scheduler_state *state,
|
||||
}
|
||||
}
|
||||
|
||||
void handle_object_removed(local_scheduler_state *state, object_id object_id) {
  /* Forget an object that is no longer in the local plasma store by dropping
   * it from the scheduling algorithm's table of local objects. Unknown object
   * IDs are ignored. */
  scheduling_algorithm_state *algorithm_state = state->algorithm_state;
  available_object *entry;
  HASH_FIND(handle, algorithm_state->local_objects, &object_id,
            sizeof(object_id), entry);
  if (entry != NULL) {
    /* HASH_DELETE only unlinks the entry from the table; it does not release
     * it. Free it here, as the plasma manager does for its own table of
     * available objects, so that removed objects do not leak. */
    /* NOTE(review): assumes entries of local_objects are heap-allocated by
     * handle_object_available -- confirm. */
    HASH_DELETE(handle, algorithm_state->local_objects, entry);
    free(entry);
  }
}
|
||||
|
||||
int num_tasks_in_queue(scheduling_algorithm_state *algorithm_state) {
|
||||
task_queue_entry *elt;
|
||||
int count;
|
||||
|
||||
@@ -70,6 +70,15 @@ void handle_object_available(local_scheduler_state *state,
|
||||
scheduling_algorithm_state *algorithm_state,
|
||||
object_id object_id);
|
||||
|
||||
/**
|
||||
* This function is called if an object is removed from the local plasma store.
|
||||
*
|
||||
* @param state The state of the local scheduler.
|
||||
* @param object_id ID of the object that was removed.
|
||||
* @return Void.
|
||||
*/
|
||||
void handle_object_removed(local_scheduler_state *state, object_id object_id);
|
||||
|
||||
/**
|
||||
* This function is called when a new worker becomes available
|
||||
*
|
||||
|
||||
@@ -122,7 +122,12 @@ void process_plasma_notification(event_loop *loop,
|
||||
close(client_sock);
|
||||
return;
|
||||
}
|
||||
handle_object_available(state, state->algorithm_state, object_info.obj_id);
|
||||
|
||||
if (object_info.is_deletion) {
|
||||
handle_object_removed(state, object_info.obj_id);
|
||||
} else {
|
||||
handle_object_available(state, state->algorithm_state, object_info.obj_id);
|
||||
}
|
||||
}
|
||||
|
||||
void reconstruct_object_task_lookup_callback(object_id reconstruct_object_id,
|
||||
|
||||
@@ -328,8 +328,10 @@ PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) {
|
||||
if (!PyArg_ParseTuple(args, "i", &plasma_sock)) {
|
||||
return NULL;
|
||||
}
|
||||
/* Receive object notification from the plasma connection socket,
|
||||
* return a tuple of its fields: object_id, data_size, metadata_size. */
|
||||
/* Receive object notification from the plasma connection socket. If the
|
||||
* object was added, return a tuple of its fields: object_id, data_size,
|
||||
* metadata_size. If the object was deleted, data_size and metadata_size will
|
||||
* be set to -1. */
|
||||
int nbytes =
|
||||
read_bytes(plasma_sock, (uint8_t *) &object_info, sizeof(object_info));
|
||||
|
||||
@@ -342,8 +344,13 @@ PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) {
|
||||
PyObject *t = PyTuple_New(3);
|
||||
PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(
|
||||
(char *) object_info.obj_id.id, UNIQUE_ID_SIZE));
|
||||
PyTuple_SetItem(t, 1, PyLong_FromLong(object_info.data_size));
|
||||
PyTuple_SetItem(t, 2, PyLong_FromLong(object_info.metadata_size));
|
||||
if (object_info.is_deletion) {
|
||||
PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
|
||||
PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
|
||||
} else {
|
||||
PyTuple_SetItem(t, 1, PyLong_FromLong(object_info.data_size));
|
||||
PyTuple_SetItem(t, 2, PyLong_FromLong(object_info.metadata_size));
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
+59
-26
@@ -1206,33 +1206,35 @@ void process_status_request(client_connection *client_conn,
|
||||
request_status_done, client_conn);
|
||||
}
|
||||
|
||||
void process_object_notification(event_loop *loop,
|
||||
int client_sock,
|
||||
void *context,
|
||||
int events) {
|
||||
plasma_manager_state *state = context;
|
||||
object_id obj_id;
|
||||
object_info object_info;
|
||||
retry_info retry = {
|
||||
.num_retries = NUM_RETRIES,
|
||||
.timeout = MANAGER_TIMEOUT,
|
||||
.fail_callback = NULL,
|
||||
};
|
||||
/* Read the notification from Plasma. */
|
||||
int error =
|
||||
read_bytes(client_sock, (uint8_t *) &object_info, sizeof(object_info));
|
||||
if (error < 0) {
|
||||
/* The store has closed the socket. */
|
||||
LOG_DEBUG(
|
||||
"The plasma store has closed the object notification socket, or some "
|
||||
"other error has occurred.");
|
||||
event_loop_remove_file(loop, client_sock);
|
||||
close(client_sock);
|
||||
return;
|
||||
void process_delete_object_notification(plasma_manager_state *state,
|
||||
object_info object_info) {
|
||||
object_id obj_id = object_info.obj_id;
|
||||
available_object *entry;
|
||||
HASH_FIND(hh, state->local_available_objects, &obj_id, sizeof(obj_id), entry);
|
||||
if (entry != NULL) {
|
||||
HASH_DELETE(hh, state->local_available_objects, entry);
|
||||
free(entry);
|
||||
}
|
||||
obj_id = object_info.obj_id;
|
||||
/* Add object to locally available object. */
|
||||
/* TODO(pcm): Where is this deallocated? */
|
||||
|
||||
/* Remove this object from the (redis) object table. */
|
||||
if (state->db) {
|
||||
retry_info retry = {
|
||||
.num_retries = NUM_RETRIES,
|
||||
.timeout = MANAGER_TIMEOUT,
|
||||
.fail_callback = NULL,
|
||||
};
|
||||
object_table_remove(state->db, obj_id, NULL, &retry, NULL, NULL);
|
||||
}
|
||||
|
||||
/* NOTE: There could be pending wait requests for this object that will now
|
||||
* return when the object is not actually available. For simplicity, we allow
|
||||
* this scenario rather than try to keep the wait request statuses exactly
|
||||
* up-to-date. */
|
||||
}
|
||||
|
||||
void process_add_object_notification(plasma_manager_state *state,
|
||||
object_info object_info) {
|
||||
object_id obj_id = object_info.obj_id;
|
||||
available_object *entry =
|
||||
(available_object *) malloc(sizeof(available_object));
|
||||
entry->object_id = obj_id;
|
||||
@@ -1243,6 +1245,11 @@ void process_object_notification(event_loop *loop,
|
||||
if (state->db) {
|
||||
/* TODO(swang): Log the error if we fail to add the object, and possibly
|
||||
* retry later? */
|
||||
retry_info retry = {
|
||||
.num_retries = NUM_RETRIES,
|
||||
.timeout = MANAGER_TIMEOUT,
|
||||
.fail_callback = NULL,
|
||||
};
|
||||
object_table_add(state->db, obj_id,
|
||||
object_info.data_size + object_info.metadata_size,
|
||||
object_info.digest, &retry, NULL, NULL);
|
||||
@@ -1263,6 +1270,32 @@ void process_object_notification(event_loop *loop,
|
||||
PLASMA_OBJECT_LOCAL);
|
||||
}
|
||||
|
||||
void process_object_notification(event_loop *loop,
                                 int client_sock,
                                 void *context,
                                 int events) {
  /* Event-loop handler for the store's notification socket: read one
   * object_info record and dispatch it to the add or delete handler depending
   * on its is_deletion flag. context is the plasma manager state. */
  plasma_manager_state *state = context;
  object_info object_info;

  /* Read the notification from Plasma. */
  int status =
      read_bytes(client_sock, (uint8_t *) &object_info, sizeof(object_info));
  if (status < 0) {
    /* The store has closed the socket (or the read failed): stop watching the
     * descriptor and close it. */
    LOG_DEBUG(
        "The plasma store has closed the object notification socket, or some "
        "other error has occurred.");
    event_loop_remove_file(loop, client_sock);
    close(client_sock);
    return;
  }

  /* Dispatch on the kind of notification. */
  if (!object_info.is_deletion) {
    process_add_object_notification(state, object_info);
    return;
  }
  process_delete_object_notification(state, object_info);
}
|
||||
|
||||
void process_message(event_loop *loop,
|
||||
int client_sock,
|
||||
void *context,
|
||||
|
||||
@@ -103,6 +103,8 @@ plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) {
|
||||
return state;
|
||||
}
|
||||
|
||||
void push_notification(plasma_store_state *state, object_id object_id);
|
||||
|
||||
/* If this client is not already using the object, add the client to the
|
||||
* object's list of clients, otherwise do nothing. */
|
||||
void add_client_to_object_clients(object_table_entry *entry,
|
||||
@@ -321,12 +323,7 @@ void seal_object(client *client_context,
|
||||
/* Set the object digest. */
|
||||
memcpy(entry->info.digest, digest, DIGEST_SIZE);
|
||||
/* Inform all subscribers that a new object has been sealed. */
|
||||
notification_queue *queue, *temp_queue;
|
||||
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
|
||||
utarray_push_back(queue->object_ids, &object_id);
|
||||
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
|
||||
0);
|
||||
}
|
||||
push_notification(plasma_state, object_id);
|
||||
|
||||
/* Inform processes getting this object that the object is ready now. */
|
||||
object_notify_entry *notify_entry;
|
||||
@@ -375,6 +372,8 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) {
|
||||
dlfree(pointer);
|
||||
utarray_free(entry->clients);
|
||||
free(entry);
|
||||
/* Inform all subscribers that the object has been deleted. */
|
||||
push_notification(plasma_state, object_id);
|
||||
}
|
||||
|
||||
void remove_objects(plasma_store_state *plasma_state,
|
||||
@@ -390,6 +389,15 @@ void remove_objects(plasma_store_state *plasma_state,
|
||||
}
|
||||
}
|
||||
|
||||
void push_notification(plasma_store_state *plasma_state, object_id object_id) {
  /* Queue object_id on every subscriber's pending-notification queue and
   * immediately try to send that subscriber's queued notifications. */
  notification_queue *subscriber;
  notification_queue *next_subscriber;
  HASH_ITER(hh, plasma_state->pending_notifications, subscriber,
            next_subscriber) {
    utarray_push_back(subscriber->object_ids, &object_id);
    send_notifications(plasma_state->loop, subscriber->subscriber_fd,
                       plasma_state, 0);
  }
}
|
||||
|
||||
/* Send more notifications to a subscriber. */
|
||||
void send_notifications(event_loop *loop,
|
||||
int client_sock,
|
||||
@@ -409,9 +417,16 @@ void send_notifications(event_loop *loop,
|
||||
/* This object should already exist in plasma store state. */
|
||||
HASH_FIND(handle, plasma_state->plasma_store_info->objects, obj_id,
|
||||
sizeof(object_id), entry);
|
||||
CHECK(entry != NULL);
|
||||
|
||||
object_info object_info = entry->info;
|
||||
object_info object_info;
|
||||
if (entry == NULL) {
|
||||
memset(&object_info, 0, sizeof(object_info));
|
||||
object_info.obj_id = *obj_id;
|
||||
object_info.is_deletion = true;
|
||||
} else {
|
||||
object_info = entry->info;
|
||||
object_info.is_deletion = false;
|
||||
}
|
||||
|
||||
/* Attempt to send a notification about this object ID. */
|
||||
int nbytes =
|
||||
|
||||
@@ -351,6 +351,63 @@ class TestPlasmaClient(unittest.TestCase):
|
||||
self.assertEqual(data_sizes[j], recv_dsize)
|
||||
self.assertEqual(metadata_sizes[j], recv_msize)
|
||||
|
||||
def test_subscribe_deletions(self):
  """Check that subscribers are notified when objects are evicted.

  A deletion notification carries the object ID with both the data size
  and the metadata size reported as -1.
  """
  # Subscribe to notifications from the Plasma Store. We use plasma_client2
  # to make sure that all used objects will get evicted properly. The return
  # value is kept in a local so it stays referenced for the whole test.
  sock = self.plasma_client2.subscribe()
  for i in [1, 10, 100, 1000, 10000, 100000]:
    object_ids = [random_object_id() for _ in range(i)]
    # Add 1 to the sizes to make sure we have nonzero object sizes.
    metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
    data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
    for j in range(i):
      x = self.plasma_client2.create(object_ids[j], size=data_sizes[j],
                                     metadata=bytearray(np.random.bytes(metadata_sizes[j])))
      self.plasma_client2.seal(object_ids[j])
    del x
    # Check that we received notifications for creating all of the objects.
    for j in range(i):
      recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification()
      self.assertEqual(object_ids[j], recv_objid)
      self.assertEqual(data_sizes[j], recv_dsize)
      self.assertEqual(metadata_sizes[j], recv_msize)

    # Check that we receive notifications for deleting all objects, as we
    # evict them.
    for j in range(i):
      self.assertEqual(self.plasma_client2.evict(1), data_sizes[j] + metadata_sizes[j])
      recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification()
      self.assertEqual(object_ids[j], recv_objid)
      self.assertEqual(-1, recv_dsize)
      self.assertEqual(-1, recv_msize)

  # Test multiple deletion notifications. The first 9 object IDs have size 0,
  # and the last has a nonzero size. When Plasma evicts 1 byte, it will evict
  # all objects, so we should receive deletion notifications for each.
  num_object_ids = 10
  object_ids = [random_object_id() for _ in range(num_object_ids)]
  metadata_sizes = [0] * (num_object_ids - 1)
  data_sizes = [0] * (num_object_ids - 1)
  # Add 1 so the last object really is nonzero, as the scenario above requires
  # (np.random.randint(1000) on its own can return 0).
  metadata_sizes.append(np.random.randint(1000) + 1)
  data_sizes.append(np.random.randint(1000) + 1)
  for i in range(num_object_ids):
    x = self.plasma_client2.create(object_ids[i], size=data_sizes[i],
                                   metadata=bytearray(np.random.bytes(metadata_sizes[i])))
    self.plasma_client2.seal(object_ids[i])
  del x
  for i in range(num_object_ids):
    recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification()
    self.assertEqual(object_ids[i], recv_objid)
    self.assertEqual(data_sizes[i], recv_dsize)
    self.assertEqual(metadata_sizes[i], recv_msize)
  self.assertEqual(self.plasma_client2.evict(1), data_sizes[-1] + metadata_sizes[-1])
  for i in range(num_object_ids):
    recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification()
    self.assertEqual(object_ids[i], recv_objid)
    self.assertEqual(-1, recv_dsize)
    self.assertEqual(-1, recv_msize)
|
||||
|
||||
|
||||
class TestPlasmaManager(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
Reference in New Issue
Block a user