mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 15:44:37 +08:00
Rebase numbuf to latest arrow (#23)
* rebase to latest arrow * fix arrow linking * finish rebase * point arrow to pcmoritz's arrow * fix * fix macOS * fix
This commit is contained in:
committed by
Robert Nishihara
parent
c3db225cbe
commit
c3ab68e88c
@@ -166,12 +166,11 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
|
||||
if (subdicts.size() > 0) {
|
||||
RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict));
|
||||
}
|
||||
*out = builder.Finish(list, tuple, dict);
|
||||
return Status::OK();
|
||||
return builder.Finish(list, tuple, dict, out);
|
||||
}
|
||||
|
||||
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \
|
||||
auto data = std::dynamic_pointer_cast<DenseUnionArray>(array); \
|
||||
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
|
||||
int32_t size = array->length(); \
|
||||
PyObject* result = CREATE(stop_idx - start_idx); \
|
||||
auto types = std::make_shared<Int8Array>(size, data->types()); \
|
||||
@@ -231,7 +230,7 @@ Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std:
|
||||
if (val_dicts.size() > 0) {
|
||||
RETURN_NOT_OK(SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr));
|
||||
}
|
||||
*out = result.Finish(key_tuples_arr, val_list_arr, val_tuples_arr, val_dict_arr);
|
||||
result.Finish(key_tuples_arr, val_list_arr, val_tuples_arr, val_dict_arr, out);
|
||||
|
||||
// This block is used to decrement the reference counts of the results
|
||||
// returned by the serialization callback, which is called in SerializeArray
|
||||
|
||||
@@ -1,22 +1,32 @@
|
||||
#ifndef PYNUMBUF_MEMORY_H
|
||||
#define PYNUMBUF_MEMORY_H
|
||||
|
||||
#include <arrow/ipc/memory.h>
|
||||
#include <arrow/io/interfaces.h>
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
class BufferSource : public arrow::ipc::MemorySource {
|
||||
class FixedBufferStream : public arrow::io::OutputStream, public arrow::io::ReadableFileInterface {
|
||||
public:
|
||||
virtual ~BufferSource() {}
|
||||
virtual ~FixedBufferStream() {}
|
||||
|
||||
explicit BufferSource(uint8_t* data, int64_t nbytes)
|
||||
: data_(data), size_(nbytes) {}
|
||||
explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
|
||||
: data_(data), position_(0), size_(nbytes) {}
|
||||
|
||||
arrow::Status ReadAt(int64_t position, int64_t nbytes,
|
||||
std::shared_ptr<arrow::Buffer>* out) override {
|
||||
arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override {
|
||||
DCHECK(out);
|
||||
DCHECK(position + nbytes <= size_) << "position: " << position << " nbytes: " << nbytes << "size: " << size_;
|
||||
*out = std::make_shared<arrow::Buffer>(data_ + position, nbytes);
|
||||
DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
|
||||
*out = std::make_shared<arrow::Buffer>(data_ + position_, nbytes);
|
||||
position_ += nbytes;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
|
||||
assert(0);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status Seek(int64_t position) override {
|
||||
position_ = position;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
@@ -24,24 +34,35 @@ class BufferSource : public arrow::ipc::MemorySource {
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status Write(int64_t position, const uint8_t* data,
|
||||
int64_t nbytes) override {
|
||||
DCHECK(position >= 0 && position < size_);
|
||||
DCHECK(position + nbytes <= size_) << "position: " << position << " nbytes: " << nbytes << "size: " << size_;
|
||||
uint8_t* dst = data_ + position;
|
||||
memcpy(dst, data, nbytes);
|
||||
arrow::Status Tell(int64_t* position) override {
|
||||
*position = position_;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
int64_t Size() const override {
|
||||
return size_;
|
||||
arrow::Status Write(const uint8_t* data, int64_t nbytes) override {
|
||||
DCHECK(position_ >= 0 && position_ < size_);
|
||||
DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
|
||||
uint8_t* dst = data_ + position_;
|
||||
memcpy(dst, data, nbytes);
|
||||
position_ += nbytes;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status GetSize(int64_t *size) override {
|
||||
*size = size_;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
bool supports_zero_copy() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
uint8_t* data_;
|
||||
int64_t position_;
|
||||
int64_t size_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
} // namespace numbuf
|
||||
|
||||
#endif // PYNUMBUF_MEMORY_H
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#include <Python.h>
|
||||
#include <arrow/api.h>
|
||||
#include <arrow/ipc/memory.h>
|
||||
#include <arrow/ipc/adapter.h>
|
||||
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
|
||||
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
|
||||
@@ -16,10 +15,10 @@
|
||||
using namespace arrow;
|
||||
using namespace numbuf;
|
||||
|
||||
std::shared_ptr<RowBatch> make_row_batch(std::shared_ptr<Array> data) {
|
||||
std::shared_ptr<RecordBatch> make_row_batch(std::shared_ptr<Array> data) {
|
||||
auto field = std::make_shared<Field>("list", data->type());
|
||||
std::shared_ptr<Schema> schema(new Schema({field}));
|
||||
return std::shared_ptr<RowBatch>(new RowBatch(schema, data->length(), {data}));
|
||||
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
@@ -29,9 +28,9 @@ static PyObject *NumbufError;
|
||||
PyObject *numbuf_serialize_callback = NULL;
|
||||
PyObject *numbuf_deserialize_callback = NULL;
|
||||
|
||||
int PyObjectToArrow(PyObject* object, std::shared_ptr<RowBatch> **result) {
|
||||
int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch> **result) {
|
||||
if (PyCapsule_IsValid(object, "arrow")) {
|
||||
*result = reinterpret_cast<std::shared_ptr<RowBatch>*>(PyCapsule_GetPointer(object, "arrow"));
|
||||
*result = reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(object, "arrow"));
|
||||
return 1;
|
||||
} else {
|
||||
PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
|
||||
@@ -40,7 +39,7 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr<RowBatch> **result) {
|
||||
}
|
||||
|
||||
static void ArrowCapsule_Destructor(PyObject* capsule) {
|
||||
delete reinterpret_cast<std::shared_ptr<RowBatch>*>(PyCapsule_GetPointer(capsule, "arrow"));
|
||||
delete reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(capsule, "arrow"));
|
||||
}
|
||||
|
||||
/* Documented in doc/numbuf.rst in ray-core */
|
||||
@@ -62,11 +61,11 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
auto batch = new std::shared_ptr<RowBatch>();
|
||||
auto batch = new std::shared_ptr<RecordBatch>();
|
||||
*batch = make_row_batch(array);
|
||||
|
||||
int64_t size = 0;
|
||||
ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch->get(), &size));
|
||||
ARROW_CHECK_OK(arrow::ipc::GetRecordBatchSize(batch->get(), &size));
|
||||
|
||||
std::shared_ptr<Buffer> buffer;
|
||||
ARROW_CHECK_OK(ipc::WriteSchema((*batch)->schema().get(), &buffer));
|
||||
@@ -84,7 +83,7 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
|
||||
|
||||
/* Documented in doc/numbuf.rst in ray-core */
|
||||
static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<RowBatch>* batch;
|
||||
std::shared_ptr<RecordBatch>* batch;
|
||||
PyObject* memoryview;
|
||||
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) {
|
||||
return NULL;
|
||||
@@ -93,10 +92,11 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
|
||||
return NULL;
|
||||
}
|
||||
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
|
||||
auto target = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
int64_t metadata_offset;
|
||||
ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch->get(), 0, &metadata_offset));
|
||||
return PyInt_FromLong(metadata_offset);
|
||||
auto target = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
int64_t body_end_offset;
|
||||
int64_t header_end_offset;
|
||||
ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(), target.get(), &body_end_offset, &header_end_offset));
|
||||
return PyInt_FromLong(header_end_offset);
|
||||
}
|
||||
|
||||
/* Documented in doc/numbuf.rst in ray-core */
|
||||
@@ -118,11 +118,11 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
|
||||
ARROW_CHECK_OK(schema_msg->GetSchema(&schema));
|
||||
|
||||
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
|
||||
auto source = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
std::shared_ptr<arrow::ipc::RowBatchReader> reader;
|
||||
ARROW_CHECK_OK(arrow::ipc::RowBatchReader::Open(source.get(), metadata_offset, &reader));
|
||||
auto batch = new std::shared_ptr<arrow::RowBatch>();
|
||||
ARROW_CHECK_OK(reader->GetRowBatch(schema, batch));
|
||||
auto source = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
|
||||
ARROW_CHECK_OK(arrow::ipc::RecordBatchReader::Open(source.get(), metadata_offset, &reader));
|
||||
auto batch = new std::shared_ptr<arrow::RecordBatch>();
|
||||
ARROW_CHECK_OK(reader->GetRecordBatch(schema, batch));
|
||||
|
||||
return PyCapsule_New(reinterpret_cast<void*>(batch),
|
||||
"arrow", &ArrowCapsule_Destructor);
|
||||
@@ -130,7 +130,7 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
|
||||
|
||||
/* Documented in doc/numbuf.rst in ray-core */
|
||||
static PyObject* deserialize_list(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<RowBatch>* data;
|
||||
std::shared_ptr<RecordBatch>* data;
|
||||
PyObject* base = Py_None;
|
||||
if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) {
|
||||
return NULL;
|
||||
|
||||
Reference in New Issue
Block a user