[Pull manager] Only pull once per retry period (#13245)

* .

* docs

* cleanup

* .

* .

* .

* .

Co-authored-by: Alex <alex@anyscale.com>
This commit is contained in:
Alex Wu
2021-01-08 14:51:11 -08:00
committed by GitHub
parent 66daed99f5
commit 6ca4fb1054
4 changed files with 110 additions and 38 deletions
-1
View File
@@ -189,7 +189,6 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
replay_buffer.append(ref)
solution_buffer.append(arr)
print("-----------------------------------")
# randomly sample objects
for _ in range(1000):
index = random.choice(list(range(buffer_length)))
+40 -26
View File
@@ -30,8 +30,10 @@ uint64_t PullManager::Pull(const std::vector<rpc::ObjectReference> &object_ref_b
// We don't have an active pull for this object yet. Ask the caller to
// send us notifications about the object's location.
objects_to_locate->push_back(ref);
// The first pull request doesn't need to be special case. Instead we can just let
// the retry timer fire immediately.
it = object_pull_requests_
.emplace(obj_id, ObjectPullRequest(get_time_() + pull_timeout_ms_ / 1000))
.emplace(obj_id, ObjectPullRequest(/*next_pull_time=*/get_time_()))
.first;
}
it->second.bundle_request_ids.insert(bundle_it->first);
@@ -89,43 +91,49 @@ void PullManager::TryToMakeObjectLocal(const ObjectID &object_id) {
if (it == object_pull_requests_.end()) {
return;
}
auto &request = it->second;
if (request.next_pull_time > get_time_()) {
return;
}
if (!request.spilled_url.empty()) {
// Try to restore the spilled object.
restore_spilled_object_(object_id, request.spilled_url,
[this, object_id](const ray::Status &status) {
// Fall back to fetching from another object manager.
if (!status.ok()) {
PullFromRandomLocation(object_id);
}
});
restore_spilled_object_(
object_id, request.spilled_url, [this, object_id](const ray::Status &status) {
bool did_pull = true;
// Fall back to fetching from another object manager.
if (!status.ok()) {
did_pull = PullFromRandomLocation(object_id);
}
if (!did_pull) {
RAY_LOG(WARNING) << "Object restoration failed and the object could not be "
"found on any other nodes. Object id: "
<< object_id;
}
});
UpdateRetryTimer(request);
} else {
// New object locations were found, so begin trying to pull from a
// client. This will be called every time a new client location
// appears.
PullFromRandomLocation(object_id);
bool did_pull = PullFromRandomLocation(object_id);
if (did_pull) {
UpdateRetryTimer(request);
}
}
const auto time = get_time_();
auto retry_timeout_len = (pull_timeout_ms_ / 1000.) * (1UL << request.num_retries);
request.next_pull_time = time + retry_timeout_len;
// Bound the retry time at 10 * 1024 seconds.
request.num_retries = std::min(request.num_retries + 1, 10);
}
void PullManager::PullFromRandomLocation(const ObjectID &object_id) {
bool PullManager::PullFromRandomLocation(const ObjectID &object_id) {
auto it = object_pull_requests_.find(object_id);
if (it == object_pull_requests_.end()) {
return;
return false;
}
auto &node_vector = it->second.client_locations;
// The timer should never fire if there are no expected client locations.
if (node_vector.empty()) {
return;
return false;
}
RAY_CHECK(!object_is_local_(object_id));
@@ -138,7 +146,7 @@ void PullManager::PullFromRandomLocation(const ObjectID &object_id) {
<< "already has the object. The object may have been evicted. It is "
<< "most likely due to memory pressure, object pull has been "
<< "requested before object location is updated.";
return;
return false;
}
// Choose a random client to pull the object from.
@@ -163,16 +171,22 @@ void PullManager::PullFromRandomLocation(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Sending pull request from " << self_node_id_ << " to " << node_id
<< " of object " << object_id;
send_pull_request_(object_id, node_id);
return true;
}
void PullManager::UpdateRetryTimer(ObjectPullRequest &request) {
const auto time = get_time_();
auto retry_timeout_len = (pull_timeout_ms_ / 1000.) * (1UL << request.num_retries);
request.next_pull_time = time + retry_timeout_len;
// Bound the retry time at 10 * 1024 seconds.
request.num_retries = std::min(request.num_retries + 1, 10);
}
void PullManager::Tick() {
for (auto &pair : object_pull_requests_) {
const auto &object_id = pair.first;
auto &request = pair.second;
const auto time = get_time_();
if (time >= request.next_pull_time) {
TryToMakeObjectLocal(object_id);
}
TryToMakeObjectLocal(object_id);
}
}
+9 -1
View File
@@ -102,7 +102,15 @@ class PullManager {
/// Try to Pull an object from one of its expected client locations. If there
/// are more client locations to try after this attempt, then this method
/// will try each of the other clients in succession.
void PullFromRandomLocation(const ObjectID &object_id);
///
/// \return True if a pull request was sent, otherwise false.
bool PullFromRandomLocation(const ObjectID &object_id);
/// Update the request retry time for the given request.
/// The retry timer is incremented exponentially, capped at 1024 * 10 seconds.
///
/// \param request The request to update the retry time of.
void UpdateRetryTimer(ObjectPullRequest &request);
/// See the constructor's arguments.
NodeID self_node_id_;
@@ -24,8 +24,9 @@ class PullManagerTest : public ::testing::Test {
num_send_pull_request_calls_++;
},
[this](const ObjectID &, const std::string &,
std::function<void(const ray::Status &)>) {
std::function<void(const ray::Status &)> callback) {
num_restore_spilled_object_calls_++;
restore_object_callback_ = callback;
},
[this]() { return fake_time_; }, 10000) {}
@@ -33,6 +34,7 @@ class PullManagerTest : public ::testing::Test {
bool object_is_local_;
int num_send_pull_request_calls_;
int num_restore_spilled_object_calls_;
std::function<void(const ray::Status &)> restore_object_callback_;
double fake_time_;
PullManager pull_manager_;
};
@@ -98,6 +100,7 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) {
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
client_ids.insert(NodeID::FromRandom());
fake_time_ += 10.;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar");
// The behavior is supposed to be to always restore the spilled object if possible (even
@@ -116,6 +119,56 @@ TEST_F(PullManagerTest, TestRestoreSpilledObject) {
ASSERT_EQ(pull_manager_.NumActiveRequests(), 0);
}
TEST_F(PullManagerTest, TestRestoreObjectFailed) {
auto refs = CreateObjectRefs(1);
auto obj1 = ObjectRefsToIds(refs)[0];
rpc::Address addr1;
ASSERT_EQ(pull_manager_.NumActiveRequests(), 0);
std::vector<rpc::ObjectReference> objects_to_locate;
pull_manager_.Pull(refs, &objects_to_locate);
ASSERT_EQ(ObjectRefsToIds(objects_to_locate), ObjectRefsToIds(refs));
ASSERT_EQ(pull_manager_.NumActiveRequests(), 1);
std::unordered_set<NodeID> client_ids;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar");
// client_ids is empty here, so there's nowhere to pull from.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
restore_object_callback_(ray::Status::IOError(":("));
// client_ids is empty here, so there's nowhere to pull from.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
client_ids.insert(NodeID::FromRandom());
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar");
// We always assume the restore succeeded so there's only 1 restore call still.
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 1);
fake_time_ += 10.0;
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar");
ASSERT_EQ(num_send_pull_request_calls_, 0);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
restore_object_callback_(ray::Status::IOError(":("));
// Since restore failed, we can fallback to pulling from another node immediately.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
pull_manager_.OnLocationChange(obj1, client_ids, "remote_url/foo/bar");
// Now that we've successfully sent a pull request, we need to wait for the retry period
// before sending another one.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 2);
}
TEST_F(PullManagerTest, TestManyUpdates) {
auto refs = CreateObjectRefs(1);
auto obj1 = ObjectRefsToIds(refs)[0];
@@ -133,7 +186,8 @@ TEST_F(PullManagerTest, TestManyUpdates) {
pull_manager_.OnLocationChange(obj1, client_ids, "");
}
ASSERT_EQ(num_send_pull_request_calls_, 100);
// Since no time has passed, only send a single pull request.
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
auto objects_to_cancel = pull_manager_.CancelPull(req_id);
@@ -160,22 +214,18 @@ TEST_F(PullManagerTest, TestRetryTimer) {
ASSERT_EQ(num_send_pull_request_calls_, 1);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
for (; fake_time_ <= 127 * 10; fake_time_ += 1.) {
for (; fake_time_ <= 7 * 10; fake_time_ += 1.) {
pull_manager_.Tick();
}
// Rapid set of location changes.
for (int i = 0; i < 127; i++) {
fake_time_ += 0.1;
// Location changes can trigger reset timer.
for (; fake_time_ <= 120 * 10; fake_time_ += 1.) {
pull_manager_.OnLocationChange(obj1, client_ids, "");
}
// We should make a pull request every tick (even if it's a duplicate to a node we're
// already pulling from).
// OnLocationChange also doesn't count towards the retry timer.
// To the casual observer, this may seem off-by-one, but this is due to floating point
// error (0.1 + 0.1 ... 10k times > 10 == True)
ASSERT_EQ(num_send_pull_request_calls_, 1 + 7 + 127);
ASSERT_EQ(num_send_pull_request_calls_, 7);
ASSERT_EQ(num_restore_spilled_object_calls_, 0);
// Don't retry an object if it's local.
@@ -255,6 +305,7 @@ TEST_F(PullManagerTest, TestDeduplicateBundles) {
ASSERT_TRUE(objects_to_cancel.empty());
// Objects should still be pulled because the other request is still open.
ASSERT_EQ(pull_manager_.NumActiveRequests(), oids.size());
fake_time_ += 10;
num_send_pull_request_calls_ = 0;
for (size_t i = 0; i < oids.size(); i++) {
pull_manager_.OnLocationChange(oids[i], client_ids, "");