diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 713c3f3ad..5d18b06e4 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -9,77 +9,38 @@ ID = ctypes.c_ubyte * 20 class PlasmaID(ctypes.Structure): _fields_ = [("plasma_id", ID)] -# these must be in sync with plasma_request_type in plasma.h (can we have a test for that?) -PLASMA_CREATE = 0 -PLASMA_GET = 1 -PLASMA_SEAL = 2 -PLASMA_TRANSFER = 3 -PLASMA_DATA = 4 -PLASMA_REGISTER = 5 - -class PlasmaRequest(ctypes.Structure): - _fields_ = [("type", ctypes.c_int), - ("object_id", PlasmaID), - ("size", ctypes.c_int64), - ("addr", Addr), - ("port", ctypes.c_int)] - -class PlasmaBuffer(ctypes.Structure): - _fields_ = [("plasma_id", PlasmaID), - ("data", ctypes.c_void_p), - ("size", ctypes.c_int64), - ("writable", ctypes.c_int)] - def make_plasma_id(string): if len(string) != 20: raise Exception("PlasmaIDs must be 20 characters long") object_id = map(ord, string) return PlasmaID(plasma_id=ID(*object_id)) -class PlasmaManager(object): - """The PlasmaManager is used to manage a PlasmaStore. - - There should be one PlasmaManager per PlasmaStore. The PlasmaManager is - responsible for interfacing with other PlasmaManagers in order to transfer - objects between PlasmaStores. This class sends commands to the C - implementation of the PlasmaManager using sockets. - - Attributes: - sock: The socket used to communicate with the C implementation of the - PlasmaManager. - """ - - def __init__(self, addr, port): - """Initialize the PlasmaManager.""" - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect((addr, port)) - - def transfer(self, addr, port, object_id): - """Transfer local object with id object_id to manager with id manager_id.""" - req = PlasmaRequest(type=PLASMA_TRANSFER, object_id=make_plasma_id(object_id), - addr=Addr(*map(int, addr.split("."))), port=port) - print "sending port", port - self.sock.send(buffer(req)[:]) - class PlasmaClient(object): - """The PlasmaClient is used to interface with a PlasmaStore. + """The PlasmaClient is used to interface with a plasma store and a plasma manager. The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a buffer, and get a buffer. Buffers are referred to by object IDs, which are strings. """ - def __init__(self, socket_name): + def __init__(self, socket_name, addr=None, port=None): + """Initialize the PlasmaClient. + + Args: + socket_name (str): Name of the socket the plasma store is listening at. + addr (str): IPv4 address of plasma manager attached to the plasma store. + port (int): Port number of the plasma manager attached to the plasma store. + """ plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") self.client = ctypes.cdll.LoadLibrary(plasma_client_library) self.client.plasma_store_connect.restype = ctypes.c_int - self.client.plasma_create.argtypes = [ctypes.c_int, PlasmaID, ctypes.c_int64] - self.client.plasma_create.restype = PlasmaBuffer + self.client.plasma_create.argtypes = [ctypes.c_int, PlasmaID, ctypes.c_int64, ctypes.POINTER(ctypes.c_void_p)] + self.client.plasma_create.restype = None - self.client.plasma_get.argtypes = [ctypes.c_int, PlasmaID] - self.client.plasma_get.restype = PlasmaBuffer + self.client.plasma_get.argtypes = [ctypes.c_int, PlasmaID, ctypes.POINTER(ctypes.c_int64), ctypes.POINTER(ctypes.c_void_p)] + self.client.plasma_get.restype = None self.client.plasma_seal.argtypes = [ctypes.c_int, PlasmaID] self.client.plasma_seal.restype = None @@ -94,6 +55,11 @@ class PlasmaClient(object): self.sock = self.client.plasma_store_connect(socket_name) + if addr is not None and port is not None: + self.manager_conn = self.client.plasma_manager_connect(addr, port) + else: + self.manager_conn = -1 # not connected + def create(self, object_id, size): """Create a new buffer in the PlasmaStore for a particular object ID. @@ -103,8 +69,9 @@ class PlasmaClient(object): object_id (str): A string used to identify an object. size (int): The size in bytes of the created buffer. """ - buf = self.client.plasma_create(self.sock, make_plasma_id(object_id), size) - return self.buffer_from_read_write_memory(buf.data, buf.size) + data = ctypes.c_void_p() + self.client.plasma_create(self.sock, make_plasma_id(object_id), size, ctypes.byref(data)) + return self.buffer_from_read_write_memory(data, size) def get(self, object_id): """Create a buffer from the PlasmaStore based on object ID. @@ -115,8 +82,10 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - buf = self.client.plasma_get(self.sock, make_plasma_id(object_id)) - return self.buffer_from_memory(buf.data, buf.size) + size = ctypes.c_int64() + data = ctypes.c_void_p() + buf = self.client.plasma_get(self.sock, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data)) + return self.buffer_from_memory(data, size) def seal(self, object_id): """Seal the buffer in the PlasmaStore for a particular object ID. @@ -128,3 +97,16 @@ class PlasmaClient(object): object_id (str): A string used to identify an object. """ self.client.plasma_seal(self.sock, make_plasma_id(object_id)) + + def transfer(self, addr, port, object_id): + """Transfer local object with id object_id to another plasma instance + + Args: + addr (str): IPv4 address of the plasma instance the object is sent to. + port (int): Port number of the plasma instance the object is sent to. + object_id (str): A string used to identify an object. + """ + if self.manager_conn == -1: + raise Exception("Not connected to the plasma manager socket") + self.client.plasma_transfer(self.manager_conn, addr, port, make_plasma_id(object_id)) + diff --git a/src/example.c b/src/example.c index 20ecd7b64..1d8d74b5c 100644 --- a/src/example.c +++ b/src/example.c @@ -16,6 +16,8 @@ int main(int argc, char *argv[]) { int conn = -1; + int64_t size; + void *data; int c; plasma_id id = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}}; @@ -26,14 +28,14 @@ int main(int argc, char *argv[]) { break; case 'c': assert(conn != -1); - plasma_create(conn, id, 100); + plasma_create(conn, id, 100, &data); break; case 'f': assert(conn != -1); plasma_seal(conn, id); break; case 'g': - plasma_get(conn, id); + plasma_get(conn, id, &size, &data); break; default: abort(); @@ -42,4 +44,3 @@ int main(int argc, char *argv[]) { assert(conn != -1); close(conn); } - diff --git a/src/plasma.h b/src/plasma.h index 39d04700f..9fdec86c2 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -65,11 +65,16 @@ typedef struct { int writable; } plasma_buffer; +// Connect to the local plasma store UNIX domain socket int plasma_store_connect(const char* socket_name); -plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size); -plasma_buffer plasma_get(int conn, plasma_id object_id); -void plasma_seal(int fd, plasma_id object_id); -void plasma_send(int fd, plasma_request *req); +// Connect to a possibly remote plasma manager +int plasma_manager_connect(const char* addr, int port); + +void plasma_create(int store, plasma_id object_id, int64_t size, void **data); +void plasma_get(int store, plasma_id object_id, int64_t *size, void **data); +void plasma_seal(int store, plasma_id object_id); + +void plasma_send(int conn, plasma_request *req); #endif diff --git a/src/plasma_client.c b/src/plasma_client.c index 5dd36ae7e..e8e160d70 100644 --- a/src/plasma_client.c +++ b/src/plasma_client.c @@ -23,7 +23,7 @@ void plasma_send(int fd, plasma_request *req) { } } -plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) { +void plasma_create(int conn, plasma_id object_id, int64_t size, void **data) { LOG_INFO("called plasma_create on conn %d with size %" PRId64, conn, size); plasma_request req = { .type = PLASMA_CREATE, .object_id = object_id, .size = size }; plasma_send(conn, &req); @@ -31,16 +31,14 @@ plasma_buffer plasma_create(int conn, plasma_id object_id, int64_t size) { int fd = recv_fd(conn, (char*)&reply, sizeof(plasma_reply)); assert(reply.type == PLASMA_OBJECT); assert(reply.size == size); - void *data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (data == MAP_FAILED) { + *data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (*data == MAP_FAILED) { LOG_ERR("mmap failed"); exit(-1); } - plasma_buffer buffer = { object_id, data, size, 1 }; - return buffer; } -plasma_buffer plasma_get(int conn, plasma_id object_id) { +void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) { plasma_request req = { .type = PLASMA_GET, .object_id = object_id }; plasma_send(conn, &req); plasma_reply reply; @@ -52,13 +50,12 @@ plasma_buffer plasma_get(int conn, plasma_id object_id) { fd = new_fd; } assert(reply.type == PLASMA_OBJECT); - void *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0); - if (data == MAP_FAILED) { + *data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0); + if (*data == MAP_FAILED) { LOG_ERR("mmap failed"); exit(-1); } - plasma_buffer buffer = { object_id, data, reply.size, 0 }; - return buffer; + *size = reply.size; } void plasma_seal(int fd, plasma_id object_id) { @@ -94,3 +91,41 @@ int plasma_store_connect(const char* socket_name) { } return fd; } + +#define h_addr h_addr_list[0] + +int plasma_manager_connect(const char* ip_addr, int port) { + int fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + LOG_ERR("could not create socket"); + exit(-1); + } + + struct hostent *manager = gethostbyname(ip_addr); // TODO(pcm): cache this + if (!manager) { + LOG_ERR("plasma manager %s not found", ip_addr); + exit(-1); + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + bcopy(manager->h_addr, &addr.sin_addr.s_addr, manager->h_length); + addr.sin_port = htons(port); + + int r = connect(fd, (struct sockaddr*) &addr, sizeof(addr)); + if (r < 0) { + LOG_ERR("could not establish connection to manager with id %s:%d", &ip_addr[0], port); + exit(-1); + } + return fd; +} + +void plasma_transfer(int manager, const char* addr, int port, plasma_id object_id) { + plasma_request req = {.type = PLASMA_TRANSFER, .object_id = object_id, .port = port}; + char* end = NULL; + for (int i = 0; i < 4; ++i) { + req.addr[i] = strtol(end ? end : addr, &end, 10); + end += 1; // skip the '.' + } + plasma_send(manager, &req); +} diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 34a8e474a..90e0d4087 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -22,43 +22,7 @@ #include #include "plasma.h" - -#define MAX_CONNECTIONS 2048 -#define MAX_NUM_MANAGERS 1024 - -enum conn_type { - // Connection to send commands to the manager. - CONN_CONTROL, - // Connection to send data to another manager. - CONN_WRITE_DATA, - // Connection to receive data from another manager. - CONN_READ_DATA -}; - -typedef struct { - // Unique identifier for the connection. - int id; - // Of type conn_type. - int type; - // Socket of the plasma store that is accessed for reading or writing data for - // this connection. - int store_conn; - // Buffer this connection is reading from or writing to. - plasma_buffer buf; - // Current position in the buffer. - int64_t cursor; -} conn_state; - -typedef struct { - // Name of the socket connecting to local plasma store. - const char* store_socket_name; - // Number of connections. - int num_conn; - // For the "poll" system call. - struct pollfd waiting[MAX_CONNECTIONS]; - // Status of connections (both control and data). - conn_state conn[MAX_CONNECTIONS]; -} plasma_manager_state; +#include "plasma_manager.h" void init_manager_state(plasma_manager_state *s, const char* store_socket_name) { memset(&s->waiting, 0, sizeof(s->waiting)); @@ -67,22 +31,17 @@ void init_manager_state(plasma_manager_state *s, const char* store_socket_name) s->store_socket_name = store_socket_name; } -#define h_addr h_addr_list[0] - // Add connection for sending commands or data to another plasma manager -// (returns the connection id). +// (returns the connection index). int add_conn(plasma_manager_state* s, int type, int fd, int events, plasma_buffer* buf) { - static int conn_id = 0; s->waiting[s->num_conn].fd = fd; s->waiting[s->num_conn].events = events; - s->conn[s->num_conn].id = conn_id; s->conn[s->num_conn].type = type; if (buf) { s->conn[s->num_conn].buf = *buf; } s->conn[s->num_conn].cursor = 0; - s->num_conn += 1; - return conn_id++; + return s->num_conn++; } // Remove connection with index i by swapping it with the last element. @@ -100,33 +59,15 @@ void remove_conn(plasma_manager_state* s, int i) { // the data header to the other object manager. void initiate_transfer(plasma_manager_state* state, plasma_request* req) { int c = plasma_store_connect(state->store_socket_name); - plasma_buffer buf = plasma_get(c, req->object_id); - - int fd = socket(PF_INET, SOCK_STREAM, 0); - if (fd < 0) { - LOG_ERR("could not create socket"); - exit(-1); - } + plasma_buffer buf = { .object_id = req->object_id, .writable = 0 }; + plasma_get(c, req->object_id, &buf.size, &buf.data); char ip_addr[16]; snprintf(ip_addr, 32, "%d.%d.%d.%d", req->addr[0], req->addr[1], req->addr[2], req->addr[3]); - struct hostent *manager = gethostbyname(ip_addr); // TODO(pcm): cache this - if (!manager) { - LOG_ERR("plasma manager %s not found", ip_addr); - exit(-1); - } - struct sockaddr_in addr; - addr.sin_family = AF_INET; - bcopy(manager->h_addr, &addr.sin_addr.s_addr, manager->h_length); - addr.sin_port = htons(req->port); - - int r = connect(fd, (struct sockaddr*) &addr, sizeof(addr)); - if (r < 0) { - LOG_ERR("could not establish connection to manager with id %s:%d", &ip_addr[0], req->port); - exit(-1); - } + + int fd = plasma_manager_connect(&ip_addr[0], req->port); add_conn(state, CONN_WRITE_DATA, fd, POLLOUT, &buf); @@ -139,7 +80,10 @@ void setup_data_connection(int conn_idx, plasma_manager_state* state, plasma_req int store_conn = plasma_store_connect(state->store_socket_name); state->conn[conn_idx].type = CONN_READ_DATA; state->conn[conn_idx].store_conn = store_conn; - state->conn[conn_idx].buf = plasma_create(store_conn, req->object_id, req->size); + state->conn[conn_idx].buf.object_id = req->object_id; + state->conn[conn_idx].buf.size = req->size; + state->conn[conn_idx].buf.writable = 1; + plasma_create(store_conn, req->object_id, req->size, &state->conn[conn_idx].buf.data); state->conn[conn_idx].cursor = 0; } @@ -170,7 +114,7 @@ void read_from_socket(plasma_manager_state* state, int i, plasma_request* req) { if (r == 1) { LOG_ERR("read error"); } else if (r == 0) { - LOG_INFO("connection with id %d disconnected", state->conn[i].id); + LOG_INFO("connection with index %d disconnected", i); remove_conn(state, i); } else { process_command(i, state, req); diff --git a/src/plasma_manager.h b/src/plasma_manager.h new file mode 100644 index 000000000..efa326cb9 --- /dev/null +++ b/src/plasma_manager.h @@ -0,0 +1,42 @@ +#ifndef PLASMA_MANAGER_H +#define PLASMA_MANAGER_H + +#include + +#define MAX_CONNECTIONS 2048 + +enum conn_type { + // Connection to send commands to the manager. + CONN_CONTROL, + // Connection to send data to another manager. + CONN_WRITE_DATA, + // Connection to receive data from another manager. + CONN_READ_DATA +}; + +typedef struct { + // Of type conn_type. + int type; + // Socket of the plasma store that is accessed for reading or writing data for + // this connection. + int store_conn; + // Buffer this connection is reading from or writing to. + plasma_buffer buf; + // Current position in the buffer. + int64_t cursor; +} conn_state; + +typedef struct { + // ID of this manager + int64_t manager_id; + // Name of the socket connecting to local plasma store. + const char* store_socket_name; + // Number of connections. + int num_conn; + // For the "poll" system call. + struct pollfd waiting[MAX_CONNECTIONS]; + // Status of connections (both control and data). + conn_state conn[MAX_CONNECTIONS]; +} plasma_manager_state; + +#endif diff --git a/test/test.py b/test/test.py index b0284cb54..82ba406a8 100644 --- a/test/test.py +++ b/test/test.py @@ -69,9 +69,6 @@ class TestPlasmaManager(unittest.TestCase): plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") self.p2 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store1"]) self.p3 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store2"]) - # Connect two PlasmaClients. - self.client1 = plasma.PlasmaClient("/tmp/store1") - self.client2 = plasma.PlasmaClient("/tmp/store2") # Start two PlasmaManagers. self.port1 = random.randint(10000, 50000) self.port2 = random.randint(10000, 50000) @@ -79,13 +76,13 @@ class TestPlasmaManager(unittest.TestCase): self.p4 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store1", "-m", "127.0.0.1", "-p", str(self.port1)]) self.p5 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store2", "-m", "127.0.0.1", "-p", str(self.port2)]) time.sleep(0.1) - # Connect two Python PlasmaManagers. - self.manager1 = plasma.PlasmaManager("127.0.0.1", self.port1) - self.manager2 = plasma.PlasmaManager("127.0.0.1", self.port2) + # Connect two PlasmaClients. + self.client1 = plasma.PlasmaClient("/tmp/store1", "127.0.0.1", self.port1) + self.client2 = plasma.PlasmaClient("/tmp/store2", "127.0.0.1", self.port2) time.sleep(0.5) def tearDown(self): - # Kill the nameserver, PlasmaStore and PlasmaManager processes. + # Kill the PlasmaStore and PlasmaManager processes. self.p2.kill() self.p3.kill() self.p4.kill() @@ -101,11 +98,11 @@ class TestPlasmaManager(unittest.TestCase): # Seal the buffer. self.client1.seal(object_id1) # Transfer the buffer to the the other PlasmaStore. - self.manager1.transfer("127.0.0.1", self.port2, object_id1) + self.client1.transfer("127.0.0.1", self.port2, object_id1) # Compare the two buffers. self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) # Transfer the buffer again. - self.manager1.transfer("127.0.0.1", self.port2, object_id1) + self.client1.transfer("127.0.0.1", self.port2, object_id1) # Compare the two buffers. self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) # Create a new object id string. @@ -117,7 +114,7 @@ class TestPlasmaManager(unittest.TestCase): # Seal the buffer. self.client2.seal(object_id2) # Transfer the buffer to the the other PlasmaStore. - self.manager2.transfer("127.0.0.1", self.port1, object_id2) + self.client2.transfer("127.0.0.1", self.port1, object_id2) # Compare the two buffers. self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:])