From d729f9b7eaec4c4ceb76e271f96dcfa060685e49 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 19 Dec 2016 23:18:57 -0800 Subject: [PATCH] Object table remove (#139) * Object table remove redis module * Test case for object table remove redis module * Client code for object_table_remove * Delete object notifications in plasma * Test for object deletion notifications * Fix subscribe deletion test * Address Robert's comments * free hash table entry --- src/common/object_info.h | 1 + src/common/redis_module/ray_redis_module.c | 52 ++++++++++++- src/common/redis_module/runtest.py | 34 +++++++++ src/common/state/object_table.c | 17 +++++ src/common/state/object_table.h | 10 +-- src/common/state/redis.c | 45 +++++++++++- src/common/state/redis.h | 9 +++ src/common/test/object_table_tests.c | 58 +++++++++++++++ src/photon/photon_algorithm.c | 10 +++ src/photon/photon_algorithm.h | 9 +++ src/photon/photon_scheduler.c | 7 +- src/plasma/plasma_extension.c | 15 +++- src/plasma/plasma_manager.c | 85 +++++++++++++++------- src/plasma/plasma_store.c | 31 ++++++-- src/plasma/test/test.py | 57 +++++++++++++++ 15 files changed, 394 insertions(+), 46 deletions(-) diff --git a/src/common/object_info.h b/src/common/object_info.h index dcbf67bc3..cf6173bfb 100644 --- a/src/common/object_info.h +++ b/src/common/object_info.h @@ -15,6 +15,7 @@ typedef struct { int64_t create_time; int64_t construct_duration; unsigned char digest[DIGEST_SIZE]; + bool is_deletion; } object_info; #endif diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index d2aa978ed..3218643c5 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -191,7 +191,8 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx, OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX, argv[1], REDISMODULE_READ); int keytype = RedisModule_KeyType(key); - if (keytype == REDISMODULE_KEYTYPE_EMPTY) { + if (keytype == REDISMODULE_KEYTYPE_EMPTY || + RedisModule_ValueLength(key) == 0) { return RedisModule_ReplyWithArray(ctx, 0); } @@ -392,6 +393,49 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModule_CloseKey(object_notification_key); } + RedisModule_CloseKey(table_key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/** + * Remove a manager from a location entry in the object table. + * + * This is called from a client with the command: + * + * RAY.OBJECT_TABLE_REMOVE + * + * @param object_id A string representing the object ID. + * @param manager A string which represents the manager ID of the plasma manager + * to remove. + * @return OK if the operation was successful or an error with string + * "object not found" if the entry for the object_id doesn't exist. The + * operation is counted as a success if the manager was already not in + * the entry. + */ +int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx, + RedisModuleString **argv, + int argc) { + if (argc != 3) { + return RedisModule_WrongArity(ctx); + } + + RedisModuleString *object_id = argv[1]; + RedisModuleString *manager = argv[2]; + + /* Remove the location from the object location table. */ + RedisModuleKey *table_key; + table_key = OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX, object_id, + REDISMODULE_READ | REDISMODULE_WRITE); + int keytype = RedisModule_KeyType(table_key); + if (keytype == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_CloseKey(table_key); + return RedisModule_ReplyWithError(ctx, "object not found"); + } + + RedisModule_ZsetRem(table_key, manager, NULL); + RedisModule_CloseKey(table_key); + RedisModule_ReplyWithSimpleString(ctx, "OK"); return REDISMODULE_OK; } @@ -808,6 +852,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "ray.object_table_remove", + ObjectTableRemove_RedisCommand, "write", 0, 0, + 0) == REDISMODULE_ERR) { + return REDISMODULE_ERR; + } + if (RedisModule_CreateCommand(ctx, "ray.object_table_request_notifications", ObjectTableRequestNotifications_RedisCommand, "write pubsub", 0, 0, 0) == REDISMODULE_ERR) { diff --git a/src/common/redis_module/runtest.py b/src/common/redis_module/runtest.py index a6c075e41..3d9544116 100644 --- a/src/common/redis_module/runtest.py +++ b/src/common/redis_module/runtest.py @@ -101,6 +101,40 @@ class TestGlobalStateStore(unittest.TestCase): with self.assertRaises(redis.ResponseError): self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id3", 1, "\x00hash2", "manager_id1") + def testObjectTableAddAndRemove(self): + # Try removing a manager from an object ID that has not been added yet. + with self.assertRaises(redis.ResponseError): + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id1") + # Try calling RAY.OBJECT_TABLE_LOOKUP with an object ID that has not been + # added yet. + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), set([])) + # Add some managers and try again. + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id1") + self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), {b"manager_id1", b"manager_id2"}) + # Remove a manager that doesn't exist, and make sure we still have the same set. + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id3") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), {b"manager_id1", b"manager_id2"}) + # Remove a manager that does exist. Make sure it gets removed the first + # time and does nothing the second time. + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id1") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), {b"manager_id2"}) + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id1") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), {b"manager_id2"}) + # Remove the last manager, and make sure we have an empty set. + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id2") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), set()) + # Remove a manager from an empty set, and make sure we still have an empty set. + self.redis.execute_command("RAY.OBJECT_TABLE_REMOVE", "object_id1", "manager_id3") + response = self.redis.execute_command("RAY.OBJECT_TABLE_LOOKUP", "object_id1") + self.assertEqual(set(response), set()) + def testObjectTableSubscribeToNotifications(self): data_size = 0xf1f0 p = self.redis.pubsub() diff --git a/src/common/state/object_table.c b/src/common/state/object_table.c index 2ad5d15ff..eb26360a4 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.c @@ -28,6 +28,23 @@ void object_table_add(db_handle *db_handle, done_callback, redis_object_table_add, user_context); } +void object_table_remove(db_handle *db_handle, + object_id object_id, + db_client_id *client_id, + retry_info *retry, + object_table_done_callback done_callback, + void *user_context) { + CHECK(db_handle != NULL); + /* Copy the client ID, if one was provided. */ + db_client_id *client_id_copy = NULL; + if (client_id != NULL) { + client_id_copy = malloc(sizeof(db_client_id)); + *client_id_copy = *client_id; + } + init_table_callback(db_handle, object_id, __func__, client_id_copy, retry, + done_callback, redis_object_table_remove, user_context); +} + void object_table_subscribe_to_notifications( db_handle *db_handle, bool subscribe_all, diff --git a/src/common/state/object_table.h b/src/common/state/object_table.h index 7a705d5c4..6ed364d35 100644 --- a/src/common/state/object_table.h +++ b/src/common/state/object_table.h @@ -86,20 +86,20 @@ typedef struct { * * @param db_handle Handle to db. * @param object_id Object unique identifier. + * @param client_id A pointer to the database client ID to remove. If this is + * set to NULL, then the client ID associated with db_handle will be + * removed. * @param retry Information about retrying the request to the database. * @param done_callback Callback to be called when lookup completes. * @param user_context User context to be passed in the callbacks. * @return Void. */ -/* -void object_table_remove(db_handle *db, +void object_table_remove(db_handle *db_handle, object_id object_id, - lookup_callback callback, - void *context); + db_client_id *client_id, retry_info *retry, object_table_done_callback done_callback, void *user_context); -*/ /* * ==== Subscribe to be announced when new object available ==== diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 41b2fc0dd..92f138c88 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -314,7 +314,7 @@ void redis_object_table_add_callback(redisAsyncContext *c, CHECK(strcmp(reply->str, "OK") == 0); /* Call the done callback if there is one. */ if (callback_data->done_callback != NULL) { - task_table_done_callback done_callback = callback_data->done_callback; + object_table_done_callback done_callback = callback_data->done_callback; done_callback(callback_data->id, callback_data->user_context); } /* Clean up the timer and callback. */ @@ -340,6 +340,49 @@ void redis_object_table_add(table_callback_data *callback_data) { } } +void redis_object_table_remove_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + + /* Do some minimal checking. */ + redisReply *reply = r; + if (strcmp(reply->str, "object not found") == 0) { + /* If our object entry was not in the table, it's probably a race + * condition with an object_table_add. */ + return; + } + CHECK(reply->type != REDIS_REPLY_ERROR); + CHECK(strcmp(reply->str, "OK") == 0); + /* Call the done callback if there is one. */ + if (callback_data->done_callback != NULL) { + object_table_done_callback done_callback = callback_data->done_callback; + done_callback(callback_data->id, callback_data->user_context); + } + /* Clean up the timer and callback. */ + destroy_timer_callback(db->loop, callback_data); +} + +void redis_object_table_remove(table_callback_data *callback_data) { + db_handle *db = callback_data->db_handle; + + object_id obj_id = callback_data->id; + /* If the caller provided a manager ID to delete, use it. Otherwise, use our + * own client ID as the ID to delete. */ + db_client_id *client_id = callback_data->data; + if (client_id == NULL) { + client_id = &db->client; + } + int status = redisAsyncCommand( + db->context, redis_object_table_remove_callback, + (void *) callback_data->timer_id, "RAY.OBJECT_TABLE_REMOVE %b %b", + obj_id.id, sizeof(obj_id.id), client_id->id, sizeof(client_id->id)); + + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_DEBUG(db->context, "error in redis_object_table_remove"); + } +} + void redis_object_table_lookup(table_callback_data *callback_data) { CHECK(callback_data); db_handle *db = callback_data->db_handle; diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 2ea7cea76..6c9c3d7cd 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -81,6 +81,15 @@ void redis_object_table_lookup(table_callback_data *callback_data); */ void redis_object_table_add(table_callback_data *callback_data); +/** + * Remove a location entry from the object table in redis. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_object_table_remove(table_callback_data *callback_data); + /** * Create a client-specific channel for receiving notifications from the object * table. diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index c47a51489..35c38a583 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -425,6 +425,63 @@ TEST add_lookup_test(void) { PASS(); } +/* === Test add, remove, then lookup === */ +void add_remove_lookup_done_callback(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + CHECK(context == (void *) lookup_retry_context); + CHECK(manager_count == 0); + lookup_retry_succeeded = 1; +} + +void add_remove_lookup_callback(object_id object_id, void *user_context) { + db_handle *db = user_context; + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = lookup_retry_fail_callback, + }; + object_table_lookup(db, NIL_ID, &retry, add_remove_lookup_done_callback, + (void *) lookup_retry_context); +} + +void add_remove_callback(object_id object_id, void *user_context) { + db_handle *db = user_context; + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = lookup_retry_fail_callback, + }; + object_table_remove(db, NIL_ID, NULL, &retry, add_remove_lookup_callback, + (void *) db); +} + +TEST add_remove_lookup_test(void) { + g_loop = event_loop_create(); + lookup_retry_succeeded = 0; + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11235); + db_attach(db, g_loop, true); + retry_info retry = { + .num_retries = 5, + .timeout = 100, + .fail_callback = lookup_retry_fail_callback, + }; + object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, + add_remove_callback, (void *) db); + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + ASSERT(lookup_retry_succeeded); + PASS(); +} + /* === Test subscribe retry === */ const char *subscribe_retry_context = "subscribe_retry"; @@ -1023,6 +1080,7 @@ SUITE(object_table_tests) { RUN_REDIS_TEST(lookup_retry_test); RUN_REDIS_TEST(add_retry_test); RUN_REDIS_TEST(add_lookup_test); + RUN_REDIS_TEST(add_remove_lookup_test); RUN_REDIS_TEST(subscribe_retry_test); RUN_REDIS_TEST(lookup_late_test); RUN_REDIS_TEST(add_late_test); diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 9e792e953..acebca71f 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -337,6 +337,16 @@ void handle_object_available(local_scheduler_state *state, } } +void handle_object_removed(local_scheduler_state *state, object_id object_id) { + scheduling_algorithm_state *algorithm_state = state->algorithm_state; + available_object *entry; + HASH_FIND(handle, algorithm_state->local_objects, &object_id, + sizeof(object_id), entry); + if (entry != NULL) { + HASH_DELETE(handle, algorithm_state->local_objects, entry); + } +} + int num_tasks_in_queue(scheduling_algorithm_state *algorithm_state) { task_queue_entry *elt; int count; diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index a4a13db2c..905e95ad4 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -70,6 +70,15 @@ void handle_object_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, object_id object_id); +/** + * This function is called if an object is removed from the local plasma store. + * + * @param state The state of the local scheduler. + * @param object_id ID of the object that was removed. + * @return Void. + */ +void handle_object_removed(local_scheduler_state *state, object_id object_id); + /** * This function is called when a new worker becomes available * diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index a66eabe83..6b31f77a8 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -122,7 +122,12 @@ void process_plasma_notification(event_loop *loop, close(client_sock); return; } - handle_object_available(state, state->algorithm_state, object_info.obj_id); + + if (object_info.is_deletion) { + handle_object_removed(state, object_info.obj_id); + } else { + handle_object_available(state, state->algorithm_state, object_info.obj_id); + } } void reconstruct_object_task_lookup_callback(object_id reconstruct_object_id, diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c index 727a53b2f..507010ef1 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.c @@ -328,8 +328,10 @@ PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) { if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; } - /* Receive object notification from the plasma connection socket, - * return a tuple of its fields: object_id, data_size, metadata_size. */ + /* Receive object notification from the plasma connection socket. If the + * object was added, return a tuple of its fields: object_id, data_size, + * metadata_size. If the object was deleted, data_size and metadata_size will + * be set to -1. */ int nbytes = read_bytes(plasma_sock, (uint8_t *) &object_info, sizeof(object_info)); @@ -342,8 +344,13 @@ PyObject *PyPlasma_receive_notification(PyObject *self, PyObject *args) { PyObject *t = PyTuple_New(3); PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize( (char *) object_info.obj_id.id, UNIQUE_ID_SIZE)); - PyTuple_SetItem(t, 1, PyLong_FromLong(object_info.data_size)); - PyTuple_SetItem(t, 2, PyLong_FromLong(object_info.metadata_size)); + if (object_info.is_deletion) { + PyTuple_SetItem(t, 1, PyLong_FromLong(-1)); + PyTuple_SetItem(t, 2, PyLong_FromLong(-1)); + } else { + PyTuple_SetItem(t, 1, PyLong_FromLong(object_info.data_size)); + PyTuple_SetItem(t, 2, PyLong_FromLong(object_info.metadata_size)); + } return t; } diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 9082a7343..32c9e3cf0 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -1206,33 +1206,35 @@ void process_status_request(client_connection *client_conn, request_status_done, client_conn); } -void process_object_notification(event_loop *loop, - int client_sock, - void *context, - int events) { - plasma_manager_state *state = context; - object_id obj_id; - object_info object_info; - retry_info retry = { - .num_retries = NUM_RETRIES, - .timeout = MANAGER_TIMEOUT, - .fail_callback = NULL, - }; - /* Read the notification from Plasma. */ - int error = - read_bytes(client_sock, (uint8_t *) &object_info, sizeof(object_info)); - if (error < 0) { - /* The store has closed the socket. */ - LOG_DEBUG( - "The plasma store has closed the object notification socket, or some " - "other error has occurred."); - event_loop_remove_file(loop, client_sock); - close(client_sock); - return; +void process_delete_object_notification(plasma_manager_state *state, + object_info object_info) { + object_id obj_id = object_info.obj_id; + available_object *entry; + HASH_FIND(hh, state->local_available_objects, &obj_id, sizeof(obj_id), entry); + if (entry != NULL) { + HASH_DELETE(hh, state->local_available_objects, entry); + free(entry); } - obj_id = object_info.obj_id; - /* Add object to locally available object. */ - /* TODO(pcm): Where is this deallocated? */ + + /* Remove this object from the (redis) object table. */ + if (state->db) { + retry_info retry = { + .num_retries = NUM_RETRIES, + .timeout = MANAGER_TIMEOUT, + .fail_callback = NULL, + }; + object_table_remove(state->db, obj_id, NULL, &retry, NULL, NULL); + } + + /* NOTE: There could be pending wait requests for this object that will now + * return when the object is not actually available. For simplicity, we allow + * this scenario rather than try to keep the wait request statuses exactly + * up-to-date. */ +} + +void process_add_object_notification(plasma_manager_state *state, + object_info object_info) { + object_id obj_id = object_info.obj_id; available_object *entry = (available_object *) malloc(sizeof(available_object)); entry->object_id = obj_id; @@ -1243,6 +1245,11 @@ void process_object_notification(event_loop *loop, if (state->db) { /* TODO(swang): Log the error if we fail to add the object, and possibly * retry later? */ + retry_info retry = { + .num_retries = NUM_RETRIES, + .timeout = MANAGER_TIMEOUT, + .fail_callback = NULL, + }; object_table_add(state->db, obj_id, object_info.data_size + object_info.metadata_size, object_info.digest, &retry, NULL, NULL); @@ -1263,6 +1270,32 @@ void process_object_notification(event_loop *loop, PLASMA_OBJECT_LOCAL); } +void process_object_notification(event_loop *loop, + int client_sock, + void *context, + int events) { + plasma_manager_state *state = context; + object_info object_info; + /* Read the notification from Plasma. */ + int error = + read_bytes(client_sock, (uint8_t *) &object_info, sizeof(object_info)); + if (error < 0) { + /* The store has closed the socket. */ + LOG_DEBUG( + "The plasma store has closed the object notification socket, or some " + "other error has occurred."); + event_loop_remove_file(loop, client_sock); + close(client_sock); + return; + } + /* Add object to locally available object. */ + if (object_info.is_deletion) { + process_delete_object_notification(state, object_info); + } else { + process_add_object_notification(state, object_info); + } +} + void process_message(event_loop *loop, int client_sock, void *context, diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index fcbe24eee..7aab1e5cb 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -103,6 +103,8 @@ plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) { return state; } +void push_notification(plasma_store_state *state, object_id object_id); + /* If this client is not already using the object, add the client to the * object's list of clients, otherwise do nothing. */ void add_client_to_object_clients(object_table_entry *entry, @@ -321,12 +323,7 @@ void seal_object(client *client_context, /* Set the object digest. */ memcpy(entry->info.digest, digest, DIGEST_SIZE); /* Inform all subscribers that a new object has been sealed. */ - notification_queue *queue, *temp_queue; - HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { - utarray_push_back(queue->object_ids, &object_id); - send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, - 0); - } + push_notification(plasma_state, object_id); /* Inform processes getting this object that the object is ready now. */ object_notify_entry *notify_entry; @@ -375,6 +372,8 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) { dlfree(pointer); utarray_free(entry->clients); free(entry); + /* Inform all subscribers that the object has been deleted. */ + push_notification(plasma_state, object_id); } void remove_objects(plasma_store_state *plasma_state, @@ -390,6 +389,15 @@ void remove_objects(plasma_store_state *plasma_state, } } +void push_notification(plasma_store_state *plasma_state, object_id object_id) { + notification_queue *queue, *temp_queue; + HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { + utarray_push_back(queue->object_ids, &object_id); + send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, + 0); + } +} + /* Send more notifications to a subscriber. */ void send_notifications(event_loop *loop, int client_sock, @@ -409,9 +417,16 @@ void send_notifications(event_loop *loop, /* This object should already exist in plasma store state. */ HASH_FIND(handle, plasma_state->plasma_store_info->objects, obj_id, sizeof(object_id), entry); - CHECK(entry != NULL); - object_info object_info = entry->info; + object_info object_info; + if (entry == NULL) { + memset(&object_info, 0, sizeof(object_info)); + object_info.obj_id = *obj_id; + object_info.is_deletion = true; + } else { + object_info = entry->info; + object_info.is_deletion = false; + } /* Attempt to send a notification about this object ID. */ int nbytes = diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index b35b42d8e..6c6198583 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -351,6 +351,63 @@ class TestPlasmaClient(unittest.TestCase): self.assertEqual(data_sizes[j], recv_dsize) self.assertEqual(metadata_sizes[j], recv_msize) + def test_subscribe_deletions(self): + # Subscribe to notifications from the Plasma Store. We use plasma_client2 + # to make sure that all used objects will get evicted properly. + sock = self.plasma_client2.subscribe() + for i in [1, 10, 100, 1000, 10000, 100000]: + object_ids = [random_object_id() for _ in range(i)] + # Add 1 to the sizes to make sure we have nonzero object sizes. + metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + data_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + for j in range(i): + x = self.plasma_client2.create(object_ids[j], size=data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client2.seal(object_ids[j]) + del x + # Check that we received notifications for creating all of the objects. + for j in range(i): + recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification() + self.assertEqual(object_ids[j], recv_objid) + self.assertEqual(data_sizes[j], recv_dsize) + self.assertEqual(metadata_sizes[j], recv_msize) + + # Check that we receive notifications for deleting all objects, as we + # evict them. + for j in range(i): + self.assertEqual(self.plasma_client2.evict(1), data_sizes[j] + metadata_sizes[j]) + recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification() + self.assertEqual(object_ids[j], recv_objid) + self.assertEqual(-1, recv_dsize) + self.assertEqual(-1, recv_msize) + + # Test multiple deletion notifications. The first 9 object IDs have size 0, + # and the last has a nonzero size. When Plasma evicts 1 byte, it will evict + # all objects, so we should receive deletion notifications for each. + num_object_ids = 10 + object_ids = [random_object_id() for _ in range(num_object_ids)] + metadata_sizes = [0] * (num_object_ids - 1) + data_sizes = [0] * (num_object_ids - 1) + metadata_sizes.append(np.random.randint(1000)) + data_sizes.append(np.random.randint(1000)) + for i in range(num_object_ids): + x = self.plasma_client2.create(object_ids[i], size=data_sizes[i], + metadata=bytearray(np.random.bytes(metadata_sizes[i]))) + self.plasma_client2.seal(object_ids[i]) + del x + for i in range(num_object_ids): + recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification() + self.assertEqual(object_ids[i], recv_objid) + self.assertEqual(data_sizes[i], recv_dsize) + self.assertEqual(metadata_sizes[i], recv_msize) + self.assertEqual(self.plasma_client2.evict(1), data_sizes[-1] + metadata_sizes[-1]) + for i in range(num_object_ids): + recv_objid, recv_dsize, recv_msize = self.plasma_client2.get_next_notification() + self.assertEqual(object_ids[i], recv_objid) + self.assertEqual(-1, recv_dsize) + self.assertEqual(-1, recv_msize) + + class TestPlasmaManager(unittest.TestCase): def setUp(self):