Cache plasma object information on the plasma client (#73)

* Cache plasma object information on the plasma client

* Plasma client uses cached object references to get the object data
This commit is contained in:
Stephanie Wang
2016-12-01 16:46:17 -08:00
committed by Philipp Moritz
parent f89be9699c
commit dadbcae17d
2 changed files with 102 additions and 44 deletions
+101 -43
View File
@@ -55,13 +55,15 @@ typedef struct {
typedef struct {
/** The ID of the object. This is used as the key in the hash table. */
object_id object_id;
/** The file descriptor of the memory-mapped file that contains the object. */
int fd;
/** A count of the number of times this client has called plasma_create or
* plasma_get on this object ID minus the number of calls to plasma_release.
* When this count reaches zero, we remove the entry from the objects_in_use
* and decrement a count in the relevant client_mmap_table_entry. */
int count;
/** Cached information to read the object. */
plasma_object object;
/** A flag representing whether the object has been sealed. */
bool is_sealed;
/** Handle for the uthash table. */
UT_hash_handle hh;
} object_in_use_entry;
@@ -130,9 +132,19 @@ uint8_t *lookup_or_mmap(plasma_connection *conn,
}
}
/* Get a pointer to a file that we know has been memory mapped in this client
* process before. */
uint8_t *lookup_mmapped_file(plasma_connection *conn, int store_fd_val) {
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry);
CHECK(entry);
return entry->pointer;
}
void increment_object_count(plasma_connection *conn,
object_id object_id,
int fd) {
plasma_object *object,
bool is_sealed) {
/* Increment the count of the object to track the fact that it is being used.
* The corresponding decrement should happen in plasma_release. */
object_in_use_entry *object_entry;
@@ -143,15 +155,16 @@ void increment_object_count(plasma_connection *conn,
* corresponding call to free happens in plasma_release. */
object_entry = malloc(sizeof(object_in_use_entry));
object_entry->object_id = object_id;
object_entry->fd = fd;
object_entry->object = *object;
object_entry->count = 0;
object_entry->is_sealed = is_sealed;
HASH_ADD(hh, conn->objects_in_use, object_id, sizeof(object_id),
object_entry);
/* Increment the count of the number of objects in the memory-mapped file
* that are being used. The corresponding decrement should happen in
* plasma_release. */
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &object_entry->fd, entry);
HASH_FIND_INT(conn->mmap_table, &object->handle.store_fd, entry);
CHECK(entry != NULL);
CHECK(entry->count >= 0);
entry->count += 1;
@@ -202,8 +215,8 @@ bool plasma_create(plasma_connection *conn,
}
/* Increment the count of the number of instances of this object that this
* client is using. A call to plasma_release is required to decrement this
* count. */
increment_object_count(conn, object_id, object->handle.store_fd);
* count. Cache the reference to the object. */
increment_object_count(conn, object_id, object, false);
return true;
}
@@ -214,16 +227,34 @@ void plasma_get(plasma_connection *conn,
uint8_t **data,
int64_t *metadata_size,
uint8_t **metadata) {
plasma_request req = plasma_make_request(object_id);
CHECK(plasma_send_request(conn->store_conn, PLASMA_GET, &req) >= 0);
plasma_reply reply;
CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0);
int fd = recv_fd(conn->store_conn);
CHECK(fd >= 0);
plasma_object *object = &reply.object;
*data = lookup_or_mmap(conn, fd, object->handle.store_fd,
object->handle.mmap_size) +
object->data_offset;
/* Check if we already have a reference to the object. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
plasma_object *object;
if (object_entry) {
/* If we have already have a reference to the object, use it to get the
* data pointer.
* NOTE: If the object is still unsealed, we will deadlock, since we must
* have been the one who created it. */
CHECKM(object_entry->is_sealed,
"Plasma client called get on an unsealed object that it created");
object = &object_entry->object;
*data = lookup_mmapped_file(conn, object->handle.store_fd);
} else {
/* Else, request a reference to the object data from the plasma store. */
plasma_request req = plasma_make_request(object_id);
CHECK(plasma_send_request(conn->store_conn, PLASMA_GET, &req) >= 0);
plasma_reply reply;
CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0);
int fd = recv_fd(conn->store_conn);
CHECK(fd >= 0);
object = &reply.object;
*data = lookup_or_mmap(conn, fd, object->handle.store_fd,
object->handle.mmap_size);
}
/* Finish filling out the return values. */
*data = *data + object->data_offset;
*size = object->data_size;
/* If requested, return the metadata as well. */
if (metadata != NULL) {
@@ -232,8 +263,8 @@ void plasma_get(plasma_connection *conn,
}
/* Increment the count of the number of instances of this object that this
* client is using. A call to plasma_release is required to decrement this
* count. */
increment_object_count(conn, object_id, object->handle.store_fd);
* count. Cache the reference to the object. */
increment_object_count(conn, object_id, object, true);
}
void plasma_perform_release(plasma_connection *conn, object_id object_id) {
@@ -252,7 +283,8 @@ void plasma_perform_release(plasma_connection *conn, object_id object_id) {
* that the client is using. The corresponding increment should have
* happened in plasma_get. */
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &object_entry->fd, entry);
int fd = object_entry->object.handle.store_fd;
HASH_FIND_INT(conn->mmap_table, &fd, entry);
CHECK(entry != NULL);
entry->count -= 1;
CHECK(entry->count >= 0);
@@ -302,6 +334,17 @@ void plasma_contains(plasma_connection *conn,
}
void plasma_seal(plasma_connection *conn, object_id object_id) {
/* Make sure this client has a reference to the object before sending the
* request to Plasma. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
CHECKM(object_entry != NULL,
"Plasma client called seal an object without a reference to it");
CHECKM(!object_entry->is_sealed,
"Plasma client called seal an already sealed object");
object_entry->is_sealed = true;
/* Send the seal request to Plasma. */
plasma_request req = plasma_make_request(object_id);
CHECK(plasma_send_request(conn->store_conn, PLASMA_SEAL, &req) >= 0);
if (conn->manager_conn >= 0) {
@@ -549,33 +592,48 @@ bool plasma_get_local(plasma_connection *conn,
object_id object_id,
object_buffer *object_buffer) {
CHECK(conn != NULL);
/* Check if we already have a reference to the object. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
plasma_object *object;
if (object_entry) {
/* If we have already have a reference to the object, use it to get the
* data pointer. */
if (!object_entry->is_sealed) {
/* The object is in our local store, but it hasn't been sealed yet. */
return false;
}
object = &object_entry->object;
object_buffer->data = lookup_mmapped_file(conn, object->handle.store_fd);
} else {
/* Else, request a reference to the object data from the plasma store. */
plasma_request req = plasma_make_request(object_id);
CHECK(plasma_send_request(conn->store_conn, PLASMA_GET_LOCAL, &req) >= 0);
plasma_request req = plasma_make_request(object_id);
CHECK(plasma_send_request(conn->store_conn, PLASMA_GET_LOCAL, &req) >= 0);
plasma_reply reply;
CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0);
int fd = recv_fd(conn->store_conn);
CHECKM(fd >= 0, "recv_fd not successful");
plasma_reply reply;
CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0);
int fd = recv_fd(conn->store_conn);
CHECKM(fd >= 0, "recv_fd not successful");
if (reply.has_object) {
plasma_object *object = &reply.object;
if (!reply.has_object) {
/* The object is not in our local store. */
return false;
}
object = &reply.object;
object_buffer->data = lookup_or_mmap(conn, fd, object->handle.store_fd,
object->handle.mmap_size) +
object->data_offset;
object_buffer->data_size = object->data_size;
object_buffer->metadata = object_buffer->data + object->data_size;
object_buffer->metadata_size = object->metadata_size;
/* Increment the count of the number of instances of this object that this
* client is using. A call to plasma_release is required to decrement this
* count. */
increment_object_count(conn, object_id, object->handle.store_fd);
return true;
object->handle.mmap_size);
}
/* The object is either (1) not available in the local Plasma store, or (2) it
* is not sealed yet. */
return false;
/* Finish filling out the return values. */
object_buffer->data += object->data_offset;
object_buffer->data_size = object->data_size;
object_buffer->metadata = object_buffer->data + object->data_size;
object_buffer->metadata_size = object->metadata_size;
/* Increment the count of the number of instances of this object that this
* client is using. A call to plasma_release is required to decrement this
* count. Cache the reference to the object. */
increment_object_count(conn, object_id, object, true);
return true;
}
int plasma_fetch_remote(plasma_connection *conn, object_id object_id) {
+1 -1
View File
@@ -206,8 +206,8 @@ class TestPlasmaClient(unittest.TestCase):
object_id3 = random_object_id()
b2 = client.create(object_id2, 999)
b3 = client.create(object_id3, 998)
del b3
client.seal(object_id3)
del b3
self.assertEqual(client.evict(1000), 998)
object_id4 = random_object_id()