diff --git a/CMakeLists.txt b/CMakeLists.txt index ce05bd9c3..e8bd351ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -77,9 +77,9 @@ if (UNIX AND NOT APPLE) endif() add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(objstore arrow) +target_link_libraries(objstore arrow numbuf pynumbuf) add_executable(scheduler src/scheduler.cc ${GENERATED_PROTOBUF_FILES}) -add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc src/serialize.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(orchpylib arrow) +add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(orchpylib arrow numbuf pynumbuf) install(TARGETS objstore scheduler orchpylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/orchpy/orchpy) diff --git a/README.md b/README.md index 60dcb392a..be044884c 100644 --- a/README.md +++ b/README.md @@ -39,10 +39,28 @@ For a description of our design decisions, see 8. `cd ..` 9. `python setup.py install` +**Install Numbuf** + +1. `git clone git@github.com:amplab/numbuf.git` +2. `cd numbuf/cpp/` +3. `mkdir build` +4. `cd build` +5. `cmake ..` +6. `sudo make install` +7. `cd ../..` +8. `cd python/` +9. `mkdir build` +10. `cd build` +11. `cmake ..` +12. `sudo make install` +13. `cd ..` +14. `sudo python setup.py install` + + **Install Orchestra** -1. `git clone git@github.com:amplab/orch.git` -2. `cd orch` +1. `git clone git@github.com:amplab/photon.git` +2. `cd photon` 3. `mkdir build` 4. `cd build` 5. `cmake ..` diff --git a/lib/orchpy/orchpy/worker.py b/lib/orchpy/orchpy/worker.py index 09709158b..33235c8c1 100644 --- a/lib/orchpy/orchpy/worker.py +++ b/lib/orchpy/orchpy/worker.py @@ -1,6 +1,7 @@ from types import ModuleType import typing import numpy as np +import pynumbuf import orchpy import serialization @@ -14,11 +15,11 @@ class Worker(object): def put_object(self, objref, value): """Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store.""" - # if type(value) == np.ndarray: - # orchpy.lib.put_arrow(self.handle, objref, value) - # else: - object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule - orchpy.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) + if pynumbuf.serializable(value): + orchpy.lib.put_arrow(self.handle, objref, value) + else: + object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule + orchpy.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) def get_object(self, objref): """ @@ -27,11 +28,11 @@ class Worker(object): WARNING: get_object can only be called on a canonical objref. """ - # if orchpy.lib.is_arrow(self.handle, objref): - # return orchpy.lib.get_arrow(self.handle, objref) - # else: - object_capsule = orchpy.lib.get_object(self.handle, objref) - return serialization.deserialize(self.handle, object_capsule) + if orchpy.lib.is_arrow(self.handle, objref): + return orchpy.lib.get_arrow(self.handle, objref) + else: + object_capsule = orchpy.lib.get_object(self.handle, objref) + return serialization.deserialize(self.handle, object_capsule) def alias_objrefs(self, alias_objref, target_objref): """Make `alias_objref` refer to the same object that `target_objref` refers to.""" diff --git a/src/orchpylib.cc b/src/orchpylib.cc index 04c133954..58d8bccc4 100644 --- a/src/orchpylib.cc +++ b/src/orchpylib.cc @@ -4,7 +4,7 @@ #include #include -#define PY_ARRAY_UNIQUE_SYMBOL ORCHESTRA_ARRAY_API +#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API #include #include #include @@ -12,8 +12,6 @@ #include "types.pb.h" #include "worker.h" -#include "serialize.h" - extern "C" { int PyObjectToWorker(PyObject* object, Worker **worker); @@ -459,12 +457,7 @@ PyObject* put_arrow(PyObject* self, PyObject* args) { if (!PyArg_ParseTuple(args, "O&O&O", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref, &value)) { return NULL; } - if (!PyArray_Check(value)) { - PyErr_SetString(PyExc_TypeError, "only support arrays at this point"); - return NULL; - } - PyArrayObject* array = PyArray_GETCONTIGUOUS((PyArrayObject*) value); - worker->put_arrow(objref, array); + worker->put_arrow(objref, value); Py_RETURN_NONE; } diff --git a/src/serialize.cc b/src/serialize.cc deleted file mode 100644 index 3fbf83fdf..000000000 --- a/src/serialize.cc +++ /dev/null @@ -1,201 +0,0 @@ -#include "serialize.h" - -using namespace arrow; - -template -struct npy_traits { -}; - -template <> -struct npy_traits { - typedef uint8_t value_type; - static const std::shared_ptr primitive_type; - using ArrayType = arrow::BooleanArray; -}; - -const std::shared_ptr npy_traits::primitive_type = std::make_shared(); - -#define NPY_INT_DECL(TYPE, CapType, T) \ - template <> \ - struct npy_traits { \ - typedef T value_type; \ - static const std::shared_ptr primitive_type; \ - using ArrayType = arrow::CapType##Array; \ - }; \ - \ - const std::shared_ptr npy_traits::primitive_type = std::make_shared(); - -NPY_INT_DECL(INT8, Int8, int8_t); -NPY_INT_DECL(INT16, Int16, int16_t); -NPY_INT_DECL(INT32, Int32, int32_t); -NPY_INT_DECL(INT64, Int64, int64_t); -NPY_INT_DECL(UINT8, UInt8, uint8_t); -NPY_INT_DECL(UINT16, UInt16, uint16_t); -NPY_INT_DECL(UINT32, UInt32, uint32_t); -NPY_INT_DECL(UINT64, UInt64, uint64_t); - -template <> -struct npy_traits { - typedef float value_type; - static const std::shared_ptr primitive_type; - using ArrayType = arrow::FloatArray; -}; - -const std::shared_ptr npy_traits::primitive_type = std::make_shared(); - -template <> -struct npy_traits { - typedef double value_type; - static const std::shared_ptr primitive_type; - using ArrayType = arrow::DoubleArray; -}; - -const std::shared_ptr npy_traits::primitive_type = std::make_shared(); - -template <> -struct npy_traits { - typedef PyObject* value_type; -}; - -template -std::shared_ptr make_flat_array(const std::string& fieldname, size_t size, std::shared_ptr data) { - auto field = std::make_shared(fieldname, npy_traits::primitive_type); - std::shared_ptr schema(new arrow::Schema({field})); - auto array = std::make_shared::ArrayType>(size, data); - return std::shared_ptr(new RowBatch(schema, size, {array})); -} - -const int64_t MAX_METADATA_SIZE = 5000; - -#define SIZE_ARROW_CASE(TYPE) \ - case TYPE: \ - return size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; - -size_t arrow_size(PyArrayObject* array) { - npy_intp size = PyArray_SIZE(array); - switch (PyArray_TYPE(array)) { - SIZE_ARROW_CASE(NPY_INT8) - SIZE_ARROW_CASE(NPY_INT16) - SIZE_ARROW_CASE(NPY_INT32) - SIZE_ARROW_CASE(NPY_INT64) - SIZE_ARROW_CASE(NPY_UINT8) - SIZE_ARROW_CASE(NPY_UINT16) - SIZE_ARROW_CASE(NPY_UINT32) - SIZE_ARROW_CASE(NPY_UINT64) - SIZE_ARROW_CASE(NPY_FLOAT) - SIZE_ARROW_CASE(NPY_DOUBLE) - default: - ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); - } -} - -#define SERIALIZE_ARROW_CASE(TYPE) \ - case TYPE: \ - { \ - data = std::make_shared(reinterpret_cast(PyArray_DATA(array)), sizeof(npy_traits::value_type) * size); \ - batch_size = size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; \ - batch = make_flat_array("data", size, data); \ - } \ - break; - -// TODO(pcm): At the moment, this assumes that arrays are consecutive in memory -void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool) { - npy_intp size = PyArray_SIZE(array); - std::shared_ptr data; - std::shared_ptr batch; - int64_t batch_size = 0; - switch (PyArray_TYPE(array)) { - SERIALIZE_ARROW_CASE(NPY_INT8) - SERIALIZE_ARROW_CASE(NPY_INT16) - SERIALIZE_ARROW_CASE(NPY_INT32) - SERIALIZE_ARROW_CASE(NPY_INT64) - SERIALIZE_ARROW_CASE(NPY_UINT8) - SERIALIZE_ARROW_CASE(NPY_UINT16) - SERIALIZE_ARROW_CASE(NPY_UINT32) - SERIALIZE_ARROW_CASE(NPY_UINT64) - SERIALIZE_ARROW_CASE(NPY_FLOAT) - SERIALIZE_ARROW_CASE(NPY_DOUBLE) - default: - ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); - } - - // int64_t data_batch_size = ipc::GetRowBatchSize(batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it - - size_t ndim = PyArray_NDIM(array); - MemoryPool* default_pool = arrow::default_memory_pool(); - - auto metadata = std::make_shared(default_pool); - size_t metadata_size = 1 + ndim + 1; // dtype, list of shapes, pointer to header of the data segment - metadata->Resize(metadata_size * sizeof(int64_t)); - - int64_t* buffer = reinterpret_cast(metadata->mutable_data()); - buffer[0] = PyArray_TYPE(array); - // serialize the shape information - for (size_t i = 0; i < ndim; ++i) { - buffer[i+1] = PyArray_DIM(array, i); - } - std::shared_ptr metadata_batch = make_flat_array("metadata", metadata_size, metadata); - - // int64_t metadata_batch_size = ipc::GetRowBatchSize(metadata_batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it - - uint8_t* address = pool->get_address(location); - auto source = std::make_shared(address, location.size()); - - int64_t data_header_offset = 0; - ipc::WriteRowBatch(source.get(), batch.get(), 0, &data_header_offset); - - buffer[1 + ndim] = data_header_offset; - - int64_t metadata_header_offset = 0; - ipc::WriteRowBatch(source.get(), metadata_batch.get(), location.size() + MAX_METADATA_SIZE/2, &metadata_header_offset); - location.set_metadata_offset(metadata_header_offset); -} - -template -std::shared_ptr read_flat_array(BufferMemorySource* source, int64_t metadata_offset) { - std::shared_ptr reader; - Status s = ipc::RowBatchReader::Open(source, metadata_offset, &reader); - if (!s.ok()) { - ORCH_LOG(ORCH_FATAL, "Error in read_flat_array: " << s.ToString()); - } - auto field = std::make_shared("data", npy_traits::primitive_type); - std::shared_ptr schema(new arrow::Schema({field})); - std::shared_ptr data; - reader->GetRowBatch(schema, &data); - return data->column(0); - -} - -#define DESERIALIZE_ARROW_CASE(TYPE) \ - case TYPE: \ - { \ - auto array = read_flat_array(source.get(), buffer[metadata_array->length()-1]); \ - auto data_primitive_array = dynamic_cast::ArrayType*>(array.get()); \ - return PyArray_SimpleNewFromData(dims.size(), &dims[0], TYPE, (void*)data_primitive_array->raw_data()); \ - } - -PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool) { - auto source = std::make_shared(pool->get_address(handle), handle.size()); - auto metadata_array = read_flat_array(source.get(), handle.metadata_offset()); - const uint64_t* buffer = dynamic_cast(metadata_array.get())->raw_data(); - uint64_t type = buffer[0]; - std::vector dims; - for (int i = 1; i < metadata_array->length()-1; ++i) { - dims.push_back(buffer[i]); - } - - switch (type) { - DESERIALIZE_ARROW_CASE(NPY_INT8) - DESERIALIZE_ARROW_CASE(NPY_INT16) - DESERIALIZE_ARROW_CASE(NPY_INT32) - DESERIALIZE_ARROW_CASE(NPY_INT64) - DESERIALIZE_ARROW_CASE(NPY_UINT8) - DESERIALIZE_ARROW_CASE(NPY_UINT16) - DESERIALIZE_ARROW_CASE(NPY_UINT32) - DESERIALIZE_ARROW_CASE(NPY_UINT64) - DESERIALIZE_ARROW_CASE(NPY_FLOAT) - DESERIALIZE_ARROW_CASE(NPY_DOUBLE) - default: - ORCH_LOG(ORCH_FATAL, "deserialization: numpy datatype not know"); - } -} diff --git a/src/serialize.h b/src/serialize.h deleted file mode 100644 index efe01eb35..000000000 --- a/src/serialize.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef ORCHESTRA_SERIALIZE_H -#define ORCHESTRA_SERIALIZE_H - -#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION - -#include -#include -#include -#include -#define NO_IMPORT_ARRAY -#define PY_ARRAY_UNIQUE_SYMBOL ORCHESTRA_ARRAY_API -#include -#include - -#include "ipc.h" - -size_t arrow_size(PyArrayObject* array); -void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool); -PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool); - -#endif diff --git a/src/worker.cc b/src/worker.cc index 29719aaef..ec28aa56b 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -1,5 +1,11 @@ #include "worker.h" +#include + +extern "C" { + static PyObject *OrchPyError; +} + Status WorkerServiceImpl::InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) { call_ = request->call(); // Copy call ORCH_LOG(ORCH_INFO, "invoked task " << request->call().name()); @@ -122,12 +128,25 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont scheduler_stub_->AddContainedObjRefs(&context, contained_objrefs_request, &reply); } -void Worker::put_arrow(ObjRef objref, PyArrayObject* array) { +#define CHECK_ARROW_STATUS(s, msg) \ + do { \ + arrow::Status _s = (s); \ + if (!_s.ok()) { \ + std::string _errmsg = std::string(msg) + _s.ToString(); \ + PyErr_SetString(OrchPyError, _errmsg.c_str()); \ + return NULL; \ + } \ + } while (0); + +PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { if (!connected_) { ORCH_LOG(ORCH_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; - size_t size = arrow_size(array); + pynumbuf::PythonObjectWriter writer; + int64_t size; + CHECK_ARROW_STATUS(writer.AssemblePayload(value), "error during AssemblePayload: "); + CHECK_ARROW_STATUS(writer.GetTotalSize(&size), "error during GetTotalSize: "); request.workerid = workerid_; request.type = ObjRequestType::ALLOC; request.objref = objref; @@ -135,13 +154,17 @@ void Worker::put_arrow(ObjRef objref, PyArrayObject* array) { request_obj_queue_.send(&request); ObjHandle result; receive_obj_queue_.receive(&result); - store_arrow(array, result, segmentpool_.get()); + int64_t metadata_offset; + uint8_t* address = segmentpool_->get_address(result); + auto source = std::make_shared(address, size); + CHECK_ARROW_STATUS(writer.Write(source.get(), &metadata_offset), "error during Write: "); request.type = ObjRequestType::WORKER_DONE; - request.metadata_offset = result.metadata_offset(); + request.metadata_offset = metadata_offset; request_obj_queue_.send(&request); + Py_RETURN_NONE; } -PyArrayObject* Worker::get_arrow(ObjRef objref) { +PyObject* Worker::get_arrow(ObjRef objref) { if (!connected_) { ORCH_LOG(ORCH_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); } @@ -152,7 +175,11 @@ PyArrayObject* Worker::get_arrow(ObjRef objref) { request_obj_queue_.send(&request); ObjHandle result; receive_obj_queue_.receive(&result); - return (PyArrayObject*)deserialize_array(result, segmentpool_.get()); + uint8_t* address = segmentpool_->get_address(result); + auto source = std::make_shared(address, result.size()); + PyObject* value; + CHECK_ARROW_STATUS(pynumbuf::ReadPythonObjectFrom(source.get(), result.metadata_offset(), &value), "error during ReadPythonObjectFrom: "); + return value; } bool Worker::is_arrow(ObjRef objref) { diff --git a/src/worker.h b/src/worker.h index 31f3bff18..3453de16a 100644 --- a/src/worker.h +++ b/src/worker.h @@ -8,6 +8,8 @@ #include +#include + using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; @@ -16,7 +18,6 @@ using grpc::Status; #include "orchestra.grpc.pb.h" #include "orchestra/orchestra.h" #include "ipc.h" -#include "serialize.h" using grpc::Channel; using grpc::ClientContext; @@ -52,10 +53,9 @@ class Worker { // retrieve serialized object from local object store slice get_object(ObjRef objref); // stores an arrow object to the local object store - // FIXME(pcm): Once we have structs in arrow, get rid of the memcpy here - void put_arrow(ObjRef objref, PyArrayObject* array); + PyObject* put_arrow(ObjRef objref, PyObject* array); // gets an arrow object from the local object store - PyArrayObject* get_arrow(ObjRef objref); + PyObject* get_arrow(ObjRef objref); // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? bool is_arrow(ObjRef objref); // make `alias_objref` refer to the same object that `target_objref` refers to