diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 34efd4060..5ea28a029 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -22,6 +22,8 @@ /** Allocation granularity used in plasma for object allocation. */ #define BLOCK_SIZE 64 +struct Client; + /** * Object request data structure. Used in the plasma_wait_for_objects() * argument. @@ -111,7 +113,7 @@ typedef struct { /** Pointer to the object data. Needed to free the object. */ uint8_t *pointer; /** An array of the clients that are currently using this object. */ - UT_array *clients; + std::vector clients; /** The state of the object, e.g., whether it is open or sealed. */ object_state state; /** The digest of the object. Used to see if two objects are the same. */ diff --git a/src/plasma/plasma_store.cc b/src/plasma/plasma_store.cc index d5f273dec..9fe627e17 100644 --- a/src/plasma/plasma_store.cc +++ b/src/plasma/plasma_store.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -59,10 +60,6 @@ struct Client { PlasmaStoreState *plasma_state; }; -/* This is used to define the array of clients used to define the - * object_table_entry type. */ -UT_icd client_icd = {sizeof(Client *), NULL, NULL, NULL}; - /* This is used to define the queue of object notifications for plasma * subscribers. */ UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL}; @@ -72,7 +69,7 @@ typedef struct { int subscriber_fd; /** The object notifications for clients. We notify the client about the * objects in the order that the objects were sealed or deleted. */ - UT_array *object_notifications; + std::deque *object_notifications; /** Handle for the uthash table. */ UT_hash_handle hh; } NotificationQueue; @@ -159,18 +156,16 @@ void PlasmaStoreState_free(PlasmaStoreState *state) { * up to make the valgrind warnings go away. Objects that * are still reachable are not cleaned up. */ for (const auto &it : state->plasma_store_info->objects) { - utarray_free(it.second->clients); delete it.second; } NotificationQueue *queue, *temp_queue; HASH_ITER(hh, state->pending_notifications, queue, temp_queue) { - for (int i = 0; i < utarray_len(queue->object_notifications); ++i) { - uint8_t **notification = - (uint8_t **) utarray_eltptr(queue->object_notifications, i); - uint8_t *data = *notification; + for (int i = 0; i < queue->object_notifications->size(); ++i) { + uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); + uint8_t *data = notification; free(data); } - utarray_free(queue->object_notifications); + delete queue->object_notifications; } } @@ -182,15 +177,14 @@ void push_notification(PlasmaStoreState *state, void add_client_to_object_clients(object_table_entry *entry, Client *client_info) { /* Check if this client is already using the object. */ - for (int i = 0; i < utarray_len(entry->clients); ++i) { - Client **c = (Client **) utarray_eltptr(entry->clients, i); - if (*c == client_info) { + for (int i = 0; i < entry->clients.size(); ++i) { + if (entry->clients[i] == client_info) { return; } } /* If there are no other clients using this object, notify the eviction policy * that the object is being used. */ - if (utarray_len(entry->clients) == 0) { + if (entry->clients.size() == 0) { /* Tell the eviction policy that this object is being used. */ int64_t num_objects_to_evict; ObjectID *objects_to_evict; @@ -202,7 +196,7 @@ void add_client_to_object_clients(object_table_entry *entry, objects_to_evict); } /* Add the client pointer to the list of clients using this object. */ - utarray_push_back(entry->clients, &client_info); + entry->clients.push_back(client_info); } /* Create a new object buffer in the hash table. */ @@ -262,7 +256,6 @@ int create_object(Client *client_context, entry->map_size = map_size; entry->offset = offset; entry->state = PLASMA_CREATED; - utarray_new(entry->clients, &client_icd); plasma_state->plasma_store_info->objects[obj_id] = entry; result->handle.store_fd = fd; result->handle.mmap_size = map_size; @@ -453,14 +446,13 @@ void process_get_request(Client *client_context, int remove_client_from_object_clients(object_table_entry *entry, Client *client_info) { /* Find the location of the client in the array. */ - for (int i = 0; i < utarray_len(entry->clients); ++i) { - Client **c = (Client **) utarray_eltptr(entry->clients, i); - if (*c == client_info) { + for (int i = 0; i < entry->clients.size(); ++i) { + if (entry->clients[i] == client_info) { /* Remove the client from the array. */ - utarray_erase(entry->clients, i, 1); + entry->clients.erase(entry->clients.begin() + i); /* If no more clients are using this object, notify the eviction policy * that the object is no longer being used. */ - if (utarray_len(entry->clients) == 0) { + if (entry->clients.size() == 0) { /* Tell the eviction policy that this object is no longer being used. */ int64_t num_objects_to_evict; ObjectID *objects_to_evict; @@ -530,11 +522,10 @@ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) { CHECKM(entry != NULL, "To delete an object it must be in the object table."); CHECKM(entry->state == PLASMA_SEALED, "To delete an object it must have been sealed."); - CHECKM(utarray_len(entry->clients) == 0, + CHECKM(entry->clients.size() == 0, "To delete an object, there must be no clients currently using it."); uint8_t *pointer = entry->pointer; dlfree(pointer); - utarray_free(entry->clients); plasma_state->plasma_store_info->objects.erase(object_id); delete entry; /* Inform all subscribers that the object has been deleted. */ @@ -563,7 +554,7 @@ void push_notification(PlasmaStoreState *plasma_state, NotificationQueue *queue, *temp_queue; HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { uint8_t *notification = create_object_info_buffer(object_info); - utarray_push_back(queue->object_notifications, ¬ification); + queue->object_notifications->push_back(notification); send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state, 0); /* The notification gets freed in send_notifications when the notification @@ -585,15 +576,13 @@ void send_notifications(event_loop *loop, bool closed = false; /* Loop over the array of pending notifications and send as many of them as * possible. */ - for (int i = 0; i < utarray_len(queue->object_notifications); ++i) { - uint8_t **notification = - (uint8_t **) utarray_eltptr(queue->object_notifications, i); - uint8_t *data = *notification; + for (int i = 0; i < queue->object_notifications->size(); ++i) { + uint8_t *notification = (uint8_t *) queue->object_notifications->at(i); /* Decode the length, which is the first bytes of the message. */ - int64_t size = *((int64_t *) data); + int64_t size = *((int64_t *) notification); /* Attempt to send a notification about this object ID. */ - int nbytes = send(client_sock, data, sizeof(int64_t) + size, 0); + int nbytes = send(client_sock, notification, sizeof(int64_t) + size, 0); if (nbytes >= 0) { CHECK(nbytes == sizeof(int64_t) + size); } else if (nbytes == -1 && @@ -618,21 +607,23 @@ void send_notifications(event_loop *loop, num_processed += 1; /* The corresponding malloc happened in create_object_info_buffer * within push_notification. */ - free(data); + free(notification); } /* Remove the sent notifications from the array. */ - utarray_erase(queue->object_notifications, 0, num_processed); + queue->object_notifications->erase( + queue->object_notifications->begin(), + queue->object_notifications->begin() + num_processed); /* Stop sending notifications if the pipe was broken. */ if (closed) { close(client_sock); - utarray_free(queue->object_notifications); + delete queue->object_notifications; HASH_DEL(plasma_state->pending_notifications, queue); free(queue); } /* If we have sent all notifications, remove the fd from the event loop. */ - if (utarray_len(queue->object_notifications) == 0) { + if (queue->object_notifications->empty()) { event_loop_remove_file(loop, client_sock); } } @@ -656,7 +647,7 @@ void subscribe_to_updates(Client *client_context, int conn) { NotificationQueue *queue = (NotificationQueue *) malloc(sizeof(NotificationQueue)); queue->subscriber_fd = fd; - utarray_new(queue->object_notifications, &object_info_icd); + queue->object_notifications = new std::deque(); HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue); /* Push notifications to the new subscriber about existing objects. */