From 8b783ecafaee2b70849e5aa60000108b876b8dd7 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 16 Dec 2020 14:18:43 -0800 Subject: [PATCH] Fix pull manager retry (#12907) --- python/ray/tests/test_object_manager.py | 42 ++++++++++++++++++++++++ src/ray/object_manager/object_manager.cc | 21 +++++++----- src/ray/object_manager/object_manager.h | 3 +- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index c474fced4..b29b9caa2 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -254,6 +254,48 @@ def test_many_small_transfers(ray_start_cluster_with_resource): do_transfers() +# This is a basic test to ensure that the pull request retry timer is +# integrated properly. To test it, we create a 2 node cluster then do the +# following: +# (1) Fill up the driver's object store. +# (2) Fill up the remote node's object store. +# (3) Try to get the remote object. This should fail due to an OOM error caused +# by step 1. +# (4) Allow the local object to be evicted. +# (5) Try to get the object again. Now the retry timer should kick in and +# successfuly pull the remote object. +@pytest.mark.timeout(30) +def test_pull_request_retry(shutdown_only): + cluster = Cluster() + cluster.add_node(num_cpus=0, num_gpus=1, object_store_memory=100 * 2**20) + cluster.add_node(num_cpus=1, num_gpus=0, object_store_memory=100 * 2**20) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + @ray.remote + def put(): + return np.zeros(64 * 2**20, dtype=np.int8) + + @ray.remote(num_cpus=0, num_gpus=1) + def driver(): + local_ref = ray.put(np.zeros(64 * 2**20, dtype=np.int8)) + + remote_ref = put.remote() + + ready, _ = ray.wait([remote_ref], timeout=1) + assert len(ready) == 0 + + del local_ref + + # This should always complete within 10 seconds. + ready, _ = ray.wait([remote_ref], timeout=20) + assert len(ready) > 0 + + # Pretend the GPU node is the driver. We do this to force the placement of + # the driver and `put` task on different nodes. + ray.get(driver.remote()) + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 2eb641d04..3a31da864 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -89,14 +89,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_ static_cast(1L), static_cast(config_.max_bytes_in_flight / config_.object_chunk_size)))); - pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { - RAY_CHECK(!e) << "The raylet's object manager has failed unexpectedly with error: " - << e - << ". Please file a bug report on here: " - "https://github.com/ray-project/ray/issues"; - - Tick(); - }); + pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { Tick(e); }); if (plasma::plasma_store_runner) { store_notification_ = std::make_shared(main_service); @@ -814,6 +807,16 @@ void ObjectManager::RecordMetrics() const { stats::ObjectManagerPullRequests().Record(pull_manager_->NumActiveRequests()); } -void ObjectManager::Tick() { pull_manager_->Tick(); } +void ObjectManager::Tick(const boost::system::error_code &e) { + RAY_CHECK(!e) << "The raylet's object manager has failed unexpectedly with error: " << e + << ". Please file a bug report on here: " + "https://github.com/ray-project/ray/issues"; + + pull_manager_->Tick(); + + auto interval = boost::posix_time::milliseconds(config_.timer_freq_ms); + pull_retry_timer_.expires_from_now(interval); + pull_retry_timer_.async_wait([this](const boost::system::error_code &e) { Tick(e); }); +} } // namespace ray diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 75dbb5bd4..ff409eb18 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -284,7 +284,7 @@ class ObjectManager : public ObjectManagerInterface, /// Record metrics. void RecordMetrics() const; - void Tick(); + void Tick(const boost::system::error_code &e); private: friend class TestObjectManager; @@ -458,7 +458,6 @@ class ObjectManager : public ObjectManagerInterface, const RestoreSpilledObjectCallback restore_spilled_object_; /// Pull manager retry timer . - /* std::unique_ptr pull_retry_timer_; */ boost::asio::deadline_timer pull_retry_timer_; /// Object push manager.