From 6ffc849d23bb8f550bd3c6561e65c9056fd60ac3 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 10 Apr 2017 01:37:34 -0700 Subject: [PATCH] Use Arrow Tensors for serializing numpy arrays and get rid of extra memcpy. (#436) * Use Arrow Tensors for serializing numpy arrays and get rid of extra memcpy * fix nondeterminism problem * mark array as immutable * make arrays contiguous * fix serialize_list and deseralize_list * fix numbuf tests * linting * add optimization flags * fixes * roll back arrow --- src/numbuf/CMakeLists.txt | 13 +- src/numbuf/cpp/src/numbuf/sequence.cc | 64 ++------ src/numbuf/cpp/src/numbuf/sequence.h | 47 ++---- src/numbuf/cpp/src/numbuf/tensor.cc | 56 ------- src/numbuf/cpp/src/numbuf/tensor.h | 65 -------- .../python/src/pynumbuf/adapters/numpy.cc | 88 ++--------- .../python/src/pynumbuf/adapters/numpy.h | 12 +- .../python/src/pynumbuf/adapters/python.cc | 67 ++++---- .../python/src/pynumbuf/adapters/python.h | 14 +- .../python/src/pynumbuf/adapters/scalars.h | 2 +- src/numbuf/python/src/pynumbuf/memory.h | 3 +- src/numbuf/python/src/pynumbuf/numbuf.cc | 147 +++++++++++------- src/numbuf/thirdparty/build_thirdparty.sh | 2 +- src/numbuf/thirdparty/download_thirdparty.sh | 2 +- src/plasma/CMakeLists.txt | 4 +- 15 files changed, 197 insertions(+), 389 deletions(-) delete mode 100644 src/numbuf/cpp/src/numbuf/tensor.cc delete mode 100644 src/numbuf/cpp/src/numbuf/tensor.h diff --git a/src/numbuf/CMakeLists.txt b/src/numbuf/CMakeLists.txt index d843914cf..e7980e5ac 100644 --- a/src/numbuf/CMakeLists.txt +++ b/src/numbuf/CMakeLists.txt @@ -23,7 +23,7 @@ endif(APPLE) include_directories("${PYTHON_INCLUDE_DIRS}") include_directories("${NUMPY_INCLUDE_DIR}") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -march=native -mtune=native -O3") if(UNIX AND NOT APPLE) link_libraries(rt) @@ -38,6 +38,8 @@ set(ARROW_IO_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/l CACHE STRING "Path to libarrow_io.a (needs to be changed if arrow is build in debug mode)") set(ARROW_IPC_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/libarrow_ipc.a" CACHE STRING "Path to libarrow_ipc.a (needs to be changed if arrow is build in debug mode)") +set(ARROW_PYTHON_LIB "${CMAKE_CURRENT_LIST_DIR}/thirdparty/arrow/cpp/build/release/libarrow_python.a" + CACHE STRING "Path to libarrow_python.a (needs to be changed if arrow is build in debug mode)") include_directories("${ARROW_DIR}/cpp/src/") include_directories("cpp/src/") @@ -53,7 +55,6 @@ endif() add_definitions(-fPIC) add_library(numbuf SHARED - cpp/src/numbuf/tensor.cc cpp/src/numbuf/dict.cc cpp/src/numbuf/sequence.cc python/src/pynumbuf/numbuf.cc @@ -68,15 +69,15 @@ if(APPLE) endif(APPLE) if(APPLE) - target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread) + target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread) else() - target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread) + target_link_libraries(numbuf -Wl,--whole-archive ${ARROW_LIB} -Wl,--no-whole-archive ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread) endif() if(HAS_PLASMA) - target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread) + target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} plasma_lib common ${FLATBUFFERS_STATIC_LIB} -lpthread) else() - target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${PYTHON_LIBRARIES} -lpthread) + target_link_libraries(numbuf ${ARROW_LIB} ${ARROW_IO_LIB} ${ARROW_IPC_LIB} ${ARROW_PYTHON_LIB} ${PYTHON_LIBRARIES} -lpthread) endif() install(TARGETS numbuf DESTINATION ${CMAKE_SOURCE_DIR}/numbuf/) diff --git a/src/numbuf/cpp/src/numbuf/sequence.cc b/src/numbuf/cpp/src/numbuf/sequence.cc index bc26cb5d5..f77654a24 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.cc +++ b/src/numbuf/cpp/src/numbuf/sequence.cc @@ -15,27 +15,18 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool) strings_(pool), floats_(pool, std::make_shared()), doubles_(pool, std::make_shared()), - uint8_tensors_(std::make_shared(), pool), - int8_tensors_(std::make_shared(), pool), - uint16_tensors_(std::make_shared(), pool), - int16_tensors_(std::make_shared(), pool), - uint32_tensors_(std::make_shared(), pool), - int32_tensors_(std::make_shared(), pool), - uint64_tensors_(std::make_shared(), pool), - int64_tensors_(std::make_shared(), pool), - float_tensors_(std::make_shared(), pool), - double_tensors_(std::make_shared(), pool), + tensor_indices_(pool, std::make_shared()), list_offsets_({0}), tuple_offsets_({0}), dict_offsets_({0}) {} -#define UPDATE(OFFSET, TAG) \ - if (TAG == -1) { \ - TAG = num_tags; \ - num_tags += 1; \ - } \ - RETURN_NOT_OK(offsets_.Append(OFFSET)); \ - RETURN_NOT_OK(types_.Append(TAG)); \ +#define UPDATE(OFFSET, TAG) \ + if (TAG == -1) { \ + TAG = num_tags; \ + num_tags += 1; \ + } \ + RETURN_NOT_OK(offsets_.Append(OFFSET)); \ + RETURN_NOT_OK(types_.Append(TAG)); \ RETURN_NOT_OK(nones_.AppendToBitmap(true)); Status SequenceBuilder::AppendNone() { @@ -79,27 +70,10 @@ Status SequenceBuilder::AppendDouble(double data) { return doubles_.Append(data); } -#define DEF_TENSOR_APPEND(NAME, TYPE, TAG) \ - Status SequenceBuilder::AppendTensor(const std::vector& dims, TYPE* data) { \ - if (TAG == -1) { NAME.Start(); } \ - int64_t size = 1; \ - for (auto dim : dims) { \ - size *= dim; \ - } \ - UPDATE(NAME.length(), TAG); \ - return NAME.Append(dims, data); \ - } - -DEF_TENSOR_APPEND(uint8_tensors_, uint8_t, uint8_tensor_tag); -DEF_TENSOR_APPEND(int8_tensors_, int8_t, int8_tensor_tag); -DEF_TENSOR_APPEND(uint16_tensors_, uint16_t, uint16_tensor_tag); -DEF_TENSOR_APPEND(int16_tensors_, int16_t, int16_tensor_tag); -DEF_TENSOR_APPEND(uint32_tensors_, uint32_t, uint32_tensor_tag); -DEF_TENSOR_APPEND(int32_tensors_, int32_t, int32_tensor_tag); -DEF_TENSOR_APPEND(uint64_tensors_, uint64_t, uint64_tensor_tag); -DEF_TENSOR_APPEND(int64_tensors_, int64_t, int64_tensor_tag); -DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag); -DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag); +Status SequenceBuilder::AppendTensor(int32_t tensor_index) { + UPDATE(tensor_indices_.length(), tensor_tag); + return tensor_indices_.Append(tensor_index); +} Status SequenceBuilder::AppendList(int32_t size) { UPDATE(list_offsets_.size() - 1, list_tag); @@ -158,19 +132,7 @@ Status SequenceBuilder::Finish(std::shared_ptr list_data, ADD_ELEMENT(floats_, float_tag); ADD_ELEMENT(doubles_, double_tag); - ADD_ELEMENT(uint8_tensors_, uint8_tensor_tag); - - ADD_ELEMENT(int8_tensors_, int8_tensor_tag); - ADD_ELEMENT(uint16_tensors_, uint16_tensor_tag); - ADD_ELEMENT(int16_tensors_, int16_tensor_tag); - ADD_ELEMENT(uint32_tensors_, uint32_tensor_tag); - - ADD_ELEMENT(int32_tensors_, int32_tensor_tag); - ADD_ELEMENT(uint64_tensors_, uint64_tensor_tag); - ADD_ELEMENT(int64_tensors_, int64_tensor_tag); - - ADD_ELEMENT(float_tensors_, float_tensor_tag); - ADD_ELEMENT(double_tensors_, double_tensor_tag); + ADD_ELEMENT(tensor_indices_, tensor_tag); ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list"); ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple"); diff --git a/src/numbuf/cpp/src/numbuf/sequence.h b/src/numbuf/cpp/src/numbuf/sequence.h index 95af937b8..fea1d21cd 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.h +++ b/src/numbuf/cpp/src/numbuf/sequence.h @@ -1,8 +1,8 @@ #ifndef NUMBUF_LIST_H #define NUMBUF_LIST_H -#include "tensor.h" #include +#include namespace numbuf { @@ -49,23 +49,9 @@ class SequenceBuilder { /*! Appending a tensor to the sequence - \param dims - A vector of dimensions - - \param data - A pointer to the start of the data block. The length of the data block - will be the product of the dimensions + \param tensor_index Index of the tensor in the object. */ - arrow::Status AppendTensor(const std::vector& dims, uint8_t* data); - arrow::Status AppendTensor(const std::vector& dims, int8_t* data); - arrow::Status AppendTensor(const std::vector& dims, uint16_t* data); - arrow::Status AppendTensor(const std::vector& dims, int16_t* data); - arrow::Status AppendTensor(const std::vector& dims, uint32_t* data); - arrow::Status AppendTensor(const std::vector& dims, int32_t* data); - arrow::Status AppendTensor(const std::vector& dims, uint64_t* data); - arrow::Status AppendTensor(const std::vector& dims, int64_t* data); - arrow::Status AppendTensor(const std::vector& dims, float* data); - arrow::Status AppendTensor(const std::vector& dims, double* data); + arrow::Status AppendTensor(int32_t tensor_index); /*! Add a sublist to the sequence. The data contained in the sublist will be specified in the "Finish" method. @@ -109,16 +95,11 @@ class SequenceBuilder { arrow::FloatBuilder floats_; arrow::DoubleBuilder doubles_; - UInt8TensorBuilder uint8_tensors_; - Int8TensorBuilder int8_tensors_; - UInt16TensorBuilder uint16_tensors_; - Int16TensorBuilder int16_tensors_; - UInt32TensorBuilder uint32_tensors_; - Int32TensorBuilder int32_tensors_; - UInt64TensorBuilder uint64_tensors_; - Int64TensorBuilder int64_tensors_; - FloatTensorBuilder float_tensors_; - DoubleTensorBuilder double_tensors_; + // We use an Int32Builder here to distinguish the tensor indices from + // the ints_ above (see the case Type::INT32 in get_value in python.cc). + // TODO(pcm): Replace this by using the union tags to distinguish between + // these two cases. + arrow::Int32Builder tensor_indices_; std::vector list_offsets_; std::vector tuple_offsets_; @@ -131,17 +112,7 @@ class SequenceBuilder { int8_t float_tag = -1; int8_t double_tag = -1; - int8_t uint8_tensor_tag = -1; - int8_t int8_tensor_tag = -1; - int8_t uint16_tensor_tag = -1; - int8_t int16_tensor_tag = -1; - int8_t uint32_tensor_tag = -1; - int8_t int32_tensor_tag = -1; - int8_t uint64_tensor_tag = -1; - int8_t int64_tensor_tag = -1; - int8_t float_tensor_tag = -1; - int8_t double_tensor_tag = -1; - + int8_t tensor_tag = -1; int8_t list_tag = -1; int8_t tuple_tag = -1; int8_t dict_tag = -1; diff --git a/src/numbuf/cpp/src/numbuf/tensor.cc b/src/numbuf/cpp/src/numbuf/tensor.cc deleted file mode 100644 index c5602af90..000000000 --- a/src/numbuf/cpp/src/numbuf/tensor.cc +++ /dev/null @@ -1,56 +0,0 @@ -#include "tensor.h" - -using namespace arrow; - -namespace numbuf { - -template -TensorBuilder::TensorBuilder(const TypePtr& dtype, MemoryPool* pool) - : dtype_(dtype), pool_(pool) {} - -template -Status TensorBuilder::Start() { - dim_data_ = std::make_shared(pool_, std::make_shared()); - dims_ = std::make_shared(pool_, dim_data_); - value_data_ = std::make_shared>(pool_, dtype_); - values_ = std::make_shared(pool_, value_data_); - auto dims_field = std::make_shared("dims", dims_->type()); - auto values_field = std::make_shared("data", values_->type()); - auto type = - std::make_shared(std::vector({dims_field, values_field})); - tensors_ = std::make_shared( - pool_, type, std::vector>({dims_, values_})); - return Status::OK(); -} - -template -Status TensorBuilder::Append(const std::vector& dims, const elem_type* data) { - DCHECK(tensors_); - RETURN_NOT_OK(tensors_->Append()); - RETURN_NOT_OK(dims_->Append()); - RETURN_NOT_OK(values_->Append()); - int64_t size = 1; - for (auto dim : dims) { - size *= dim; - RETURN_NOT_OK(dim_data_->Append(dim)); - } - RETURN_NOT_OK(value_data_->Append(data, size)); - return Status::OK(); // tensors_->Append(); -} - -template -Status TensorBuilder::Finish(std::shared_ptr* out) { - return tensors_->Finish(out); -} - -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -template class TensorBuilder; -} diff --git a/src/numbuf/cpp/src/numbuf/tensor.h b/src/numbuf/cpp/src/numbuf/tensor.h deleted file mode 100644 index 36e078508..000000000 --- a/src/numbuf/cpp/src/numbuf/tensor.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef NUMBUF_TENSOR_H -#define NUMBUF_TENSOR_H - -#include -#include -#include - -namespace numbuf { - -/*! This is a class for building a dataframe where each row corresponds to - a Tensor (= multidimensional array) of numerical data. There are two - columns, "dims" which contains an array of dimensions for each Tensor - and "data" which contains data buffer of the Tensor as a flattened array. -*/ -template -class TensorBuilder { - public: - typedef typename T::c_type elem_type; - - TensorBuilder(const arrow::TypePtr& dtype, arrow::MemoryPool* pool = nullptr); - - arrow::Status Start(); - - /*! Append a new tensor. - - \param dims - The dimensions of the Tensor - - \param data - Pointer to the beginning of the data buffer of the Tensor. The - total length of the buffer is sizeof(elem_type) * product of dims[i] over i - */ - arrow::Status Append(const std::vector& dims, const elem_type* data); - - //! Convert the tensors to an Arrow StructArray - arrow::Status Finish(std::shared_ptr* out); - - //! Number of tensors in the column - int32_t length() { return tensors_->length(); } - - std::shared_ptr type() { return tensors_->type(); } - - private: - arrow::TypePtr dtype_; - arrow::MemoryPool* pool_; - std::shared_ptr dim_data_; - std::shared_ptr dims_; - std::shared_ptr> value_data_; - std::shared_ptr values_; - std::shared_ptr tensors_; -}; - -typedef TensorBuilder UInt8TensorBuilder; -typedef TensorBuilder Int8TensorBuilder; -typedef TensorBuilder UInt16TensorBuilder; -typedef TensorBuilder Int16TensorBuilder; -typedef TensorBuilder UInt32TensorBuilder; -typedef TensorBuilder Int32TensorBuilder; -typedef TensorBuilder UInt64TensorBuilder; -typedef TensorBuilder Int64TensorBuilder; -typedef TensorBuilder FloatTensorBuilder; -typedef TensorBuilder DoubleTensorBuilder; -} - -#endif // NUMBUF_TENSOR_H diff --git a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc b/src/numbuf/python/src/pynumbuf/adapters/numpy.cc index 727f7747a..43af042f7 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc +++ b/src/numbuf/python/src/pynumbuf/adapters/numpy.cc @@ -3,7 +3,7 @@ #include -#include +#include using namespace arrow; @@ -14,48 +14,11 @@ extern PyObject* numbuf_deserialize_callback; namespace numbuf { -#define ARROW_TYPE_TO_NUMPY_CASE(TYPE) \ - case Type::TYPE: \ - return NPY_##TYPE; - -#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \ - case Type::TYPE: { \ - auto values = std::dynamic_pointer_cast(content->values()); \ - DCHECK(values); \ - type* data = const_cast(values->raw_data()) + content->value_offset(offset); \ - *out = PyArray_SimpleNewFromData( \ - num_dims, dim.data(), NPY_##TYPE, reinterpret_cast(data)); \ - if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \ - Py_XINCREF(base); \ - } break; - -Status DeserializeArray( - std::shared_ptr array, int32_t offset, PyObject* base, PyObject** out) { +Status DeserializeArray(std::shared_ptr array, int32_t offset, PyObject* base, + const std::vector>& tensors, PyObject** out) { DCHECK(array); - auto tensor = std::dynamic_pointer_cast(array); - DCHECK(tensor); - auto dims = std::dynamic_pointer_cast(tensor->field(0)); - auto content = std::dynamic_pointer_cast(tensor->field(1)); - npy_intp num_dims = dims->value_length(offset); - std::vector dim(num_dims); - for (int i = dims->value_offset(offset); i < dims->value_offset(offset + 1); ++i) { - dim[i - dims->value_offset(offset)] = - std::dynamic_pointer_cast(dims->values())->Value(i); - } - switch (content->value_type()->type) { - DESERIALIZE_ARRAY_CASE(INT8, Int8Array, int8_t) - DESERIALIZE_ARRAY_CASE(INT16, Int16Array, int16_t) - DESERIALIZE_ARRAY_CASE(INT32, Int32Array, int32_t) - DESERIALIZE_ARRAY_CASE(INT64, Int64Array, int64_t) - DESERIALIZE_ARRAY_CASE(UINT8, UInt8Array, uint8_t) - DESERIALIZE_ARRAY_CASE(UINT16, UInt16Array, uint16_t) - DESERIALIZE_ARRAY_CASE(UINT32, UInt32Array, uint32_t) - DESERIALIZE_ARRAY_CASE(UINT64, UInt64Array, uint64_t) - DESERIALIZE_ARRAY_CASE(FLOAT, FloatArray, float) - DESERIALIZE_ARRAY_CASE(DOUBLE, DoubleArray, double) - default: - DCHECK(false) << "arrow type not recognized: " << content->value_type()->type; - } + int32_t index = std::static_pointer_cast(array)->Value(offset); + RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out)); /* Mark the array as immutable. */ PyObject* flags = PyObject_GetAttrString(*out, "flags"); DCHECK(flags != NULL) << "Could not mark Numpy array immutable"; @@ -65,51 +28,23 @@ Status DeserializeArray( return Status::OK(); } -Status SerializeArray( - PyArrayObject* array, SequenceBuilder& builder, std::vector& subdicts) { - size_t ndim = PyArray_NDIM(array); +Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, + std::vector& subdicts, std::vector& tensors_out) { int dtype = PyArray_TYPE(array); - std::vector dims(ndim); - for (int i = 0; i < ndim; ++i) { - dims[i] = PyArray_DIM(array, i); - } - // TODO(pcm): Once we don't use builders any more below and directly share - // the memory buffer, we need to be more careful about this and not - // decrease the reference count of "contiguous" before the serialization - // is finished - auto contiguous = PyArray_GETCONTIGUOUS(array); - auto data = PyArray_DATA(contiguous); switch (dtype) { case NPY_UINT8: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_INT8: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_UINT16: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_INT16: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_UINT32: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_INT32: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_UINT64: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_INT64: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; case NPY_FLOAT: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; - case NPY_DOUBLE: - RETURN_NOT_OK(builder.AppendTensor(dims, reinterpret_cast(data))); - break; + case NPY_DOUBLE: { + RETURN_NOT_OK(builder.AppendTensor(tensors_out.size())); + tensors_out.push_back(reinterpret_cast(array)); + } break; default: if (!numbuf_serialize_callback) { std::stringstream stream; @@ -126,7 +61,6 @@ Status SerializeArray( subdicts.push_back(result); } } - Py_XDECREF(contiguous); return Status::OK(); } } diff --git a/src/numbuf/python/src/pynumbuf/adapters/numpy.h b/src/numbuf/python/src/pynumbuf/adapters/numpy.h index 91f0473da..1d2bf887e 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/numpy.h +++ b/src/numbuf/python/src/pynumbuf/adapters/numpy.h @@ -6,18 +6,18 @@ #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION #define NO_IMPORT_ARRAY -#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API +#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API #include #include -#include namespace numbuf { -arrow::Status SerializeArray( - PyArrayObject* array, SequenceBuilder& builder, std::vector& subdicts); -arrow::Status DeserializeArray( - std::shared_ptr array, int32_t offset, PyObject* base, PyObject** out); +arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, + std::vector& subdicts, std::vector& tensors_out); +arrow::Status DeserializeArray(std::shared_ptr array, int32_t offset, + PyObject* base, const std::vector>& tensors, + PyObject** out); } #endif diff --git a/src/numbuf/python/src/pynumbuf/adapters/python.cc b/src/numbuf/python/src/pynumbuf/adapters/python.cc index ca5aba393..afd1b6405 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/python.cc +++ b/src/numbuf/python/src/pynumbuf/adapters/python.cc @@ -21,7 +21,7 @@ namespace numbuf { #endif Status get_value(std::shared_ptr arr, int32_t index, int32_t type, PyObject* base, - PyObject** result) { + const std::vector>& tensors, PyObject** result) { switch (arr->type()->type) { case Type::BOOL: *result = @@ -57,17 +57,23 @@ Status get_value(std::shared_ptr arr, int32_t index, int32_t type, PyObje auto l = std::static_pointer_cast(s->field(0)); if (s->type()->child(0)->name == "list") { return DeserializeList(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, result); + l->value_offset(index + 1), base, tensors, result); } else if (s->type()->child(0)->name == "tuple") { return DeserializeTuple(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, result); + l->value_offset(index + 1), base, tensors, result); } else if (s->type()->child(0)->name == "dict") { return DeserializeDict(l->values(), l->value_offset(index), - l->value_offset(index + 1), base, result); + l->value_offset(index + 1), base, tensors, result); } else { - return DeserializeArray(arr, index, base, result); + DCHECK(false) << "error"; } } + // We use an Int32Builder here to distinguish the tensor indices from + // the Type::INT64 above (see tensor_indices_ in sequence.h). + case Type::INT32: { + int32_t val = std::static_pointer_cast(arr)->Value(index); + return DeserializeArray(arr, index, base, tensors, result); + } default: DCHECK(false) << "union tag not recognized " << type; } @@ -75,7 +81,8 @@ Status get_value(std::shared_ptr arr, int32_t index, int32_t type, PyObje } Status append(PyObject* elem, SequenceBuilder& builder, std::vector& sublists, - std::vector& subtuples, std::vector& subdicts) { + std::vector& subtuples, std::vector& subdicts, + std::vector& tensors_out) { // The bool case must precede the int case (PyInt_Check passes for bools) if (PyBool_Check(elem)) { RETURN_NOT_OK(builder.AppendBool(elem == Py_True)); @@ -119,7 +126,7 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector& } else if (PyArray_IsScalar(elem, Generic)) { RETURN_NOT_OK(AppendScalar(elem, builder)); } else if (PyArray_Check(elem)) { - RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts)); + RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts, tensors_out)); } else if (elem == Py_None) { RETURN_NOT_OK(builder.AppendNone()); } else { @@ -143,7 +150,7 @@ Status append(PyObject* elem, SequenceBuilder& builder, std::vector& } Status SerializeSequences(std::vector sequences, int32_t recursion_depth, - std::shared_ptr* out) { + std::shared_ptr* out, std::vector& tensors_out) { DCHECK(out); if (recursion_depth >= MAX_RECURSION_DEPTH) { return Status::NotImplemented( @@ -156,7 +163,7 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de PyObject* item; PyObject* iterator = PyObject_GetIter(sequence); while ((item = PyIter_Next(iterator))) { - Status s = append(item, builder, sublists, subtuples, subdicts); + Status s = append(item, builder, sublists, subtuples, subdicts, tensors_out); Py_DECREF(item); // if an error occurs, we need to decrement the reference counts before returning if (!s.ok()) { @@ -168,15 +175,16 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de } std::shared_ptr list; if (sublists.size() > 0) { - RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list)); + RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out)); } std::shared_ptr tuple; if (subtuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences(subtuples, recursion_depth + 1, &tuple)); + RETURN_NOT_OK( + SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out)); } std::shared_ptr dict; if (subdicts.size() > 0) { - RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict)); + RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out)); } return builder.Finish(list, tuple, dict, out); } @@ -196,7 +204,7 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de int8_t type = types->Value(i); \ std::shared_ptr arr = data->child(type); \ PyObject* value; \ - RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \ + RETURN_NOT_OK(get_value(arr, offset, type, base, tensors, &value)); \ SET_ITEM(result, i - start_idx, value); \ } \ } \ @@ -204,17 +212,17 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de return Status::OK(); Status DeserializeList(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, PyObject** out) { + PyObject* base, const std::vector>& tensors, PyObject** out) { DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem) } Status DeserializeTuple(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, PyObject** out) { + PyObject* base, const std::vector>& tensors, PyObject** out) { DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem) } -Status SerializeDict( - std::vector dicts, int32_t recursion_depth, std::shared_ptr* out) { +Status SerializeDict(std::vector dicts, int32_t recursion_depth, + std::shared_ptr* out, std::vector& tensors_out) { DictBuilder result; if (recursion_depth >= MAX_RECURSION_DEPTH) { return Status::NotImplemented( @@ -226,26 +234,31 @@ Status SerializeDict( PyObject *key, *value; Py_ssize_t pos = 0; while (PyDict_Next(dict, &pos, &key, &value)) { - RETURN_NOT_OK(append(key, result.keys(), dummy, key_tuples, dummy)); + RETURN_NOT_OK(append(key, result.keys(), dummy, key_tuples, dummy, tensors_out)); DCHECK(dummy.size() == 0); - RETURN_NOT_OK(append(value, result.vals(), val_lists, val_tuples, val_dicts)); + RETURN_NOT_OK( + append(value, result.vals(), val_lists, val_tuples, val_dicts, tensors_out)); } } std::shared_ptr key_tuples_arr; if (key_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences(key_tuples, recursion_depth + 1, &key_tuples_arr)); + RETURN_NOT_OK(SerializeSequences( + key_tuples, recursion_depth + 1, &key_tuples_arr, tensors_out)); } std::shared_ptr val_list_arr; if (val_lists.size() > 0) { - RETURN_NOT_OK(SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr)); + RETURN_NOT_OK( + SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out)); } std::shared_ptr val_tuples_arr; if (val_tuples.size() > 0) { - RETURN_NOT_OK(SerializeSequences(val_tuples, recursion_depth + 1, &val_tuples_arr)); + RETURN_NOT_OK(SerializeSequences( + val_tuples, recursion_depth + 1, &val_tuples_arr, tensors_out)); } std::shared_ptr val_dict_arr; if (val_dicts.size() > 0) { - RETURN_NOT_OK(SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr)); + RETURN_NOT_OK( + SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out)); } result.Finish(key_tuples_arr, val_list_arr, val_tuples_arr, val_dict_arr, out); @@ -266,13 +279,15 @@ Status SerializeDict( } Status DeserializeDict(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, - PyObject* base, PyObject** out) { + PyObject* base, const std::vector>& tensors, PyObject** out) { auto data = std::dynamic_pointer_cast(array); // TODO(pcm): error handling, get rid of the temporary copy of the list PyObject *keys, *vals; PyObject* result = PyDict_New(); - ARROW_RETURN_NOT_OK(DeserializeList(data->field(0), start_idx, stop_idx, base, &keys)); - ARROW_RETURN_NOT_OK(DeserializeList(data->field(1), start_idx, stop_idx, base, &vals)); + ARROW_RETURN_NOT_OK( + DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, &keys)); + ARROW_RETURN_NOT_OK( + DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, &vals)); for (size_t i = start_idx; i < stop_idx; ++i) { PyDict_SetItem( result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx)); diff --git a/src/numbuf/python/src/pynumbuf/adapters/python.h b/src/numbuf/python/src/pynumbuf/adapters/python.h index dc25ad018..d66aec109 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/python.h +++ b/src/numbuf/python/src/pynumbuf/adapters/python.h @@ -12,15 +12,19 @@ namespace numbuf { arrow::Status SerializeSequences(std::vector sequences, - int32_t recursion_depth, std::shared_ptr* out); + int32_t recursion_depth, std::shared_ptr* out, + std::vector& tensors_out); arrow::Status SerializeDict(std::vector dicts, int32_t recursion_depth, - std::shared_ptr* out); + std::shared_ptr* out, std::vector& tensors_out); arrow::Status DeserializeList(std::shared_ptr array, int32_t start_idx, - int32_t stop_idx, PyObject* base, PyObject** out); + int32_t stop_idx, PyObject* base, + const std::vector>& tensors, PyObject** out); arrow::Status DeserializeTuple(std::shared_ptr array, int32_t start_idx, - int32_t stop_idx, PyObject* base, PyObject** out); + int32_t stop_idx, PyObject* base, + const std::vector>& tensors, PyObject** out); arrow::Status DeserializeDict(std::shared_ptr array, int32_t start_idx, - int32_t stop_idx, PyObject* base, PyObject** out); + int32_t stop_idx, PyObject* base, + const std::vector>& tensors, PyObject** out); } #endif diff --git a/src/numbuf/python/src/pynumbuf/adapters/scalars.h b/src/numbuf/python/src/pynumbuf/adapters/scalars.h index e64027c9e..c7c65dce1 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/scalars.h +++ b/src/numbuf/python/src/pynumbuf/adapters/scalars.h @@ -6,7 +6,7 @@ #include #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION #define NO_IMPORT_ARRAY -#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API +#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API #include #include diff --git a/src/numbuf/python/src/pynumbuf/memory.h b/src/numbuf/python/src/pynumbuf/memory.h index 8b1c612a4..0853c6778 100644 --- a/src/numbuf/python/src/pynumbuf/memory.h +++ b/src/numbuf/python/src/pynumbuf/memory.h @@ -113,8 +113,7 @@ class FixedBufferStream : public arrow::io::OutputStream, arrow::Status Read(int64_t nbytes, std::shared_ptr* out) override { DCHECK(out); - DCHECK(position_ + nbytes <= size_) << "position: " << position_ - << " nbytes: " << nbytes << "size: " << size_; + if (position_ + nbytes > size_) { return arrow::Status::IOError("EOF"); } *out = std::make_shared(data_ + position_, nbytes); position_ += nbytes; return arrow::Status::OK(); diff --git a/src/numbuf/python/src/pynumbuf/numbuf.cc b/src/numbuf/python/src/pynumbuf/numbuf.cc index 3d6afb0c4..d8a69b500 100644 --- a/src/numbuf/python/src/pynumbuf/numbuf.cc +++ b/src/numbuf/python/src/pynumbuf/numbuf.cc @@ -1,6 +1,6 @@ #include #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION -#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API +#define PY_ARRAY_UNIQUE_SYMBOL arrow_ARRAY_API #include #include "bytesobject.h" @@ -9,6 +9,9 @@ #include #include +#include + +#include #include "adapters/python.h" #include "memory.h" @@ -27,39 +30,65 @@ PyObject* NumbufPlasmaObjectExistsError; #endif +using namespace arrow; +using namespace numbuf; + +struct RayObject { + std::shared_ptr batch; + std::vector arrays; + std::vector> tensors; +}; + // Each arrow object is stored in the format // | length of the object in bytes | object data |. // LENGTH_PREFIX_SIZE is the number of bytes occupied by the // object length field. constexpr int64_t LENGTH_PREFIX_SIZE = sizeof(int64_t); -using namespace arrow; -using namespace numbuf; - std::shared_ptr make_batch(std::shared_ptr data) { auto field = std::make_shared("list", data->type()); std::shared_ptr schema(new Schema({field})); return std::shared_ptr(new RecordBatch(schema, data->length(), {data})); } -Status get_batch_size(std::shared_ptr batch, int64_t* size) { - // Determine the size of the file by writing to a mock file. - auto mock = std::make_shared(); +Status write_batch_and_tensors(io::OutputStream* stream, + std::shared_ptr batch, const std::vector& tensors, + int64_t* batch_size, int64_t* total_size) { std::shared_ptr writer; - RETURN_NOT_OK(ipc::FileWriter::Open(mock.get(), batch->schema(), &writer)); + RETURN_NOT_OK(ipc::FileWriter::Open(stream, batch->schema(), &writer)); RETURN_NOT_OK(writer->WriteRecordBatch(*batch, true)); RETURN_NOT_OK(writer->Close()); - RETURN_NOT_OK(mock->Tell(size)); + RETURN_NOT_OK(stream->Tell(batch_size)); + for (auto array : tensors) { + int32_t metadata_length; + int64_t body_length; + std::shared_ptr tensor; + auto contiguous = (PyObject*)PyArray_GETCONTIGUOUS((PyArrayObject*)array); + RETURN_NOT_OK(py::NdarrayToTensor(NULL, contiguous, &tensor)); + RETURN_NOT_OK(ipc::WriteTensor(*tensor, stream, &metadata_length, &body_length)); + Py_XDECREF(contiguous); + } + RETURN_NOT_OK(stream->Tell(total_size)); return Status::OK(); } -Status read_batch(uint8_t* data, int64_t size, std::shared_ptr* batch_out) { +Status read_batch_and_tensors(uint8_t* data, int64_t size, + std::shared_ptr* batch_out, + std::vector>& tensors_out) { std::shared_ptr reader; + int64_t batch_size = *((int64_t*)data); auto source = std::make_shared( LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE); - int64_t data_size = *((int64_t*)data); - RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, data_size, &reader)); + RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, batch_size, &reader)); RETURN_NOT_OK(reader->GetRecordBatch(0, batch_out)); + int64_t offset = batch_size; + while (true) { + std::shared_ptr tensor; + Status s = ipc::ReadTensor(offset, source.get(), &tensor); + if (!s.ok()) { break; } + tensors_out.push_back(tensor); + RETURN_NOT_OK(source->Tell(&offset)); + } return Status::OK(); } @@ -81,10 +110,9 @@ static PyObject* NumbufError; PyObject* numbuf_serialize_callback = NULL; PyObject* numbuf_deserialize_callback = NULL; -int PyObjectToArrow(PyObject* object, std::shared_ptr** result) { +int PyObjectToArrow(PyObject* object, RayObject** result) { if (PyCapsule_IsValid(object, "arrow")) { - *result = reinterpret_cast*>( - PyCapsule_GetPointer(object, "arrow")); + *result = reinterpret_cast(PyCapsule_GetPointer(object, "arrow")); return 1; } else { PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule"); @@ -93,8 +121,7 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr** result) { } static void ArrowCapsule_Destructor(PyObject* capsule) { - delete reinterpret_cast*>( - PyCapsule_GetPointer(capsule, "arrow")); + delete reinterpret_cast(PyCapsule_GetPointer(capsule, "arrow")); } /* Documented in doc/numbuf.rst in ray-core */ @@ -103,21 +130,31 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) { if (!PyArg_ParseTuple(args, "O", &value)) { return NULL; } std::shared_ptr array; if (PyList_Check(value)) { + RayObject* object = new RayObject(); int32_t recursion_depth = 0; - Status s = - SerializeSequences(std::vector({value}), recursion_depth, &array); + Status s = SerializeSequences( + std::vector({value}), recursion_depth, &array, object->arrays); CHECK_SERIALIZATION_ERROR(s); - auto batch = new std::shared_ptr(); - *batch = make_batch(array); + for (auto array : object->arrays) { + int32_t metadata_length; + int64_t body_length; + std::shared_ptr tensor; + ARROW_CHECK_OK(py::NdarrayToTensor(NULL, array, &tensor)); + object->tensors.push_back(tensor); + } - int64_t size; - ARROW_CHECK_OK(get_batch_size(*batch, &size)); + object->batch = make_batch(array); + + int64_t data_size, total_size; + auto mock = std::make_shared(); + write_batch_and_tensors( + mock.get(), object->batch, object->arrays, &data_size, &total_size); PyObject* r = PyTuple_New(2); - PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + size)); - PyTuple_SetItem(r, 1, - PyCapsule_New(reinterpret_cast(batch), "arrow", &ArrowCapsule_Destructor)); + PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + total_size)); + PyTuple_SetItem(r, 1, PyCapsule_New(reinterpret_cast(object), "arrow", + &ArrowCapsule_Destructor)); return r; } return NULL; @@ -125,9 +162,9 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) { /* Documented in doc/numbuf.rst in ray-core */ static PyObject* write_to_buffer(PyObject* self, PyObject* args) { - std::shared_ptr* batch; + RayObject* object; PyObject* memoryview; - if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) { + if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &object, &memoryview)) { return NULL; } if (!PyMemoryView_Check(memoryview)) { return NULL; } @@ -135,10 +172,9 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) { auto target = std::make_shared( LENGTH_PREFIX_SIZE + reinterpret_cast(buffer->buf), buffer->len - LENGTH_PREFIX_SIZE); - std::shared_ptr writer; - ARROW_CHECK_OK(ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer)); - ARROW_CHECK_OK(writer->WriteRecordBatch(*(*batch), true)); - ARROW_CHECK_OK(writer->Close()); + int64_t batch_size, total_size; + ARROW_CHECK_OK(write_batch_and_tensors( + target.get(), object->batch, object->arrays, &batch_size, &total_size)); *((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE; Py_RETURN_NONE; } @@ -150,20 +186,22 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) { Py_buffer* data_buffer = PyMemoryView_GET_BUFFER(data_memoryview); - auto batch = new std::shared_ptr(); - ARROW_CHECK_OK( - read_batch(reinterpret_cast(data_buffer->buf), data_buffer->len, batch)); + RayObject* object = new RayObject(); + ARROW_CHECK_OK(read_batch_and_tensors(reinterpret_cast(data_buffer->buf), + data_buffer->len, &object->batch, object->tensors)); - return PyCapsule_New(reinterpret_cast(batch), "arrow", &ArrowCapsule_Destructor); + return PyCapsule_New( + reinterpret_cast(object), "arrow", &ArrowCapsule_Destructor); } /* Documented in doc/numbuf.rst in ray-core */ static PyObject* deserialize_list(PyObject* self, PyObject* args) { - std::shared_ptr* data; + RayObject* object; PyObject* base = Py_None; - if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) { return NULL; } + if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &object, &base)) { return NULL; } PyObject* result; - Status s = DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result); + Status s = DeserializeList(object->batch->column(0), 0, object->batch->num_rows(), base, + object->tensors, &result); CHECK_SERIALIZATION_ERROR(s); return result; } @@ -247,12 +285,16 @@ static PyObject* store_list(PyObject* self, PyObject* args) { std::shared_ptr array; int32_t recursion_depth = 0; - Status s = SerializeSequences(std::vector({value}), recursion_depth, &array); + std::vector tensors; + Status s = SerializeSequences( + std::vector({value}), recursion_depth, &array, tensors); CHECK_SERIALIZATION_ERROR(s); std::shared_ptr batch = make_batch(array); - int64_t size; - ARROW_CHECK_OK(get_batch_size(batch, &size)); + + int64_t data_size, total_size; + auto mock = std::make_shared(); + write_batch_and_tensors(mock.get(), batch, tensors, &data_size, &total_size); uint8_t* data; /* The arrow schema is stored as the metadata of the plasma object and @@ -260,7 +302,8 @@ static PyObject* store_list(PyObject* self, PyObject* args) { * stored in the plasma data buffer. The header end offset is stored in * the first LENGTH_PREFIX_SIZE bytes of the data buffer. The RecordBatch * data is stored after that. */ - int error_code = plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + size, NULL, 0, &data); + int error_code = + plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + total_size, NULL, 0, &data); if (error_code == PlasmaError_ObjectExists) { PyErr_SetString(NumbufPlasmaObjectExistsError, "An object with this ID already exists in the plasma " @@ -275,12 +318,10 @@ static PyObject* store_list(PyObject* self, PyObject* args) { } CHECK(error_code == PlasmaError_OK); - auto target = std::make_shared(LENGTH_PREFIX_SIZE + data, size); - std::shared_ptr writer; - ipc::FileWriter::Open(target.get(), batch->schema(), &writer); - writer->WriteRecordBatch(*batch, true); - writer->Close(); - *((int64_t*)data) = size; + auto target = + std::make_shared(LENGTH_PREFIX_SIZE + data, total_size); + write_batch_and_tensors(target.get(), batch, tensors, &data_size, &total_size); + *((int64_t*)data) = data_size; /* Do the plasma_release corresponding to the call to plasma_create. */ plasma_release(conn, obj_id); @@ -347,11 +388,13 @@ static PyObject* retrieve_list(PyObject* self, PyObject* args) { Py_XINCREF(plasma_conn); auto batch = std::shared_ptr(); - ARROW_CHECK_OK( - read_batch(object_buffers[i].data, object_buffers[i].data_size, &batch)); + std::vector> tensors; + ARROW_CHECK_OK(read_batch_and_tensors( + object_buffers[i].data, object_buffers[i].data_size, &batch, tensors)); PyObject* result; - Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result); + Status s = + DeserializeList(batch->column(0), 0, batch->num_rows(), base, tensors, &result); CHECK_SERIALIZATION_ERROR(s); Py_XDECREF(base); diff --git a/src/numbuf/thirdparty/build_thirdparty.sh b/src/numbuf/thirdparty/build_thirdparty.sh index 2e1421f10..c1249a124 100755 --- a/src/numbuf/thirdparty/build_thirdparty.sh +++ b/src/numbuf/thirdparty/build_thirdparty.sh @@ -24,5 +24,5 @@ echo "building arrow" cd $TP_DIR/arrow/cpp mkdir -p $TP_DIR/arrow/cpp/build cd $TP_DIR/arrow/cpp/build -cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" -DARROW_BUILD_TESTS=OFF .. +cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g -O3 -march=native -mtune=native" -DCMAKE_CXX_FLAGS="-g -O3 -march=native -mtune=native" -DARROW_BUILD_TESTS=OFF -DARROW_PYTHON=on .. make VERBOSE=1 -j$PARALLEL diff --git a/src/numbuf/thirdparty/download_thirdparty.sh b/src/numbuf/thirdparty/download_thirdparty.sh index faaefd1a2..a05f98cf1 100755 --- a/src/numbuf/thirdparty/download_thirdparty.sh +++ b/src/numbuf/thirdparty/download_thirdparty.sh @@ -11,4 +11,4 @@ if [ ! -d $TP_DIR/arrow ]; then git clone https://github.com/apache/arrow/ "$TP_DIR/arrow" fi cd $TP_DIR/arrow -git checkout 067cd4ebfbd9be9b607658a2a249017cc6db84f9 +git checkout b0863cb63d62ae7c4a429164e5a2e350d3c1f21a diff --git a/src/plasma/CMakeLists.txt b/src/plasma/CMakeLists.txt index 22c0deacd..879464807 100644 --- a/src/plasma/CMakeLists.txt +++ b/src/plasma/CMakeLists.txt @@ -11,8 +11,8 @@ endif(APPLE) include_directories("${PYTHON_INCLUDE_DIRS}" thirdparty) -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -march=native -mtune=native -O3") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -march=native -mtune=native -O3") # Compile flatbuffers