From 95bb0c5357d265f7e284395f6495e2ad9947bb38 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 9 Mar 2020 10:30:44 -0700 Subject: [PATCH] Upgrade plasma to latest version, use synchronous Seal (#7470) * Upgrade arrow to master * fix build * todo * lint * Fix hanging test --- bazel/BUILD.plasma | 13 ++++++++++++- bazel/ray_deps_setup.bzl | 4 ++-- python/ray/tests/test_reference_counting.py | 16 +++++++++------- src/ray/raylet/node_manager.cc | 5 +++++ 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/bazel/BUILD.plasma b/bazel/BUILD.plasma index eddd5e287..3d8091014 100644 --- a/bazel/BUILD.plasma +++ b/bazel/BUILD.plasma @@ -23,9 +23,13 @@ cc_library( name = "arrow", srcs = [ "cpp/src/arrow/buffer.cc", + "cpp/src/arrow/device.cc", "cpp/src/arrow/io/interfaces.cc", + "cpp/src/arrow/io/memory.cc", "cpp/src/arrow/memory_pool.cc", + "cpp/src/arrow/result.cc", "cpp/src/arrow/status.cc", + "cpp/src/arrow/util/future.cc", "cpp/src/arrow/util/io_util.cc", "cpp/src/arrow/util/logging.cc", "cpp/src/arrow/util/memory.cc", @@ -36,9 +40,12 @@ cc_library( ], hdrs = [ "cpp/src/arrow/buffer.h", + "cpp/src/arrow/device.h", "cpp/src/arrow/io/concurrency.h", "cpp/src/arrow/io/interfaces.h", + "cpp/src/arrow/io/memory.h", "cpp/src/arrow/io/mman.h", + "cpp/src/arrow/io/type_fwd.h", "cpp/src/arrow/io/util_internal.h", "cpp/src/arrow/memory_pool.h", "cpp/src/arrow/result.h", @@ -48,12 +55,14 @@ cc_library( "cpp/src/arrow/util/checked_cast.h", "cpp/src/arrow/util/compare.h", "cpp/src/arrow/util/functional.h", + "cpp/src/arrow/util/future.h", "cpp/src/arrow/util/io_util.h", "cpp/src/arrow/util/iterator.h", "cpp/src/arrow/util/logging.h", "cpp/src/arrow/util/macros.h", + "cpp/src/arrow/util/make_unique.h", "cpp/src/arrow/util/memory.h", - "cpp/src/arrow/util/stl.h", + "cpp/src/arrow/util/optional.h", "cpp/src/arrow/util/string.h", "cpp/src/arrow/util/string_builder.h", "cpp/src/arrow/util/string_view.h", @@ -64,6 +73,8 @@ cc_library( "cpp/src/arrow/util/variant.h", "cpp/src/arrow/util/visibility.h", "cpp/src/arrow/util/windows_compatibility.h", + "cpp/src/arrow/util/windows_fixup.h", + "cpp/src/arrow/vendored/optional.hpp", "cpp/src/arrow/vendored/string_view.hpp", "cpp/src/arrow/vendored/utf8cpp/checked.h", "cpp/src/arrow/vendored/utf8cpp/core.h", diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 78f1af2af..8bbb78306 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -172,8 +172,8 @@ def ray_deps_setup(): auto_http_archive( name = "plasma", build_file = True, - url = "https://github.com/apache/arrow/archive/86f34aa07e611787d9cc98c6a33b0a0a536dce57.tar.gz", - sha256 = "6b5f55d10681a3938bbf8f07eee52c4eb6e761da6ba27490f55ccb89ce645ac8", + url = "https://github.com/apache/arrow/archive/66b05abc267661172286b47b9246ad55f1581555.tar.gz", + sha256 = "fb0227005116f64dca4b19b451ae793e9a2591c019136b70424ebe3d4f5334fe", patches = [ "//thirdparty/patches:arrow-headers-unused.patch", "//thirdparty/patches:arrow-windows-export.patch", diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 2f317b850..08b52dcea 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -665,24 +665,26 @@ def test_pass_returned_object_id(one_worker_100MiB, use_ray_put): ] @ray.remote - def pending(ref, signal): - ray.get(signal.wait.remote()) + def pending(ref): ray.get(ref[0]) + return ref[0] signal = SignalActor.remote() outer_oid = return_an_id.remote() - inner_oid_binary = ray.get(outer_oid)[0].binary() - pending_oid = pending.remote([outer_oid], signal) + pending_oid = pending.remote([outer_oid]) # Remove the local reference to the returned ID. del outer_oid # Check that the inner ID is pinned by the remote task ID. + _fill_object_store_and_get(pending_oid, succeed=False) + ray.get(signal.send.remote()) + inner_oid = ray.get(pending_oid) + inner_oid_binary = inner_oid.binary() _fill_object_store_and_get(inner_oid_binary) - # Check that the task finishing unpins the object. - ray.get(signal.send.remote()) - ray.get(pending_oid) + del pending_oid + del inner_oid _fill_object_store_and_get(inner_oid_binary, succeed=False) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e45ce81b7..2b4fbbe2c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3206,6 +3206,11 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, plasma_ids.push_back(plasma::ObjectID::from_binary(object_id_binary)); } std::vector plasma_results; + // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not + // block when serving the request. However, if the plasma store is under + // heavy load, then this request can still block the NodeManager event loop + // since we must wait for the plasma store's reply. We should consider using + // an `AsyncGet` instead. if (!store_client_.Get(plasma_ids, /*timeout_ms=*/0, &plasma_results).ok()) { RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);