diff --git a/src/serialize.cc b/src/serialize.cc new file mode 100644 index 000000000..c564eae6f --- /dev/null +++ b/src/serialize.cc @@ -0,0 +1,201 @@ +#include "serialize.h" + +using namespace arrow; + +template +struct npy_traits { +}; + +template <> +struct npy_traits { + typedef uint8_t value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::BooleanArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +#define NPY_INT_DECL(TYPE, CapType, T) \ + template <> \ + struct npy_traits { \ + typedef T value_type; \ + static const std::shared_ptr primitive_type; \ + using ArrayType = arrow::CapType##Array; \ + }; \ + \ + const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +NPY_INT_DECL(INT8, Int8, int8_t); +NPY_INT_DECL(INT16, Int16, int16_t); +NPY_INT_DECL(INT32, Int32, int32_t); +NPY_INT_DECL(INT64, Int64, int64_t); +NPY_INT_DECL(UINT8, UInt8, uint8_t); +NPY_INT_DECL(UINT16, UInt16, uint16_t); +NPY_INT_DECL(UINT32, UInt32, uint32_t); +NPY_INT_DECL(UINT64, UInt64, uint64_t); + +template <> +struct npy_traits { + typedef float value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::FloatArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +template <> +struct npy_traits { + typedef double value_type; + static const std::shared_ptr primitive_type; + using ArrayType = arrow::DoubleArray; +}; + +const std::shared_ptr npy_traits::primitive_type = std::make_shared(); + +template <> +struct npy_traits { + typedef PyObject* value_type; +}; + +template +std::shared_ptr make_flat_array(const std::string& fieldname, size_t size, std::shared_ptr data) { + auto field = std::make_shared(fieldname, npy_traits::primitive_type); + std::shared_ptr schema(new arrow::Schema({field})); + auto array = std::make_shared::ArrayType>(size, data); + return std::shared_ptr(new RowBatch(schema, size, {array})); +} + +const int64_t MAX_METADATA_SIZE = 5000; + +#define SIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + return size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; + +size_t arrow_size(PyArrayObject* array) { + npy_intp size = PyArray_SIZE(array); + switch (PyArray_TYPE(array)) { + SIZE_ARROW_CASE(NPY_INT8) + SIZE_ARROW_CASE(NPY_INT16) + SIZE_ARROW_CASE(NPY_INT32) + SIZE_ARROW_CASE(NPY_INT64) + SIZE_ARROW_CASE(NPY_UINT8) + SIZE_ARROW_CASE(NPY_UINT16) + SIZE_ARROW_CASE(NPY_UINT32) + SIZE_ARROW_CASE(NPY_UINT64) + SIZE_ARROW_CASE(NPY_FLOAT) + SIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); + } +} + +#define SERIALIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + { \ + data = std::make_shared(reinterpret_cast(PyArray_DATA(array)), sizeof(npy_traits::value_type) * size); \ + batch_size = size * sizeof(npy_traits::value_type) + MAX_METADATA_SIZE; \ + batch = make_flat_array("data", size, data); \ + } \ + break; + +// TODO(pcm): At the moment, this assumes that arrays are consecutive in memory +void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool) { + npy_intp size = PyArray_SIZE(array); + std::shared_ptr data; + std::shared_ptr batch; + int64_t batch_size = 0; + switch (PyArray_TYPE(array)) { + SERIALIZE_ARROW_CASE(NPY_INT8) + SERIALIZE_ARROW_CASE(NPY_INT16) + SERIALIZE_ARROW_CASE(NPY_INT32) + SERIALIZE_ARROW_CASE(NPY_INT64) + SERIALIZE_ARROW_CASE(NPY_UINT8) + SERIALIZE_ARROW_CASE(NPY_UINT16) + SERIALIZE_ARROW_CASE(NPY_UINT32) + SERIALIZE_ARROW_CASE(NPY_UINT64) + SERIALIZE_ARROW_CASE(NPY_FLOAT) + SERIALIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "serialization: numpy datatype not know"); + } + + // int64_t data_batch_size = ipc::GetRowBatchSize(batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it + + size_t ndim = PyArray_NDIM(array); + MemoryPool* default_pool = arrow::default_memory_pool(); + + auto metadata = std::make_shared(default_pool); + size_t metadata_size = 1 + ndim + 1; // dtype, list of shapes, pointer to header of the data segment + metadata->Resize(metadata_size * sizeof(int64_t)); + + int64_t* buffer = reinterpret_cast(metadata->mutable_data()); + buffer[0] = PyArray_TYPE(array); + // serialize the shape information + for (size_t i = 0; i < ndim; ++i) { + buffer[i+1] = PyArray_DIM(array, i); + } + std::shared_ptr metadata_batch = make_flat_array("metadata", metadata_size, metadata); + + // int64_t metadata_batch_size = ipc::GetRowBatchSize(metadata_batch.get()); // FIXME(pcm): once GetRowBatchSize is implemented, use it + + uint8_t* address = pool->get_address(location); + auto source = std::make_shared(address, location.size()); + + int64_t data_header_offset = 0; + ipc::WriteRowBatch(source.get(), batch.get(), 0, &data_header_offset); + + buffer[1 + ndim] = data_header_offset; + + int64_t metadata_header_offset = 0; + ipc::WriteRowBatch(source.get(), metadata_batch.get(), location.size() + MAX_METADATA_SIZE/2, &metadata_header_offset); + location.set_metadata_offset(metadata_header_offset); +} + +template +std::shared_ptr read_flat_array(BufferMemorySource* source, int64_t metadata_offset) { + std::shared_ptr reader; + Status s = ipc::RowBatchReader::Open(source, metadata_offset, &reader); + if (!s.ok()) { + ORCH_LOG(ORCH_FATAL, s.ToString()); + } + auto field = std::make_shared("data", npy_traits::primitive_type); + std::shared_ptr schema(new arrow::Schema({field})); + std::shared_ptr data; + reader->GetRowBatch(schema, &data); + return data->column(0); + +} + +#define DESERIALIZE_ARROW_CASE(TYPE) \ + case TYPE: \ + { \ + auto array = read_flat_array(source.get(), buffer[metadata_array->length()-1]); \ + auto data_primitive_array = dynamic_cast::ArrayType*>(array.get()); \ + return PyArray_SimpleNewFromData(dims.size(), &dims[0], TYPE, (void*)data_primitive_array->raw_data()); \ + } + +PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool) { + auto source = std::make_shared(pool->get_address(handle), handle.size()); + auto metadata_array = read_flat_array(source.get(), handle.metadata_offset()); + const uint64_t* buffer = dynamic_cast(metadata_array.get())->raw_data(); + uint64_t type = buffer[0]; + std::vector dims; + for (int i = 1; i < metadata_array->length()-1; ++i) { + dims.push_back(buffer[i]); + } + + switch (type) { + DESERIALIZE_ARROW_CASE(NPY_INT8) + DESERIALIZE_ARROW_CASE(NPY_INT16) + DESERIALIZE_ARROW_CASE(NPY_INT32) + DESERIALIZE_ARROW_CASE(NPY_INT64) + DESERIALIZE_ARROW_CASE(NPY_UINT8) + DESERIALIZE_ARROW_CASE(NPY_UINT16) + DESERIALIZE_ARROW_CASE(NPY_UINT32) + DESERIALIZE_ARROW_CASE(NPY_UINT64) + DESERIALIZE_ARROW_CASE(NPY_FLOAT) + DESERIALIZE_ARROW_CASE(NPY_DOUBLE) + default: + ORCH_LOG(ORCH_FATAL, "deserialization: numpy datatype not know"); + } +} diff --git a/src/serialize.h b/src/serialize.h new file mode 100644 index 000000000..efe01eb35 --- /dev/null +++ b/src/serialize.h @@ -0,0 +1,21 @@ +#ifndef ORCHESTRA_SERIALIZE_H +#define ORCHESTRA_SERIALIZE_H + +#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION + +#include +#include +#include +#include +#define NO_IMPORT_ARRAY +#define PY_ARRAY_UNIQUE_SYMBOL ORCHESTRA_ARRAY_API +#include +#include + +#include "ipc.h" + +size_t arrow_size(PyArrayObject* array); +void store_arrow(PyArrayObject* array, ObjHandle& location, MemorySegmentPool* pool); +PyObject* deserialize_array(ObjHandle handle, MemorySegmentPool* pool); + +#endif