From 46cf433f0ec17aa9500f391a398330cd0107be90 Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Mon, 4 Jan 2021 11:19:09 -0800 Subject: [PATCH] [Core] Remove Arrow dependencies (#13157) * remove arrow ubsan * remove arrow build depend * remove arrow buffer --- BUILD.bazel | 2 - bazel/BUILD.arrow | 97 ------------------- bazel/ray_deps_setup.bzl | 10 -- ci/travis/bazel-format.sh | 2 +- src/ray/common/buffer.h | 67 +++++++++++-- .../store_provider/plasma_store_provider.cc | 12 +-- src/ray/object_manager/object_buffer_pool.cc | 12 +-- src/ray/object_manager/plasma/client.cc | 38 ++++---- src/ray/object_manager/plasma/client.h | 15 +-- src/ray/object_manager/plasma/protocol.cc | 45 ++++++--- .../test/object_manager_stress_test.cc | 12 +-- .../test/object_manager_test.cc | 2 +- src/ray/raylet/node_manager.cc | 2 +- thirdparty/patches/arrow-windows-export.patch | 8 -- 14 files changed, 135 insertions(+), 189 deletions(-) delete mode 100644 bazel/BUILD.arrow delete mode 100644 thirdparty/patches/arrow-windows-export.patch diff --git a/BUILD.bazel b/BUILD.bazel index 03bcc36aa..126dc6b45 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -274,7 +274,6 @@ cc_library( ":ray_common", ":ray_util", "//src/ray/protobuf:common_cc_proto", - "@arrow", "@com_github_google_glog//:glog", "@msgpack", ], @@ -360,7 +359,6 @@ cc_library( ":ray_util", "//src/ray/protobuf:common_cc_proto", "//src/ray/protobuf:gcs_cc_proto", - "@arrow", "@boost//:asio", "@com_github_grpc_grpc//:grpc++", "@com_google_absl//absl/container:flat_hash_map", diff --git a/bazel/BUILD.arrow b/bazel/BUILD.arrow deleted file mode 100644 index 444a3d64e..000000000 --- a/bazel/BUILD.arrow +++ /dev/null @@ -1,97 +0,0 @@ -load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library") - -# TODO(mehrdadn): (How to) support dynamic linking? -PROPAGATED_WINDOWS_DEFINES = ["ARROW_STATIC"] - -COPTS = [] + select({ - "@bazel_tools//src/conditions:windows": [ - "-D" + define - for define in PROPAGATED_WINDOWS_DEFINES - ], - "//conditions:default": [ - "-DARROW_USE_GLOG", - ], -}) - -LINKOPTS = [] + select({ - "@bazel_tools//src/conditions:windows": [ - "-DefaultLib:" + "ws2_32.lib", - ], - "//conditions:default": [ - ], -}) - -cc_library( - name = "arrow", - srcs = [ - "cpp/src/arrow/buffer.cc", - "cpp/src/arrow/device.cc", - "cpp/src/arrow/io/interfaces.cc", - "cpp/src/arrow/io/memory.cc", - "cpp/src/arrow/memory_pool.cc", - "cpp/src/arrow/result.cc", - "cpp/src/arrow/status.cc", - "cpp/src/arrow/util/future.cc", - "cpp/src/arrow/util/io_util.cc", - "cpp/src/arrow/util/logging.cc", - "cpp/src/arrow/util/memory.cc", - "cpp/src/arrow/util/string.cc", - "cpp/src/arrow/util/string_builder.cc", - "cpp/src/arrow/util/thread_pool.cc", - "cpp/src/arrow/util/utf8.cc", - ], - hdrs = [ - "cpp/src/arrow/buffer.h", - "cpp/src/arrow/device.h", - "cpp/src/arrow/io/concurrency.h", - "cpp/src/arrow/io/interfaces.h", - "cpp/src/arrow/io/memory.h", - "cpp/src/arrow/io/mman.h", - "cpp/src/arrow/io/type_fwd.h", - "cpp/src/arrow/io/util_internal.h", - "cpp/src/arrow/memory_pool.h", - "cpp/src/arrow/result.h", - "cpp/src/arrow/status.h", - "cpp/src/arrow/type_fwd.h", - "cpp/src/arrow/util/bit_util.h", - "cpp/src/arrow/util/checked_cast.h", - "cpp/src/arrow/util/compare.h", - "cpp/src/arrow/util/functional.h", - "cpp/src/arrow/util/future.h", - "cpp/src/arrow/util/io_util.h", - "cpp/src/arrow/util/iterator.h", - "cpp/src/arrow/util/logging.h", - "cpp/src/arrow/util/macros.h", - "cpp/src/arrow/util/make_unique.h", - "cpp/src/arrow/util/memory.h", - "cpp/src/arrow/util/optional.h", - "cpp/src/arrow/util/string.h", - "cpp/src/arrow/util/string_builder.h", - "cpp/src/arrow/util/string_view.h", - "cpp/src/arrow/util/thread_pool.h", - "cpp/src/arrow/util/type_traits.h", - "cpp/src/arrow/util/ubsan.h", - "cpp/src/arrow/util/utf8.h", - "cpp/src/arrow/util/variant.h", - "cpp/src/arrow/util/visibility.h", - "cpp/src/arrow/util/windows_compatibility.h", - "cpp/src/arrow/util/windows_fixup.h", - "cpp/src/arrow/vendored/optional.hpp", - "cpp/src/arrow/vendored/string_view.hpp", - "cpp/src/arrow/vendored/utf8cpp/checked.h", - "cpp/src/arrow/vendored/utf8cpp/core.h", - "cpp/src/arrow/vendored/variant.hpp", - "cpp/src/arrow/vendored/xxhash.h", - "cpp/src/arrow/vendored/xxhash/xxh3.h", - "cpp/src/arrow/vendored/xxhash/xxhash.c", - "cpp/src/arrow/vendored/xxhash/xxhash.h", - ], - copts = COPTS, - linkopts = LINKOPTS, - strip_include_prefix = "cpp/src", - visibility = ["//visibility:public"], - deps = [ - "@boost//:filesystem", - "@com_github_google_glog//:glog", - ], -) diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 5a26e9696..c398dd15d 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -184,16 +184,6 @@ def ray_deps_setup(): ], ) - auto_http_archive( - name = "arrow", - build_file = True, - url = "https://github.com/apache/arrow/archive/af45b9212156980f55c399e2e88b4e19b4bb8ec1.tar.gz", - sha256 = "2f0aaa50053792aa274b402f2530e63c1542085021cfef83beee9281412c12f6", - patches = [ - "//thirdparty/patches:arrow-windows-export.patch", - ], - ) - auto_http_archive( name = "cython", build_file = True, diff --git a/ci/travis/bazel-format.sh b/ci/travis/bazel-format.sh index 86bbb9f8c..3910529a4 100755 --- a/ci/travis/bazel-format.sh +++ b/ci/travis/bazel-format.sh @@ -44,7 +44,7 @@ while [ $# -gt 0 ]; do done pushd "$ROOT_DIR"/../.. -BAZEL_FILES=(bazel/BUILD bazel/BUILD.arrow bazel/ray.bzl BUILD.bazel java/BUILD.bazel \ +BAZEL_FILES=(bazel/BUILD bazel/ray.bzl BUILD.bazel java/BUILD.bazel \ cpp/BUILD.bazel streaming/BUILD.bazel streaming/java/BUILD.bazel WORKSPACE) buildifier -mode=$RUN_TYPE -diff_command="diff -u" "${BAZEL_FILES[@]}" popd diff --git a/src/ray/common/buffer.h b/src/ray/common/buffer.h index ccaa6c9fb..2bcc92751 100644 --- a/src/ray/common/buffer.h +++ b/src/ray/common/buffer.h @@ -17,7 +17,9 @@ #include #include -#include "arrow/buffer.h" +#include +#include + #include "ray/common/status.h" #include "ray/thirdparty/aligned_alloc.h" @@ -117,17 +119,70 @@ class LocalMemoryBuffer : public Buffer { uint8_t *buffer_ = NULL; }; +/// Represents a byte buffer in shared memory. +class SharedMemoryBuffer : public Buffer { + public: + /// Constructor. + /// + /// By default when initializing a SharedMemoryBuffer with a data pointer and a length, + /// it just assigns the pointer and length without coping the data content. This is + /// for performance reasons. In this case the buffer cannot ensure data validity. It + /// instead relies on the lifetime passed in data pointer. + /// + /// \param data The data pointer to the passed-in buffer. + /// \param size The size of the passed in buffer. + SharedMemoryBuffer(uint8_t *data, size_t size) { + data_ = data; + size_ = size; + } + + /// Make a slice. + SharedMemoryBuffer(const std::shared_ptr &buffer, int64_t offset, int64_t size) + : size_(size), parent_(buffer) { + data_ = buffer->Data() + offset; + RAY_CHECK(size_ <= parent_->Size()); + } + + static std::shared_ptr Slice(const std::shared_ptr &buffer, + int64_t offset, int64_t size) { + return std::make_shared(buffer, offset, size); + } + + uint8_t *Data() const override { return data_; } + + size_t Size() const override { return size_; } + + bool OwnsData() const override { return false; } + + bool IsPlasmaBuffer() const override { return true; } + + ~SharedMemoryBuffer() = default; + + private: + /// Disable copy constructor and assignment, as default copy will + /// cause invalid data_. + SharedMemoryBuffer &operator=(const LocalMemoryBuffer &) = delete; + SharedMemoryBuffer(const LocalMemoryBuffer &) = delete; + + /// Pointer to the data. + uint8_t *data_; + /// Size of the buffer. + size_t size_; + /// Keep the parent where the buffer is sliced from. + std::shared_ptr parent_; +}; + /// Represents a byte buffer for plasma object. This can be used to hold the /// reference to a plasma object (via the underlying plasma::PlasmaBuffer). class PlasmaBuffer : public Buffer { public: - PlasmaBuffer(std::shared_ptr buffer, + PlasmaBuffer(std::shared_ptr buffer, std::function on_delete = nullptr) : buffer_(buffer), on_delete_(on_delete) {} - uint8_t *Data() const override { return const_cast(buffer_->data()); } + uint8_t *Data() const override { return buffer_->Data(); } - size_t Size() const override { return buffer_->size(); } + size_t Size() const override { return buffer_->Size(); } bool OwnsData() const override { return true; } @@ -140,9 +195,9 @@ class PlasmaBuffer : public Buffer { }; private: - /// shared_ptr to arrow buffer which can potentially hold a reference + /// shared_ptr to a buffer which can potentially hold a reference /// for the object (when it's a plasma::PlasmaBuffer). - std::shared_ptr buffer_; + std::shared_ptr buffer_; /// Callback to run on destruction. std::function on_delete_; }; diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index f7559e9b9..0d6789dc4 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -85,13 +85,13 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta const rpc::Address &owner_address, std::shared_ptr *data) { Status status; - std::shared_ptr arrow_buffer; + std::shared_ptr plasma_buffer; uint64_t retry_with_request_id = 0; { std::lock_guard guard(store_client_mutex_); status = store_client_.Create( object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr, - metadata ? metadata->Size() : 0, &retry_with_request_id, &arrow_buffer, + metadata ? metadata->Size() : 0, &retry_with_request_id, &plasma_buffer, /*device_num=*/0); } @@ -104,7 +104,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta << retry_with_request_id; status = store_client_.RetryCreate(object_id, retry_with_request_id, metadata ? metadata->Data() : nullptr, - &retry_with_request_id, &arrow_buffer); + &retry_with_request_id, &plasma_buffer); } } @@ -129,7 +129,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta status = Status::OK(); } else { RAY_RETURN_NOT_OK(status); - *data = std::make_shared(PlasmaBuffer(arrow_buffer)); + *data = std::make_shared(PlasmaBuffer(plasma_buffer)); } return status; } @@ -173,7 +173,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( const auto &object_id = batch_ids[i]; std::shared_ptr data = nullptr; std::shared_ptr metadata = nullptr; - if (plasma_results[i].data && plasma_results[i].data->size()) { + if (plasma_results[i].data && plasma_results[i].data->Size()) { // We track the set of active data buffers in active_buffers_. On destruction, // the buffer entry will be removed from the set via callback. std::shared_ptr tracker = buffer_tracker_; @@ -190,7 +190,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore( tracker->active_buffers_[std::make_pair(object_id, data.get())] = call_site; } } - if (plasma_results[i].metadata && plasma_results[i].metadata->size()) { + if (plasma_results[i].metadata && plasma_results[i].metadata->Size()) { metadata = std::make_shared(plasma_results[i].metadata); } const auto result_object = diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index cdeaa7b84..4b6a44e6b 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -64,11 +64,11 @@ std::pair ObjectBufferPool::Ge errored_chunk_, ray::Status::IOError("Unable to obtain object chunk, object not local.")); } - RAY_CHECK(object_buffer.metadata->data() == - object_buffer.data->data() + object_buffer.data->size()); - RAY_CHECK(data_size == static_cast(object_buffer.data->size() + - object_buffer.metadata->size())); - auto *data = const_cast(object_buffer.data->data()); + RAY_CHECK(object_buffer.metadata->Data() == + object_buffer.data->Data() + object_buffer.data->Size()); + RAY_CHECK(data_size == static_cast(object_buffer.data->Size() + + object_buffer.metadata->Size())); + auto *data = object_buffer.data->Data(); uint64_t num_chunks = GetNumChunks(data_size); get_buffer_state_.emplace( std::piecewise_construct, std::forward_as_tuple(object_id), @@ -115,7 +115,7 @@ std::pair ObjectBufferPool::Cr errored_chunk_, ray::Status::IOError(s.message())); } // Read object into store. - uint8_t *mutable_data = data->mutable_data(); + uint8_t *mutable_data = data->Data(); uint64_t num_chunks = GetNumChunks(data_size); create_buffer_state_.emplace( std::piecewise_construct, std::forward_as_tuple(object_id), diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 5d3a9c7d3..a5429d985 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -31,8 +31,6 @@ #include -#include "arrow/buffer.h" - #include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/protocol.h" @@ -45,24 +43,20 @@ namespace plasma { using fb::MessageType; using fb::PlasmaError; -using arrow::MutableBuffer; - // ---------------------------------------------------------------------- // PlasmaBuffer /// A Buffer class that automatically releases the backing plasma object /// when it goes out of scope. This is returned by Get. -class RAY_NO_EXPORT PlasmaBuffer : public Buffer { +class RAY_NO_EXPORT PlasmaBuffer : public SharedMemoryBuffer { public: ~PlasmaBuffer(); PlasmaBuffer(std::shared_ptr client, const ObjectID &object_id, const std::shared_ptr &buffer) - : Buffer(buffer, 0, buffer->size()), client_(client), object_id_(object_id) { - if (buffer->is_mutable()) { - is_mutable_ = true; - } - } + : SharedMemoryBuffer(buffer, 0, buffer->Size()), + client_(client), + object_id_(object_id) {} private: std::shared_ptr client_; @@ -72,11 +66,11 @@ class RAY_NO_EXPORT PlasmaBuffer : public Buffer { /// A mutable Buffer class that keeps the backing data alive by keeping a /// PlasmaClient shared pointer. This is returned by Create. Release will /// be called in the associated Seal call. -class RAY_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer { +class RAY_NO_EXPORT PlasmaMutableBuffer : public SharedMemoryBuffer { public: PlasmaMutableBuffer(std::shared_ptr client, uint8_t *mutable_data, int64_t data_size) - : MutableBuffer(mutable_data, data_size), client_(client) {} + : SharedMemoryBuffer(mutable_data, data_size), client_(client) {} private: std::shared_ptr client_; @@ -308,7 +302,7 @@ Status PlasmaClient::Impl::HandleCreateReply(const ObjectID &object_id, // from the transfer. if (metadata != NULL) { // Copy the metadata to the buffer. - memcpy((*data)->mutable_data() + object.data_size, metadata, object.metadata_size); + memcpy((*data)->Data() + object.data_size, metadata, object.metadata_size); } } else { RAY_LOG(FATAL) << "GPU is not enabled."; @@ -392,15 +386,16 @@ Status PlasmaClient::Impl::GetBuffers( if (object->device_num == 0) { uint8_t *data = LookupMmappedFile(object->store_fd); - physical_buf = std::make_shared( + physical_buf = std::make_shared( data + object->data_offset, object->data_size + object->metadata_size); } else { RAY_LOG(FATAL) << "GPU library is not enabled."; } physical_buf = wrap_buffer(object_ids[i], physical_buf); - object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size); - object_buffers[i].metadata = - SliceBuffer(physical_buf, object->data_size, object->metadata_size); + object_buffers[i].data = + SharedMemoryBuffer::Slice(physical_buf, 0, object->data_size); + object_buffers[i].metadata = SharedMemoryBuffer::Slice( + physical_buf, object->data_size, object->metadata_size); object_buffers[i].device_num = object->device_num; // Increment the count of the number of instances of this object that this // client is using. Cache the reference to the object. @@ -449,16 +444,17 @@ Status PlasmaClient::Impl::GetBuffers( std::shared_ptr physical_buf; if (object->device_num == 0) { uint8_t *data = LookupMmappedFile(object->store_fd); - physical_buf = std::make_shared( + physical_buf = std::make_shared( data + object->data_offset, object->data_size + object->metadata_size); } else { RAY_LOG(FATAL) << "Arrow GPU library is not enabled."; } // Finish filling out the return values. physical_buf = wrap_buffer(object_ids[i], physical_buf); - object_buffers[i].data = SliceBuffer(physical_buf, 0, object->data_size); - object_buffers[i].metadata = - SliceBuffer(physical_buf, object->data_size, object->metadata_size); + object_buffers[i].data = + SharedMemoryBuffer::Slice(physical_buf, 0, object->data_size); + object_buffers[i].metadata = SharedMemoryBuffer::Slice( + physical_buf, object->data_size, object->metadata_size); object_buffers[i].device_num = object->device_num; // Increment the count of the number of instances of this object that this // client is using. Cache the reference to the object. diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index 8c5e607b8..e88a9eb13 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -22,25 +22,24 @@ #include #include -#include "arrow/buffer.h" - +#include "ray/common/buffer.h" #include "ray/common/status.h" #include "ray/object_manager/plasma/common.h" #include "ray/util/visibility.h" #include "src/ray/protobuf/common.pb.h" -using arrow::Buffer; - namespace plasma { +using ray::Buffer; +using ray::SharedMemoryBuffer; using ray::Status; /// Object buffer data structure. struct ObjectBuffer { /// The data buffer. - std::shared_ptr data; + std::shared_ptr data; /// The metadata buffer. - std::shared_ptr metadata; + std::shared_ptr metadata; /// The device number. int device_num; }; @@ -272,10 +271,6 @@ class PlasmaClient { private: friend class PlasmaBuffer; friend class PlasmaMutableBuffer; - FRIEND_TEST(TestPlasmaStore, GetTest); - FRIEND_TEST(TestPlasmaStore, LegacyGetTest); - FRIEND_TEST(TestPlasmaStore, AbortTest); - bool IsInUse(const ObjectID &object_id); class Impl; diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index d321a2624..8c3164d6a 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -19,7 +19,6 @@ #include -#include "arrow/util/ubsan.h" #include "flatbuffers/flatbuffers.h" #include "ray/object_manager/plasma/common.h" #include "ray/object_manager/plasma/connection.h" @@ -35,8 +34,28 @@ using fb::PlasmaObjectSpec; using flatbuffers::uoffset_t; -#define PLASMA_CHECK_ENUM(x, y) \ - static_assert(static_cast(x) == static_cast(y), "protocol mismatch") +namespace internal { + +static uint8_t non_null_filler; + +} // namespace internal + +/// \brief Returns maybe_null if not null or a non-null pointer to an arbitrary memory +/// that shouldn't be dereferenced. +/// +/// Memset/Memcpy are undefined when a nullptr is passed as an argument use this utility +/// method to wrap locations where this could happen. +/// +/// Note: Flatbuffers has UBSan warnings if a zero length vector is passed. +/// https://github.com/google/flatbuffers/pull/5355 is trying to resolve +/// them. +template +inline T *MakeNonNull(T *maybe_null) { + if (RAY_PREDICT_TRUE(maybe_null != nullptr)) { + return maybe_null; + } + return reinterpret_cast(&internal::non_null_filler); +} flatbuffers::Offset>> ToFlatbuffer(flatbuffers::FlatBufferBuilder *fbb, const ObjectID *object_ids, @@ -45,7 +64,7 @@ ToFlatbuffer(flatbuffers::FlatBufferBuilder *fbb, const ObjectID *object_ids, for (int64_t i = 0; i < num_objects; i++) { results.push_back(fbb->CreateString(object_ids[i].Binary())); } - return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); + return fbb->CreateVector(MakeNonNull(results.data()), results.size()); } flatbuffers::Offset>> @@ -56,12 +75,12 @@ ToFlatbuffer(flatbuffers::FlatBufferBuilder *fbb, results.push_back(fbb->CreateString(strings[i])); } - return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); + return fbb->CreateVector(MakeNonNull(results.data()), results.size()); } flatbuffers::Offset> ToFlatbuffer( flatbuffers::FlatBufferBuilder *fbb, const std::vector &data) { - return fbb->CreateVector(arrow::util::MakeNonNull(data.data()), data.size()); + return fbb->CreateVector(MakeNonNull(data.data()), data.size()); } Status PlasmaReceive(const std::shared_ptr &store_conn, @@ -418,9 +437,8 @@ Status SendDeleteReply(const std::shared_ptr &client, auto message = fb::CreatePlasmaDeleteReply( fbb, static_cast(object_ids.size()), ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()), - fbb.CreateVector( - arrow::util::MakeNonNull(reinterpret_cast(errors.data())), - object_ids.size())); + fbb.CreateVector(MakeNonNull(reinterpret_cast(errors.data())), + object_ids.size())); return PlasmaSend(client, MessageType::PlasmaDeleteReply, &fbb, message); } @@ -576,11 +594,10 @@ Status SendGetReply(const std::shared_ptr &client, ObjectID object_ids[] } auto message = fb::CreatePlasmaGetReply( fbb, ToFlatbuffer(&fbb, object_ids, num_objects), - fbb.CreateVectorOfStructs(arrow::util::MakeNonNull(objects.data()), num_objects), - fbb.CreateVector(arrow::util::MakeNonNull(store_fds_as_int.data()), - store_fds_as_int.size()), - fbb.CreateVector(arrow::util::MakeNonNull(mmap_sizes.data()), mmap_sizes.size()), - fbb.CreateVector(arrow::util::MakeNonNull(handles.data()), handles.size())); + fbb.CreateVectorOfStructs(MakeNonNull(objects.data()), num_objects), + fbb.CreateVector(MakeNonNull(store_fds_as_int.data()), store_fds_as_int.size()), + fbb.CreateVector(MakeNonNull(mmap_sizes.data()), mmap_sizes.size()), + fbb.CreateVector(MakeNonNull(handles.data()), handles.size())); return PlasmaSend(client, MessageType::PlasmaGetReply, &fbb, message); } diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 6a55d6180..8896ba996 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -169,7 +169,7 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); uint64_t retry_with_request_id = 0; - std::shared_ptr data; + std::shared_ptr data; RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata, metadata_size, &retry_with_request_id, &data)); RAY_CHECK(retry_with_request_id == 0); @@ -300,11 +300,11 @@ class StressTestObjectManager : public TestObjectManagerBase { void CompareObjects(ObjectID &object_id_1, ObjectID &object_id_2) { plasma::ObjectBuffer object_buffer_1 = GetObject(client1, object_id_1); plasma::ObjectBuffer object_buffer_2 = GetObject(client2, object_id_2); - uint8_t *data_1 = const_cast(object_buffer_1.data->data()); - uint8_t *data_2 = const_cast(object_buffer_2.data->data()); - ASSERT_EQ(object_buffer_1.data->size(), object_buffer_2.data->size()); - ASSERT_EQ(object_buffer_1.metadata->size(), object_buffer_2.metadata->size()); - int64_t total_size = object_buffer_1.data->size() + object_buffer_1.metadata->size(); + uint8_t *data_1 = const_cast(object_buffer_1.data->Data()); + uint8_t *data_2 = const_cast(object_buffer_2.data->Data()); + ASSERT_EQ(object_buffer_1.data->Size(), object_buffer_2.data->Size()); + ASSERT_EQ(object_buffer_1.metadata->Size(), object_buffer_2.metadata->Size()); + int64_t total_size = object_buffer_1.data->Size() + object_buffer_1.metadata->Size(); RAY_LOG(DEBUG) << "total_size " << total_size; for (int i = -1; ++i < total_size;) { ASSERT_TRUE(data_1[i] == data_2[i]); diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 9fbecc4ca..7afe2e42e 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -154,7 +154,7 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); uint64_t retry_with_request_id = 0; - std::shared_ptr data; + std::shared_ptr data; RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata, metadata_size, &retry_with_request_id, &data)); RAY_CHECK(retry_with_request_id == 0); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6fba62e43..c5e08e187 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2039,7 +2039,7 @@ void NodeManager::MarkObjectsAsFailed( const std::string meta = std::to_string(static_cast(error_type)); for (const auto &ref : objects_to_fail) { ObjectID object_id = ObjectID::FromBinary(ref.object_id()); - std::shared_ptr data; + std::shared_ptr data; Status status; status = store_client_.TryCreateImmediately( object_id, ref.owner_address(), 0, diff --git a/thirdparty/patches/arrow-windows-export.patch b/thirdparty/patches/arrow-windows-export.patch deleted file mode 100644 index e1a62f1fa..000000000 --- a/thirdparty/patches/arrow-windows-export.patch +++ /dev/null @@ -1,8 +0,0 @@ -diff --git cpp/src/arrow/util/logging.cc cpp/src/arrow/util/logging.cc ---- cpp/src/arrow/util/logging.cc -+++ cpp/src/arrow/util/logging.cc -@@ -87,1 +87,3 @@ -+#if !defined(_WIN32) || defined(ARROW_STATIC) || defined(ARROW_EXPORTING) || !defined(ARROW_EXPORT) - ArrowLogLevel ArrowLog::severity_threshold_ = ArrowLogLevel::ARROW_INFO; -+#endif ---