mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 11:17:13 +08:00
Avoid segfaults in arrow if data is too large (#287)
* arrow limits * more logging * set the right limit * update * simplify * fix * account for subsequences * fixes and deactivate arrow limit tests in travis * fixes * Minor formatting. * Add a couple more tests.
This commit is contained in:
committed by
Robert Nishihara
parent
88a5b4e77b
commit
dd7e8d9105
@@ -8,6 +8,7 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
|
||||
: pool_(pool),
|
||||
types_(pool, std::make_shared<Int8Type>()),
|
||||
offsets_(pool, std::make_shared<Int32Type>()),
|
||||
total_num_bytes_(0),
|
||||
nones_(pool, std::make_shared<NullType>()),
|
||||
bools_(pool, std::make_shared<BooleanType>()),
|
||||
ints_(pool, std::make_shared<Int64Type>()),
|
||||
@@ -29,52 +30,68 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool)
|
||||
tuple_offsets_({0}),
|
||||
dict_offsets_({0}) {}
|
||||
|
||||
#define UPDATE(OFFSET, TAG) \
|
||||
if (TAG == -1) { \
|
||||
TAG = num_tags; \
|
||||
num_tags += 1; \
|
||||
} \
|
||||
RETURN_NOT_OK(offsets_.Append(OFFSET)); \
|
||||
RETURN_NOT_OK(types_.Append(TAG)); \
|
||||
/* We need to ensure that the number of bytes allocated by arrow
|
||||
* does not exceed 2**31 - 1. To make sure that is the case, allocation needs
|
||||
* to be capped at 2**29 - 1, because arrow calculates the next power of two
|
||||
* for allocations (see arrow::ArrayBuilder::Reserve).
|
||||
*/
|
||||
#define UPDATE(OFFSET, TAG) \
|
||||
if (total_num_bytes_ >= 1 << 29 - 1) { \
|
||||
return Status::NotImplemented("Sequence contains too many elements"); \
|
||||
} \
|
||||
if (TAG == -1) { \
|
||||
TAG = num_tags; \
|
||||
num_tags += 1; \
|
||||
} \
|
||||
RETURN_NOT_OK(offsets_.Append(OFFSET)); \
|
||||
RETURN_NOT_OK(types_.Append(TAG)); \
|
||||
RETURN_NOT_OK(nones_.AppendToBitmap(true));
|
||||
|
||||
Status SequenceBuilder::AppendNone() {
|
||||
total_num_bytes_ += sizeof(int32_t);
|
||||
RETURN_NOT_OK(offsets_.Append(0));
|
||||
RETURN_NOT_OK(types_.Append(0));
|
||||
return nones_.AppendToBitmap(false);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendBool(bool data) {
|
||||
total_num_bytes_ += sizeof(bool);
|
||||
UPDATE(bools_.length(), bool_tag);
|
||||
return bools_.Append(data);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendInt64(int64_t data) {
|
||||
total_num_bytes_ += sizeof(int64_t);
|
||||
UPDATE(ints_.length(), int_tag);
|
||||
return ints_.Append(data);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendUInt64(uint64_t data) {
|
||||
total_num_bytes_ += sizeof(uint64_t);
|
||||
UPDATE(ints_.length(), int_tag);
|
||||
return ints_.Append(data);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) {
|
||||
total_num_bytes_ += length * sizeof(uint8_t);
|
||||
UPDATE(bytes_.length(), bytes_tag);
|
||||
return bytes_.Append(data, length);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendString(const char* data, int32_t length) {
|
||||
total_num_bytes_ += length * sizeof(char);
|
||||
UPDATE(strings_.length(), string_tag);
|
||||
return strings_.Append(data, length);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendFloat(float data) {
|
||||
total_num_bytes_ += sizeof(float);
|
||||
UPDATE(floats_.length(), float_tag);
|
||||
return floats_.Append(data);
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendDouble(double data) {
|
||||
total_num_bytes_ += sizeof(double);
|
||||
UPDATE(doubles_.length(), double_tag);
|
||||
return doubles_.Append(data);
|
||||
}
|
||||
@@ -82,6 +99,11 @@ Status SequenceBuilder::AppendDouble(double data) {
|
||||
#define DEF_TENSOR_APPEND(NAME, TYPE, TAG) \
|
||||
Status SequenceBuilder::AppendTensor(const std::vector<int64_t>& dims, TYPE* data) { \
|
||||
if (TAG == -1) { NAME.Start(); } \
|
||||
int64_t size = 1; \
|
||||
for (auto dim : dims) { \
|
||||
size *= dim; \
|
||||
} \
|
||||
total_num_bytes_ += size * sizeof(TYPE); \
|
||||
UPDATE(NAME.length(), TAG); \
|
||||
return NAME.Append(dims, data); \
|
||||
}
|
||||
@@ -98,18 +120,27 @@ DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag);
|
||||
DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag);
|
||||
|
||||
Status SequenceBuilder::AppendList(int32_t size) {
|
||||
// Increase number of bytes to account for offsets
|
||||
// (types and bitmaps are smaller).
|
||||
total_num_bytes_ += size * sizeof(int32_t);
|
||||
UPDATE(list_offsets_.size() - 1, list_tag);
|
||||
list_offsets_.push_back(list_offsets_.back() + size);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendTuple(int32_t size) {
|
||||
// Increase number of bytes to account for offsets
|
||||
// (types and bitmaps are smaller).
|
||||
total_num_bytes_ += size * sizeof(int32_t);
|
||||
UPDATE(tuple_offsets_.size() - 1, tuple_tag);
|
||||
tuple_offsets_.push_back(tuple_offsets_.back() + size);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SequenceBuilder::AppendDict(int32_t size) {
|
||||
// Increase number of bytes to account for offsets
|
||||
// (types and bitmaps are smaller).
|
||||
total_num_bytes_ += size * sizeof(int32_t);
|
||||
UPDATE(dict_offsets_.size() - 1, dict_tag);
|
||||
dict_offsets_.push_back(dict_offsets_.back() + size);
|
||||
return Status::OK();
|
||||
|
||||
@@ -58,7 +58,7 @@ class SequenceBuilder {
|
||||
arrow::Status AppendTensor(const std::vector<int64_t>& dims, float* data);
|
||||
arrow::Status AppendTensor(const std::vector<int64_t>& dims, double* data);
|
||||
|
||||
/*! Add a sublist to the sequenc. The data contained in the sublist will be
|
||||
/*! Add a sublist to the sequence. The data contained in the sublist will be
|
||||
specified in the "Finish" method.
|
||||
|
||||
To construct l = [[11, 22], 33, [44, 55]] you would for example run
|
||||
@@ -89,6 +89,9 @@ class SequenceBuilder {
|
||||
arrow::Int8Builder types_;
|
||||
arrow::Int32Builder offsets_;
|
||||
|
||||
/* Total number of bytes needed to represent this sequence. */
|
||||
int64_t total_num_bytes_;
|
||||
|
||||
arrow::NullArrayBuilder nones_;
|
||||
arrow::BooleanBuilder bools_;
|
||||
arrow::Int64Builder ints_;
|
||||
|
||||
@@ -6,6 +6,7 @@ import unittest
|
||||
import numbuf
|
||||
import numpy as np
|
||||
from numpy.testing import assert_equal
|
||||
import os
|
||||
import sys
|
||||
|
||||
TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43, "hello world",
|
||||
@@ -126,5 +127,43 @@ class SerializationTests(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
result[0][0] = 1
|
||||
|
||||
def testArrowLimits(self):
|
||||
# Test that objects that are too large for Arrow throw a Python exception.
|
||||
# These tests give out of memory errors on Travis and need to be run on a
|
||||
# machine with lots of RAM.
|
||||
if os.getenv("TRAVIS") is None:
|
||||
l = 2 ** 29 * [1.0]
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
self.roundTripTest(l)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
l = 2 ** 29 * ["s"]
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
self.roundTripTest(l)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
l = 2 ** 29 * [["1"], 2, 3, [{"s": 4}]]
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
self.roundTripTest(l)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
l = 2 ** 29 * [{"s": 1}] + 2 ** 29 * [1.0]
|
||||
self.roundTripTest(l)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
l = np.zeros(2 ** 25)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
with self.assertRaises(numbuf.numbuf_error):
|
||||
l = [np.zeros(2 ** 18) for _ in range(2 ** 7)]
|
||||
self.roundTripTest(l)
|
||||
self.roundTripTest([l])
|
||||
del l
|
||||
else:
|
||||
print("Not running testArrowLimits on Travis because of the test's "
|
||||
"memory requirements.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
|
||||
Reference in New Issue
Block a user