This commit is contained in:
Philipp Moritz
2016-08-15 16:41:22 -07:00
parent 504044c033
commit 7a18347b5c
4 changed files with 53 additions and 32 deletions
+4 -1
View File
@@ -2,7 +2,7 @@ CC = gcc
CFLAGS = -g -Wall
BUILD = build
all: $(BUILD)/plasma_store $(BUILD)/example
all: $(BUILD)/plasma_store $(BUILD)/plasma_client.so $(BUILD)/example
clean:
rm $(BUILD)/*
@@ -10,5 +10,8 @@ clean:
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/fling.h src/fling.c
$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_store.c src/fling.c -o $(BUILD)/plasma_store
$(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c
$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/fling.c -fPIC -shared -o $(BUILD)/plasma_client.so
$(BUILD)/example: src/plasma_client.c src/plasma.h src/example.c src/fling.h src/fling.c
$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/example.c src/fling.c -o $(BUILD)/example
+7 -1
View File
@@ -27,13 +27,19 @@ typedef struct {
enum plasma_request_type {
PLASMA_CREATE, // create a new object
PLASMA_GET, // get an object
PLASMA_SEAL // seal an object
PLASMA_SEAL, // seal an object
PLASMA_TRANSFER, // request transfer to another store
PLASMA_DATA, // header for sending data
PLASMA_REGISTER // register a plasma manager
};
typedef struct {
int type;
int manager_id;
plasma_id object_id;
int64_t size;
uint8_t addr[4];
int port;
} plasma_request;
enum plasma_reply_type {
+11 -6
View File
@@ -19,7 +19,7 @@ void plasma_send(int fd, plasma_request *req) {
int req_count = sizeof(plasma_request);
if (write(fd, req, req_count) != req_count) {
if (req_count > 0) {
LOG_ERR("partial write");
LOG_ERR("partial write on fd %d", fd);
} else {
LOG_ERR("write error");
exit(-1);
@@ -28,7 +28,8 @@ void plasma_send(int fd, plasma_request *req) {
}
plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) {
plasma_request req = { PLASMA_CREATE, object_id, size };
LOG_INFO("called plasma_create on conn %d with size %" PRId64, conn, size);
plasma_request req = { .type = PLASMA_CREATE, .object_id = object_id, .size = size };
plasma_send(conn, &req);
plasma_reply reply;
int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply));
@@ -44,7 +45,7 @@ plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) {
}
plasma_buffer plasma_get(int conn, plasma_id object_id) {
plasma_request req = { PLASMA_GET, object_id };
plasma_request req = { .type = PLASMA_GET, .object_id = object_id };
plasma_send(conn, &req);
plasma_reply reply;
// the following loop is run at most twice
@@ -55,13 +56,17 @@ plasma_buffer plasma_get(int conn, plasma_id object_id) {
fd = new_fd;
}
assert(reply.type == PLASMA_OBJECT);
void *data = mmap(NULL, reply.size, PROT_READ, 0, fd, 0);
void *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0);
if (data == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
}
plasma_buffer buffer = { object_id, data, reply.size, 0 };
return buffer;
}
void plasma_seal(int fd, plasma_id object_id) {
plasma_request req = { PLASMA_SEAL, object_id };
plasma_request req = { .type = PLASMA_SEAL, .object_id = object_id };
plasma_send(fd, &req);
}
@@ -77,7 +82,7 @@ int plasma_store_connect(const char* socket_name) {
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path)-1);
if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
LOG_ERR("connect error");
LOG_ERR("could not connect to store %s", socket_name);
exit(-1);
}
return fd;
+31 -24
View File
@@ -48,6 +48,8 @@ int add_client(plasma_store_state* s, int fd) {
return curr_id++;
}
// remove the client at index i by swapping it with the
// client at index num_clients-1 and zeroing the latter out
void remove_client(plasma_store_state* s, int i) {
memcpy(&s->waiting[i], &s->waiting[s->num_clients-1], sizeof(struct pollfd));
memset(&s->waiting[s->num_clients-1], 0, sizeof(struct pollfd));
@@ -107,6 +109,7 @@ int create_buffer(int64_t size) {
// create a new object buffer in the hash table
void create_object(int conn, plasma_request* req) {
LOG_INFO("creating object"); // TODO(pcm): add object_id here
int fd = create_buffer(req->size);
if (fd < 0) {
LOG_ERR("could not create shared memory buffer");
@@ -145,6 +148,7 @@ void get_object(int conn, plasma_request* req) {
// seal an object that has been created in the hash table
void seal_object(int conn, plasma_request* req) {
LOG_INFO("sealing object"); // TODO(pcm): add object_id here
object_table_entry *entry;
HASH_FIND(handle, open_objects, &req->object_id, sizeof(plasma_id), entry);
if (!entry) {
@@ -179,6 +183,9 @@ void process_event(int conn, plasma_request* req) {
case PLASMA_SEAL:
seal_object(conn, req);
break;
default:
LOG_ERR("invalid request %d", req->type);
exit(-1);
}
}
@@ -195,32 +202,32 @@ void event_loop(int socket) {
}
for (int i = 0; i < state.num_clients; ++i) {
if (state.waiting[i].revents == 0)
continue;
continue;
if (state.waiting[i].fd == socket) {
while (1) {
// handle new incoming connections
int new_socket = accept(socket, NULL, NULL);
if (new_socket < 0) {
if (errno != EWOULDBLOCK) {
LOG_ERR("accept failed");
exit(-1);
}
break;
}
int client_id = add_client(&state, new_socket);
LOG_INFO("adding new client with id %d", client_id);
}
while (1) {
// handle new incoming connections
int new_socket = accept(socket, NULL, NULL);
if (new_socket < 0) {
if (errno != EWOULDBLOCK) {
LOG_ERR("accept failed");
exit(-1);
}
break;
}
int client_id = add_client(&state, new_socket);
LOG_INFO("adding new client with id %d", client_id);
}
} else {
int r = read(state.waiting[i].fd, &req, sizeof(plasma_request));
if (r == -1) {
LOG_ERR("read error");
continue;
} else if (r == 0) {
LOG_INFO("client with id %d disconnected", state.client_id[i]);
remove_client(&state, i);
} else {
process_event(state.waiting[i].fd, &req);
}
int r = read(state.waiting[i].fd, &req, sizeof(plasma_request));
if (r == -1) {
LOG_ERR("read error");
continue;
} else if (r == 0) {
LOG_INFO("client with id %d disconnected", state.client_id[i]);
remove_client(&state, i);
} else {
process_event(state.waiting[i].fd, &req);
}
}
}
}