From 986ed5c9e86cf8763b52fd93ea10d2075d283e6c Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 13 Nov 2016 16:23:28 -0800 Subject: [PATCH] Plasma C extensions (#34) * switch plasma from ctypes to python C API * clang-format * various fixes --- build.sh | 8 +- doc/install-on-macosx.md | 2 +- doc/install-on-ubuntu.md | 2 +- install-dependencies.sh | 2 +- lib/python/ray/worker.py | 10 +- lib/python/setup.py | 2 +- src/common/lib/python/common_module.c | 1 - src/plasma/CMakeLists.txt | 71 ++++++ src/plasma/lib/python/plasma.py | 131 ++--------- src/plasma/lib/python/setup.py | 23 ++ src/plasma/plasma_client.c | 12 +- src/plasma/plasma_client.h | 25 +++ src/plasma/plasma_extension.c | 302 ++++++++++++++++++++++++++ src/plasma/plasma_store.c | 15 +- src/plasma/test/test.py | 5 +- 15 files changed, 481 insertions(+), 130 deletions(-) create mode 100644 src/plasma/CMakeLists.txt create mode 100644 src/plasma/lib/python/setup.py create mode 100644 src/plasma/plasma_extension.c diff --git a/build.sh b/build.sh index 9685457a7..5fe6559d6 100755 --- a/build.sh +++ b/build.sh @@ -34,11 +34,15 @@ cp "$COMMON_DIR/thirdparty/redis-3.2.3/src/redis-server" "$PYTHON_COMMON_DIR/thi pushd "$PLASMA_DIR" make make test + pushd "$PLASMA_DIR/build" + cmake .. + make install + popd popd cp "$PLASMA_DIR/build/plasma_store" "$PYTHON_PLASMA_DIR/build/" cp "$PLASMA_DIR/build/plasma_manager" "$PYTHON_PLASMA_DIR/build/" -cp "$PLASMA_DIR/build/plasma_client.so" "$PYTHON_PLASMA_DIR/build/" cp "$PLASMA_DIR/lib/python/plasma.py" "$PYTHON_PLASMA_DIR/lib/python/" +cp "$PLASMA_DIR/lib/python/libplasma.so" "$PYTHON_PLASMA_DIR/lib/python/" pushd "$PHOTON_DIR" make @@ -47,5 +51,5 @@ pushd "$PHOTON_DIR" make install popd popd -cp "$PHOTON_DIR/build/photon_scheduler" "$PYTHON_PHOTON_DIR/build" +cp "$PHOTON_DIR/build/photon_scheduler" "$PYTHON_PHOTON_DIR/build/" cp "$PHOTON_DIR/photon/libphoton.so" "$PYTHON_PHOTON_DIR/" diff --git a/doc/install-on-macosx.md b/doc/install-on-macosx.md index 89a863724..721c9f4e3 100644 --- a/doc/install-on-macosx.md +++ b/doc/install-on-macosx.md @@ -14,7 +14,7 @@ sudo easy_install pip # If you're using Anaconda, then this is unnecessary. pip install numpy funcsigs colorama psutil redis --ignore-installed six pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. -pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@d1974afbab9f0f1bcf8af15a8c476d868ad31aff +pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@488f881d708bc54e86ed375ee97aa94540808fa1 ``` # Install Ray diff --git a/doc/install-on-ubuntu.md b/doc/install-on-ubuntu.md index 8c47a66e8..625fed7da 100644 --- a/doc/install-on-ubuntu.md +++ b/doc/install-on-ubuntu.md @@ -14,7 +14,7 @@ sudo apt-get install -y cmake build-essential autoconf curl libtool python-dev p pip install numpy funcsigs colorama psutil redis pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. -pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@d1974afbab9f0f1bcf8af15a8c476d868ad31aff +pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@488f881d708bc54e86ed375ee97aa94540808fa1 ``` # Install Ray diff --git a/install-dependencies.sh b/install-dependencies.sh index 109f2b860..a9045801a 100755 --- a/install-dependencies.sh +++ b/install-dependencies.sh @@ -37,4 +37,4 @@ elif [[ $platform == "macosx" ]]; then fi sudo pip install --upgrade git+git://github.com/cloudpipe/cloudpickle.git@0d225a4695f1f65ae1cbb2e0bbc145e10167cce4 # We use the latest version of cloudpickle because it can serialize named tuples. -sudo pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@d1974afbab9f0f1bcf8af15a8c476d868ad31aff +sudo pip install --upgrade --verbose git+git://github.com/ray-project/numbuf.git@488f881d708bc54e86ed375ee97aa94540808fa1 diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index c7dce49f4..7f304cce3 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -29,11 +29,11 @@ WORKER_MODE = 1 PYTHON_MODE = 2 SILENT_MODE = 3 -def random_object_id(): - return photon.ObjectID("".join([chr(random.randint(0, 255)) for _ in range(20)])) - def random_string(): - return "".join([chr(random.randint(0, 255)) for _ in range(20)]) + return np.random.bytes(20) + +def random_object_id(): + return photon.ObjectID(random_string()) class FunctionID(object): def __init__(self, function_id): @@ -418,7 +418,7 @@ class Worker(object): # Serialize and put the object in the object store. schema, size, serialized = numbuf_serialize(value) size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary. - buff = self.plasma_client.create(objectid.id(), size, buffer(schema)) + buff = self.plasma_client.create(objectid.id(), size, bytearray(schema)) data = np.frombuffer(buff.buffer, dtype="byte")[8:] metadata_offset = numbuf.write_to_buffer(serialized, memoryview(data)) np.frombuffer(buff.buffer, dtype="int64", count=1)[0] = metadata_offset diff --git a/lib/python/setup.py b/lib/python/setup.py index e23a2a7d7..d17ace9a2 100644 --- a/lib/python/setup.py +++ b/lib/python/setup.py @@ -19,7 +19,7 @@ setup(name="ray", package_data={"common": ["thirdparty/redis-3.2.3/src/redis-server"], "plasma": ["build/plasma_store", "build/plasma_manager", - "build/plasma_client.so"], + "lib/python/libplasma.so"], "photon": ["build/photon_scheduler", "libphoton.so"]}, cmdclass={"install": install}, diff --git a/src/common/lib/python/common_module.c b/src/common/lib/python/common_module.c index d5222cd87..a32f508a6 100644 --- a/src/common/lib/python/common_module.c +++ b/src/common/lib/python/common_module.c @@ -1,5 +1,4 @@ #include -#include "node.h" #include "common_extension.h" diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt new file mode 100644 index 000000000..a8f1faca7 --- /dev/null +++ b/src/plasma/CMakeLists.txt @@ -0,0 +1,71 @@ +cmake_minimum_required(VERSION 2.8) + +project(plasma) + +if(NOT APPLE) + find_package(PythonInterp REQUIRED) + find_package(PythonLibs REQUIRED) + set(CUSTOM_PYTHON_EXECUTABLE ${PYTHON_EXECUTABLE}) +else() + find_program(CUSTOM_PYTHON_EXECUTABLE python) + message("-- Found Python program: ${CUSTOM_PYTHON_EXECUTABLE}") + execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c + "import sys; print 'python' + sys.version[0:3]" + OUTPUT_VARIABLE PYTHON_LIBRARY_NAME OUTPUT_STRIP_TRAILING_WHITESPACE) + execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c + "import sys; print sys.exec_prefix" + OUTPUT_VARIABLE PYTHON_PREFIX OUTPUT_STRIP_TRAILING_WHITESPACE) + FIND_LIBRARY(PYTHON_LIBRARIES + NAMES ${PYTHON_LIBRARY_NAME} + HINTS "${PYTHON_PREFIX}" + PATH_SUFFIXES "lib" "libs" + NO_DEFAULT_PATH) + execute_process(COMMAND ${CUSTOM_PYTHON_EXECUTABLE} -c + "from distutils.sysconfig import *; print get_python_inc()" + OUTPUT_VARIABLE PYTHON_INCLUDE_DIRS OUTPUT_STRIP_TRAILING_WHITESPACE) + if(PYTHON_LIBRARIES AND PYTHON_INCLUDE_DIRS) + SET(PYTHONLIBS_FOUND TRUE) + message("-- Found PythonLibs: " ${PYTHON_LIBRARIES}) + message("-- -- Used custom search path") + else() + find_package(PythonLibs REQUIRED) + message("-- -- Used find_package(PythonLibs)") + endif() +endif() + +if(APPLE) + SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") +endif(APPLE) + +include_directories("${PYTHON_INCLUDE_DIRS}") + +set(CMAKE_C_FLAGS "${CMAKE_CXX_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") + +if (UNIX AND NOT APPLE) + link_libraries(rt) +endif() + +set(COMMON_LIB "${CMAKE_SOURCE_DIR}/../common/build/libcommon.a" CACHE STRING + "Path to libcommon.a") + +include_directories("${CMAKE_SOURCE_DIR}/") +include_directories("${CMAKE_SOURCE_DIR}/../") +include_directories("${CMAKE_SOURCE_DIR}/../common/") +include_directories("${CMAKE_SOURCE_DIR}/../common/thirdparty/") +include_directories("${CMAKE_SOURCE_DIR}/../common/lib/python/") + +add_library(plasma SHARED + plasma_extension.c + plasma_client.c + fling.c) + +get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) +if(APPLE) + add_custom_command(TARGET plasma + POST_BUILD COMMAND + ${CMAKE_INSTALL_NAME_TOOL} -change ${PYTHON_SHARED_LIBRARY} ${PYTHON_LIBRARIES} libplasma.so) +endif(APPLE) + +target_link_libraries(plasma ${COMMON_LIB} ${PYTHON_LIBRARIES}) + +install(TARGETS plasma DESTINATION ${CMAKE_SOURCE_DIR}/lib/python) diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index 5e3a51e5d..3390013e5 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -1,25 +1,12 @@ -import ctypes import os import random import socket import subprocess import time - -Addr = ctypes.c_ubyte * 4 +import libplasma PLASMA_ID_SIZE = 20 -ID = ctypes.c_ubyte * PLASMA_ID_SIZE - -class PlasmaID(ctypes.Structure): - _fields_ = [("plasma_id", ID)] - -def make_plasma_id(string): - if len(string) != PLASMA_ID_SIZE: - raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE)) - return PlasmaID(plasma_id=ID.from_buffer_copy(string)) - -def plasma_id_to_str(plasma_id): - return str(bytearray(plasma_id.plasma_id)) +PLASMA_WAIT_TIMEOUT = 2 ** 36 class PlasmaBuffer(object): """This is the type of objects returned by calls to get with a PlasmaClient. @@ -47,7 +34,7 @@ class PlasmaBuffer(object): If the plasma client has been shut down, then don't do anything. """ if self.plasma_client.alive: - self.plasma_client.client.plasma_release(self.plasma_client.plasma_conn, self.plasma_id) + libplasma.release(self.plasma_client.conn, self.plasma_id) def __getitem__(self, index): """Read from the PlasmaBuffer as if it were just a regular buffer.""" @@ -80,33 +67,11 @@ class PlasmaClient(object): manager_socket_name (str): Name of the socket the plasma manager is listening at. """ self.alive = True - plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so") - self.client = ctypes.cdll.LoadLibrary(plasma_client_library) - - self.client.plasma_connect.restype = ctypes.c_void_p - self.client.plasma_create.restype = None - self.client.plasma_get.restype = None - self.client.plasma_release.restype = None - self.client.plasma_contains.restype = None - self.client.plasma_seal.restype = None - self.client.plasma_delete.restype = None - self.client.plasma_subscribe.restype = ctypes.c_int - self.client.plasma_wait.restype = ctypes.c_int - - self.buffer_from_memory = ctypes.pythonapi.PyBuffer_FromMemory - self.buffer_from_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] - self.buffer_from_memory.restype = ctypes.py_object - - self.buffer_from_read_write_memory = ctypes.pythonapi.PyBuffer_FromReadWriteMemory - self.buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64] - self.buffer_from_read_write_memory.restype = ctypes.py_object if manager_socket_name is not None: - self.has_manager_conn = True - self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, manager_socket_name, release_delay)) + self.conn = libplasma.connect(store_socket_name, manager_socket_name, release_delay) else: - self.has_manager_conn = False - self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, None, release_delay)) + self.conn = libplasma.connect(store_socket_name, "", release_delay) def shutdown(self): """Shutdown the client so that it does not send messages. @@ -115,6 +80,8 @@ class PlasmaClient(object): to, then we can use this method to prevent the client from trying to send messages to the killed processes. """ + if self.alive: + libplasma.disconnect(self.conn) self.alive = False def create(self, object_id, size, metadata=None): @@ -128,13 +95,10 @@ class PlasmaClient(object): metadata (buffer): An optional buffer encoding whatever metadata the user wishes to encode. """ - # This is used to hold the address of the buffer. - data = ctypes.c_void_p() # Turn the metadata into the right type. - metadata = buffer("") if metadata is None else metadata - metadata = (ctypes.c_ubyte * len(metadata)).from_buffer_copy(metadata) - self.client.plasma_create(self.plasma_conn, make_plasma_id(object_id), size, ctypes.cast(metadata, ctypes.POINTER(ctypes.c_ubyte * len(metadata))), len(metadata), ctypes.byref(data)) - return PlasmaBuffer(self.buffer_from_read_write_memory(data, size), make_plasma_id(object_id), self) + metadata = bytearray("") if metadata is None else metadata + buff = libplasma.create(self.conn, object_id, size, metadata) + return PlasmaBuffer(buff, object_id, self) def get(self, object_id): """Create a buffer from the PlasmaStore based on object ID. @@ -145,12 +109,8 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - size = ctypes.c_int64() - data = ctypes.c_void_p() - metadata_size = ctypes.c_int64() - metadata = ctypes.c_void_p() - self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) - return PlasmaBuffer(self.buffer_from_memory(data, size), make_plasma_id(object_id), self) + buff = libplasma.get(self.conn, object_id)[0] + return PlasmaBuffer(buff, object_id, self) def get_metadata(self, object_id): """Create a buffer from the PlasmaStore based on object ID. @@ -161,12 +121,8 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - size = ctypes.c_int64() - data = ctypes.c_void_p() - metadata_size = ctypes.c_int64() - metadata = ctypes.c_void_p() - self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata)) - return PlasmaBuffer(self.buffer_from_memory(metadata, metadata_size), make_plasma_id(object_id), self) + buff = libplasma.get(self.conn, object_id)[1] + return PlasmaBuffer(buff, object_id, self) def contains(self, object_id): """Check if the object is present and has been sealed in the PlasmaStore. @@ -174,15 +130,7 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - has_object = ctypes.c_int() - self.client.plasma_contains(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(has_object)) - has_object = has_object.value - if has_object == 1: - return True - elif has_object == 0: - return False - else: - raise Exception("This code should be unreachable.") + return libplasma.contains(self.conn, object_id) def seal(self, object_id): """Seal the buffer in the PlasmaStore for a particular object ID. @@ -193,7 +141,7 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - self.client.plasma_seal(self.plasma_conn, make_plasma_id(object_id)) + libplasma.seal(self.conn, object_id) def delete(self, object_id): """Delete the buffer in the PlasmaStore for a particular object ID. @@ -203,7 +151,7 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - self.client.plasma_delete(self.plasma_conn, make_plasma_id(object_id)) + libplasma.delete(self.conn, object_id) def evict(self, num_bytes): """Evict some objects until to recover some bytes. @@ -213,8 +161,7 @@ class PlasmaClient(object): Args: num_bytes (int): The number of bytes to attempt to recover. """ - num_bytes_evicted = self.client.plasma_evict(self.plasma_conn, num_bytes) - return num_bytes_evicted + return libplasma.evict(self.conn, num_bytes) def transfer(self, addr, port, object_id): """Transfer local object with id object_id to another plasma instance @@ -224,9 +171,7 @@ class PlasmaClient(object): port (int): Port number of the plasma instance the object is sent to. object_id (str): A string used to identify an object. """ - if not self.has_manager_conn: - raise Exception("Not connected to the plasma manager socket") - self.client.plasma_transfer(self.plasma_conn, addr, port, make_plasma_id(object_id)) + return libplasma.transfer(self.conn, object_id, addr, port) def fetch(self, object_ids): """Fetch the object with id object_id from another plasma manager instance. @@ -234,19 +179,9 @@ class PlasmaClient(object): Args: object_id (str): A string used to identify an object. """ - object_id_array = (len(object_ids) * PlasmaID)() - for i, object_id in enumerate(object_ids): - object_id_array[i] = make_plasma_id(object_id) - success_array = (len(object_ids) * ctypes.c_int)() - if not self.has_manager_conn: - raise Exception("Not connected to the plasma manager socket") - self.client.plasma_fetch(self.plasma_conn, - object_id_array._length_, - object_id_array, - success_array); - return [bool(success) for success in success_array] + return libplasma.fetch(self.conn, object_ids) - def wait(self, object_ids, timeout, num_returns): + def wait(self, object_ids, timeout=PLASMA_WAIT_TIMEOUT, num_returns=1): """Wait until num_returns objects in object_ids are ready. Args: @@ -258,30 +193,12 @@ class PlasmaClient(object): ready_ids, waiting_ids (List[str], List[str]): List of object IDs that are ready and list of object IDs we might still wait on respectively. """ - if not self.has_manager_conn: - raise Exception("Not connected to the plasma manager socket") - if num_returns < 0: - raise Exception("The argument num_returns cannot be less than one.") - if num_returns > len(object_ids): - raise Exception("The argument num_returns cannot be greater than len(object_ids): num_returns is {}, len(object_ids) is {}.".format(num_returns, len(object_ids))) - if timeout > 2 ** 36: - raise Exception("The method wait currently cannot be used with a timeout greater than 2 ** 36.") - object_id_array = (len(object_ids) * PlasmaID)() - for i, object_id in enumerate(object_ids): - object_id_array[i] = make_plasma_id(object_id) - return_id_array = (num_returns * PlasmaID)() - num_return_objects = self.client.plasma_wait(self.plasma_conn, - object_id_array._length_, - object_id_array, - ctypes.c_int64(timeout), - num_returns, - return_id_array) - ready_ids = map(plasma_id_to_str, return_id_array[num_returns-num_return_objects:]) - return ready_ids, list(set(object_ids) - set(ready_ids)) + ready_ids, waiting_ids = libplasma.wait(self.conn, object_ids, timeout, num_returns) + return ready_ids, list(waiting_ids) def subscribe(self): """Subscribe to notifications about sealed objects.""" - fd = self.client.plasma_subscribe(self.plasma_conn) + fd = libplasma.subscribe(self.conn) self.notification_sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM) # Make the socket non-blocking. self.notification_sock.setblocking(0) diff --git a/src/plasma/lib/python/setup.py b/src/plasma/lib/python/setup.py new file mode 100644 index 000000000..c02f4d220 --- /dev/null +++ b/src/plasma/lib/python/setup.py @@ -0,0 +1,23 @@ +from setuptools import setup, find_packages +import setuptools.command.install as _install + +import subprocess + +class install(_install.install): + def run(self): + subprocess.check_call(["make"], cwd="../../") + subprocess.check_call(["cmake", ".."], cwd="../../build") + subprocess.check_call(["make", "install"], cwd="../../build") + # Calling _install.install.run(self) does not fetch required packages and + # instead performs an old-style install. See command/install.py in + # setuptools. So, calling do_egg_install() manually here. + self.do_egg_install() + +setup(name="Plasma", + version="0.0.1", + description="Plasma client for Python", + packages=find_packages(), + package_data={"plasma": ["libplasma.so"]}, + cmdclass={"install": install}, + include_package_data=True, + zip_safe=False) diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 695ae2c60..9e1d547e3 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -394,18 +394,22 @@ plasma_connection *plasma_connect(const char *store_socket_name, } void plasma_disconnect(plasma_connection *conn) { - close(conn->store_conn); - if (conn->manager_conn >= 0) { - close(conn->manager_conn); - } object_id *id = NULL; while ((id = (object_id *) utringbuffer_next(conn->release_history, id))) { plasma_perform_release(conn, *id); } utringbuffer_free(conn->release_history); + close(conn->store_conn); + if (conn->manager_conn >= 0) { + close(conn->manager_conn); + } free(conn); } +bool plasma_manager_is_connected(plasma_connection *conn) { + return conn->manager_conn >= 0; +} + #define h_addr h_addr_list[0] int plasma_manager_try_connect(const char *ip_addr, int port) { diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index d163495cb..ac167f872 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -1,6 +1,8 @@ #ifndef PLASMA_CLIENT_H #define PLASMA_CLIENT_H +#include + #include "plasma.h" #define PLASMA_DEFAULT_RELEASE_DELAY 64 @@ -74,6 +76,14 @@ plasma_connection *plasma_connect(const char *store_socket_name, */ void plasma_disconnect(plasma_connection *conn); +/** + * Return true if the plasma manager is connected. + * + * @param conn The connection to the local plasma store and plasma manager. + * @return True if the plasma manager is connected and false otherwise. + */ +bool plasma_manager_is_connected(plasma_connection *conn); + /** * Try to connect to a possibly remote Plasma Manager. * @@ -209,6 +219,21 @@ void plasma_fetch(plasma_connection *conn, object_id object_ids[], int is_fetched[]); +/** + * Transfer local object to a different plasma manager. + * + * @param conn The object containing the connection state. + * @param addr IP address of the plasma manager we are transfering to. + * @param port Port of the plasma manager we are transfering to. + * @object_id ObjectID of the object we are transfering. + * + * @return Void. + */ +void plasma_transfer(plasma_connection *conn, + const char *addr, + int port, + object_id object_id); + /** * Wait for objects to be created (right now, wait for local objects). * diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c new file mode 100644 index 000000000..8bc9b527c --- /dev/null +++ b/src/plasma/plasma_extension.c @@ -0,0 +1,302 @@ +#include + +#include "common.h" +#include "plasma_client.h" + +static int PyObjectToPlasmaConnection(PyObject *object, + plasma_connection **conn) { + if (PyCapsule_IsValid(object, "plasma")) { + *conn = (plasma_connection *) PyCapsule_GetPointer(object, "plasma"); + return 1; + } else { + PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule"); + return 0; + } +} + +static int PyObjectToUniqueID(PyObject *object, object_id *object_id) { + if (PyString_Check(object)) { + memcpy(&object_id->id[0], PyString_AsString(object), UNIQUE_ID_SIZE); + return 1; + } else { + PyErr_SetString(PyExc_TypeError, "must be a 20 character string"); + return 0; + } +} + +PyObject *PyPlasma_connect(PyObject *self, PyObject *args) { + const char *store_socket_name; + const char *manager_socket_name; + int release_delay; + if (!PyArg_ParseTuple(args, "ssi", &store_socket_name, &manager_socket_name, + &release_delay)) { + return NULL; + } + plasma_connection *conn; + if (strlen(manager_socket_name) == 0) { + conn = plasma_connect(store_socket_name, NULL, release_delay); + } else { + conn = + plasma_connect(store_socket_name, manager_socket_name, release_delay); + } + return PyCapsule_New(conn, "plasma", NULL); +} + +PyObject *PyPlasma_disconnect(PyObject *self, PyObject *args) { + plasma_connection *conn; + if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaConnection, &conn)) { + return NULL; + } + plasma_disconnect(conn); + Py_RETURN_NONE; +} + +PyObject *PyPlasma_create(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + long long size; + PyObject *metadata; + if (!PyArg_ParseTuple(args, "O&O&LO", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id, &size, &metadata)) { + return NULL; + } + if (!PyByteArray_Check(metadata)) { + PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray"); + return NULL; + } + uint8_t *data; + plasma_create(conn, object_id, size, + (uint8_t *) PyByteArray_AsString(metadata), + PyByteArray_Size(metadata), &data); + return PyBuffer_FromReadWriteMemory((void *) data, (Py_ssize_t) size); +} + +PyObject *PyPlasma_seal(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id)) { + return NULL; + } + plasma_seal(conn, object_id); + Py_RETURN_NONE; +} + +PyObject *PyPlasma_release(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id)) { + return NULL; + } + plasma_release(conn, object_id); + Py_RETURN_NONE; +} + +PyObject *PyPlasma_get(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id)) { + return NULL; + } + int64_t size; + uint8_t *data; + int64_t metadata_size; + uint8_t *metadata; + plasma_get(conn, object_id, &size, &data, &metadata_size, &metadata); + PyObject *t = PyTuple_New(2); + PyTuple_SetItem(t, 0, PyBuffer_FromMemory((void *) data, (Py_ssize_t) size)); + PyTuple_SetItem(t, 1, PyByteArray_FromStringAndSize( + (void *) metadata, (Py_ssize_t) metadata_size)); + return t; +} + +PyObject *PyPlasma_contains(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id)) { + return NULL; + } + int has_object; + plasma_contains(conn, object_id, &has_object); + + if (has_object) + Py_RETURN_TRUE; + else + Py_RETURN_FALSE; +} + +PyObject *PyPlasma_fetch(PyObject *self, PyObject *args) { + plasma_connection *conn; + PyObject *object_id_list; + if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaConnection, &conn, + &object_id_list)) { + return NULL; + } + + if (!plasma_manager_is_connected(conn)) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + + Py_ssize_t n = PyList_Size(object_id_list); + object_id *object_ids = malloc(sizeof(object_id) * n); + for (int i = 0; i < n; ++i) { + PyObjectToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } + int *success_array = malloc(sizeof(int) * n); + plasma_fetch(conn, (int) n, object_ids, success_array); + PyObject *success_list = PyList_New(n); + for (int i = 0; i < n; ++i) { + if (success_array[i]) { + Py_INCREF(Py_True); + PyList_SetItem(success_list, i, Py_True); + } else { + Py_INCREF(Py_False); + PyList_SetItem(success_list, i, Py_False); + } + } + free(object_ids); + free(success_array); + return success_list; +} + +PyObject *PyPlasma_wait(PyObject *self, PyObject *args) { + plasma_connection *conn; + PyObject *object_id_list; + long long timeout; + int num_returns; + if (!PyArg_ParseTuple(args, "O&OLi", PyObjectToPlasmaConnection, &conn, + &object_id_list, &timeout, &num_returns)) { + return NULL; + } + Py_ssize_t n = PyList_Size(object_id_list); + + if (!plasma_manager_is_connected(conn)) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + if (num_returns < 0) { + PyErr_SetString(PyExc_RuntimeError, + "The argument num_returns cannot be less than zero."); + return NULL; + } + if (num_returns > n) { + PyErr_SetString( + PyExc_RuntimeError, + "The argument num_returns cannot be greater than len(object_ids)"); + return NULL; + } + + object_id *object_ids = malloc(sizeof(object_id) * n); + for (int i = 0; i < n; ++i) { + PyObjectToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } + object_id *return_ids = malloc(sizeof(object_id) * num_returns); + + /* Drop the global interpreter lock while we are waiting, so other threads can + * run. */ + int num_return_objects; + Py_BEGIN_ALLOW_THREADS; + num_return_objects = plasma_wait(conn, (int) n, object_ids, + (uint64_t) timeout, num_returns, return_ids); + Py_END_ALLOW_THREADS; + + PyObject *ready_ids = PyList_New(num_return_objects); + PyObject *waiting_ids = PySet_New(object_id_list); + for (int i = num_returns - num_return_objects; i < num_returns; ++i) { + PyObject *ready = + PyString_FromStringAndSize((char *) return_ids[i].id, UNIQUE_ID_SIZE); + PyList_SetItem(ready_ids, i - (num_returns - num_return_objects), ready); + PySet_Discard(waiting_ids, ready); + } + PyObject *t = PyTuple_New(2); + PyTuple_SetItem(t, 0, ready_ids); + PyTuple_SetItem(t, 1, waiting_ids); + return t; +} + +PyObject *PyPlasma_evict(PyObject *self, PyObject *args) { + plasma_connection *conn; + long long num_bytes; + if (!PyArg_ParseTuple(args, "O&L", PyObjectToPlasmaConnection, &conn, + &num_bytes)) { + return NULL; + } + int64_t evicted_bytes = plasma_evict(conn, (int64_t) num_bytes); + return PyInt_FromLong((long) evicted_bytes); +} + +PyObject *PyPlasma_delete(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id)) { + return NULL; + } + plasma_delete(conn, object_id); + Py_RETURN_NONE; +} + +PyObject *PyPlasma_transfer(PyObject *self, PyObject *args) { + plasma_connection *conn; + object_id object_id; + const char *addr; + int port; + if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaConnection, &conn, + PyObjectToUniqueID, &object_id, &addr, &port)) { + return NULL; + } + + if (!plasma_manager_is_connected(conn)) { + PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); + return NULL; + } + + plasma_transfer(conn, addr, port, object_id); + Py_RETURN_NONE; +} + +PyObject *PyPlasma_subscribe(PyObject *self, PyObject *args) { + plasma_connection *conn; + if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaConnection, &conn)) { + return NULL; + } + int sock = plasma_subscribe(conn); + return PyInt_FromLong(sock); +} + +static PyMethodDef plasma_methods[] = { + {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."}, + {"disconnect", PyPlasma_disconnect, METH_VARARGS, + "Disconnect from plasma."}, + {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."}, + {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."}, + {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."}, + {"contains", PyPlasma_contains, METH_VARARGS, + "Does the plasma store contain this plasma object?"}, + {"fetch", PyPlasma_fetch, METH_VARARGS, + "Fetch the object from another plasma manager instance."}, + {"wait", PyPlasma_wait, METH_VARARGS, + "Wait until num_returns objects in object_ids are ready."}, + {"evict", PyPlasma_evict, METH_VARARGS, + "Evict some objects until we recover some number of bytes."}, + {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."}, + {"delete", PyPlasma_delete, METH_VARARGS, "Deleta a plasma object."}, + {"transfer", PyPlasma_transfer, METH_VARARGS, + "Transfer object to another plasma manager."}, + {"subscribe", PyPlasma_subscribe, METH_VARARGS, + "Subscribe to the plasma notification socket."}, + {NULL} /* Sentinel */ +}; + +#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */ +#define PyMODINIT_FUNC void +#endif + +PyMODINIT_FUNC initlibplasma(void) { + Py_InitModule3("libplasma", plasma_methods, + "A Python client library for plasma"); +} diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index f3fa6830a..ed250b94a 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -91,8 +91,13 @@ struct plasma_store_state { plasma_store_info *plasma_store_info; /** The state that is managed by the eviction policy. */ eviction_state *eviction_state; + /** Input buffer. This is allocated only once to avoid mallocs for every + * call to process_message. */ + UT_array *input_buffer; }; +UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; + plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) { plasma_store_state *state = malloc(sizeof(plasma_store_state)); state->loop = loop; @@ -103,6 +108,7 @@ plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) { state->plasma_store_info->objects = NULL; /* Initialize the eviction state. */ state->eviction_state = make_eviction_state(system_memory); + utarray_new(state->input_buffer, &byte_icd); return state; } @@ -424,10 +430,11 @@ void process_message(event_loop *loop, void *context, int events) { client *client_context = context; + plasma_store_state *state = client_context->plasma_state; int64_t type; - int64_t length; - plasma_request *req; - read_message(client_sock, &type, &length, (uint8_t **) &req); + read_buffer(client_sock, &type, state->input_buffer); + plasma_request *req = (plasma_request *) utarray_front(state->input_buffer); + /* We're only sending a single object ID at a time for now. */ plasma_reply reply; memset(&reply, 0, sizeof(reply)); @@ -495,8 +502,6 @@ void process_message(event_loop *loop, /* This code should be unreachable. */ CHECK(0); } - - free(req); } void new_client_connection(event_loop *loop, diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 9c0d01c12..c721ef143 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -11,6 +11,7 @@ import random import time import tempfile import threading +import numpy as np import plasma @@ -18,7 +19,7 @@ USE_VALGRIND = False PLASMA_STORE_MEMORY = 1000000000 def random_object_id(): - return "".join([chr(random.randint(0, 255)) for _ in range(plasma.PLASMA_ID_SIZE)]) + return np.random.bytes(20) def generate_metadata(length): metadata = length * ["\x00"] @@ -27,7 +28,7 @@ def generate_metadata(length): metadata[-1] = chr(random.randint(0, 255)) for _ in range(100): metadata[random.randint(0, length - 1)] = chr(random.randint(0, 255)) - return buffer("".join(metadata)) + return bytearray("".join(metadata)) def write_to_data_buffer(buff, length): if length > 0: