From dd7e8d91057eccfed151bf1c074a28fc6836e69a Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 16 Feb 2017 15:16:20 -0800 Subject: [PATCH] Avoid segfaults in arrow if data is too large (#287) * arrow limits * more logging * set the right limit * update * simplify * fix * account for subsequences * fixes and deactivate arrow limit tests in travis * fixes * Minor formatting. * Add a couple more tests. --- src/numbuf/cpp/src/numbuf/sequence.cc | 45 ++++++++++++++++++++++----- src/numbuf/cpp/src/numbuf/sequence.h | 5 ++- src/numbuf/python/test/runtest.py | 39 +++++++++++++++++++++++ 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/src/numbuf/cpp/src/numbuf/sequence.cc b/src/numbuf/cpp/src/numbuf/sequence.cc index b1b5fe2e6..c2d707742 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.cc +++ b/src/numbuf/cpp/src/numbuf/sequence.cc @@ -8,6 +8,7 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool) : pool_(pool), types_(pool, std::make_shared()), offsets_(pool, std::make_shared()), + total_num_bytes_(0), nones_(pool, std::make_shared()), bools_(pool, std::make_shared()), ints_(pool, std::make_shared()), @@ -29,52 +30,68 @@ SequenceBuilder::SequenceBuilder(MemoryPool* pool) tuple_offsets_({0}), dict_offsets_({0}) {} -#define UPDATE(OFFSET, TAG) \ - if (TAG == -1) { \ - TAG = num_tags; \ - num_tags += 1; \ - } \ - RETURN_NOT_OK(offsets_.Append(OFFSET)); \ - RETURN_NOT_OK(types_.Append(TAG)); \ +/* We need to ensure that the number of bytes allocated by arrow + * does not exceed 2**31 - 1. To make sure that is the case, allocation needs + * to be capped at 2**29 - 1, because arrow calculates the next power of two + * for allocations (see arrow::ArrayBuilder::Reserve). + */ +#define UPDATE(OFFSET, TAG) \ + if (total_num_bytes_ >= 1 << 29 - 1) { \ + return Status::NotImplemented("Sequence contains too many elements"); \ + } \ + if (TAG == -1) { \ + TAG = num_tags; \ + num_tags += 1; \ + } \ + RETURN_NOT_OK(offsets_.Append(OFFSET)); \ + RETURN_NOT_OK(types_.Append(TAG)); \ RETURN_NOT_OK(nones_.AppendToBitmap(true)); Status SequenceBuilder::AppendNone() { + total_num_bytes_ += sizeof(int32_t); RETURN_NOT_OK(offsets_.Append(0)); RETURN_NOT_OK(types_.Append(0)); return nones_.AppendToBitmap(false); } Status SequenceBuilder::AppendBool(bool data) { + total_num_bytes_ += sizeof(bool); UPDATE(bools_.length(), bool_tag); return bools_.Append(data); } Status SequenceBuilder::AppendInt64(int64_t data) { + total_num_bytes_ += sizeof(int64_t); UPDATE(ints_.length(), int_tag); return ints_.Append(data); } Status SequenceBuilder::AppendUInt64(uint64_t data) { + total_num_bytes_ += sizeof(uint64_t); UPDATE(ints_.length(), int_tag); return ints_.Append(data); } Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) { + total_num_bytes_ += length * sizeof(uint8_t); UPDATE(bytes_.length(), bytes_tag); return bytes_.Append(data, length); } Status SequenceBuilder::AppendString(const char* data, int32_t length) { + total_num_bytes_ += length * sizeof(char); UPDATE(strings_.length(), string_tag); return strings_.Append(data, length); } Status SequenceBuilder::AppendFloat(float data) { + total_num_bytes_ += sizeof(float); UPDATE(floats_.length(), float_tag); return floats_.Append(data); } Status SequenceBuilder::AppendDouble(double data) { + total_num_bytes_ += sizeof(double); UPDATE(doubles_.length(), double_tag); return doubles_.Append(data); } @@ -82,6 +99,11 @@ Status SequenceBuilder::AppendDouble(double data) { #define DEF_TENSOR_APPEND(NAME, TYPE, TAG) \ Status SequenceBuilder::AppendTensor(const std::vector& dims, TYPE* data) { \ if (TAG == -1) { NAME.Start(); } \ + int64_t size = 1; \ + for (auto dim : dims) { \ + size *= dim; \ + } \ + total_num_bytes_ += size * sizeof(TYPE); \ UPDATE(NAME.length(), TAG); \ return NAME.Append(dims, data); \ } @@ -98,18 +120,27 @@ DEF_TENSOR_APPEND(float_tensors_, float, float_tensor_tag); DEF_TENSOR_APPEND(double_tensors_, double, double_tensor_tag); Status SequenceBuilder::AppendList(int32_t size) { + // Increase number of bytes to account for offsets + // (types and bitmaps are smaller). + total_num_bytes_ += size * sizeof(int32_t); UPDATE(list_offsets_.size() - 1, list_tag); list_offsets_.push_back(list_offsets_.back() + size); return Status::OK(); } Status SequenceBuilder::AppendTuple(int32_t size) { + // Increase number of bytes to account for offsets + // (types and bitmaps are smaller). + total_num_bytes_ += size * sizeof(int32_t); UPDATE(tuple_offsets_.size() - 1, tuple_tag); tuple_offsets_.push_back(tuple_offsets_.back() + size); return Status::OK(); } Status SequenceBuilder::AppendDict(int32_t size) { + // Increase number of bytes to account for offsets + // (types and bitmaps are smaller). + total_num_bytes_ += size * sizeof(int32_t); UPDATE(dict_offsets_.size() - 1, dict_tag); dict_offsets_.push_back(dict_offsets_.back() + size); return Status::OK(); diff --git a/src/numbuf/cpp/src/numbuf/sequence.h b/src/numbuf/cpp/src/numbuf/sequence.h index 4c87d81a6..476f9c45d 100644 --- a/src/numbuf/cpp/src/numbuf/sequence.h +++ b/src/numbuf/cpp/src/numbuf/sequence.h @@ -58,7 +58,7 @@ class SequenceBuilder { arrow::Status AppendTensor(const std::vector& dims, float* data); arrow::Status AppendTensor(const std::vector& dims, double* data); - /*! Add a sublist to the sequenc. The data contained in the sublist will be + /*! Add a sublist to the sequence. The data contained in the sublist will be specified in the "Finish" method. To construct l = [[11, 22], 33, [44, 55]] you would for example run @@ -89,6 +89,9 @@ class SequenceBuilder { arrow::Int8Builder types_; arrow::Int32Builder offsets_; + /* Total number of bytes needed to represent this sequence. */ + int64_t total_num_bytes_; + arrow::NullArrayBuilder nones_; arrow::BooleanBuilder bools_; arrow::Int64Builder ints_; diff --git a/src/numbuf/python/test/runtest.py b/src/numbuf/python/test/runtest.py index 82cacb999..67f59ed08 100644 --- a/src/numbuf/python/test/runtest.py +++ b/src/numbuf/python/test/runtest.py @@ -6,6 +6,7 @@ import unittest import numbuf import numpy as np from numpy.testing import assert_equal +import os import sys TEST_OBJECTS = [{(1,2) : 1}, {() : 2}, [1, "hello", 3.0], 42, 43, "hello world", @@ -126,5 +127,43 @@ class SerializationTests(unittest.TestCase): with self.assertRaises(ValueError): result[0][0] = 1 + def testArrowLimits(self): + # Test that objects that are too large for Arrow throw a Python exception. + # These tests give out of memory errors on Travis and need to be run on a + # machine with lots of RAM. + if os.getenv("TRAVIS") is None: + l = 2 ** 29 * [1.0] + with self.assertRaises(numbuf.numbuf_error): + self.roundTripTest(l) + self.roundTripTest([l]) + del l + l = 2 ** 29 * ["s"] + with self.assertRaises(numbuf.numbuf_error): + self.roundTripTest(l) + self.roundTripTest([l]) + del l + l = 2 ** 29 * [["1"], 2, 3, [{"s": 4}]] + with self.assertRaises(numbuf.numbuf_error): + self.roundTripTest(l) + self.roundTripTest([l]) + del l + with self.assertRaises(numbuf.numbuf_error): + l = 2 ** 29 * [{"s": 1}] + 2 ** 29 * [1.0] + self.roundTripTest(l) + self.roundTripTest([l]) + del l + with self.assertRaises(numbuf.numbuf_error): + l = np.zeros(2 ** 25) + self.roundTripTest([l]) + del l + with self.assertRaises(numbuf.numbuf_error): + l = [np.zeros(2 ** 18) for _ in range(2 ** 7)] + self.roundTripTest(l) + self.roundTripTest([l]) + del l + else: + print("Not running testArrowLimits on Travis because of the test's " + "memory requirements.") + if __name__ == "__main__": unittest.main(verbosity=2)