diff --git a/Makefile b/Makefile
index e303d65b5..f2c7ff9a5 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ CC = gcc
 CFLAGS = -g -Wall
 BUILD = build
 
-all: $(BUILD)/plasma_store $(BUILD)/plasma_client.so $(BUILD)/example
+all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example
 
 clean:
 	rm -r $(BUILD)/*
@@ -10,6 +10,9 @@ clean:
 $(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/fling.h src/fling.c
 	$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_store.c src/fling.c -o $(BUILD)/plasma_store
 
+$(BUILD)/plasma_manager: src/plasma_manager.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c
+	$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_manager.c src/plasma_client.c src/fling.c -o $(BUILD)/plasma_manager
+
 $(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c
 	$(CC) $(CFLAGS) --std=c99 -D_XOPEN_SOURCE=500 src/plasma_client.c src/fling.c -fPIC -shared -o $(BUILD)/plasma_client.so
 
diff --git a/lib/python/plasma.py b/lib/python/plasma.py
index 69361d63c..83de5f1b8 100644
--- a/lib/python/plasma.py
+++ b/lib/python/plasma.py
@@ -9,8 +9,13 @@ ID = ctypes.c_ubyte * 20
 class PlasmaID(ctypes.Structure):
   _fields_ = [("plasma_id", ID)]
 
-# these must be in sync with plasma_request_type in plasma.h
+# these must be in sync with plasma_request_type in plasma.h (can we have a test for that?)
 PLASMA_CREATE = 0
+PLASMA_GET = 1
+PLASMA_SEAL = 2
+PLASMA_TRANSFER = 3
+PLASMA_DATA = 4
+PLASMA_REGISTER = 5
 
 class PlasmaRequest(ctypes.Structure):
   _fields_ = [("type", ctypes.c_int),
@@ -32,6 +37,32 @@ def make_plasma_id(string):
   object_id = map(ord, string)
   return PlasmaID(plasma_id=ID(*object_id))
 
+class PlasmaManager(object):
+  """Client for the control connection of a plasma manager."""
+
+  def __init__(self, addr, port):
+    """Connect to the plasma manager listening on addr:port."""
+    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self.sock.connect((addr, port))
+
+  def register(self, manager_id, addr, port):
+    """Register another object manager.
+
+    The manager stores the dotted-quad IPv4 address addr and the port under
+    manager_id.
+    """
+    req = PlasmaRequest(type=PLASMA_REGISTER, manager_id=manager_id,
+                        addr=Addr(*map(int, addr.split("."))), port=port)
+    # sendall, because send may transmit only part of the request.
+    self.sock.sendall(buffer(req)[:])
+
+  def transfer(self, manager_id, object_id):
+    """Transfer local object with id object_id to manager with id manager_id."""
+    req = PlasmaRequest(type=PLASMA_TRANSFER, manager_id=manager_id,
+                        object_id=make_plasma_id(object_id))
+    # sendall, because send may transmit only part of the request.
+    self.sock.sendall(buffer(req)[:])
+
 class PlasmaClient(object):
   def __init__(self, socket_name):
     plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so")
diff --git a/src/plasma.h b/src/plasma.h
index 42ac2c6f5..2e4fbfdcd 100644
--- a/src/plasma.h
+++ b/src/plasma.h
@@ -6,6 +6,13 @@
 #include
 #include
 
+#ifdef NDEBUG
+  #define LOG_DEBUG(M, ...)
+#else
+  #define LOG_DEBUG(M, ...) \
+    fprintf(stderr, "[DEBUG] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
+#endif
+
 #define LOG_ERR(M, ...) \
   fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", \
     __FILE__, __LINE__, errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
@@ -24,6 +31,7 @@ typedef struct {
   unsigned char id[20];
 } plasma_id;
 
+// these values must be in sync with the ones in plasma.py (can we have a test for that?)
enum plasma_request_type { PLASMA_CREATE, // create a new object PLASMA_GET, // get an object @@ -64,4 +72,6 @@ plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size); plasma_buffer plasma_get(int conn, plasma_id object_id); void plasma_seal(int fd, plasma_id object_id); +void plasma_send(int fd, plasma_request *req); + #endif diff --git a/src/plasma_manager.c b/src/plasma_manager.c new file mode 100644 index 000000000..7ad2585b6 --- /dev/null +++ b/src/plasma_manager.c @@ -0,0 +1,392 @@ +// PLASMA MANAGER: Local to a node, connects to other managers to send and +// receive objects from them +// +// The storage manager listens on its main listening port, and if a request for +// transfering an object to another object store comes in, it ships the data +// using a new connection to the target object manager. Also keeps a list of +// other object managers. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "plasma.h" + +#define MAX_CONNECTIONS 2048 +#define MAX_NUM_MANAGERS 1024 + +enum conn_type { + // Connection to send commands to the manager. + CONN_CONTROL, + // Connection to send data to another manager. + CONN_WRITE_DATA, + // Connection to receive data from another manager. + CONN_READ_DATA +}; + +typedef struct { + // Unique identifier for the connection. + int id; + // Of type conn_type. + int type; + // Socket of the plasma store that is accessed for reading or writing data for + // this connection. + int store_conn; + // Buffer this connection is reading from or writing to. + plasma_buffer buf; + // Current position in the buffer. + int64_t cursor; +} conn_state; + +typedef struct { + // Address of the manager. + struct sockaddr_in name; + // Is this manager connected? + int connected; +} manager_state; + +typedef struct { + // Name of the socket connecting to local plasma store. 
+ const char* store_socket_name; + // Number of connections. + int num_conn; + // For the "poll" system call. + struct pollfd waiting[MAX_CONNECTIONS]; + // Status of connections (both control and data). + conn_state conn[MAX_CONNECTIONS]; + // Other plasma managers in the cluster. + manager_state managers[MAX_NUM_MANAGERS]; +} plasma_manager_state; + +void init_manager_state(plasma_manager_state *s, const char* store_socket_name) { + memset(&s->waiting, 0, sizeof(s->waiting)); + memset(&s->conn, 0, sizeof(s->conn)); + memset(&s->managers, 0, sizeof(s->managers)); + s->num_conn = 0; + s->store_socket_name = store_socket_name; +} + +#define h_addr h_addr_list[0] + +// Add name info for another plasma manager from the cluster. +void add_manager(plasma_manager_state *s, int manager_id, char *ip_address, int port) { + assert(ip_address); + assert(s); + struct hostent *manager = gethostbyname(ip_address); + if (!manager) { + LOG_ERR("plasma manager %s not found", ip_address); + exit(-1); + } + s->managers[manager_id].connected = 1; + struct sockaddr_in *name = &s->managers[manager_id].name; + name->sin_family = AF_INET; + bcopy(manager->h_addr, &name->sin_addr.s_addr, manager->h_length); + name->sin_port = htons(port); +} + +// Add connection for sending commands or data to another plasma manager +// (returns the connection id). +int add_conn(plasma_manager_state* s, int type, int fd, int events, plasma_buffer* buf) { + static int conn_id = 0; + s->waiting[s->num_conn].fd = fd; + s->waiting[s->num_conn].events = events; + s->conn[s->num_conn].id = conn_id; + s->conn[s->num_conn].type = type; + if (buf) { + s->conn[s->num_conn].buf = *buf; + } + s->conn[s->num_conn].cursor = 0; + s->num_conn += 1; + return conn_id++; +} + +// Remove connection with index i by swapping it with the last element. 
+void remove_conn(plasma_manager_state* s, int i) { + memcpy(&s->waiting[i], &s->waiting[s->num_conn-1], sizeof(struct pollfd)); + memset(&s->waiting[s->num_conn-1], 0, sizeof(struct pollfd)); + memcpy(&s->conn[i], &s->conn[s->num_conn-1], sizeof(conn_state)); + memset(&s->conn[s->num_conn-1], 0, sizeof(conn_state)); +} + +#define BUFSIZE 4096 + +// Start transfering data to another object store manager. This establishes +// a connection to both the manager and the local object store and sends +// the data header to the other object manager. +void initiate_transfer(plasma_manager_state* state, plasma_request* req) { + int manager_id = req->manager_id; + int c = plasma_store_connect(state->store_socket_name); + plasma_buffer buf = plasma_get(c, req->object_id); + + int fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + LOG_ERR("could not create socket"); + exit(-1); + } + int r = connect(fd, (struct sockaddr*) &state->managers[manager_id].name, sizeof(state->managers[manager_id].name)); + if (r < 0) { + LOG_ERR("could not establish connection to manager with id %d", manager_id); + exit(-1); + } + + add_conn(state, CONN_WRITE_DATA, fd, POLLOUT, &buf); + + plasma_request manager_req = { .type = PLASMA_DATA, .object_id = req->object_id, .size = buf.size }; + LOG_INFO("filedescriptor is %d", fd); + plasma_send(fd, &manager_req); +} + +void setup_data_connection(int conn_idx, plasma_manager_state* state, plasma_request* req) { + int store_conn = plasma_store_connect(state->store_socket_name); + state->conn[conn_idx].type = CONN_READ_DATA; + state->conn[conn_idx].store_conn = store_conn; + state->conn[conn_idx].buf = plasma_create(store_conn, req->object_id, req->size); + state->conn[conn_idx].cursor = 0; +} + +// Handle a command request that came in through a socket (transfering data, +// registering object managers, accepting incoming data). 
+void process_command(int conn_idx, plasma_manager_state* state, plasma_request* req) { + switch (req->type) { + case PLASMA_TRANSFER: + LOG_INFO("transfering object to manager with id %d", req->manager_id); + initiate_transfer(state, req); + break; + case PLASMA_REGISTER: { + char buff[16]; + snprintf(buff, 32, "%d.%d.%d.%d", + req->addr[0], req->addr[1], + req->addr[2], req->addr[3]); + if (req->manager_id >= MAX_NUM_MANAGERS) { + LOG_ERR("manager_id %d out of bounds", req->manager_id); + } else { + add_manager(state, req->manager_id, buff, req->port); + LOG_INFO("registering %s:%d with id %d", buff, req->port, req->manager_id); + } + } + break; + case PLASMA_DATA: + LOG_INFO("starting to stream data"); + setup_data_connection(conn_idx, state, req); + break; + default: + LOG_ERR("invalid request %d", req->type); + exit(-1); + } +} + +// Handle data or command event incoming on socket with index i. +void read_from_socket(plasma_manager_state* state, int i, plasma_request* req) { + ssize_t r, s; + switch (state->conn[i].type) { + case CONN_CONTROL: + r = read(state->waiting[i].fd, req, sizeof(plasma_request)); + if (r == 1) { + LOG_ERR("read error"); + } else if (r == 0) { + LOG_INFO("connection with id %d disconnected", state->conn[i].id); + remove_conn(state, i); + } else { + process_command(i, state, req); + } + break; + case CONN_READ_DATA: + LOG_DEBUG("polled CONN_READ_DATA"); + r = read(state->waiting[i].fd, state->conn[i].buf.data + state->conn[i].cursor, BUFSIZE); + if (r == -1) { + LOG_ERR("read error"); + } else if (r == 0) { + LOG_INFO("end of file"); + } else { + state->conn[i].cursor += r; + } + if (r == 0) { + close(state->waiting[i].fd); + state->waiting[i].fd = 0; + state->waiting[i].events = 0; + plasma_seal(state->conn[i].store_conn, state->conn[i].buf.object_id); + } + break; + case CONN_WRITE_DATA: + LOG_DEBUG("polled CONN_WRITE_DATA"); + s = state->conn[i].buf.size - state->conn[i].cursor; + if (s > BUFSIZE) + s = BUFSIZE; + r = 
write(state->waiting[i].fd, state->conn[i].buf.data + state->conn[i].cursor, s); + if (r != s) { + if (r > 0) { + LOG_ERR("partial write on fd %d", state->waiting[i].fd); + } else { + LOG_ERR("write error"); + exit(-1); + } + } else { + state->conn[i].cursor += r; + } + if (r == 0) { + close(state->waiting[i].fd); + state->waiting[i].fd = 0; + state->waiting[i].events = 0; + } + break; + default: + LOG_ERR("invalid connection type"); + exit(-1); + } +} + +// Main event loop of the plasma manager. +void event_loop(int sock, plasma_manager_state* state) { + // Add listening socket. + add_conn(state, CONN_CONTROL, sock, POLLIN, NULL); + plasma_request req; + while (1) { + int num_ready = poll(state->waiting, state->num_conn, -1); + if (num_ready < 0) { + LOG_ERR("poll failed"); + exit(-1); + } + for (int i = 0; i < state->num_conn; ++i) { + if (state->waiting[i].revents == 0) + continue; + if (state->waiting[i].fd == sock) { + // Handle new incoming connections. + int new_socket = accept(sock, NULL, NULL); + if (new_socket < 0) { + if (errno != EWOULDBLOCK) { + LOG_ERR("accept failed"); + exit(-1); + } + break; + } + int conn_id = add_conn(state, CONN_CONTROL, new_socket, POLLIN, NULL); + LOG_INFO("new connection with id %d", conn_id); + } else { + read_from_socket(state, i, &req); + } + } + } +} + +// Register this plasma manager with the nameserver. 
+void register_with_nameserver(const char* nameserver_addr, int nameserver_port, + const char* manager_addr, int manager_port) { + int fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + LOG_ERR("socket for nameserver connection could not be established"); + exit(-1); + } + struct hostent *host = gethostbyname(nameserver_addr); + if (!host) { + LOG_ERR("nameserver %s not found", nameserver_addr); + exit(-1); + } + struct sockaddr_in nameserver; + memset(&nameserver, 0, sizeof(struct sockaddr_in)); + nameserver.sin_family = AF_INET; + bcopy(host->h_addr, &nameserver.sin_addr.s_addr, host->h_length); + nameserver.sin_port = htons(nameserver_port); + if (connect(fd, (struct sockaddr*) &nameserver, sizeof(nameserver)) == -1) { + LOG_ERR("could not connect to nameserver %s:%d", nameserver_addr, nameserver_port); + exit(-1); + } + plasma_request req = { .type = PLASMA_REGISTER, .port = manager_port }; + // TODO(pcm): input validation + sscanf(manager_addr, "%" SCNu8 ".%" SCNu8 ".%" SCNu8 ".%" SCNu8, &req.addr[0], &req.addr[1], &req.addr[2], &req.addr[3]); + plasma_send(fd, &req); + close(fd); +} + +void start_server(const char *store_socket_name, const char* master_addr, + const char* nameserver_addr, int nameserver_port) { + struct sockaddr_in name; + int sock = socket(PF_INET, SOCK_STREAM, 0); + if (sock < 0) { + LOG_ERR("could not create socket"); + exit(-1); + } + name.sin_family = AF_INET; + name.sin_port = 0; + name.sin_addr.s_addr = htonl(INADDR_ANY); + int on = 1; + // TODO(pcm): http://stackoverflow.com/q/1150635 + if (ioctl(sock, FIONBIO, (char*) &on) < 0) { + LOG_ERR("ioctl failed"); + close(sock); + exit(-1); + } + if (bind(sock, (struct sockaddr*) &name, sizeof(name)) < 0) { + LOG_ERR("could not bind socket"); + exit(-1); + } + socklen_t len = sizeof(name); + if (getsockname(sock, (struct sockaddr*) &name, &len) == -1) { + LOG_ERR("getsockname failed"); + } else { + LOG_INFO("listening on port %d", ntohs(name.sin_port)); + } + if (listen(sock, 5) == -1) 
{ + LOG_ERR("could not listen to socket"); + exit(-1); + } + register_with_nameserver(nameserver_addr, nameserver_port, master_addr, ntohs(name.sin_port)); + plasma_manager_state state; + init_manager_state(&state, store_socket_name); + event_loop(sock, &state); +} + + + +int main(int argc, char* argv[]) { + // Socket name of the plasma store this manager is connected to. + char *store_socket_name = NULL; + // IP address and port of the nameserver. + char *nameserver_addr_port = NULL; + // IP address this host can be reached at from the outside. + char *master_addr = NULL; + int c; + while ((c = getopt(argc, argv, "n:s:m:")) != -1) { + switch (c) { + case 's': + store_socket_name = optarg; + break; + case 'n': + nameserver_addr_port = optarg; + break; + case 'm': + master_addr = optarg; + break; + default: + LOG_ERR("unknown option %c", c); + exit(-1); + } + } + if (!store_socket_name) { + LOG_ERR("please specify socket for connecting to the plasma store with -s switch"); + exit(-1); + } + if (!master_addr) { + LOG_ERR("please specify ip address of the current host in the format 123.456.789.10 with -m switch"); + exit(-1); + } + // Parse nameserver address and port. 
+ const char *format = "%15[0-9.]:%5[0-9]"; + char nameserver_addr[16] = { 0 }; + char nameserver_port[6] = { 0 }; + if(!nameserver_addr_port || sscanf(nameserver_addr_port, format, nameserver_addr, nameserver_port) != 2) { + LOG_ERR("need to specify nameserver address in the format 123.456.789.10:12345 with -n switch"); + exit(-1); + } + start_server(store_socket_name, master_addr, nameserver_addr, atoi(nameserver_port)); +} diff --git a/src/plasma_store.c b/src/plasma_store.c index 3260cde9e..d67420c24 100644 --- a/src/plasma_store.c +++ b/src/plasma_store.c @@ -28,9 +28,12 @@ #define MAX_NUM_CLIENTS 2048 typedef struct { - int num_clients; // number of clients connected - int client_id[MAX_NUM_CLIENTS]; // unique identifier for the clients - struct pollfd waiting[MAX_NUM_CLIENTS]; // data structure for polling + // Number of clients connected. + int num_clients; + // Unique identifier for the clients. + int client_id[MAX_NUM_CLIENTS]; + // Data structure for polling. + struct pollfd waiting[MAX_NUM_CLIENTS]; } plasma_store_state; void init_state(plasma_store_state* s) { @@ -48,8 +51,8 @@ int add_client(plasma_store_state* s, int fd) { return curr_id++; } -// remove the client at index i by swapping it with the -// client at index num_clients-1 and zeroing the latter out +// Remove the client at index i by swapping it with the +// client at index num_clients-1 and zeroing the latter out. void remove_client(plasma_store_state* s, int i) { memcpy(&s->waiting[i], &s->waiting[s->num_clients-1], sizeof(struct pollfd)); memset(&s->waiting[s->num_clients-1], 0, sizeof(struct pollfd)); @@ -59,27 +62,35 @@ void remove_client(plasma_store_state* s, int i) { } typedef struct { - plasma_id object_id; // object id of this object - plasma_object_info info; // object info like size, creation time and owner - int fd; // memory mapped file containing the object - UT_hash_handle handle; // handle for the uthash table + // Object id of this object. 
+ plasma_id object_id; + // Object info like size, creation time and owner. + plasma_object_info info; + // Memory mapped file containing the object. + int fd; + // Handle for the uthash table. + UT_hash_handle handle; } object_table_entry; // objects that are still being written by their owner process object_table_entry* open_objects = NULL; -// objects that have already been sealed by their owner process and -// can now be shared with other processes +// Objects that have already been sealed by their owner process and +// can now be shared with other processes. object_table_entry* sealed_objects = NULL; typedef struct { - plasma_id object_id; // object id of this object - int num_waiting; // number of processes waiting for the object - int conn[MAX_NUM_CLIENTS]; // socket connections to waiting clients - UT_hash_handle handle; // handle for the uthash table + // Object id of this object. + plasma_id object_id; + // Number of processes waiting for the object. + int num_waiting; + // Socket connections to waiting clients. + int conn[MAX_NUM_CLIENTS]; + // Handle for the uthash table. + UT_hash_handle handle; } object_notify_entry; -// objects that processes are waiting for +// Objects that processes are waiting for. object_notify_entry* objects_notify = NULL; // Create a buffer. This is creating a temporary file and then @@ -107,7 +118,7 @@ int create_buffer(int64_t size) { return fd; } -// create a new object buffer in the hash table +// Create a new object buffer in the hash table. void create_object(int conn, plasma_request* req) { LOG_INFO("creating object"); // TODO(pcm): add object_id here int fd = create_buffer(req->size); @@ -125,7 +136,7 @@ void create_object(int conn, plasma_request* req) { send_fd(conn, fd, (char*) &reply, sizeof(plasma_reply)); } -// get an object from the hash table +// Get an object from the hash table. 
void get_object(int conn, plasma_request* req) { object_table_entry *entry; HASH_FIND(handle, sealed_objects, &req->object_id, sizeof(plasma_id), entry); @@ -146,7 +157,7 @@ void get_object(int conn, plasma_request* req) { } } -// seal an object that has been created in the hash table +// Seal an object that has been created in the hash table. void seal_object(int conn, plasma_request* req) { LOG_INFO("sealing object"); // TODO(pcm): add object_id here object_table_entry *entry; @@ -158,7 +169,7 @@ void seal_object(int conn, plasma_request* req) { int64_t size = entry->info.size; int fd = entry->fd; HASH_ADD(handle, sealed_objects, object_id, sizeof(plasma_id), entry); - // inform processes that the object is ready now + // Inform processes that the object is ready now. object_notify_entry* notify_entry; HASH_FIND(handle, objects_notify, &req->object_id, sizeof(plasma_id), notify_entry); if (!notify_entry) { @@ -205,7 +216,7 @@ void event_loop(int socket) { continue; if (state.waiting[i].fd == socket) { while (1) { - // handle new incoming connections + // Handle new incoming connections. 
int new_socket = accept(socket, NULL, NULL); if (new_socket < 0) { if (errno != EWOULDBLOCK) { diff --git a/test/nameserver.py b/test/nameserver.py new file mode 100644 index 000000000..6fcd965ca --- /dev/null +++ b/test/nameserver.py @@ -0,0 +1,33 @@ +import collections +import socket +import ctypes +import plasma + +DEFAULT_PORT = 16121 + +Connection = collections.namedtuple("Connection", ["address", "port"]) + +# list of IP addresses and ports of managers +object_managers = [] + +def send_addresses(conn, object_managers): + manager = plasma.PlasmaManager(conn.address, conn.port) + for (manager_id, object_manager) in enumerate(object_managers): + manager.register(manager_id, object_manager.address, object_manager.port) + +if __name__ == '__main__': + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('', DEFAULT_PORT)) + sock.listen(5) + + while True: + (client, address) = sock.accept() + request = plasma.PlasmaRequest() + client.recv_into(request) + address = ".".join(map(str, request.addr[:])) + conn = Connection(address=address, port=request.port) + print "object manager " + str(conn) + " connected" + object_managers.append(conn) + for c in object_managers: + send_addresses(c, object_managers) +