Convert Plasma client to STL (#486)

* convert mmap table to STL

* update

* fix

* convert objects_in_use

* fix

* convert release_history

* cleanup

* linting

* update

* fix

* linting
This commit is contained in:
Philipp Moritz
2017-04-25 01:25:40 -07:00
committed by Robert Nishihara
parent 0ac125e9b2
commit b7ace01b5f
+72 -134
View File
@@ -25,10 +25,9 @@
#include "plasma.h"
#include "plasma_protocol.h"
#include "plasma_client.h"
#include "uthash.h"
#include "utlist.h"
/* C++ includes */
#include <deque>
#include <vector>
#include <thread>
@@ -46,7 +45,7 @@ extern "C" {
#define BYTES_IN_MB (1 << 20)
static std::vector<std::thread> threadpool_(THREADPOOL_SIZE);
typedef struct {
struct ClientMmapTableEntry {
/** Key that uniquely identifies the memory mapped file. In practice, we
* take the numerical value of the file descriptor in the object store. */
int key;
@@ -57,25 +56,21 @@ typedef struct {
/** The number of objects in this memory-mapped file that are currently being
* used by the client. When this count reaches zeros, we unmap the file. */
int count;
/** Handle for the uthash table. */
UT_hash_handle hh;
} client_mmap_table_entry;
};
typedef struct {
struct ObjectInUseEntry {
/** The ID of the object. This is used as the key in the hash table. */
ObjectID object_id;
/** A count of the number of times this client has called plasma_create or
* plasma_get on this object ID minus the number of calls to plasma_release.
* When this count reaches zero, we remove the entry from the objects_in_use
* and decrement a count in the relevant client_mmap_table_entry. */
* and decrement a count in the relevant ClientMmapTableEntry. */
int count;
/** Cached information to read the object. */
PlasmaObject object;
/** A flag representing whether the object has been sealed. */
bool is_sealed;
/** Handle for the uthash table. */
UT_hash_handle hh;
} object_in_use_entry;
};
/** Configuration options for the plasma client. */
typedef struct {
@@ -85,17 +80,6 @@ typedef struct {
int release_delay;
} plasma_client_config;
/** An element representing a pending release call in a doubly-linked list. This
* is used to implement the delayed release mechanism. */
typedef struct pending_release {
/** The object_id of the released object. */
ObjectID object_id;
/** Needed for the doubly-linked list macros. */
struct pending_release *prev;
/** Needed for the doubly-linked list macros. */
struct pending_release *next;
} pending_release;
/** Information about a connection between a Plasma Client and Plasma Store.
* This is used to avoid mapping the same files into memory multiple times. */
struct PlasmaConnection {
@@ -112,19 +96,17 @@ struct PlasmaConnection {
/** Table of dlmalloc buffer files that have been memory mapped so far. This
* is a hash table mapping a file descriptor to a struct containing the
* address of the corresponding memory-mapped file. */
client_mmap_table_entry *mmap_table;
std::unordered_map<int, ClientMmapTableEntry *> mmap_table;
/** A hash table of the object IDs that are currently being used by this
* client. */
object_in_use_entry *objects_in_use;
/** Object IDs of the last few release calls. This is a doubly-linked list and
std::unordered_map<ObjectID, ObjectInUseEntry *, UniqueIDHasher>
objects_in_use;
/** Object IDs of the last few release calls. This is a deque and
* is used to delay releasing objects to see if they can be reused by
* subsequent tasks so we do not unneccessarily invalidate cpu caches.
* TODO(pcm): replace this with a proper lru cache using the size of the L3
* cache. */
pending_release *release_history;
/** The length of the release_history doubly-linked list. This is an
* implementation detail. */
int release_history_length;
std::deque<ObjectID> release_history;
/** The number of bytes in the combined objects that are held in the release
* history doubly-linked list. If this is too large then the client starts
* releasing objects. */
@@ -145,11 +127,10 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn,
int fd,
int store_fd_val,
int64_t map_size) {
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry);
if (entry) {
auto entry = conn->mmap_table.find(store_fd_val);
if (entry != conn->mmap_table.end()) {
close(fd);
return entry->pointer;
return entry->second->pointer;
} else {
uint8_t *result = (uint8_t *) mmap(NULL, map_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
@@ -157,12 +138,12 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn,
LOG_FATAL("mmap failed");
}
close(fd);
entry = (client_mmap_table_entry *) malloc(sizeof(client_mmap_table_entry));
ClientMmapTableEntry *entry = new ClientMmapTableEntry();
entry->key = store_fd_val;
entry->pointer = result;
entry->length = map_size;
entry->count = 0;
HASH_ADD_INT(conn->mmap_table, key, entry);
conn->mmap_table[store_fd_val] = entry;
return result;
}
}
@@ -170,10 +151,9 @@ uint8_t *lookup_or_mmap(PlasmaConnection *conn,
/* Get a pointer to a file that we know has been memory mapped in this client
* process before. */
uint8_t *lookup_mmapped_file(PlasmaConnection *conn, int store_fd_val) {
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry);
CHECK(entry);
return entry->pointer;
auto entry = conn->mmap_table.find(store_fd_val);
CHECK(entry != conn->mmap_table.end());
return entry->second->pointer;
}
void increment_object_count(PlasmaConnection *conn,
@@ -182,31 +162,29 @@ void increment_object_count(PlasmaConnection *conn,
bool is_sealed) {
/* Increment the count of the object to track the fact that it is being used.
* The corresponding decrement should happen in plasma_release. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
if (object_entry == NULL) {
auto elem = conn->objects_in_use.find(object_id);
ObjectInUseEntry *object_entry;
if (elem == conn->objects_in_use.end()) {
/* Add this object ID to the hash table of object IDs in use. The
* corresponding call to free happens in plasma_release. */
object_entry = (object_in_use_entry *) malloc(sizeof(object_in_use_entry));
object_entry = new ObjectInUseEntry();
object_entry->object_id = object_id;
object_entry->object = *object;
object_entry->count = 0;
object_entry->is_sealed = is_sealed;
HASH_ADD(hh, conn->objects_in_use, object_id, sizeof(object_id),
object_entry);
conn->objects_in_use[object_id] = object_entry;
/* Increment the count of the number of objects in the memory-mapped file
* that are being used. The corresponding decrement should happen in
* plasma_release. */
client_mmap_table_entry *entry;
HASH_FIND_INT(conn->mmap_table, &object->handle.store_fd, entry);
CHECK(entry != NULL);
CHECK(entry->count >= 0);
auto entry = conn->mmap_table.find(object->handle.store_fd);
CHECK(entry != conn->mmap_table.end());
CHECK(entry->second->count >= 0);
/* Update the in_use_object_bytes. */
conn->in_use_object_bytes +=
(object_entry->object.data_size + object_entry->object.metadata_size);
entry->count += 1;
entry->second->count += 1;
} else {
object_entry = elem->second;
CHECK(object_entry->count > 0);
}
/* Increment the count of the number of instances of this object that are
@@ -277,22 +255,19 @@ void plasma_get(PlasmaConnection *conn,
/* Fill out the info for the objects that are already in use locally. */
bool all_present = true;
for (int i = 0; i < num_objects; ++i) {
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_ids[i], sizeof(object_ids[i]),
object_entry);
if (object_entry == NULL) {
auto object_entry = conn->objects_in_use.find(object_ids[i]);
if (object_entry == conn->objects_in_use.end()) {
/* This object is not currently in use by this client, so we need to send
* a request to the store. */
all_present = false;
/* Make a note to ourselves that the object is not present. */
object_buffers[i].data_size = -1;
} else {
PlasmaObject *object;
/* NOTE: If the object is still unsealed, we will deadlock, since we must
* have been the one who created it. */
CHECKM(object_entry->is_sealed,
CHECKM(object_entry->second->is_sealed,
"Plasma client called get on an unsealed object that it created");
object = &object_entry->object;
PlasmaObject *object = &object_entry->second->object;
object_buffers[i].data =
lookup_mmapped_file(conn, object->handle.store_fd);
object_buffers[i].data = object_buffers[i].data + object->data_offset;
@@ -386,80 +361,61 @@ void plasma_perform_release(PlasmaConnection *conn, ObjectID object_id) {
/* Decrement the count of the number of instances of this object that are
* being used by this client. The corresponding increment should have happened
* in plasma_get. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
CHECK(object_entry != NULL);
object_entry->count -= 1;
CHECK(object_entry->count >= 0);
auto object_entry = conn->objects_in_use.find(object_id);
CHECK(object_entry != conn->objects_in_use.end());
object_entry->second->count -= 1;
CHECK(object_entry->second->count >= 0);
/* Check if the client is no longer using this object. */
if (object_entry->count == 0) {
if (object_entry->second->count == 0) {
/* Decrement the count of the number of objects in this memory-mapped file
* that the client is using. The corresponding increment should have
* happened in plasma_get. */
client_mmap_table_entry *entry;
int fd = object_entry->object.handle.store_fd;
HASH_FIND_INT(conn->mmap_table, &fd, entry);
CHECK(entry != NULL);
entry->count -= 1;
CHECK(entry->count >= 0);
int fd = object_entry->second->object.handle.store_fd;
auto entry = conn->mmap_table.find(fd);
CHECK(entry != conn->mmap_table.end());
entry->second->count -= 1;
CHECK(entry->second->count >= 0);
/* If none are being used then unmap the file. */
if (entry->count == 0) {
munmap(entry->pointer, entry->length);
if (entry->second->count == 0) {
munmap(entry->second->pointer, entry->second->length);
/* Remove the corresponding entry from the hash table. */
HASH_DELETE(hh, conn->mmap_table, entry);
free(entry);
delete entry->second;
conn->mmap_table.erase(fd);
}
/* Tell the store that the client no longer needs the object. */
CHECK(plasma_send_ReleaseRequest(conn->store_conn, conn->builder,
object_id) >= 0);
/* Update the in_use_object_bytes. */
conn->in_use_object_bytes -=
(object_entry->object.data_size + object_entry->object.metadata_size);
conn->in_use_object_bytes -= (object_entry->second->object.data_size +
object_entry->second->object.metadata_size);
DCHECK(conn->in_use_object_bytes >= 0);
/* Remove the entry from the hash table of objects currently in use. */
HASH_DELETE(hh, conn->objects_in_use, object_entry);
free(object_entry);
delete object_entry->second;
conn->objects_in_use.erase(object_id);
}
}
void plasma_release(PlasmaConnection *conn, ObjectID obj_id) {
/* Add the new object to the release history. The corresponding call to free
* will occur in plasma_perform_release or in plasma_disconnect. */
pending_release *pending_release_entry =
(pending_release *) malloc(sizeof(pending_release));
pending_release_entry->object_id = obj_id;
DL_APPEND(conn->release_history, pending_release_entry);
conn->release_history_length += 1;
/* Add the new object to the release history. */
conn->release_history.push_front(obj_id);
/* If there are too many bytes in use by the client or if there are too many
* pending release calls, and there are at least some pending release calls in
* the release_history list, then release some objects. */
while ((conn->in_use_object_bytes >
MIN(L3_CACHE_SIZE_BYTES, conn->store_capacity / 100) ||
conn->release_history_length > conn->config.release_delay) &&
conn->release_history_length > 0) {
DCHECK(conn->release_history != NULL);
conn->release_history.size() > conn->config.release_delay) &&
conn->release_history.size() > 0) {
/* Perform a release for the object ID for the first pending release. */
plasma_perform_release(conn, conn->release_history->object_id);
/* Remove the first entry from the doubly-linked list. Note that the pointer
* to the doubly linked list is just the pointer to the first entry. */
pending_release *release_history_first_entry = conn->release_history;
DL_DELETE(conn->release_history, release_history_first_entry);
free(release_history_first_entry);
conn->release_history_length -= 1;
DCHECK(conn->release_history_length >= 0);
}
if (conn->release_history_length == 0) {
DCHECK(conn->release_history == NULL);
plasma_perform_release(conn, conn->release_history.back());
/* Remove the last entry from the release history. */
conn->release_history.pop_back();
}
}
/* This method is used to query whether the plasma store contains an object. */
void plasma_contains(PlasmaConnection *conn, ObjectID obj_id, int *has_object) {
/* Check if we already have a reference to the object. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &obj_id, sizeof(obj_id), object_entry);
if (object_entry) {
if (conn->objects_in_use.count(obj_id) > 0) {
*has_object = 1;
} else {
/* If we don't already have a reference to the object, check with the store
@@ -558,14 +514,12 @@ bool plasma_compute_object_hash(PlasmaConnection *conn,
void plasma_seal(PlasmaConnection *conn, ObjectID object_id) {
/* Make sure this client has a reference to the object before sending the
* request to Plasma. */
object_in_use_entry *object_entry;
HASH_FIND(hh, conn->objects_in_use, &object_id, sizeof(object_id),
object_entry);
CHECKM(object_entry != NULL,
auto object_entry = conn->objects_in_use.find(object_id);
CHECKM(object_entry != conn->objects_in_use.end(),
"Plasma client called seal an object without a reference to it");
CHECKM(!object_entry->is_sealed,
CHECKM(!object_entry->second->is_sealed,
"Plasma client called seal an already sealed object");
object_entry->is_sealed = true;
object_entry->second->is_sealed = true;
/* Send the seal request to Plasma. */
static unsigned char digest[DIGEST_SIZE];
CHECK(plasma_compute_object_hash(conn, object_id, &digest[0]));
@@ -623,8 +577,7 @@ PlasmaConnection *plasma_connect(const char *store_socket_name,
const char *manager_socket_name,
int release_delay) {
/* Initialize the store connection struct */
PlasmaConnection *result =
(PlasmaConnection *) malloc(sizeof(PlasmaConnection));
PlasmaConnection *result = new PlasmaConnection();
result->store_conn = connect_ipc_sock_retry(store_socket_name, -1, -1);
if (manager_socket_name != NULL) {
result->manager_conn = connect_ipc_sock_retry(manager_socket_name, -1, -1);
@@ -632,13 +585,7 @@ PlasmaConnection *plasma_connect(const char *store_socket_name,
result->manager_conn = -1;
}
result->builder = make_protocol_builder();
result->mmap_table = NULL;
result->objects_in_use = NULL;
result->config.release_delay = release_delay;
/* Initialize the release history doubly-linked list to NULL and also
* initialize other implementation details of the release history. */
result->release_history = NULL;
result->release_history_length = 0;
result->in_use_object_bytes = 0;
/* Send a ConnectRequest to the store to get its memory capacity. */
plasma_send_ConnectRequest(result->store_conn, result->builder);
@@ -650,23 +597,14 @@ PlasmaConnection *plasma_connect(const char *store_socket_name,
}
void plasma_disconnect(PlasmaConnection *conn) {
/* Clean up state for objects and memory pages in use. NOTE: We purposefully
* do not finish sending release calls for objects in use, so that we don't
* duplicate plasma_release calls (when handling a SIGTERM, for example). */
pending_release *element, *temp;
DL_FOREACH_SAFE(conn->release_history, element, temp) {
DL_DELETE(conn->release_history, element);
free(element);
/* NOTE: We purposefully do not finish sending release calls for objects in
* use, so that we don't duplicate plasma_release calls (when handling a
* SIGTERM, for example). */
for (auto &entry : conn->objects_in_use) {
delete entry.second;
}
object_in_use_entry *current_entry, *temp_entry;
HASH_ITER(hh, conn->objects_in_use, current_entry, temp_entry) {
HASH_DELETE(hh, conn->objects_in_use, current_entry);
free(current_entry);
}
client_mmap_table_entry *mmap_entry, *temp_mmap_entry;
HASH_ITER(hh, conn->mmap_table, mmap_entry, temp_mmap_entry) {
HASH_DELETE(hh, conn->mmap_table, mmap_entry);
free(mmap_entry);
for (auto &entry : conn->mmap_table) {
delete entry.second;
}
free_protocol_builder(conn->builder);
/* Close the connections to Plasma. The Plasma store will release the objects
@@ -675,7 +613,7 @@ void plasma_disconnect(PlasmaConnection *conn) {
if (conn->manager_conn >= 0) {
close(conn->manager_conn);
}
free(conn);
delete conn;
}
bool plasma_manager_is_connected(PlasmaConnection *conn) {