PlasmaRequestBuffer data structure updates (#1023)

* Replaced utstring with std::string

* Converted transfer_queue to a list

* Converted pending_object_transfers to unordered_map

* Fix free/delete bug and small modifications.
This commit is contained in:
Peter Schafhalter
2017-09-27 19:50:37 -07:00
committed by Robert Nishihara
parent b020e6bf1f
commit bb76d4ca0a
2 changed files with 28 additions and 58 deletions
+28 -46
View File
@@ -21,12 +21,12 @@
#include <netinet/in.h>
/* C++ includes. */
#include <list>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "uthash.h"
#include "utlist.h"
#include "common_protocol.h"
#include "io.h"
#include "net.h"
@@ -288,15 +288,14 @@ struct ClientConnection {
PlasmaManagerState *manager_state;
/** Current position in the buffer. */
int64_t cursor;
/** Buffer that this connection is reading from. If this is a connection to
* write data to another plasma store, then it is a linked
* list of buffers to write. */
/** Linked list of buffers to read or write. */
/* TODO(swang): Split into two queues, data transfers and data requests. */
PlasmaRequestBuffer *transfer_queue;
std::list<PlasmaRequestBuffer *> transfer_queue;
/* A set of object IDs which are queued in the transfer_queue and waiting to
* be sent. This is used to avoid sending the same object ID to the same
* manager multiple times. */
PlasmaRequestBuffer *pending_object_transfers;
std::unordered_map<ObjectID, PlasmaRequestBuffer *, UniqueIDHasher>
pending_object_transfers;
/** Buffer used to receive transfers (data fetches) we want to ignore */
PlasmaRequestBuffer *ignore_buffer;
/** File descriptor for the socket connected to the other
@@ -652,7 +651,7 @@ void send_queued_request(event_loop *loop,
ClientConnection *conn = (ClientConnection *) context;
PlasmaManagerState *state = conn->manager_state;
if (conn->transfer_queue == NULL) {
if (conn->transfer_queue.size() == 0) {
/* If there are no objects to transfer, temporarily remove this connection
* from the event loop. It will be reawoken when we receive another
* data request. */
@@ -660,7 +659,7 @@ void send_queued_request(event_loop *loop,
return;
}
PlasmaRequestBuffer *buf = conn->transfer_queue;
PlasmaRequestBuffer *buf = conn->transfer_queue.front();
int err = 0;
switch (buf->type) {
case MessageType_PlasmaDataRequest:
@@ -705,9 +704,9 @@ void send_queued_request(event_loop *loop,
if (buf->type == MessageType_PlasmaDataReply) {
/* If we just finished sending an object to a remote manager, then remove
* the object from the hash table of pending transfer requests. */
HASH_DELETE(hh, conn->pending_object_transfers, buf);
conn->pending_object_transfers.erase(buf->object_id);
}
DL_DELETE(conn->transfer_queue, buf);
conn->transfer_queue.pop_front();
free(buf);
}
}
@@ -747,7 +746,7 @@ void process_data_chunk(event_loop *loop,
int events) {
/* Read the object chunk. */
ClientConnection *conn = (ClientConnection *) context;
PlasmaRequestBuffer *buf = conn->transfer_queue;
PlasmaRequestBuffer *buf = conn->transfer_queue.front();
int done = read_object_chunk(conn, buf);
if (!done) {
return;
@@ -763,7 +762,7 @@ void process_data_chunk(event_loop *loop,
ARROW_CHECK_OK(
conn->manager_state->plasma_conn->Release(buf->object_id.to_plasma_id()));
/* Remove the request buffer used for reading this object's data. */
DL_DELETE(conn->transfer_queue, buf);
conn->transfer_queue.pop_front();
free(buf);
/* Switch to listening for requests from this socket, instead of reading
* object data. */
@@ -827,10 +826,8 @@ void process_transfer_request(event_loop *loop,
/* If there is already a request in the transfer queue with the same object
* ID, do not add the transfer request. */
PlasmaRequestBuffer *pending;
HASH_FIND(hh, manager_conn->pending_object_transfers, &obj_id, sizeof(obj_id),
pending);
if (pending != NULL) {
auto pending_it = manager_conn->pending_object_transfers.find(obj_id);
if (pending_it != manager_conn->pending_object_transfers.end()) {
return;
}
@@ -852,7 +849,7 @@ void process_transfer_request(event_loop *loop,
/* If we already have a connection to this manager and its inactive,
* (re)register it with the event loop again. */
if (manager_conn->transfer_queue == NULL) {
if (manager_conn->transfer_queue.size() == 0) {
bool success = event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
send_queued_request, manager_conn);
if (!success) {
@@ -863,8 +860,7 @@ void process_transfer_request(event_loop *loop,
DCHECK(object_buffer.metadata ==
object_buffer.data + object_buffer.data_size);
PlasmaRequestBuffer *buf =
(PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer));
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
buf->type = MessageType_PlasmaDataReply;
buf->object_id = obj_id;
/* We treat buf->data as a pointer to the concatenated data and metadata, so
@@ -873,9 +869,8 @@ void process_transfer_request(event_loop *loop,
buf->data_size = object_buffer.data_size;
buf->metadata_size = object_buffer.metadata_size;
DL_APPEND(manager_conn->transfer_queue, buf);
HASH_ADD(hh, manager_conn->pending_object_transfers, object_id,
sizeof(buf->object_id), buf);
manager_conn->transfer_queue.push_back(buf);
manager_conn->pending_object_transfers[object_id] = buf;
}
/**
@@ -897,8 +892,7 @@ void process_data_request(event_loop *loop,
int64_t data_size,
int64_t metadata_size,
ClientConnection *conn) {
PlasmaRequestBuffer *buf =
(PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer));
PlasmaRequestBuffer *buf = new PlasmaRequestBuffer();
buf->object_id = object_id;
buf->data_size = data_size;
buf->metadata_size = metadata_size;
@@ -913,7 +907,7 @@ void process_data_request(event_loop *loop,
if (s.ok()) {
/* Add buffer where the fetched data is to be stored to
* conn->transfer_queue. */
DL_APPEND(conn->transfer_queue, buf);
conn->transfer_queue.push_back(buf);
}
CHECK(conn->cursor == 0);
@@ -965,19 +959,18 @@ void request_transfer_from(PlasmaManagerState *manager_state,
"This manager is attempting to request a transfer from itself.");
}
PlasmaRequestBuffer *transfer_request =
(PlasmaRequestBuffer *) malloc(sizeof(PlasmaRequestBuffer));
PlasmaRequestBuffer *transfer_request = new PlasmaRequestBuffer();
transfer_request->type = MessageType_PlasmaDataRequest;
transfer_request->object_id = fetch_req->object_id;
if (manager_conn->transfer_queue == NULL) {
if (manager_conn->transfer_queue.size() == 0) {
/* If we already have a connection to this manager and it's inactive,
* (re)register it with the event loop. */
event_loop_add_file(manager_state->loop, manager_conn->fd,
EVENT_LOOP_WRITE, send_queued_request, manager_conn);
}
/* Add this transfer request to this connection's transfer queue. */
DL_APPEND(manager_conn->transfer_queue, transfer_request);
manager_conn->transfer_queue.push_back(transfer_request);
}
/* On the next attempt, try the next manager in manager_vector. */
@@ -1446,12 +1439,9 @@ ClientConnection *ClientConnection_init(PlasmaManagerState *state,
int client_sock,
const char *client_key) {
/* Create a new data connection context per client. */
ClientConnection *conn =
(ClientConnection *) malloc(sizeof(ClientConnection));
ClientConnection *conn = new ClientConnection();
conn->manager_state = state;
conn->cursor = 0;
conn->transfer_queue = NULL;
conn->pending_object_transfers = NULL;
conn->fd = client_sock;
conn->num_return_objects = 0;
@@ -1479,21 +1469,13 @@ ClientConnection *ClientConnection_listen(event_loop *loop,
void ClientConnection_free(ClientConnection *client_conn) {
PlasmaManagerState *state = client_conn->manager_state;
HASH_DELETE(manager_hh, state->manager_connections, client_conn);
/* Free the hash table of object IDs that are waiting to be transferred. */
PlasmaRequestBuffer *request_buffer, *tmp_buffer;
HASH_ITER(hh, client_conn->pending_object_transfers, request_buffer,
tmp_buffer) {
/* We do not free the PlasmaRequestBuffer here because it is also in the
* transfer queue and will be freed below. */
HASH_DELETE(hh, client_conn->pending_object_transfers, request_buffer);
}
client_conn->pending_object_transfers.clear();
/* Free the transfer queue. */
PlasmaRequestBuffer *head = client_conn->transfer_queue;
while (head) {
DL_DELETE(client_conn->transfer_queue, head);
free(head);
head = client_conn->transfer_queue;
while (client_conn->transfer_queue.size()) {
delete client_conn->transfer_queue.front();
client_conn->transfer_queue.pop_front();
}
/* Close the manager connection and free the remaining state. */
close(client_conn->fd);
-12
View File
@@ -173,18 +173,6 @@ typedef struct PlasmaRequestBuffer {
int64_t data_size;
uint8_t *metadata;
int64_t metadata_size;
/* Pointer to the next buffer that we will write to this plasma manager. This
* field is only used if we're pushing requests to another plasma manager,
* not if we are receiving data. */
PlasmaRequestBuffer *next;
/* This is required to implement a doubly-linked list. We do not use this
* field except through the UT_list macros. */
PlasmaRequestBuffer *prev;
/* This is used to also store the PlasmaRequestBuffer in a hash table. The
* hash table is used as a set to make sure we don't try to send the same
* object multiple times to the same manager. The object_id field will be the
* key to the hash table. */
UT_hash_handle hh;
} PlasmaRequestBuffer;
/**