From aa06c3b15ac4824081f9871b57dc9486f12e91e5 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Tue, 2 Jun 2020 11:48:03 -0700 Subject: [PATCH] Eager eviction even when object pinning is disabled (#8561) * Eager eviction even when object pinning is disabled, add regression test * Make test more robust * lint --- python/ray/tests/test_failure.py | 17 +++++- src/ray/raylet/node_manager.cc | 92 ++++++++++++++++---------------- 2 files changed, 62 insertions(+), 47 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index e1350c36e..7813caa55 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -917,12 +917,27 @@ def test_fill_object_store_exception(shutdown_only): def test_fill_object_store_lru_fallback(shutdown_only): - ray.init(num_cpus=2, object_store_memory=10**8, lru_evict=True) + config = json.dumps({ + "free_objects_batch_size": 1, + }) + ray.init( + num_cpus=2, + object_store_memory=10**8, + lru_evict=True, + _internal_config=config) @ray.remote def expensive_task(): return np.zeros((10**8) // 2, dtype=np.uint8) + # Check that objects out of scope are cleaned up quickly. + ray.get(expensive_task.remote()) + start = time.time() + for _ in range(3): + ray.get(expensive_task.remote()) + end = time.time() + assert end - start < 3 + oids = [] for _ in range(3): oid = expensive_task.remote() diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bd518e289..55f968f1d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3379,10 +3379,6 @@ std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view, void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, rpc::PinObjectIDsReply *reply, rpc::SendReplyCallback send_reply_callback) { - if (!object_pinning_enabled_) { - send_reply_callback(Status::OK(), nullptr, nullptr); - return; - } WorkerID worker_id = WorkerID::FromBinary(request.owner_address().worker_id()); auto it = worker_rpc_clients_.find(worker_id); if (it == worker_rpc_clients_.end()) { @@ -3395,51 +3391,55 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, .first; } - // Pin the objects in plasma by getting them and holding a reference to - // the returned buffer. - // NOTE: the caller must ensure that the objects already exist in plamsa before - // sending a PinObjectIDs request. - std::vector plasma_ids; - plasma_ids.reserve(request.object_ids_size()); - for (const auto &object_id_binary : request.object_ids()) { - plasma_ids.push_back(plasma::ObjectID::from_binary(object_id_binary)); - } - std::vector plasma_results; - // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not - // block when serving the request. However, if the plasma store is under - // heavy load, then this request can still block the NodeManager event loop - // since we must wait for the plasma store's reply. We should consider using - // an `AsyncGet` instead. - if (!store_client_.Get(plasma_ids, /*timeout_ms=*/0, &plasma_results).ok()) { - RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; - send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); - return; - } - - // Pin the requested objects until the owner notifies us that the objects can be - // unpinned by responding to the WaitForObjectEviction message. - // TODO(edoakes): we should be batching these requests instead of sending one per - // pinned object. - size_t i = 0; - for (const auto &object_id_binary : request.object_ids()) { - ObjectID object_id = ObjectID::FromBinary(object_id_binary); - - if (plasma_results[i].data == nullptr) { - RAY_LOG(ERROR) << "Plasma object " << object_id - << " was evicted before the raylet could pin it."; - continue; + if (object_pinning_enabled_) { + // Pin the objects in plasma by getting them and holding a reference to + // the returned buffer. + // NOTE: the caller must ensure that the objects already exist in plasma before + // sending a PinObjectIDs request. + std::vector plasma_ids; + plasma_ids.reserve(request.object_ids_size()); + for (const auto &object_id_binary : request.object_ids()) { + plasma_ids.push_back(plasma::ObjectID::from_binary(object_id_binary)); + } + std::vector plasma_results; + // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not + // block when serving the request. However, if the plasma store is under + // heavy load, then this request can still block the NodeManager event loop + // since we must wait for the plasma store's reply. We should consider using + // an `AsyncGet` instead. + if (!store_client_.Get(plasma_ids, /*timeout_ms=*/0, &plasma_results).ok()) { + RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; + send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); + return; } - RAY_LOG(DEBUG) << "Pinning object " << object_id; - RAY_CHECK( - pinned_objects_ - .emplace(object_id, - std::unique_ptr(new RayObject( - std::make_shared(plasma_results[i].data), - std::make_shared(plasma_results[i].metadata), {}))) - .second); - i++; + // Pin the requested objects until the owner notifies us that the objects can be + // unpinned by responding to the WaitForObjectEviction message. + // TODO(edoakes): we should be batching these requests instead of sending one per + // pinned object. + for (int64_t i = 0; i < request.object_ids().size(); i++) { + ObjectID object_id = ObjectID::FromBinary(request.object_ids(i)); + if (plasma_results[i].data == nullptr) { + RAY_LOG(ERROR) << "Plasma object " << object_id + << " was evicted before the raylet could pin it."; + continue; + } + + RAY_LOG(DEBUG) << "Pinning object " << object_id; + RAY_CHECK( + pinned_objects_ + .emplace( + object_id, + std::unique_ptr(new RayObject( + std::make_shared(plasma_results[i].data), + std::make_shared(plasma_results[i].metadata), {}))) + .second); + } + } + + for (const auto &object_id_binary : request.object_ids()) { + ObjectID object_id = ObjectID::FromBinary(object_id_binary); // Send a long-running RPC request to the owner for each object. When we get a // response or the RPC fails (due to the owner crashing), unpin the object. rpc::WaitForObjectEvictionRequest wait_request;