mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 19:33:06 +08:00
Capability to serialize most primitive Python types
This commit is contained in:
@@ -0,0 +1,88 @@
|
||||
#include "numpy.h"
|
||||
|
||||
#include <numbuf/tensor.h>
|
||||
|
||||
using namespace arrow;
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
/* Expands to one switch case that returns the numpy type number NPY_<TYPE>
 * for the arrow type tag Type::<TYPE>. */
#define ARROW_TYPE_TO_NUMPY_CASE(TYPE) \
  case Type::TYPE: return NPY_##TYPE;
|
||||
|
||||
/* Expands to one switch case of DeserializeArray.
 *
 * The expansion site must provide `content` (list array holding the tensor
 * values), `offset` (index of the tensor), `num_dims` and `dim` (its shape)
 * and `out` (where the resulting PyObject* is stored).
 *
 * The numpy array is created with PyArray_SimpleNewFromData, i.e. it wraps the
 * arrow value buffer in place instead of copying it.
 * NOTE(review): nothing here ties the lifetime of the arrow buffer to the
 * numpy array -- confirm the caller keeps the buffer alive. */
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type)                          \
  case Type::TYPE: {                                                           \
    auto typed_values =                                                        \
        std::dynamic_pointer_cast<ArrayType>(content->values());               \
    DCHECK(typed_values);                                                      \
    type* first_element =                                                      \
        const_cast<type*>(typed_values->raw_data()) + content->offset(offset); \
    *out = PyArray_SimpleNewFromData(num_dims, dim.data(), NPY_##TYPE,         \
                                     reinterpret_cast<void*>(first_element));  \
    return Status::OK();                                                       \
  }
|
||||
|
||||
// Rebuilds a numpy array from one entry of a serialized tensor array.
//
// `array` must be a StructArray whose field 0 is a list array holding the
// int64 dimensions and whose field 1 is a list array holding the flattened
// element values; `offset` selects the tensor inside both lists. On success
// the new numpy array is stored in `*out`.
//
// Returns Status::NotImplemented if the element type is not one of the
// supported fixed-width numeric types.
Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject** out) {
  DCHECK(array);
  DCHECK(out);
  auto tensor = std::dynamic_pointer_cast<StructArray>(array);
  DCHECK(tensor);
  auto dims = std::dynamic_pointer_cast<ListArray>(tensor->field(0));
  DCHECK(dims);
  auto content = std::dynamic_pointer_cast<ListArray>(tensor->field(1));
  DCHECK(content);
  // Resolve the dimension values once rather than on every loop iteration.
  auto dim_values = std::dynamic_pointer_cast<Int64Array>(dims->values());
  DCHECK(dim_values);

  npy_intp num_dims = dims->value_length(offset);
  std::vector<npy_intp> dim(num_dims);
  const auto dims_begin = dims->offset(offset);
  const auto dims_end = dims->offset(offset + 1);
  for (auto i = dims_begin; i < dims_end; ++i) {
    dim[i - dims_begin] = dim_values->Value(i);
  }

  // Every case below fills *out and returns; the macro relies on the local
  // names content, offset, num_dims, dim and out.
  switch (content->value_type()->type) {
    DESERIALIZE_ARRAY_CASE(INT8, Int8Array, int8_t)
    DESERIALIZE_ARRAY_CASE(INT16, Int16Array, int16_t)
    DESERIALIZE_ARRAY_CASE(INT32, Int32Array, int32_t)
    DESERIALIZE_ARRAY_CASE(INT64, Int64Array, int64_t)
    DESERIALIZE_ARRAY_CASE(UINT8, UInt8Array, uint8_t)
    DESERIALIZE_ARRAY_CASE(UINT16, UInt16Array, uint16_t)
    DESERIALIZE_ARRAY_CASE(UINT32, UInt32Array, uint32_t)
    DESERIALIZE_ARRAY_CASE(UINT64, UInt64Array, uint64_t)
    DESERIALIZE_ARRAY_CASE(FLOAT, FloatArray, float)
    DESERIALIZE_ARRAY_CASE(DOUBLE, DoubleArray, double)
    default:
      // Report the problem instead of returning OK with *out left unset.
      return Status::NotImplemented("arrow type not recognized");
  }
}
|
||||
|
||||
// Appends the numpy array `array` to `builder` as a tensor (shape + data).
//
// Only the shape and the raw data pointer are handed to the builder, so the
// array has to be C-contiguous; anything else is rejected. Returns
// Status::NotImplemented for non-contiguous arrays and for dtypes other than
// the (u)int8/16/32/64, float and double types handled below.
Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder) {
  if (!PyArray_ISCONTIGUOUS(array)) {
    return Status::NotImplemented("only C-contiguous numpy arrays can be serialized");
  }
  const int ndim = PyArray_NDIM(array);
  const int dtype = PyArray_TYPE(array);
  std::vector<int64_t> dims(ndim);
  for (int i = 0; i < ndim; ++i) {
    dims[i] = PyArray_DIM(array, i);
  }
  auto data = PyArray_DATA(array);
  switch (dtype) {
    case NPY_UINT8:
      return builder.Append(dims, reinterpret_cast<uint8_t*>(data));
    case NPY_INT8:
      return builder.Append(dims, reinterpret_cast<int8_t*>(data));
    case NPY_UINT16:
      return builder.Append(dims, reinterpret_cast<uint16_t*>(data));
    case NPY_INT16:
      return builder.Append(dims, reinterpret_cast<int16_t*>(data));
    case NPY_UINT32:
      return builder.Append(dims, reinterpret_cast<uint32_t*>(data));
    case NPY_INT32:
      return builder.Append(dims, reinterpret_cast<int32_t*>(data));
    case NPY_UINT64:
      return builder.Append(dims, reinterpret_cast<uint64_t*>(data));
    case NPY_INT64:
      return builder.Append(dims, reinterpret_cast<int64_t*>(data));
    case NPY_FLOAT:
      return builder.Append(dims, reinterpret_cast<float*>(data));
    case NPY_DOUBLE:
      return builder.Append(dims, reinterpret_cast<double*>(data));
    default:
      // Previously this only tripped a debug check and then reported success
      // without appending anything.
      return Status::NotImplemented("numpy data type not recognized");
  }
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
#ifndef PYNUMBUF_NUMPY_H
|
||||
#define PYNUMBUF_NUMPY_H
|
||||
|
||||
#include <arrow/api.h>
|
||||
#include <Python.h>
|
||||
|
||||
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
|
||||
#define NO_IMPORT_ARRAY
|
||||
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
|
||||
#include <numpy/arrayobject.h>
|
||||
|
||||
#include <numbuf/tensor.h>
|
||||
#include <numbuf/sequence.h>
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder);
|
||||
arrow::Status DeserializeArray(std::shared_ptr<arrow::Array> array, int32_t offset, PyObject** out);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,193 @@
|
||||
#include "python.h"
|
||||
|
||||
#include <sstream>
|
||||
|
||||
#include "scalars.h"
|
||||
|
||||
using namespace arrow;
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
// Converts the element at `index` of `arr` into a new Python object.
//
// Scalars (bool, int64, string, float, double) are converted directly. Struct
// arrays hold nested containers: the name of the first child field ("list",
// "tuple" or "dict") selects the deserializer, any other name is treated as a
// tensor and handed to DeserializeArray. `type` is only used in the diagnostic
// for unsupported arrays, in which case NULL is returned.
PyObject* get_value(ArrayPtr arr, int32_t index, int32_t type) {
  const auto arrow_kind = arr->type()->type;

  if (arrow_kind == Type::BOOL) {
    auto bools = std::static_pointer_cast<BooleanArray>(arr);
    return PyBool_FromLong(bools->Value(index));
  }
  if (arrow_kind == Type::INT64) {
    auto ints = std::static_pointer_cast<Int64Array>(arr);
    return PyInt_FromLong(ints->Value(index));
  }
  if (arrow_kind == Type::STRING) {
    int32_t nchars;
    auto strings = std::static_pointer_cast<StringArray>(arr);
    const uint8_t* str = strings->GetValue(index, &nchars);
    return PyString_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
  }
  if (arrow_kind == Type::FLOAT) {
    auto floats = std::static_pointer_cast<FloatArray>(arr);
    return PyFloat_FromDouble(floats->Value(index));
  }
  if (arrow_kind == Type::DOUBLE) {
    auto doubles = std::static_pointer_cast<DoubleArray>(arr);
    return PyFloat_FromDouble(doubles->Value(index));
  }
  if (arrow_kind == Type::STRUCT) {
    PyObject* result;
    auto s = std::static_pointer_cast<StructArray>(arr);
    auto l = std::static_pointer_cast<ListArray>(s->field(0));
    const auto container_kind = s->type()->child(0)->name;
    if (container_kind == "list") {
      ARROW_CHECK_OK(DeserializeList(l->values(), l->value_offset(index), l->value_offset(index+1), &result));
    } else if (container_kind == "tuple") {
      ARROW_CHECK_OK(DeserializeTuple(l->values(), l->value_offset(index), l->value_offset(index+1), &result));
    } else if (container_kind == "dict") {
      ARROW_CHECK_OK(DeserializeDict(l->values(), l->value_offset(index), l->value_offset(index+1), &result));
    } else {
      ARROW_CHECK_OK(DeserializeArray(arr, index, &result));
    }
    return result;
  }

  DCHECK(false) << "union tag not recognized " << type;
  return NULL;
}
|
||||
|
||||
// Appends the Python object `elem` to `builder`.
//
// Scalars, strings, None, numpy scalars and numpy arrays are written to the
// builder directly. For lists, tuples and dicts only a placeholder with the
// container size is appended; the container itself is pushed onto `sublists`,
// `subtuples` or `subdicts` so the caller can serialize it afterwards. The
// pushed pointers are borrowed references.
//
// Returns Status::NotImplemented for Python longs that do not fit into 64 bits
// and for objects of an unsupported type.
Status append(PyObject* elem, SequenceBuilder& builder,
              std::vector<PyObject*>& sublists,
              std::vector<PyObject*>& subtuples,
              std::vector<PyObject*>& subdicts) {
  // The bool case must precede the int case (PyInt_Check passes for bools)
  if (PyBool_Check(elem)) {
    RETURN_NOT_OK(builder.Append(elem == Py_True));
  } else if (PyFloat_Check(elem)) {
    RETURN_NOT_OK(builder.Append(PyFloat_AS_DOUBLE(elem)));
  } else if (PyLong_Check(elem)) {
    int overflow = 0;
    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
    // Check for overflow before touching the builder, so that a rejected
    // value does not leave a bogus entry behind.
    if (overflow) {
      return Status::NotImplemented("long overflow");
    }
    RETURN_NOT_OK(builder.Append(data));
  } else if (PyInt_Check(elem)) {
    RETURN_NOT_OK(builder.Append(PyInt_AS_LONG(elem)));
  } else if (PyString_Check(elem)) {
    RETURN_NOT_OK(builder.Append(PyString_AS_STRING(elem), PyString_GET_SIZE(elem)));
  } else if (PyList_Check(elem)) {
    builder.AppendList(PyList_Size(elem));
    sublists.push_back(elem);
  } else if (PyDict_Check(elem)) {
    builder.AppendDict(PyDict_Size(elem));
    subdicts.push_back(elem);
  } else if (PyTuple_Check(elem)) {
    builder.AppendTuple(PyTuple_Size(elem));
    subtuples.push_back(elem);
  } else if (PyArray_IsScalar(elem, Generic)) {
    RETURN_NOT_OK(AppendScalar(elem, builder));
  } else if (PyArray_Check(elem)) {
    RETURN_NOT_OK(SerializeArray((PyArrayObject*) elem, builder));
  } else if (elem == Py_None) {
    RETURN_NOT_OK(builder.Append());
  } else {
    std::stringstream ss;
    // PyObject_Repr returns a new reference (or NULL on failure), so it has to
    // be checked and released here.
    PyObject* repr = PyObject_Repr(elem);
    if (repr != NULL) {
      ss << "data type of " << PyString_AS_STRING(repr)
         << " not recognized";
      Py_DECREF(repr);
    } else {
      // The repr failure is not what is being reported; drop its exception.
      PyErr_Clear();
      ss << "data type of <object without repr> not recognized";
    }
    return Status::NotImplemented(ss.str());
  }
  return Status::OK();
}
|
||||
|
||||
// Serializes the concatenated elements of all `sequences` into one arrow
// array stored in `*out`.
//
// Elements are appended in iteration order. Nested lists, tuples and dicts
// encountered on the way are collected and serialized recursively; the
// resulting child arrays are handed to the builder in Finish().
//
// Returns an error status if an object cannot be iterated or if one of the
// elements cannot be serialized.
Status SerializeSequences(std::vector<PyObject*> sequences, std::shared_ptr<Array>* out) {
  DCHECK(out);
  SequenceBuilder builder(nullptr);
  std::vector<PyObject*> sublists, subtuples, subdicts;
  for (const auto& sequence : sequences) {
    PyObject* iterator = PyObject_GetIter(sequence);
    if (iterator == NULL) {
      // PyIter_Next must not be called with NULL.
      return Status::NotImplemented("object to serialize is not iterable");
    }
    PyObject* item;
    while ((item = PyIter_Next(iterator)) != NULL) {
      Status status = append(item, builder, sublists, subtuples, subdicts);
      // Release the item on both the success and the failure path. Nested
      // containers recorded by append() are still referenced by `sequence`.
      Py_DECREF(item);
      if (!status.ok()) {
        Py_DECREF(iterator);
        return status;
      }
    }
    Py_DECREF(iterator);
  }
  std::shared_ptr<Array> list;
  if (sublists.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(sublists, &list));
  }
  std::shared_ptr<Array> tuple;
  if (subtuples.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(subtuples, &tuple));
  }
  std::shared_ptr<Array> dict;
  if (subdicts.size() > 0) {
    RETURN_NOT_OK(SerializeDict(subdicts, &dict));
  }
  *out = builder.Finish(list, tuple, dict);
  return Status::OK();
}
|
||||
|
||||
/* Function body shared by DeserializeList and DeserializeTuple.
 *
 * Expects the parameters `array` (a dense union array), `start_idx`,
 * `stop_idx` and `out` at the expansion site. CREATE(n) must create a Python
 * sequence with n slots and SET_ITEM(seq, i, obj) must store obj (taking over
 * its reference) in slot i. Elements [start_idx, stop_idx) of the union are
 * converted with get_value(); null slots become None.
 *
 * NOTE(review): get_value() can return NULL for unsupported arrays; that NULL
 * is stored in the sequence unchanged. */
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM)                                \
  auto data = std::dynamic_pointer_cast<DenseUnionArray>(array);              \
  DCHECK(data);                                                               \
  int32_t size = array->length();                                             \
  PyObject* result = CREATE(stop_idx - start_idx);                            \
  if (result == NULL) {                                                       \
    return Status::Invalid("could not create the Python sequence");           \
  }                                                                           \
  auto types = std::make_shared<Int8Array>(size, data->types());              \
  auto offsets = std::make_shared<Int32Array>(size, data->offset_buf());      \
  for (int32_t i = start_idx; i < stop_idx; ++i) {                            \
    if (data->IsNull(i)) {                                                    \
      Py_INCREF(Py_None);                                                     \
      SET_ITEM(result, i - start_idx, Py_None);                               \
    } else {                                                                  \
      int32_t offset = offsets->Value(i);                                     \
      int8_t type = types->Value(i);                                          \
      ArrayPtr arr = data->child(type);                                       \
      SET_ITEM(result, i - start_idx, get_value(arr, offset, type));          \
    }                                                                         \
  }                                                                           \
  *out = result;                                                              \
  return Status::OK();
|
||||
|
||||
// Converts elements [start_idx, stop_idx) of the union array `array` into a
// new Python list stored in `*out`.
Status DeserializeList(std::shared_ptr<Array> array,
                       int32_t start_idx,
                       int32_t stop_idx,
                       PyObject** out) {
  // The macro expands to the complete function body, including the return.
  DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem)
}
|
||||
|
||||
// Converts elements [start_idx, stop_idx) of the union array `array` into a
// new Python tuple stored in `*out`.
Status DeserializeTuple(std::shared_ptr<Array> array,
                        int32_t start_idx,
                        int32_t stop_idx,
                        PyObject** out) {
  // The macro expands to the complete function body, including the return.
  DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem)
}
|
||||
|
||||
// Serializes all key/value pairs of the given Python dicts into one arrow
// array stored in `*out`.
//
// Keys and values go into two separate builders. Lists, tuples and dicts that
// occur as values are collected and serialized recursively. Containers are not
// supported as keys: such a dict is rejected with Status::NotImplemented.
Status SerializeDict(std::vector<PyObject*> dicts, std::shared_ptr<Array>* out) {
  DCHECK(out);
  DictBuilder result;
  std::vector<PyObject*> sublists, subtuples, subdicts, dummy;
  for (const auto& dict : dicts) {
    PyObject *key, *value;
    Py_ssize_t pos = 0;
    while (PyDict_Next(dict, &pos, &key, &value)) {
      RETURN_NOT_OK(append(key, result.keys(), dummy, dummy, dummy));
      // append() only records something in `dummy` when the key is a
      // container. Nothing would serialize that container's contents, so fail
      // in every build mode rather than only in debug builds.
      if (!dummy.empty()) {
        return Status::NotImplemented("containers are not supported as dict keys");
      }
      RETURN_NOT_OK(append(value, result.vals(), sublists, subtuples, subdicts));
    }
  }
  std::shared_ptr<Array> val_list;
  if (sublists.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(sublists, &val_list));
  }
  std::shared_ptr<Array> val_tuples;
  if (subtuples.size() > 0) {
    RETURN_NOT_OK(SerializeSequences(subtuples, &val_tuples));
  }
  std::shared_ptr<Array> val_dict;
  if (subdicts.size() > 0) {
    RETURN_NOT_OK(SerializeDict(subdicts, &val_dict));
  }
  *out = result.Finish(val_list, val_tuples, val_dict);
  return Status::OK();
}
|
||||
|
||||
// Rebuilds a Python dict from entries [start_idx, stop_idx) of the struct
// array `array`, whose field 0 holds the keys and field 1 the values. The new
// dict is stored in `*out`.
//
// Returns an error status (and releases every intermediate Python object) if
// the keys or values cannot be deserialized or an entry cannot be inserted.
Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject** out) {
  auto data = std::dynamic_pointer_cast<StructArray>(array);
  DCHECK(data);
  // TODO(pcm): get rid of the temporary copy of the list
  PyObject* keys = NULL;
  PyObject* vals = NULL;
  ARROW_RETURN_NOT_OK(DeserializeList(data->field(0), start_idx, stop_idx, &keys));
  Status status = DeserializeList(data->field(1), start_idx, stop_idx, &vals);
  if (!status.ok()) {
    Py_XDECREF(keys);
    return status;
  }
  PyObject* result = PyDict_New();
  if (result == NULL) {
    status = Status::Invalid("could not create the Python dict");
  }
  const int32_t count = stop_idx - start_idx;
  for (int32_t i = 0; status.ok() && i < count; ++i) {
    // PyList_GetItem returns borrowed references and PyDict_SetItem takes its
    // own references, so no reference counting is needed per entry.
    PyObject* key = PyList_GetItem(keys, i);
    PyObject* val = PyList_GetItem(vals, i);
    if (key == NULL || val == NULL || PyDict_SetItem(result, key, val) != 0) {
      status = Status::Invalid("could not insert entry into the Python dict");
    }
  }
  // The temporary lists were created by DeserializeList and are owned here.
  Py_XDECREF(keys);
  Py_XDECREF(vals);
  if (!status.ok()) {
    Py_XDECREF(result);
    return status;
  }
  *out = result;
  return Status::OK();
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
#ifndef PYNUMBUF_PYTHON_H
|
||||
#define PYNUMBUF_PYTHON_H
|
||||
|
||||
#include <Python.h>
|
||||
|
||||
#include <arrow/api.h>
|
||||
#include <numbuf/dict.h>
|
||||
#include <numbuf/sequence.h>
|
||||
|
||||
#include "numpy.h"
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
arrow::Status SerializeSequences(std::vector<PyObject*> sequences, std::shared_ptr<arrow::Array>* out);
|
||||
arrow::Status SerializeDict(std::vector<PyObject*> dicts, std::shared_ptr<arrow::Array>* out);
|
||||
arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject** out);
|
||||
arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject** out);
|
||||
arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject** out);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,54 @@
|
||||
#ifndef PYNUMBUF_SCALARS_H
|
||||
#define PYNUMBUF_SCALARS_H
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <Python.h>
|
||||
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
|
||||
#define NO_IMPORT_ARRAY
|
||||
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
|
||||
#include <numpy/arrayobject.h>
|
||||
#include <numpy/arrayscalars.h>
|
||||
|
||||
#include <numbuf/sequence.h>
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
// Appends the numpy scalar `obj` to `builder`.
//
// Bool, float and double scalars keep their type; every integer scalar is
// widened to int64 before it is appended. Returns Status::NotImplemented for
// scalar types that are not handled and for unsigned values that do not fit
// into an int64.
//
// Declared inline because the definition lives in a header.
inline arrow::Status AppendScalar(PyObject* obj, SequenceBuilder& builder) {
  if (PyArray_IsScalar(obj, Bool)) {
    return builder.Append(((PyBoolScalarObject *)obj)->obval != 0);
  } else if (PyArray_IsScalar(obj, Float)) {
    return builder.Append(((PyFloatScalarObject *)obj)->obval);
  } else if (PyArray_IsScalar(obj, Double)) {
    return builder.Append(((PyDoubleScalarObject *)obj)->obval);
  }
  int64_t value = 0;
  // Set when an unsigned scalar is too large for the signed 64-bit slot.
  bool too_large = false;
  if (PyArray_IsScalar(obj, Byte)) {
    value = ((PyByteScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UByte)) {
    value = ((PyUByteScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Short)) {
    value = ((PyShortScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UShort)) {
    value = ((PyUShortScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Int)) {
    value = ((PyIntScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, UInt)) {
    value = ((PyUIntScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, Long)) {
    value = ((PyLongScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, ULong)) {
    value = static_cast<int64_t>(((PyULongScalarObject *)obj)->obval);
    // A negative result means the top bit of the unsigned value was set.
    too_large = value < 0;
  } else if (PyArray_IsScalar(obj, LongLong)) {
    value = ((PyLongLongScalarObject *)obj)->obval;
  } else if (PyArray_IsScalar(obj, ULongLong)) {
    value = static_cast<int64_t>(((PyULongLongScalarObject *)obj)->obval);
    too_large = value < 0;
  } else {
    // Do not silently append 0 for a scalar we know nothing about.
    return arrow::Status::NotImplemented("scalar type not recognized");
  }
  if (too_large) {
    return arrow::Status::NotImplemented("unsigned scalar does not fit into int64");
  }
  return builder.Append(value);
}
|
||||
|
||||
} // namespace
|
||||
|
||||
#endif // PYNUMBUF_SCALARS_H
|
||||
@@ -0,0 +1,47 @@
|
||||
#ifndef PYNUMBUF_MEMORY_H
|
||||
#define PYNUMBUF_MEMORY_H
|
||||
|
||||
#include <cstring>

#include <arrow/ipc/memory.h>
|
||||
|
||||
namespace numbuf {
|
||||
|
||||
class BufferSource : public arrow::ipc::MemorySource {
|
||||
public:
|
||||
virtual ~BufferSource() {}
|
||||
|
||||
explicit BufferSource(uint8_t* data, int64_t nbytes)
|
||||
: data_(data), size_(nbytes) {}
|
||||
|
||||
arrow::Status ReadAt(int64_t position, int64_t nbytes,
|
||||
std::shared_ptr<arrow::Buffer>* out) override {
|
||||
DCHECK(out);
|
||||
DCHECK(position + nbytes <= size_) << "position: " << position << " nbytes: " << nbytes << "size: " << size_;
|
||||
*out = std::make_shared<arrow::Buffer>(data_ + position, nbytes);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status Close() override {
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
arrow::Status Write(int64_t position, const uint8_t* data,
|
||||
int64_t nbytes) override {
|
||||
DCHECK(position >= 0 && position < size_);
|
||||
DCHECK(position + nbytes <= size_) << "position: " << position << " nbytes: " << nbytes << "size: " << size_;
|
||||
uint8_t* dst = data_ + position;
|
||||
memcpy(dst, data, nbytes);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
int64_t Size() const override {
|
||||
return size_;
|
||||
}
|
||||
|
||||
private:
|
||||
uint8_t* data_;
|
||||
int64_t size_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
#endif // PYNUMBUF_MEMORY_H
|
||||
@@ -0,0 +1,205 @@
|
||||
#include <Python.h>
|
||||
#include <arrow/api.h>
|
||||
#include <arrow/ipc/memory.h>
|
||||
#include <arrow/ipc/adapter.h>
|
||||
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
|
||||
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
|
||||
#include <numpy/arrayobject.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include <arrow/ipc/metadata.h>
|
||||
|
||||
#include "adapters/python.h"
|
||||
#include "memory.h"
|
||||
|
||||
using namespace arrow;
|
||||
using namespace numbuf;
|
||||
|
||||
extern "C" {
|
||||
|
||||
// Error handling
|
||||
|
||||
static PyObject *NumbufError;
|
||||
|
||||
// Converter for the "O&" format of PyArg_ParseTuple: extracts the
// std::shared_ptr<Array>* stored in an "arrow" capsule. Returns 1 on success;
// otherwise sets a TypeError and returns 0.
int PyObjectToArrow(PyObject* object, std::shared_ptr<Array> **result) {
  if (!PyCapsule_IsValid(object, "arrow")) {
    PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
    return 0;
  }
  void* payload = PyCapsule_GetPointer(object, "arrow");
  *result = reinterpret_cast<std::shared_ptr<Array>*>(payload);
  return 1;
}
|
||||
|
||||
// Capsule destructor: frees the heap-allocated std::shared_ptr<Array> that an
// "arrow" capsule carries.
static void ArrowCapsule_Destructor(PyObject* capsule) {
  void* payload = PyCapsule_GetPointer(capsule, "arrow");
  auto* owned_array = reinterpret_cast<std::shared_ptr<Array>*>(payload);
  delete owned_array;
}
|
||||
|
||||
std::shared_ptr<RowBatch> make_row_batch(std::shared_ptr<Array> data) {
|
||||
auto field = std::make_shared<Field>("list", data->type());
|
||||
std::shared_ptr<Schema> schema(new Schema({field}));
|
||||
return std::shared_ptr<RowBatch>(new RowBatch(schema, data->length(), {data}));
|
||||
}
|
||||
|
||||
/*! Serializes a Python list into an Arrow array.
|
||||
|
||||
\param args
|
||||
The argument must be a Python list
|
||||
|
||||
\returns
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
*/
|
||||
PyObject* serialize_list(PyObject* self, PyObject* args) {
|
||||
PyObject* value;
|
||||
if (!PyArg_ParseTuple(args, "O", &value)) {
|
||||
return NULL;
|
||||
}
|
||||
std::shared_ptr<Array>* result = new std::shared_ptr<Array>();
|
||||
if (PyList_Check(value)) {
|
||||
Status s = SerializeSequences(std::vector<PyObject*>({value}), result);
|
||||
if (!s.ok()) {
|
||||
PyErr_SetString(NumbufError, s.ToString().c_str());
|
||||
return NULL;
|
||||
}
|
||||
return PyCapsule_New(reinterpret_cast<void*>(result), "arrow", &ArrowCapsule_Destructor);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*! Number of bytes the serialized version of the object will take.
|
||||
|
||||
\param args
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
|
||||
\returns
|
||||
Size of the object in memory once it is serialized
|
||||
*/
|
||||
PyObject* get_serialized_size(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
int64_t size = 0;
|
||||
ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch.get(), &size));
|
||||
return PyInt_FromLong(size);
|
||||
}
|
||||
|
||||
/*! Serialize an arrow::Array into a buffer.
|
||||
|
||||
\param args
|
||||
A Python "arrow" capsule containing the arrow::Array and
|
||||
a python memoryview object the data will be written to
|
||||
|
||||
\return
|
||||
The arrow metadata offset for the arrow metadata
|
||||
*/
|
||||
PyObject* write_to_buffer(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
PyObject* memoryview;
|
||||
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &data, &memoryview)) {
|
||||
return NULL;
|
||||
}
|
||||
if (!PyMemoryView_Check(memoryview)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
|
||||
auto target = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
int64_t metadata_offset;
|
||||
ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch.get(), 0, &metadata_offset));
|
||||
return PyInt_FromLong(metadata_offset);
|
||||
}
|
||||
|
||||
/*! Serialize schema metadata associated to and arrow::Array
|
||||
|
||||
\param args
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
|
||||
\return
|
||||
A bytearray object containing the schema metadata
|
||||
*/
|
||||
PyObject* get_schema_metadata(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
std::shared_ptr<Buffer> buffer;
|
||||
ARROW_CHECK_OK(ipc::WriteSchema(batch->schema().get(), &buffer));
|
||||
auto ptr = reinterpret_cast<const char*>(buffer->data());
|
||||
return PyByteArray_FromStringAndSize(ptr, buffer->size());
|
||||
}
|
||||
|
||||
/*! Read serialized data from buffer and produce an arrow capsule
|
||||
|
||||
\param args
|
||||
A Python memoryview from which data will be loaded,
|
||||
a Python bytearray containing the metadata and the metadata_offset
|
||||
|
||||
\return
|
||||
A Python "arrow" capsule containing the arrow data
|
||||
*/
|
||||
PyObject* read_from_buffer(PyObject* self, PyObject* args) {
|
||||
PyObject* memoryview;
|
||||
PyObject* metadata;
|
||||
int64_t metadata_offset;
|
||||
if (!PyArg_ParseTuple(args, "OOl", &memoryview, &metadata, &metadata_offset)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
auto ptr = reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata));
|
||||
auto schema_buffer = std::make_shared<Buffer>(ptr, PyByteArray_Size(metadata));
|
||||
std::shared_ptr<ipc::Message> message;
|
||||
ARROW_CHECK_OK(ipc::Message::Open(schema_buffer, &message));
|
||||
DCHECK_EQ(ipc::Message::SCHEMA, message->type());
|
||||
std::shared_ptr<ipc::SchemaMessage> schema_msg = message->GetSchema();
|
||||
std::shared_ptr<Schema> schema;
|
||||
ARROW_CHECK_OK(schema_msg->GetSchema(&schema));
|
||||
|
||||
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
|
||||
auto source = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
std::shared_ptr<arrow::ipc::RowBatchReader> reader;
|
||||
ARROW_CHECK_OK(arrow::ipc::RowBatchReader::Open(source.get(), metadata_offset, &reader));
|
||||
std::shared_ptr<arrow::RowBatch> data;
|
||||
ARROW_CHECK_OK(reader->GetRowBatch(schema, &data));
|
||||
|
||||
std::shared_ptr<Array>* result = new std::shared_ptr<Array>();
|
||||
*result = data->column(0);
|
||||
return PyCapsule_New(reinterpret_cast<void*>(result), "arrow", &ArrowCapsule_Destructor);
|
||||
}
|
||||
|
||||
/*!
|
||||
*/
|
||||
PyObject* deserialize_list(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
PyObject* result;
|
||||
ARROW_CHECK_OK(DeserializeList(*data, 0, (*data)->length(), &result));
|
||||
return result;
|
||||
}
|
||||
|
||||
// Method table of the libnumbuf module; terminated by an all-NULL entry.
static PyMethodDef NumbufMethods[] = {
  { "serialize_list", serialize_list, METH_VARARGS,
    "serialize a Python list" },
  { "deserialize_list", deserialize_list, METH_VARARGS,
    "deserialize a Python list" },
  { "get_serialized_size", get_serialized_size, METH_VARARGS,
    "get the number of bytes the object will occupy once serialized" },
  { "write_to_buffer", write_to_buffer, METH_VARARGS,
    "write serialized data to buffer"},
  { "read_from_buffer", read_from_buffer, METH_VARARGS,
    "read serialized data from buffer"},
  { "get_schema_metadata", get_schema_metadata, METH_VARARGS,
    "return the schema of an arrow object"},
  { NULL, NULL, 0, NULL }
};
|
||||
|
||||
// Module initialization: creates the libnumbuf module, registers the
// numbuf.error exception under the name "numbuf_error" and initializes the
// numpy C-API.
PyMODINIT_FUNC initlibnumbuf(void) {
  PyObject* m = Py_InitModule3("libnumbuf", NumbufMethods, "Python C Extension for Numbuf");
  if (m == NULL) {
    // Module creation failed; the exception is already set.
    return;
  }
  char numbuf_error[] = "numbuf.error";
  NumbufError = PyErr_NewException(numbuf_error, NULL, NULL);
  if (NumbufError != NULL) {
    // PyModule_AddObject steals a reference; keep one for the static pointer.
    Py_INCREF(NumbufError);
    PyModule_AddObject(m, "numbuf_error", NumbufError);
  }
  import_array();
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user