Convert plasma client array and object notification queue to STL (#482)

* Conver plasma clients to STL

* use a deque for object notifications in plasma store for perf

* cleanup

* linting

* fix include order
This commit is contained in:
Philipp Moritz
2017-04-24 00:43:48 -07:00
committed by Robert Nishihara
parent e36de2dad1
commit 892e53d69e
2 changed files with 30 additions and 37 deletions
+3 -1
View File
@@ -22,6 +22,8 @@
/** Allocation granularity used in plasma for object allocation. */
#define BLOCK_SIZE 64
struct Client;
/**
* Object request data structure. Used in the plasma_wait_for_objects()
* argument.
@@ -111,7 +113,7 @@ typedef struct {
/** Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/** An array of the clients that are currently using this object. */
UT_array *clients;
std::vector<Client *> clients;
/** The state of the object, e.g., whether it is open or sealed. */
object_state state;
/** The digest of the object. Used to see if two objects are the same. */
+27 -36
View File
@@ -25,6 +25,7 @@
#include <limits.h>
#include <poll.h>
#include <deque>
#include <unordered_map>
#include <unordered_set>
#include <vector>
@@ -59,10 +60,6 @@ struct Client {
PlasmaStoreState *plasma_state;
};
/* This is used to define the array of clients used to define the
* object_table_entry type. */
UT_icd client_icd = {sizeof(Client *), NULL, NULL, NULL};
/* This is used to define the queue of object notifications for plasma
* subscribers. */
UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL};
@@ -72,7 +69,7 @@ typedef struct {
int subscriber_fd;
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
UT_array *object_notifications;
std::deque<uint8_t *> *object_notifications;
/** Handle for the uthash table. */
UT_hash_handle hh;
} NotificationQueue;
@@ -159,18 +156,16 @@ void PlasmaStoreState_free(PlasmaStoreState *state) {
* up to make the valgrind warnings go away. Objects that
* are still reachable are not cleaned up. */
for (const auto &it : state->plasma_store_info->objects) {
utarray_free(it.second->clients);
delete it.second;
}
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, state->pending_notifications, queue, temp_queue) {
for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
uint8_t **notification =
(uint8_t **) utarray_eltptr(queue->object_notifications, i);
uint8_t *data = *notification;
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
uint8_t *data = notification;
free(data);
}
utarray_free(queue->object_notifications);
delete queue->object_notifications;
}
}
@@ -182,15 +177,14 @@ void push_notification(PlasmaStoreState *state,
void add_client_to_object_clients(object_table_entry *entry,
Client *client_info) {
/* Check if this client is already using the object. */
for (int i = 0; i < utarray_len(entry->clients); ++i) {
Client **c = (Client **) utarray_eltptr(entry->clients, i);
if (*c == client_info) {
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
return;
}
}
/* If there are no other clients using this object, notify the eviction policy
* that the object is being used. */
if (utarray_len(entry->clients) == 0) {
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
@@ -202,7 +196,7 @@ void add_client_to_object_clients(object_table_entry *entry,
objects_to_evict);
}
/* Add the client pointer to the list of clients using this object. */
utarray_push_back(entry->clients, &client_info);
entry->clients.push_back(client_info);
}
/* Create a new object buffer in the hash table. */
@@ -262,7 +256,6 @@ int create_object(Client *client_context,
entry->map_size = map_size;
entry->offset = offset;
entry->state = PLASMA_CREATED;
utarray_new(entry->clients, &client_icd);
plasma_state->plasma_store_info->objects[obj_id] = entry;
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
@@ -453,14 +446,13 @@ void process_get_request(Client *client_context,
int remove_client_from_object_clients(object_table_entry *entry,
Client *client_info) {
/* Find the location of the client in the array. */
for (int i = 0; i < utarray_len(entry->clients); ++i) {
Client **c = (Client **) utarray_eltptr(entry->clients, i);
if (*c == client_info) {
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
/* Remove the client from the array. */
utarray_erase(entry->clients, i, 1);
entry->clients.erase(entry->clients.begin() + i);
/* If no more clients are using this object, notify the eviction policy
* that the object is no longer being used. */
if (utarray_len(entry->clients) == 0) {
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is no longer being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
@@ -530,11 +522,10 @@ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) {
CHECKM(entry != NULL, "To delete an object it must be in the object table.");
CHECKM(entry->state == PLASMA_SEALED,
"To delete an object it must have been sealed.");
CHECKM(utarray_len(entry->clients) == 0,
CHECKM(entry->clients.size() == 0,
"To delete an object, there must be no clients currently using it.");
uint8_t *pointer = entry->pointer;
dlfree(pointer);
utarray_free(entry->clients);
plasma_state->plasma_store_info->objects.erase(object_id);
delete entry;
/* Inform all subscribers that the object has been deleted. */
@@ -563,7 +554,7 @@ void push_notification(PlasmaStoreState *plasma_state,
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
uint8_t *notification = create_object_info_buffer(object_info);
utarray_push_back(queue->object_notifications, &notification);
queue->object_notifications->push_back(notification);
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
0);
/* The notification gets freed in send_notifications when the notification
@@ -585,15 +576,13 @@ void send_notifications(event_loop *loop,
bool closed = false;
/* Loop over the array of pending notifications and send as many of them as
* possible. */
for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
uint8_t **notification =
(uint8_t **) utarray_eltptr(queue->object_notifications, i);
uint8_t *data = *notification;
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
/* Decode the length, which is the first bytes of the message. */
int64_t size = *((int64_t *) data);
int64_t size = *((int64_t *) notification);
/* Attempt to send a notification about this object ID. */
int nbytes = send(client_sock, data, sizeof(int64_t) + size, 0);
int nbytes = send(client_sock, notification, sizeof(int64_t) + size, 0);
if (nbytes >= 0) {
CHECK(nbytes == sizeof(int64_t) + size);
} else if (nbytes == -1 &&
@@ -618,21 +607,23 @@ void send_notifications(event_loop *loop,
num_processed += 1;
/* The corresponding malloc happened in create_object_info_buffer
* within push_notification. */
free(data);
free(notification);
}
/* Remove the sent notifications from the array. */
utarray_erase(queue->object_notifications, 0, num_processed);
queue->object_notifications->erase(
queue->object_notifications->begin(),
queue->object_notifications->begin() + num_processed);
/* Stop sending notifications if the pipe was broken. */
if (closed) {
close(client_sock);
utarray_free(queue->object_notifications);
delete queue->object_notifications;
HASH_DEL(plasma_state->pending_notifications, queue);
free(queue);
}
/* If we have sent all notifications, remove the fd from the event loop. */
if (utarray_len(queue->object_notifications) == 0) {
if (queue->object_notifications->empty()) {
event_loop_remove_file(loop, client_sock);
}
}
@@ -656,7 +647,7 @@ void subscribe_to_updates(Client *client_context, int conn) {
NotificationQueue *queue =
(NotificationQueue *) malloc(sizeof(NotificationQueue));
queue->subscriber_fd = fd;
utarray_new(queue->object_notifications, &object_info_icd);
queue->object_notifications = new std::deque<uint8_t *>();
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
/* Push notifications to the new subscriber about existing objects. */