diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 28b3f086d..713c3f3ad 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -16,12 +16,9 @@ PLASMA_SEAL = 2 PLASMA_TRANSFER = 3 PLASMA_DATA = 4 PLASMA_REGISTER = 5 -PLASMA_GET_MANAGER_PORT = 6 -PLASMA_RETURN_MANAGER_PORT = 7 class PlasmaRequest(ctypes.Structure): _fields_ = [("type", ctypes.c_int), - ("manager_id", ctypes.c_int), ("object_id", PlasmaID), ("size", ctypes.c_int64), ("addr", Addr), @@ -57,16 +54,11 @@ class PlasmaManager(object): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((addr, port)) - def register(self, manager_id, addr, port): - """Register another object manager.""" - req = PlasmaRequest(type=PLASMA_REGISTER, manager_id=manager_id, - addr=Addr(*map(int, addr.split("."))), port=port) - self.sock.send(buffer(req)[:]) - - def transfer(self, manager_id, object_id): + def transfer(self, addr, port, object_id): """Transfer local object with id object_id to manager with id manager_id.""" - req = PlasmaRequest(type=PLASMA_TRANSFER, manager_id=manager_id, - object_id=make_plasma_id(object_id)) + req = PlasmaRequest(type=PLASMA_TRANSFER, object_id=make_plasma_id(object_id), + addr=Addr(*map(int, addr.split("."))), port=port) + print "sending port", port self.sock.send(buffer(req)[:]) class PlasmaClient(object): diff --git a/src/plasma.h b/src/plasma.h index 2e4fbfdcd..39d04700f 100644 --- a/src/plasma.h +++ b/src/plasma.h @@ -38,12 +38,10 @@ enum plasma_request_type { PLASMA_SEAL, // seal an object PLASMA_TRANSFER, // request transfer to another store PLASMA_DATA, // header for sending data - PLASMA_REGISTER // register a plasma manager }; typedef struct { int type; - int manager_id; plasma_id object_id; int64_t size; uint8_t addr[4]; diff --git a/src/plasma_manager.c b/src/plasma_manager.c index b439fcf35..34a8e474a 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -49,13 +49,6 @@ typedef struct { int64_t cursor; } conn_state; -typedef struct { - // Address of the manager. - struct sockaddr_in name; - // Is this manager connected? - int connected; -} manager_state; - typedef struct { // Name of the socket connecting to local plasma store. const char* store_socket_name; @@ -65,36 +58,17 @@ typedef struct { struct pollfd waiting[MAX_CONNECTIONS]; // Status of connections (both control and data). conn_state conn[MAX_CONNECTIONS]; - // Other plasma managers in the cluster. - manager_state managers[MAX_NUM_MANAGERS]; } plasma_manager_state; void init_manager_state(plasma_manager_state *s, const char* store_socket_name) { memset(&s->waiting, 0, sizeof(s->waiting)); memset(&s->conn, 0, sizeof(s->conn)); - memset(&s->managers, 0, sizeof(s->managers)); s->num_conn = 0; s->store_socket_name = store_socket_name; } #define h_addr h_addr_list[0] -// Add name info for another plasma manager from the cluster. -void add_manager(plasma_manager_state *s, int manager_id, char *ip_address, int port) { - assert(ip_address); - assert(s); - struct hostent *manager = gethostbyname(ip_address); - if (!manager) { - LOG_ERR("plasma manager %s not found", ip_address); - exit(-1); - } - s->managers[manager_id].connected = 1; - struct sockaddr_in *name = &s->managers[manager_id].name; - name->sin_family = AF_INET; - bcopy(manager->h_addr, &name->sin_addr.s_addr, manager->h_length); - name->sin_port = htons(port); -} - // Add connection for sending commands or data to another plasma manager // (returns the connection id). int add_conn(plasma_manager_state* s, int type, int fd, int events, plasma_buffer* buf) { @@ -125,7 +99,6 @@ void remove_conn(plasma_manager_state* s, int i) { // a connection to both the manager and the local object store and sends // the data header to the other object manager. void initiate_transfer(plasma_manager_state* state, plasma_request* req) { - int manager_id = req->manager_id; int c = plasma_store_connect(state->store_socket_name); plasma_buffer buf = plasma_get(c, req->object_id); @@ -134,9 +107,24 @@ void initiate_transfer(plasma_manager_state* state, plasma_request* req) { LOG_ERR("could not create socket"); exit(-1); } - int r = connect(fd, (struct sockaddr*) &state->managers[manager_id].name, sizeof(state->managers[manager_id].name)); + + char ip_addr[16]; + snprintf(ip_addr, 32, "%d.%d.%d.%d", + req->addr[0], req->addr[1], + req->addr[2], req->addr[3]); + struct hostent *manager = gethostbyname(ip_addr); // TODO(pcm): cache this + if (!manager) { + LOG_ERR("plasma manager %s not found", ip_addr); + exit(-1); + } + struct sockaddr_in addr; + addr.sin_family = AF_INET; + bcopy(manager->h_addr, &addr.sin_addr.s_addr, manager->h_length); + addr.sin_port = htons(req->port); + + int r = connect(fd, (struct sockaddr*) &addr, sizeof(addr)); if (r < 0) { - LOG_ERR("could not establish connection to manager with id %d", manager_id); + LOG_ERR("could not establish connection to manager with id %s:%d", &ip_addr[0], req->port); exit(-1); } @@ -160,22 +148,9 @@ void setup_data_connection(int conn_idx, plasma_manager_state* state, plasma_req void process_command(int conn_idx, plasma_manager_state* state, plasma_request* req) { switch (req->type) { case PLASMA_TRANSFER: - LOG_INFO("transfering object to manager with id %d", req->manager_id); + LOG_INFO("transfering object to manager with port %d", req->port); initiate_transfer(state, req); break; - case PLASMA_REGISTER: { - char buff[16]; - snprintf(buff, 32, "%d.%d.%d.%d", - req->addr[0], req->addr[1], - req->addr[2], req->addr[3]); - if (req->manager_id >= MAX_NUM_MANAGERS) { - LOG_ERR("manager_id %d out of bounds", req->manager_id); - } else { - add_manager(state, req->manager_id, buff, req->port); - LOG_INFO("registering %s:%d with id %d", buff, req->port, req->manager_id); - } - } - break; case PLASMA_DATA: LOG_INFO("starting to stream data"); setup_data_connection(conn_idx, state, req); @@ -279,37 +254,7 @@ void event_loop(int sock, plasma_manager_state* state) { } } -// Register this plasma manager with the nameserver. -void register_with_nameserver(const char* nameserver_addr, int nameserver_port, - const char* manager_addr, int manager_port) { - int fd = socket(PF_INET, SOCK_STREAM, 0); - if (fd < 0) { - LOG_ERR("socket for nameserver connection could not be established"); - exit(-1); - } - struct hostent *host = gethostbyname(nameserver_addr); - if (!host) { - LOG_ERR("nameserver %s not found", nameserver_addr); - exit(-1); - } - struct sockaddr_in nameserver; - memset(&nameserver, 0, sizeof(struct sockaddr_in)); - nameserver.sin_family = AF_INET; - bcopy(host->h_addr, &nameserver.sin_addr.s_addr, host->h_length); - nameserver.sin_port = htons(nameserver_port); - if (connect(fd, (struct sockaddr*) &nameserver, sizeof(nameserver)) == -1) { - LOG_ERR("could not connect to nameserver %s:%d", nameserver_addr, nameserver_port); - exit(-1); - } - plasma_request req = { .type = PLASMA_REGISTER, .port = manager_port }; - // TODO(pcm): input validation - sscanf(manager_addr, "%" SCNu8 ".%" SCNu8 ".%" SCNu8 ".%" SCNu8, &req.addr[0], &req.addr[1], &req.addr[2], &req.addr[3]); - plasma_send(fd, &req); - close(fd); -} - -void start_server(const char *store_socket_name, const char* master_addr, - const char* nameserver_addr, int nameserver_port) { +void start_server(const char *store_socket_name, const char* master_addr, int port) { struct sockaddr_in name; int sock = socket(PF_INET, SOCK_STREAM, 0); if (sock < 0) { @@ -317,7 +262,7 @@ void start_server(const char *store_socket_name, const char* master_addr, exit(-1); } name.sin_family = AF_INET; - name.sin_port = 0; + name.sin_port = htons(port); name.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; // TODO(pcm): http://stackoverflow.com/q/1150635 @@ -330,17 +275,11 @@ void start_server(const char *store_socket_name, const char* master_addr, LOG_ERR("could not bind socket"); exit(-1); } - socklen_t len = sizeof(name); - if (getsockname(sock, (struct sockaddr*) &name, &len) == -1) { - LOG_ERR("getsockname failed"); - } else { - LOG_INFO("listening on port %d", ntohs(name.sin_port)); - } + LOG_INFO("listening on port %d", port); if (listen(sock, 5) == -1) { LOG_ERR("could not listen to socket"); exit(-1); } - register_with_nameserver(nameserver_addr, nameserver_port, master_addr, ntohs(name.sin_port)); plasma_manager_state state; init_manager_state(&state, store_socket_name); event_loop(sock, &state); @@ -349,22 +288,22 @@ void start_server(const char *store_socket_name, const char* master_addr, int main(int argc, char* argv[]) { // Socket name of the plasma store this manager is connected to. char *store_socket_name = NULL; - // IP address and port of the nameserver. - char *nameserver_addr_port = NULL; - // IP address this host can be reached at from the outside. + // IP address of this node char *master_addr = NULL; + // Port number the manager should use + int port; int c; - while ((c = getopt(argc, argv, "n:s:m:")) != -1) { + while ((c = getopt(argc, argv, "s:m:p:")) != -1) { switch (c) { case 's': store_socket_name = optarg; break; - case 'n': - nameserver_addr_port = optarg; - break; case 'm': master_addr = optarg; break; + case 'p': + port = atoi(optarg); + break; default: LOG_ERR("unknown option %c", c); exit(-1); @@ -378,13 +317,5 @@ int main(int argc, char* argv[]) { LOG_ERR("please specify ip address of the current host in the format 123.456.789.10 with -m switch"); exit(-1); } - // Parse nameserver address and port. - const char *format = "%15[0-9.]:%5[0-9]"; - char nameserver_addr[16] = { 0 }; - char nameserver_port[6] = { 0 }; - if (!nameserver_addr_port || sscanf(nameserver_addr_port, format, nameserver_addr, nameserver_port) != 2) { - LOG_ERR("need to specify nameserver address in the format 123.456.789.10:12345 with -n switch"); - exit(-1); - } - start_server(store_socket_name, master_addr, nameserver_addr, atoi(nameserver_port)); + start_server(store_socket_name, master_addr, port); } diff --git a/test/nameserver.py b/test/nameserver.py deleted file mode 100644 index 66fa6cf52..000000000 --- a/test/nameserver.py +++ /dev/null @@ -1,46 +0,0 @@ -import collections -import socket -import ctypes -import atexit - -import plasma - -DEFAULT_PORT = 16121 - -Connection = collections.namedtuple("Connection", ["address", "port"]) - -# list of IP addresses and ports of managers -object_managers = [] - -def send_addresses(conn, object_managers): - manager = plasma.PlasmaManager(conn.address, conn.port) - for (manager_id, object_manager) in enumerate(object_managers): - manager.register(manager_id, object_manager.address, object_manager.port) - -if __name__ == "__main__": - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("", DEFAULT_PORT)) - sock.listen(5) - - def cleanup(): - sock.shutdown(socket.SHUT_RDWR) - sock.close() - atexit.register(cleanup) - - while True: - client, address = sock.accept() - request = plasma.PlasmaRequest() - client.recv_into(request) - if request.type == plasma.PLASMA_REGISTER: - address = ".".join(map(str, request.addr[:])) - conn = Connection(address=address, port=request.port) - print "object manager " + str(conn) + " connected" - object_managers.append(conn) - for c in object_managers: - send_addresses(c, object_managers) - elif request.type == plasma.PLASMA_GET_MANAGER_PORT: - port = object_managers[request.manager_id].port - req = plasma.PlasmaRequest(type=plasma.PLASMA_RETURN_MANAGER_PORT, port=port) - client.send(buffer(req)[:]) - else: - raise Exception("This code should be unreachable.") diff --git a/test/test.py b/test/test.py index ba9789e81..b0284cb54 100644 --- a/test/test.py +++ b/test/test.py @@ -5,11 +5,12 @@ import sys import unittest import random import time +import tempfile import plasma def random_object_id(): - return "".join([chr(random.randint(0, 256)) for _ in range(20)]) + return "".join([chr(random.randint(0, 255)) for _ in range(20)]) class TestPlasmaClient(unittest.TestCase): @@ -64,9 +65,6 @@ class TestPlasmaClient(unittest.TestCase): class TestPlasmaManager(unittest.TestCase): def setUp(self): - # Start the nameserver. - nameserver_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "nameserver.py") - self.p1 = subprocess.Popen(["python", nameserver_path]) # Start two PlasmaStores. plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") self.p2 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store1"]) @@ -75,36 +73,19 @@ class TestPlasmaManager(unittest.TestCase): self.client1 = plasma.PlasmaClient("/tmp/store1") self.client2 = plasma.PlasmaClient("/tmp/store2") # Start two PlasmaManagers. - time.sleep(0.1) + self.port1 = random.randint(10000, 50000) + self.port2 = random.randint(10000, 50000) plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager") - self.p4 = subprocess.Popen([plasma_manager_executable, "-n", "127.0.0.1:16121", "-s", "/tmp/store1", "-m", "127.0.0.1"]) - self.p5 = subprocess.Popen([plasma_manager_executable, "-n", "127.0.0.1:16121", "-s", "/tmp/store2", "-m", "127.0.0.1"]) + self.p4 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store1", "-m", "127.0.0.1", "-p", str(self.port1)]) + self.p5 = subprocess.Popen([plasma_manager_executable, "-s", "/tmp/store2", "-m", "127.0.0.1", "-p", str(self.port2)]) time.sleep(0.1) - # Connect to the nameserver. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("127.0.0.1", 16121)) - # Get the port for the first PlasmaManager. - req = plasma.PlasmaRequest(type=plasma.PLASMA_GET_MANAGER_PORT, manager_id=0) - sock.send(buffer(req)[:]) - request = plasma.PlasmaRequest() - sock.recv_into(request) - port1 = request.port - time.sleep(0.1) - # Get the port for the second PlasmaManager. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(("127.0.0.1", 16121)) - req = plasma.PlasmaRequest(type=plasma.PLASMA_GET_MANAGER_PORT, manager_id=1) - sock.send(buffer(req)[:]) - request = plasma.PlasmaRequest() - sock.recv_into(request) - port2 = request.port # Connect two Python PlasmaManagers. - self.manager1 = plasma.PlasmaManager("127.0.0.1", port1) - self.manager2 = plasma.PlasmaManager("127.0.0.1", port2) + self.manager1 = plasma.PlasmaManager("127.0.0.1", self.port1) + self.manager2 = plasma.PlasmaManager("127.0.0.1", self.port2) + time.sleep(0.5) def tearDown(self): # Kill the nameserver, PlasmaStore and PlasmaManager processes. - self.p1.kill() self.p2.kill() self.p3.kill() self.p4.kill() @@ -120,11 +101,11 @@ class TestPlasmaManager(unittest.TestCase): # Seal the buffer. self.client1.seal(object_id1) # Transfer the buffer to the the other PlasmaStore. - self.manager1.transfer(1, object_id1) + self.manager1.transfer("127.0.0.1", self.port2, object_id1) # Compare the two buffers. self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) # Transfer the buffer again. - self.manager1.transfer(1, object_id1) + self.manager1.transfer("127.0.0.1", self.port2, object_id1) # Compare the two buffers. self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) # Create a new object id string. @@ -136,7 +117,7 @@ class TestPlasmaManager(unittest.TestCase): # Seal the buffer. self.client2.seal(object_id2) # Transfer the buffer to the the other PlasmaStore. - self.manager2.transfer(0, object_id2) + self.manager2.transfer("127.0.0.1", self.port1, object_id2) # Compare the two buffers. self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:])