diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index bb5a7b69a..5333f5901 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -21,12 +21,12 @@ #include /* C++ includes. */ +#include #include #include #include #include "uthash.h" -#include "utlist.h" #include "common_protocol.h" #include "io.h" #include "net.h" @@ -288,15 +288,14 @@ struct ClientConnection { PlasmaManagerState *manager_state; /** Current position in the buffer. */ int64_t cursor; - /** Buffer that this connection is reading from. If this is a connection to - * write data to another plasma store, then it is a linked - * list of buffers to write. */ + /** Linked list of buffers to read or write. */ /* TODO(swang): Split into two queues, data transfers and data requests. */ - PlasmaRequestBuffer *transfer_queue; + std::list transfer_queue; /* A set of object IDs which are queued in the transfer_queue and waiting to * be sent. This is used to avoid sending the same object ID to the same * manager multiple times. */ - PlasmaRequestBuffer *pending_object_transfers; + std::unordered_map + pending_object_transfers; /** Buffer used to receive transfers (data fetches) we want to ignore */ PlasmaRequestBuffer *ignore_buffer; /** File descriptor for the socket connected to the other @@ -652,7 +651,7 @@ void send_queued_request(event_loop *loop, ClientConnection *conn = (ClientConnection *) context; PlasmaManagerState *state = conn->manager_state; - if (conn->transfer_queue == NULL) { + if (conn->transfer_queue.size() == 0) { /* If there are no objects to transfer, temporarily remove this connection * from the event loop. It will be reawoken when we receive another * data request. */ @@ -660,7 +659,7 @@ void send_queued_request(event_loop *loop, return; } - PlasmaRequestBuffer *buf = conn->transfer_queue; + PlasmaRequestBuffer *buf = conn->transfer_queue.front(); int err = 0; switch (buf->type) { case MessageType_PlasmaDataRequest: @@ -705,9 +704,9 @@ void send_queued_request(event_loop *loop, if (buf->type == MessageType_PlasmaDataReply) { /* If we just finished sending an object to a remote manager, then remove * the object from the hash table of pending transfer requests. */ - HASH_DELETE(hh, conn->pending_object_transfers, buf); + conn->pending_object_transfers.erase(buf->object_id); } - DL_DELETE(conn->transfer_queue, buf); + conn->transfer_queue.pop_front(); free(buf); } } @@ -747,7 +746,7 @@ void process_data_chunk(event_loop *loop, int events) { /* Read the object chunk. */ ClientConnection *conn = (ClientConnection *) context; - PlasmaRequestBuffer *buf = conn->transfer_queue; + PlasmaRequestBuffer *buf = conn->transfer_queue.front(); int done = read_object_chunk(conn, buf); if (!done) { return; @@ -763,7 +762,7 @@ void process_data_chunk(event_loop *loop, ARROW_CHECK_OK( conn->manager_state->plasma_conn->Release(buf->object_id.to_plasma_id())); /* Remove the request buffer used for reading this object's data. */ - DL_DELETE(conn->transfer_queue, buf); + conn->transfer_queue.pop_front(); free(buf); /* Switch to listening for requests from this socket, instead of reading * object data. */ @@ -827,10 +826,8 @@ void process_transfer_request(event_loop *loop, /* If there is already a request in the transfer queue with the same object * ID, do not add the transfer request. */ - PlasmaRequestBuffer *pending; - HASH_FIND(hh, manager_conn->pending_object_transfers, &obj_id, sizeof(obj_id), - pending); - if (pending != NULL) { + auto pending_it = manager_conn->pending_object_transfers.find(obj_id); + if (pending_it != manager_conn->pending_object_transfers.end()) { return; } @@ -852,7 +849,7 @@ void process_transfer_request(event_loop *loop, /* If we already have a connection to this manager and its inactive, * (re)register it with the event loop again. */ - if (manager_conn->transfer_queue == NULL) { + if (manager_conn->transfer_queue.size() == 0) { bool success = event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE, send_queued_request, manager_conn); if (!success) { @@ -863,8 +860,7 @@ void process_transfer_request(event_loop *loop, DCHECK(object_buffer.metadata == object_buffer.data + object_buffer.data_size); - PlasmaRequestBuffer *buf = - (PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer)); + PlasmaRequestBuffer *buf = new PlasmaRequestBuffer(); buf->type = MessageType_PlasmaDataReply; buf->object_id = obj_id; /* We treat buf->data as a pointer to the concatenated data and metadata, so @@ -873,9 +869,8 @@ void process_transfer_request(event_loop *loop, buf->data_size = object_buffer.data_size; buf->metadata_size = object_buffer.metadata_size; - DL_APPEND(manager_conn->transfer_queue, buf); - HASH_ADD(hh, manager_conn->pending_object_transfers, object_id, - sizeof(buf->object_id), buf); + manager_conn->transfer_queue.push_back(buf); + manager_conn->pending_object_transfers[object_id] = buf; } /** @@ -897,8 +892,7 @@ void process_data_request(event_loop *loop, int64_t data_size, int64_t metadata_size, ClientConnection *conn) { - PlasmaRequestBuffer *buf = - (PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer)); + PlasmaRequestBuffer *buf = new PlasmaRequestBuffer(); buf->object_id = object_id; buf->data_size = data_size; buf->metadata_size = metadata_size; @@ -913,7 +907,7 @@ void process_data_request(event_loop *loop, if (s.ok()) { /* Add buffer where the fetched data is to be stored to * conn->transfer_queue. */ - DL_APPEND(conn->transfer_queue, buf); + conn->transfer_queue.push_back(buf); } CHECK(conn->cursor == 0); @@ -965,19 +959,18 @@ void request_transfer_from(PlasmaManagerState *manager_state, "This manager is attempting to request a transfer from itself."); } - PlasmaRequestBuffer *transfer_request = - (PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer)); + PlasmaRequestBuffer *transfer_request = new PlasmaRequestBuffer(); transfer_request->type = MessageType_PlasmaDataRequest; transfer_request->object_id = fetch_req->object_id; - if (manager_conn->transfer_queue == NULL) { + if (manager_conn->transfer_queue.size() == 0) { /* If we already have a connection to this manager and it's inactive, * (re)register it with the event loop. */ event_loop_add_file(manager_state->loop, manager_conn->fd, EVENT_LOOP_WRITE, send_queued_request, manager_conn); } /* Add this transfer request to this connection's transfer queue. */ - DL_APPEND(manager_conn->transfer_queue, transfer_request); + manager_conn->transfer_queue.push_back(transfer_request); } /* On the next attempt, try the next manager in manager_vector. */ @@ -1446,12 +1439,9 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state, int client_sock, const char *client_key) { /* Create a new data connection context per client. */ - ClientConnection *conn = - (ClientConnection *) malloc(sizeof(ClientConnection)); + ClientConnection *conn = new ClientConnection(); conn->manager_state = state; conn->cursor = 0; - conn->transfer_queue = NULL; - conn->pending_object_transfers = NULL; conn->fd = client_sock; conn->num_return_objects = 0; @@ -1479,21 +1469,13 @@ ClientConnection *ClientConnection_listen(event_loop *loop, void ClientConnection_free(ClientConnection *client_conn) { PlasmaManagerState *state = client_conn->manager_state; HASH_DELETE(manager_hh, state->manager_connections, client_conn); - /* Free the hash table of object IDs that are waiting to be transferred. */ - PlasmaRequestBuffer *request_buffer, *tmp_buffer; - HASH_ITER(hh, client_conn->pending_object_transfers, request_buffer, - tmp_buffer) { - /* We do not free the PlasmaRequestBuffer here because it is also in the - * transfer queue and will be freed below. */ - HASH_DELETE(hh, client_conn->pending_object_transfers, request_buffer); - } + + client_conn->pending_object_transfers.clear(); /* Free the transfer queue. */ - PlasmaRequestBuffer *head = client_conn->transfer_queue; - while (head) { - DL_DELETE(client_conn->transfer_queue, head); - free(head); - head = client_conn->transfer_queue; + while (client_conn->transfer_queue.size()) { + delete client_conn->transfer_queue.front(); + client_conn->transfer_queue.pop_front(); } /* Close the manager connection and free the remaining state. */ close(client_conn->fd); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index e9247a4f1..ea9f96432 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -173,18 +173,6 @@ typedef struct PlasmaRequestBuffer { int64_t data_size; uint8_t *metadata; int64_t metadata_size; - /* Pointer to the next buffer that we will write to this plasma manager. This - * field is only used if we're pushing requests to another plasma manager, - * not if we are receiving data. */ - PlasmaRequestBuffer *next; - /* This is required to implement a doubly-linked list. We do not use this - * field except through the UT_list macros. */ - PlasmaRequestBuffer *prev; - /* This is used to also store the PlasmaRequestBuffer in a hash table. The - * hash table is used as a set to make sure we don't try to send the same - * object multiple times to the same manager. The object_id field will be the - * key to the hash table. */ - UT_hash_handle hh; } PlasmaRequestBuffer; /**