diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 5fe9b697b..aee4ae2d4 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -189,7 +189,6 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): replay_buffer.append(ref) solution_buffer.append(arr) - print("-----------------------------------") # randomly sample objects for _ in range(1000): index = random.choice(list(range(buffer_length))) diff --git a/src/ray/object_manager/pull_manager.cc b/src/ray/object_manager/pull_manager.cc index c9fa13177..289ad13eb 100644 --- a/src/ray/object_manager/pull_manager.cc +++ b/src/ray/object_manager/pull_manager.cc @@ -30,8 +30,10 @@ uint64_t PullManager::Pull(const std::vector &object_ref_b // We don't have an active pull for this object yet. Ask the caller to // send us notifications about the object's location. objects_to_locate->push_back(ref); + // The first pull request doesn't need to be special case. Instead we can just let + // the retry timer fire immediately. it = object_pull_requests_ - .emplace(obj_id, ObjectPullRequest(get_time_() + pull_timeout_ms_ / 1000)) + .emplace(obj_id, ObjectPullRequest(/*next_pull_time=*/get_time_())) .first; } it->second.bundle_request_ids.insert(bundle_it->first); @@ -89,43 +91,49 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) { if (it == object_pull_requests_.end()) { return; } - auto &request = it->second; + if (request.next_pull_time > get_time_()) { + return; + } + if (!request.spilled_url.empty()) { // Try to restore the spilled object. - restore_spilled_object_(object_id, request.spilled_url, - [this, object_id](const ray::Status &status) { - // Fall back to fetching from another object manager. - if (!status.ok()) { - PullFromRandomLocation(object_id); - } - }); + restore_spilled_object_( + object_id, request.spilled_url, [this, object_id](const ray::Status &status) { + bool did_pull = true; + // Fall back to fetching from another object manager. + if (!status.ok()) { + did_pull = PullFromRandomLocation(object_id); + } + if (!did_pull) { + RAY_LOG(WARNING) << "Object restoration failed and the object could not be " + "found on any other nodes. Object id: " + << object_id; + } + }); + UpdateRetryTimer(request); } else { // New object locations were found, so begin trying to pull from a // client. This will be called every time a new client location // appears. - PullFromRandomLocation(object_id); + bool did_pull = PullFromRandomLocation(object_id); + if (did_pull) { + UpdateRetryTimer(request); + } } - - const auto time = get_time_(); - auto retry_timeout_len = (pull_timeout_ms_ / 1000.) * (1UL << request.num_retries); - request.next_pull_time = time + retry_timeout_len; - - // Bound the retry time at 10 * 1024 seconds. - request.num_retries = std::min(request.num_retries + 1, 10); } -void PullManager::PullFromRandomLocation(const ObjectID &object_id) { +bool PullManager::PullFromRandomLocation(const ObjectID &object_id) { auto it = object_pull_requests_.find(object_id); if (it == object_pull_requests_.end()) { - return; + return false; } auto &node_vector = it->second.client_locations; // The timer should never fire if there are no expected client locations. if (node_vector.empty()) { - return; + return false; } RAY_CHECK(!object_is_local_(object_id)); @@ -138,7 +146,7 @@ void PullManager::PullFromRandomLocation(const ObjectID &object_id) { << "already has the object. The object may have been evicted. It is " << "most likely due to memory pressure, object pull has been " << "requested before object location is updated."; - return; + return false; } // Choose a random client to pull the object from. @@ -163,16 +171,22 @@ void PullManager::PullFromRandomLocation(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Sending pull request from " << self_node_id_ << " to " << node_id << " of object " << object_id; send_pull_request_(object_id, node_id); + return true; +} + +void PullManager::UpdateRetryTimer(ObjectPullRequest &request) { + const auto time = get_time_(); + auto retry_timeout_len = (pull_timeout_ms_ / 1000.) * (1UL << request.num_retries); + request.next_pull_time = time + retry_timeout_len; + + // Bound the retry time at 10 * 1024 seconds. + request.num_retries = std::min(request.num_retries + 1, 10); } void PullManager::Tick() { for (auto &pair : object_pull_requests_) { const auto &object_id = pair.first; - auto &request = pair.second; - const auto time = get_time_(); - if (time >= request.next_pull_time) { - TryToMakeObjectLocal(object_id); - } + TryToMakeObjectLocal(object_id); } } diff --git a/src/ray/object_manager/pull_manager.h b/src/ray/object_manager/pull_manager.h index 33710a18e..6364ae34a 100644 --- a/src/ray/object_manager/pull_manager.h +++ b/src/ray/object_manager/pull_manager.h @@ -102,7 +102,15 @@ class PullManager { /// Try to Pull an object from one of its expected client locations. If there /// are more client locations to try after this attempt, then this method /// will try each of the other clients in succession. - void PullFromRandomLocation(const ObjectID &object_id); + /// + /// \return True if a pull request was sent, otherwise false. + bool PullFromRandomLocation(const ObjectID &object_id); + + /// Update the request retry time for the given request. + /// The retry timer is incremented exponentially, capped at 1024 * 10 seconds. + /// + /// \param request The request to update the retry time of. + void UpdateRetryTimer(ObjectPullRequest &request); /// See the constructor's arguments. NodeID self_node_id_; diff --git a/src/ray/object_manager/test/pull_manager_test.cc b/src/ray/object_manager/test/pull_manager_test.cc index 21e41f874..9230c87e9 100644 --- a/src/ray/object_manager/test/pull_manager_test.cc +++ b/src/ray/object_manager/test/pull_manager_test.cc @@ -24,8 +24,9 @@ class PullManagerTest : public ::testing::Test { num_send_pull_request_calls_++; }, [this](const ObjectID &, const std::string &, - std::function) { + std::function callback) { num_restore_spilled_object_calls_++; + restore_object_callback_ = callback; }, [this]() { return fake_time_; }, 10000) {} @@ -33,6 +34,7 @@ class PullManagerTest : public ::testing::Test { bool object_is_local_; int num_send_pull_request_calls_; int num_restore_spilled_object_calls_; + std::function restore_object_callback_; double fake_time_; PullManager pull_manager_; }; @@ -98,6 +100,7 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) { ASSERT_EQ(num_restore_spilled_object_calls_, 1); client_ids.insert(NodeID::FromRandom()); + fake_time_ += 10.; pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar"); // The behavior is supposed to be to always restore the spilled object if possible (even @@ -116,6 +119,56 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) { ASSERT_EQ(pull_manager_.NumActiveRequests(), 0); } +TEST_F(PullManagerTest, TestRestoreObjectFailed) { + auto refs = CreateObjectRefs(1); + auto obj1 = ObjectRefsToIds(refs)[0]; + rpc::Address addr1; + ASSERT_EQ(pull_manager_.NumActiveRequests(), 0); + std::vector objects_to_locate; + pull_manager_.Pull(refs, &objects_to_locate); + ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs)); + ASSERT_EQ(pull_manager_.NumActiveRequests(), 1); + + std::unordered_set client_ids; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar"); + + // client_ids is empty here, so there's nowhere to pull from. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + restore_object_callback_(ray::Status::IOError(":(")); + + // client_ids is empty here, so there's nowhere to pull from. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + client_ids.insert(NodeID::FromRandom()); + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar"); + + // We always assume the restore succeeded so there's only 1 restore call still. + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 1); + + fake_time_ += 10.0; + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar"); + + ASSERT_EQ(num_send_pull_request_calls_, 0); + ASSERT_EQ(num_restore_spilled_object_calls_, 2); + + restore_object_callback_(ray::Status::IOError(":(")); + + // Since restore failed, we can fallback to pulling from another node immediately. + ASSERT_EQ(num_send_pull_request_calls_, 1); + ASSERT_EQ(num_restore_spilled_object_calls_, 2); + + pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar"); + + // Now that we've successfully sent a pull request, we need to wait for the retry period + // before sending another one. + ASSERT_EQ(num_send_pull_request_calls_, 1); + ASSERT_EQ(num_restore_spilled_object_calls_, 2); +} + TEST_F(PullManagerTest, TestManyUpdates) { auto refs = CreateObjectRefs(1); auto obj1 = ObjectRefsToIds(refs)[0]; @@ -133,7 +186,8 @@ TEST_F(PullManagerTest, TestManyUpdates) { pull_manager_.OnLocationChange(obj1, client_ids, ""); } - ASSERT_EQ(num_send_pull_request_calls_, 100); + // Since no time has passed, only send a single pull request. + ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 0); auto objects_to_cancel = pull_manager_.CancelPull(req_id); @@ -160,22 +214,18 @@ TEST_F(PullManagerTest, TestRetryTimer) { ASSERT_EQ(num_send_pull_request_calls_, 1); ASSERT_EQ(num_restore_spilled_object_calls_, 0); - for (; fake_time_ <= 127 * 10; fake_time_ += 1.) { + for (; fake_time_ <= 7 * 10; fake_time_ += 1.) { pull_manager_.Tick(); } - // Rapid set of location changes. - for (int i = 0; i < 127; i++) { - fake_time_ += 0.1; + // Location changes can trigger reset timer. + for (; fake_time_ <= 120 * 10; fake_time_ += 1.) { pull_manager_.OnLocationChange(obj1, client_ids, ""); } // We should make a pull request every tick (even if it's a duplicate to a node we're // already pulling from). - // OnLocationChange also doesn't count towards the retry timer. - // To the casual observer, this may seem off-by-one, but this is due to floating point - // error (0.1 + 0.1 ... 10k times > 10 == True) - ASSERT_EQ(num_send_pull_request_calls_, 1 + 7 + 127); + ASSERT_EQ(num_send_pull_request_calls_, 7); ASSERT_EQ(num_restore_spilled_object_calls_, 0); // Don't retry an object if it's local. @@ -255,6 +305,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) { ASSERT_TRUE(objects_to_cancel.empty()); // Objects should still be pulled because the other request is still open. ASSERT_EQ(pull_manager_.NumActiveRequests(), oids.size()); + fake_time_ += 10; num_send_pull_request_calls_ = 0; for (size_t i = 0; i < oids.size(); i++) { pull_manager_.OnLocationChange(oids[i], client_ids, "");