diff --git a/lib/python/plasma.py b/lib/python/plasma.py index 83de5f1b8..28b3f086d 100644 --- a/lib/python/plasma.py +++ b/lib/python/plasma.py @@ -16,6 +16,8 @@ PLASMA_SEAL = 2 PLASMA_TRANSFER = 3 PLASMA_DATA = 4 PLASMA_REGISTER = 5 +PLASMA_GET_MANAGER_PORT = 6 +PLASMA_RETURN_MANAGER_PORT = 7 class PlasmaRequest(ctypes.Structure): _fields_ = [("type", ctypes.c_int), @@ -38,24 +40,43 @@ def make_plasma_id(string): return PlasmaID(plasma_id=ID(*object_id)) class PlasmaManager(object): + """The PlasmaManager is used to manage a PlasmaStore. + + There should be one PlasmaManager per PlasmaStore. The PlasmaManager is + responsible for interfacing with other PlasmaManagers in order to transfer + objects between PlasmaStores. This class sends commands to the C + implementation of the PlasmaManager using sockets. + + Attributes: + sock: The socket used to communicate with the C implementation of the + PlasmaManager. + """ def __init__(self, addr, port): + """Initialize the PlasmaManager.""" self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((addr, port)) def register(self, manager_id, addr, port): - "Register another object manager." + """Register another object manager.""" req = PlasmaRequest(type=PLASMA_REGISTER, manager_id=manager_id, addr=Addr(*map(int, addr.split("."))), port=port) self.sock.send(buffer(req)[:]) def transfer(self, manager_id, object_id): - "Transfer local object with id object_id to manager with id manager_id." + """Transfer local object with id object_id to manager with id manager_id.""" req = PlasmaRequest(type=PLASMA_TRANSFER, manager_id=manager_id, object_id=make_plasma_id(object_id)) self.sock.send(buffer(req)[:]) class PlasmaClient(object): + """The PlasmaClient is used to interface with a PlasmaStore. + + The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a + buffer, and get a buffer. Buffers are referred to by object IDs, which are + strings. + """ + def __init__(self, socket_name): plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") self.client = ctypes.cdll.LoadLibrary(plasma_client_library) @@ -82,12 +103,36 @@ class PlasmaClient(object): self.sock = self.client.plasma_store_connect(socket_name) def create(self, object_id, size): + """Create a new buffer in the PlasmaStore for a particular object ID. + + The returned buffer is mutable until seal is called. + + Args: + object_id (str): A string used to identify an object. + size (int): The size in bytes of the created buffer. + """ buf = self.client.plasma_create(self.sock, make_plasma_id(object_id), size) return self.buffer_from_read_write_memory(buf.data, buf.size) def get(self, object_id): + """Create a buffer from the PlasmaStore based on object ID. + + This method can only be called after the buffer has been sealed. The + retrieved buffer is immutable. + + Args: + object_id (str): A string used to identify an object. + """ buf = self.client.plasma_get(self.sock, make_plasma_id(object_id)) return self.buffer_from_memory(buf.data, buf.size) def seal(self, object_id): + """Seal the buffer in the PlasmaStore for a particular object ID. + + Once a buffer has been sealed, the buffer is immutable and can only be + accessed through get. + + Args: + object_id (str): A string used to identify an object. + """ self.client.plasma_seal(self.sock, make_plasma_id(object_id)) diff --git a/src/plasma_manager.c b/src/plasma_manager.c index 7ad2585b6..b439fcf35 100644 --- a/src/plasma_manager.c +++ b/src/plasma_manager.c @@ -128,7 +128,7 @@ void initiate_transfer(plasma_manager_state* state, plasma_request* req) { int manager_id = req->manager_id; int c = plasma_store_connect(state->store_socket_name); plasma_buffer buf = plasma_get(c, req->object_id); - + int fd = socket(PF_INET, SOCK_STREAM, 0); if (fd < 0) { LOG_ERR("could not create socket"); @@ -274,7 +274,7 @@ void event_loop(int sock, plasma_manager_state* state) { LOG_INFO("new connection with id %d", conn_id); } else { read_from_socket(state, i, &req); - } + } } } } @@ -346,8 +346,6 @@ void start_server(const char *store_socket_name, const char* master_addr, event_loop(sock, &state); } - - int main(int argc, char* argv[]) { // Socket name of the plasma store this manager is connected to. char *store_socket_name = NULL; @@ -384,7 +382,7 @@ int main(int argc, char* argv[]) { const char *format = "%15[0-9.]:%5[0-9]"; char nameserver_addr[16] = { 0 }; char nameserver_port[6] = { 0 }; - if(!nameserver_addr_port || sscanf(nameserver_addr_port, format, nameserver_addr, nameserver_port) != 2) { + if (!nameserver_addr_port || sscanf(nameserver_addr_port, format, nameserver_addr, nameserver_port) != 2) { LOG_ERR("need to specify nameserver address in the format 123.456.789.10:12345 with -n switch"); exit(-1); } diff --git a/test/nameserver.py b/test/nameserver.py index 6fcd965ca..66fa6cf52 100644 --- a/test/nameserver.py +++ b/test/nameserver.py @@ -1,6 +1,8 @@ import collections import socket import ctypes +import atexit + import plasma DEFAULT_PORT = 16121 @@ -15,19 +17,30 @@ def send_addresses(conn, object_managers): for (manager_id, object_manager) in enumerate(object_managers): manager.register(manager_id, object_manager.address, object_manager.port) -if __name__ == '__main__': +if __name__ == "__main__": sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(('', DEFAULT_PORT)) + sock.bind(("", DEFAULT_PORT)) sock.listen(5) - + + def cleanup(): + sock.shutdown(socket.SHUT_RDWR) + sock.close() + atexit.register(cleanup) + while True: - (client, address) = sock.accept() + client, address = sock.accept() request = plasma.PlasmaRequest() client.recv_into(request) - address = ".".join(map(str, request.addr[:])) - conn = Connection(address=address, port=request.port) - print "object manager " + str(conn) + " connected" - object_managers.append(conn) - for c in object_managers: - send_addresses(c, object_managers) - + if request.type == plasma.PLASMA_REGISTER: + address = ".".join(map(str, request.addr[:])) + conn = Connection(address=address, port=request.port) + print "object manager " + str(conn) + " connected" + object_managers.append(conn) + for c in object_managers: + send_addresses(c, object_managers) + elif request.type == plasma.PLASMA_GET_MANAGER_PORT: + port = object_managers[request.manager_id].port + req = plasma.PlasmaRequest(type=plasma.PLASMA_RETURN_MANAGER_PORT, port=port) + client.send(buffer(req)[:]) + else: + raise Exception("This code should be unreachable.") diff --git a/test/test.py b/test/test.py index 6d4bee003..ba9789e81 100644 --- a/test/test.py +++ b/test/test.py @@ -1,11 +1,17 @@ import os +import socket import subprocess import sys import unittest +import random +import time import plasma -class TestPlasmaAPI(unittest.TestCase): +def random_object_id(): + return "".join([chr(random.randint(0, 256)) for _ in range(20)]) + +class TestPlasmaClient(unittest.TestCase): def setUp(self): # Start Plasma. @@ -15,12 +21,12 @@ class TestPlasmaAPI(unittest.TestCase): self.plasma_client = plasma.PlasmaClient("/tmp/store") def tearDown(self): - # Kill the plasma stoe process. + # Kill the plasma store process. self.p.kill() def test_create(self): - # Create an object string. - object_id = "id" + 18 * "x" + # Create an object id string. + object_id = random_object_id() # Create a new buffer and write to it. length = 1000 memory_buffer = self.plasma_client.create(object_id, length) @@ -34,8 +40,8 @@ class TestPlasmaAPI(unittest.TestCase): self.assertEqual(memory_buffer[i], chr(i % 256)) def test_illegal_functionality(self): - # Create an object string. - object_id = "id" + 18 * "x" + # Create an object id string. + object_id = random_object_id() # Create a new buffer and write to it. length = 1000 memory_buffer = self.plasma_client.create(object_id, length) @@ -55,5 +61,93 @@ class TestPlasmaAPI(unittest.TestCase): memory_buffer[0] = chr(0) self.assertRaises(Exception, illegal_assignment) +class TestPlasmaManager(unittest.TestCase): + + def setUp(self): + # Start the nameserver. + nameserver_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "nameserver.py") + self.p1 = subprocess.Popen(["python", nameserver_path]) + # Start two PlasmaStores. + plasma_store_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_store") + self.p2 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store1"]) + self.p3 = subprocess.Popen([plasma_store_executable, "-s", "/tmp/store2"]) + # Connect two PlasmaClients. + self.client1 = plasma.PlasmaClient("/tmp/store1") + self.client2 = plasma.PlasmaClient("/tmp/store2") + # Start two PlasmaManagers. + time.sleep(0.1) + plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager") + self.p4 = subprocess.Popen([plasma_manager_executable, "-n", "127.0.0.1:16121", "-s", "/tmp/store1", "-m", "127.0.0.1"]) + self.p5 = subprocess.Popen([plasma_manager_executable, "-n", "127.0.0.1:16121", "-s", "/tmp/store2", "-m", "127.0.0.1"]) + time.sleep(0.1) + # Connect to the nameserver. + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("127.0.0.1", 16121)) + # Get the port for the first PlasmaManager. + req = plasma.PlasmaRequest(type=plasma.PLASMA_GET_MANAGER_PORT, manager_id=0) + sock.send(buffer(req)[:]) + request = plasma.PlasmaRequest() + sock.recv_into(request) + port1 = request.port + time.sleep(0.1) + # Get the port for the second PlasmaManager. + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("127.0.0.1", 16121)) + req = plasma.PlasmaRequest(type=plasma.PLASMA_GET_MANAGER_PORT, manager_id=1) + sock.send(buffer(req)[:]) + request = plasma.PlasmaRequest() + sock.recv_into(request) + port2 = request.port + # Connect two Python PlasmaManagers. + self.manager1 = plasma.PlasmaManager("127.0.0.1", port1) + self.manager2 = plasma.PlasmaManager("127.0.0.1", port2) + + def tearDown(self): + # Kill the nameserver, PlasmaStore and PlasmaManager processes. + self.p1.kill() + self.p2.kill() + self.p3.kill() + self.p4.kill() + self.p5.kill() + + def test_transfer(self): + # Create an object id string. + object_id1 = random_object_id() + # Create a new buffer and write to it. + memory_buffer = self.client1.create(object_id1, 20000) + for i in range(len(memory_buffer)): + memory_buffer[i] = chr(i % 10) + # Seal the buffer. + self.client1.seal(object_id1) + # Transfer the buffer to the the other PlasmaStore. + self.manager1.transfer(1, object_id1) + # Compare the two buffers. + self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) + # Transfer the buffer again. + self.manager1.transfer(1, object_id1) + # Compare the two buffers. + self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:]) + # Create a new object id string. + object_id2 = random_object_id() + # Create a new buffer and write to it. + memory_buffer = self.client2.create(object_id2, 20000) + for i in range(len(memory_buffer)): + memory_buffer[i] = chr(i % 10) + # Seal the buffer. + self.client2.seal(object_id2) + # Transfer the buffer to the the other PlasmaStore. + self.manager2.transfer(0, object_id2) + # Compare the two buffers. + self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:]) + + def test_illegal_functionality(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer. + memory_buffer = self.client1.create(object_id, 20000) + # This test is commented out because it currently fails. + # # Transferring the buffer before sealing it should fail. + # self.assertRaises(Exception, lambda : self.manager1.transfer(1, object_id)) + if __name__ == "__main__": unittest.main(verbosity=2)