diff --git a/CMakeLists.txt b/CMakeLists.txt index bda3778bb..efcc8b70d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ endif() set(ARROW_DIR "${CMAKE_SOURCE_DIR}/../arrow/" CACHE STRING "Path of the arrow source directory") -set(ARROW_STATIC_LIB "${CMAKE_SOURCE_DIR}/../arrow/cpp/build/debug/libarrow.so" CACHE STRING +set(ARROW_STATIC_LIB "${CMAKE_SOURCE_DIR}/../arrow/cpp/build/release/libarrow.so" CACHE STRING "Path to libarrow.a (needs to be changed if arrow is build in debug mode)") include_directories("${ARROW_DIR}/cpp/src/") diff --git a/python/src/pynumbuf/adapters/numpy.cc b/python/src/pynumbuf/adapters/numpy.cc index 6ca6753ca..67537fd25 100644 --- a/python/src/pynumbuf/adapters/numpy.cc +++ b/python/src/pynumbuf/adapters/numpy.cc @@ -57,31 +57,47 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder) { for (int i = 0; i < ndim; ++i) { dims[i] = PyArray_DIM(array, i); } - auto data = PyArray_DATA(array); + // TODO(pcm): Once we don't use builders any more below and directly share + // the memory buffer, we need to be more careful about this and not + // decrease the reference count of "contiguous" before the serialization + // is finished + auto contiguous = PyArray_GETCONTIGUOUS(array); + auto data = PyArray_DATA(contiguous); switch (dtype) { case NPY_UINT8: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_INT8: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_UINT16: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_INT16: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_UINT32: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_INT32: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_UINT64: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_INT64: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_FLOAT: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; case NPY_DOUBLE: - return builder.Append(dims, reinterpret_cast(data)); + RETURN_NOT_OK(builder.Append(dims, reinterpret_cast(data))); + break; default: DCHECK(false) << "numpy data type not recognized: " << dtype; } + Py_XDECREF(contiguous); return Status::OK(); } diff --git a/python/src/pynumbuf/adapters/python.cc b/python/src/pynumbuf/adapters/python.cc index 751ebf91b..580c02f1d 100644 --- a/python/src/pynumbuf/adapters/python.cc +++ b/python/src/pynumbuf/adapters/python.cc @@ -96,8 +96,13 @@ Status SerializeSequences(std::vector sequences, std::shared_ptr **result) { +int PyObjectToArrow(PyObject* object, std::shared_ptr **result) { if (PyCapsule_IsValid(object, "arrow")) { - *result = reinterpret_cast*>(PyCapsule_GetPointer(object, "arrow")); + *result = reinterpret_cast*>(PyCapsule_GetPointer(object, "arrow")); return 1; } else { PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule"); @@ -33,7 +31,7 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr **result) { } static void ArrowCapsule_Destructor(PyObject* capsule) { - delete reinterpret_cast*>(PyCapsule_GetPointer(capsule, "arrow")); + delete reinterpret_cast*>(PyCapsule_GetPointer(capsule, "arrow")); } std::shared_ptr make_row_batch(std::shared_ptr data) { @@ -48,44 +46,45 @@ std::shared_ptr make_row_batch(std::shared_ptr data) { The argument must be a Python list \returns - A Python "arrow" capsule containing the arrow::Array + A bytearray object containing the schema metadata + + The size in bytes the serialized object will occupy in memory + + A Python "arrow" capsule containing the RowBatch */ PyObject* serialize_list(PyObject* self, PyObject* args) { PyObject* value; if (!PyArg_ParseTuple(args, "O", &value)) { return NULL; } - std::shared_ptr* result = new std::shared_ptr(); + std::shared_ptr array; if (PyList_Check(value)) { - Status s = SerializeSequences(std::vector({value}), result); + Status s = SerializeSequences(std::vector({value}), &array); if (!s.ok()) { PyErr_SetString(NumbufError, s.ToString().c_str()); return NULL; } - return PyCapsule_New(reinterpret_cast(result), "arrow", &ArrowCapsule_Destructor); + + auto batch = new std::shared_ptr(); + *batch = make_row_batch(array); + + int64_t size = 0; + ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch->get(), &size)); + + std::shared_ptr buffer; + ARROW_CHECK_OK(ipc::WriteSchema((*batch)->schema().get(), &buffer)); + auto ptr = reinterpret_cast(buffer->data()); + + PyObject* r = PyTuple_New(3); + PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, buffer->size())); + PyTuple_SetItem(r, 1, PyInt_FromLong(size)); + PyTuple_SetItem(r, 2, PyCapsule_New(reinterpret_cast(batch), + "arrow", &ArrowCapsule_Destructor)); + return r; } return NULL; } -/*! Number of bytes the serialized version of the object will take. - - \param args - A Python "arrow" capsule containing the arrow::Array - - \returns - Size of the object in memory once it is serialized -*/ -PyObject* get_serialized_size(PyObject* self, PyObject* args) { - std::shared_ptr* data; - if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) { - return NULL; - } - auto batch = make_row_batch(*data); - int64_t size = 0; - ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch.get(), &size)); - return PyInt_FromLong(size); -} - /*! Serialize an arrow::Array into a buffer. \param args @@ -96,42 +95,21 @@ PyObject* get_serialized_size(PyObject* self, PyObject* args) { The arrow metadata offset for the arrow metadata */ PyObject* write_to_buffer(PyObject* self, PyObject* args) { - std::shared_ptr* data; + std::shared_ptr* batch; PyObject* memoryview; - if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &data, &memoryview)) { + if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) { return NULL; } if (!PyMemoryView_Check(memoryview)) { return NULL; } - auto batch = make_row_batch(*data); Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview); auto target = std::make_shared(reinterpret_cast(buffer->buf), buffer->len); int64_t metadata_offset; - ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch.get(), 0, &metadata_offset)); + ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch->get(), 0, &metadata_offset)); return PyInt_FromLong(metadata_offset); } -/*! Serialize schema metadata associated to and arrow::Array - - \param args - A Python "arrow" capsule containing the arrow::Array - - \return - A bytearray object containing the schema metadata -*/ -PyObject* get_schema_metadata(PyObject* self, PyObject* args) { - std::shared_ptr* data; - if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) { - return NULL; - } - auto batch = make_row_batch(*data); - std::shared_ptr buffer; - ARROW_CHECK_OK(ipc::WriteSchema(batch->schema().get(), &buffer)); - auto ptr = reinterpret_cast(buffer->data()); - return PyByteArray_FromStringAndSize(ptr, buffer->size()); -} - /*! Read serialized data from buffer and produce an arrow capsule \param args @@ -139,7 +117,7 @@ PyObject* get_schema_metadata(PyObject* self, PyObject* args) { a Python bytearray containing the metadata and the metadata_offset \return - A Python "arrow" capsule containing the arrow data + A Python "arrow" capsule containing the arrow RowBatch */ PyObject* read_from_buffer(PyObject* self, PyObject* args) { PyObject* memoryview; @@ -162,33 +140,30 @@ PyObject* read_from_buffer(PyObject* self, PyObject* args) { auto source = std::make_shared(reinterpret_cast(buffer->buf), buffer->len); std::shared_ptr reader; ARROW_CHECK_OK(arrow::ipc::RowBatchReader::Open(source.get(), metadata_offset, &reader)); - std::shared_ptr data; - ARROW_CHECK_OK(reader->GetRowBatch(schema, &data)); + auto batch = new std::shared_ptr(); + ARROW_CHECK_OK(reader->GetRowBatch(schema, batch)); - std::shared_ptr* result = new std::shared_ptr(); - *result = data->column(0); - return PyCapsule_New(reinterpret_cast(result), "arrow", &ArrowCapsule_Destructor); + return PyCapsule_New(reinterpret_cast(batch), + "arrow", &ArrowCapsule_Destructor); } /*! */ PyObject* deserialize_list(PyObject* self, PyObject* args) { - std::shared_ptr* data; + std::shared_ptr* data; if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) { return NULL; } PyObject* result; - ARROW_CHECK_OK(DeserializeList(*data, 0, (*data)->length(), &result)); + ARROW_CHECK_OK(DeserializeList((*data)->column(0), 0, (*data)->num_rows(), &result)); return result; } static PyMethodDef NumbufMethods[] = { { "serialize_list", serialize_list, METH_VARARGS, "serialize a Python list" }, { "deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list" }, - { "get_serialized_size", get_serialized_size, METH_VARARGS, "get the number of bytes the object will occupy once serialized" }, { "write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"}, { "read_from_buffer", read_from_buffer, METH_VARARGS, "read serialized data from buffer"}, - { "get_schema_metadata", get_schema_metadata, METH_VARARGS, "return the schema of an arrow object"}, { NULL, NULL, 0, NULL } }; diff --git a/python/test/runtest.py b/python/test/runtest.py index f3697d51f..9cfa62c92 100644 --- a/python/test/runtest.py +++ b/python/test/runtest.py @@ -15,7 +15,7 @@ TEST_OBJECTS = [[1, "hello", 3.0], 42, 43L, "hello world", 42.0, 1L << 62, class SerializationTests(unittest.TestCase): def roundTripTest(self, data): - serialized = libnumbuf.serialize_list(data) + schema, size, serialized = libnumbuf.serialize_list(data) result = libnumbuf.deserialize_list(serialized) assert_equal(data, result) @@ -52,12 +52,13 @@ class SerializationTests(unittest.TestCase): def testBuffer(self): for (i, obj) in enumerate(TEST_OBJECTS): - x = libnumbuf.serialize_list([1, 2, 3]) - schema = libnumbuf.get_schema_metadata(x) - size = libnumbuf.get_serialized_size(x) + 4096 # INITIAL_METADATA_SIZE in arrow + schema, size, batch = libnumbuf.serialize_list([obj]) + size = size + 4096 # INITIAL_METADATA_SIZE in arrow buff = np.zeros(size, dtype="uint8") - metadata_offset = libnumbuf.write_to_buffer(x, memoryview(buff)) + metadata_offset = libnumbuf.write_to_buffer(batch, memoryview(buff)) array = libnumbuf.read_from_buffer(memoryview(buff), schema, metadata_offset) + result = libnumbuf.deserialize_list(array) + assert_equal(result[0], obj) if __name__ == "__main__": unittest.main()