diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 8a114f294..2bbcd6d49 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -40,15 +40,9 @@ class PlasmaClient(object): plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") self.client = ctypes.cdll.LoadLibrary(plasma_client_library) - self.client.plasma_store_connect.restype = ctypes.c_int - - self.client.plasma_create.argtypes = [ctypes.c_int, PlasmaID, ctypes.c_int64, ctypes.POINTER(ctypes.c_uint8), ctypes.c_int64, ctypes.POINTER(ctypes.c_void_p)] + self.client.plasma_store_connect.restype = ctypes.c_void_p self.client.plasma_create.restype = None - - self.client.plasma_get.argtypes = [ctypes.c_int, PlasmaID, ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(ctypes.c_void_p), ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(ctypes.c_void_p)] self.client.plasma_get.restype = None - - self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID] self.client.plasma_seal.restype = None self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory @@ -59,7 +53,7 @@ class PlasmaClient(object): self.buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] self.buffer_from_read_write_memory.restype = ctypes.py_object - self.sock = self.client.plasma_store_connect(socket_name) + self.store_conn = ctypes.c_void_p(self.client.plasma_store_connect(socket_name)) if addr is not None and port is not None: self.manager_conn = self.client.plasma_manager_connect(addr, port) @@ -82,7 +76,7 @@ class PlasmaClient(object): # Turn the metadata into the right type. metadata = buffer("") if metadata is None else metadata metadata = (ctypes.c_ubyte * len(metadata)).from_buffer_copy(metadata) - self.client.plasma_create(self.sock, make_plasma_id(object_id), size, metadata, len(metadata), ctypes.byref(data)) + self.client.plasma_create(self.store_conn, make_plasma_id(object_id), size, ctypes.cast(metadata, ctypes.POINTER(ctypes.c_ubyte * len(metadata))), len(metadata), ctypes.byref(data)) return self.buffer_from_read_write_memory(data, size) def get(self, object_id): @@ -98,7 +92,7 @@ class PlasmaClient(object): data = ctypes.c_void_p() metadata_size = ctypes.c_int64() metadata = ctypes.c_void_p() - buf = self.client.plasma_get(self.sock, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) + buf = self.client.plasma_get(self.store_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) return self.buffer_from_memory(data, size) def get_metadata(self, object_id): @@ -114,7 +108,7 @@ class PlasmaClient(object): data = ctypes.c_void_p() metadata_size = ctypes.c_int64() metadata = ctypes.c_void_p() - buf = self.client.plasma_get(self.sock, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) + buf = self.client.plasma_get(self.store_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) return self.buffer_from_memory(metadata, metadata_size) def seal(self, object_id): @@ -126,7 +120,7 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - self.client.plasma_seal(self.sock, make_plasma_id(object_id)) + self.client.plasma_seal(self.store_conn, make_plasma_id(object_id)) def transfer(self, addr, port, object_id): """Transfer local object with id object_id to another plasma instance diff --git a/src/plasma.h b/src/plasma.h index 3295d9a89..ddf89ad34 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -7,6 +7,8 @@ #include #include +#include "uthash.h" + #ifdef NDEBUG #define LOG_DEBUG(M, ...) #else @@ -79,6 +81,8 @@ typedef struct { int64_t data_size; /* The size of the metadata. */ int64_t metadata_size; + /* Numerical value of the fd of the memory mapped file in the store. */ + int store_fd_val; } plasma_reply; typedef struct { @@ -90,25 +94,23 @@ typedef struct { int writable; } plasma_buffer; -/* Connect to the local plasma store UNIX domain socket */ -int plasma_store_connect(const char *socket_name); +typedef struct { + /* Key that uniquely identifies the memory mapped file. In practice, we + * take the numerical value of the file descriptor in the object store. */ + int key; + /* The result of mmap for this file descriptor. */ + uint8_t *pointer; + /* Handle for the uthash table. */ + UT_hash_handle hh; +} client_mmap_table_entry; -/* Connect to a possibly remote plasma manager */ -int plasma_manager_connect(const char *addr, int port); - -void plasma_create(int conn, - plasma_id object_id, - int64_t size, - uint8_t *metadata, - int64_t metadata_size, - uint8_t **data); -void plasma_get(int conn, - plasma_id object_id, - int64_t *size, - uint8_t **data, - int64_t *metadata_size, - uint8_t **metadata); -void plasma_seal(int store, plasma_id object_id); +/* A client connection with a plasma store */ +typedef struct { + /* File descriptor of the Unix domain socket that connects to the store. */ + int conn; + /* Table of dlmalloc buffer files that have been memory mapped so far. */ + client_mmap_table_entry *mmap_table; +} plasma_store_conn; void plasma_send(int conn, plasma_request *req); diff --git a/src/plasma_client.c b/src/plasma_client.c index 20e5e087f..3d94e503b 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -13,17 +13,46 @@ #include #include "plasma.h" +#include "plasma_client.h" #include "fling.h" void plasma_send(int fd, plasma_request *req) { int req_count = sizeof(plasma_request); if (write(fd, req, req_count) != req_count) { - LOG_ERR("write error"); + LOG_ERR("write error, fd = %d", fd); exit(-1); } } -void plasma_create(int conn, +/* If the file descriptor fd has been mmapped in this client process before, + * return the pointer that was returned by mmap, otherwise mmap it and store the + * pointer in a hash table. */ +uint8_t *lookup_or_mmap(plasma_store_conn *conn, + int fd, + int store_fd_val, + int64_t map_size) { + client_mmap_table_entry *entry; + HASH_FIND_INT(conn->mmap_table, &store_fd_val, entry); + if (entry) { + close(fd); + return entry->pointer; + } else { + uint8_t *result = + mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (result == MAP_FAILED) { + LOG_ERR("mmap failed"); + exit(-1); + } + close(fd); + entry = malloc(sizeof(client_mmap_table_entry)); + entry->key = store_fd_val; + entry->pointer = result; + HASH_ADD_INT(conn->mmap_table, key, entry); + return result; + } +} + +void plasma_create(plasma_store_conn *conn, plasma_id object_id, int64_t data_size, uint8_t *metadata, @@ -37,22 +66,15 @@ void plasma_create(int conn, .object_id = object_id, .data_size = data_size, .metadata_size = metadata_size}; - plasma_send(conn, &req); + plasma_send(conn->conn, &req); plasma_reply reply; - int fd = recv_fd(conn, (char *) &reply, sizeof(plasma_reply)); + int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply)); assert(reply.data_size == data_size); assert(reply.metadata_size == metadata_size); /* The metadata should come right after the data. */ assert(reply.metadata_offset == reply.data_offset + data_size); - - // TOOD(rshin): Don't call mmap if this fd has already been mapepd. - *data = ((uint8_t *) mmap(NULL, reply.map_size, PROT_READ | PROT_WRITE, - MAP_SHARED, fd, 0)) + + *data = lookup_or_mmap(conn, fd, reply.store_fd_val, reply.map_size) + reply.data_offset; - if (*data == MAP_FAILED) { - LOG_ERR("mmap failed"); - exit(-1); - } /* If plasma_create is being called from a transfer, then we will not copy the * metadata here. The metadata will be written along with the data streamed * from the transfer. */ @@ -60,31 +82,21 @@ void plasma_create(int conn, /* Copy the metadata to the buffer. */ memcpy(*data + reply.data_size, metadata, metadata_size); } - close(fd); } /* This method is used to get both the data and the metadata. */ -void plasma_get(int conn, +void plasma_get(plasma_store_conn *conn, plasma_id object_id, int64_t *size, uint8_t **data, int64_t *metadata_size, uint8_t **metadata) { plasma_request req = {.type = PLASMA_GET, .object_id = object_id}; - plasma_send(conn, &req); + plasma_send(conn->conn, &req); plasma_reply reply; - /* The following loop is run at most twice. */ - int fd = recv_fd(conn, (char *) &reply, sizeof(plasma_reply)); - - // TOOD(rshin): Don't call mmap if this fd has already been mapepd. - *data = - ((uint8_t *) mmap(NULL, reply.map_size, PROT_READ, MAP_SHARED, fd, 0)) + - reply.data_offset; - if (*data == MAP_FAILED) { - LOG_ERR("mmap failed"); - exit(-1); - } - close(fd); + int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply)); + *data = lookup_or_mmap(conn, fd, reply.store_fd_val, reply.map_size) + + reply.data_offset; *size = reply.data_size; /* If requested, return the metadata as well. */ if (metadata != NULL) { @@ -93,12 +105,12 @@ void plasma_get(int conn, } } -void plasma_seal(int fd, plasma_id object_id) { +void plasma_seal(plasma_store_conn *conn, plasma_id object_id) { plasma_request req = {.type = PLASMA_SEAL, .object_id = object_id}; - plasma_send(fd, &req); + plasma_send(conn->conn, &req); } -int plasma_store_connect(const char *socket_name) { +plasma_store_conn *plasma_store_connect(const char *socket_name) { assert(socket_name); struct sockaddr_un addr; int fd; @@ -125,7 +137,11 @@ int plasma_store_connect(const char *socket_name) { LOG_ERR("could not connect to store %s", socket_name); exit(-1); } - return fd; + /* Initialize the store connection struct */ + plasma_store_conn *result = malloc(sizeof(plasma_store_conn)); + result->conn = fd; + result->mmap_table = NULL; + return result; } #define h_addr h_addr_list[0] diff --git a/src/plasma_client.h b/src/plasma_client.h new file mode 100644 index 000000000..4c7cc008e --- /dev/null +++ b/src/plasma_client.h @@ -0,0 +1,27 @@ +#ifndef PLASMA_CLIENT_H +#define PLASMA_CLIENT_H + +/* Connect to the local plasma store UNIX domain socket with path socket_name + * and return the resulting connection. */ +plasma_store_conn *plasma_store_connect(const char *socket_name); + +/* Connect to a possibly remote plasma manager */ +int plasma_manager_connect(const char *addr, int port); + +void plasma_create(plasma_store_conn *conn, + plasma_id object_id, + int64_t size, + uint8_t *metadata, + int64_t metadata_size, + uint8_t **data); + +void plasma_get(plasma_store_conn *conn, + plasma_id object_id, + int64_t *size, + uint8_t **data, + int64_t *metadata_size, + uint8_t **metadata); + +void plasma_seal(plasma_store_conn *conn, plasma_id object_id); + +#endif diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 15da6ecc6..7cad02504 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -22,35 +22,36 @@ #include "event_loop.h" #include "plasma.h" +#include "plasma_client.h" #include "plasma_manager.h" typedef struct { - /* Name of the socket connecting to local plasma store. */ - const char* store_socket_name; + /* Connection to local plasma store. */ + plasma_store_conn *conn; /* Event loop. */ - event_loop* loop; + event_loop *loop; } plasma_manager_state; /* Initialize the plasma manager. This function initializes the event loop * of the plasma manager, and stores the address 'store_socket_name' of * the local plasma store socket. */ -void init_plasma_manager(plasma_manager_state* s, - const char* store_socket_name) { +void init_plasma_manager(plasma_manager_state *s, + const char *store_socket_name) { s->loop = malloc(sizeof(event_loop)); event_loop_init(s->loop); - s->store_socket_name = store_socket_name; + s->conn = plasma_store_connect(store_socket_name); + LOG_INFO("Connected to object store %s", store_socket_name); } /* Start transfering data to another object store manager. This establishes - * a connection to both the manager and the local object store and sends - * the data header to the other object manager. */ -void initiate_transfer(plasma_manager_state* s, plasma_request* req) { - int store_conn = plasma_store_connect(s->store_socket_name); - uint8_t* data; + * a connection to the remote manager and sends the data header to the other + * object manager. */ +void initiate_transfer(plasma_manager_state *s, plasma_request *req) { + uint8_t *data; int64_t data_size; - uint8_t* metadata; + uint8_t *metadata; int64_t metadata_size; - plasma_get(store_conn, req->object_id, &data_size, &data, &metadata_size, + plasma_get(s->conn, req->object_id, &data_size, &data, &metadata_size, &metadata); assert(metadata == data + data_size); plasma_buffer buf = {.object_id = req->object_id, @@ -65,7 +66,7 @@ void initiate_transfer(plasma_manager_state* s, plasma_request* req) { int fd = plasma_manager_connect(&ip_addr[0], req->port); data_connection conn = {.type = DATA_CONNECTION_WRITE, - .store_conn = store_conn, + .store_conn = s->conn->conn, .buf = buf, .cursor = 0}; event_loop_attach(s->loop, CONNECTION_DATA, &conn, fd, POLLOUT); @@ -80,17 +81,16 @@ void initiate_transfer(plasma_manager_state* s, plasma_request* req) { * Initializes the object we are going to write to in the * local plasma store and then switches the data socket to reading mode. */ void start_reading_data(int64_t index, - plasma_manager_state* s, - plasma_request* req) { - int store_conn = plasma_store_connect(s->store_socket_name); + plasma_manager_state *s, + plasma_request *req) { plasma_buffer buf = {.object_id = req->object_id, .data_size = req->data_size, .metadata_size = req->metadata_size, .writable = 1}; - plasma_create(store_conn, req->object_id, req->data_size, NULL, + plasma_create(s->conn, req->object_id, req->data_size, NULL, req->metadata_size, &buf.data); data_connection conn = {.type = DATA_CONNECTION_READ, - .store_conn = store_conn, + .store_conn = s->conn->conn, .buf = buf, .cursor = 0}; event_loop_set_connection(s->loop, index, &conn); @@ -99,8 +99,8 @@ void start_reading_data(int64_t index, /* Handle a command request that came in through a socket (transfering data, * or accepting incoming data). */ void process_command(int64_t id, - plasma_manager_state* state, - plasma_request* req) { + plasma_manager_state *state, + plasma_request *req) { switch (req->type) { case PLASMA_TRANSFER: LOG_INFO("transfering object to manager with port %d", req->port); @@ -117,12 +117,12 @@ void process_command(int64_t id, } /* Handle data or command event incoming on socket with index "index". */ -void read_from_socket(plasma_manager_state* state, - struct pollfd* waiting, +void read_from_socket(plasma_manager_state *state, + struct pollfd *waiting, int64_t index, - plasma_request* req) { + plasma_request *req) { ssize_t r, s; - data_connection* conn = event_loop_get_connection(state->loop, index); + data_connection *conn = event_loop_get_connection(state->loop, index); switch (conn->type) { case DATA_CONNECTION_HEADER: r = read(waiting->fd, req, sizeof(plasma_request)); @@ -147,8 +147,7 @@ void read_from_socket(plasma_manager_state* state, } if (r == 0) { LOG_DEBUG("reading on channel %" PRId64 " finished", index); - plasma_seal(conn->store_conn, conn->buf.object_id); - close(conn->store_conn); + plasma_seal(state->conn, conn->buf.object_id); event_loop_detach(state->loop, index, 1); } break; @@ -170,7 +169,6 @@ void read_from_socket(plasma_manager_state* state, } if (r == 0) { LOG_DEBUG("writing on channel %" PRId64 " finished", index); - close(conn->store_conn); event_loop_detach(state->loop, index, 1); } break; @@ -181,7 +179,7 @@ void read_from_socket(plasma_manager_state* state, } /* Main event loop of the plasma manager. */ -void run_event_loop(int sock, plasma_manager_state* s) { +void run_event_loop(int sock, plasma_manager_state *s) { /* Add listening socket. */ event_loop_attach(s->loop, CONNECTION_LISTENER, NULL, sock, POLLIN); plasma_request req; @@ -192,7 +190,7 @@ void run_event_loop(int sock, plasma_manager_state* s) { exit(-1); } for (int i = 0; i < event_loop_size(s->loop); ++i) { - struct pollfd* waiting = event_loop_get(s->loop, i); + struct pollfd *waiting = event_loop_get(s->loop, i); if (waiting->revents == 0) continue; if (waiting->fd == sock) { @@ -215,8 +213,8 @@ void run_event_loop(int sock, plasma_manager_state* s) { } } -void start_server(const char* store_socket_name, - const char* master_addr, +void start_server(const char *store_socket_name, + const char *master_addr, int port) { struct sockaddr_in name; int sock = socket(PF_INET, SOCK_STREAM, 0); @@ -249,11 +247,11 @@ void start_server(const char* store_socket_name, run_event_loop(sock, &state); } -int main(int argc, char* argv[]) { +int main(int argc, char *argv[]) { /* Socket name of the plasma store this manager is connected to. */ - char* store_socket_name = NULL; + char *store_socket_name = NULL; /* IP address of this node. */ - char* master_addr = NULL; + char *master_addr = NULL; /* Port number the manager should use. */ int port; int c; diff --git a/src/plasma_store.c b/src/plasma_store.c index c2a082634..a7aef2c00 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -108,6 +108,7 @@ void create_object(int conn, plasma_request* req) { reply.map_size = map_size; reply.data_size = req->data_size; reply.metadata_size = req->metadata_size; + reply.store_fd_val = fd; send_fd(conn, fd, (char*) &reply, sizeof(reply)); } @@ -122,6 +123,7 @@ void get_object(int conn, plasma_request* req) { reply.map_size = entry->map_size; reply.data_size = entry->info.data_size; reply.metadata_size = entry->info.metadata_size; + reply.store_fd_val = entry->fd; send_fd(conn, entry->fd, (char*) &reply, sizeof(plasma_reply)); } else { object_notify_entry* notify_entry; @@ -151,6 +153,7 @@ void seal_object(int conn, plasma_request* req) { if (!entry) { return; /* TODO(pcm): return error */ } + int fd = entry->fd; HASH_DELETE(handle, open_objects, entry); HASH_ADD(handle, sealed_objects, object_id, sizeof(plasma_id), entry); /* Inform processes that the object is ready now. */ @@ -162,7 +165,9 @@ void seal_object(int conn, plasma_request* req) { } plasma_reply reply = {.data_offset = entry->offset, .map_size = entry->map_size, - .data_size = entry->info.data_size}; + .data_size = entry->info.data_size, + .metadata_size = entry->info.metadata_size, + .store_fd_val = fd}; for (int i = 0; i < notify_entry->num_waiting; ++i) { send_fd(notify_entry->conn[i], entry->fd, (char*) &reply, sizeof(plasma_reply));