Merge pull request #3 from pcmoritz/initial

Initial plasma commit
This commit is contained in:
Robert Nishihara
2016-08-14 01:47:38 -07:00
committed by GitHub
10 changed files with 1663 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
build/*
*~
+14
View File
@@ -0,0 +1,14 @@
# Build configuration for the plasma store and the example client.
CC = gcc
CFLAGS = -g -Wall
BUILD = build

# "all" and "clean" name actions rather than files.
.PHONY: all clean

all: $(BUILD)/plasma_store $(BUILD)/example

# -f keeps "make clean" from failing when the build directory is empty.
clean:
	rm -f $(BUILD)/*

# The build directory is ignored by git, so create it before linking into it.
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/fling.h src/fling.c
	mkdir -p $(BUILD)
	$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_store.c src/fling.c -o $(BUILD)/plasma_store

$(BUILD)/example: src/plasma_client.c src/plasma.h src/example.c src/fling.h src/fling.c
	mkdir -p $(BUILD)
	$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/example.c src/fling.c -o $(BUILD)/example
View File
+45
View File
@@ -0,0 +1,45 @@
// A simple example on how to use the plasma store
//
// Can be called in the following way:
//
// cd build
// ./plasma_store -s /tmp/plasma_socket
// ./example -s /tmp/plasma_socket -g
// ./example -s /tmp/plasma_socket -c -f
#include <stdlib.h>
#include <getopt.h>
#include <unistd.h>
#include <assert.h>
#include "plasma.h"
// Entry point of the example client.
//
// Options are processed in the order given on the command line:
//   -s <socket>  connect to the plasma store listening on <socket>
//   -c           create a 100 byte object with the fixed example id
//   -f           seal ("finish") the object with the fixed example id
//   -g           get the object with the fixed example id
// Every action option requires that -s was given before it.
int main(int argc, char *argv[]) {
  int conn = -1;
  int c;
  plasma_id id = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
                   255, 255, 255, 255, 255, 255, 255, 255}};
  while ((c = getopt(argc, argv, "s:cfg")) != -1) {
    switch (c) {
    case 's':
      conn = plasma_store_connect(optarg);
      break;
    case 'c':
      assert(conn != -1);
      plasma_create(conn, id, 100);
      break;
    case 'f':
      assert(conn != -1);
      plasma_seal(conn, id);
      break;
    case 'g':
      // Like the other actions, a get needs an established connection.
      assert(conn != -1);
      plasma_get(conn, id);
      break;
    default:
      abort();
    }
  }
  assert(conn != -1);
  close(conn);
  return 0;
}
+73
View File
@@ -0,0 +1,73 @@
#include "fling.h"
// Fill in "msg" and "iov" so that they describe a message consisting of a
// single one-byte data segment plus "buf" (of "buf_len" bytes) as the
// buffer for ancillary (control) data. No peer address is set.
//
// Note that the data segment and the control buffer both start at "buf".
void init_msg(struct msghdr *msg, struct iovec *iov,
              char *buf, size_t buf_len) {
  // The message is sent over a connected socket, so no address is needed.
  msg->msg_namelen = 0;
  msg->msg_name = NULL;

  // Ancillary data lives in the caller-provided buffer.
  msg->msg_controllen = buf_len;
  msg->msg_control = buf;

  // Exactly one data segment: the first byte of the same buffer.
  iov->iov_len = 1;
  iov->iov_base = buf;
  msg->msg_iovlen = 1;
  msg->msg_iov = iov;
}
// Send the file descriptor "fd" and then "size" bytes of "payload" over the
// unix domain socket "conn".
//
// The descriptor travels as SCM_RIGHTS ancillary data attached to a one-byte
// message; the payload follows as ordinary stream data.
//
// Returns 0 on success and -1 on failure, in which case errno is set by the
// failing call (or to EINVAL for a negative "size").
int send_fd(int conn, int fd, const char* payload, int size) {
  if (size < 0) {
    errno = EINVAL;
    return -1;
  }
  struct msghdr msg = {0};
  struct iovec iov;
  // Zeroed so that neither the one-byte data segment nor the padding of the
  // control message hands uninitialized stack contents to the kernel.
  char buf[CMSG_SPACE(sizeof(int))] = {0};
  init_msg(&msg, &iov, buf, sizeof(buf));
  struct cmsghdr *header = CMSG_FIRSTHDR(&msg);
  header->cmsg_level = SOL_SOCKET;
  header->cmsg_type = SCM_RIGHTS;
  header->cmsg_len = CMSG_LEN(sizeof(int));
  *(int *)CMSG_DATA(header) = fd;
  // Send the file descriptor first; without it the payload is meaningless.
  if (sendmsg(conn, &msg, 0) == -1) {
    return -1;
  }
  // Then send the payload. send() may accept fewer bytes than requested, so
  // keep going until everything has been handed to the socket.
  int sent = 0;
  while (sent < size) {
    ssize_t n = send(conn, payload + sent, (size_t) (size - sent), 0);
    if (n == -1) {
      if (errno == EINTR) {
        continue;
      }
      return -1;
    }
    sent += (int) n;
  }
  return 0;
}
// Receive a file descriptor and a payload of up to "size" bytes from the
// unix domain socket "conn". The payload is written to "payload".
//
// Returns the received file descriptor. Returns -1 if receiving failed or
// if the message carried no descriptor. If the sender passed more than one
// descriptor, all of them are closed, errno is set to EBADMSG and -1 is
// returned. No received descriptor is left open on a failure return.
int recv_fd(int conn, char* payload, int size) {
  struct msghdr msg = {0};
  struct iovec iov;
  char buf[CMSG_SPACE(sizeof(int))];
  init_msg(&msg, &iov, buf, sizeof(buf));
  // Receive the one-byte data segment into its own byte so that it cannot
  // overlap the control data the kernel stores in "buf".
  char marker = 0;
  iov.iov_base = &marker;
  if (recvmsg(conn, &msg, 0) == -1) {
    return -1;
  }
  int found_fd = -1;
  int oh_noes = 0;
  for (struct cmsghdr *header = CMSG_FIRSTHDR(&msg); header != NULL;
       header = CMSG_NXTHDR(&msg, header)) {
    if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
      // Number of descriptors carried by this control message.
      int count = (header->cmsg_len - (CMSG_DATA(header) - (unsigned char *)header)) / sizeof(int);
      for (int i = 0; i < count; ++i) {
        int fd = ((int *)CMSG_DATA(header))[i];
        if (found_fd == -1) {
          found_fd = fd;
        } else {
          close(fd);
          oh_noes = 1;
        }
      }
    }
  }
  // The sender sent us more than one file descriptor. We've closed
  // them all to prevent fd leaks but notify the caller that we got
  // a bad message.
  if (oh_noes) {
    close(found_fd);
    errno = EBADMSG;
    return -1;
  }
  ssize_t len = recv(conn, payload, size, 0);
  if (len < 0) {
    // Do not leak the descriptor we already took ownership of, and report
    // the recv() error rather than whatever close() leaves in errno.
    int saved_errno = errno;
    if (found_fd != -1) {
      close(found_fd);
    }
    errno = saved_errno;
    return -1;
  }
  return found_fd;
}
+35
View File
@@ -0,0 +1,35 @@
// FLING: Exchanging file descriptors over sockets
//
// This is a little library for sending file descriptors over a socket
// between processes. The reason for doing that (as opposed to using
// filenames to share the files) is so (a) no files remain in the
// filesystem after all the processes terminate, (b) to make sure that
// there are no name collisions and (c) to be able to control who has
// access to the data.
//
// Most of the code is from https://github.com/sharvil/flingfd
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
// This is necessary for Mac OS X, see http://www.apuebook.com/faqs2e.html (10).
// Fallback definitions that are only used when the system headers define
// neither CMSG_SPACE nor CMSG_LEN. CMSG_SPACE(len) is the buffer space
// needed for a control message carrying "len" bytes of data (header and data
// both rounded up with __DARWIN_ALIGN32); CMSG_LEN(len) is the value to
// store in cmsg_len for such a message.
#if !defined(CMSG_SPACE) && !defined(CMSG_LEN)
#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len))
#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len))
#endif
// Prepare "msg" and "iov" for a sendmsg/recvmsg call: "msg" gets a single
// one-byte data segment (described by "iov" and pointing at "buf") and uses
// "buf" of "buf_len" bytes as its control-message buffer. No address is set.
void init_msg(struct msghdr *msg, struct iovec *iov,
char *buf, size_t buf_len);
// Send a file descriptor "fd" and a payload "payload" of size "size"
// over the socket "conn". Return 0 on success.
int send_fd(int conn, int fd, const char* payload, int size);
// Receive a file descriptor and a payload of size up to "size" from a
// socket "conn". The payload will be written to "payload" and the file
// descriptor will be returned. Returns -1 on failure.
int recv_fd(int conn, char* payload, int size);
+61
View File
@@ -0,0 +1,61 @@
#ifndef PLASMA_H
#define PLASMA_H
#include <inttypes.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
// Logging helpers. Both write one line to stderr, prefixed with the source
// file and line of the call site. "M" must be a string literal containing a
// printf-style format (it is concatenated with the prefix); the remaining
// arguments are the values for that format.
//
// LOG_ERR additionally prints the description of the current errno value,
// or "None" if errno is 0.
#define LOG_ERR(M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", \
__FILE__, __LINE__, errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
// LOG_INFO prints an informational message without the errno description.
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
// Metadata about an object.
typedef struct {
int64_t size; // size of the object's buffer in bytes
int64_t create_time; // NOTE(review): not set by any code shown here; meaning and units to be confirmed
int64_t construct_duration; // NOTE(review): not set by any code shown here; meaning and units to be confirmed
} plasma_object_info;
// Represents an object id hash, can hold a full SHA1 hash (20 bytes).
// It is passed around by value.
typedef struct {
unsigned char id[20];
} plasma_id;
// The kinds of requests a client can send; the value is carried in the
// "type" field of a plasma_request.
enum plasma_request_type {
PLASMA_CREATE, // create a new object
PLASMA_GET, // get an object
PLASMA_SEAL // seal an object
};
// A request message. It is exchanged as the raw bytes of this struct, so
// its layout is part of the protocol between client and store.
typedef struct {
int type; // one of enum plasma_request_type
plasma_id object_id; // id of the object the request refers to
int64_t size; // size in bytes of the object to create (used for PLASMA_CREATE)
} plasma_request;
// The kinds of replies; the value is carried in the "type" field of a
// plasma_reply and says what the file descriptor sent with it represents.
enum plasma_reply_type {
PLASMA_OBJECT, // the file descriptor represents an object
PLASMA_FUTURE, // the file descriptor represents a future
};
// A reply message, sent as the payload that accompanies a file descriptor.
// It is exchanged as the raw bytes of this struct.
typedef struct {
int type; // one of enum plasma_reply_type
int64_t size; // size of the object in bytes for PLASMA_OBJECT replies
} plasma_reply;
// A client's handle to the memory of an object, as returned by
// plasma_create and plasma_get.
typedef struct {
plasma_id object_id; // id of the object
void *data; // start of the memory mapping of the object
int64_t size; // size of the object in bytes
int writable; // 1 for buffers from plasma_create, 0 for buffers from plasma_get
} plasma_buffer;
// Connect to the plasma store listening on the unix domain socket
// "socket_name". Returns the file descriptor of the connection; the process
// exits if the connection cannot be established.
int plasma_store_connect(const char* socket_name);
// Ask the store (connection "conn") to create an object with id "object_id"
// and "size" bytes, and return a writable buffer for it.
plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size);
// Ask the store (connection "conn") for the object with id "object_id" and
// return a non-writable buffer for it. If the store answers with a future,
// this waits until the object is delivered through that future.
plasma_buffer plasma_get(int conn, plasma_id object_id);
// Tell the store that the object with id "object_id" is sealed. Note that
// "fd" is the connection to the store, like "conn" in the functions above.
void plasma_seal(int fd, plasma_id object_id);
#endif
+84
View File
@@ -0,0 +1,84 @@
// PLASMA CLIENT: Client library for using the plasma store and manager
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <strings.h>
#include <netinet/in.h>
#include <netdb.h>
#include "plasma.h"
#include "fling.h"
// Send the request "req" to the plasma store over the connection "fd".
//
// The whole request is written, retrying after short writes and
// interruptions by signals. If the request cannot be written, an error is
// logged and the process exits.
void plasma_send(int fd, plasma_request *req) {
  const char *cursor = (const char *) req;
  size_t remaining = sizeof(plasma_request);
  while (remaining > 0) {
    ssize_t written = write(fd, cursor, remaining);
    if (written < 0) {
      if (errno == EINTR) {
        continue;
      }
      LOG_ERR("write error");
      exit(-1);
    }
    if (written == 0) {
      // No progress is possible; give up instead of spinning forever.
      LOG_ERR("partial write");
      exit(-1);
    }
    cursor += written;
    remaining -= (size_t) written;
  }
}
// Create a new object with id "object_id" and "size" bytes in the store
// reachable through "conn" and map it into this process.
//
// Returns a writable buffer describing the mapping. If the store's reply
// cannot be received or the object cannot be mapped, an error is logged and
// the process exits.
plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) {
  plasma_request req = { PLASMA_CREATE, object_id, size };
  plasma_send(conn, &req);
  // Start from values no valid reply carries, so an incomplete reply is
  // caught by the checks below.
  plasma_reply reply = { -1, -1 };
  int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply));
  if (fd < 0) {
    LOG_ERR("could not receive file descriptor of created object");
    exit(-1);
  }
  assert(reply.type == PLASMA_OBJECT);
  assert(reply.size == size);
  void *data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (data == MAP_FAILED) {
    LOG_ERR("mmap failed");
    exit(-1);
  }
  // The mapping stays valid after the descriptor is closed, so do not keep
  // one descriptor open per created object.
  close(fd);
  plasma_buffer buffer = { object_id, data, size, 1 };
  return buffer;
}
// Get the object with id "object_id" from the store reachable through
// "conn" and map it read-only into this process.
//
// If the object has not been sealed yet, the store answers with a future (a
// socket); this function then blocks on that socket until the store
// delivers the object.
//
// Returns a non-writable buffer describing the mapping. If a reply cannot
// be received or the object cannot be mapped, an error is logged and the
// process exits.
plasma_buffer plasma_get(int conn, plasma_id object_id) {
  plasma_request req = { PLASMA_GET, object_id };
  plasma_send(conn, &req);
  // Start from values no valid reply carries, so an incomplete reply is
  // caught by the checks below.
  plasma_reply reply = { -1, -1 };
  int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply));
  if (fd < 0) {
    LOG_ERR("could not receive reply to get request");
    exit(-1);
  }
  if (reply.type == PLASMA_FUTURE) {
    // "fd" is a socket on which the object will be delivered once sealed.
    int new_fd = recv_fd(fd, (char*)&reply, sizeof(plasma_reply));
    if (new_fd < 0) {
      LOG_ERR("could not receive object from future");
      exit(-1);
    }
    close(fd);
    fd = new_fd;
  }
  assert(reply.type == PLASMA_OBJECT);
  // mmap requires exactly one of MAP_SHARED or MAP_PRIVATE; MAP_SHARED
  // matches the mapping used when the object was created.
  void *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0);
  if (data == MAP_FAILED) {
    LOG_ERR("mmap failed");
    exit(-1);
  }
  // The mapping stays valid after the descriptor is closed.
  close(fd);
  plasma_buffer buffer = { object_id, data, reply.size, 0 };
  return buffer;
}
// Notify the store reachable through the connection "fd" that the object
// with id "object_id" is complete. No reply is read.
void plasma_seal(int fd, plasma_id object_id) {
  // Members that are not named here (the size) are zero.
  plasma_request seal_request = {
    .type = PLASMA_SEAL,
    .object_id = object_id,
  };
  plasma_send(fd, &seal_request);
}
// Open a connection to the plasma store that listens on the unix domain
// socket "socket_name" (must not be NULL).
//
// Returns the file descriptor of the connected socket. Failing to create or
// connect the socket is fatal: an error is logged and the process exits.
int plasma_store_connect(const char* socket_name) {
  assert(socket_name);

  int sock = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sock == -1) {
    LOG_ERR("socket error");
    exit(-1);
  }

  // Zero the address first so the copied path is always NUL-terminated.
  struct sockaddr_un store_addr;
  memset(&store_addr, 0, sizeof(store_addr));
  store_addr.sun_family = AF_UNIX;
  strncpy(store_addr.sun_path, socket_name, sizeof(store_addr.sun_path)-1);

  int rc = connect(sock, (struct sockaddr*)&store_addr, sizeof(store_addr));
  if (rc == -1) {
    LOG_ERR("connect error");
    exit(-1);
  }
  return sock;
}
+275
View File
@@ -0,0 +1,275 @@
// PLASMA STORE: This is a simple object store server process
//
// It accepts incoming client connections on a unix domain socket
// (name passed in via the -s option of the executable) and uses a
// single thread to serve the clients. Each client establishes a
// connection and can create objects, wait for objects and seal
// objects through that connection.
//
// It keeps a hash table that maps object_ids (which are 20 bytes long,
// just enough to store a SHA1 hash) to memory mapped files.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <getopt.h>
#include <string.h>
#include <limits.h>
#include <poll.h>
#include "uthash.h"
#include "fling.h"
#include "plasma.h"
// Capacity of the fixed-size arrays that hold connections.
#define MAX_NUM_CLIENTS 2048
// State of the event loop: the sockets being polled and an identifier for
// each of them. Entry i of "client_id" belongs to entry i of "waiting"; only
// the first "num_clients" entries of both arrays are in use. The listening
// socket is stored in these arrays like any client.
typedef struct {
int num_clients; // number of clients connected
int client_id[MAX_NUM_CLIENTS]; // unique identifier for the clients
struct pollfd waiting[MAX_NUM_CLIENTS]; // data structure for polling
} plasma_store_state;
// Reset "s" to the empty state: no clients, all ids and poll entries zeroed.
void init_state(plasma_store_state* s) {
  s->num_clients = 0;
  memset(s->client_id, 0, sizeof(s->client_id));
  memset(s->waiting, 0, sizeof(s->waiting));
}
// Register the socket "fd" with the state "s" so that it is polled for
// incoming data, and assign it the next unique client id.
//
// Returns the id assigned to the client. Returns -1, leaving "s" unchanged,
// if all MAX_NUM_CLIENTS slots are in use; "fd" is not closed in that case
// and remains the caller's responsibility.
int add_client(plasma_store_state* s, int fd) {
  static int curr_id = 0;
  // The arrays have a fixed size; never write past their end.
  if (s->num_clients >= MAX_NUM_CLIENTS) {
    return -1;
  }
  struct pollfd *slot = &s->waiting[s->num_clients];
  slot->fd = fd;
  slot->events = POLLIN;
  slot->revents = 0;
  s->client_id[s->num_clients] = curr_id;
  s->num_clients += 1;
  return curr_id++;
}
// Remove the client stored at index "i" from the state "s".
//
// The last client in use is moved into the freed slot, so the order of the
// clients is not preserved. The socket of the removed client is not closed
// here. An index outside of the range of clients in use is ignored.
void remove_client(plasma_store_state* s, int i) {
  if (i < 0 || i >= s->num_clients) {
    return;
  }
  int last = s->num_clients - 1;
  // When the removed client is the last one there is nothing to move; this
  // also avoids copying an entry onto itself.
  if (i != last) {
    s->waiting[i] = s->waiting[last];
    s->client_id[i] = s->client_id[last];
  }
  memset(&s->waiting[last], 0, sizeof(struct pollfd));
  s->client_id[last] = 0;
  s->num_clients = last;
}
// An entry of the object hash tables below, keyed by "object_id". An entry
// is added to "open_objects" when the object is created and moved to
// "sealed_objects" when it is sealed.
typedef struct {
plasma_id object_id; // object id of this object
plasma_object_info info; // object info like size, creation time and owner
int fd; // memory mapped file containing the object
UT_hash_handle handle; // handle for the uthash table
} object_table_entry;
// objects that are still being written by their owner process
object_table_entry* open_objects = NULL;
// objects that have already been sealed by their owner process and
// can now be shared with other processes
object_table_entry* sealed_objects = NULL;
// An entry of the notification hash table below, keyed by "object_id". It
// records the sockets on which the object has to be delivered once it is
// sealed; only the first "num_waiting" entries of "conn" are in use.
typedef struct {
plasma_id object_id; // object id of this object
int num_waiting; // number of processes waiting for the object
int conn[MAX_NUM_CLIENTS]; // socket connections to waiting clients
UT_hash_handle handle; // handle for the uthash table
} object_notify_entry;
// objects that processes are waiting for
object_notify_entry* objects_notify = NULL;
// Create a buffer of "size" bytes. This is creating a temporary file and
// then immediately unlinking it so we do not leave traces in the system.
//
// Returns a file descriptor for the buffer, or -1 on failure. No file
// descriptor is left open on failure.
int create_buffer(int64_t size) {
  // mkstemp replaces the XXXXXX suffix in place, so it needs a writable copy.
  char file_name[] = "/tmp/plasmaXXXXXX";
  int fd = mkstemp(file_name);
  if (fd < 0)
    return -1;
  // Only the descriptor is needed from here on; no stdio stream is created
  // for it since nothing would ever use or close that stream.
  if (unlink(file_name) != 0) {
    LOG_ERR("unlink error");
    close(fd);
    return -1;
  }
  if (ftruncate(fd, (off_t) size) != 0) {
    LOG_ERR("ftruncate error");
    close(fd);
    return -1;
  }
  return fd;
}
// Create a new object buffer in the hash table of open objects and send its
// file descriptor to the client connected through "conn".
//
// "req" is the client's PLASMA_CREATE request; its object_id and size
// describe the object. If the buffer or its table entry cannot be
// allocated, an error is logged and the store exits.
//
// NOTE(review): an id that already exists in the table is not detected
// here; confirm whether clients guarantee unique ids.
void create_object(int conn, plasma_request* req) {
  int fd = create_buffer(req->size);
  if (fd < 0) {
    LOG_ERR("could not create shared memory buffer");
    exit(-1);
  }
  // calloc so that the info fields that are not filled in yet are zero
  // instead of uninitialized.
  object_table_entry *entry = calloc(1, sizeof(*entry));
  if (!entry) {
    LOG_ERR("could not allocate object table entry");
    exit(-1);
  }
  entry->object_id = req->object_id;
  entry->info.size = req->size;
  // TODO(pcm): set the other fields
  entry->fd = fd;
  HASH_ADD(handle, open_objects, object_id, sizeof(plasma_id), entry);
  plasma_reply reply = { PLASMA_OBJECT, req->size };
  send_fd(conn, fd, (char*) &reply, sizeof(plasma_reply));
}
// Get an object from the hash table of sealed objects and send it to the
// client connected through "conn".
//
// If the object is sealed, its file descriptor is sent with a PLASMA_OBJECT
// reply. Otherwise a socket pair is created: one end is sent to the client
// with a PLASMA_FUTURE reply, the other end is remembered in the
// notification table so the object can be delivered when it gets sealed.
void get_object(int conn, plasma_request* req) {
  object_table_entry *entry;
  HASH_FIND(handle, sealed_objects, &req->object_id, sizeof(plasma_id), entry);
  if (entry) {
    plasma_reply reply = { PLASMA_OBJECT, entry->info.size };
    send_fd(conn, entry->fd, (char*) &reply, sizeof(plasma_reply));
    return;
  }
  LOG_INFO("object not in hash table of sealed objects");
  // All clients waiting for the same object share one notification entry,
  // so that sealing the object reaches every one of them.
  object_notify_entry *notify_entry;
  HASH_FIND(handle, objects_notify, &req->object_id, sizeof(plasma_id), notify_entry);
  if (!notify_entry) {
    // calloc so that num_waiting starts at zero.
    notify_entry = calloc(1, sizeof(*notify_entry));
    if (!notify_entry) {
      LOG_ERR("could not allocate notification entry");
      exit(-1);
    }
    notify_entry->object_id = req->object_id;
    HASH_ADD(handle, objects_notify, object_id, sizeof(plasma_id), notify_entry);
  }
  if (notify_entry->num_waiting >= MAX_NUM_CLIENTS) {
    LOG_ERR("too many clients waiting for the same object");
    return;
  }
  int fd[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) != 0) {
    LOG_ERR("socketpair failed");
    exit(-1);
  }
  notify_entry->conn[notify_entry->num_waiting] = fd[0];
  notify_entry->num_waiting += 1;
  plasma_reply reply = { PLASMA_FUTURE, -1 };
  send_fd(conn, fd[1], (char*) &reply, sizeof(plasma_reply));
  // The client now holds its own copy of this end; the store only needs
  // fd[0] to deliver the object later.
  close(fd[1]);
}
// Seal an object that has been created in the hash table: move it from the
// open objects to the sealed objects and deliver it to every client that is
// waiting for it.
//
// "req" is the client's PLASMA_SEAL request. A request for an object that
// is not open is ignored. "conn" is currently unused.
void seal_object(int conn, plasma_request* req) {
  object_table_entry *entry;
  HASH_FIND(handle, open_objects, &req->object_id, sizeof(plasma_id), entry);
  if (!entry) {
    return; // TODO(pcm): return error
  }
  HASH_DELETE(handle, open_objects, entry);
  int64_t size = entry->info.size;
  int fd = entry->fd;
  HASH_ADD(handle, sealed_objects, object_id, sizeof(plasma_id), entry);
  // inform processes that the object is ready now
  object_notify_entry* notify_entry;
  HASH_FIND(handle, objects_notify, &req->object_id, sizeof(plasma_id), notify_entry);
  if (!notify_entry) {
    return;
  }
  plasma_reply reply = { PLASMA_OBJECT, size };
  for (int i = 0; i < notify_entry->num_waiting; ++i) {
    send_fd(notify_entry->conn[i], fd, (char*) &reply, sizeof(plasma_reply));
    // The notification socket has served its purpose; what was sent stays
    // readable for the client after the store closes its end.
    close(notify_entry->conn[i]);
  }
  HASH_DELETE(handle, objects_notify, notify_entry);
  free(notify_entry);
}
// Dispatch the request "req", received from the client connected through
// "conn", to the handler for its type. Requests of unknown type are logged
// and otherwise ignored.
void process_event(int conn, plasma_request* req) {
  switch (req->type) {
  case PLASMA_CREATE:
    create_object(conn, req);
    break;
  case PLASMA_GET:
    get_object(conn, req);
    break;
  case PLASMA_SEAL:
    seal_object(conn, req);
    break;
  default:
    LOG_ERR("unknown request type %d", req->type);
    break;
  }
}
void event_loop(int socket) {
plasma_store_state state;
init_state(&state);
add_client(&state, socket);
plasma_request req;
while (1) {
int num_ready = poll(state.waiting, state.num_clients, -1);
if (num_ready < 0) {
LOG_ERR("poll failed");
exit(-1);
}
for (int i = 0; i < state.num_clients; ++i) {
if (state.waiting[i].revents == 0)
continue;
if (state.waiting[i].fd == socket) {
while (1) {
// handle new incoming connections
int new_socket = accept(socket, NULL, NULL);
if (new_socket < 0) {
if (errno != EWOULDBLOCK) {
LOG_ERR("accept failed");
exit(-1);
}
break;
}
int client_id = add_client(&state, new_socket);
LOG_INFO("adding new client with id %d", client_id);
}
} else {
int r = read(state.waiting[i].fd, &req, sizeof(plasma_request));
if (r == -1) {
LOG_ERR("read error");
continue;
} else if (r == 0) {
LOG_INFO("client with id %d disconnected", state.client_id[i]);
remove_client(&state, i);
} else {
process_event(state.waiting[i].fd, &req);
}
}
}
}
}
// Create a non-blocking unix domain socket bound to the path "socket_name",
// start listening on it and run the event loop.
//
// An existing file at "socket_name" is removed first. If the socket cannot
// be set up, an error is logged and the process exits.
void start_server(char* socket_name) {
  int fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd == -1) {
    LOG_ERR("socket error");
    exit(-1);
  }
  int on = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on)) < 0) {
    LOG_ERR("setsockopt failed");
    close(fd);
    exit(-1);
  }
  // TODO(pcm): http://stackoverflow.com/q/1150635
  if (ioctl(fd, FIONBIO, (char*) &on) < 0) {
    LOG_ERR("ioctl failed");
    close(fd);
    exit(-1);
  }
  struct sockaddr_un addr;
  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path)-1);
  // Remove a stale socket file; it is fine if there is none.
  unlink(socket_name);
  // Without a successful bind and listen no client can ever connect, so
  // fail loudly instead of polling a dead socket.
  if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
    LOG_ERR("bind failed");
    close(fd);
    exit(-1);
  }
  if (listen(fd, 5) != 0) {
    LOG_ERR("listen failed");
    close(fd);
    exit(-1);
  }
  event_loop(fd);
}
// Entry point of the plasma store.
//
// Usage: plasma_store -s <socket>
// The -s option names the unix domain socket to listen on and is required;
// any other option makes the process exit with an error status.
int main(int argc, char* argv[]) {
  char *socket_name = NULL;
  for (int opt = getopt(argc, argv, "s:"); opt != -1;
       opt = getopt(argc, argv, "s:")) {
    if (opt != 's') {
      exit(-1);
    }
    socket_name = optarg;
  }
  if (socket_name == NULL) {
    LOG_ERR("please specify socket for incoming connections with -s switch");
    exit(-1);
  }
  LOG_INFO("starting server listening on %s", socket_name);
  start_server(socket_name);
}
+1074
View File
File diff suppressed because it is too large Load Diff