From 7a18347b5cd3e6efdff1bf64677a86244d958486 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 15 Aug 2016 16:41:22 -0700 Subject: [PATCH] cleanup --- Makefile | 5 ++++- src/plasma.h | 8 ++++++- src/plasma_client.c | 17 +++++++++----- src/plasma_store.c | 55 +++++++++++++++++++++++++-------------------- 4 files changed, 53 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 32cb50a61..85167f6cd 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ CC = gcc CFLAGS = -g -Wall BUILD = build -all: $(BUILD)/plasma_store $(BUILD)/example +all: $(BUILD)/plasma_store $(BUILD)/plasma_client.so $(BUILD)/example clean: rm $(BUILD)/* @@ -10,5 +10,8 @@ clean: $(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/fling.h src/fling.c $(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_store.c src/fling.c -o $(BUILD)/plasma_store +$(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c + $(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/fling.c -fPIC -shared -o $(BUILD)/plasma_client.so + $(BUILD)/example: src/plasma_client.c src/plasma.h src/example.c src/fling.h src/fling.c $(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/example.c src/fling.c -o $(BUILD)/example diff --git a/src/plasma.h b/src/plasma.h index 8577aab19..42ac2c6f5 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -27,13 +27,19 @@ typedef struct { enum plasma_request_type { PLASMA_CREATE, // create a new object PLASMA_GET, // get an object - PLASMA_SEAL // seal an object + PLASMA_SEAL, // seal an object + PLASMA_TRANSFER, // request transfer to another store + PLASMA_DATA, // header for sending data + PLASMA_REGISTER // register a plasma manager }; typedef struct { int type; + int manager_id; plasma_id object_id; int64_t size; + uint8_t addr[4]; + int port; } plasma_request; enum plasma_reply_type { diff --git a/src/plasma_client.c b/src/plasma_client.c index 9a8359eb6..86b49125d 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -19,7 +19,7 @@ void plasma_send(int fd, plasma_request *req) { int req_count = sizeof(plasma_request); if (write(fd, req, req_count) != req_count) { if (req_count > 0) { - LOG_ERR("partial write"); + LOG_ERR("partial write on fd %d", fd); } else { LOG_ERR("write error"); exit(-1); @@ -28,7 +28,8 @@ void plasma_send(int fd, plasma_request *req) { } plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) { - plasma_request req = { PLASMA_CREATE, object_id, size }; + LOG_INFO("called plasma_create on conn %d with size %" PRId64, conn, size); + plasma_request req = { .type = PLASMA_CREATE, .object_id = object_id, .size = size }; plasma_send(conn, &req); plasma_reply reply; int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply)); @@ -44,7 +45,7 @@ plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) { } plasma_buffer plasma_get(int conn, plasma_id object_id) { - plasma_request req = { PLASMA_GET, object_id }; + plasma_request req = { .type = PLASMA_GET, .object_id = object_id }; plasma_send(conn, &req); plasma_reply reply; // the following loop is run at most twice @@ -55,13 +56,17 @@ plasma_buffer plasma_get(int conn, plasma_id object_id) { fd = new_fd; } assert(reply.type == PLASMA_OBJECT); - void *data = mmap(NULL, reply.size, PROT_READ, 0, fd, 0); + void *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0); + if (data == MAP_FAILED) { + LOG_ERR("mmap failed"); + exit(-1); + } plasma_buffer buffer = { object_id, data, reply.size, 0 }; return buffer; } void plasma_seal(int fd, plasma_id object_id) { - plasma_request req = { PLASMA_SEAL, object_id }; + plasma_request req = { .type = PLASMA_SEAL, .object_id = object_id }; plasma_send(fd, &req); } @@ -77,7 +82,7 @@ int plasma_store_connect(const char* socket_name) { addr.sun_family = AF_UNIX; strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path)-1); if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { - LOG_ERR("connect error"); + LOG_ERR("could not connect to store %s", socket_name); exit(-1); } return fd; diff --git a/src/plasma_store.c b/src/plasma_store.c index 528f8d52c..3260cde9e 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -48,6 +48,8 @@ int add_client(plasma_store_state* s, int fd) { return curr_id++; } +// remove the client at index i by swapping it with the +// client at index num_clients-1 and zeroing the latter out void remove_client(plasma_store_state* s, int i) { memcpy(&s->waiting[i], &s->waiting[s->num_clients-1], sizeof(struct pollfd)); memset(&s->waiting[s->num_clients-1], 0, sizeof(struct pollfd)); @@ -107,6 +109,7 @@ int create_buffer(int64_t size) { // create a new object buffer in the hash table void create_object(int conn, plasma_request* req) { + LOG_INFO("creating object"); // TODO(pcm): add object_id here int fd = create_buffer(req->size); if (fd < 0) { LOG_ERR("could not create shared memory buffer"); @@ -145,6 +148,7 @@ void get_object(int conn, plasma_request* req) { // seal an object that has been created in the hash table void seal_object(int conn, plasma_request* req) { + LOG_INFO("sealing object"); // TODO(pcm): add object_id here object_table_entry *entry; HASH_FIND(handle, open_objects, &req->object_id, sizeof(plasma_id), entry); if (!entry) { @@ -179,6 +183,9 @@ void process_event(int conn, plasma_request* req) { case PLASMA_SEAL: seal_object(conn, req); break; + default: + LOG_ERR("invalid request %d", req->type); + exit(-1); } } @@ -195,32 +202,32 @@ void event_loop(int socket) { } for (int i = 0; i < state.num_clients; ++i) { if (state.waiting[i].revents == 0) - continue; + continue; if (state.waiting[i].fd == socket) { - while (1) { - // handle new incoming connections - int new_socket = accept(socket, NULL, NULL); - if (new_socket < 0) { - if (errno != EWOULDBLOCK) { - LOG_ERR("accept failed"); - exit(-1); - } - break; - } - int client_id = add_client(&state, new_socket); - LOG_INFO("adding new client with id %d", client_id); - } + while (1) { + // handle new incoming connections + int new_socket = accept(socket, NULL, NULL); + if (new_socket < 0) { + if (errno != EWOULDBLOCK) { + LOG_ERR("accept failed"); + exit(-1); + } + break; + } + int client_id = add_client(&state, new_socket); + LOG_INFO("adding new client with id %d", client_id); + } } else { - int r = read(state.waiting[i].fd, &req, sizeof(plasma_request)); - if (r == -1) { - LOG_ERR("read error"); - continue; - } else if (r == 0) { - LOG_INFO("client with id %d disconnected", state.client_id[i]); - remove_client(&state, i); - } else { - process_event(state.waiting[i].fd, &req); - } + int r = read(state.waiting[i].fd, &req, sizeof(plasma_request)); + if (r == -1) { + LOG_ERR("read error"); + continue; + } else if (r == 0) { + LOG_INFO("client with id %d disconnected", state.client_id[i]); + remove_client(&state, i); + } else { + process_event(state.waiting[i].fd, &req); + } } } }