From d41566a499d75a31d1345b774e9a9a1cb433e13f Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 23 Sep 2016 15:07:50 -0700 Subject: [PATCH] Implement delete and contains for plasma client and store. (#28) * Implement delete and contains for plasma client and store. * Fix style, free object, add tests. --- lib/python/plasma.py | 28 ++++++++++++++++++++++ src/plasma.h | 16 +++++++++---- src/plasma_client.c | 28 ++++++++++++++++++---- src/plasma_client.h | 12 ++++++++++ src/plasma_manager.c | 2 +- src/plasma_store.c | 42 +++++++++++++++++++++++++++++++++ test/test.py | 55 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 173 insertions(+), 10 deletions(-) diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 2bbcd6d49..37fa8f72c 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -43,7 +43,9 @@ class PlasmaClient(object): self.client.plasma_store_connect.restype = ctypes.c_void_p self.client.plasma_create.restype = None self.client.plasma_get.restype = None + self.client.plasma_contains.restype = None self.client.plasma_seal.restype = None + self.client.plasma_delete.restype = None self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] @@ -111,6 +113,22 @@ class PlasmaClient(object): buf = self.client.plasma_get(self.store_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) return self.buffer_from_memory(metadata, metadata_size) + def contains(self, object_id): + """Check if the object is present and has been sealed in the PlasmaStore. + + Args: + object_id (str): A string used to identify an object. + """ + has_object = ctypes.c_int() + self.client.plasma_contains(self.store_conn, make_plasma_id(object_id), ctypes.byref(has_object)) + has_object = has_object.value + if has_object == 1: + return True + elif has_object == 0: + return False + else: + raise Exception("This code should be unreachable.") + def seal(self, object_id): """Seal the buffer in the PlasmaStore for a particular object ID. @@ -122,6 +140,16 @@ class PlasmaClient(object): """ self.client.plasma_seal(self.store_conn, make_plasma_id(object_id)) + def delete(self, object_id): + """Delete the buffer in the PlasmaStore for a particular object ID. + + Once a buffer has been deleted, the buffer is no longer accessible. + + Args: + object_id (str): A string used to identify an object. + """ + self.client.plasma_delete(self.store_conn, make_plasma_id(object_id)) + def transfer(self, addr, port, object_id): """Transfer local object with id object_id to another plasma instance diff --git a/src/plasma.h b/src/plasma.h index ddf89ad34..4e8e15464 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -51,11 +51,15 @@ enum plasma_request_type { PLASMA_CREATE, /* Get an object. */ PLASMA_GET, - /* seal an object */ + /* Check if an object is present. */ + PLASMA_CONTAINS, + /* Seal an object. */ PLASMA_SEAL, - /* request transfer to another store */ + /* Delete an object. */ + PLASMA_DELETE, + /* Request transfer to another store. */ PLASMA_TRANSFER, - /* Header for sending data */ + /* Header for sending data. */ PLASMA_DATA, }; @@ -81,6 +85,8 @@ typedef struct { int64_t data_size; /* The size of the metadata. */ int64_t metadata_size; + /* 1 if the object is present and 0 otherwise. Used for plasma_contains. */ + int has_object; /* Numerical value of the fd of the memory mapped file in the store. */ int store_fd_val; } plasma_reply; @@ -112,6 +118,8 @@ typedef struct { client_mmap_table_entry *mmap_table; } plasma_store_conn; -void plasma_send(int conn, plasma_request *req); +void plasma_send_request(int conn, plasma_request *req); + +void plasma_send_reply(int conn, plasma_reply *req); #endif diff --git a/src/plasma_client.c b/src/plasma_client.c index 3d94e503b..e486e1aa8 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -16,7 +16,7 @@ #include "plasma_client.h" #include "fling.h" -void plasma_send(int fd, plasma_request *req) { +void plasma_send_request(int fd, plasma_request *req) { int req_count = sizeof(plasma_request); if (write(fd, req, req_count) != req_count) { LOG_ERR("write error, fd = %d", fd); @@ -66,7 +66,7 @@ void plasma_create(plasma_store_conn *conn, .object_id = object_id, .data_size = data_size, .metadata_size = metadata_size}; - plasma_send(conn->conn, &req); + plasma_send_request(conn->conn, &req); plasma_reply reply; int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply)); assert(reply.data_size == data_size); @@ -92,7 +92,7 @@ void plasma_get(plasma_store_conn *conn, int64_t *metadata_size, uint8_t **metadata) { plasma_request req = {.type = PLASMA_GET, .object_id = object_id}; - plasma_send(conn->conn, &req); + plasma_send_request(conn->conn, &req); plasma_reply reply; int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply)); *data = lookup_or_mmap(conn, fd, reply.store_fd_val, reply.map_size) + @@ -105,9 +105,27 @@ void plasma_get(plasma_store_conn *conn, } } +/* This method is used to query whether the plasma store contains an object. */ +void plasma_contains(plasma_store_conn *conn, + plasma_id object_id, + int *has_object) { + plasma_request req = {.type = PLASMA_CONTAINS, .object_id = object_id}; + plasma_send_request(conn->conn, &req); + plasma_reply reply; + int r = read(conn->conn, &reply, sizeof(plasma_reply)); + PLASMA_CHECK(r != -1, "read error"); + PLASMA_CHECK(r != 0, "connection disconnected"); + *has_object = reply.has_object; +} + void plasma_seal(plasma_store_conn *conn, plasma_id object_id) { plasma_request req = {.type = PLASMA_SEAL, .object_id = object_id}; - plasma_send(conn->conn, &req); + plasma_send_request(conn->conn, &req); +} + +void plasma_delete(plasma_store_conn *conn, plasma_id object_id) { + plasma_request req = {.type = PLASMA_DELETE, .object_id = object_id}; + plasma_send_request(conn->conn, &req); } plasma_store_conn *plasma_store_connect(const char *socket_name) { @@ -187,5 +205,5 @@ void plasma_transfer(int manager, /* skip the '.' */ end += 1; } - plasma_send(manager, &req); + plasma_send_request(manager, &req); } diff --git a/src/plasma_client.h b/src/plasma_client.h index 4c7cc008e..87a33269f 100644 --- a/src/plasma_client.h +++ b/src/plasma_client.h @@ -22,6 +22,18 @@ void plasma_get(plasma_store_conn *conn, int64_t *metadata_size, uint8_t **metadata); +/* Check if the object store contains a particular object and the object has + * been sealed. The result will be stored in has_object. TODO(rkn): We may want + * to indicate whether the object is currently being written. */ +void plasma_contains(plasma_store_conn *conn, + plasma_id object_id, + int *has_object); + void plasma_seal(plasma_store_conn *conn, plasma_id object_id); +/* Delete an object from the object store. This currently assumes that the + * object is present and has been sealed. TODO(rkn): We may want to allow the + * deletion of objects that are not present or haven't been sealed. */ +void plasma_delete(plasma_store_conn *conn, plasma_id object_id); + #endif diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 7cad02504..a4952295d 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -74,7 +74,7 @@ void initiate_transfer(plasma_manager_state *s, plasma_request *req) { .object_id = req->object_id, .data_size = buf.data_size, .metadata_size = buf.metadata_size}; - plasma_send(fd, &manager_req); + plasma_send_request(fd, &manager_req); } /* Start reading data from another object manager. diff --git a/src/plasma_store.c b/src/plasma_store.c index a7aef2c00..85c9ad0b9 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -30,12 +30,21 @@ #define MAX_NUM_CLIENTS 100 void* dlmalloc(size_t); +void dlfree(void*); typedef struct { /* Event loop for the plasma store. */ event_loop* loop; } plasma_store_state; +void plasma_send_reply(int fd, plasma_reply* reply) { + int reply_count = sizeof(plasma_reply); + if (write(fd, reply, reply_count) != reply_count) { + LOG_ERR("write error, fd = %d", fd); + exit(-1); + } +} + void init_state(plasma_store_state* s) { s->loop = malloc(sizeof(event_loop)); event_loop_init(s->loop); @@ -54,6 +63,8 @@ typedef struct { ptrdiff_t offset; /* Handle for the uthash table. */ UT_hash_handle handle; + /* Pointer to the object data. Needed to free the object. */ + uint8_t* pointer; } object_table_entry; /* Objects that are still being written by their owner process. */ @@ -96,6 +107,7 @@ void create_object(int conn, plasma_request* req) { memcpy(&entry->object_id, &req->object_id, 20); entry->info.data_size = req->data_size; entry->info.metadata_size = req->metadata_size; + entry->pointer = pointer; /* TODO(pcm): set the other fields */ entry->fd = fd; entry->map_size = map_size; @@ -145,6 +157,16 @@ void get_object(int conn, plasma_request* req) { } } +/* Check if an object is present. */ +void check_if_object_present(int conn, plasma_request* req) { + object_table_entry* entry; + HASH_FIND(handle, sealed_objects, &req->object_id, sizeof(plasma_id), entry); + plasma_reply reply; + memset(&reply, 0, sizeof(plasma_reply)); + reply.has_object = entry ? 1 : 0; + plasma_send_reply(conn, &reply); +} + /* Seal an object that has been created in the hash table. */ void seal_object(int conn, plasma_request* req) { LOG_INFO("sealing object"); // TODO(pcm): add object_id here @@ -176,6 +198,20 @@ void seal_object(int conn, plasma_request* req) { free(notify_entry); } +/* Delete an object that has been created in the hash table. */ +void delete_object(int conn, plasma_request* req) { + LOG_INFO("deleting object"); // TODO(rkn): add object_id here + object_table_entry* entry; + HASH_FIND(handle, sealed_objects, &req->object_id, sizeof(plasma_id), entry); + /* TODO(rkn): This should probably not fail, but should instead throw an + * error. Maybe we should also support deleting objects that have been created + * but not sealed. */ + PLASMA_CHECK(entry != NULL, "To delete an object it must have been sealed."); + uint8_t* pointer = entry->pointer; + HASH_DELETE(handle, sealed_objects, entry); + dlfree(pointer); +} + void process_event(int conn, plasma_request* req) { switch (req->type) { case PLASMA_CREATE: @@ -184,9 +220,15 @@ void process_event(int conn, plasma_request* req) { case PLASMA_GET: get_object(conn, req); break; + case PLASMA_CONTAINS: + check_if_object_present(conn, req); + break; case PLASMA_SEAL: seal_object(conn, req); break; + case PLASMA_DELETE: + delete_object(conn, req); + break; default: LOG_ERR("invalid request %d", req->type); exit(-1); diff --git a/test/test.py b/test/test.py index aae9f2b9e..fcf0be437 100644 --- a/test/test.py +++ b/test/test.py @@ -90,6 +90,61 @@ class TestPlasmaClient(unittest.TestCase): for i in range(len(metadata)): self.assertEqual(metadata[i], metadata_buffer[i]) + def test_contains(self): + fake_object_ids = [random_object_id() for _ in range(100)] + real_object_ids = [random_object_id() for _ in range(100)] + for object_id in real_object_ids: + self.assertFalse(self.plasma_client.contains(object_id)) + memory_buffer = self.plasma_client.create(object_id, 100) + self.plasma_client.seal(object_id) + self.assertTrue(self.plasma_client.contains(object_id)) + for object_id in fake_object_ids: + self.assertFalse(self.plasma_client.contains(object_id)) + for object_id in real_object_ids: + self.assertTrue(self.plasma_client.contains(object_id)) + + def test_individual_delete(self): + length = 100 + # Create an object id string. + object_id = random_object_id() + # Create a random metadata string. + metadata = generate_metadata(100) + # Create a new buffer and write to it. + memory_buffer = self.plasma_client.create(object_id, length, metadata) + for i in range(length): + memory_buffer[i] = chr(i % 256) + # Seal the object. + self.plasma_client.seal(object_id) + # Check that the object is present. + self.assertTrue(self.plasma_client.contains(object_id)) + # Delete the object. + self.plasma_client.delete(object_id) + # Make sure the object is no longer present. + self.assertFalse(self.plasma_client.contains(object_id)) + + def test_delete(self): + # Create some objects. + object_ids = [random_object_id() for _ in range(100)] + for object_id in object_ids: + length = 100 + # Create a random metadata string. + metadata = generate_metadata(100) + # Create a new buffer and write to it. + memory_buffer = self.plasma_client.create(object_id, length, metadata) + for i in range(length): + memory_buffer[i] = chr(i % 256) + # Seal the object. + self.plasma_client.seal(object_id) + # Check that the object is present. + self.assertTrue(self.plasma_client.contains(object_id)) + + # Delete the objects and make sure they are no longer present. + for object_id in object_ids: + # Delete the object. + self.plasma_client.delete(object_id) + # Make sure the object is no longer present. + self.assertFalse(self.plasma_client.contains(object_id)) + def test_illegal_functionality(self): # Create an object id string. object_id = random_object_id()