From 4618fd45b1b45abce0146b148f5d6392eaea92ca Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Mon, 20 Mar 2017 16:31:46 -0700 Subject: [PATCH] Port Ray to latest Arrow version (#370) * rebase on top of latest arrow * clang-format * address comments * fix --- src/common/common.h | 4 + src/numbuf/cpp/src/numbuf/dict.cc | 2 +- src/numbuf/cpp/src/numbuf/sequence.cc | 10 +- src/numbuf/cpp/src/numbuf/sequence.h | 13 +- src/numbuf/cpp/src/numbuf/tensor.cc | 2 +- src/numbuf/cpp/src/numbuf/tensor.h | 4 +- .../python/src/pynumbuf/adapters/numpy.cc | 22 ++-- .../python/src/pynumbuf/adapters/python.cc | 44 +++---- src/numbuf/python/src/pynumbuf/memory.h | 22 ++++ src/numbuf/python/src/pynumbuf/numbuf.cc | 124 ++++++++---------- src/numbuf/python/test/runtest.py | 16 +-- src/numbuf/thirdparty/download_thirdparty.sh | 6 +- 12 files changed, 148 insertions(+), 121 deletions(-) diff --git a/src/common/common.h b/src/common/common.h index f5aa17b7c..398ab7fad 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -44,7 +44,11 @@ extern "C" { #define RAY_COMMON_LOG_LEVEL RAY_COMMON_INFO #endif +/* Arrow defines the same macro, only define it if it has not already been + * defined. */ +#ifndef UNUSED #define UNUSED(x) ((void) (x)) +#endif /** * Macros to enable each level of Ray logging statements depending on the diff --git a/src/numbuf/cpp/src/numbuf/dict.cc b/src/numbuf/cpp/src/numbuf/dict.cc index 047191d8c..14daefa47 100644 --- a/src/numbuf/cpp/src/numbuf/dict.cc +++ b/src/numbuf/cpp/src/numbuf/dict.cc @@ -16,7 +16,7 @@ Status DictBuilder::Finish(std::shared_ptr key_tuple_data, auto vals_field = std::make_shared("vals", vals->type()); auto type = std::make_shared(std::vector({keys_field, vals_field})); - std::vector field_arrays({keys, vals}); + std::vector> field_arrays({keys, vals}); DCHECK(keys->length() == vals->length()); out->reset(new StructArray(type, keys->length(), field_arrays)); return Status::OK(); diff --git a/src/numbuf/cpp/src/numbuf/sequence.cc b/src/numbuf/cpp/src/numbuf/sequence.cc index 05f72f059..e34f4d511 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.cc +++ b/src/numbuf/cpp/src/numbuf/sequence.cc @@ -13,7 +13,7 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool) bools_(pool, std::make_shared()), ints_(pool, std::make_shared()), bytes_(pool, std::make_shared()), - strings_(pool, std::make_shared()), + strings_(pool), floats_(pool, std::make_shared()), doubles_(pool, std::make_shared()), uint8_tensors_(std::make_shared(), pool), @@ -152,6 +152,7 @@ Status SequenceBuilder::AppendDict(int32_t size) { types[TAG] = std::make_shared("", VARNAME.type()); \ RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \ RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ + type_ids.push_back(TAG); \ } #define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \ @@ -166,6 +167,7 @@ Status SequenceBuilder::AppendDict(int32_t size) { ARROW_CHECK_OK(list_builder->Append(OFFSETS.data(), OFFSETS.size())); \ builder.Append(); \ ADD_ELEMENT(builder, TAG); \ + type_ids.push_back(TAG); \ } else { \ DCHECK(OFFSETS.size() == 1); \ } @@ -174,7 +176,8 @@ Status SequenceBuilder::Finish(std::shared_ptr list_data, std::shared_ptr tuple_data, std::shared_ptr dict_data, std::shared_ptr* out) { std::vector> types(num_tags); - std::vector children(num_tags); + std::vector> children(num_tags); + std::vector type_ids; ADD_ELEMENT(bools_, bool_tag); ADD_ELEMENT(ints_, int_tag); @@ -201,10 +204,9 @@ Status SequenceBuilder::Finish(std::shared_ptr list_data, ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple"); ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict"); - std::vector type_ids = {}; TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE)); out->reset(new UnionArray(type, types_.length(), children, types_.data(), - offsets_.data(), nones_.null_count(), nones_.null_bitmap())); + offsets_.data(), nones_.null_bitmap(), nones_.null_count())); return Status::OK(); } } diff --git a/src/numbuf/cpp/src/numbuf/sequence.h b/src/numbuf/cpp/src/numbuf/sequence.h index 476f9c45d..95af937b8 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.h +++ b/src/numbuf/cpp/src/numbuf/sequence.h @@ -3,10 +3,19 @@ #include "tensor.h" #include -#include namespace numbuf { +class NullArrayBuilder : public arrow::ArrayBuilder { + public: + explicit NullArrayBuilder(arrow::MemoryPool* pool, const arrow::TypePtr& type) + : arrow::ArrayBuilder(pool, type) {} + virtual ~NullArrayBuilder(){}; + arrow::Status Finish(std::shared_ptr* out) override { + return arrow::Status::OK(); + } +}; + /*! A Sequence is a heterogeneous collections of elements. It can contain scalar Python types, lists, tuples, dictionaries and tensors. */ @@ -92,7 +101,7 @@ class SequenceBuilder { /* Total number of bytes needed to represent this sequence. */ int64_t total_num_bytes_; - arrow::NullArrayBuilder nones_; + NullArrayBuilder nones_; arrow::BooleanBuilder bools_; arrow::Int64Builder ints_; arrow::BinaryBuilder bytes_; diff --git a/src/numbuf/cpp/src/numbuf/tensor.cc b/src/numbuf/cpp/src/numbuf/tensor.cc index db9325563..c5602af90 100644 --- a/src/numbuf/cpp/src/numbuf/tensor.cc +++ b/src/numbuf/cpp/src/numbuf/tensor.cc @@ -29,7 +29,7 @@ Status TensorBuilder::Append(const std::vector& dims, const elem_typ RETURN_NOT_OK(tensors_->Append()); RETURN_NOT_OK(dims_->Append()); RETURN_NOT_OK(values_->Append()); - int32_t size = 1; + int64_t size = 1; for (auto dim : dims) { size *= dim; RETURN_NOT_OK(dim_data_->Append(dim)); diff --git a/src/numbuf/cpp/src/numbuf/tensor.h b/src/numbuf/cpp/src/numbuf/tensor.h index 2725f4615..36e078508 100644 --- a/src/numbuf/cpp/src/numbuf/tensor.h +++ b/src/numbuf/cpp/src/numbuf/tensor.h @@ -2,7 +2,7 @@ #define NUMBUF_TENSOR_H #include -#include +#include #include namespace numbuf { @@ -38,7 +38,7 @@ class TensorBuilder { //! Number of tensors in the column int32_t length() { return tensors_->length(); } - const arrow::TypePtr& type() { return tensors_->type(); } + std::shared_ptr type() { return tensors_->type(); } private: arrow::TypePtr dtype_; diff --git a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc b/src/numbuf/python/src/pynumbuf/adapters/numpy.cc index 42b79f252..727f7747a 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/numpy.cc +++ b/src/numbuf/python/src/pynumbuf/adapters/numpy.cc @@ -18,15 +18,15 @@ namespace numbuf { case Type::TYPE: \ return NPY_##TYPE; -#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \ - case Type::TYPE: { \ - auto values = std::dynamic_pointer_cast(content->values()); \ - DCHECK(values); \ - type* data = const_cast(values->raw_data()) + content->offset(offset); \ - *out = PyArray_SimpleNewFromData( \ - num_dims, dim.data(), NPY_##TYPE, reinterpret_cast(data)); \ - if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \ - Py_XINCREF(base); \ +#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \ + case Type::TYPE: { \ + auto values = std::dynamic_pointer_cast(content->values()); \ + DCHECK(values); \ + type* data = const_cast(values->raw_data()) + content->value_offset(offset); \ + *out = PyArray_SimpleNewFromData( \ + num_dims, dim.data(), NPY_##TYPE, reinterpret_cast(data)); \ + if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \ + Py_XINCREF(base); \ } break; Status DeserializeArray( @@ -38,8 +38,8 @@ Status DeserializeArray( auto content = std::dynamic_pointer_cast(tensor->field(1)); npy_intp num_dims = dims->value_length(offset); std::vector dim(num_dims); - for (int i = dims->offset(offset); i < dims->offset(offset + 1); ++i) { - dim[i - dims->offset(offset)] = + for (int i = dims->value_offset(offset); i < dims->value_offset(offset + 1); ++i) { + dim[i - dims->value_offset(offset)] = std::dynamic_pointer_cast(dims->values())->Value(i); } switch (content->value_type()->type) { diff --git a/src/numbuf/python/src/pynumbuf/adapters/python.cc b/src/numbuf/python/src/pynumbuf/adapters/python.cc index 6cf88bf8c..ca5aba393 100644 --- a/src/numbuf/python/src/pynumbuf/adapters/python.cc +++ b/src/numbuf/python/src/pynumbuf/adapters/python.cc @@ -20,8 +20,8 @@ namespace numbuf { #define PyInt_FromLong PyLong_FromLong #endif -Status get_value( - ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) { +Status get_value(std::shared_ptr arr, int32_t index, int32_t type, PyObject* base, + PyObject** result) { switch (arr->type()->type) { case Type::BOOL: *result = @@ -181,26 +181,26 @@ Status SerializeSequences(std::vector sequences, int32_t recursion_de return builder.Finish(list, tuple, dict, out); } -#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \ - auto data = std::dynamic_pointer_cast(array); \ - int32_t size = array->length(); \ - PyObject* result = CREATE(stop_idx - start_idx); \ - auto types = std::make_shared(size, data->types()); \ - auto offsets = std::make_shared(size, data->offset_buf()); \ - for (size_t i = start_idx; i < stop_idx; ++i) { \ - if (data->IsNull(i)) { \ - Py_INCREF(Py_None); \ - SET_ITEM(result, i - start_idx, Py_None); \ - } else { \ - int32_t offset = offsets->Value(i); \ - int8_t type = types->Value(i); \ - ArrayPtr arr = data->child(type); \ - PyObject* value; \ - RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \ - SET_ITEM(result, i - start_idx, value); \ - } \ - } \ - *out = result; \ +#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \ + auto data = std::dynamic_pointer_cast(array); \ + int32_t size = array->length(); \ + PyObject* result = CREATE(stop_idx - start_idx); \ + auto types = std::make_shared(size, data->type_ids()); \ + auto offsets = std::make_shared(size, data->value_offsets()); \ + for (size_t i = start_idx; i < stop_idx; ++i) { \ + if (data->IsNull(i)) { \ + Py_INCREF(Py_None); \ + SET_ITEM(result, i - start_idx, Py_None); \ + } else { \ + int32_t offset = offsets->Value(i); \ + int8_t type = types->Value(i); \ + std::shared_ptr arr = data->child(type); \ + PyObject* value; \ + RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \ + SET_ITEM(result, i - start_idx, value); \ + } \ + } \ + *out = result; \ return Status::OK(); Status DeserializeList(std::shared_ptr array, int32_t start_idx, int32_t stop_idx, diff --git a/src/numbuf/python/src/pynumbuf/memory.h b/src/numbuf/python/src/pynumbuf/memory.h index 2acc39801..ed3c13a63 100644 --- a/src/numbuf/python/src/pynumbuf/memory.h +++ b/src/numbuf/python/src/pynumbuf/memory.h @@ -62,6 +62,28 @@ class FixedBufferStream : public arrow::io::OutputStream, int64_t size_; }; +class MockBufferStream : public arrow::io::OutputStream { + public: + virtual ~MockBufferStream() {} + + explicit MockBufferStream() : position_(0) {} + + arrow::Status Close() override { return arrow::Status::OK(); } + + arrow::Status Tell(int64_t* position) override { + *position = position_; + return arrow::Status::OK(); + } + + arrow::Status Write(const uint8_t* data, int64_t nbytes) override { + position_ += nbytes; + return arrow::Status::OK(); + } + + private: + int64_t position_; +}; + } // namespace numbuf #endif // PYNUMBUF_MEMORY_H diff --git a/src/numbuf/python/src/pynumbuf/numbuf.cc b/src/numbuf/python/src/pynumbuf/numbuf.cc index 8a694d050..a391fe062 100644 --- a/src/numbuf/python/src/pynumbuf/numbuf.cc +++ b/src/numbuf/python/src/pynumbuf/numbuf.cc @@ -1,6 +1,4 @@ #include -#include -#include #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION #define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API #include @@ -9,7 +7,8 @@ #include -#include +#include +#include #include "adapters/python.h" #include "memory.h" @@ -28,33 +27,40 @@ PyObject* NumbufPlasmaObjectExistsError; #endif +// Each arrow object is stored in the format +// | length of the object in bytes | object data |. +// LENGTH_PREFIX_SIZE is the number of bytes occupied by the +// object length field. +constexpr int64_t LENGTH_PREFIX_SIZE = sizeof(int64_t); + using namespace arrow; using namespace numbuf; -int64_t make_schema_and_batch(std::shared_ptr data, - std::shared_ptr* metadata_out, std::shared_ptr* batch_out) { +std::shared_ptr make_batch(std::shared_ptr data) { auto field = std::make_shared("list", data->type()); std::shared_ptr schema(new Schema({field})); - *batch_out = - std::shared_ptr(new RecordBatch(schema, data->length(), {data})); - int64_t size = 0; - ARROW_CHECK_OK(ipc::GetRecordBatchSize(batch_out->get(), &size)); - ARROW_CHECK_OK(ipc::WriteSchema((*batch_out)->schema().get(), metadata_out)); + return std::shared_ptr(new RecordBatch(schema, data->length(), {data})); +} + +int64_t get_batch_size(std::shared_ptr batch) { + // Determine the size of the file by writing to a mock file. + auto mock = std::make_shared(); + std::shared_ptr writer; + ipc::FileWriter::Open(mock.get(), batch->schema(), &writer); + writer->WriteRecordBatch(*batch); + writer->Close(); + int64_t size; + ARROW_CHECK_OK(mock->Tell(&size)); return size; } -Status read_batch(std::shared_ptr schema_buffer, int64_t header_end_offset, - uint8_t* data, int64_t size, std::shared_ptr* batch_out) { - std::shared_ptr message; - RETURN_NOT_OK(ipc::Message::Open(schema_buffer, &message)); - DCHECK_EQ(ipc::Message::SCHEMA, message->type()); - std::shared_ptr schema_msg = message->GetSchema(); - std::shared_ptr schema; - RETURN_NOT_OK(schema_msg->GetSchema(&schema)); - auto source = std::make_shared(data, size); - std::shared_ptr reader; - RETURN_NOT_OK(ipc::RecordBatchReader::Open(source.get(), header_end_offset, &reader)); - RETURN_NOT_OK(reader->GetRecordBatch(schema, batch_out)); +Status read_batch(uint8_t* data, int64_t size, std::shared_ptr* batch_out) { + std::shared_ptr reader; + auto source = std::make_shared( + LENGTH_PREFIX_SIZE + data, size - LENGTH_PREFIX_SIZE); + int64_t data_size = *((int64_t*)data); + arrow::ipc::FileReader::Open(source, data_size, &reader); + reader->GetRecordBatch(0, batch_out); return Status::OK(); } @@ -104,14 +110,13 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) { CHECK_SERIALIZATION_ERROR(s); auto batch = new std::shared_ptr(); - std::shared_ptr metadata; - int64_t size = make_schema_and_batch(array, &metadata, batch); + *batch = make_batch(array); - auto ptr = reinterpret_cast(metadata->data()); - PyObject* r = PyTuple_New(3); - PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, metadata->size())); - PyTuple_SetItem(r, 1, PyLong_FromLong(size)); - PyTuple_SetItem(r, 2, + int64_t size = get_batch_size(*batch); + + PyObject* r = PyTuple_New(2); + PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + size)); + PyTuple_SetItem(r, 1, PyCapsule_New(reinterpret_cast(batch), "arrow", &ArrowCapsule_Destructor)); return r; } @@ -128,32 +133,26 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) { if (!PyMemoryView_Check(memoryview)) { return NULL; } Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview); auto target = std::make_shared( - reinterpret_cast(buffer->buf), buffer->len); - int64_t body_end_offset; - int64_t header_end_offset; - ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(), - target.get(), &body_end_offset, &header_end_offset)); - return PyLong_FromLong(header_end_offset); + LENGTH_PREFIX_SIZE + reinterpret_cast(buffer->buf), + buffer->len - LENGTH_PREFIX_SIZE); + std::shared_ptr writer; + ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer); + writer->WriteRecordBatch(*(*batch)); + writer->Close(); + *((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE; + Py_RETURN_NONE; } /* Documented in doc/numbuf.rst in ray-core */ static PyObject* read_from_buffer(PyObject* self, PyObject* args) { PyObject* data_memoryview; - PyObject* metadata_memoryview; - int64_t header_end_offset; - if (!PyArg_ParseTuple( - args, "OOL", &data_memoryview, &metadata_memoryview, &header_end_offset)) { - return NULL; - } + if (!PyArg_ParseTuple(args, "O", &data_memoryview)) { return NULL; } - Py_buffer* metadata_buffer = PyMemoryView_GET_BUFFER(metadata_memoryview); Py_buffer* data_buffer = PyMemoryView_GET_BUFFER(data_memoryview); - auto ptr = reinterpret_cast(metadata_buffer->buf); - auto schema_buffer = std::make_shared(ptr, metadata_buffer->len); auto batch = new std::shared_ptr(); - ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset, - reinterpret_cast(data_buffer->buf), data_buffer->len, batch)); + ARROW_CHECK_OK( + read_batch(reinterpret_cast(data_buffer->buf), data_buffer->len, batch)); return PyCapsule_New(reinterpret_cast(batch), "arrow", &ArrowCapsule_Destructor); } @@ -251,18 +250,16 @@ static PyObject* store_list(PyObject* self, PyObject* args) { Status s = SerializeSequences(std::vector({value}), recursion_depth, &array); CHECK_SERIALIZATION_ERROR(s); - std::shared_ptr batch; - std::shared_ptr metadata; - int64_t size = make_schema_and_batch(array, &metadata, &batch); + std::shared_ptr batch = make_batch(array); + int64_t size = get_batch_size(batch); uint8_t* data; /* The arrow schema is stored as the metadata of the plasma object and * both the arrow data and the header end offset are * stored in the plasma data buffer. The header end offset is stored in - * the first sizeof(int64_t) bytes of the data buffer. The RecordBatch + * the first LENGTH_PREFIX_SIZE bytes of the data buffer. The RecordBatch * data is stored after that. */ - int error_code = plasma_create(conn, obj_id, sizeof(size) + size, - (uint8_t*)metadata->data(), metadata->size(), &data); + int error_code = plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + size, NULL, 0, &data); if (error_code == PlasmaError_ObjectExists) { PyErr_SetString(NumbufPlasmaObjectExistsError, "An object with this ID already exists in the plasma " @@ -277,14 +274,13 @@ static PyObject* store_list(PyObject* self, PyObject* args) { } CHECK(error_code == PlasmaError_OK); - auto target = std::make_shared(sizeof(size) + data, size); - int64_t body_end_offset; - int64_t header_end_offset; - ARROW_CHECK_OK(ipc::WriteRecordBatch(batch->columns(), batch->num_rows(), target.get(), - &body_end_offset, &header_end_offset)); + auto target = std::make_shared(LENGTH_PREFIX_SIZE + data, size); + std::shared_ptr writer; + ipc::FileWriter::Open(target.get(), batch->schema(), &writer); + writer->WriteRecordBatch(*batch); + writer->Close(); + *((int64_t*)data) = size; - /* Save the header end offset at the beginning of the plasma data buffer. */ - *((int64_t*)data) = header_end_offset; /* Do the plasma_release corresponding to the call to plasma_create. */ plasma_release(conn, obj_id); /* Seal the object. */ @@ -349,15 +345,9 @@ static PyObject* retrieve_list(PyObject* self, PyObject* args) { PyCapsule_SetContext(base, plasma_conn); Py_XINCREF(plasma_conn); - /* Remember: The metadata offset was written at the beginning of the plasma buffer. - */ - int64_t header_end_offset = *((int64_t*)object_buffers[i].data); - auto schema_buffer = std::make_shared( - object_buffers[i].metadata, object_buffers[i].metadata_size); auto batch = std::shared_ptr(); - ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset, - object_buffers[i].data + sizeof(object_buffers[i].data_size), - object_buffers[i].data_size - sizeof(object_buffers[i].data_size), &batch)); + ARROW_CHECK_OK( + read_batch(object_buffers[i].data, object_buffers[i].data_size, &batch)); PyObject* result; Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result); diff --git a/src/numbuf/python/test/runtest.py b/src/numbuf/python/test/runtest.py index 90080b968..f0c4daca6 100644 --- a/src/numbuf/python/test/runtest.py +++ b/src/numbuf/python/test/runtest.py @@ -26,7 +26,7 @@ if sys.version_info < (3, 0): class SerializationTests(unittest.TestCase): def roundTripTest(self, data): - schema, size, serialized = numbuf.serialize_list(data) + size, serialized = numbuf.serialize_list(data) result = numbuf.deserialize_list(serialized) assert_equal(data, result) @@ -89,7 +89,7 @@ class SerializationTests(unittest.TestCase): numbuf.register_callbacks(serialize, deserialize) - metadata, size, serialized = numbuf.serialize_list([bar]) + size, serialized = numbuf.serialize_list([bar]) self.assertEqual(numbuf.deserialize_list(serialized)[0].foo.x, 42) def testObjectArray(self): @@ -105,23 +105,23 @@ class SerializationTests(unittest.TestCase): numbuf.register_callbacks(myserialize, mydeserialize) - metadata, size, serialized = numbuf.serialize_list([x, y]) + size, serialized = numbuf.serialize_list([x, y]) assert_equal(numbuf.deserialize_list(serialized), [x, y]) def testBuffer(self): for (i, obj) in enumerate(TEST_OBJECTS): - schema, size, batch = numbuf.serialize_list([obj]) - size = size + 4096 # INITIAL_METADATA_SIZE in arrow. + size, batch = numbuf.serialize_list([obj]) + size = size buff = np.zeros(size, dtype="uint8") - metadata_offset = numbuf.write_to_buffer(batch, memoryview(buff)) - array = numbuf.read_from_buffer(memoryview(buff), memoryview(schema), metadata_offset) + numbuf.write_to_buffer(batch, memoryview(buff)) + array = numbuf.read_from_buffer(memoryview(buff)) result = numbuf.deserialize_list(array) assert_equal(result[0], obj) def testObjectArrayImmutable(self): obj = np.zeros([10]) - schema, size, serialized = numbuf.serialize_list([obj]) + size, serialized = numbuf.serialize_list([obj]) result = numbuf.deserialize_list(serialized) assert_equal(result[0], obj) with self.assertRaises(ValueError): diff --git a/src/numbuf/thirdparty/download_thirdparty.sh b/src/numbuf/thirdparty/download_thirdparty.sh index 324a6b54e..65349802a 100755 --- a/src/numbuf/thirdparty/download_thirdparty.sh +++ b/src/numbuf/thirdparty/download_thirdparty.sh @@ -8,7 +8,7 @@ set -e TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) if [ ! -d $TP_DIR/arrow ]; then - git clone https://github.com/pcmoritz/arrow.git "$TP_DIR/arrow" + git clone https://github.com/apache/arrow/ "$TP_DIR/arrow" fi -cd "$TP_DIR/arrow" -git checkout a4a5526e4a8fbc4e4d5382a5c806ec871d2fbd9f +cd $TP_DIR/arrow +git checkout 98a52b4823f3cd0880eaef066dc932f533170292