[Placement Group]Reschedule bundles when the node of bundles is dead (#10021)

This commit is contained in:
fangfengbin
2020-08-20 04:24:42 +08:00
committed by GitHub
parent 888f0a2c60
commit 9734dbca3e
11 changed files with 443 additions and 155 deletions
+69 -1
View File
@@ -8,7 +8,7 @@ except ImportError:
pytest_timeout = None
import ray
from ray.test_utils import wait_for_condition
from ray.test_utils import get_other_nodes, wait_for_condition
import ray.cluster_utils
from ray._raylet import PlacementGroupID
@@ -350,5 +350,73 @@ def test_cuda_visible_devices(ray_start_cluster):
assert devices == "0", devices
def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
@ray.remote(num_cpus=1)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
cluster.add_node(num_cpus=4)
cluster.add_node(num_cpus=4)
cluster.wait_for_nodes()
ray.init(address=cluster.address)
# Make sure both head and worker node are alive.
nodes = ray.nodes()
assert len(nodes) == 3
assert nodes[0]["alive"] and nodes[1]["alive"] and nodes[2]["alive"]
placement_group_id = ray.experimental.placement_group(
name="name",
strategy="SPREAD",
bundles=[{
"CPU": 2
}, {
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=0,
detached=True).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=1,
detached=True).remote()
actor_3 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
print(ray.get(actor_3.value.remote()))
cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1])
cluster.wait_for_nodes()
actor_4 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=0,
detached=True).remote()
actor_5 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=1,
detached=True).remote()
actor_6 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_4.value.remote()))
print(ray.get(actor_5.value.remote()))
print(ray.get(actor_6.value.remote()))
ray.shutdown()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
@@ -395,6 +395,8 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
alive_nodes_.erase(iter);
// Remove from cluster resources.
cluster_resources_.erase(node_id);
// Remove from cluster realtime resources.
cluster_realtime_resources_.erase(node_id);
if (!is_intended) {
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
@@ -40,7 +40,7 @@ std::string GcsPlacementGroup::GetName() const {
}
std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetBundles() const {
auto bundles = placement_group_table_data_.bundles();
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> ret_bundles;
for (auto &bundle : bundles) {
ret_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
@@ -48,6 +48,18 @@ std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetBundles(
return ret_bundles;
}
std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetUnplacedBundles()
const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles;
for (auto &bundle : bundles) {
if (ClientID::FromBinary(bundle.node_id()).IsNil()) {
unplaced_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
}
}
return unplaced_bundles;
}
rpc::PlacementStrategy GcsPlacementGroup::GetStrategy() const {
return placement_group_table_data_.strategy();
}
@@ -56,13 +68,17 @@ const rpc::PlacementGroupTableData &GcsPlacementGroup::GetPlacementGroupTableDat
return placement_group_table_data_;
}
const std::string GcsPlacementGroup::DebugString() const {
std::string GcsPlacementGroup::DebugString() const {
std::stringstream stream;
stream << "placement group id = " << GetPlacementGroupID() << ", name = " << GetName()
<< ", strategy = " << GetStrategy();
return stream.str();
}
rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) {
return placement_group_table_data_.mutable_bundles(bundle_index);
}
/////////////////////////////////////////////////////////////////////////////////////////
GcsPlacementGroupManager::GcsPlacementGroupManager(
@@ -74,20 +90,18 @@ GcsPlacementGroupManager::GcsPlacementGroupManager(
gcs_table_storage_(std::move(gcs_table_storage)) {}
void GcsPlacementGroupManager::RegisterPlacementGroup(
const ray::rpc::CreatePlacementGroupRequest &request, StatusCallback callback) {
const std::shared_ptr<GcsPlacementGroup> &placement_group, StatusCallback callback) {
RAY_CHECK(callback);
const auto &placement_group_spec = request.placement_group_spec();
auto placement_group_id =
PlacementGroupID::FromBinary(placement_group_spec.placement_group_id());
auto placement_group = std::make_shared<GcsPlacementGroup>(request);
// TODO(ffbin): If GCS is restarted, GCS client will repeatedly send
// `CreatePlacementGroup` requests,
// which will lead to resource leakage, we will solve it in next pr.
// Mark the callback as pending and invoke it after the placement_group has been
// successfully created.
placement_group_to_register_callback_[placement_group_id] = std::move(callback);
registered_placement_groups_.emplace(placement_group_id, placement_group);
placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] =
std::move(callback);
registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(),
placement_group);
pending_placement_groups_.emplace_back(std::move(placement_group));
SchedulePendingPlacementGroups();
}
@@ -108,9 +122,20 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
std::shared_ptr<GcsPlacementGroup> placement_group) {
RAY_LOG(WARNING) << "Failed to create placement group " << placement_group->GetName()
<< ", try again.";
// We will attempt to schedule this placement_group once
// an eligible node is registered.
pending_placement_groups_.emplace_back(std::move(placement_group));
// We will attempt to schedule this placement_group once an eligible node is
// registered.
auto state = placement_group->GetState();
RAY_CHECK(state == rpc::PlacementGroupTableData::RESCHEDULING ||
state == rpc::PlacementGroupTableData::PENDING);
if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
// NOTE: If a node is dead, the placement group scheduler should try to recover the
// group by rescheduling the bundles of the dead node. This should have higher
// priority than trying to place other placement groups.
pending_placement_groups_.emplace_front(std::move(placement_group));
} else {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
MarkSchedulingDone();
RetryCreatingPlacementGroup();
}
@@ -147,7 +172,7 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
if (registered_placement_groups_.find(placement_group_id) !=
registered_placement_groups_.end()) {
MarkSchedulingStarted(placement_group_id);
gcs_placement_group_scheduler_->Schedule(
gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<GcsPlacementGroup> placement_group) {
OnPlacementGroupCreationFailed(std::move(placement_group));
@@ -186,18 +211,18 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
return;
}
RegisterPlacementGroup(
request, [reply, send_reply_callback, placement_group](Status status) {
if (status.ok()) {
RAY_LOG(INFO) << "Finished registering placement group, "
<< placement_group->DebugString();
} else {
RAY_LOG(WARNING)
<< "Failed to register placement group, "
<< placement_group->DebugString() << ", cause: " << status.message();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
});
RegisterPlacementGroup(placement_group, [reply, send_reply_callback,
placement_group](Status status) {
if (status.ok()) {
RAY_LOG(INFO) << "Finished registering placement group, "
<< placement_group->DebugString();
} else {
RAY_LOG(WARNING) << "Failed to register placement group, "
<< placement_group->DebugString()
<< ", cause: " << status.message();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
});
}));
}
@@ -299,5 +324,26 @@ void GcsPlacementGroupManager::RetryCreatingPlacementGroup() {
RayConfig::instance().gcs_create_placement_group_retry_interval_ms());
}
void GcsPlacementGroupManager::OnNodeDead(const ClientID &node_id) {
RAY_LOG(WARNING) << "Node " << node_id
<< " failed, rescheduling the placement groups on the dead node.";
auto bundles = gcs_placement_group_scheduler_->GetBundlesOnNode(node_id);
for (const auto &bundle : bundles) {
auto iter = registered_placement_groups_.find(bundle.first);
if (iter != registered_placement_groups_.end()) {
for (const auto &bundle_index : bundle.second) {
iter->second->GetMutableBundle(bundle_index)->clear_node_id();
}
// TODO(ffbin): If we have a placement group bundle that requires a unique resource
// (for example gpu resource when theres only one gpu node), this can postpone
// creating until a node with the resources is added. we will solve it in next pr.
iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
pending_placement_groups_.emplace_front(iter->second);
}
}
SchedulePendingPlacementGroups();
}
} // namespace gcs
} // namespace ray
@@ -57,24 +57,32 @@ class GcsPlacementGroup {
/// Get the immutable PlacementGroupTableData of this placement group.
const rpc::PlacementGroupTableData &GetPlacementGroupTableData();
/// Get the mutable bundle of this placement group.
rpc::Bundle *GetMutableBundle(int bundle_index);
/// Update the state of this placement_group.
void UpdateState(rpc::PlacementGroupTableData::PlacementGroupState state);
/// Get the state of this gcs placement_group.
rpc::PlacementGroupTableData::PlacementGroupState GetState() const;
/// Get the id of this placement_group.
PlacementGroupID GetPlacementGroupID() const;
/// Get the name of this placement_group.
std::string GetName() const;
/// Get the bundles of this placement_group
/// Get the bundles of this placement_group (including unplaced).
std::vector<std::shared_ptr<BundleSpecification>> GetBundles() const;
/// Get the unplaced bundles of this placement group.
std::vector<std::shared_ptr<BundleSpecification>> GetUnplacedBundles() const;
/// Get the Strategy
rpc::PlacementStrategy GetStrategy() const;
// Get debug string for the placement group.
const std::string DebugString() const;
std::string DebugString() const;
private:
/// The placement_group meta data which contains the task specification as well as the
@@ -120,16 +128,15 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// Register placement_group asynchronously.
///
/// \param request Contains the meta info to create the placement_group.
/// \param placement_group The placement group to be created.
/// \param callback Will be invoked after the placement_group is created successfully or
/// be invoked immediately if the placement_group is already registered to
/// `registered_placement_groups_` and its state is `CREATED`. The callback will not be
/// called in this case.
void RegisterPlacementGroup(const rpc::CreatePlacementGroupRequest &request,
void RegisterPlacementGroup(const std::shared_ptr<GcsPlacementGroup> &placement_group,
StatusCallback callback);
/// Schedule placement_groups in the `pending_placement_groups_` queue.
/// This function is exposed for testing only.
void SchedulePendingPlacementGroups();
/// Get the placement_group ID for the named placement_group. Returns nil if the
@@ -156,6 +163,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
void RemovePlacementGroup(const PlacementGroupID &placement_group_id,
StatusCallback on_placement_group_removed);
/// Handle a node death. This will reschedule all bundles associated with the
/// specified node id.
///
/// \param node_id The specified node id.
void OnNodeDead(const ClientID &node_id);
private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
@@ -192,6 +205,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// The pending placement_groups which will not be scheduled until there's a resource
/// change.
/// NOTE: When we remove placement group, we need to look for
/// `pending_placement_groups_` and delete the specific placement group, so we can't use
/// `std::priority_queue`.
std::deque<std::shared_ptr<GcsPlacementGroup>> pending_placement_groups_;
/// The scheduler to schedule all registered placement_groups.
@@ -114,15 +114,17 @@ ScheduleMap GcsSpreadStrategy::Schedule(
return schedule_map;
}
void GcsPlacementGroupScheduler::Schedule(
void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName();
auto bundles = placement_group->GetBundles();
auto bundles = placement_group->GetUnplacedBundles();
auto strategy = placement_group->GetStrategy();
auto selected_nodes =
scheduler_strategies_[strategy]->Schedule(bundles, GetScheduleContext());
RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName()
<< ", bundles size = " << bundles.size();
auto selected_nodes = scheduler_strategies_[strategy]->Schedule(
bundles, GetScheduleContext(placement_group->GetPlacementGroupID()));
// If no nodes are available, scheduling fails.
if (selected_nodes.empty()) {
@@ -177,10 +179,10 @@ void GcsPlacementGroupScheduler::Schedule(
void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) {
auto it = placement_group_to_bundle_location_.find(placement_group_id);
auto it = placement_group_to_bundle_locations_.find(placement_group_id);
// If bundle location has been already removed, it means bundles
// are already destroyed. Do nothing.
if (it == placement_group_to_bundle_location_.end()) {
if (it == placement_group_to_bundle_locations_.end()) {
return;
}
@@ -190,7 +192,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
placement_group_to_bundle_location_.erase(it);
placement_group_to_bundle_locations_.erase(it);
// Remove bundles from node_to_leased_bundles_ because bundels are removed now.
for (const auto &bundle_location : *bundle_locations) {
@@ -243,7 +245,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
const std::shared_ptr<BundleSpecification> &bundle_spec,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node) {
if (node == nullptr) {
RAY_LOG(WARNING) << "Node id " << node->node_id() << " for a placement group id "
RAY_LOG(WARNING) << "Node for a placement group id "
<< bundle_spec->PlacementGroupId() << " and a bundle index, "
<< bundle_spec->Index()
<< " has already removed. Cancellation request will be ignored.";
@@ -285,9 +287,7 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
const auto &placement_group_id = placement_group->GetPlacementGroupID();
RAY_CHECK(
placement_group_to_bundle_location_.emplace(placement_group_id, bundle_locations)
.second);
placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations);
if (placement_group_leasing_in_progress_.find(placement_group_id) ==
placement_group_leasing_in_progress_.end() ||
@@ -310,12 +310,14 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
[schedule_success_handler, placement_group](Status status) {
schedule_success_handler(placement_group);
}));
// Update `node_to_leased_bundles_`.
for (const auto &iter : *bundle_locations) {
const auto &location = iter.second;
const auto &bundle_sepc = location.second;
node_to_leased_bundles_[location.first].emplace(bundle_sepc->BundleId(),
bundle_sepc);
placement_group->GetMutableBundle(location.second->Index())
->set_node_id(location.first.Binary());
}
}
// Erase leasing in progress placement group.
@@ -326,7 +328,8 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
}
}
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext() {
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
const PlacementGroupID &placement_group_id) {
// TODO(ffbin): We will add listener to the GCS node manager to handle node deletion.
auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
for (const auto &iter : alive_nodes) {
@@ -341,8 +344,28 @@ std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
for (const auto &iter : node_to_leased_bundles_) {
node_to_bundles->emplace(iter.first, iter.second.size());
}
return std::unique_ptr<ScheduleContext>(
new ScheduleContext(node_to_bundles, gcs_node_manager_));
std::shared_ptr<BundleLocations> bundle_locations = nullptr;
auto iter = placement_group_to_bundle_locations_.find(placement_group_id);
if (iter != placement_group_to_bundle_locations_.end()) {
bundle_locations = iter->second;
}
return std::unique_ptr<ScheduleContext>(new ScheduleContext(
std::move(node_to_bundles), bundle_locations, gcs_node_manager_));
}
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>
GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) {
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles_on_node;
const auto node_iter = node_to_leased_bundles_.find(node_id);
if (node_iter != node_to_leased_bundles_.end()) {
const auto &bundles = node_iter->second;
for (auto &bundle : bundles) {
bundles_on_node[bundle.first.first].push_back(bundle.second->BundleId().second);
}
node_to_leased_bundles_.erase(node_iter);
}
return bundles_on_node;
}
} // namespace gcs
@@ -40,18 +40,27 @@ struct pair_hash {
using ScheduleMap = std::unordered_map<BundleID, ClientID, pair_hash>;
using BundleLocations = std::unordered_map<
BundleID, std::pair<ClientID, std::shared_ptr<BundleSpecification>>, pair_hash>;
class GcsPlacementGroup;
class GcsPlacementGroupSchedulerInterface {
public:
/// Schedule the specified placement_group.
/// Schedule unplaced bundles of the specified placement group.
///
/// \param placement_group to be scheduled.
virtual void Schedule(
/// \param placement_group The placement group to be scheduled.
/// \param failure_callback This function is called if the schedule is failed.
/// \param success_callback This function is called if the schedule is successful.
virtual void ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)>
schedule_success_handler) = 0;
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) = 0;
/// Get bundles belong to the specified node.
///
/// \param node_id ID of the dead node.
/// \return The bundles belong to the dead node.
virtual absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
const ClientID &node_id) = 0;
/// Destroy bundle resources from all nodes in the placement group.
virtual void DestroyPlacementGroupBundleResourcesIfExists(
@@ -66,11 +75,16 @@ class GcsPlacementGroupSchedulerInterface {
class ScheduleContext {
public:
ScheduleContext(std::shared_ptr<absl::flat_hash_map<ClientID, int64_t>> node_to_bundles,
const std::shared_ptr<BundleLocations> &bundle_locations,
const GcsNodeManager &node_manager)
: node_to_bundles_(std::move(node_to_bundles)), node_manager_(node_manager) {}
: node_to_bundles_(std::move(node_to_bundles)),
bundle_locations_(bundle_locations),
node_manager_(node_manager) {}
// Key is node id, value is the number of bundles on the node.
std::shared_ptr<absl::flat_hash_map<ClientID, int64_t>> node_to_bundles_;
const std::shared_ptr<absl::flat_hash_map<ClientID, int64_t>> node_to_bundles_;
// The locations of existing bundles for this placement group.
const std::shared_ptr<BundleLocations> &bundle_locations_;
const GcsNodeManager &node_manager_;
};
@@ -124,7 +138,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
virtual ~GcsPlacementGroupScheduler() = default;
/// Schedule the specified placement_group.
/// Schedule unplaced bundles of the specified placement group.
/// If there is no available nodes then the `schedule_failed_handler` will be
/// triggered, otherwise the bundle in placement_group will be add into a queue and
/// schedule all bundle by calling ReserveResourceFromNode().
@@ -132,7 +146,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param placement_group to be scheduled.
/// \param failure_callback This function is called if the schedule is failed.
/// \param success_callback This function is called if the schedule is successful.
void Schedule(
void ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler) override;
@@ -151,6 +165,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// cancelled.
void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) override;
/// Get bundles belong to the specified node.
///
/// \param node_id ID of the dead node.
/// \return The bundles belong to the dead node.
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
const ClientID &node_id) override;
protected:
/// Lease resource from the specified node for the specified bundle.
void ReserveResourceFromNode(const std::shared_ptr<BundleSpecification> &bundle,
@@ -177,8 +198,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
/// Generate schedule conetext.
std::unique_ptr<ScheduleContext> GetScheduleContext();
/// Generate schedule context.
std::unique_ptr<ScheduleContext> GetScheduleContext(
const PlacementGroupID &placement_group_id);
/// A timer that ticks every cancel resource failure milliseconds.
boost::asio::deadline_timer return_timer_;
@@ -217,10 +239,11 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
absl::flat_hash_set<PlacementGroupID> placement_group_leasing_in_progress_;
/// A map from placement group id to bundle locations.
/// It is used to destroy bundles for the placement group.
/// It is used to destroy bundles for the placement group. When we reschedule bundles,
/// we can get the location of other bundles from here.
/// NOTE: It is a reverse index of `node_to_leased_bundles`.
absl::flat_hash_map<PlacementGroupID, std::shared_ptr<BundleLocations>>
placement_group_to_bundle_location_;
placement_group_to_bundle_locations_;
};
} // namespace gcs
+3 -2
View File
@@ -204,8 +204,9 @@ void GcsServer::InitGcsActorManager() {
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
// All of the related actors should be reconstructed when a node is removed from
// the GCS.
// All of the related placement groups and actors should be reconstructed when a
// node is removed from the GCS.
gcs_placement_group_manager_->OnNodeDead(ClientID::FromBinary(node->node_id()));
gcs_actor_manager_->OnNodeDead(ClientID::FromBinary(node->node_id()));
});
@@ -24,14 +24,14 @@ using ::testing::_;
class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface {
public:
MockPlacementGroupScheduler() {}
MockPlacementGroupScheduler() = default;
void Schedule(std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)>
schedule_failure_handler = nullptr,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)>
schedule_success_handler = nullptr) {
placement_groups.push_back(placement_group);
void ScheduleUnplacedBundles(
std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> success_handler)
override {
placement_groups_.push_back(placement_group);
}
MOCK_METHOD1(DestroyPlacementGroupBundleResourcesIfExists,
@@ -39,7 +39,16 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
MOCK_METHOD1(MarkScheduleCancelled, void(const PlacementGroupID &placement_group_id));
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> placement_groups;
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
const ClientID &node_id) override {
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles;
bundles[group_on_dead_node_] = bundles_on_dead_node_;
return bundles;
}
PlacementGroupID group_on_dead_node_;
std::vector<int64_t> bundles_on_dead_node_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> placement_groups_;
};
class GcsPlacementGroupManagerTest : public ::testing::Test {
@@ -75,16 +84,17 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
};
TEST_F(GcsPlacementGroupManagerTest, TestBasic) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request, [&finished_placement_group_count](Status status) {
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
@@ -92,22 +102,23 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) {
}
TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request, [&finished_placement_group_count](Status status) {
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(finished_placement_group_count, 0);
// Check that the placement_group is in state `CREATED`.
@@ -117,54 +128,54 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
}
TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) {
auto create_placement_group_request =
Mocker::GenCreatePlacementGroupRequest("test_name");
auto request = Mocker::GenCreatePlacementGroupRequest("test_name");
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request, [&finished_placement_group_count](Status status) {
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
ASSERT_EQ(
gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"),
PlacementGroupID::FromBinary(
create_placement_group_request.placement_group_spec().placement_group_id()));
PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id()));
}
TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request, [&finished_placement_group_count](Status status) {
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
// If the creation of placement group fails, it will be rescheduled after a short time.
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
auto condition = [this]() {
return (int)mock_placement_group_scheduler_->placement_groups.size() == 1;
return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1;
};
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request,
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count, &failed_placement_group_count](Status status) {
if (status.ok()) {
++finished_placement_group_count;
@@ -175,9 +186,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
@@ -188,8 +199,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
// Make sure it is not rescheduled
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0);
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
WaitForExpectedCount(finished_placement_group_count, 0);
WaitForExpectedCount(failed_placement_group_count, 1);
@@ -199,11 +210,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request,
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count, &failed_placement_group_count](Status status) {
if (status.ok()) {
++finished_placement_group_count;
@@ -214,9 +225,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
// Placement group is in leasing state.
@@ -229,8 +240,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
// Make sure it is not rescheduled
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0);
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
WaitForExpectedCount(finished_placement_group_count, 0);
WaitForExpectedCount(failed_placement_group_count, 1);
@@ -240,16 +251,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
}
TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
create_placement_group_request, [&finished_placement_group_count](Status status) {
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
if (status.ok()) {
++finished_placement_group_count;
}
});
auto placement_group = mock_placement_group_scheduler_->placement_groups.back();
mock_placement_group_scheduler_->placement_groups.pop_back();
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
@@ -267,8 +279,8 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
// Make sure it is not rescheduled
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0);
mock_placement_group_scheduler_->placement_groups.clear();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(finished_placement_group_count, 1);
// Make sure we can re-remove again.
@@ -276,6 +288,61 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); });
}
TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
auto request1 = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request1),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
auto request2 = Mocker::GenCreatePlacementGroupRequest();
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request2),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
placement_group->GetMutableBundle(0)->set_node_id(ClientID::FromRandom().Binary());
placement_group->GetMutableBundle(1)->set_node_id(ClientID::FromRandom().Binary());
mock_placement_group_scheduler_->placement_groups_.pop_back();
// If a node dies, we will set the bundles above it to be unplaced and reschedule the
// placement group. The placement group state is set to `RESCHEDULING` and will be
// scheduled first.
mock_placement_group_scheduler_->group_on_dead_node_ =
placement_group->GetPlacementGroupID();
mock_placement_group_scheduler_->bundles_on_dead_node_.push_back(0);
gcs_placement_group_manager_->OnNodeDead(ClientID::FromRandom());
// Trigger scheduling `RESCHEDULING` placement group.
auto finished_group = std::make_shared<gcs::GcsPlacementGroup>(
placement_group->GetPlacementGroupTableData());
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(finished_group);
WaitForExpectedCount(finished_placement_group_count, 1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(),
placement_group->GetPlacementGroupID());
const auto &bundles =
mock_placement_group_scheduler_->placement_groups_[0]->GetBundles();
EXPECT_TRUE(ClientID::FromBinary(bundles[0]->GetMutableMessage().node_id()).IsNil());
EXPECT_FALSE(ClientID::FromBinary(bundles[1]->GetMutableMessage().node_id()).IsNil());
// If `RESCHEDULING` placement group fails to create, we will schedule it again first.
placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group);
auto condition = [this]() {
return (int)mock_placement_group_scheduler_->placement_groups_.size() == 1;
};
EXPECT_TRUE(WaitForCondition(condition, 10 * 1000));
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(),
placement_group->GetPlacementGroupID());
}
} // namespace ray
int main(int argc, char **argv) {
@@ -37,17 +37,16 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_placement_group_scheduler_ =
std::make_shared<GcsServerMocker::MockedGcsPlacementGroupScheduler>(
io_service_, gcs_table_storage_, *gcs_node_manager_,
/*lease_client_fplacement_groupy=*/
[this](const rpc::Address &address) {
if (0 == address.port()) {
return raylet_client_;
} else {
return raylet_client1_;
}
});
scheduler_ = std::make_shared<GcsServerMocker::MockedGcsPlacementGroupScheduler>(
io_service_, gcs_table_storage_, *gcs_node_manager_,
/*lease_client_fplacement_groupy=*/
[this](const rpc::Address &address) {
if (0 == address.port()) {
return raylet_client_;
} else {
return raylet_client1_;
}
});
}
void TearDown() override {
@@ -88,15 +87,15 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
// Failed to schedule the placement group, because the node resources is not enough.
auto request = Mocker::GenCreatePlacementGroupRequest("", strategy);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler);
WaitPendingDone(failure_placement_groups_, 1);
ASSERT_EQ(0, success_placement_groups_.size());
// A new node is added, and the rescheduling is successful.
AddNode(Mocker::GenNodeInfo(0), 2);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
@@ -112,8 +111,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client1_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler>
gcs_placement_group_scheduler_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler> scheduler_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> success_placement_groups_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> failure_placement_groups_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
@@ -127,7 +125,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) {
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
// Schedule the placement_group with zero node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
failure_placement_groups_.emplace_back(std::move(placement_group));
@@ -154,7 +152,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) {
// Schedule the placement_group with 1 available node, and the lease request should be
// send to the node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
@@ -184,7 +182,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
// Schedule the placement_group with 1 available node, and the lease request should be
// send to the node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
@@ -216,7 +214,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
// Schedule the placement_group with 1 available node, and the lease request should be
// send to the node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
@@ -259,8 +257,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling)
auto request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler);
if (!raylet_client_->lease_callbacks.empty()) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
@@ -295,8 +293,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
@@ -309,8 +306,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK);
auto placement_group2 =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request2);
gcs_placement_group_scheduler_->Schedule(placement_group2, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 2);
@@ -327,7 +323,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
// Schedule the placement_group with 1 available node, and the lease request should be
// send to the node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
@@ -342,14 +338,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
WaitPendingDone(failure_placement_groups_, 0);
WaitPendingDone(success_placement_groups_, 1);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists(
placement_group_id);
scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
// Subsequent destroy request should not do anything.
gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists(
placement_group_id);
scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve());
ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve());
}
@@ -366,7 +360,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
// Schedule the placement_group with 1 available node, and the lease request should be
// send to the node.
gcs_placement_group_scheduler_->Schedule(
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
@@ -379,7 +373,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
// Now, cancel the schedule request.
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
gcs_placement_group_scheduler_->MarkScheduleCancelled(placement_group_id);
scheduler_->MarkScheduleCancelled(placement_group_id);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
@@ -407,8 +401,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
auto request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
RAY_CHECK(raylet_client_->num_lease_requested > 0);
RAY_CHECK(raylet_client1_->num_lease_requested > 0);
for (int index = 0; index < raylet_client_->num_lease_requested; ++index) {
@@ -420,6 +413,51 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
WaitPendingDone(success_placement_groups_, 1);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
auto node0 = Mocker::GenNodeInfo(0);
auto node1 = Mocker::GenNodeInfo(1);
AddNode(node0);
AddNode(node1);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request);
// Schedule the placement group successfully.
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
auto bundles_on_node0 =
scheduler_->GetBundlesOnNode(ClientID::FromBinary(node0->node_id()));
ASSERT_EQ(1, bundles_on_node0.size());
auto bundles_on_node1 =
scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id()));
ASSERT_EQ(1, bundles_on_node1.size());
// Node1 is dead, reschedule the placement group.
auto bundle_on_dead_node = placement_group->GetMutableBundle(0);
bundle_on_dead_node->clear_node_id();
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
if (0 == bundles_on_node0[placement_group->GetPlacementGroupID()][0]) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
} else {
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
}
WaitPendingDone(success_placement_groups_, 2);
}
} // namespace ray
int main(int argc, char **argv) {
+4 -2
View File
@@ -146,6 +146,8 @@ message Bundle {
}
BundleIdentifier bundle_id = 1;
map<string, double> unit_resources = 2;
// The location of this bundle.
bytes node_id = 3;
}
message PlacementGroupSpec {
@@ -192,11 +194,11 @@ message ActorCreationTaskSpec {
repeated string dynamic_worker_options = 4;
// The max number of concurrent calls for direct call actors.
int32 max_concurrency = 5;
// Whether the actor is persistent
// Whether the actor is persistent.
bool is_detached = 6;
// Globally-unique name of the actor. Should only be populated when is_detached is true.
string name = 7;
// Whether the actor use async actor calls
// Whether the actor use async actor calls.
bool is_asyncio = 8;
// Field used for storing application-level extensions to the actor definition.
string extension_data = 9;
+2
View File
@@ -169,6 +169,8 @@ message PlacementGroupTableData {
CREATED = 1;
// Placement Group is already removed and won't be reschedule.
REMOVED = 2;
// Placement Group is rescheduling because the node it placed is dead.
RESCHEDULING = 3;
}
// ID of the PlacementGroup.