From b7ace01b5facbdaabf27d0f5da604703ddeb5d56 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 25 Apr 2017 01:25:40 -0700 Subject: [PATCH] Convert Plasma client to STL (#486) * convert mmap table to STL * update * fix * convert objects_in_use * fix * convert release_history * cleanup * linting * update * fix * linting --- src/plasma/plasma_client.cc | 206 +++++++++++++----------------------- 1 file changed, 72 insertions(+), 134 deletions(-) diff --git a/src/plasma/plasma_client.cc b/src/plasma/plasma_client.cc index 7c196ef45..60985f8b9 100644 --- a/src/plasma/plasma_client.cc +++ b/src/plasma/plasma_client.cc @@ -25,10 +25,9 @@ #include "plasma.h" #include "plasma_protocol.h" #include "plasma_client.h" -#include "uthash.h" -#include "utlist.h" /* C++ includes */ +#include #include #include @@ -46,7 +45,7 @@ extern "C" { #define BYTES_IN_MB (1 << 20) static std::vector threadpool_(THREADPOOL_SIZE); -typedef struct { +struct ClientMmapTableEntry { /** Key that uniquely identifies the memory mapped file. In practice, we * take the numerical value of the file descriptor in the object store. */ int key; @@ -57,25 +56,21 @@ typedef struct { /** The number of objects in this memory-mapped file that are currently being * used by the client. When this count reaches zeros, we unmap the file. */ int count; - /** Handle for the uthash table. */ - UT_hash_handle hh; -} client_mmap_table_entry; +}; -typedef struct { +struct ObjectInUseEntry { /** The ID of the object. This is used as the key in the hash table. */ ObjectID object_id; /** A count of the number of times this client has called plasma_create or * plasma_get on this object ID minus the number of calls to plasma_release. * When this count reaches zero, we remove the entry from the objects_in_use - * and decrement a count in the relevant client_mmap_table_entry. */ + * and decrement a count in the relevant ClientMmapTableEntry. */ int count; /** Cached information to read the object. */ PlasmaObject object; /** A flag representing whether the object has been sealed. */ bool is_sealed; - /** Handle for the uthash table. */ - UT_hash_handle hh; -} object_in_use_entry; +}; /** Configuration options for the plasma client. */ typedef struct { @@ -85,17 +80,6 @@ typedef struct { int release_delay; } plasma_client_config; -/** An element representing a pending release call in a doubly-linked list. This - * is used to implement the delayed release mechanism. */ -typedef struct pending_release { - /** The object_id of the released object. */ - ObjectID object_id; - /** Needed for the doubly-linked list macros. */ - struct pending_release *prev; - /** Needed for the doubly-linked list macros. */ - struct pending_release *next; -} pending_release; - /** Information about a connection between a Plasma Client and Plasma Store. * This is used to avoid mapping the same files into memory multiple times. */ struct PlasmaConnection { @@ -112,19 +96,17 @@ struct PlasmaConnection { /** Table of dlmalloc buffer files that have been memory mapped so far. This * is a hash table mapping a file descriptor to a struct containing the * address of the corresponding memory-mapped file. */ - client_mmap_table_entry *mmap_table; + std::unordered_map mmap_table; /** A hash table of the object IDs that are currently being used by this * client. */ - object_in_use_entry *objects_in_use; - /** Object IDs of the last few release calls. This is a doubly-linked list and + std::unordered_map + objects_in_use; + /** Object IDs of the last few release calls. This is a deque and * is used to delay releasing objects to see if they can be reused by * subsequent tasks so we do not unneccessarily invalidate cpu caches. * TODO(pcm): replace this with a proper lru cache using the size of the L3 * cache. */ - pending_release *release_history; - /** The length of the release_history doubly-linked list. This is an - * implementation detail. */ - int release_history_length; + std::deque release_history; /** The number of bytes in the combined objects that are held in the release * history doubly-linked list. If this is too large then the client starts * releasing objects. */ @@ -145,11 +127,10 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn, int fd, int store_fd_val, int64_t map_size) { - client_mmap_table_entry *entry; - HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry); - if (entry) { + auto entry = conn->mmap_table.find(store_fd_val); + if (entry != conn->mmap_table.end()) { close(fd); - return entry->pointer; + return entry->second->pointer; } else { uint8_t *result = (uint8_t *) mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); @@ -157,12 +138,12 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn, LOG_FATAL("mmap failed"); } close(fd); - entry = (client_mmap_table_entry *) malloc(sizeof(client_mmap_table_entry)); + ClientMmapTableEntry *entry = new ClientMmapTableEntry(); entry->key = store_fd_val; entry->pointer = result; entry->length = map_size; entry->count = 0; - HASH_ADD_INT(conn->mmap_table, key, entry); + conn->mmap_table[store_fd_val] = entry; return result; } } @@ -170,10 +151,9 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn, /* Get a pointer to a file that we know has been memory mapped in this client * process before. */ uint8_t *lookup_mmapped_file(PlasmaConnection *conn, int store_fd_val) { - client_mmap_table_entry *entry; - HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry); - CHECK(entry); - return entry->pointer; + auto entry = conn->mmap_table.find(store_fd_val); + CHECK(entry != conn->mmap_table.end()); + return entry->second->pointer; } void increment_object_count(PlasmaConnection *conn, @@ -182,31 +162,29 @@ void increment_object_count(PlasmaConnection *conn, bool is_sealed) { /* Increment the count of the object to track the fact that it is being used. * The corresponding decrement should happen in plasma_release. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), - object_entry); - if (object_entry == NULL) { + auto elem = conn->objects_in_use.find(object_id); + ObjectInUseEntry *object_entry; + if (elem == conn->objects_in_use.end()) { /* Add this object ID to the hash table of object IDs in use. The * corresponding call to free happens in plasma_release. */ - object_entry = (object_in_use_entry *) malloc(sizeof(object_in_use_entry)); + object_entry = new ObjectInUseEntry(); object_entry->object_id = object_id; object_entry->object = *object; object_entry->count = 0; object_entry->is_sealed = is_sealed; - HASH_ADD(hh, conn->objects_in_use, object_id, sizeof(object_id), - object_entry); + conn->objects_in_use[object_id] = object_entry; /* Increment the count of the number of objects in the memory-mapped file * that are being used. The corresponding decrement should happen in * plasma_release. */ - client_mmap_table_entry *entry; - HASH_FIND_INT(conn->mmap_table, &object->handle.store_fd, entry); - CHECK(entry != NULL); - CHECK(entry->count >= 0); + auto entry = conn->mmap_table.find(object->handle.store_fd); + CHECK(entry != conn->mmap_table.end()); + CHECK(entry->second->count >= 0); /* Update the in_use_object_bytes. */ conn->in_use_object_bytes += (object_entry->object.data_size + object_entry->object.metadata_size); - entry->count += 1; + entry->second->count += 1; } else { + object_entry = elem->second; CHECK(object_entry->count > 0); } /* Increment the count of the number of instances of this object that are @@ -277,22 +255,19 @@ void plasma_get(PlasmaConnection *conn, /* Fill out the info for the objects that are already in use locally. */ bool all_present = true; for (int i = 0; i < num_objects; ++i) { - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &object_ids[i], sizeof(object_ids[i]), - object_entry); - if (object_entry == NULL) { + auto object_entry = conn->objects_in_use.find(object_ids[i]); + if (object_entry == conn->objects_in_use.end()) { /* This object is not currently in use by this client, so we need to send * a request to the store. */ all_present = false; /* Make a note to ourselves that the object is not present. */ object_buffers[i].data_size = -1; } else { - PlasmaObject *object; /* NOTE: If the object is still unsealed, we will deadlock, since we must * have been the one who created it. */ - CHECKM(object_entry->is_sealed, + CHECKM(object_entry->second->is_sealed, "Plasma client called get on an unsealed object that it created"); - object = &object_entry->object; + PlasmaObject *object = &object_entry->second->object; object_buffers[i].data = lookup_mmapped_file(conn, object->handle.store_fd); object_buffers[i].data = object_buffers[i].data + object->data_offset; @@ -386,80 +361,61 @@ void plasma_perform_release(PlasmaConnection *conn, ObjectID object_id) { /* Decrement the count of the number of instances of this object that are * being used by this client. The corresponding increment should have happened * in plasma_get. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), - object_entry); - CHECK(object_entry != NULL); - object_entry->count -= 1; - CHECK(object_entry->count >= 0); + auto object_entry = conn->objects_in_use.find(object_id); + CHECK(object_entry != conn->objects_in_use.end()); + object_entry->second->count -= 1; + CHECK(object_entry->second->count >= 0); /* Check if the client is no longer using this object. */ - if (object_entry->count == 0) { + if (object_entry->second->count == 0) { /* Decrement the count of the number of objects in this memory-mapped file * that the client is using. The corresponding increment should have * happened in plasma_get. */ - client_mmap_table_entry *entry; - int fd = object_entry->object.handle.store_fd; - HASH_FIND_INT(conn->mmap_table, &fd, entry); - CHECK(entry != NULL); - entry->count -= 1; - CHECK(entry->count >= 0); + int fd = object_entry->second->object.handle.store_fd; + auto entry = conn->mmap_table.find(fd); + CHECK(entry != conn->mmap_table.end()); + entry->second->count -= 1; + CHECK(entry->second->count >= 0); /* If none are being used then unmap the file. */ - if (entry->count == 0) { - munmap(entry->pointer, entry->length); + if (entry->second->count == 0) { + munmap(entry->second->pointer, entry->second->length); /* Remove the corresponding entry from the hash table. */ - HASH_DELETE(hh, conn->mmap_table, entry); - free(entry); + delete entry->second; + conn->mmap_table.erase(fd); } /* Tell the store that the client no longer needs the object. */ CHECK(plasma_send_ReleaseRequest(conn->store_conn, conn->builder, object_id) >= 0); /* Update the in_use_object_bytes. */ - conn->in_use_object_bytes -= - (object_entry->object.data_size + object_entry->object.metadata_size); + conn->in_use_object_bytes -= (object_entry->second->object.data_size + + object_entry->second->object.metadata_size); DCHECK(conn->in_use_object_bytes >= 0); /* Remove the entry from the hash table of objects currently in use. */ - HASH_DELETE(hh, conn->objects_in_use, object_entry); - free(object_entry); + delete object_entry->second; + conn->objects_in_use.erase(object_id); } } void plasma_release(PlasmaConnection *conn, ObjectID obj_id) { - /* Add the new object to the release history. The corresponding call to free - * will occur in plasma_perform_release or in plasma_disconnect. */ - pending_release *pending_release_entry = - (pending_release *) malloc(sizeof(pending_release)); - pending_release_entry->object_id = obj_id; - DL_APPEND(conn->release_history, pending_release_entry); - conn->release_history_length += 1; + /* Add the new object to the release history. */ + conn->release_history.push_front(obj_id); /* If there are too many bytes in use by the client or if there are too many * pending release calls, and there are at least some pending release calls in * the release_history list, then release some objects. */ while ((conn->in_use_object_bytes > MIN(L3_CACHE_SIZE_BYTES, conn->store_capacity / 100) || - conn->release_history_length > conn->config.release_delay) && - conn->release_history_length > 0) { - DCHECK(conn->release_history != NULL); + conn->release_history.size() > conn->config.release_delay) && + conn->release_history.size() > 0) { /* Perform a release for the object ID for the first pending release. */ - plasma_perform_release(conn, conn->release_history->object_id); - /* Remove the first entry from the doubly-linked list. Note that the pointer - * to the doubly linked list is just the pointer to the first entry. */ - pending_release *release_history_first_entry = conn->release_history; - DL_DELETE(conn->release_history, release_history_first_entry); - free(release_history_first_entry); - conn->release_history_length -= 1; - DCHECK(conn->release_history_length >= 0); - } - if (conn->release_history_length == 0) { - DCHECK(conn->release_history == NULL); + plasma_perform_release(conn, conn->release_history.back()); + /* Remove the last entry from the release history. */ + conn->release_history.pop_back(); } } /* This method is used to query whether the plasma store contains an object. */ void plasma_contains(PlasmaConnection *conn, ObjectID obj_id, int *has_object) { /* Check if we already have a reference to the object. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &obj_id, sizeof(obj_id), object_entry); - if (object_entry) { + if (conn->objects_in_use.count(obj_id) > 0) { *has_object = 1; } else { /* If we don't already have a reference to the object, check with the store @@ -558,14 +514,12 @@ bool plasma_compute_object_hash(PlasmaConnection *conn, void plasma_seal(PlasmaConnection *conn, ObjectID object_id) { /* Make sure this client has a reference to the object before sending the * request to Plasma. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id), - object_entry); - CHECKM(object_entry != NULL, + auto object_entry = conn->objects_in_use.find(object_id); + CHECKM(object_entry != conn->objects_in_use.end(), "Plasma client called seal an object without a reference to it"); - CHECKM(!object_entry->is_sealed, + CHECKM(!object_entry->second->is_sealed, "Plasma client called seal an already sealed object"); - object_entry->is_sealed = true; + object_entry->second->is_sealed = true; /* Send the seal request to Plasma. */ static unsigned char digest[DIGEST_SIZE]; CHECK(plasma_compute_object_hash(conn, object_id, &digest[0])); @@ -623,8 +577,7 @@ PlasmaConnection *plasma_connect(const char *store_socket_name, const char *manager_socket_name, int release_delay) { /* Initialize the store connection struct */ - PlasmaConnection *result = - (PlasmaConnection *) malloc(sizeof(PlasmaConnection)); + PlasmaConnection *result = new PlasmaConnection(); result->store_conn = connect_ipc_sock_retry(store_socket_name, -1, -1); if (manager_socket_name != NULL) { result->manager_conn = connect_ipc_sock_retry(manager_socket_name, -1, -1); @@ -632,13 +585,7 @@ PlasmaConnection *plasma_connect(const char *store_socket_name, result->manager_conn = -1; } result->builder = make_protocol_builder(); - result->mmap_table = NULL; - result->objects_in_use = NULL; result->config.release_delay = release_delay; - /* Initialize the release history doubly-linked list to NULL and also - * initialize other implementation details of the release history. */ - result->release_history = NULL; - result->release_history_length = 0; result->in_use_object_bytes = 0; /* Send a ConnectRequest to the store to get its memory capacity. */ plasma_send_ConnectRequest(result->store_conn, result->builder); @@ -650,23 +597,14 @@ PlasmaConnection *plasma_connect(const char *store_socket_name, } void plasma_disconnect(PlasmaConnection *conn) { - /* Clean up state for objects and memory pages in use. NOTE: We purposefully - * do not finish sending release calls for objects in use, so that we don't - * duplicate plasma_release calls (when handling a SIGTERM, for example). */ - pending_release *element, *temp; - DL_FOREACH_SAFE(conn->release_history, element, temp) { - DL_DELETE(conn->release_history, element); - free(element); + /* NOTE: We purposefully do not finish sending release calls for objects in + * use, so that we don't duplicate plasma_release calls (when handling a + * SIGTERM, for example). */ + for (auto &entry : conn->objects_in_use) { + delete entry.second; } - object_in_use_entry *current_entry, *temp_entry; - HASH_ITER(hh, conn->objects_in_use, current_entry, temp_entry) { - HASH_DELETE(hh, conn->objects_in_use, current_entry); - free(current_entry); - } - client_mmap_table_entry *mmap_entry, *temp_mmap_entry; - HASH_ITER(hh, conn->mmap_table, mmap_entry, temp_mmap_entry) { - HASH_DELETE(hh, conn->mmap_table, mmap_entry); - free(mmap_entry); + for (auto &entry : conn->mmap_table) { + delete entry.second; } free_protocol_builder(conn->builder); /* Close the connections to Plasma. The Plasma store will release the objects @@ -675,7 +613,7 @@ void plasma_disconnect(PlasmaConnection *conn) { if (conn->manager_conn >= 0) { close(conn->manager_conn); } - free(conn); + delete conn; } bool plasma_manager_is_connected(PlasmaConnection *conn) {