[Placement Group]Placement Group supports gcs failover (Part1) (#11933)

This commit is contained in:
fangfengbin
2020-11-16 14:42:56 +08:00
committed by GitHub
parent d35de2272d
commit 8fb926565c
6 changed files with 235 additions and 73 deletions
@@ -117,19 +117,58 @@ GcsPlacementGroupManager::GcsPlacementGroupManager(
void GcsPlacementGroupManager::RegisterPlacementGroup(
const std::shared_ptr<GcsPlacementGroup> &placement_group, StatusCallback callback) {
// NOTE: After the abnormal recovery of the network between GCS client and GCS server or
// the GCS server is restarted, it is required to continue to register placement group
// successfully.
RAY_CHECK(callback);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
auto iter = registered_placement_groups_.find(placement_group_id);
if (iter != registered_placement_groups_.end()) {
auto pending_register_iter =
placement_group_to_register_callback_.find(placement_group_id);
if (pending_register_iter != placement_group_to_register_callback_.end()) {
// 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server.
// 2. The GCS client receives some network errors.
// 3. The GCS client resends the `RegisterPlacementGroup` request to the GCS server.
pending_register_iter->second = std::move(callback);
} else {
// 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server.
// 2. The GCS server flushes the placement group to the storage and restarts before
// replying to the GCS client.
// 3. The GCS client resends the `RegisterPlacementGroup` request to the GCS server.
RAY_LOG(INFO) << "Placement group " << placement_group_id
<< " is already registered.";
callback(Status::OK());
}
return;
}
// TODO(ffbin): If GCS is restarted, GCS client will repeatedly send
// `CreatePlacementGroup` requests,
// which will lead to resource leakage, we will solve it in next pr.
// Mark the callback as pending and invoke it after the placement_group has been
// successfully created.
placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] =
std::move(callback);
registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(),
placement_group);
pending_placement_groups_.emplace_back(std::move(placement_group));
SchedulePendingPlacementGroups();
pending_placement_groups_.emplace_back(placement_group);
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group->GetPlacementGroupTableData(),
[this, placement_group_id, placement_group](Status status) {
RAY_CHECK_OK(status);
if (!registered_placement_groups_.contains(placement_group_id)) {
auto iter = placement_group_to_register_callback_.find(placement_group_id);
if (iter != placement_group_to_register_callback_.end()) {
std::stringstream stream;
stream << "Placement group of id " << placement_group_id
<< " has been removed before registration.";
iter->second(Status::NotFound(stream.str()));
placement_group_to_register_callback_.erase(iter);
}
} else {
SchedulePendingPlacementGroups();
}
}));
}
PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName(
@@ -217,41 +256,19 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
const ray::rpc::CreatePlacementGroupRequest &request,
ray::rpc::CreatePlacementGroupReply *reply,
ray::rpc::SendReplyCallback send_reply_callback) {
auto placement_group_id =
PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id());
auto placement_group = std::make_shared<GcsPlacementGroup>(request);
RAY_LOG(INFO) << "Registering placement group, " << placement_group->DebugString();
// We need this call here because otherwise, if placement group is removed right after
// here, it can cause inconsistent states.
registered_placement_groups_.emplace(placement_group_id, placement_group);
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group->GetPlacementGroupTableData(),
[this, request, reply, send_reply_callback, placement_group_id,
placement_group](Status status) {
RAY_CHECK_OK(status);
if (!registered_placement_groups_.contains(placement_group_id)) {
std::stringstream stream;
stream << "Placement group of id " << placement_group_id
<< " has been removed before registration.";
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::NotFound(stream.str()));
return;
}
RegisterPlacementGroup(placement_group, [reply, send_reply_callback,
placement_group](Status status) {
if (status.ok()) {
RAY_LOG(INFO) << "Finished registering placement group, "
<< placement_group->DebugString();
} else {
RAY_LOG(INFO) << "Failed to register placement group, "
<< placement_group->DebugString()
<< ", cause: " << status.message();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
});
}));
RegisterPlacementGroup(placement_group, [reply, send_reply_callback,
placement_group](Status status) {
if (status.ok()) {
RAY_LOG(INFO) << "Finished registering placement group, "
<< placement_group->DebugString();
} else {
RAY_LOG(INFO) << "Failed to register placement group, "
<< placement_group->DebugString() << ", cause: " << status.message();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
});
}
void GcsPlacementGroupManager::HandleRemovePlacementGroup(
@@ -448,5 +465,29 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load));
}
void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "GcsPlacementGroupManager loading initial data.";
auto callback = [this,
done](const std::unordered_map<PlacementGroupID,
rpc::PlacementGroupTableData> &result) {
for (auto &item : result) {
auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
registered_placement_groups_.emplace(item.first, placement_group);
if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
}
}
SchedulePendingPlacementGroups();
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(callback));
}
} // namespace gcs
} // namespace ray
@@ -231,6 +231,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// Collect stats from gcs placement group manager in-memory data structures.
void CollectStats() const;
/// Load initial data from gcs storage to memory cache asynchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
+2 -1
View File
@@ -101,7 +101,7 @@ void GcsServer::Start() {
rpc_server_.RegisterService(*worker_info_service_);
auto load_completed_count = std::make_shared<int>(0);
int load_count = 2;
int load_count = 3;
auto on_done = [this, load_count, load_completed_count]() {
++(*load_completed_count);
@@ -126,6 +126,7 @@ void GcsServer::Start() {
};
gcs_object_manager_->LoadInitialData(on_done);
gcs_node_manager_->LoadInitialData(on_done);
gcs_placement_group_manager_->LoadInitialData(on_done);
// Print debug info periodically.
PrintDebugInfo();
@@ -31,6 +31,7 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> success_handler)
override {
absl::MutexLock lock(&mutex_);
placement_groups_.push_back(placement_group);
}
@@ -46,9 +47,15 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
return bundles;
}
int GetPlacementGroupCount() {
absl::MutexLock lock(&mutex_);
return placement_groups_.size();
}
PlacementGroupID group_on_dead_node_;
std::vector<int64_t> bundles_on_dead_node_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> placement_groups_;
absl::Mutex mutex_;
};
class GcsPlacementGroupManagerTest : public ::testing::Test {
@@ -78,6 +85,13 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
thread_io_service_->join();
}
void WaitForExpectedPgCount(int expected_count) {
auto condition = [this, expected_count]() {
return mock_placement_group_scheduler_->GetPlacementGroupCount() == expected_count;
};
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
}
std::shared_ptr<MockPlacementGroupScheduler> mock_placement_group_scheduler_;
std::unique_ptr<gcs::GcsPlacementGroupManager> gcs_placement_group_manager_;
@@ -95,11 +109,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
@@ -113,12 +127,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
@@ -139,12 +153,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
@@ -161,20 +175,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
// If the creation of placement group fails, it will be rescheduled after a short time.
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
auto condition = [this]() {
return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1;
};
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
WaitForExpectedPgCount(1);
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
@@ -183,7 +194,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count, &failed_placement_group_count](Status status) {
[&finished_placement_group_count,
&failed_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
} else {
@@ -191,9 +203,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
}
});
WaitForExpectedPgCount(1);
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
@@ -201,7 +213,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id,
[](Status status) {});
[](const Status &status) {});
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED);
// Make sure it is not rescheduled
@@ -213,7 +225,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); });
placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); });
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
@@ -222,7 +234,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count, &failed_placement_group_count](Status status) {
[&finished_placement_group_count,
&failed_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
} else {
@@ -230,9 +243,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
}
});
WaitForExpectedPgCount(1);
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
@@ -242,7 +255,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id))
.Times(1);
gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id,
[](Status status) {});
[](const Status &status) {});
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
@@ -255,7 +268,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); });
placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); });
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
@@ -263,11 +276,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
}
});
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
@@ -282,7 +296,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id))
.Times(0);
gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id,
[](Status status) {});
[](const Status &status) {});
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED);
// Make sure it is not rescheduled
@@ -293,7 +307,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); });
placement_group_id, [](const Status &status) { ASSERT_TRUE(status.ok()); });
}
TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
@@ -301,17 +315,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request1),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
auto request2 = Mocker::GenCreatePlacementGroupRequest();
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request2),
[&finished_placement_group_count](Status status) {
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary());
placement_group->GetMutableBundle(1)->set_node_id(NodeID::FromRandom().Binary());
@@ -343,10 +357,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
mock_placement_group_scheduler_->placement_groups_.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
auto condition = [this]() {
return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1;
};
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
WaitForExpectedPgCount(1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(),
placement_group->GetPlacementGroupID());
}
@@ -367,6 +378,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
@@ -400,6 +412,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) {
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
@@ -433,6 +446,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) {
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
@@ -463,6 +477,7 @@ TEST_F(GcsPlacementGroupManagerTest,
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);