[Placement Group] Atomic Placement Group Part 1, Basic Structure. (#10482)

* Write a test.

* Basic structure done.

* Reduce flakiness of tests.

* Addressed code review.

* Skipping tests because it is flaky for now.

* Fix linting issues.

* Increase sleep time to see lint messages.

* Lint issue fixed.
This commit is contained in:
SangBin Cho
2020-09-02 18:14:46 -07:00
committed by GitHub
parent 4324dd5929
commit dc7fe1a4c5
13 changed files with 602 additions and 206 deletions
+1 -1
View File
@@ -159,7 +159,7 @@ matrix:
- . ./ci/travis/ci.sh lint
- . ./ci/travis/ci.sh build
script:
- true # we still need this block to exist, otherwise it will fall back to the global one
- sleep 30 # we still need this block to exist, otherwise it will fall back to the global one
# Build MacOS wheels and MacOS jars
- os: osx
+63
View File
@@ -632,5 +632,68 @@ def test_schedule_placement_group_when_node_add(ray_start_cluster):
wait_for_condition(is_placement_group_created)
# Exercises placement group creation while nodes are removed and re-added.
# Currently skipped; the final assertion documents the pre-atomic-creation
# behavior (see the TODO at the bottom).
@pytest.mark.skip(reason="Not working yet")
def test_atomic_creation(ray_start_cluster):
# Setup cluster.
# Every node gets exactly enough CPUs to host `bundle_per_node` bundles.
cluster = ray_start_cluster
bundle_cpu_size = 2
bundle_per_node = 2
num_nodes = 5
nodes = [
cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node)
for _ in range(num_nodes)
]
ray.init(address=cluster.address)
@ray.remote(num_cpus=1)
class NormalActor:
def ping(self):
pass
# Create a placement group whose bundles need every CPU in the cluster
# (num_nodes * bundle_per_node bundles of bundle_cpu_size CPUs each).
# It is important to use pack strategy to make test less flaky.
pg = ray.experimental.placement_group(
name="name",
strategy="PACK",
bundles=[{
"CPU": bundle_cpu_size
} for _ in range(num_nodes * bundle_per_node)])
# Create a placement group actor bound to the last bundle index.
# This shouldn't be scheduled until placement group creation is done.
pg_actor = NormalActor.options(
placement_group=pg,
placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote()
# Destroy the nodes returned by get_other_nodes (head excluded) to fail
# placement group creation.
nodes_to_kill = get_other_nodes(cluster, exclude_head=True)
for node_to_kill in nodes_to_kill:
cluster.remove_node(node_to_kill)
# Check the placement group without blocking (timeout=0). It should be
# unready; presumably because the removed nodes held resources that some
# of the bundles need -- TODO confirm.
ready, unready = ray.wait([pg.ready()], timeout=0)
assert len(ready) == 0
assert len(unready) == 1
# Add as many nodes back as were removed so that the placement group can
# be scheduled.
for _ in range(len(nodes_to_kill)):
nodes.append(
cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node))
# Wait on the placement group creation (blocks until it is ready).
ready, unready = ray.wait([pg.ready()])
assert len(ready) == 1
assert len(unready) == 0
# Confirm that the placement group actor is created. It will
# raise an exception if actor was scheduled before placement group was
# created.
# TODO(sang): This with statement should be removed after atomic creation
# is implemented. It will be done in the next PR.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(pg_actor.ping.remote(), timeout=3.0)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
@@ -224,40 +224,56 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
for (auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &node_id = selected_nodes[bundle_id];
lease_status_tracker->MarkLeaseStarted(node_id, bundle);
lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
// TODO(sang): The callback might not be called at all if nodes are dead. We should
// handle this case properly.
ReserveResourceFromNode(
bundle, gcs_node_manager_.GetNode(node_id),
[this, bundle, node_id, lease_status_tracker, failure_callback,
success_callback](const Status &status) {
lease_status_tracker->MarkLeaseReturned(node_id, bundle, status);
if (lease_status_tracker->IsAllLeaseRequestReturned()) {
OnAllBundleSchedulingRequestReturned(lease_status_tracker, failure_callback,
success_callback);
}
});
PrepareResources(bundle, gcs_node_manager_.GetNode(node_id),
[this, bundle, node_id, lease_status_tracker, failure_callback,
success_callback](const Status &status) {
lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle,
status);
if (lease_status_tracker->AllPrepareRequestsReturned()) {
OnAllBundlePrepareRequestReturned(
lease_status_tracker, failure_callback, success_callback);
}
});
}
}
void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) {
bool is_committed = false;
bool is_prepared = false;
std::shared_ptr<BundleLocations> bundle_locations = std::make_shared<BundleLocations>();
// Check if we can find committed bundle locations.
const auto &maybe_bundle_locations =
bundle_location_index_.GetBundleLocations(placement_group_id);
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
// If bundle location has been already removed, it means bundles
// are already destroyed. Do nothing.
if (!maybe_bundle_locations.has_value()) {
return;
if (maybe_bundle_locations.has_value()) {
is_committed = true;
bundle_locations = maybe_bundle_locations.value();
}
const auto &bundle_locations = maybe_bundle_locations.value();
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
if (it != placement_group_leasing_in_progress_.end()) {
const auto &leasing_context = it->second;
is_prepared = true;
bundle_locations = leasing_context->GetPreparedBundleLocations();
}
RAY_CHECK(!(is_committed && is_prepared))
<< "Anomaly detected. It shouldn't be possible that placement group is both "
"committing and preparing.";
// Cancel all resource reservation.
for (const auto &iter : *(bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
}
bundle_location_index_.Erase(placement_group_id);
committed_bundle_location_index_.Erase(placement_group_id);
}
void GcsPlacementGroupScheduler::MarkScheduleCancelled(
@@ -267,21 +283,16 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
it->second->MarkPlacementGroupScheduleCancelled();
}
void GcsPlacementGroupScheduler::ReserveResourceFromNode(
void GcsPlacementGroupScheduler::PrepareResources(
const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node, const StatusCallback &callback) {
rpc::Address remote_address;
remote_address.set_raylet_id(node->node_id());
remote_address.set_ip_address(node->node_manager_address());
remote_address.set_port(node->node_manager_port());
auto node_id = ClientID::FromBinary(node->node_id());
auto lease_client = GetOrConnectLeaseClient(remote_address);
RAY_LOG(INFO) << "Leasing resource from node " << node_id
<< " for bundle: " << bundle->DebugString();
lease_client->RequestResourceReserve(
const auto lease_client = GetLeaseClientFromNode(node);
const auto &node_id = node->node_id();
RAY_LOG(INFO) << "Preparing resource from node " << node_id
<< " for a bundle: " << bundle->DebugString();
lease_client->PrepareBundleResources(
*bundle, [node_id, bundle, callback](
const Status &status, const rpc::RequestResourceReserveReply &reply) {
// TODO(AlisaWu): Add placement group cancel.
const Status &status, const rpc::PrepareBundleResourcesReply &reply) {
auto result = reply.success() ? Status::OK()
: Status::IOError("Failed to reserve resource");
if (result.ok()) {
@@ -295,6 +306,29 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode(
});
}
/// Send a COMMIT request for `bundle` to the given node and report the reply
/// status through `callback`.
///
/// \param bundle The bundle whose resources should be committed.
/// \param node The node the request is sent to. Must not be null.
/// \param callback Invoked with the status of the commit reply. Must be callable.
void GcsPlacementGroupScheduler::CommitResources(
    const std::shared_ptr<BundleSpecification> &bundle,
    const std::shared_ptr<ray::rpc::GcsNodeInfo> &node, const StatusCallback callback) {
  RAY_CHECK(node != nullptr);
  const auto &node_id = node->node_id();
  const auto commit_client = GetLeaseClientFromNode(node);
  RAY_LOG(INFO) << "Committing resource to a node " << node_id
                << " for a bundle: " << bundle->DebugString();
  commit_client->CommitBundleResources(
      *bundle, [bundle, node_id, callback](const Status &status,
                                           const rpc::CommitBundleResourcesReply &reply) {
        // Log the outcome first, then hand the raw status to the caller.
        const bool commit_failed = !status.ok();
        if (commit_failed) {
          RAY_LOG(WARNING) << "Failed to commit resource to " << node_id
                           << " for bundle: " << bundle->DebugString();
        } else {
          RAY_LOG(INFO) << "Finished committing resource to " << node_id
                        << " for bundle: " << bundle->DebugString();
        }
        RAY_CHECK(callback);
        callback(status);
      });
}
void GcsPlacementGroupScheduler::CancelResourceReserve(
const std::shared_ptr<BundleSpecification> &bundle_spec,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node) {
@@ -308,11 +342,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
auto node_id = ClientID::FromBinary(node->node_id());
RAY_LOG(INFO) << "Cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
rpc::Address remote_address;
remote_address.set_raylet_id(node->node_id());
remote_address.set_ip_address(node->node_manager_address());
remote_address.set_port(node->node_manager_port());
auto return_client = GetOrConnectLeaseClient(remote_address);
const auto return_client = GetLeaseClientFromNode(node);
return_client->CancelResourceReserve(
*bundle_spec, [bundle_spec, node_id](const Status &status,
const rpc::CancelResourceReserveReply &reply) {
@@ -332,69 +362,150 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a
return iter->second;
}
void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
/// Build the raylet address of `node` from its node info and return the lease
/// client obtained from GetOrConnectLeaseClient() for that address.
///
/// \param node Node info providing the raylet id, ip address and port.
/// \return The lease client for the node's raylet.
std::shared_ptr<ResourceReserveInterface>
GcsPlacementGroupScheduler::GetLeaseClientFromNode(
    const std::shared_ptr<ray::rpc::GcsNodeInfo> &node) {
  rpc::Address raylet_address;
  raylet_address.set_port(node->node_manager_port());
  raylet_address.set_ip_address(node->node_manager_address());
  raylet_address.set_raylet_id(node->node_id());
  return GetOrConnectLeaseClient(raylet_address);
}
void GcsPlacementGroupScheduler::CommitAllBundles(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
RAY_CHECK(lease_status_tracker->IsAllLeaseRequestReturned())
<< "This method can be called only after all bundle scheduling requests are "
"returend.";
const auto &placement_group = lease_status_tracker->GetPlacementGroup();
const auto &bundles = lease_status_tracker->GetUnplacedBundles();
const auto &bundle_locations = lease_status_tracker->GetBundleLocations();
const auto &placement_group_id = placement_group->GetPlacementGroupID();
bundle_location_index_.AddBundleLocations(placement_group_id, bundle_locations);
const std::shared_ptr<BundleLocations> &prepared_bundle_locations =
lease_status_tracker->GetPreparedBundleLocations();
lease_status_tracker->MarkCommitPhaseStarted();
for (const auto &bundle_to_commit : *prepared_bundle_locations) {
const auto &node_id = bundle_to_commit.second.first;
const auto &node = gcs_node_manager_.GetNode(node_id);
const auto &bundle = bundle_to_commit.second.second;
if (!lease_status_tracker->IsLeasingSucceed()) {
// If the lease request has been already cancelled
// or not every lease request succeeds.
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
schedule_failure_handler(placement_group);
} else {
// If we successfully created placement group, store them to GCS.
rpc::ScheduleData data;
for (const auto &iter : bundles) {
// TODO(ekl) this is a hack to get a string key for the proto
auto key = iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index());
data.mutable_schedule_plan()->insert(
{key, (*bundle_locations)[iter->BundleId()].first.Binary()});
}
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put(
placement_group_id, data,
[schedule_success_handler, placement_group](Status status) {
schedule_success_handler(placement_group);
}));
for (const auto &iter : *bundle_locations) {
const auto &location = iter.second;
placement_group->GetMutableBundle(location.second->Index())
->set_node_id(location.first.Binary());
}
// TODO(sang) Handle the case nodes are dead.
CommitResources(
bundle, node,
[this, lease_status_tracker, bundle, node_id, schedule_failure_handler,
schedule_success_handler](const Status &status) {
lease_status_tracker->MarkCommitRequestReturned(node_id, bundle, status);
if (lease_status_tracker->AllCommitRequestReturned()) {
OnAllBundleCommitRequestReturned(
lease_status_tracker, schedule_failure_handler, schedule_success_handler);
}
});
}
}
/// Called once every prepare request of a placement group has returned.
///
/// If the prepare phase was not successful, the prepared resources are
/// cancelled, the tracker is dropped and `schedule_failure_handler` is invoked.
/// Otherwise the chosen node of each bundle is recorded on the placement group,
/// the schedule plan is written to the GCS table storage, and the commit phase
/// is started from the storage callback.
///
/// \param lease_status_tracker Tracker of the scheduling that just finished its
/// prepare phase.
/// \param schedule_failure_handler Invoked with the placement group on failure.
/// \param schedule_success_handler Forwarded to CommitAllBundles().
void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
RAY_CHECK(lease_status_tracker->AllPrepareRequestsReturned())
<< "This method can be called only after all bundle scheduling requests are "
"returned.";
const auto &placement_group = lease_status_tracker->GetPlacementGroup();
const auto &bundles = lease_status_tracker->GetBundlesToSchedule();
const auto &prepared_bundle_locations =
lease_status_tracker->GetPreparedBundleLocations();
const auto &placement_group_id = placement_group->GetPlacementGroupID();
// AllPrepareRequestsSuccessful() is also false when the scheduling has been
// cancelled, so this branch covers both failed and cancelled scheduling.
if (!lease_status_tracker->AllPrepareRequestsSuccessful()) {
// Cancel the prepared resources first. The destroy call looks up the prepared
// bundle locations through placement_group_leasing_in_progress_, so it has to
// run before the status tracker is erased from that map below.
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
schedule_failure_handler(placement_group);
return;
}
// If the prepare requests succeed, update the bundle location.
for (const auto &iter : *prepared_bundle_locations) {
const auto &location = iter.second;
placement_group->GetMutableBundle(location.second->Index())
->set_node_id(location.first.Binary());
}
// Store data to GCS.
rpc::ScheduleData data;
for (const auto &iter : bundles) {
// TODO(ekl) this is a hack to get a string key for the proto
auto key = iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index());
data.mutable_schedule_plan()->insert(
{key, (*prepared_bundle_locations)[iter->BundleId()].first.Binary()});
}
// The commit phase starts only after the schedule plan write has called back.
// NOTE(review): the status passed to the storage callback is not inspected, so
// the commit phase starts regardless of whether the write succeeded.
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put(
placement_group_id, data,
[this, schedule_success_handler, schedule_failure_handler,
lease_status_tracker](Status status) {
CommitAllBundles(lease_status_tracker, schedule_failure_handler,
schedule_success_handler);
}));
}
/// Called once every commit request of a placement group has returned.
///
/// The status tracker is removed from the in-progress map and the prepared
/// bundle locations are registered as committed. Afterwards either the success
/// handler or the failure handler is invoked with the placement group.
///
/// \param lease_status_tracker Tracker of the scheduling that finished committing.
/// \param schedule_failure_handler Invoked when the commit phase did not succeed.
/// \param schedule_success_handler Invoked when the commit phase succeeded.
void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
    const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
        &schedule_failure_handler,
    const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
        &schedule_success_handler) {
  const auto &placement_group = lease_status_tracker->GetPlacementGroup();
  const auto &placement_group_id = placement_group->GetPlacementGroupID();

  // The scheduling is no longer in progress; drop its tracker from the map.
  auto tracker_entry = placement_group_leasing_in_progress_.find(placement_group_id);
  RAY_CHECK(tracker_entry != placement_group_leasing_in_progress_.end());
  placement_group_leasing_in_progress_.erase(tracker_entry);

  // Promote the prepared bundle locations to committed bundle locations.
  committed_bundle_location_index_.AddBundleLocations(
      placement_group_id, lease_status_tracker->GetPreparedBundleLocations());

  if (lease_status_tracker->AllCommitRequestsSuccessful()) {
    schedule_success_handler(placement_group);
    return;
  }

  if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) {
    // The scheduling was cancelled; release everything that was just recorded.
    DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
  } else {
    // Detach the bundles whose commit failed from their nodes and flag the
    // placement group for rescheduling so the failure handler can retry them.
    for (const auto &failed_bundle :
         *lease_status_tracker->GetUnCommittedBundleLocations()) {
      placement_group->GetMutableBundle(failed_bundle.first.second)->clear_node_id();
    }
    placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
  }
  schedule_failure_handler(placement_group);
}
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
const PlacementGroupID &placement_group_id) {
auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
bundle_location_index_.AddNodes(alive_nodes);
committed_bundle_location_index_.AddNodes(alive_nodes);
auto node_to_bundles = std::make_shared<absl::flat_hash_map<ClientID, int64_t>>();
for (const auto &node_it : alive_nodes) {
const auto &node_id = node_it.first;
const auto &bundle_locations_on_node =
bundle_location_index_.GetBundleLocationsOnNode(node_id);
committed_bundle_location_index_.GetBundleLocationsOnNode(node_id);
RAY_CHECK(bundle_locations_on_node)
<< "Bundle locations haven't been registered for node id " << node_id;
const int bundles_size = bundle_locations_on_node.value()->size();
node_to_bundles->emplace(node_id, bundles_size);
}
auto &bundle_locations = bundle_location_index_.GetBundleLocations(placement_group_id);
auto &bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
return std::unique_ptr<ScheduleContext>(new ScheduleContext(
std::move(node_to_bundles), bundle_locations, gcs_node_manager_));
}
@@ -403,7 +514,7 @@ absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>
GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) {
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles_on_node;
const auto &maybe_bundle_locations =
bundle_location_index_.GetBundleLocationsOnNode(node_id);
committed_bundle_location_index_.GetBundleLocationsOnNode(node_id);
if (maybe_bundle_locations.has_value()) {
const auto &bundle_locations = maybe_bundle_locations.value();
for (auto &bundle : *bundle_locations) {
@@ -411,7 +522,7 @@ GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) {
const auto &bundle_index = bundle.first.second;
bundles_on_node[bundle_placement_group_id].push_back(bundle_index);
}
bundle_location_index_.Erase(node_id);
committed_bundle_location_index_.Erase(node_id);
}
return bundles_on_node;
}
@@ -507,22 +618,23 @@ void BundleLocationIndex::AddNodes(
LeaseStatusTracker::LeaseStatusTracker(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles)
: placement_group_(placement_group), unplaced_bundles_(unplaced_bundles) {
bundle_locations_ = std::make_shared<BundleLocations>();
: placement_group_(placement_group), bundles_to_schedule_(unplaced_bundles) {
preparing_bundle_locations_ = std::make_shared<BundleLocations>();
uncommitted_bundle_locations_ = std::make_shared<BundleLocations>();
}
bool LeaseStatusTracker::MarkLeaseStarted(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle) {
bool LeaseStatusTracker::MarkPreparePhaseStarted(
const ClientID &node_id, std::shared_ptr<BundleSpecification> bundle) {
const auto &bundle_id = bundle->BundleId();
return node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second;
return node_to_bundles_when_preparing_[node_id].emplace(bundle_id).second;
}
void LeaseStatusTracker::MarkLeaseReturned(
void LeaseStatusTracker::MarkPrepareRequestReturned(
const ClientID &node_id, const std::shared_ptr<BundleSpecification> bundle,
const Status &status) {
RAY_CHECK(returned_count_ <= unplaced_bundles_.size());
auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id);
RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end());
RAY_CHECK(prepare_request_returned_count_ <= bundles_to_schedule_.size());
auto leasing_bundles = node_to_bundles_when_preparing_.find(node_id);
RAY_CHECK(leasing_bundles != node_to_bundles_when_preparing_.end());
auto bundle_iter = leasing_bundles->second.find(bundle->BundleId());
RAY_CHECK(bundle_iter != leasing_bundles->second.end());
@@ -530,42 +642,66 @@ void LeaseStatusTracker::MarkLeaseReturned(
// remote node.
leasing_bundles->second.erase(bundle_iter);
if (leasing_bundles->second.empty()) {
node_to_bundles_when_leasing_.erase(leasing_bundles);
node_to_bundles_when_preparing_.erase(leasing_bundles);
}
// If the request succeeds, record it.
const auto &bundle_id = bundle->BundleId();
if (status.ok()) {
bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
}
returned_count_ += 1;
// If every bundle lease request is returned, mark it as all returned.
if (IsAllLeaseRequestReturned()) {
UpdateLeasingState(LeasingState::ALL_RETURNED);
preparing_bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
}
prepare_request_returned_count_ += 1;
}
bool LeaseStatusTracker::IsAllLeaseRequestReturned() const {
return returned_count_ == unplaced_bundles_.size();
bool LeaseStatusTracker::AllPrepareRequestsReturned() const {
return prepare_request_returned_count_ == bundles_to_schedule_.size();
}
bool LeaseStatusTracker::IsLeasingSucceed() const {
return IsAllLeaseRequestReturned() &&
(bundle_locations_->size() == unplaced_bundles_.size()) &&
bool LeaseStatusTracker::AllPrepareRequestsSuccessful() const {
return AllPrepareRequestsReturned() &&
(preparing_bundle_locations_->size() == bundles_to_schedule_.size()) &&
(leasing_state_ != LeasingState::CANCELLED);
}
/// Record that the commit request for `bundle` sent to `node_id` has returned.
/// A failed commit is remembered in the uncommitted bundle locations.
void LeaseStatusTracker::MarkCommitRequestReturned(
    const ClientID &node_id, const std::shared_ptr<BundleSpecification> bundle,
    const Status &status) {
  const auto &returned_bundle_id = bundle->BundleId();
  ++commit_request_returned_count_;
  if (status.ok()) {
    return;
  }
  // If the request failed, record where it failed.
  uncommitted_bundle_locations_->emplace(returned_bundle_id,
                                         std::make_pair(node_id, bundle));
}
/// True once a commit reply has been counted for every bundle to schedule.
bool LeaseStatusTracker::AllCommitRequestReturned() const {
  const size_t expected_reply_count = bundles_to_schedule_.size();
  return expected_reply_count == commit_request_returned_count_;
}
bool LeaseStatusTracker::AllCommitRequestsSuccessful() const {
// We don't check cancel state here because we shouldn't destroy bundles when
// commit requests failed. Cancel state should be treated separately.
return AllCommitRequestReturned() &&
preparing_bundle_locations_->size() == bundles_to_schedule_.size();
}
/// Return a reference to the placement group this tracker was constructed with.
const std::shared_ptr<GcsPlacementGroup> &LeaseStatusTracker::GetPlacementGroup() const {
return placement_group_;
}
const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetBundleLocations() const {
return bundle_locations_;
const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetPreparedBundleLocations()
const {
return preparing_bundle_locations_;
}
/// Return the locations of bundles whose commit request returned a non-OK
/// status (as recorded by MarkCommitRequestReturned()).
const std::shared_ptr<BundleLocations>
&LeaseStatusTracker::GetUnCommittedBundleLocations() const {
return uncommitted_bundle_locations_;
}
const std::vector<std::shared_ptr<BundleSpecification>>
&LeaseStatusTracker::GetUnplacedBundles() const {
return unplaced_bundles_;
&LeaseStatusTracker::GetBundlesToSchedule() const {
return bundles_to_schedule_;
}
/// Return the current leasing state of this tracker.
const LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; }
@@ -583,5 +719,9 @@ bool LeaseStatusTracker::UpdateLeasingState(LeasingState leasing_state) {
return true;
}
/// Request a transition of the leasing state to COMMITTING.
/// The result of UpdateLeasingState() is not inspected here.
void LeaseStatusTracker::MarkCommitPhaseStarted() {
UpdateLeasingState(LeasingState::COMMITTING);
}
} // namespace gcs
} // namespace ray
@@ -66,7 +66,11 @@ class GcsPlacementGroupSchedulerInterface {
virtual void DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) = 0;
/// Mark the placement group schedule as cancelled. Cancelled bundles will be destroyed.
/// Mark the placement group scheduling is cancelled.
/// This method will incur check failure if scheduling
/// is not actually going on to guarantee strong consistency.
///
/// \param placement_group_id The placement group id scheduling is in progress.
virtual void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) = 0;
virtual ~GcsPlacementGroupSchedulerInterface() {}
@@ -132,11 +136,11 @@ class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
};
enum class LeasingState {
// TODO(sang): Use prepare and commit instead for 2PC.
/// The phase where lease requests haven't been returned.
SCHEDULING,
/// The phase where lease requests have returned
ALL_RETURNED,
/// The first phase of 2PC. It means requests to nodes are sent to prepare resources.
PREPARING,
/// The second phase of 2PC. It means that all prepare requests succeed, and GCS is
/// committing resources to each node.
COMMITTING,
/// Placement group has been removed, and this leasing is not valid.
CANCELLED
};
@@ -149,43 +153,122 @@ class LeaseStatusTracker {
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles);
~LeaseStatusTracker() = default;
bool MarkLeaseStarted(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle);
void MarkLeaseReturned(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle,
const Status &status);
bool IsAllLeaseRequestReturned() const;
bool IsLeasingSucceed() const;
/// Indicate the tracker that prepare requests are sent to a specific node.
///
/// \param node_id Id of a node where prepare request is sent.
/// \param bundle Bundle specification the node is supposed to prepare.
/// \return False if the prepare phase was already started. True otherwise.
bool MarkPreparePhaseStarted(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle);
/// Indicate to the tracker that the prepare request of a bundle from a node has
/// returned.
///
/// \param node_id Id of a node where prepare request is returned.
/// \param bundle Bundle specification the node was supposed to schedule.
/// \param status Status of the prepare response.
/// \return Void.
void MarkPrepareRequestReturned(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle,
const Status &status);
/// Used to know if all prepare requests are returned.
///
/// \return True if all prepare requests are returned. False otherwise.
bool AllPrepareRequestsReturned() const;
/// Used to know if the prepare phase succeed.
///
/// \return True if all prepare requests were successful.
bool AllPrepareRequestsSuccessful() const;
/// Indicate the tracker that the commit request of a bundle from a node has returned.
///
/// \param node_id Id of a node where commit request is returned.
/// \param bundle Bundle specification the node was supposed to schedule.
/// \param status Status of the returned commit request.
void MarkCommitRequestReturned(const ClientID &node_id,
const std::shared_ptr<BundleSpecification> bundle,
const Status &status);
/// Used to know if all commit requests are returned.
///
/// \return True if all commit requests are returned. False otherwise.
bool AllCommitRequestReturned() const;
/// Used to know if the commit phase succeed.
///
/// \return True if all commit requests were successful.
bool AllCommitRequestsSuccessful() const;
/// Return a placement group this status tracker is associated with.
///
/// \return The placement group of this lease status tracker is tracking.
const std::shared_ptr<GcsPlacementGroup> &GetPlacementGroup() const;
const std::vector<std::shared_ptr<BundleSpecification>> &GetUnplacedBundles() const;
const std::shared_ptr<BundleLocations> &GetBundleLocations() const;
/// Return bundles that should be scheduled.
///
/// \return List of bundle specification that are supposed to be scheduled.
const std::vector<std::shared_ptr<BundleSpecification>> &GetBundlesToSchedule() const;
/// This method returns bundle locations that succeed to prepare resources.
///
/// \return Location of bundles that succeed to prepare resources on a node.
const std::shared_ptr<BundleLocations> &GetPreparedBundleLocations() const;
/// This method returns bundle locations that failed to commit resources.
///
/// \return Location of bundles that failed to commit resources on a node.
const std::shared_ptr<BundleLocations> &GetUnCommittedBundleLocations() const;
/// Return the leasing state.
///
/// \return Leasing state.
const LeasingState GetLeasingState() const;
/// Mark that this leasing is cancelled.
void MarkPlacementGroupScheduleCancelled();
/// Mark that the commit phase is started.
/// There's no need to mark commit phase is done because in that case, we won't need the
/// status tracker anymore.
void MarkCommitPhaseStarted();
private:
/// Method to update leasing states.
///
/// \param leasing_state The state to update.
/// \return True if succeeds to update. False otherwise.
bool UpdateLeasingState(LeasingState leasing_state);
/// Placement group of which this leasing context is associated with.
std::shared_ptr<GcsPlacementGroup> placement_group_;
/// Location of bundles that lease requests were sent.
/// If schedule success, the decision will be set as schedule_map[bundles[pos]]
/// Location of bundles that prepare requests were sent.
/// If prepare succeeds, the decision will be set as schedule_map[bundles[pos]]
/// else will be set ClientID::Nil().
std::shared_ptr<BundleLocations> bundle_locations_;
/// Number of lease requests that are returned.
size_t returned_count_ = 0;
std::shared_ptr<BundleLocations> preparing_bundle_locations_;
/// Number of prepare requests that are returned.
size_t prepare_request_returned_count_ = 0;
/// Number of commit requests that are returned.
size_t commit_request_returned_count_ = 0;
/// Location of bundles that commit requests failed.
std::shared_ptr<BundleLocations> uncommitted_bundle_locations_;
/// The leasing stage. This is used to know the state of current leasing context.
LeasingState leasing_state_ = LeasingState::SCHEDULING;
LeasingState leasing_state_ = LeasingState::PREPARING;
/// Map from node ID to the set of bundles for whom we are trying to acquire a lease
/// from that node. This is needed so that we can retry lease requests from the node
/// until we receive a reply or the node is removed.
/// TODO(sang): We don't currently handle retry.
absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
node_to_bundles_when_leasing_;
/// Unplaced bundle specification for this leasing context.
std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles_;
node_to_bundles_when_preparing_;
/// Bundles to schedule.
std::vector<std::shared_ptr<BundleSpecification>> bundles_to_schedule_;
};
/// A data structure that helps fast bundle location lookup.
@@ -266,8 +349,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Schedule unplaced bundles of the specified placement group.
/// If there is no available nodes then the `schedule_failed_handler` will be
/// triggered, otherwise the bundle in placement_group will be add into a queue and
/// schedule all bundle by calling ReserveResourceFromNode().
/// triggered, otherwise the bundle in placement_group will be added into a queue and
/// scheduled to all nodes.
///
/// \param placement_group to be scheduled.
/// \param failure_callback This function is called if the schedule is failed.
@@ -277,18 +360,21 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler) override;
/// Destroy bundle resources from all nodes in the placement group.
/// This doesn't do anything if bundles are already destroyed.
/// Destroy the actual bundle resources or locked resources (for 2PC)
/// on all nodes associated with this placement group.
/// The method is idempotent, meaning if all bundles are already cancelled,
/// this method won't do anything.
///
/// \param placement_group_id The id of a placement group to destroy all bundle
/// resources.
/// or locked resources.
void DestroyPlacementGroupBundleResourcesIfExists(
const PlacementGroupID &placement_group_id) override;
/// Mark the placement group schedule as cancelled.
/// Cancelled bundles will be destroyed.
/// \param placement_group_id The id of a placement group to mark that scheduling is
/// cancelled.
/// Mark the placement group scheduling is cancelled.
/// This method will incur check failure if scheduling
/// is not actually going on to guarantee strong consistency.
///
/// \param placement_group_id The placement group id scheduling is in progress.
void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) override;
/// Get bundles that belong to the specified node.
@@ -299,29 +385,66 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
const ClientID &node_id) override;
protected:
/// Lease resource from the specified node for the specified bundle.
void ReserveResourceFromNode(const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node,
const StatusCallback &callback);
/// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
/// on a node until COMMIT or CANCEL requests are sent to a node.
///
/// \param bundle A bundle to schedule on a node.
/// \param node A node to prepare resources for a given bundle.
/// \param callback
void PrepareResources(const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node,
const StatusCallback &callback);
/// return resource for the specified node for the specified bundle.
/// Send a bundle COMMIT request to a node. This means the placement group creation
/// is ready and GCS will commit resources on a given node.
///
/// \param bundle A bundle to schedule on a node.
/// \param node A node to commit resources for a given bundle.
/// \param callback Status callback for the COMMIT request; presumably invoked once the
/// node has replied (TODO confirm against the definition).
/// NOTE(review): `callback` is taken by const value here, while PrepareResources takes
/// its callback by const reference; confirm whether the copy is intentional.
void CommitResources(const std::shared_ptr<BundleSpecification> &bundle,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node,
const StatusCallback callback);
/// Cancel prepared or committed resources from a node.
/// Nodes will be in charge of tracking state of a bundle.
/// This method is supposed to be idempotent.
///
/// \param bundle A description of the bundle to return.
/// \param node The node that the worker will be returned for.
void CancelResourceReserve(const std::shared_ptr<BundleSpecification> &bundle_spec,
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node);
/// Get an existing lease client or connect a new one.
/// Get an existing lease client or connect a new one.
std::shared_ptr<ResourceReserveInterface> GetOrConnectLeaseClient(
const rpc::Address &raylet_address);
void OnAllBundleSchedulingRequestReturned(
/// Get an existing lease client for a given node.
std::shared_ptr<ResourceReserveInterface> GetLeaseClientFromNode(
const std::shared_ptr<ray::rpc::GcsNodeInfo> &node);
/// Called when all prepare requests are returned from nodes.
void OnAllBundlePrepareRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
/// Called when all commit requests are returned from nodes.
void OnAllBundleCommitRequestReturned(
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
/// Commit all bundles recorded in lease status tracker.
void CommitAllBundles(const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler);
/// Generate schedule context.
std::unique_ptr<ScheduleContext> GetScheduleContext(
const PlacementGroupID &placement_group_id);
@@ -345,8 +468,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// A vector to store all the schedule strategy.
std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;
/// Index to lookup bundle locations of node or placement group.
BundleLocationIndex bundle_location_index_;
/// Index to lookup committed bundle locations of node or placement group.
BundleLocationIndex committed_bundle_location_index_;
/// Set of placement group that have lease requests in flight to nodes.
/// It is required to know if placement group has been removed or not.
@@ -120,8 +120,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
ASSERT_EQ(2, raylet_client_->num_lease_requested);
ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
WaitPendingDone(failure_placement_groups_, 0);
WaitPendingDone(success_placement_groups_, 1);
ASSERT_EQ(placement_group, success_placement_groups_.front());
@@ -152,8 +152,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
AddNode(Mocker::GenNodeInfo(0), 2);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
WaitPendingDone(success_placement_groups_, 1);
}
@@ -229,8 +229,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
// Reply failure, so the placement group scheduling failed.
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false));
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false));
WaitPendingDone(failure_placement_groups_, 1);
WaitPendingDone(success_placement_groups_, 0);
ASSERT_EQ(placement_group, failure_placement_groups_.front());
@@ -285,8 +285,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
ASSERT_EQ(2, raylet_client_->num_lease_requested);
ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
// One bundle success and the other failed.
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve(false));
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false));
ASSERT_EQ(1, raylet_client_->num_return_requested);
// Reply the placement_group creation request, then the placement_group should be
// scheduled successfully.
@@ -318,12 +318,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling)
success_handler);
if (!raylet_client_->lease_callbacks.empty()) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
++select_node0_count;
} else {
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources());
++select_node1_count;
}
}
@@ -351,8 +351,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
WaitPendingDone(success_placement_groups_, 1);
// Node1 has less number of bundles, but it doesn't satisfy the resource
@@ -364,8 +364,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto placement_group2 =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request2);
scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
WaitPendingDone(success_placement_groups_, 2);
}
@@ -390,15 +390,15 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
});
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
WaitPendingDone(failure_placement_groups_, 0);
WaitPendingDone(success_placement_groups_, 1);
RAY_LOG(ERROR) << "sanngbin1";
const auto &placement_group_id = placement_group->GetPlacementGroupID();
scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
// Subsequent destroy request should not do anything.
scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve());
@@ -429,9 +429,9 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
});
// Now, cancel the schedule request.
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
scheduler_->MarkScheduleCancelled(placement_group_id);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve());
WaitPendingDone(failure_placement_groups_, 1);
@@ -462,10 +462,10 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
RAY_CHECK(raylet_client_->num_lease_requested > 0);
RAY_CHECK(raylet_client1_->num_lease_requested > 0);
for (int index = 0; index < raylet_client_->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
}
for (int index = 0; index < raylet_client1_->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources());
}
WaitPendingDone(success_placement_groups_, 1);
}
@@ -492,8 +492,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
};
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources());
WaitPendingDone(success_placement_groups_, 1);
auto bundles_on_node0 =
@@ -509,8 +509,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
// TODO(ffbin): We need to see which node the other bundles that have been placed are
// deployed on, and spread them as far as possible. It will be implemented in the next
// pr.
raylet_client_->GrantResourceReserve();
raylet_client1_->GrantResourceReserve();
raylet_client_->GrantPrepareBundleResources();
raylet_client1_->GrantPrepareBundleResources();
WaitPendingDone(success_placement_groups_, 2);
}
@@ -543,8 +543,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) {
auto node2 = Mocker::GenNodeInfo(2);
AddNode(node2);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client2_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_client2_->GrantPrepareBundleResources());
WaitPendingDone(success_placement_groups_, 1);
}
@@ -158,14 +158,22 @@ struct GcsServerMocker {
class MockRayletResourceClient : public ResourceReserveInterface {
public:
void RequestResourceReserve(
void PrepareBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::RequestResourceReserveReply> &callback)
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback)
override {
num_lease_requested += 1;
lease_callbacks.push_back(callback);
}
void CommitBundleResources(
    const BundleSpecification &bundle_spec,
    const ray::rpc::ClientCallback<ray::rpc::CommitBundleResourcesReply> &callback)
    override {
  // The mock does not queue commit requests: it answers right away with an OK status
  // and a default-constructed reply. `bundle_spec` is not inspected.
  callback(Status::OK(), rpc::CommitBundleResourcesReply());
}
void CancelResourceReserve(
BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback)
@@ -175,9 +183,9 @@ struct GcsServerMocker {
}
// Trigger reply to PrepareBundleResources.
bool GrantResourceReserve(bool success = true) {
bool GrantPrepareBundleResources(bool success = true) {
Status status = Status::OK();
rpc::RequestResourceReserveReply reply;
rpc::PrepareBundleResourcesReply reply;
reply.set_success(success);
if (lease_callbacks.size() == 0) {
return false;
@@ -208,7 +216,7 @@ struct GcsServerMocker {
int num_lease_requested = 0;
int num_return_requested = 0;
ClientID node_id = ClientID::FromRandom();
std::list<rpc::ClientCallback<rpc::RequestResourceReserveReply>> lease_callbacks = {};
std::list<rpc::ClientCallback<rpc::PrepareBundleResourcesReply>> lease_callbacks = {};
std::list<rpc::ClientCallback<rpc::CancelResourceReserveReply>> return_callbacks = {};
};
class MockedGcsActorScheduler : public gcs::GcsActorScheduler {
+19 -6
View File
@@ -37,16 +37,24 @@ message RequestWorkerLeaseReply {
bool canceled = 4;
}
message RequestResourceReserveRequest {
message PrepareBundleResourcesRequest {
// Bundle containing the requested resources.
Bundle bundle_spec = 1;
}
message RequestResourceReserveReply {
// The status if resource reserve success.
message PrepareBundleResourcesReply {
// Whether the prepare request was successful.
bool success = 1;
}
message CommitBundleResourcesRequest {
// Bundle containing the requested resources.
Bundle bundle_spec = 1;
}
// Reply to a CommitBundleResources request. It currently carries no fields.
message CommitBundleResourcesReply {
}
message CancelResourceReserveRequest {
// Bundle containing the requested resources.
Bundle bundle_spec = 1;
@@ -166,9 +174,14 @@ service NodeManagerService {
// are still needed. And Raylet will release other leased workers.
rpc ReleaseUnusedWorkers(ReleaseUnusedWorkersRequest)
returns (ReleaseUnusedWorkersReply);
// Request resource from the raylet for a bundle.
rpc RequestResourceReserve(RequestResourceReserveRequest)
returns (RequestResourceReserveReply);
// Request a raylet to lock resources for a bundle.
// This is the first phase of 2PC protocol for atomic placement group creation.
rpc PrepareBundleResources(PrepareBundleResourcesRequest)
returns (PrepareBundleResourcesReply);
// Commit bundle resources to a raylet.
// This is the second phase of 2PC protocol for atomic placement group creation.
rpc CommitBundleResources(CommitBundleResourcesRequest)
returns (CommitBundleResourcesReply);
// Return resource for the raylet.
rpc CancelResourceReserve(CancelResourceReserveRequest)
returns (CancelResourceReserveReply);
+13 -5
View File
@@ -1795,12 +1795,13 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
SubmitTask(task);
}
void NodeManager::HandleRequestResourceReserve(
const rpc::RequestResourceReserveRequest &request,
rpc::RequestResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(!new_scheduler_enabled_) << "Not implemented";
void NodeManager::HandlePrepareBundleResources(
const rpc::PrepareBundleResourcesRequest &request,
rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) {
// TODO(sang): Port this onto the new scheduler.
RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet.";
auto bundle_spec = BundleSpecification(request.bundle_spec());
RAY_LOG(DEBUG) << "bundle lease request " << bundle_spec.BundleId().first
RAY_LOG(DEBUG) << "bundle prepare request " << bundle_spec.BundleId().first
<< bundle_spec.BundleId().second;
auto resource_ids = ScheduleBundle(cluster_resource_map_, bundle_spec);
if (resource_ids.AvailableResources().size() == 0) {
@@ -1815,6 +1816,13 @@ void NodeManager::HandleRequestResourceReserve(
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
/// Handle a `CommitBundleResources` request (the second phase of the 2PC protocol for
/// atomic placement group creation). This is currently a stub: it replies with
/// Status::OK() and reads neither `request` nor `reply`.
void NodeManager::HandleCommitBundleResources(
const rpc::CommitBundleResourcesRequest &request,
rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) {
// Acknowledge immediately; the two trailing arguments are left as nullptr.
send_reply_callback(Status::OK(), nullptr, nullptr);
// TODO(sang): Implement this in the next PR.
}
void NodeManager::HandleCancelResourceReserve(
const rpc::CancelResourceReserveRequest &request,
rpc::CancelResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) {
+8 -3
View File
@@ -552,11 +552,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Status indicating whether setup was successful.
ray::Status SetupPlasmaSubscription();
/// Handle a `ResourcesLease` request.
void HandleRequestResourceReserve(const rpc::RequestResourceReserveRequest &request,
rpc::RequestResourceReserveReply *reply,
/// Handle a `PrepareBundleResources` request.
void HandlePrepareBundleResources(const rpc::PrepareBundleResourcesRequest &request,
rpc::PrepareBundleResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `CommitBundleResources` request.
void HandleCommitBundleResources(const rpc::CommitBundleResourcesRequest &request,
rpc::CommitBundleResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `ResourcesReturn` request.
void HandleCancelResourceReserve(const rpc::CancelResourceReserveRequest &request,
rpc::CancelResourceReserveReply *reply,
+12 -4
View File
@@ -378,12 +378,20 @@ void raylet::RayletClient::CancelWorkerLease(
grpc_client_->CancelWorkerLease(request, callback);
}
void raylet::RayletClient::RequestResourceReserve(
void raylet::RayletClient::PrepareBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::RequestResourceReserveReply> &callback) {
rpc::RequestResourceReserveRequest request;
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) {
rpc::PrepareBundleResourcesRequest request;
request.mutable_bundle_spec()->CopyFrom(bundle_spec.GetMessage());
grpc_client_->RequestResourceReserve(request, callback);
grpc_client_->PrepareBundleResources(request, callback);
}
/// Send a COMMIT request for the given bundle to the raylet over gRPC. The bundle's
/// message is copied into the request and `callback` is handed to the gRPC client.
void raylet::RayletClient::CommitBundleResources(
    const BundleSpecification &bundle_spec,
    const ray::rpc::ClientCallback<ray::rpc::CommitBundleResourcesReply> &callback) {
  rpc::CommitBundleResourcesRequest commit_request;
  // Fill the request's bundle field with a copy of the bundle message.
  auto *bundle_field = commit_request.mutable_bundle_spec();
  bundle_field->CopyFrom(bundle_spec.GetMessage());
  grpc_client_->CommitBundleResources(commit_request, callback);
}
void raylet::RayletClient::CancelResourceReserve(
+27 -8
View File
@@ -91,14 +91,27 @@ class WorkerLeaseInterface {
/// Interface for leasing resource.
class ResourceReserveInterface {
public:
/// Requests a resource from the raylet. The callback will be sent via gRPC.
/// \param resource_spec Resources that should be allocated for the worker.
/// Request a raylet to prepare resources of a given bundle for atomic placement group
/// creation. This is used for the first phase of atomic placement group creation. The
/// callback will be sent via gRPC.
/// \param resource_spec Resources that should be
/// allocated for the worker.
/// \return ray::Status
virtual void RequestResourceReserve(
virtual void PrepareBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::RequestResourceReserveReply>
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply>
&callback) = 0;
/// Request a raylet to commit resources of a given bundle for atomic placement group
/// creation. This is used for the second phase of atomic placement group creation. The
/// callback will be sent via gRPC.
/// \param resource_spec Resources that should be
/// allocated for the worker.
/// \return ray::Status
virtual void CommitBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CommitBundleResourcesReply> &callback) = 0;
virtual void CancelResourceReserve(
BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback) = 0;
@@ -351,13 +364,19 @@ class RayletClient : public PinObjectsInterface,
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) override;
/// Implements ResourceReserveInterface.
void RequestResourceReserve(
/// Implements ResourceReserveInterface::PrepareBundleResources.
void PrepareBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::RequestResourceReserveReply> &callback)
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback)
override;
/// Implements ResourceReserveInterface.
/// Implements ResourceReserveInterface::CommitBundleResources.
void CommitBundleResources(
const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CommitBundleResourcesReply> &callback)
override;
/// Implements ResourceReserveInterface::CancelResourceReserve.
void CancelResourceReserve(
BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback)
@@ -82,8 +82,11 @@ class NodeManagerWorkerClient
/// Cancel a pending worker lease request.
VOID_RPC_CLIENT_METHOD(NodeManagerService, CancelWorkerLease, grpc_client_, )
/// Request resource lease.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestResourceReserve, grpc_client_, )
/// Request prepare resources for an atomic placement group creation.
VOID_RPC_CLIENT_METHOD(NodeManagerService, PrepareBundleResources, grpc_client_, )
/// Request commit resources for an atomic placement group creation.
VOID_RPC_CLIENT_METHOD(NodeManagerService, CommitBundleResources, grpc_client_, )
/// Return resource lease.
VOID_RPC_CLIENT_METHOD(NodeManagerService, CancelResourceReserve, grpc_client_, )
+10 -4
View File
@@ -32,7 +32,8 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) \
RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReserve) \
RPC_SERVICE_HANDLER(NodeManagerService, PrepareBundleResources) \
RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage)
@@ -66,9 +67,14 @@ class NodeManagerServiceHandler {
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
virtual void HandleRequestResourceReserve(
const rpc::RequestResourceReserveRequest &request,
rpc::RequestResourceReserveReply *reply,
virtual void HandlePrepareBundleResources(
const rpc::PrepareBundleResourcesRequest &request,
rpc::PrepareBundleResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
virtual void HandleCommitBundleResources(
const rpc::CommitBundleResourcesRequest &request,
rpc::CommitBundleResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
virtual void HandleCancelResourceReserve(