clang-fixes

This commit is contained in:
Philipp Moritz
2016-11-19 13:24:51 -08:00
parent 9d6da1281e
commit 05e0226047
14 changed files with 239 additions and 243 deletions
+5 -8
View File
@@ -4,12 +4,9 @@ using namespace arrow;
namespace numbuf {
Status DictBuilder::Finish(
std::shared_ptr<Array> key_tuple_data,
std::shared_ptr<Array> val_list_data,
std::shared_ptr<Array> val_tuple_data,
std::shared_ptr<Array> val_dict_data,
std::shared_ptr<arrow::Array>* out) {
Status DictBuilder::Finish(std::shared_ptr<Array> key_tuple_data,
std::shared_ptr<Array> val_list_data, std::shared_ptr<Array> val_tuple_data,
std::shared_ptr<Array> val_dict_data, std::shared_ptr<arrow::Array>* out) {
// lists and dicts can't be keys of dicts in Python, that is why for
// the keys we do not need to collect sublists
std::shared_ptr<Array> keys, vals;
@@ -17,11 +14,11 @@ Status DictBuilder::Finish(
RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals));
auto keys_field = std::make_shared<Field>("keys", keys->type());
auto vals_field = std::make_shared<Field>("vals", vals->type());
auto type = std::make_shared<StructType>(std::vector<FieldPtr>({keys_field, vals_field}));
auto type =
std::make_shared<StructType>(std::vector<FieldPtr>({keys_field, vals_field}));
std::vector<ArrayPtr> field_arrays({keys, vals});
DCHECK(keys->length() == vals->length());
out->reset(new StructArray(type, keys->length(), field_arrays));
return Status::OK();
}
}
+7 -11
View File
@@ -13,9 +13,8 @@ namespace numbuf {
can be obtained via the Finish method.
*/
class DictBuilder {
public:
DictBuilder(arrow::MemoryPool* pool = nullptr)
: keys_(pool), vals_(pool) {}
public:
DictBuilder(arrow::MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}
//! Builder for the keys of the dictionary
SequenceBuilder& keys() { return keys_; }
@@ -33,18 +32,15 @@ public:
List containing the data from nested dictionaries in the
value list of the dictionary
*/
arrow::Status Finish(
std::shared_ptr<arrow::Array> key_tuple_data,
std::shared_ptr<arrow::Array> val_list_data,
std::shared_ptr<arrow::Array> val_tuple_data,
std::shared_ptr<arrow::Array> val_dict_data,
std::shared_ptr<arrow::Array>* out);
arrow::Status Finish(std::shared_ptr<arrow::Array> key_tuple_data,
std::shared_ptr<arrow::Array> val_list_data,
std::shared_ptr<arrow::Array> val_tuple_data,
std::shared_ptr<arrow::Array> val_dict_data, std::shared_ptr<arrow::Array>* out);
private:
private:
SequenceBuilder keys_;
SequenceBuilder vals_;
};
}
#endif
+16 -21
View File
@@ -25,15 +25,17 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
int64_tensors_(std::make_shared<Int64Type>(), pool),
float_tensors_(std::make_shared<FloatType>(), pool),
double_tensors_(std::make_shared<DoubleType>(), pool),
list_offsets_({0}), tuple_offsets_({0}), dict_offsets_({0}) {}
list_offsets_({0}),
tuple_offsets_({0}),
dict_offsets_({0}) {}
#define UPDATE(OFFSET, TAG) \
if (TAG == -1) { \
TAG = num_tags; \
num_tags += 1; \
} \
RETURN_NOT_OK(offsets_.Append(OFFSET)); \
RETURN_NOT_OK(types_.Append(TAG)); \
#define UPDATE(OFFSET, TAG) \
if (TAG == -1) { \
TAG = num_tags; \
num_tags += 1; \
} \
RETURN_NOT_OK(offsets_.Append(OFFSET)); \
RETURN_NOT_OK(types_.Append(TAG)); \
RETURN_NOT_OK(nones_.AppendToBitmap(true));
Status SequenceBuilder::AppendNone() {
@@ -79,9 +81,7 @@ Status SequenceBuilder::AppendDouble(double data) {
#define DEF_TENSOR_APPEND(NAME, TYPE, TAG) \
Status SequenceBuilder::AppendTensor(const std::vector<int64_t>& dims, TYPE* data) { \
if (TAG == -1) { \
NAME.Start(); \
} \
if (TAG == -1) { NAME.Start(); } \
UPDATE(NAME.length(), TAG); \
return NAME.Append(dims, data); \
}
@@ -138,12 +138,9 @@ Status SequenceBuilder::AppendDict(int32_t size) {
DCHECK(OFFSETS.size() == 1); \
}
Status SequenceBuilder::Finish(
std::shared_ptr<Array> list_data,
std::shared_ptr<Array> tuple_data,
std::shared_ptr<Array> dict_data,
std::shared_ptr<Array>* out) {
Status SequenceBuilder::Finish(std::shared_ptr<Array> list_data,
std::shared_ptr<Array> tuple_data, std::shared_ptr<Array> dict_data,
std::shared_ptr<Array>* out) {
std::vector<std::shared_ptr<Field>> types(num_tags);
std::vector<ArrayPtr> children(num_tags);
@@ -174,10 +171,8 @@ Status SequenceBuilder::Finish(
std::vector<uint8_t> type_ids = {};
TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE));
out->reset(new UnionArray(type, types_.length(),
children, types_.data(), offsets_.data(),
nones_.null_count(), nones_.null_bitmap()));
out->reset(new UnionArray(type, types_.length(), children, types_.data(),
offsets_.data(), nones_.null_count(), nones_.null_bitmap()));
return Status::OK();
}
}
+7 -9
View File
@@ -1,9 +1,9 @@
#ifndef NUMBUF_LIST_H
#define NUMBUF_LIST_H
#include "tensor.h"
#include <arrow/api.h>
#include <arrow/types/union.h>
#include "tensor.h"
namespace numbuf {
@@ -25,7 +25,7 @@ class SequenceBuilder {
//! Appending an uint64_t to the sequence
arrow::Status AppendUInt64(uint64_t data);
//! Append a list of bytes to the sequence
arrow::Status AppendBytes(const uint8_t* data, int32_t length);
@@ -79,11 +79,9 @@ class SequenceBuilder {
arrow::Status AppendDict(int32_t size);
//! Finish building the sequence and return the result
arrow::Status Finish(
std::shared_ptr<arrow::Array> list_data,
std::shared_ptr<arrow::Array> tuple_data,
std::shared_ptr<arrow::Array> dict_data,
std::shared_ptr<arrow::Array>* out);
arrow::Status Finish(std::shared_ptr<arrow::Array> list_data,
std::shared_ptr<arrow::Array> tuple_data, std::shared_ptr<arrow::Array> dict_data,
std::shared_ptr<arrow::Array>* out);
private:
arrow::MemoryPool* pool_;
@@ -139,6 +137,6 @@ class SequenceBuilder {
int8_t num_tags = 0;
};
} // namespace numbuf
} // namespace numbuf
#endif // NUMBUF_LIST_H
#endif // NUMBUF_LIST_H
+9 -8
View File
@@ -4,11 +4,11 @@ using namespace arrow;
namespace numbuf {
template<typename T>
template <typename T>
TensorBuilder<T>::TensorBuilder(const TypePtr& dtype, MemoryPool* pool)
: dtype_(dtype), pool_(pool) {}
template<typename T>
template <typename T>
Status TensorBuilder<T>::Start() {
dim_data_ = std::make_shared<Int64Builder>(pool_, std::make_shared<Int64Type>());
dims_ = std::make_shared<ListBuilder>(pool_, dim_data_);
@@ -16,12 +16,14 @@ Status TensorBuilder<T>::Start() {
values_ = std::make_shared<ListBuilder>(pool_, value_data_);
auto dims_field = std::make_shared<Field>("dims", dims_->type());
auto values_field = std::make_shared<Field>("data", values_->type());
auto type = std::make_shared<StructType>(std::vector<FieldPtr>({dims_field, values_field}));
tensors_ = std::make_shared<StructBuilder>(pool_, type, std::vector<std::shared_ptr<ArrayBuilder>>({dims_, values_}));
auto type =
std::make_shared<StructType>(std::vector<FieldPtr>({dims_field, values_field}));
tensors_ = std::make_shared<StructBuilder>(
pool_, type, std::vector<std::shared_ptr<ArrayBuilder>>({dims_, values_}));
return Status::OK();
}
template<typename T>
template <typename T>
Status TensorBuilder<T>::Append(const std::vector<int64_t>& dims, const elem_type* data) {
DCHECK(tensors_);
RETURN_NOT_OK(tensors_->Append());
@@ -33,10 +35,10 @@ Status TensorBuilder<T>::Append(const std::vector<int64_t>& dims, const elem_typ
RETURN_NOT_OK(dim_data_->Append(dim));
}
RETURN_NOT_OK(value_data_->Append(data, size));
return Status::OK(); // tensors_->Append();
return Status::OK(); // tensors_->Append();
}
template<typename T>
template <typename T>
Status TensorBuilder<T>::Finish(std::shared_ptr<Array>* out) {
return tensors_->Finish(out);
}
@@ -51,5 +53,4 @@ template class TensorBuilder<UInt64Type>;
template class TensorBuilder<Int64Type>;
template class TensorBuilder<FloatType>;
template class TensorBuilder<DoubleType>;
}
+10 -15
View File
@@ -1,9 +1,9 @@
#ifndef NUMBUF_TENSOR_H
#define NUMBUF_TENSOR_H
#include <memory>
#include <arrow/type.h>
#include <arrow/api.h>
#include <arrow/type.h>
#include <memory>
namespace numbuf {
@@ -12,13 +12,13 @@ namespace numbuf {
columns, "dims" which contains an array of dimensions for each Tensor
and "data" which contains data buffer of the Tensor as a flattened array.
*/
template<typename T>
template <typename T>
class TensorBuilder {
public:
public:
typedef typename T::c_type elem_type;
TensorBuilder(const arrow::TypePtr& dtype, arrow::MemoryPool* pool = nullptr);
arrow::Status Start();
/*! Append a new tensor.
@@ -36,16 +36,12 @@ public:
arrow::Status Finish(std::shared_ptr<arrow::Array>* out);
//! Number of tensors in the column
int32_t length() {
return tensors_->length();
}
int32_t length() { return tensors_->length(); }
const arrow::TypePtr& type() {
return tensors_->type();
}
const arrow::TypePtr& type() { return tensors_->type(); }
private:
arrow::TypePtr dtype_;
private:
arrow::TypePtr dtype_;
arrow::MemoryPool* pool_;
std::shared_ptr<arrow::Int64Builder> dim_data_;
std::shared_ptr<arrow::ListBuilder> dims_;
@@ -64,7 +60,6 @@ typedef TensorBuilder<arrow::UInt64Type> UInt64TensorBuilder;
typedef TensorBuilder<arrow::Int64Type> Int64TensorBuilder;
typedef TensorBuilder<arrow::FloatType> FloatTensorBuilder;
typedef TensorBuilder<arrow::DoubleType> DoubleTensorBuilder;
}
#endif // NUMBUF_TENSOR_H
#endif // NUMBUF_TENSOR_H
+21 -23
View File
@@ -8,8 +8,8 @@
using namespace arrow;
extern "C" {
extern PyObject *numbuf_serialize_callback;
extern PyObject *numbuf_deserialize_callback;
extern PyObject* numbuf_serialize_callback;
extern PyObject* numbuf_deserialize_callback;
}
namespace numbuf {
@@ -18,22 +18,20 @@ namespace numbuf {
case Type::TYPE: \
return NPY_##TYPE;
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \
case Type::TYPE: { \
auto values = std::dynamic_pointer_cast<ArrayType>(content->values()); \
DCHECK(values); \
type* data = const_cast<type*>(values->raw_data()) \
+ content->offset(offset); \
*out = PyArray_SimpleNewFromData(num_dims, dim.data(), NPY_##TYPE, \
reinterpret_cast<void*>(data)); \
if (base != Py_None) { \
PyArray_SetBaseObject((PyArrayObject*) *out, base); \
} \
Py_XINCREF(base); \
} \
return Status::OK();
#define DESERIALIZE_ARRAY_CASE(TYPE, ArrayType, type) \
case Type::TYPE: { \
auto values = std::dynamic_pointer_cast<ArrayType>(content->values()); \
DCHECK(values); \
type* data = const_cast<type*>(values->raw_data()) + content->offset(offset); \
*out = PyArray_SimpleNewFromData( \
num_dims, dim.data(), NPY_##TYPE, reinterpret_cast<void*>(data)); \
if (base != Py_None) { PyArray_SetBaseObject((PyArrayObject*)*out, base); } \
Py_XINCREF(base); \
} \
return Status::OK();
Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject* base, PyObject** out) {
Status DeserializeArray(
std::shared_ptr<Array> array, int32_t offset, PyObject* base, PyObject** out) {
DCHECK(array);
auto tensor = std::dynamic_pointer_cast<StructArray>(array);
DCHECK(tensor);
@@ -41,9 +39,9 @@ Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject*
auto content = std::dynamic_pointer_cast<ListArray>(tensor->field(1));
npy_intp num_dims = dims->value_length(offset);
std::vector<npy_intp> dim(num_dims);
for (int i = dims->offset(offset); i < dims->offset(offset+1); ++i) {
for (int i = dims->offset(offset); i < dims->offset(offset + 1); ++i) {
dim[i - dims->offset(offset)] =
std::dynamic_pointer_cast<Int64Array>(dims->values())->Value(i);
std::dynamic_pointer_cast<Int64Array>(dims->values())->Value(i);
}
switch (content->value_type()->type) {
DESERIALIZE_ARRAY_CASE(INT8, Int8Array, int8_t)
@@ -62,8 +60,8 @@ Status DeserializeArray(std::shared_ptr<Array> array, int32_t offset, PyObject*
return Status::OK();
}
Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
std::vector<PyObject*>& subdicts) {
Status SerializeArray(
PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts) {
size_t ndim = PyArray_NDIM(array);
int dtype = PyArray_TYPE(array);
std::vector<int64_t> dims(ndim);
@@ -119,7 +117,8 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
Py_XDECREF(arglist);
if (!result) {
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
return Status::NotImplemented("python error"); // TODO(pcm):
// https://github.com/ray-project/numbuf/issues/10
}
builder.AppendDict(PyDict_Size(result));
subdicts.push_back(result);
@@ -128,5 +127,4 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder,
Py_XDECREF(contiguous);
return Status::OK();
}
}
+6 -5
View File
@@ -1,22 +1,23 @@
#ifndef PYNUMBUF_NUMPY_H
#define PYNUMBUF_NUMPY_H
#include <arrow/api.h>
#include <Python.h>
#include <arrow/api.h>
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#define NO_IMPORT_ARRAY
#define PY_ARRAY_UNIQUE_SYMBOL NUMBUF_ARRAY_API
#include <numpy/arrayobject.h>
#include <numbuf/tensor.h>
#include <numbuf/sequence.h>
#include <numbuf/tensor.h>
namespace numbuf {
arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts);
arrow::Status DeserializeArray(std::shared_ptr<arrow::Array> array, int32_t offset, PyObject* base, PyObject** out);
arrow::Status SerializeArray(
PyArrayObject* array, SequenceBuilder& builder, std::vector<PyObject*>& subdicts);
arrow::Status DeserializeArray(
std::shared_ptr<arrow::Array> array, int32_t offset, PyObject* base, PyObject** out);
}
#endif
+77 -62
View File
@@ -12,46 +12,54 @@ extern "C" {
extern PyObject* numbuf_serialize_callback;
extern PyObject* numbuf_deserialize_callback;
}
namespace numbuf {
Status get_value(ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) {
Status get_value(
ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyObject** result) {
switch (arr->type()->type) {
case Type::BOOL:
*result = PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
*result =
PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
return Status::OK();
case Type::INT64:
*result = PyInt_FromLong(std::static_pointer_cast<Int64Array>(arr)->Value(index));
return Status::OK();
case Type::BINARY: {
int32_t nchars;
const uint8_t* str = std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
const uint8_t* str =
std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
*result = PyString_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return Status::OK();
}
case Type::STRING: {
int32_t nchars;
const uint8_t* str = std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
const uint8_t* str =
std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
*result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return Status::OK();
}
case Type::FLOAT:
*result = PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
*result =
PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
return Status::OK();
case Type::DOUBLE:
*result = PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
*result =
PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
return Status::OK();
case Type::STRUCT: {
auto s = std::static_pointer_cast<StructArray>(arr);
auto l = std::static_pointer_cast<ListArray>(s->field(0));
if (s->type()->child(0)->name == "list") {
return DeserializeList(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
return DeserializeList(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, result);
} else if (s->type()->child(0)->name == "tuple") {
return DeserializeTuple(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
return DeserializeTuple(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, result);
} else if (s->type()->child(0)->name == "dict") {
return DeserializeDict(l->values(), l->value_offset(index), l->value_offset(index+1), base, result);
return DeserializeDict(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, result);
} else {
return DeserializeArray(arr, index, base, result);
}
@@ -62,10 +70,8 @@ Status get_value(ArrayPtr arr, int32_t index, int32_t type, PyObject* base, PyOb
return Status::OK();
}
Status append(PyObject* elem, SequenceBuilder& builder,
std::vector<PyObject*>& sublists,
std::vector<PyObject*>& subtuples,
std::vector<PyObject*>& subdicts) {
Status append(PyObject* elem, SequenceBuilder& builder, std::vector<PyObject*>& sublists,
std::vector<PyObject*>& subtuples, std::vector<PyObject*>& subdicts) {
// The bool case must precede the int case (PyInt_Check passes for bools)
if (PyBool_Check(elem)) {
RETURN_NOT_OK(builder.AppendBool(elem == Py_True));
@@ -75,9 +81,7 @@ Status append(PyObject* elem, SequenceBuilder& builder,
int overflow = 0;
int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
RETURN_NOT_OK(builder.AppendInt64(data));
if(overflow) {
return Status::NotImplemented("long overflow");
}
if (overflow) { return Status::NotImplemented("long overflow"); }
} else if (PyInt_Check(elem)) {
RETURN_NOT_OK(builder.AppendInt64(static_cast<int64_t>(PyInt_AS_LONG(elem))));
} else if (PyString_Check(elem)) {
@@ -86,13 +90,14 @@ Status append(PyObject* elem, SequenceBuilder& builder,
RETURN_NOT_OK(builder.AppendBytes(data, size));
} else if (PyUnicode_Check(elem)) {
Py_ssize_t size;
#if PY_MAJOR_VERSION >= 3
char* data = PyUnicode_AsUTF8AndSize(elem, &size); // TODO(pcm): Check if this is correct
#else
PyObject* str = PyUnicode_AsUTF8String(elem);
char* data = PyString_AS_STRING(str);
size = PyString_GET_SIZE(str);
#endif
#if PY_MAJOR_VERSION >= 3
char* data =
PyUnicode_AsUTF8AndSize(elem, &size); // TODO(pcm): Check if this is correct
#else
PyObject* str = PyUnicode_AsUTF8String(elem);
char* data = PyString_AS_STRING(str);
size = PyString_GET_SIZE(str);
#endif
Status s = builder.AppendString(data, size);
Py_XDECREF(str);
RETURN_NOT_OK(s);
@@ -108,14 +113,14 @@ Status append(PyObject* elem, SequenceBuilder& builder,
} else if (PyArray_IsScalar(elem, Generic)) {
RETURN_NOT_OK(AppendScalar(elem, builder));
} else if (PyArray_Check(elem)) {
RETURN_NOT_OK(SerializeArray((PyArrayObject*) elem, builder, subdicts));
RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts));
} else if (elem == Py_None) {
RETURN_NOT_OK(builder.AppendNone());
} else {
if (!numbuf_serialize_callback) {
std::stringstream ss;
ss << "data type of " << PyString_AS_STRING(PyObject_Repr(elem))
<< " not recognized and custom serialization handler not registered";
<< " not recognized and custom serialization handler not registered";
return Status::NotImplemented(ss.str());
} else {
PyObject* arglist = Py_BuildValue("(O)", elem);
@@ -124,7 +129,8 @@ Status append(PyObject* elem, SequenceBuilder& builder,
PyObject* result = PyObject_CallObject(numbuf_serialize_callback, arglist);
Py_XDECREF(arglist);
if (!result) {
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
return Status::NotImplemented("python error"); // TODO(pcm):
// https://github.com/ray-project/numbuf/issues/10
}
builder.AppendDict(PyDict_Size(result));
subdicts.push_back(result);
@@ -133,13 +139,16 @@ Status append(PyObject* elem, SequenceBuilder& builder,
return Status::OK();
}
Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared_ptr<Array>* out) {
Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
std::shared_ptr<Array>* out) {
DCHECK(out);
if (recursion_depth >= MAX_RECURSION_DEPTH) {
return Status::NotImplemented("This object exceeds the maximum recursion depth. It may contain itself recursively.");
return Status::NotImplemented(
"This object exceeds the maximum recursion depth. It may contain itself "
"recursively.");
}
SequenceBuilder builder(nullptr);
std::vector<PyObject*> sublists, subtuples, subdicts;
std::vector<PyObject *> sublists, subtuples, subdicts;
for (const auto& sequence : sequences) {
PyObject* item;
PyObject* iterator = PyObject_GetIter(sequence);
@@ -169,42 +178,47 @@ Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_de
return builder.Finish(list, tuple, dict, out);
}
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
int32_t size = array->length(); \
PyObject* result = CREATE(stop_idx - start_idx); \
auto types = std::make_shared<Int8Array>(size, data->types()); \
auto offsets = std::make_shared<Int32Array>(size, data->offset_buf()); \
for (size_t i = start_idx; i < stop_idx; ++i) { \
if (data->IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM(result, i-start_idx, Py_None); \
} else { \
int32_t offset = offsets->Value(i); \
int8_t type = types->Value(i); \
ArrayPtr arr = data->child(type); \
PyObject* value; \
RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \
SET_ITEM(result, i-start_idx, value); \
} \
} \
*out = result; \
#define DESERIALIZE_SEQUENCE(CREATE, SET_ITEM) \
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
int32_t size = array->length(); \
PyObject* result = CREATE(stop_idx - start_idx); \
auto types = std::make_shared<Int8Array>(size, data->types()); \
auto offsets = std::make_shared<Int32Array>(size, data->offset_buf()); \
for (size_t i = start_idx; i < stop_idx; ++i) { \
if (data->IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM(result, i - start_idx, Py_None); \
} else { \
int32_t offset = offsets->Value(i); \
int8_t type = types->Value(i); \
ArrayPtr arr = data->child(type); \
PyObject* value; \
RETURN_NOT_OK(get_value(arr, offset, type, base, &value)); \
SET_ITEM(result, i - start_idx, value); \
} \
} \
*out = result; \
return Status::OK();
Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base, PyObject** out) {
DESERIALIZE_SEQUENCE(PyList_New, PyList_SetItem)
}
Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base, PyObject** out) {
DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SetItem)
}
Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<Array>* out) {
Status SerializeDict(
std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<Array>* out) {
DictBuilder result;
if (recursion_depth >= MAX_RECURSION_DEPTH) {
return Status::NotImplemented("This object exceeds the maximum recursion depth. It may contain itself recursively.");
return Status::NotImplemented(
"This object exceeds the maximum recursion depth. It may contain itself "
"recursively.");
}
std::vector<PyObject*> key_tuples, val_lists, val_tuples, val_dicts, dummy;
std::vector<PyObject *> key_tuples, val_lists, val_tuples, val_dicts, dummy;
for (const auto& dict : dicts) {
PyObject *key, *value;
Py_ssize_t pos = 0;
@@ -248,7 +262,8 @@ Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std:
return Status::OK();
}
Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out) {
Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base, PyObject** out) {
auto data = std::dynamic_pointer_cast<StructArray>(array);
// TODO(pcm): error handling, get rid of the temporary copy of the list
PyObject *keys, *vals;
@@ -256,10 +271,11 @@ Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t
ARROW_RETURN_NOT_OK(DeserializeList(data->field(0), start_idx, stop_idx, base, &keys));
ARROW_RETURN_NOT_OK(DeserializeList(data->field(1), start_idx, stop_idx, base, &vals));
for (size_t i = start_idx; i < stop_idx; ++i) {
PyDict_SetItem(result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx));
PyDict_SetItem(
result, PyList_GetItem(keys, i - start_idx), PyList_GetItem(vals, i - start_idx));
}
Py_XDECREF(keys); // PyList_GetItem(keys, ...) incremented the reference count
Py_XDECREF(vals); // PyList_GetItem(vals, ...) incremented the reference count
Py_XDECREF(keys); // PyList_GetItem(keys, ...) incremented the reference count
Py_XDECREF(vals); // PyList_GetItem(vals, ...) incremented the reference count
static PyObject* py_type = PyString_FromString("_pytype_");
if (PyDict_Contains(result, py_type) && numbuf_deserialize_callback) {
PyObject* arglist = Py_BuildValue("(O)", result);
@@ -270,12 +286,11 @@ Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t
Py_XDECREF(result);
result = callback_result;
if (!callback_result) {
return Status::NotImplemented("python error"); // TODO(pcm): https://github.com/ray-project/numbuf/issues/10
return Status::NotImplemented("python error"); // TODO(pcm):
// https://github.com/ray-project/numbuf/issues/10
}
}
*out = result;
return Status::OK();
}
}
+10 -6
View File
@@ -11,12 +11,16 @@
namespace numbuf {
arrow::Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx, int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status SerializeSequences(std::vector<PyObject*> sequences,
int32_t recursion_depth, std::shared_ptr<arrow::Array>* out);
arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
std::shared_ptr<arrow::Array>* out);
arrow::Status DeserializeList(std::shared_ptr<arrow::Array> array, int32_t start_idx,
int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeTuple(std::shared_ptr<arrow::Array> array, int32_t start_idx,
int32_t stop_idx, PyObject* base, PyObject** out);
arrow::Status DeserializeDict(std::shared_ptr<arrow::Array> array, int32_t start_idx,
int32_t stop_idx, PyObject* base, PyObject** out);
}
#endif
+15 -15
View File
@@ -16,39 +16,39 @@ namespace numbuf {
arrow::Status AppendScalar(PyObject* obj, SequenceBuilder& builder) {
if (PyArray_IsScalar(obj, Bool)) {
return builder.AppendBool(((PyBoolScalarObject *)obj)->obval != 0);
return builder.AppendBool(((PyBoolScalarObject*)obj)->obval != 0);
} else if (PyArray_IsScalar(obj, Float)) {
return builder.AppendFloat(((PyFloatScalarObject *)obj)->obval);
return builder.AppendFloat(((PyFloatScalarObject*)obj)->obval);
} else if (PyArray_IsScalar(obj, Double)) {
return builder.AppendDouble(((PyDoubleScalarObject *)obj)->obval);
return builder.AppendDouble(((PyDoubleScalarObject*)obj)->obval);
}
int64_t value = 0;
if (PyArray_IsScalar(obj, Byte)) {
value = ((PyByteScalarObject *)obj)->obval;
value = ((PyByteScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, UByte)) {
value = ((PyUByteScalarObject *)obj)->obval;
value = ((PyUByteScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, Short)) {
value = ((PyShortScalarObject *)obj)->obval;
value = ((PyShortScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, UShort)) {
value = ((PyUShortScalarObject *)obj)->obval;
value = ((PyUShortScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, Int)) {
value = ((PyIntScalarObject *)obj)->obval;
value = ((PyIntScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, UInt)) {
value = ((PyUIntScalarObject *)obj)->obval;
value = ((PyUIntScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, Long)) {
value = ((PyLongScalarObject *)obj)->obval;
value = ((PyLongScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, ULong)) {
value = ((PyULongScalarObject *)obj)->obval;
value = ((PyULongScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, LongLong)) {
value = ((PyLongLongScalarObject *)obj)->obval;
value = ((PyLongLongScalarObject*)obj)->obval;
} else if (PyArray_IsScalar(obj, ULongLong)) {
value = ((PyULongLongScalarObject *)obj)->obval;
value = ((PyULongLongScalarObject*)obj)->obval;
} else {
DCHECK(false) << "scalar type not recognized";
}
return builder.AppendInt64(value);
}
} // namespace
} // namespace
#endif // PYNUMBUF_SCALARS_H
#endif // PYNUMBUF_SCALARS_H
+16 -17
View File
@@ -5,16 +5,18 @@
namespace numbuf {
class FixedBufferStream : public arrow::io::OutputStream, public arrow::io::ReadableFileInterface {
class FixedBufferStream : public arrow::io::OutputStream,
public arrow::io::ReadableFileInterface {
public:
virtual ~FixedBufferStream() {}
explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
: data_(data), position_(0), size_(nbytes) {}
: data_(data), position_(0), size_(nbytes) {}
arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override {
DCHECK(out);
DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
DCHECK(position_ + nbytes <= size_) << "position: " << position_
<< " nbytes: " << nbytes << "size: " << size_;
*out = std::make_shared<arrow::Buffer>(data_ + position_, nbytes);
position_ += nbytes;
return arrow::Status::OK();
@@ -30,9 +32,7 @@ class FixedBufferStream : public arrow::io::OutputStream, public arrow::io::Read
return arrow::Status::OK();
}
arrow::Status Close() override {
return arrow::Status::OK();
}
arrow::Status Close() override { return arrow::Status::OK(); }
arrow::Status Tell(int64_t* position) override {
*position = position_;
@@ -41,28 +41,27 @@ class FixedBufferStream : public arrow::io::OutputStream, public arrow::io::Read
arrow::Status Write(const uint8_t* data, int64_t nbytes) override {
DCHECK(position_ >= 0 && position_ < size_);
DCHECK(position_ + nbytes <= size_) << "position: " << position_ << " nbytes: " << nbytes << "size: " << size_;
DCHECK(position_ + nbytes <= size_) << "position: " << position_
<< " nbytes: " << nbytes << "size: " << size_;
uint8_t* dst = data_ + position_;
memcpy(dst, data, nbytes);
position_ += nbytes;
return arrow::Status::OK();
}
arrow::Status GetSize(int64_t *size) override {
arrow::Status GetSize(int64_t* size) override {
*size = size_;
return arrow::Status::OK();
}
bool supports_zero_copy() const override {
return true;
}
bool supports_zero_copy() const override { return true; }
private:
uint8_t* data_;
int64_t position_;
int64_t size_;
private:
uint8_t* data_;
int64_t position_;
int64_t size_;
};
} // namespace numbuf
} // namespace numbuf
#endif // PYNUMBUF_MEMORY_H
#endif // PYNUMBUF_MEMORY_H
+40 -43
View File
@@ -23,14 +23,15 @@ std::shared_ptr<RecordBatch> make_row_batch(std::shared_ptr<Array> data) {
extern "C" {
static PyObject *NumbufError;
static PyObject* NumbufError;
PyObject *numbuf_serialize_callback = NULL;
PyObject *numbuf_deserialize_callback = NULL;
PyObject* numbuf_serialize_callback = NULL;
PyObject* numbuf_deserialize_callback = NULL;
int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch> **result) {
int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch>** result) {
if (PyCapsule_IsValid(object, "arrow")) {
*result = reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(object, "arrow"));
*result = reinterpret_cast<std::shared_ptr<RecordBatch>*>(
PyCapsule_GetPointer(object, "arrow"));
return 1;
} else {
PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
@@ -39,25 +40,23 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr<RecordBatch> **result) {
}
static void ArrowCapsule_Destructor(PyObject* capsule) {
delete reinterpret_cast<std::shared_ptr<RecordBatch>*>(PyCapsule_GetPointer(capsule, "arrow"));
delete reinterpret_cast<std::shared_ptr<RecordBatch>*>(
PyCapsule_GetPointer(capsule, "arrow"));
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* serialize_list(PyObject* self, PyObject* args) {
PyObject* value;
if (!PyArg_ParseTuple(args, "O", &value)) {
return NULL;
}
if (!PyArg_ParseTuple(args, "O", &value)) { return NULL; }
std::shared_ptr<Array> array;
if (PyList_Check(value)) {
int32_t recursion_depth = 0;
Status s = SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
Status s =
SerializeSequences(std::vector<PyObject*>({value}), recursion_depth, &array);
if (!s.ok()) {
// If this condition is true, there was an error in the callback that
// needs to be passed through
if (!PyErr_Occurred()) {
PyErr_SetString(NumbufError, s.ToString().c_str());
}
if (!PyErr_Occurred()) { PyErr_SetString(NumbufError, s.ToString().c_str()); }
return NULL;
}
@@ -74,8 +73,8 @@ static PyObject* serialize_list(PyObject* self, PyObject* args) {
PyObject* r = PyTuple_New(3);
PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, buffer->size()));
PyTuple_SetItem(r, 1, PyInt_FromLong(size));
PyTuple_SetItem(r, 2, PyCapsule_New(reinterpret_cast<void*>(batch),
"arrow", &ArrowCapsule_Destructor));
PyTuple_SetItem(r, 2,
PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor));
return r;
}
return NULL;
@@ -88,14 +87,14 @@ static PyObject* write_to_buffer(PyObject* self, PyObject* args) {
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) {
return NULL;
}
if (!PyMemoryView_Check(memoryview)) {
return NULL;
}
if (!PyMemoryView_Check(memoryview)) { return NULL; }
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto target = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
auto target = std::make_shared<FixedBufferStream>(
reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
int64_t body_end_offset;
int64_t header_end_offset;
ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(), target.get(), &body_end_offset, &header_end_offset));
ARROW_CHECK_OK(ipc::WriteRecordBatch((*batch)->columns(), (*batch)->num_rows(),
target.get(), &body_end_offset, &header_end_offset));
return PyInt_FromLong(header_end_offset);
}
@@ -118,31 +117,28 @@ static PyObject* read_from_buffer(PyObject* self, PyObject* args) {
ARROW_CHECK_OK(schema_msg->GetSchema(&schema));
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
auto source = std::make_shared<FixedBufferStream>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
auto source = std::make_shared<FixedBufferStream>(
reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
std::shared_ptr<arrow::ipc::RecordBatchReader> reader;
ARROW_CHECK_OK(arrow::ipc::RecordBatchReader::Open(source.get(), metadata_offset, &reader));
ARROW_CHECK_OK(
arrow::ipc::RecordBatchReader::Open(source.get(), metadata_offset, &reader));
auto batch = new std::shared_ptr<arrow::RecordBatch>();
ARROW_CHECK_OK(reader->GetRecordBatch(schema, batch));
return PyCapsule_New(reinterpret_cast<void*>(batch),
"arrow", &ArrowCapsule_Destructor);
return PyCapsule_New(reinterpret_cast<void*>(batch), "arrow", &ArrowCapsule_Destructor);
}
/* Documented in doc/numbuf.rst in ray-core */
static PyObject* deserialize_list(PyObject* self, PyObject* args) {
std::shared_ptr<RecordBatch>* data;
PyObject* base = Py_None;
if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) {
return NULL;
}
if (!PyArg_ParseTuple(args, "O&|O", &PyObjectToArrow, &data, &base)) { return NULL; }
PyObject* result;
Status s = DeserializeList((*data)->column(0), 0, (*data)->num_rows(), base, &result);
if (!s.ok()) {
// If this condition is true, there was an error in the callback that
// needs to be passed through
if (!PyErr_Occurred()) {
PyErr_SetString(NumbufError, s.ToString().c_str());
}
if (!PyErr_Occurred()) { PyErr_SetString(NumbufError, s.ToString().c_str()); }
return NULL;
}
return result;
@@ -152,7 +148,8 @@ static PyObject* register_callbacks(PyObject* self, PyObject* args) {
PyObject* result = NULL;
PyObject* serialize_callback;
PyObject* deserialize_callback;
if (PyArg_ParseTuple(args, "OO:register_callbacks", &serialize_callback, &deserialize_callback)) {
if (PyArg_ParseTuple(
args, "OO:register_callbacks", &serialize_callback, &deserialize_callback)) {
if (!PyCallable_Check(serialize_callback)) {
PyErr_SetString(PyExc_TypeError, "serialize_callback must be callable");
return NULL;
@@ -161,10 +158,10 @@ static PyObject* register_callbacks(PyObject* self, PyObject* args) {
PyErr_SetString(PyExc_TypeError, "deserialize_callback must be callable");
return NULL;
}
Py_XINCREF(serialize_callback); // Add a reference to new serialization callback
Py_XINCREF(deserialize_callback); // Add a reference to new deserialization callback
Py_XDECREF(numbuf_serialize_callback); // Dispose of old serialization callback
Py_XDECREF(numbuf_deserialize_callback); // Dispose of old deserialization callback
Py_XINCREF(serialize_callback); // Add a reference to new serialization callback
Py_XINCREF(deserialize_callback); // Add a reference to new deserialization callback
Py_XDECREF(numbuf_serialize_callback); // Dispose of old serialization callback
Py_XDECREF(numbuf_deserialize_callback); // Dispose of old deserialization callback
numbuf_serialize_callback = serialize_callback;
numbuf_deserialize_callback = deserialize_callback;
Py_INCREF(Py_None);
@@ -174,13 +171,14 @@ static PyObject* register_callbacks(PyObject* self, PyObject* args) {
}
static PyMethodDef NumbufMethods[] = {
{ "serialize_list", serialize_list, METH_VARARGS, "serialize a Python list" },
{ "deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list" },
{ "write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"},
{ "read_from_buffer", read_from_buffer, METH_VARARGS, "read serialized data from buffer"},
{ "register_callbacks", register_callbacks, METH_VARARGS, "set serialization and deserialization callbacks"},
{ NULL, NULL, 0, NULL }
};
{"serialize_list", serialize_list, METH_VARARGS, "serialize a Python list"},
{"deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list"},
{"write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"},
{"read_from_buffer", read_from_buffer, METH_VARARGS,
"read serialized data from buffer"},
{"register_callbacks", register_callbacks, METH_VARARGS,
"set serialization and deserialization callbacks"},
{NULL, NULL, 0, NULL}};
PyMODINIT_FUNC initlibnumbuf(void) {
PyObject* m;
@@ -191,5 +189,4 @@ PyMODINIT_FUNC initlibnumbuf(void) {
PyModule_AddObject(m, "numbuf_error", NumbufError);
import_array();
}
}