diff --git a/.gitmodules b/.gitmodules index 4a575a0e8..65a586d5a 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,7 @@ url = https://github.com/grpc/grpc [submodule "thirdparty/numbuf"] path = thirdparty/numbuf - url = https://github.com/amplab/numbuf.git + url = https://github.com/pcmoritz/numbuf.git +[submodule "thirdparty/numbuf-old"] + path = thirdparty/numbuf-old + url = https://github.com/amplab/numbuf diff --git a/CMakeLists.txt b/CMakeLists.txt index c5921f72d..501021c0d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -127,27 +127,35 @@ if(APPLE) SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") endif(APPLE) -set(ARROW_LIB ${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/build/release/libarrow.a) +set(ARROW_LIB ${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/build/release/libarrow.a) add_definitions(-fPIC) -include_directories("${CMAKE_SOURCE_DIR}/thirdparty/arrow/cpp/src/") -include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/") -include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf/python/src/") -add_library(pynumbuf STATIC ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/numbuf/tensor.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/numbuf/types.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/numbuf/metadata.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/cpp/src/numbuf/dict.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/python/src/pynumbuf/serialize.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/python/src/pynumbuf/adapters/numpy.cc - ${CMAKE_SOURCE_DIR}/thirdparty/numbuf/python/src/pynumbuf/adapters/python.cc) -target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES}) + +if(NOT APPLE) + include_directories("${CMAKE_SOURCE_DIR}/thirdparty/arrow-old/cpp/src/") + include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/") + include_directories("${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/") + add_library(pynumbuf STATIC ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/tensor.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/types.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/metadata.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/cpp/src/numbuf/dict.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/serialize.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/numpy.cc + ${CMAKE_SOURCE_DIR}/thirdparty/numbuf-old/python/src/pynumbuf/adapters/python.cc) + target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES}) +endif() add_executable(objstore src/objstore.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(objstore ${ARROW_LIB} pynumbuf) +if(NOT APPLE) + target_link_libraries(objstore ${ARROW_LIB} pynumbuf) +endif() add_executable(scheduler src/scheduler.cc src/computation_graph.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(scheduler) add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc src/utils.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(raylib ${ARROW_LIB} pynumbuf) +if(NOT APPLE) + target_link_libraries(raylib ${ARROW_LIB} pynumbuf) +else() + target_link_libraries(raylib ${PYTHON_LIBRARIES}) +endif() get_filename_component(PYTHON_SHARED_LIBRARY ${PYTHON_LIBRARIES} NAME) if(APPLE) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 19fd68338..19ebe5cc7 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -15,6 +15,7 @@ import serialization import ray.internal.graph_pb2 import ray.graph import services +import libnumbuf class RayFailedObject(object): """An object used internally to represent a task that threw an exception. @@ -286,9 +287,28 @@ class Worker(object): objref (ray.ObjRef): The object reference of the value to be put. value (serializable object): The value to put in the object store. """ - if serialization.is_arrow_serializable(value): - ray.lib.put_arrow(self.handle, objref, value) - else: + try: + # We put the value into a list here because in arrow the concept of + # "serializing a single object" does not exits. + schema, size, serialized = libnumbuf.serialize_list([value]) + # TODO(pcm): Right now, metadata is serialized twice, change that in the future + # in the following line, the "8" is for storing the metadata size, + # the len(schema) is for storing the metadata and the 4096 is for storing + # the metadata in the batch (see INITIAL_METADATA_SIZE in arrow) + size = size + 8 + len(schema) + 4096 + buff, segmentid = ray.lib.allocate_buffer(self.handle, objref, size) + # write the metadata length + np.frombuffer(buff, dtype="int64", count=1)[0] = len(schema) + # metadata buffer + metadata = np.frombuffer(buff, dtype="byte", offset=8, count=len(schema)) + # write the metadata + metadata[:] = schema + data = np.frombuffer(buff, dtype="byte")[8 + len(schema):] + metadata_offset = libnumbuf.write_to_buffer(serialized, memoryview(data)) + ray.lib.finish_buffer(self.handle, objref, segmentid, metadata_offset) + except: + # At the moment, custom object and objects that contain object references take this path + # TODO(pcm): Make sure that these are the only objects getting serialized to protobuf object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule ray.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) @@ -302,10 +322,22 @@ class Worker(object): objref (ray.ObjRef): The object reference of the value to retrieve. """ if ray.lib.is_arrow(self.handle, objref): - result, segmentid = ray.lib.get_arrow(self.handle, objref) + ## this is the new codepath + buff, segmentid, metadata_offset = ray.libraylib.get_buffer(self.handle, objref) + metadata_size = np.frombuffer(buff, dtype="int64", count=1)[0] + metadata = np.frombuffer(buff, dtype="byte", offset=8, count=metadata_size) + data = np.frombuffer(buff, dtype="byte")[8 + metadata_size:] + serialized = libnumbuf.read_from_buffer(memoryview(data), bytearray(metadata), metadata_offset) + deserialized = libnumbuf.deserialize_list(serialized) + # Unwrap the object from the list (it was wrapped put_object) + assert len(deserialized) == 1 + result = deserialized[0] + ## this is the old codepath + # result, segmentid = ray.lib.get_arrow(self.handle, objref) else: object_capsule, segmentid = ray.lib.get_object(self.handle, objref) result = serialization.deserialize(self.handle, object_capsule) + if isinstance(result, int): result = serialization.Int(result) elif isinstance(result, long): diff --git a/setup-env.sh b/setup-env.sh index 235f3bd52..bbfeec16f 100755 --- a/setup-env.sh +++ b/setup-env.sh @@ -4,4 +4,4 @@ echo "Adding Ray to PYTHONPATH" 1>&2 ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) -export PYTHONPATH="$ROOT_DIR/lib/python/:$ROOT_DIR/thirdparty/numbuf/python" +export PYTHONPATH="$ROOT_DIR/lib/python/:$ROOT_DIR/thirdparty/numbuf/build" diff --git a/src/ipc.cc b/src/ipc.cc index 999801f91..8c2ad08ea 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -7,12 +7,16 @@ #include #include "ray/ray.h" -using namespace arrow; +#ifndef __APPLE__ + using namespace arrow; +#endif ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer, size_t metadata_offset) : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer), metadata_offset_(metadata_offset) {} +#ifndef __APPLE__ + Status BufferMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) { // TODO(pcm): error handling std::memcpy(data_ + position, data, nbytes); @@ -33,6 +37,8 @@ int64_t BufferMemorySource::Size() const { return size_; } +#endif + MessageQueue<>::MessageQueue() : create_(false) { } MessageQueue<>::~MessageQueue() { diff --git a/src/ipc.h b/src/ipc.h index 24f535408..8f9ee431a 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -23,8 +23,10 @@ namespace boost { #include #include -#include -#include +#ifndef __APPLE__ + #include + #include +#endif #include "ray/ray.h" @@ -108,6 +110,8 @@ private: size_t metadata_offset_; // offset of the metadata that describes this object }; +#ifndef __APPLE__ + class BufferMemorySource: public arrow::ipc::MemorySource { public: BufferMemorySource(uint8_t* data, int64_t capacity) : data_(data), capacity_(capacity), size_(0) {} @@ -121,6 +125,8 @@ public: int64_t size_; }; +#endif + // Memory segment pool: A collection of shared memory segments // used in two modes: // \item on the object store it is used with create = true, in this case the diff --git a/src/raylib.cc b/src/raylib.cc index f5f4e1545..5aee44d4c 100644 --- a/src/raylib.cc +++ b/src/raylib.cc @@ -4,9 +4,11 @@ #include #include -#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API +#define PY_ARRAY_UNIQUE_SYMBOL RAYLIB_ARRAY_API #include -#include +#ifndef __APPLE__ + #include +#endif #include #include "types.pb.h" @@ -480,6 +482,7 @@ static PyObject* serialize_object(PyObject* self, PyObject* args) { return t; } +#ifndef __APPLE__ static PyObject* put_arrow(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; @@ -494,6 +497,7 @@ static PyObject* put_arrow(PyObject* self, PyObject* args) { Py_XDECREF(array); // GETCONTIGUOUS from above returned a new reference Py_RETURN_NONE; } +#endif static PyObject* allocate_buffer(PyObject* self, PyObject* args) { Worker* worker; @@ -527,18 +531,22 @@ static PyObject* get_buffer(PyObject* self, PyObject* args) { ObjRef objref; int64_t size; SegmentId segmentid; + int64_t metadata_offset; if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { return NULL; } - void* address = reinterpret_cast(const_cast(worker->get_buffer(objref, size, segmentid))); + void* address = reinterpret_cast(const_cast(worker->get_buffer(objref, size, segmentid, metadata_offset))); std::vector dim({size}); - PyObject* t = PyTuple_New(2); + PyObject* t = PyTuple_New(3); PyTuple_SetItem(t, 0, PyArray_SimpleNewFromData(1, dim.data(), NPY_BYTE, address)); PyTuple_SetItem(t, 1, PyInt_FromLong(segmentid)); + PyTuple_SetItem(t, 2, PyInt_FromLong(metadata_offset)); return t; } +#ifndef __APPLE__ + static PyObject* get_arrow(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; @@ -553,6 +561,8 @@ static PyObject* get_arrow(PyObject* self, PyObject* args) { return val_and_segmentid; } +#endif + static PyObject* is_arrow(PyObject* self, PyObject* args) { Worker* worker; ObjRef objref; @@ -999,11 +1009,15 @@ static PyObject* kill_workers(PyObject* self, PyObject* args) { static PyMethodDef RayLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, +#ifndef __APPLE__ { "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"}, +#endif { "allocate_buffer", allocate_buffer, METH_VARARGS, "Allocates and returns buffer for objref."}, { "finish_buffer", finish_buffer, METH_VARARGS, "Makes the buffer immutable and closes memory segment of objref."}, { "get_buffer", get_buffer, METH_VARARGS, "Gets buffer for objref"}, +#ifndef __APPLE__ { "get_arrow", get_arrow, METH_VARARGS, "get an arrow array from the local object store"}, +#endif { "is_arrow", is_arrow, METH_VARARGS, "is the object in the local object store an arrow object?"}, { "unmap_object", unmap_object, METH_VARARGS, "unmap the object from the client's shared memory pool"}, { "serialize_task", serialize_task, METH_VARARGS, "serialize a task to protocol buffers" }, diff --git a/src/worker.cc b/src/worker.cc index a5a2fea40..6dc37b243 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -5,7 +5,9 @@ #include "utils.h" -#include +#ifndef __APPLE__ + #include +#endif extern "C" { static PyObject *RayError; @@ -194,6 +196,8 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont } \ } while (0); +#ifndef __APPLE__ + PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { RAY_CHECK(connected_, "Attempted to perform put_arrow but failed."); ObjRequest request; @@ -221,6 +225,8 @@ PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { Py_RETURN_NONE; } +#endif + const char* Worker::allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid) { RAY_CHECK(connected_, "Attempted to perform put_arrow but failed."); ObjRequest request; @@ -247,7 +253,7 @@ PyObject* Worker::finish_buffer(ObjRef objref, SegmentId segmentid, int64_t meta Py_RETURN_NONE; } -const char* Worker::get_buffer(ObjRef objref, int64_t &size, SegmentId& segmentid) { +const char* Worker::get_buffer(ObjRef objref, int64_t &size, SegmentId& segmentid, int64_t& metadata_offset) { RAY_CHECK(connected_, "Attempted to perform get_arrow but failed."); ObjRequest request; request.workerid = workerid_; @@ -259,9 +265,12 @@ const char* Worker::get_buffer(ObjRef objref, int64_t &size, SegmentId& segmenti const char* address = reinterpret_cast(segmentpool_->get_address(result)); size = result.size(); segmentid = result.segmentid(); + metadata_offset = result.metadata_offset(); return address; } +#ifndef __APPLE__ + // returns python list containing the value represented by objref and the // segmentid in which the object is stored PyObject* Worker::get_arrow(ObjRef objref, SegmentId& segmentid) { @@ -281,6 +290,8 @@ PyObject* Worker::get_arrow(ObjRef objref, SegmentId& segmentid) { return value; } +#endif + bool Worker::is_arrow(ObjRef objref) { RAY_CHECK(connected_, "Attempted to perform is_arrow but failed."); ObjRequest request; diff --git a/src/worker.h b/src/worker.h index 0f5a43c2e..a35c331ef 100644 --- a/src/worker.h +++ b/src/worker.h @@ -67,16 +67,20 @@ class Worker { void put_object(ObjRef objref, const Obj* obj, std::vector &contained_objrefs); // retrieve serialized object from local object store slice get_object(ObjRef objref); +#ifndef __APPLE__ // stores an arrow object to the local object store PyObject* put_arrow(ObjRef objref, PyObject* array); +#endif // Allocates buffer for objref with size of size const char* allocate_buffer(ObjRef objref, int64_t size, SegmentId& segmentid); // Finishes buffer with segmentid and an offset of metadata_ofset PyObject* finish_buffer(ObjRef objref, SegmentId segmentid, int64_t metadata_offset); // Gets the buffer for objref - const char* get_buffer(ObjRef objref, int64_t& size, SegmentId& segmentid); + const char* get_buffer(ObjRef objref, int64_t& size, SegmentId& segmentid, int64_t& metadata_offset); +#ifndef __APPLE__ // gets an arrow object from the local object store PyObject* get_arrow(ObjRef objref, SegmentId& segmentid); +#endif // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? bool is_arrow(ObjRef objref); // unmap the segment containing an object from the local address space diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh index 3d23f8d0c..a3d011619 100755 --- a/thirdparty/build_thirdparty.sh +++ b/thirdparty/build_thirdparty.sh @@ -26,6 +26,22 @@ cd $TP_DIR/arrow/cpp/build cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release .. make VERBOSE=1 -j$PARALLEL +# TODO(pcm): Remove this +echo "building arrow (old version)" +cd $TP_DIR/arrow-old/cpp +source setup_build_env.sh +mkdir -p $TP_DIR/arrow-old/cpp/build +cd $TP_DIR/arrow-old/cpp/build +cmake -DLIBARROW_LINKAGE=STATIC -DCMAKE_BUILD_TYPE=Release .. +make VERBOSE=1 -j$PARALLEL + +echo "building numbuf" +cd $TP_DIR/numbuf +mkdir -p build +cd $TP_DIR/numbuf/build +cmake .. +make VERBOSE=1 -j$PARALLEL + echo "building GRPC" cd $TP_DIR/grpc make static HAS_SYSTEM_PROTOBUF=false HAS_SYSTEM_ZLIB=false HAS_SYSTEM_OPENSSL_ALPN=false HAS_SYSTEM_OPENSSL_NPN=false -j$PARALLEL diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh index 07a63cd0b..b913d173a 100755 --- a/thirdparty/download_thirdparty.sh +++ b/thirdparty/download_thirdparty.sh @@ -9,6 +9,15 @@ if [ ! -d arrow ]; then echo "Fetching arrow" git clone https://github.com/pcmoritz/arrow.git cd arrow + git checkout scratch + cd .. +fi + +# TODO(pcm): Remove this +if [ ! -d arrow-old ]; then + echo "Fetching old version of arrow" + git clone https://github.com/pcmoritz/arrow.git arrow-old + cd arrow-old git checkout static cd .. fi diff --git a/thirdparty/numbuf b/thirdparty/numbuf index cacf53088..0dcfcfa2e 160000 --- a/thirdparty/numbuf +++ b/thirdparty/numbuf @@ -1 +1 @@ -Subproject commit cacf53088bb7bc99cd981c78148be0cc789a31bd +Subproject commit 0dcfcfa2e912f135c7bbae577856364c5aa26afe diff --git a/thirdparty/numbuf-old b/thirdparty/numbuf-old new file mode 160000 index 000000000..95f5fcfab --- /dev/null +++ b/thirdparty/numbuf-old @@ -0,0 +1 @@ +Subproject commit 95f5fcfab9ce6d439ed9d5e3ca1800c368a80d5e