From 99375c4cfcd27bb7554497fc9b2d4c1d3afaf43a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 19 Jan 2021 11:01:45 -0800 Subject: [PATCH] [Object Spilling] Remove retries and use a timer instead. (#13175) --- python/ray/parameter.py | 1 - python/ray/tests/conftest.py | 1 - python/ray/tests/test_failure.py | 8 +-- python/ray/tests/test_object_spilling.py | 13 ---- python/ray/tests/test_reference_counting_2.py | 1 - src/ray/common/ray_config_def.h | 6 +- .../plasma/create_request_queue.cc | 29 ++++----- .../plasma/create_request_queue.h | 33 ++++++---- src/ray/object_manager/plasma/store.cc | 6 +- .../test/create_request_queue_test.cc | 65 +++++++++++++------ 10 files changed, 88 insertions(+), 75 deletions(-) diff --git a/python/ray/parameter.py b/python/ray/parameter.py index c0d3dc725..a9b20769d 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -210,7 +210,6 @@ class RayParams: raise Exception( "Object pinning cannot be enabled if using LRU eviction.") self._system_config["object_pinning_enabled"] = False - self._system_config["object_store_full_max_retries"] = -1 self._system_config["free_objects_period_milliseconds"] = 1000 # Set the internal config options for object reconstruction. 
diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 4fdfe68c6..db858a475 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -22,7 +22,6 @@ def get_default_fixure_system_config(): system_config = { "object_timeout_milliseconds": 200, "num_heartbeats_timeout": 10, - "object_store_full_max_retries": 3, "object_store_full_delay_ms": 100, } return system_config diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index f01868989..f45aea9b4 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1023,9 +1023,6 @@ def test_connect_with_disconnected_node(shutdown_only): "ray_start_cluster_head", [{ "num_cpus": 5, "object_store_memory": 10**8, - "_system_config": { - "object_store_full_max_retries": 0 - } }], indirect=True) def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head): @@ -1042,10 +1039,7 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head): def test_fill_object_store_exception(shutdown_only): - ray.init( - num_cpus=2, - object_store_memory=10**8, - _system_config={"object_store_full_max_retries": 0}) + ray.init(num_cpus=2, object_store_memory=10**8) @ray.remote def expensive_task(): diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index aee4ae2d4..10b1da773 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -84,7 +84,6 @@ def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only): _system_config={ "max_io_workers": 4, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", @@ -117,7 +116,6 @@ def test_spilling_not_done_for_pinned_object(tmp_path, shutdown_only): "object_store_memory": 75 * 1024 * 1024, "_system_config": { "automatic_object_spilling_enabled": True, - 
"object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "max_io_workers": 4, "object_spilling_config": json.dumps({ @@ -170,7 +168,6 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): _system_config={ "max_io_workers": 4, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": object_spilling_config, "min_spilling_size": 0 @@ -251,10 +248,6 @@ def test_spill_during_get(object_spilling_config, shutdown_only): _system_config={ "automatic_object_spilling_enabled": True, "object_store_full_delay_ms": 100, - # NOTE(swang): Use infinite retries because the OOM timer can still - # get accidentally triggered when objects are released too slowly - # (see github.com/ray-project/ray/issues/12040). - "object_store_full_max_retries": -1, "max_io_workers": 1, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, @@ -286,7 +279,6 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): _system_config={ "max_io_workers": 1, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": object_spilling_config, "min_spilling_size": 0, @@ -320,7 +312,6 @@ def test_delete_objects(tmp_path, shutdown_only): "max_io_workers": 1, "min_spilling_size": 0, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", @@ -363,7 +354,6 @@ def test_delete_objects_delete_while_creating(tmp_path, shutdown_only): "max_io_workers": 4, "min_spilling_size": 0, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", @@ -413,7 +403,6 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only): _system_config={ 
"max_io_workers": 4, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", @@ -489,7 +478,6 @@ def test_delete_objects_multi_node(tmp_path, ray_start_cluster): "max_io_workers": 2, "min_spilling_size": 20 * 1024 * 1024, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", @@ -563,7 +551,6 @@ def test_fusion_objects(tmp_path, shutdown_only): _system_config={ "max_io_workers": 3, "automatic_object_spilling_enabled": True, - "object_store_full_max_retries": 4, "object_store_full_delay_ms": 100, "object_spilling_config": json.dumps({ "type": "filesystem", diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index c8de9b3c9..8cc7576aa 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -20,7 +20,6 @@ logger = logging.getLogger(__name__) @pytest.fixture def one_worker_100MiB(request): config = { - "object_store_full_max_retries": 2, "task_retry_delay_ms": 0, "object_timeout_milliseconds": 1000, } diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index d5c4386e4..cfbc62517 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -238,9 +238,6 @@ RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000) /// The interval at which the gcs server will print debug info. RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1) -/// Maximum number of times to retry putting an object when the plasma store is full. -/// Can be set to -1 to enable unlimited retries. -RAY_CONFIG(int32_t, object_store_full_max_retries, 1000) /// Duration to sleep after failing to put an object in plasma because it is full. 
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10) @@ -361,6 +358,9 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024) /// TODO(sang): Fix it. RAY_CONFIG(bool, automatic_object_deletion_enabled, true) +/// Grace period until we throw the OOM error to the application in seconds. +RAY_CONFIG(int64_t, oom_grace_period_s, 10) + /* Configuration parameters for locality-aware scheduling. */ /// Whether to enable locality-aware leasing. If enabled, then Ray will consider task /// dependency locality when choosing a worker for leasing. diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc index 917a72102..ddb9b0891 100644 --- a/src/ray/object_manager/plasma/create_request_queue.cc +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -81,18 +81,16 @@ std::pair CreateRequestQueue::TryRequestImmediately( } bool CreateRequestQueue::ProcessRequest(std::unique_ptr &request) { - // Return an OOM error to the client if we have hit the maximum number of - // retries. - // TODO(sang): Delete this logic? + // TODO(sang): Delete this logic when lru evict is removed. bool evict_if_full = evict_if_full_; - if (max_retries_ == 0) { - // If we cannot retry, then always evict on the first attempt. - evict_if_full = true; - } else if (num_retries_ > 0) { - // Always try to evict after the first attempt. + if (oom_start_time_ns_ != -1) { + // If the first attempt fails, we set the evict_if_full true. + // We need this logic because if lru_evict flag is on, this is false because we + // shouldn't evict objects in the first attempt. 
evict_if_full = true; } - request->error = request->create_callback(evict_if_full, &request->result); + request->error = + request->create_callback(/*evict_if_full=*/evict_if_full, &request->result); return request->error != PlasmaError::OutOfMemory; } @@ -100,21 +98,25 @@ Status CreateRequestQueue::ProcessRequests() { while (!queue_.empty()) { auto request_it = queue_.begin(); auto create_ok = ProcessRequest(*request_it); + auto now = get_time_(); if (create_ok) { FinishRequest(request_it); + // Reset the oom start time since the creation succeeds. + oom_start_time_ns_ = -1; } else { if (trigger_global_gc_) { trigger_global_gc_(); } + if (oom_start_time_ns_ == -1) { + oom_start_time_ns_ = now; + } if (spill_objects_callback_()) { return Status::TransientObjectStoreFull("Waiting for spilling."); - } else if (num_retries_ < max_retries_ || max_retries_ == -1) { + } else if (now - oom_start_time_ns_ < oom_grace_period_ns_) { // We need a grace period since (1) global GC takes a bit of time to // kick in, and (2) there is a race between spilling finishing and space // actually freeing up in the object store. - // If max_retries == -1, we retry infinitely. - num_retries_ += 1; return Status::ObjectStoreFull("Waiting for grace period."); } else { // Raise OOM. In this case, the request will be marked as OOM. @@ -135,9 +137,6 @@ void CreateRequestQueue::FinishRequest( RAY_CHECK(it->second == nullptr); it->second = std::move(request); queue_.erase(request_it); - - // Reset the number of retries since we are no longer trying this request. 
- num_retries_ = 0; } void CreateRequestQueue::RemoveDisconnectedClientRequests( diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h index 212ce69e6..d2ac288bd 100644 --- a/src/ray/object_manager/plasma/create_request_queue.h +++ b/src/ray/object_manager/plasma/create_request_queue.h @@ -34,15 +34,18 @@ class CreateRequestQueue { using CreateObjectCallback = std::function; - CreateRequestQueue(int32_t max_retries, bool evict_if_full, + CreateRequestQueue(bool evict_if_full, int64_t oom_grace_period_s, ray::SpillObjectsCallback spill_objects_callback, - std::function trigger_global_gc) - : max_retries_(max_retries), - evict_if_full_(evict_if_full), + std::function trigger_global_gc, + std::function get_time) + : evict_if_full_(evict_if_full), + oom_grace_period_ns_(oom_grace_period_s * 1e9), spill_objects_callback_(spill_objects_callback), - trigger_global_gc_(trigger_global_gc) { - RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with " << max_retries_ - << " retries on OOM, evict if full? " << (evict_if_full_ ? 1 : 0); + trigger_global_gc_(trigger_global_gc), + get_time_(get_time) { + RAY_LOG(DEBUG) << "Starting plasma::CreateRequestQueue with OOM grace period " + << oom_grace_period_ns_ << ", evict if full? " + << (evict_if_full_ ? 1 : 0); } /// Add a request to the queue. The caller should use the returned request ID @@ -148,17 +151,15 @@ class CreateRequestQueue { /// a request by retrying. Start at 1 because 0 means "do not retry". uint64_t next_req_id_ = 1; - /// The maximum number of times to retry each request upon OOM. - const int32_t max_retries_; - - /// The number of times the request at the head of the queue has been tried. - int32_t num_retries_ = 0; - /// On the first attempt to create an object, whether to evict from the /// object store to make space. If the first attempt fails, then we will /// always try to evict. 
const bool evict_if_full_; + /// Grace period until we throw the OOM error to the application. + /// Stored in nanoseconds. A negative value does not mean infinite; it skips the grace period. + const int64_t oom_grace_period_ns_; + /// A callback to trigger object spilling. It tries to spill objects upto max /// throughput. It returns true if space is made by object spilling, and false if /// there's no more space to be made. @@ -168,6 +169,9 @@ class CreateRequestQueue { /// full. const std::function trigger_global_gc_; + /// A callback to return the current time. + const std::function get_time_; + /// Queue of object creation requests to respond to. Requests will be placed /// on this queue if the object store does not have enough room at the time /// that the client made the creation request, but space may be made through @@ -189,6 +193,9 @@ class CreateRequestQueue { /// Last time global gc was invoked in ms. uint64_t last_global_gc_ms_; + /// The time OOM timer first starts. It becomes -1 upon every creation success. + int64_t oom_start_time_ns_ = -1; + friend class CreateRequestQueueTest; }; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 7fe41faa3..9bae68b3a 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -125,9 +125,11 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() * 1e9), create_request_queue_( - RayConfig::instance().object_store_full_max_retries(), /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(), - spill_objects_callback, object_store_full_callback) { + /*oom_grace_period_s=*/RayConfig::instance().oom_grace_period_s(), + spill_objects_callback, object_store_full_callback, + /*get_time=*/ + []() { return absl::GetCurrentTimeNanos(); }) { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; } diff --git 
a/src/ray/object_manager/test/create_request_queue_test.cc b/src/ray/object_manager/test/create_request_queue_test.cc index de60807a6..ec75e0043 100644 --- a/src/ray/object_manager/test/create_request_queue_test.cc +++ b/src/ray/object_manager/test/create_request_queue_test.cc @@ -46,17 +46,24 @@ class MockClient : public ClientInterface { class CreateRequestQueueTest : public ::testing::Test { public: CreateRequestQueueTest() - : queue_( - /*max_retries=*/2, + : oom_grace_period_s_(1), + current_time_ns_(0), + queue_( /*evict_if_full=*/true, + /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return false; }, - /*on_global_gc=*/[&]() { num_global_gc_++; }) {} + /*on_global_gc=*/[&]() { num_global_gc_++; }, + /*get_time=*/[&]() { return current_time_ns_; }) {} void AssertNoLeaks() { ASSERT_TRUE(queue_.queue_.empty()); ASSERT_TRUE(queue_.fulfilled_requests_.empty()); } + void TearDown() { current_time_ns_ = 0; } + + int64_t oom_grace_period_s_; + int64_t current_time_ns_; CreateRequestQueue queue_; int num_global_gc_ = 0; }; @@ -66,6 +73,8 @@ TEST_F(CreateRequestQueueTest, TestSimple) { result->data_size = 1234; return PlasmaError::OK; }; + // Advance the clock without processing objects. This shouldn't have an impact. + current_time_ns_ += 10e9; auto client = std::make_shared(); auto req_id = queue_.AddRequest(ObjectID::Nil(), client, request); ASSERT_REQUEST_UNFINISHED(queue_, req_id); @@ -115,8 +124,9 @@ TEST_F(CreateRequestQueueTest, TestOom) { ASSERT_REQUEST_UNFINISHED(queue_, req_id2); ASSERT_EQ(num_global_gc_, 2); - // Retries used up. The first request should reply with OOM and the second + // Grace period is done. The first request should reply with OOM and the second // request should also be served. 
+ current_time_ns_ += oom_grace_period_s_ * 2e9; ASSERT_TRUE(queue_.ProcessRequests().ok()); ASSERT_EQ(num_global_gc_, 3); @@ -129,12 +139,14 @@ TEST_F(CreateRequestQueueTest, TestOom) { TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) { int num_global_gc_ = 0; + int64_t current_time_ns = 0; CreateRequestQueue queue( - /*max_retries=*/-1, /*evict_if_full=*/true, + /*oom_grace_period_s=*/100, // Spilling is failing. /*spill_object_callback=*/[&]() { return false; }, - /*on_global_gc=*/[&]() { num_global_gc_++; }); + /*on_global_gc=*/[&]() { num_global_gc_++; }, + /*get_time=*/[&]() { return current_time_ns; }); auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { return PlasmaError::OutOfMemory; @@ -148,7 +160,9 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) { auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request); auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request); - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 10; i++) { + // Advance 1 second. 
+ current_time_ns += 1e9; ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull()); ASSERT_EQ(num_global_gc_, i + 1); } @@ -160,10 +174,11 @@ TEST(CreateRequestQueueParameterTest, TestOomInfiniteRetry) { TEST_F(CreateRequestQueueTest, TestTransientOom) { CreateRequestQueue queue( - /*max_retries=*/2, - /*evict_if_full=*/false, + /*evict_if_full=*/true, + /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return true; }, - /*on_global_gc=*/[&]() { num_global_gc_++; }); + /*on_global_gc=*/[&]() { num_global_gc_++; }, + /*get_time=*/[&]() { return current_time_ns_; }); auto return_status = PlasmaError::OutOfMemory; auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { @@ -181,14 +196,18 @@ TEST_F(CreateRequestQueueTest, TestTransientOom) { auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request); auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request); - // Transient OOM should not use up any retries. - for (int i = 0; i < 3; i++) { + // Transient OOM should happen until the grace period. + for (int i = 0; i < 9; i++) { + // Advance 0.1 seconds. OOM grace period is 1 second, so it should return transient + // error. + current_time_ns_ += 1e8; ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull()); ASSERT_REQUEST_UNFINISHED(queue, req_id1); ASSERT_REQUEST_UNFINISHED(queue, req_id2); ASSERT_EQ(num_global_gc_, i + 1); } + current_time_ns_ += oom_grace_period_s_ * 2e9; // Return OK for the first request. The second request should also be served. 
return_status = PlasmaError::OK; ASSERT_TRUE(queue.ProcessRequests().ok()); @@ -201,10 +220,11 @@ TEST_F(CreateRequestQueueTest, TestTransientOom) { TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) { bool is_spilling_possible = true; CreateRequestQueue queue( - /*max_retries=*/2, - /*evict_if_full=*/false, + /*evict_if_full=*/true, + /*oom_grace_period_s=*/oom_grace_period_s_, /*spill_object_callback=*/[&]() { return is_spilling_possible; }, - /*on_global_gc=*/[&]() { num_global_gc_++; }); + /*on_global_gc=*/[&]() { num_global_gc_++; }, + /*get_time=*/[&]() { return current_time_ns_; }); auto return_status = PlasmaError::OutOfMemory; auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { @@ -222,8 +242,10 @@ TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) { auto req_id1 = queue.AddRequest(ObjectID::Nil(), client, oom_request); auto req_id2 = queue.AddRequest(ObjectID::Nil(), client, blocked_request); - // Transient OOM should not use up any retries. + // While spilling is possible, ProcessRequests should report a transient OOM. for (int i = 0; i < 3; i++) { + // Advance 0.1 seconds. OOM grace period is 1 second. + current_time_ns_ += 1e8; ASSERT_TRUE(queue.ProcessRequests().IsTransientObjectStoreFull()); ASSERT_REQUEST_UNFINISHED(queue, req_id1); ASSERT_REQUEST_UNFINISHED(queue, req_id2); @@ -238,8 +260,9 @@ TEST_F(CreateRequestQueueTest, TestTransientOomThenOom) { ASSERT_REQUEST_UNFINISHED(queue, req_id2); ASSERT_EQ(num_global_gc_, 5); - // Retries used up. The first request should reply with OOM and the second + // Grace period is done. The first request should reply with OOM and the second // request should also be served. 
+ current_time_ns_ += oom_grace_period_s_ * 2e9; ASSERT_TRUE(queue.ProcessRequests().ok()); ASSERT_REQUEST_FINISHED(queue, req_id1, PlasmaError::OutOfMemory); ASSERT_REQUEST_FINISHED(queue, req_id2, PlasmaError::OK); @@ -261,13 +284,16 @@ TEST_F(CreateRequestQueueTest, TestEvictIfFull) { } TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) { + int64_t current_time_ns = 0; CreateRequestQueue queue( - /*max_retries=*/2, /*evict_if_full=*/false, + /*oom_grace_period_s=*/1, /*spill_object_callback=*/[&]() { return false; }, - /*on_global_gc=*/[&]() {}); + /*on_global_gc=*/[&]() {}, + /*get_time=*/[&]() { return current_time_ns; }); bool first_try = true; + auto oom_request = [&](bool evict_if_full, PlasmaObject *result) { if (first_try) { RAY_CHECK(!evict_if_full); @@ -281,6 +307,7 @@ TEST(CreateRequestQueueParameterTest, TestNoEvictIfFull) { auto client = std::make_shared(); static_cast(queue.AddRequest(ObjectID::Nil(), client, oom_request)); ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull()); + current_time_ns += 1e8; ASSERT_TRUE(queue.ProcessRequests().IsObjectStoreFull()); }