mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 15:40:09 +08:00
switch to release mode
This commit is contained in:
@@ -57,31 +57,47 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder) {
|
||||
for (int i = 0; i < ndim; ++i) {
|
||||
dims[i] = PyArray_DIM(array, i);
|
||||
}
|
||||
auto data = PyArray_DATA(array);
|
||||
// TODO(pcm): Once we don't use builders any more below and directly share
|
||||
// the memory buffer, we need to be more careful about this and not
|
||||
// decrease the reference count of "contiguous" before the serialization
|
||||
// is finished
|
||||
auto contiguous = PyArray_GETCONTIGUOUS(array);
|
||||
auto data = PyArray_DATA(contiguous);
|
||||
switch (dtype) {
|
||||
case NPY_UINT8:
|
||||
return builder.Append(dims, reinterpret_cast<uint8_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<uint8_t*>(data)));
|
||||
break;
|
||||
case NPY_INT8:
|
||||
return builder.Append(dims, reinterpret_cast<int8_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<int8_t*>(data)));
|
||||
break;
|
||||
case NPY_UINT16:
|
||||
return builder.Append(dims, reinterpret_cast<uint16_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<uint16_t*>(data)));
|
||||
break;
|
||||
case NPY_INT16:
|
||||
return builder.Append(dims, reinterpret_cast<int16_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<int16_t*>(data)));
|
||||
break;
|
||||
case NPY_UINT32:
|
||||
return builder.Append(dims, reinterpret_cast<uint32_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<uint32_t*>(data)));
|
||||
break;
|
||||
case NPY_INT32:
|
||||
return builder.Append(dims, reinterpret_cast<int32_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<int32_t*>(data)));
|
||||
break;
|
||||
case NPY_UINT64:
|
||||
return builder.Append(dims, reinterpret_cast<uint64_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<uint64_t*>(data)));
|
||||
break;
|
||||
case NPY_INT64:
|
||||
return builder.Append(dims, reinterpret_cast<int64_t*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<int64_t*>(data)));
|
||||
break;
|
||||
case NPY_FLOAT:
|
||||
return builder.Append(dims, reinterpret_cast<float*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<float*>(data)));
|
||||
break;
|
||||
case NPY_DOUBLE:
|
||||
return builder.Append(dims, reinterpret_cast<double*>(data));
|
||||
RETURN_NOT_OK(builder.Append(dims, reinterpret_cast<double*>(data)));
|
||||
break;
|
||||
default:
|
||||
DCHECK(false) << "numpy data type not recognized: " << dtype;
|
||||
}
|
||||
Py_XDECREF(contiguous);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -96,8 +96,13 @@ Status SerializeSequences(std::vector<PyObject*> sequences, std::shared_ptr<Arra
|
||||
PyObject* item;
|
||||
PyObject* iterator = PyObject_GetIter(sequence);
|
||||
while (item = PyIter_Next(iterator)) {
|
||||
RETURN_NOT_OK(append(item, builder, sublists, subtuples, subdicts));
|
||||
Status s = append(item, builder, sublists, subtuples, subdicts);
|
||||
Py_DECREF(item);
|
||||
// if an error occurs, we need to decrement the reference counts before returning
|
||||
if (!s.ok()) {
|
||||
Py_DECREF(iterator);
|
||||
return s;
|
||||
}
|
||||
}
|
||||
Py_DECREF(iterator);
|
||||
}
|
||||
|
||||
@@ -18,13 +18,11 @@ using namespace numbuf;
|
||||
|
||||
extern "C" {
|
||||
|
||||
// Error handling
|
||||
|
||||
static PyObject *NumbufError;
|
||||
|
||||
int PyObjectToArrow(PyObject* object, std::shared_ptr<Array> **result) {
|
||||
int PyObjectToArrow(PyObject* object, std::shared_ptr<RowBatch> **result) {
|
||||
if (PyCapsule_IsValid(object, "arrow")) {
|
||||
*result = reinterpret_cast<std::shared_ptr<Array>*>(PyCapsule_GetPointer(object, "arrow"));
|
||||
*result = reinterpret_cast<std::shared_ptr<RowBatch>*>(PyCapsule_GetPointer(object, "arrow"));
|
||||
return 1;
|
||||
} else {
|
||||
PyErr_SetString(PyExc_TypeError, "must be an 'arrow' capsule");
|
||||
@@ -33,7 +31,7 @@ int PyObjectToArrow(PyObject* object, std::shared_ptr<Array> **result) {
|
||||
}
|
||||
|
||||
static void ArrowCapsule_Destructor(PyObject* capsule) {
|
||||
delete reinterpret_cast<std::shared_ptr<Array>*>(PyCapsule_GetPointer(capsule, "arrow"));
|
||||
delete reinterpret_cast<std::shared_ptr<RowBatch>*>(PyCapsule_GetPointer(capsule, "arrow"));
|
||||
}
|
||||
|
||||
std::shared_ptr<RowBatch> make_row_batch(std::shared_ptr<Array> data) {
|
||||
@@ -48,44 +46,45 @@ std::shared_ptr<RowBatch> make_row_batch(std::shared_ptr<Array> data) {
|
||||
The argument must be a Python list
|
||||
|
||||
\returns
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
A bytearray object containing the schema metadata
|
||||
|
||||
The size in bytes the serialized object will occupy in memory
|
||||
|
||||
A Python "arrow" capsule containing the RowBatch
|
||||
*/
|
||||
PyObject* serialize_list(PyObject* self, PyObject* args) {
|
||||
PyObject* value;
|
||||
if (!PyArg_ParseTuple(args, "O", &value)) {
|
||||
return NULL;
|
||||
}
|
||||
std::shared_ptr<Array>* result = new std::shared_ptr<Array>();
|
||||
std::shared_ptr<Array> array;
|
||||
if (PyList_Check(value)) {
|
||||
Status s = SerializeSequences(std::vector<PyObject*>({value}), result);
|
||||
Status s = SerializeSequences(std::vector<PyObject*>({value}), &array);
|
||||
if (!s.ok()) {
|
||||
PyErr_SetString(NumbufError, s.ToString().c_str());
|
||||
return NULL;
|
||||
}
|
||||
return PyCapsule_New(reinterpret_cast<void*>(result), "arrow", &ArrowCapsule_Destructor);
|
||||
|
||||
auto batch = new std::shared_ptr<RowBatch>();
|
||||
*batch = make_row_batch(array);
|
||||
|
||||
int64_t size = 0;
|
||||
ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch->get(), &size));
|
||||
|
||||
std::shared_ptr<Buffer> buffer;
|
||||
ARROW_CHECK_OK(ipc::WriteSchema((*batch)->schema().get(), &buffer));
|
||||
auto ptr = reinterpret_cast<const char*>(buffer->data());
|
||||
|
||||
PyObject* r = PyTuple_New(3);
|
||||
PyTuple_SetItem(r, 0, PyByteArray_FromStringAndSize(ptr, buffer->size()));
|
||||
PyTuple_SetItem(r, 1, PyInt_FromLong(size));
|
||||
PyTuple_SetItem(r, 2, PyCapsule_New(reinterpret_cast<void*>(batch),
|
||||
"arrow", &ArrowCapsule_Destructor));
|
||||
return r;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*! Number of bytes the serialized version of the object will take.
|
||||
|
||||
\param args
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
|
||||
\returns
|
||||
Size of the object in memory once it is serialized
|
||||
*/
|
||||
PyObject* get_serialized_size(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
int64_t size = 0;
|
||||
ARROW_CHECK_OK(arrow::ipc::GetRowBatchSize(batch.get(), &size));
|
||||
return PyInt_FromLong(size);
|
||||
}
|
||||
|
||||
/*! Serialize an arrow::Array into a buffer.
|
||||
|
||||
\param args
|
||||
@@ -96,42 +95,21 @@ PyObject* get_serialized_size(PyObject* self, PyObject* args) {
|
||||
The arrow metadata offset for the arrow metadata
|
||||
*/
|
||||
PyObject* write_to_buffer(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
std::shared_ptr<RowBatch>* batch;
|
||||
PyObject* memoryview;
|
||||
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &data, &memoryview)) {
|
||||
if (!PyArg_ParseTuple(args, "O&O", &PyObjectToArrow, &batch, &memoryview)) {
|
||||
return NULL;
|
||||
}
|
||||
if (!PyMemoryView_Check(memoryview)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
Py_buffer* buffer = PyMemoryView_GET_BUFFER(memoryview);
|
||||
auto target = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
int64_t metadata_offset;
|
||||
ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch.get(), 0, &metadata_offset));
|
||||
ARROW_CHECK_OK(ipc::WriteRowBatch(target.get(), batch->get(), 0, &metadata_offset));
|
||||
return PyInt_FromLong(metadata_offset);
|
||||
}
|
||||
|
||||
/*! Serialize schema metadata associated to and arrow::Array
|
||||
|
||||
\param args
|
||||
A Python "arrow" capsule containing the arrow::Array
|
||||
|
||||
\return
|
||||
A bytearray object containing the schema metadata
|
||||
*/
|
||||
PyObject* get_schema_metadata(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
auto batch = make_row_batch(*data);
|
||||
std::shared_ptr<Buffer> buffer;
|
||||
ARROW_CHECK_OK(ipc::WriteSchema(batch->schema().get(), &buffer));
|
||||
auto ptr = reinterpret_cast<const char*>(buffer->data());
|
||||
return PyByteArray_FromStringAndSize(ptr, buffer->size());
|
||||
}
|
||||
|
||||
/*! Read serialized data from buffer and produce an arrow capsule
|
||||
|
||||
\param args
|
||||
@@ -139,7 +117,7 @@ PyObject* get_schema_metadata(PyObject* self, PyObject* args) {
|
||||
a Python bytearray containing the metadata and the metadata_offset
|
||||
|
||||
\return
|
||||
A Python "arrow" capsule containing the arrow data
|
||||
A Python "arrow" capsule containing the arrow RowBatch
|
||||
*/
|
||||
PyObject* read_from_buffer(PyObject* self, PyObject* args) {
|
||||
PyObject* memoryview;
|
||||
@@ -162,33 +140,30 @@ PyObject* read_from_buffer(PyObject* self, PyObject* args) {
|
||||
auto source = std::make_shared<BufferSource>(reinterpret_cast<uint8_t*>(buffer->buf), buffer->len);
|
||||
std::shared_ptr<arrow::ipc::RowBatchReader> reader;
|
||||
ARROW_CHECK_OK(arrow::ipc::RowBatchReader::Open(source.get(), metadata_offset, &reader));
|
||||
std::shared_ptr<arrow::RowBatch> data;
|
||||
ARROW_CHECK_OK(reader->GetRowBatch(schema, &data));
|
||||
auto batch = new std::shared_ptr<arrow::RowBatch>();
|
||||
ARROW_CHECK_OK(reader->GetRowBatch(schema, batch));
|
||||
|
||||
std::shared_ptr<Array>* result = new std::shared_ptr<Array>();
|
||||
*result = data->column(0);
|
||||
return PyCapsule_New(reinterpret_cast<void*>(result), "arrow", &ArrowCapsule_Destructor);
|
||||
return PyCapsule_New(reinterpret_cast<void*>(batch),
|
||||
"arrow", &ArrowCapsule_Destructor);
|
||||
}
|
||||
|
||||
/*!
|
||||
*/
|
||||
PyObject* deserialize_list(PyObject* self, PyObject* args) {
|
||||
std::shared_ptr<Array>* data;
|
||||
std::shared_ptr<RowBatch>* data;
|
||||
if (!PyArg_ParseTuple(args, "O&", &PyObjectToArrow, &data)) {
|
||||
return NULL;
|
||||
}
|
||||
PyObject* result;
|
||||
ARROW_CHECK_OK(DeserializeList(*data, 0, (*data)->length(), &result));
|
||||
ARROW_CHECK_OK(DeserializeList((*data)->column(0), 0, (*data)->num_rows(), &result));
|
||||
return result;
|
||||
}
|
||||
|
||||
static PyMethodDef NumbufMethods[] = {
|
||||
{ "serialize_list", serialize_list, METH_VARARGS, "serialize a Python list" },
|
||||
{ "deserialize_list", deserialize_list, METH_VARARGS, "deserialize a Python list" },
|
||||
{ "get_serialized_size", get_serialized_size, METH_VARARGS, "get the number of bytes the object will occupy once serialized" },
|
||||
{ "write_to_buffer", write_to_buffer, METH_VARARGS, "write serialized data to buffer"},
|
||||
{ "read_from_buffer", read_from_buffer, METH_VARARGS, "read serialized data from buffer"},
|
||||
{ "get_schema_metadata", get_schema_metadata, METH_VARARGS, "return the schema of an arrow object"},
|
||||
{ NULL, NULL, 0, NULL }
|
||||
};
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ TEST_OBJECTS = [[1, "hello", 3.0], 42, 43L, "hello world", 42.0, 1L << 62,
|
||||
class SerializationTests(unittest.TestCase):
|
||||
|
||||
def roundTripTest(self, data):
|
||||
serialized = libnumbuf.serialize_list(data)
|
||||
schema, size, serialized = libnumbuf.serialize_list(data)
|
||||
result = libnumbuf.deserialize_list(serialized)
|
||||
assert_equal(data, result)
|
||||
|
||||
@@ -52,12 +52,13 @@ class SerializationTests(unittest.TestCase):
|
||||
|
||||
def testBuffer(self):
|
||||
for (i, obj) in enumerate(TEST_OBJECTS):
|
||||
x = libnumbuf.serialize_list([1, 2, 3])
|
||||
schema = libnumbuf.get_schema_metadata(x)
|
||||
size = libnumbuf.get_serialized_size(x) + 4096 # INITIAL_METADATA_SIZE in arrow
|
||||
schema, size, batch = libnumbuf.serialize_list([obj])
|
||||
size = size + 4096 # INITIAL_METADATA_SIZE in arrow
|
||||
buff = np.zeros(size, dtype="uint8")
|
||||
metadata_offset = libnumbuf.write_to_buffer(x, memoryview(buff))
|
||||
metadata_offset = libnumbuf.write_to_buffer(batch, memoryview(buff))
|
||||
array = libnumbuf.read_from_buffer(memoryview(buff), schema, metadata_offset)
|
||||
result = libnumbuf.deserialize_list(array)
|
||||
assert_equal(result[0], obj)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user