From dadbcae17d729c18e93ff2f27b9bc091671aaba3 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 1 Dec 2016 16:46:17 -0800 Subject: [PATCH] Cache plasma object information on the plasma client (#73) * Cache plasma object information on the plasma client * Plasma client uses cached object references to get the object data --- src/plasma/plasma_client.c | 144 ++++++++++++++++++++++++++----------- src/plasma/test/test.py | 2 +- 2 files changed, 102 insertions(+), 44 deletions(-) diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 53bd3074c..ff0d5db87 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -55,13 +55,15 @@ typedef struct { typedef struct { /** The ID of the object. This is used as the key in the hash table. */ object_id object_id; - /** The file descriptor of the memory-mapped file that contains the object. */ - int fd; /** A count of the number of times this client has called plasma_create or * plasma_get on this object ID minus the number of calls to plasma_release. * When this count reaches zero, we remove the entry from the objects_in_use * and decrement a count in the relevant client_mmap_table_entry. */ int count; + /** Cached information to read the object. */ + plasma_object object; + /** A flag representing whether the object has been sealed. */ + bool is_sealed; /** Handle for the uthash table. */ UT_hash_handle hh; } object_in_use_entry; @@ -130,9 +132,19 @@ uint8_t *lookup_or_mmap(plasma_connection *conn, } } +/* Get a pointer to a file that we know has been memory mapped in this client + * process before. */ +uint8_t *lookup_mmapped_file(plasma_connection *conn, int store_fd_val) { + client_mmap_table_entry *entry; + HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry); + CHECK(entry); + return entry->pointer; +} + void increment_object_count(plasma_connection *conn, object_id object_id, - int fd) { + plasma_object *object, + bool is_sealed) { /* Increment the count of the object to track the fact that it is being used. * The corresponding decrement should happen in plasma_release. */ object_in_use_entry *object_entry; @@ -143,15 +155,16 @@ void increment_object_count(plasma_connection *conn, * corresponding call to free happens in plasma_release. */ object_entry = malloc(sizeof(object_in_use_entry)); object_entry->object_id = object_id; - object_entry->fd = fd; + object_entry->object = *object; object_entry->count = 0; + object_entry->is_sealed = is_sealed; HASH_ADD(hh, conn->objects_in_use, object_id, sizeof(object_id), object_entry); /* Increment the count of the number of objects in the memory-mapped file * that are being used. The corresponding decrement should happen in * plasma_release. */ client_mmap_table_entry *entry; - HASH_FIND_INT(conn->mmap_table, &object_entry->fd, entry); + HASH_FIND_INT(conn->mmap_table, &object->handle.store_fd, entry); CHECK(entry != NULL); CHECK(entry->count >= 0); entry->count += 1; @@ -202,8 +215,8 @@ bool plasma_create(plasma_connection *conn, } /* Increment the count of the number of instances of this object that this * client is using. A call to plasma_release is required to decrement this - * count. */ - increment_object_count(conn, object_id, object->handle.store_fd); + * count. Cache the reference to the object. */ + increment_object_count(conn, object_id, object, false); return true; } @@ -214,16 +227,34 @@ void plasma_get(plasma_connection *conn, uint8_t **data, int64_t *metadata_size, uint8_t **metadata) { - plasma_request req = plasma_make_request(object_id); - CHECK(plasma_send_request(conn->store_conn, PLASMA_GET, &req) >= 0); - plasma_reply reply; - CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); - int fd = recv_fd(conn->store_conn); - CHECK(fd >= 0); - plasma_object *object = &reply.object; - *data = lookup_or_mmap(conn, fd, object->handle.store_fd, - object->handle.mmap_size) + - object->data_offset; + /* Check if we already have a reference to the object. */ + object_in_use_entry *object_entry; + HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), + object_entry); + plasma_object *object; + if (object_entry) { + /* If we have already have a reference to the object, use it to get the + * data pointer. + * NOTE: If the object is still unsealed, we will deadlock, since we must + * have been the one who created it. */ + CHECKM(object_entry->is_sealed, + "Plasma client called get on an unsealed object that it created"); + object = &object_entry->object; + *data = lookup_mmapped_file(conn, object->handle.store_fd); + } else { + /* Else, request a reference to the object data from the plasma store. */ + plasma_request req = plasma_make_request(object_id); + CHECK(plasma_send_request(conn->store_conn, PLASMA_GET, &req) >= 0); + plasma_reply reply; + CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); + int fd = recv_fd(conn->store_conn); + CHECK(fd >= 0); + object = &reply.object; + *data = lookup_or_mmap(conn, fd, object->handle.store_fd, + object->handle.mmap_size); + } + /* Finish filling out the return values. */ + *data = *data + object->data_offset; *size = object->data_size; /* If requested, return the metadata as well. */ if (metadata != NULL) { @@ -232,8 +263,8 @@ void plasma_get(plasma_connection *conn, } /* Increment the count of the number of instances of this object that this * client is using. A call to plasma_release is required to decrement this - * count. */ - increment_object_count(conn, object_id, object->handle.store_fd); + * count. Cache the reference to the object. */ + increment_object_count(conn, object_id, object, true); } void plasma_perform_release(plasma_connection *conn, object_id object_id) { @@ -252,7 +283,8 @@ void plasma_perform_release(plasma_connection *conn, object_id object_id) { * that the client is using. The corresponding increment should have * happened in plasma_get. */ client_mmap_table_entry *entry; - HASH_FIND_INT(conn->mmap_table, &object_entry->fd, entry); + int fd = object_entry->object.handle.store_fd; + HASH_FIND_INT(conn->mmap_table, &fd, entry); CHECK(entry != NULL); entry->count -= 1; CHECK(entry->count >= 0); @@ -302,6 +334,17 @@ void plasma_contains(plasma_connection *conn, } void plasma_seal(plasma_connection *conn, object_id object_id) { + /* Make sure this client has a reference to the object before sending the + * request to Plasma. */ + object_in_use_entry *object_entry; + HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), + object_entry); + CHECKM(object_entry != NULL, + "Plasma client called seal an object without a reference to it"); + CHECKM(!object_entry->is_sealed, + "Plasma client called seal an already sealed object"); + object_entry->is_sealed = true; + /* Send the seal request to Plasma. */ plasma_request req = plasma_make_request(object_id); CHECK(plasma_send_request(conn->store_conn, PLASMA_SEAL, &req) >= 0); if (conn->manager_conn >= 0) { @@ -549,33 +592,48 @@ bool plasma_get_local(plasma_connection *conn, object_id object_id, object_buffer *object_buffer) { CHECK(conn != NULL); + /* Check if we already have a reference to the object. */ + object_in_use_entry *object_entry; + HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), + object_entry); + plasma_object *object; + if (object_entry) { + /* If we have already have a reference to the object, use it to get the + * data pointer. */ + if (!object_entry->is_sealed) { + /* The object is in our local store, but it hasn't been sealed yet. */ + return false; + } + object = &object_entry->object; + object_buffer->data = lookup_mmapped_file(conn, object->handle.store_fd); + } else { + /* Else, request a reference to the object data from the plasma store. */ + plasma_request req = plasma_make_request(object_id); + CHECK(plasma_send_request(conn->store_conn, PLASMA_GET_LOCAL, &req) >= 0); - plasma_request req = plasma_make_request(object_id); - CHECK(plasma_send_request(conn->store_conn, PLASMA_GET_LOCAL, &req) >= 0); + plasma_reply reply; + CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); + int fd = recv_fd(conn->store_conn); + CHECKM(fd >= 0, "recv_fd not successful"); - plasma_reply reply; - CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); - int fd = recv_fd(conn->store_conn); - CHECKM(fd >= 0, "recv_fd not successful"); - - if (reply.has_object) { - plasma_object *object = &reply.object; + if (!reply.has_object) { + /* The object is not in our local store. */ + return false; + } + object = &reply.object; object_buffer->data = lookup_or_mmap(conn, fd, object->handle.store_fd, - object->handle.mmap_size) + - object->data_offset; - object_buffer->data_size = object->data_size; - object_buffer->metadata = object_buffer->data + object->data_size; - object_buffer->metadata_size = object->metadata_size; - - /* Increment the count of the number of instances of this object that this - * client is using. A call to plasma_release is required to decrement this - * count. */ - increment_object_count(conn, object_id, object->handle.store_fd); - return true; + object->handle.mmap_size); } - /* The object is either (1) not available in the local Plasma store, or (2) it - * is not sealed yet. */ - return false; + /* Finish filling out the return values. */ + object_buffer->data += object->data_offset; + object_buffer->data_size = object->data_size; + object_buffer->metadata = object_buffer->data + object->data_size; + object_buffer->metadata_size = object->metadata_size; + /* Increment the count of the number of instances of this object that this + * client is using. A call to plasma_release is required to decrement this + * count. Cache the reference to the object. */ + increment_object_count(conn, object_id, object, true); + return true; } int plasma_fetch_remote(plasma_connection *conn, object_id object_id) { diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 5112fc5e3..7e916f14c 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -206,8 +206,8 @@ class TestPlasmaClient(unittest.TestCase): object_id3 = random_object_id() b2 = client.create(object_id2, 999) b3 = client.create(object_id3, 998) - del b3 client.seal(object_id3) + del b3 self.assertEqual(client.evict(1000), 998) object_id4 = random_object_id()