Port Ray to latest Arrow version (#370)

* rebase on top of latest arrow

* clang-format

* address comments

* fix
This commit is contained in:
Philipp Moritz
2017-03-20 16:31:46 -07:00
committed by Robert Nishihara
parent 29c8471fd4
commit 4618fd45b1
12 changed files with 148 additions and 121 deletions
+4
View File
@@ -44,7 +44,11 @@ extern "C" {
#define RAY_COMMON_LOG_LEVEL RAY_COMMON_INFO
#endif
/* Arrow defines the same macro, only define it if it has not already been
* defined. */
#ifndef UNUSED
#define UNUSED(x) ((void) (x))
#endif
/**
* Macros to enable each level of Ray logging statements depending on the
+1 -1
View File
@@ -16,7 +16,7 @@ Status DictBuilder::Finish(std::shared_ptr<Array> key_tuple_data,
auto vals_field = std::make_shared<Field>("vals", vals->type());
auto type =
std::make_shared<StructType>(std::vector<FieldPtr>({keys_field, vals_field}));
std::vector<ArrayPtr> field_arrays({keys, vals});
std::vector<std::shared_ptr<Array>> field_arrays({keys, vals});
DCHECK(keys->length() == vals->length());
out->reset(new StructArray(type, keys->length(), field_arrays));
return Status::OK();
+6 -4
View File
@@ -13,7 +13,7 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
bools_(pool, std::make_shared<BooleanType>()),
ints_(pool, std::make_shared<Int64Type>()),
bytes_(pool, std::make_shared<BinaryType>()),
strings_(pool, std::make_shared<StringType>()),
strings_(pool),
floats_(pool, std::make_shared<FloatType>()),
doubles_(pool, std::make_shared<DoubleType>()),
uint8_tensors_(std::make_shared<UInt8Type>(), pool),
@@ -152,6 +152,7 @@ Status SequenceBuilder::AppendDict(int32_t size) {
types[TAG] = std::make_shared<Field>("", VARNAME.type()); \
RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \
RETURN_NOT_OK(nones_.AppendToBitmap(true)); \
type_ids.push_back(TAG); \
}
#define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \
@@ -166,6 +167,7 @@ Status SequenceBuilder::AppendDict(int32_t size) {
ARROW_CHECK_OK(list_builder->Append(OFFSETS.data(), OFFSETS.size())); \
builder.Append(); \
ADD_ELEMENT(builder, TAG); \
type_ids.push_back(TAG); \
} else { \
DCHECK(OFFSETS.size() == 1); \
}
@@ -174,7 +176,8 @@ Status SequenceBuilder::Finish(std::shared_ptr<Array> list_data,
std::shared_ptr<Array> tuple_data, std::shared_ptr<Array> dict_data,
std::shared_ptr<Array>* out) {
std::vector<std::shared_ptr<Field>> types(num_tags);
std::vector<ArrayPtr> children(num_tags);
std::vector<std::shared_ptr<Array>> children(num_tags);
std::vector<uint8_t> type_ids;
ADD_ELEMENT(bools_, bool_tag);
ADD_ELEMENT(ints_, int_tag);
@@ -201,10 +204,9 @@ Status SequenceBuilder::Finish(std::shared_ptr<Array> list_data,
ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple");
ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict");
std::vector<uint8_t> type_ids = {};
TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE));
out->reset(new UnionArray(type, types_.length(), children, types_.data(),
offsets_.data(), nones_.null_count(), nones_.null_bitmap()));
offsets_.data(), nones_.null_bitmap(), nones_.null_count()));
return Status::OK();
}
}
+11 -2
View File
@@ -3,10 +3,19 @@
#include "tensor.h"
#include <arrow/api.h>
#include <arrow/types/union.h>
namespace numbuf {
/*! Minimal concrete arrow::ArrayBuilder.

    It adds no state or behavior of its own beyond what arrow::ArrayBuilder
    provides; it only supplies the required Finish() override so that the
    class can be instantiated (SequenceBuilder uses it for its nones_ member).
*/
class NullArrayBuilder : public arrow::ArrayBuilder {
 public:
  /*! Forward the memory pool and the data type to the base builder. */
  explicit NullArrayBuilder(arrow::MemoryPool* pool, const arrow::TypePtr& type)
      : arrow::ArrayBuilder(pool, type) {}

  virtual ~NullArrayBuilder() {}

  /*! Report success without producing an array.

      NOTE: the output pointer is intentionally left untouched, so callers
      must not expect it to be populated.
  */
  arrow::Status Finish(std::shared_ptr<arrow::Array>* /* out */) override {
    return arrow::Status::OK();
  }
};
/*! A Sequence is a heterogeneous collections of elements. It can contain
scalar Python types, lists, tuples, dictionaries and tensors.
*/
@@ -92,7 +101,7 @@ class SequenceBuilder {
/* Total number of bytes needed to represent this sequence. */
int64_t total_num_bytes_;
arrow::NullArrayBuilder nones_;
NullArrayBuilder nones_;
arrow::BooleanBuilder bools_;
arrow::Int64Builder ints_;
arrow::BinaryBuilder bytes_;
+1 -1
View File
@@ -29,7 +29,7 @@ Status TensorBuilder<T>::Append(const std::vector<int64_t>& dims, const elem_typ
RETURN_NOT_OK(tensors_->Append());
RETURN_NOT_OK(dims_->Append());
RETURN_NOT_OK(values_->Append());
int32_t size = 1;
int64_t size = 1;
for (auto dim : dims) {
size *= dim;
RETURN_NOT_OK(dim_data_->Append(dim));
+2 -2
View File
@@ -2,7 +2,7 @@
#define NUMBUF_TENSOR_H
#include <arrow/api.h>
#include <arrow/type.h>
#include <arrow/util/logging.h>
#include <memory>
namespace numbuf {
@@ -38,7 +38,7 @@ class TensorBuilder {
//! Number of tensors in the column
int32_t length() { return tensors_->length(); }
const arrow::TypePtr& type() { return tensors_->type(); }
std::shared_ptr<arrow::DataType> type() { return tensors_->type(); }
private:
arrow::TypePtr dtype_;
@@ -18,15 +18,15 @@ namespace numbuf {
case Type::TYPE: \
return NPY_##TYPE;
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \
case Type::TYPE: { \
auto values = std::dynamic_pointer_cast<ArrayType>(content->values()); \
DCHECK(values); \
type* data = const_cast<type*>(values->raw_data()) + content->offset(offset); \
*out = PyArray_SimpleNewFromData( \
num_dims, dim.data(), NPY_##TYPE, reinterpret_cast<void*>(data)); \
if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \
Py_XINCREF(base); \
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \
case Type::TYPE: { \
auto values = std::dynamic_pointer_cast<ArrayType>(content->values()); \
DCHECK(values); \
type* data = const_cast<type*>(values->raw_data()) + content->value_offset(offset); \
*out = PyArray_SimpleNewFromData( \
num_dims, dim.data(), NPY_##TYPE, reinterpret_cast<void*>(data)); \
if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \
Py_XINCREF(base); \
} break;
Status DeserializeArray(
@@ -38,8 +38,8 @@ Status DeserializeArray(
auto content = std::dynamic_pointer_cast<ListArray>(tensor->field(1));
npy_intp num_dims = dims->value_length(offset);
std::vector<npy_intp> dim(num_dims);
for (int i = dims->offset(offset); i < dims->offset(offset + 1); ++i) {
dim[i - dims->offset(offset)] =
for (int i = dims->value_offset(offset); i < dims->value_offset(offset + 1); ++i) {
dim[i - dims->value_offset(offset)] =
std::dynamic_pointer_cast<Int64Array>(dims->values())->Value(i);
}
switch (content->value_type()->type) {
@@ -20,8 +20,8 @@ namespace numbuf {
#define PyInt_FromLong PyLong_FromLong
#endif
Status get_value(
ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) {
Status get_value(std::shared_ptr<Array> arr, int32_t index, int32_t type, PyObject* base,
PyObject** result) {
switch (arr->type()->type) {
case Type::BOOL:
*result =
@@ -181,26 +181,26 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
return builder.Finish(list, tuple, dict, out);
}
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
int32_t size = array->length(); \
PyObject* result = CREATE(stop_idx - start_idx); \
auto types = std::make_shared<Int8Array>(size, data->types()); \
auto offsets = std::make_shared<Int32Array>(size, data->offset_buf()); \
for (size_t i = start_idx; i < stop_idx; ++i) { \
if (data->IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM(result, i - start_idx, Py_None); \
} else { \
int32_t offset = offsets->Value(i); \
int8_t type = types->Value(i); \
ArrayPtr arr = data->child(type); \
PyObject* value; \
RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \
SET_ITEM(result, i - start_idx, value); \
} \
} \
*out = result; \
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
int32_t size = array->length(); \
PyObject* result = CREATE(stop_idx - start_idx); \
auto types = std::make_shared<Int8Array>(size, data->type_ids()); \
auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \
for (size_t i = start_idx; i < stop_idx; ++i) { \
if (data->IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM(result, i - start_idx, Py_None); \
} else { \
int32_t offset = offsets->Value(i); \
int8_t type = types->Value(i); \
std::shared_ptr<Array> arr = data->child(type); \
PyObject* value; \
RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \
SET_ITEM(result, i - start_idx, value); \
} \
} \
*out = result; \
return Status::OK();
Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
+22
View File
@@ -62,6 +62,28 @@ class FixedBufferStream : public arrow::io::OutputStream,
int64_t size_;
};
class MockBufferStream : public arrow::io::OutputStream {
public:
virtual ~MockBufferStream() {}
explicit MockBufferStream() : position_(0) {}
arrow::Status Close() override { return arrow::Status::OK(); }
arrow::Status Tell(int64_t* position) override {
*position = position_;
return arrow::Status::OK();
}
arrow::Status Write(const uint8_t* data, int64_t nbytes) override {
position_ += nbytes;
return arrow::Status::OK();
}
private:
int64_t position_;
};
} // namespace numbuf
#endif // PYNUMBUF_MEMORY_H
+57 -67
View File
@@ -1,6 +1,4 @@
#include <Python.h>
#include <arrow/api.h>
#include <arrow/ipc/adapter.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
#include <numpy/arrayobject.h>
@@ -9,7 +7,8 @@
#include <iostream>
#include <arrow/ipc/metadata.h>
#include <arrow/api.h>
#include <arrow/ipc/api.h>
#include "adapters/python.h"
#include "memory.h"
@@ -28,33 +27,40 @@ PyObject* NumbufPlasmaObjectExistsError;
#endif
// Each arrow object is stored in the format
// | length of the object in bytes | object data |.
// LENGTH_PREFIX_SIZE is the number of bytes occupied by the
// object length field.
constexpr int64_t LENGTH_PREFIX_SIZE = sizeof(int64_t);
using namespace arrow;
using namespace numbuf;
int64_t make_schema_and_batch(std::shared_ptr<Array> data,
std::shared_ptr<Buffer>* metadata_out, std::shared_ptr<RecordBatch>* batch_out) {
std::shared_ptr<RecordBatch> make_batch(std::shared_ptr<Array> data) {
auto field = std::make_shared<Field>("list", data->type());
std::shared_ptr<Schema> schema(new Schema({field}));
*batch_out =
std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
int64_t size = 0;
ARROW_CHECK_OK(ipc::GetRecordBatchSize(batch_out->get(), &size));
ARROW_CHECK_OK(ipc::WriteSchema((*batch_out)->schema().get(), metadata_out));
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
}
/* Compute how many bytes `batch` occupies when written with
 * arrow::ipc::FileWriter.
 *
 * The batch is written to a MockBufferStream, which only counts bytes, and
 * the stream position after closing the writer is returned.
 *
 * Any failure reported by Arrow while opening the writer, writing the batch
 * or closing the writer aborts via ARROW_CHECK_OK instead of being silently
 * ignored (a failed Open would otherwise leave `writer` null and a failed
 * write would yield a meaningless size). */
int64_t get_batch_size(std::shared_ptr<RecordBatch> batch) {
  // Determine the size of the file by writing to a mock file.
  auto mock = std::make_shared<MockBufferStream>();
  std::shared_ptr<arrow::ipc::FileWriter> writer;
  ARROW_CHECK_OK(ipc::FileWriter::Open(mock.get(), batch->schema(), &writer));
  ARROW_CHECK_OK(writer->WriteRecordBatch(*batch));
  ARROW_CHECK_OK(writer->Close());
  int64_t size = 0;
  ARROW_CHECK_OK(mock->Tell(&size));
  return size;
}
Status read_batch(std::shared_ptr<Buffer> schema_buffer, int64_t header_end_offset,
uint8_t* data, int64_t size, std::shared_ptr<RecordBatch>* batch_out) {
std::shared_ptr<ipc::Message> message;
RETURN_NOT_OK(ipc::Message::Open(schema_buffer, &message));
DCHECK_EQ(ipc::Message::SCHEMA, message->type());
std::shared_ptr<ipc::SchemaMessage> schema_msg = message->GetSchema();
std::shared_ptr<Schema> schema;
RETURN_NOT_OK(schema_msg->GetSchema(&schema));
auto source = std::make_shared<FixedBufferStream>(data, size);
std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
RETURN_NOT_OK(ipc::RecordBatchReader::Open(source.get(), header_end_offset, &reader));
RETURN_NOT_OK(reader->GetRecordBatch(schema, batch_out));
/* Read the first record batch out of a length-prefixed Arrow file buffer.
 *
 * Layout of `data` (see LENGTH_PREFIX_SIZE):
 *   | length of the Arrow data in bytes (int64_t) | Arrow file data |
 *
 * @param data      Start of the buffer, i.e. the address of the length prefix.
 * @param size      Total number of bytes available at `data`, prefix included.
 * @param batch_out Receives record batch 0 of the file on success.
 * @return Status::OK() on success; Status::Invalid if the buffer cannot hold
 *         the prefix or the prefix does not fit the buffer; otherwise the
 *         error reported by Arrow while opening the file or reading the
 *         batch. */
Status read_batch(uint8_t* data, int64_t size, std::shared_ptr<RecordBatch>* batch_out) {
  if (size < LENGTH_PREFIX_SIZE) {
    return Status::Invalid("buffer is too small to contain the length prefix");
  }
  const int64_t payload_capacity = size - LENGTH_PREFIX_SIZE;
  // The prefix tells the reader where the Arrow file ends inside the buffer.
  const int64_t data_size = *reinterpret_cast<const int64_t*>(data);
  if (data_size < 0 || data_size > payload_capacity) {
    return Status::Invalid("length prefix does not fit inside the buffer");
  }
  auto source =
      std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, payload_capacity);
  std::shared_ptr<arrow::ipc::FileReader> reader;
  // Propagate failures instead of dropping them: callers wrap this function
  // in ARROW_CHECK_OK and rely on the returned Status being meaningful.
  RETURN_NOT_OK(arrow::ipc::FileReader::Open(source, data_size, &reader));
  RETURN_NOT_OK(reader->GetRecordBatch(0, batch_out));
  return Status::OK();
}
@@ -104,14 +110,13 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
CHECK_SERIALIZATION_ERROR(s);
auto batch = new std::shared_ptr<RecordBatch>();
std::shared_ptr<Buffer> metadata;
int64_t size = make_schema_and_batch(array, &metadata, batch);
*batch = make_batch(array);
auto ptr = reinterpret_cast<const char*>(metadata->data());
PyObject* r = PyTuple_New(3);
PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, metadata->size()));
PyTuple_SetItem(r, 1, PyLong_FromLong(size));
PyTuple_SetItem(r, 2,
int64_t size = get_batch_size(*batch);
PyObject* r = PyTuple_New(2);
PyTuple_SetItem(r, 0, PyLong_FromLong(LENGTH_PREFIX_SIZE + size));
PyTuple_SetItem(r, 1,
PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor));
return r;
}
@@ -128,32 +133,26 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
if (!PyMemoryView_Check(memoryview)) { return NULL; }
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto target = std::make_shared<FixedBufferStream>(
reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
int64_t body_end_offset;
int64_t header_end_offset;
ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(),
target.get(), &body_end_offset, &header_end_offset));
return PyLong_FromLong(header_end_offset);
LENGTH_PREFIX_SIZE + reinterpret_cast<uint8_t*>(buffer->buf),
buffer->len - LENGTH_PREFIX_SIZE);
std::shared_ptr<arrow::ipc::FileWriter> writer;
ipc::FileWriter::Open(target.get(), (*batch)->schema(), &writer);
writer->WriteRecordBatch(*(*batch));
writer->Close();
*((int64_t*)buffer->buf) = buffer->len - LENGTH_PREFIX_SIZE;
Py_RETURN_NONE;
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
PyObject* data_memoryview;
PyObject* metadata_memoryview;
int64_t header_end_offset;
if (!PyArg_ParseTuple(
args, "OOL", &data_memoryview, &metadata_memoryview, &header_end_offset)) {
return NULL;
}
if (!PyArg_ParseTuple(args, "O", &data_memoryview)) { return NULL; }
Py_buffer* metadata_buffer = PyMemoryView_GET_BUFFER(metadata_memoryview);
Py_buffer* data_buffer = PyMemoryView_GET_BUFFER(data_memoryview);
auto ptr = reinterpret_cast<uint8_t*>(metadata_buffer->buf);
auto schema_buffer = std::make_shared<Buffer>(ptr, metadata_buffer->len);
auto batch = new std::shared_ptr<arrow::RecordBatch>();
ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset,
reinterpret_cast<uint8_t*>(data_buffer->buf), data_buffer->len, batch));
ARROW_CHECK_OK(
read_batch(reinterpret_cast<uint8_t*>(data_buffer->buf), data_buffer->len, batch));
return PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor);
}
@@ -251,18 +250,16 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
Status s = SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
CHECK_SERIALIZATION_ERROR(s);
std::shared_ptr<RecordBatch> batch;
std::shared_ptr<Buffer> metadata;
int64_t size = make_schema_and_batch(array, &metadata, &batch);
std::shared_ptr<RecordBatch> batch = make_batch(array);
int64_t size = get_batch_size(batch);
uint8_t* data;
/* The plasma object is created without metadata. The plasma data buffer
* holds a length prefix followed by the data produced by the Arrow file
* writer. The size in bytes of that Arrow data is stored in
* the first sizeof(int64_t) bytes of the data buffer. The RecordBatch
* the first LENGTH_PREFIX_SIZE bytes of the data buffer. The RecordBatch
* data is stored after that. */
int error_code = plasma_create(conn, obj_id, sizeof(size) + size,
(uint8_t*)metadata->data(), metadata->size(), &data);
int error_code = plasma_create(conn, obj_id, LENGTH_PREFIX_SIZE + size, NULL, 0, &data);
if (error_code == PlasmaError_ObjectExists) {
PyErr_SetString(NumbufPlasmaObjectExistsError,
"An object with this ID already exists in the plasma "
@@ -277,14 +274,13 @@ static PyObject* store_list(PyObject* self, PyObject* args) {
}
CHECK(error_code == PlasmaError_OK);
auto target = std::make_shared<FixedBufferStream>(sizeof(size) + data, size);
int64_t body_end_offset;
int64_t header_end_offset;
ARROW_CHECK_OK(ipc::WriteRecordBatch(batch->columns(), batch->num_rows(), target.get(),
&body_end_offset, &header_end_offset));
auto target = std::make_shared<FixedBufferStream>(LENGTH_PREFIX_SIZE + data, size);
std::shared_ptr<arrow::ipc::FileWriter> writer;
ipc::FileWriter::Open(target.get(), batch->schema(), &writer);
writer->WriteRecordBatch(*batch);
writer->Close();
*((int64_t*)data) = size;
/* Save the header end offset at the beginning of the plasma data buffer. */
*((int64_t*)data) = header_end_offset;
/* Do the plasma_release corresponding to the call to plasma_create. */
plasma_release(conn, obj_id);
/* Seal the object. */
@@ -349,15 +345,9 @@ static PyObject* retrieve_list(PyObject* self, PyObject* args) {
PyCapsule_SetContext(base, plasma_conn);
Py_XINCREF(plasma_conn);
/* Remember: The metadata offset was written at the beginning of the plasma buffer.
*/
int64_t header_end_offset = *((int64_t*)object_buffers[i].data);
auto schema_buffer = std::make_shared<Buffer>(
object_buffers[i].metadata, object_buffers[i].metadata_size);
auto batch = std::shared_ptr<RecordBatch>();
ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset,
object_buffers[i].data + sizeof(object_buffers[i].data_size),
object_buffers[i].data_size - sizeof(object_buffers[i].data_size), &batch));
ARROW_CHECK_OK(
read_batch(object_buffers[i].data, object_buffers[i].data_size, &batch));
PyObject* result;
Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result);
+8 -8
View File
@@ -26,7 +26,7 @@ if sys.version_info < (3, 0):
class SerializationTests(unittest.TestCase):
def roundTripTest(self, data):
schema, size, serialized = numbuf.serialize_list(data)
size, serialized = numbuf.serialize_list(data)
result = numbuf.deserialize_list(serialized)
assert_equal(data, result)
@@ -89,7 +89,7 @@ class SerializationTests(unittest.TestCase):
numbuf.register_callbacks(serialize, deserialize)
metadata, size, serialized = numbuf.serialize_list([bar])
size, serialized = numbuf.serialize_list([bar])
self.assertEqual(numbuf.deserialize_list(serialized)[0].foo.x, 42)
def testObjectArray(self):
@@ -105,23 +105,23 @@ class SerializationTests(unittest.TestCase):
numbuf.register_callbacks(myserialize, mydeserialize)
metadata, size, serialized = numbuf.serialize_list([x, y])
size, serialized = numbuf.serialize_list([x, y])
assert_equal(numbuf.deserialize_list(serialized), [x, y])
def testBuffer(self):
for (i, obj) in enumerate(TEST_OBJECTS):
schema, size, batch = numbuf.serialize_list([obj])
size = size + 4096 # INITIAL_METADATA_SIZE in arrow.
size, batch = numbuf.serialize_list([obj])
size = size
buff = np.zeros(size, dtype="uint8")
metadata_offset = numbuf.write_to_buffer(batch, memoryview(buff))
array = numbuf.read_from_buffer(memoryview(buff), memoryview(schema), metadata_offset)
numbuf.write_to_buffer(batch, memoryview(buff))
array = numbuf.read_from_buffer(memoryview(buff))
result = numbuf.deserialize_list(array)
assert_equal(result[0], obj)
def testObjectArrayImmutable(self):
obj = np.zeros([10])
schema, size, serialized = numbuf.serialize_list([obj])
size, serialized = numbuf.serialize_list([obj])
result = numbuf.deserialize_list(serialized)
assert_equal(result[0], obj)
with self.assertRaises(ValueError):
+3 -3
View File
@@ -8,7 +8,7 @@ set -e
TP_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
if [ ! -d $TP_DIR/arrow ]; then
git clone https://github.com/pcmoritz/arrow.git "$TP_DIR/arrow"
git clone https://github.com/apache/arrow/ "$TP_DIR/arrow"
fi
cd "$TP_DIR/arrow"
git checkout a4a5526e4a8fbc4e4d5382a5c806ec871d2fbd9f
cd $TP_DIR/arrow
git checkout 98a52b4823f3cd0880eaef066dc932f533170292