From dc7fe1a4c53c1eabde2f04bcc0a50fa1106c4ce0 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 2 Sep 2020 18:14:46 -0700 Subject: [PATCH] [Placement Group] Atomic Placement Group Part 1, Basic Structure. (#10482) * Write a test. * Basic structure done. * Reduce flakiness of tests. * Addressed code review. * Skipping tests because it is flaky for now. * Fix linting issues. * Increase sleep time to see lint messages. * Lint issue fixed. --- .travis.yml | 2 +- python/ray/tests/test_placement_group.py | 63 ++++ .../gcs_placement_group_scheduler.cc | 334 +++++++++++++----- .../gcs_placement_group_scheduler.h | 207 ++++++++--- .../gcs_placement_group_scheduler_test.cc | 58 +-- .../gcs_server/test/gcs_server_test_util.h | 18 +- src/ray/protobuf/node_manager.proto | 25 +- src/ray/raylet/node_manager.cc | 18 +- src/ray/raylet/node_manager.h | 11 +- src/ray/raylet_client/raylet_client.cc | 16 +- src/ray/raylet_client/raylet_client.h | 35 +- .../rpc/node_manager/node_manager_client.h | 7 +- .../rpc/node_manager/node_manager_server.h | 14 +- 13 files changed, 602 insertions(+), 206 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7ebea95b9..af31662b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -159,7 +159,7 @@ matrix: - . ./ci/travis/ci.sh lint - . ./ci/travis/ci.sh build script: - - true # we still need this block to exist, otherwise it will fall back to the global one + - sleep 30 # we still need this block to exist, otherwise it will fall back to the global one # Build MacOS wheels and MacOS jars - os: osx diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 0b5fdf67f..c4ba42740 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -632,5 +632,68 @@ def test_schedule_placement_group_when_node_add(ray_start_cluster): wait_for_condition(is_placement_group_created) +@pytest.mark.skip(reason="Not working yet") +def test_atomic_creation(ray_start_cluster): + # Setup cluster. + cluster = ray_start_cluster + bundle_cpu_size = 2 + bundle_per_node = 2 + num_nodes = 5 + + nodes = [ + cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node) + for _ in range(num_nodes) + ] + ray.init(address=cluster.address) + + @ray.remote(num_cpus=1) + class NormalActor: + def ping(self): + pass + + # Create an actor that will fail bundle scheduling. + # It is important to use pack strategy to make test less flaky. + pg = ray.experimental.placement_group( + name="name", + strategy="PACK", + bundles=[{ + "CPU": bundle_cpu_size + } for _ in range(num_nodes * bundle_per_node)]) + + # Create a placement group actor. + # This shouldn't be scheduled until placement group creation is done. + pg_actor = NormalActor.options( + placement_group=pg, + placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote() + # Destroy some nodes to fail placement group creation. + nodes_to_kill = get_other_nodes(cluster, exclude_head=True) + for node_to_kill in nodes_to_kill: + cluster.remove_node(node_to_kill) + + # Wait on the placement group now. It should be unready + # because normal actor takes resources that are required + # for one of bundle creation. + ready, unready = ray.wait([pg.ready()], timeout=0) + assert len(ready) == 0 + assert len(unready) == 1 + + # Add a node back to schedule placement group. + for _ in range(len(nodes_to_kill)): + nodes.append( + cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node)) + # Wait on the placement group creation. + ready, unready = ray.wait([pg.ready()]) + assert len(ready) == 1 + assert len(unready) == 0 + + # Confirm that the placement group actor is created. It will + # raise an exception if actor was scheduled before placement group was + # created. + # TODO(sang): This with statement should be removed after atomic creation + # is implemented. It will be done in the next PR. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(pg_actor.ping.remote(), timeout=3.0) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index b836eed5a..067d07234 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -224,40 +224,56 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( for (auto &bundle : bundles) { const auto &bundle_id = bundle->BundleId(); const auto &node_id = selected_nodes[bundle_id]; - lease_status_tracker->MarkLeaseStarted(node_id, bundle); + lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle); // TODO(sang): The callback might not be called at all if nodes are dead. We should // handle this case properly. - ReserveResourceFromNode( - bundle, gcs_node_manager_.GetNode(node_id), - [this, bundle, node_id, lease_status_tracker, failure_callback, - success_callback](const Status &status) { - lease_status_tracker->MarkLeaseReturned(node_id, bundle, status); - if (lease_status_tracker->IsAllLeaseRequestReturned()) { - OnAllBundleSchedulingRequestReturned(lease_status_tracker, failure_callback, - success_callback); - } - }); + PrepareResources(bundle, gcs_node_manager_.GetNode(node_id), + [this, bundle, node_id, lease_status_tracker, failure_callback, + success_callback](const Status &status) { + lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle, + status); + if (lease_status_tracker->AllPrepareRequestsReturned()) { + OnAllBundlePrepareRequestReturned( + lease_status_tracker, failure_callback, success_callback); + } + }); } } void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( const PlacementGroupID &placement_group_id) { + bool is_committed = false; + bool is_prepared = false; + std::shared_ptr bundle_locations = std::make_shared(); + + // Check if we can find committed bundle locations. const auto &maybe_bundle_locations = - bundle_location_index_.GetBundleLocations(placement_group_id); + committed_bundle_location_index_.GetBundleLocations(placement_group_id); // If bundle location has been already removed, it means bundles // are already destroyed. Do nothing. - if (!maybe_bundle_locations.has_value()) { - return; + if (maybe_bundle_locations.has_value()) { + is_committed = true; + bundle_locations = maybe_bundle_locations.value(); } - const auto &bundle_locations = maybe_bundle_locations.value(); + auto it = placement_group_leasing_in_progress_.find(placement_group_id); + if (it != placement_group_leasing_in_progress_.end()) { + const auto &leasing_context = it->second; + is_prepared = true; + bundle_locations = leasing_context->GetPreparedBundleLocations(); + } + + RAY_CHECK(!(is_committed && is_prepared)) + << "Anomaly detected. It shouldn't be possible that placement group is both " + "committing and preparing."; + // Cancel all resource reservation. for (const auto &iter : *(bundle_locations)) { auto &bundle_spec = iter.second.second; auto &node_id = iter.second.first; CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); } - bundle_location_index_.Erase(placement_group_id); + committed_bundle_location_index_.Erase(placement_group_id); } void GcsPlacementGroupScheduler::MarkScheduleCancelled( @@ -267,21 +283,16 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled( it->second->MarkPlacementGroupScheduleCancelled(); } -void GcsPlacementGroupScheduler::ReserveResourceFromNode( +void GcsPlacementGroupScheduler::PrepareResources( const std::shared_ptr &bundle, const std::shared_ptr &node, const StatusCallback &callback) { - rpc::Address remote_address; - remote_address.set_raylet_id(node->node_id()); - remote_address.set_ip_address(node->node_manager_address()); - remote_address.set_port(node->node_manager_port()); - auto node_id = ClientID::FromBinary(node->node_id()); - auto lease_client = GetOrConnectLeaseClient(remote_address); - RAY_LOG(INFO) << "Leasing resource from node " << node_id - << " for bundle: " << bundle->DebugString(); - lease_client->RequestResourceReserve( + const auto lease_client = GetLeaseClientFromNode(node); + const auto &node_id = node->node_id(); + RAY_LOG(INFO) << "Preparing resource from node " << node_id + << " for a bundle: " << bundle->DebugString(); + lease_client->PrepareBundleResources( *bundle, [node_id, bundle, callback]( - const Status &status, const rpc::RequestResourceReserveReply &reply) { - // TODO(AlisaWu): Add placement group cancel. + const Status &status, const rpc::PrepareBundleResourcesReply &reply) { auto result = reply.success() ? Status::OK() : Status::IOError("Failed to reserve resource"); if (result.ok()) { @@ -295,6 +306,29 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( }); } +void GcsPlacementGroupScheduler::CommitResources( + const std::shared_ptr &bundle, + const std::shared_ptr &node, const StatusCallback callback) { + RAY_CHECK(node != nullptr); + const auto lease_client = GetLeaseClientFromNode(node); + const auto &node_id = node->node_id(); + RAY_LOG(INFO) << "Committing resource to a node " << node_id + << " for a bundle: " << bundle->DebugString(); + lease_client->CommitBundleResources( + *bundle, [bundle, node_id, callback](const Status &status, + const rpc::CommitBundleResourcesReply &reply) { + if (status.ok()) { + RAY_LOG(INFO) << "Finished committing resource to " << node_id + << " for bundle: " << bundle->DebugString(); + } else { + RAY_LOG(WARNING) << "Failed to commit resource to " << node_id + << " for bundle: " << bundle->DebugString(); + } + RAY_CHECK(callback); + callback(status); + }); +} + void GcsPlacementGroupScheduler::CancelResourceReserve( const std::shared_ptr &bundle_spec, const std::shared_ptr &node) { @@ -308,11 +342,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( auto node_id = ClientID::FromBinary(node->node_id()); RAY_LOG(INFO) << "Cancelling the resource reserved for bundle: " << bundle_spec->DebugString() << " at node " << node_id; - rpc::Address remote_address; - remote_address.set_raylet_id(node->node_id()); - remote_address.set_ip_address(node->node_manager_address()); - remote_address.set_port(node->node_manager_port()); - auto return_client = GetOrConnectLeaseClient(remote_address); + const auto return_client = GetLeaseClientFromNode(node); return_client->CancelResourceReserve( *bundle_spec, [bundle_spec, node_id](const Status &status, const rpc::CancelResourceReserveReply &reply) { @@ -332,69 +362,150 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a return iter->second; } -void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( +std::shared_ptr +GcsPlacementGroupScheduler::GetLeaseClientFromNode( + const std::shared_ptr &node) { + rpc::Address remote_address; + remote_address.set_raylet_id(node->node_id()); + remote_address.set_ip_address(node->node_manager_address()); + remote_address.set_port(node->node_manager_port()); + return GetOrConnectLeaseClient(remote_address); +} + +void GcsPlacementGroupScheduler::CommitAllBundles( const std::shared_ptr &lease_status_tracker, const std::function)> &schedule_failure_handler, const std::function)> &schedule_success_handler) { - RAY_CHECK(lease_status_tracker->IsAllLeaseRequestReturned()) - << "This method can be called only after all bundle scheduling requests are " - "returend."; - const auto &placement_group = lease_status_tracker->GetPlacementGroup(); - const auto &bundles = lease_status_tracker->GetUnplacedBundles(); - const auto &bundle_locations = lease_status_tracker->GetBundleLocations(); - const auto &placement_group_id = placement_group->GetPlacementGroupID(); - bundle_location_index_.AddBundleLocations(placement_group_id, bundle_locations); + const std::shared_ptr &prepared_bundle_locations = + lease_status_tracker->GetPreparedBundleLocations(); + lease_status_tracker->MarkCommitPhaseStarted(); + for (const auto &bundle_to_commit : *prepared_bundle_locations) { + const auto &node_id = bundle_to_commit.second.first; + const auto &node = gcs_node_manager_.GetNode(node_id); + const auto &bundle = bundle_to_commit.second.second; - if (!lease_status_tracker->IsLeasingSucceed()) { - // If the lease request has been already cancelled - // or not every lease request succeeds. - DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); - schedule_failure_handler(placement_group); - } else { - // If we successfully created placement group, store them to GCS. - rpc::ScheduleData data; - for (const auto &iter : bundles) { - // TODO(ekl) this is a hack to get a string key for the proto - auto key = iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index()); - data.mutable_schedule_plan()->insert( - {key, (*bundle_locations)[iter->BundleId()].first.Binary()}); - } - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( - placement_group_id, data, - [schedule_success_handler, placement_group](Status status) { - schedule_success_handler(placement_group); - })); - - for (const auto &iter : *bundle_locations) { - const auto &location = iter.second; - placement_group->GetMutableBundle(location.second->Index()) - ->set_node_id(location.first.Binary()); - } + // TODO(sang) Handle the case nodes are dead. + CommitResources( + bundle, node, + [this, lease_status_tracker, bundle, node_id, schedule_failure_handler, + schedule_success_handler](const Status &status) { + lease_status_tracker->MarkCommitRequestReturned(node_id, bundle, status); + if (lease_status_tracker->AllCommitRequestReturned()) { + OnAllBundleCommitRequestReturned( + lease_status_tracker, schedule_failure_handler, schedule_success_handler); + } + }); } +} + +void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned( + const std::shared_ptr &lease_status_tracker, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler) { + RAY_CHECK(lease_status_tracker->AllPrepareRequestsReturned()) + << "This method can be called only after all bundle scheduling requests are " + "returned."; + const auto &placement_group = lease_status_tracker->GetPlacementGroup(); + const auto &bundles = lease_status_tracker->GetBundlesToSchedule(); + const auto &prepared_bundle_locations = + lease_status_tracker->GetPreparedBundleLocations(); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + + if (!lease_status_tracker->AllPrepareRequestsSuccessful()) { + // Erase the status tracker from a in-memory map if exists. + DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); + auto it = placement_group_leasing_in_progress_.find(placement_group_id); + RAY_CHECK(it != placement_group_leasing_in_progress_.end()); + placement_group_leasing_in_progress_.erase(it); + schedule_failure_handler(placement_group); + return; + } + + // If the prepare requests succeed, update the bundle location. + for (const auto &iter : *prepared_bundle_locations) { + const auto &location = iter.second; + placement_group->GetMutableBundle(location.second->Index()) + ->set_node_id(location.first.Binary()); + } + + // Store data to GCS. + rpc::ScheduleData data; + for (const auto &iter : bundles) { + // TODO(ekl) this is a hack to get a string key for the proto + auto key = iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index()); + data.mutable_schedule_plan()->insert( + {key, (*prepared_bundle_locations)[iter->BundleId()].first.Binary()}); + } + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( + placement_group_id, data, + [this, schedule_success_handler, schedule_failure_handler, + lease_status_tracker](Status status) { + CommitAllBundles(lease_status_tracker, schedule_failure_handler, + schedule_success_handler); + })); +} + +void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned( + const std::shared_ptr &lease_status_tracker, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler) { + const auto &placement_group = lease_status_tracker->GetPlacementGroup(); + const auto &prepared_bundle_locations = + lease_status_tracker->GetPreparedBundleLocations(); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + + // Clean up the leasing progress map. auto it = placement_group_leasing_in_progress_.find(placement_group_id); RAY_CHECK(it != placement_group_leasing_in_progress_.end()); placement_group_leasing_in_progress_.erase(it); + + // Add a prepared bundle locations to committed bundle locations. + committed_bundle_location_index_.AddBundleLocations(placement_group_id, + prepared_bundle_locations); + + if (!lease_status_tracker->AllCommitRequestsSuccessful()) { + if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) { + DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); + } else { + // Update the state to be reschedule so that the failure handle will reschedule the + // failed bundles. + const auto &uncommitted_bundle_locations = + lease_status_tracker->GetUnCommittedBundleLocations(); + for (const auto &bundle : *uncommitted_bundle_locations) { + placement_group->GetMutableBundle(bundle.first.second)->clear_node_id(); + } + placement_group->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING); + } + schedule_failure_handler(placement_group); + return; + } + schedule_success_handler(placement_group); } std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( const PlacementGroupID &placement_group_id) { auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); - bundle_location_index_.AddNodes(alive_nodes); + committed_bundle_location_index_.AddNodes(alive_nodes); auto node_to_bundles = std::make_shared>(); for (const auto &node_it : alive_nodes) { const auto &node_id = node_it.first; const auto &bundle_locations_on_node = - bundle_location_index_.GetBundleLocationsOnNode(node_id); + committed_bundle_location_index_.GetBundleLocationsOnNode(node_id); RAY_CHECK(bundle_locations_on_node) << "Bundle locations haven't been registered for node id " << node_id; const int bundles_size = bundle_locations_on_node.value()->size(); node_to_bundles->emplace(node_id, bundles_size); } - auto &bundle_locations = bundle_location_index_.GetBundleLocations(placement_group_id); + auto &bundle_locations = + committed_bundle_location_index_.GetBundleLocations(placement_group_id); return std::unique_ptr(new ScheduleContext( std::move(node_to_bundles), bundle_locations, gcs_node_manager_)); } @@ -403,7 +514,7 @@ absl::flat_hash_map> GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) { absl::flat_hash_map> bundles_on_node; const auto &maybe_bundle_locations = - bundle_location_index_.GetBundleLocationsOnNode(node_id); + committed_bundle_location_index_.GetBundleLocationsOnNode(node_id); if (maybe_bundle_locations.has_value()) { const auto &bundle_locations = maybe_bundle_locations.value(); for (auto &bundle : *bundle_locations) { @@ -411,7 +522,7 @@ GcsPlacementGroupScheduler::GetBundlesOnNode(const ClientID &node_id) { const auto &bundle_index = bundle.first.second; bundles_on_node[bundle_placement_group_id].push_back(bundle_index); } - bundle_location_index_.Erase(node_id); + committed_bundle_location_index_.Erase(node_id); } return bundles_on_node; } @@ -507,22 +618,23 @@ void BundleLocationIndex::AddNodes( LeaseStatusTracker::LeaseStatusTracker( std::shared_ptr placement_group, std::vector> &unplaced_bundles) - : placement_group_(placement_group), unplaced_bundles_(unplaced_bundles) { - bundle_locations_ = std::make_shared(); + : placement_group_(placement_group), bundles_to_schedule_(unplaced_bundles) { + preparing_bundle_locations_ = std::make_shared(); + uncommitted_bundle_locations_ = std::make_shared(); } -bool LeaseStatusTracker::MarkLeaseStarted(const ClientID &node_id, - std::shared_ptr bundle) { +bool LeaseStatusTracker::MarkPreparePhaseStarted( + const ClientID &node_id, std::shared_ptr bundle) { const auto &bundle_id = bundle->BundleId(); - return node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second; + return node_to_bundles_when_preparing_[node_id].emplace(bundle_id).second; } -void LeaseStatusTracker::MarkLeaseReturned( +void LeaseStatusTracker::MarkPrepareRequestReturned( const ClientID &node_id, const std::shared_ptr bundle, const Status &status) { - RAY_CHECK(returned_count_ <= unplaced_bundles_.size()); - auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id); - RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end()); + RAY_CHECK(prepare_request_returned_count_ <= bundles_to_schedule_.size()); + auto leasing_bundles = node_to_bundles_when_preparing_.find(node_id); + RAY_CHECK(leasing_bundles != node_to_bundles_when_preparing_.end()); auto bundle_iter = leasing_bundles->second.find(bundle->BundleId()); RAY_CHECK(bundle_iter != leasing_bundles->second.end()); @@ -530,42 +642,66 @@ void LeaseStatusTracker::MarkLeaseReturned( // remote node. leasing_bundles->second.erase(bundle_iter); if (leasing_bundles->second.empty()) { - node_to_bundles_when_leasing_.erase(leasing_bundles); + node_to_bundles_when_preparing_.erase(leasing_bundles); } // If the request succeeds, record it. const auto &bundle_id = bundle->BundleId(); if (status.ok()) { - bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle)); - } - returned_count_ += 1; - // If every bundle lease request is returned, mark it as all returned. - if (IsAllLeaseRequestReturned()) { - UpdateLeasingState(LeasingState::ALL_RETURNED); + preparing_bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle)); } + prepare_request_returned_count_ += 1; } -bool LeaseStatusTracker::IsAllLeaseRequestReturned() const { - return returned_count_ == unplaced_bundles_.size(); +bool LeaseStatusTracker::AllPrepareRequestsReturned() const { + return prepare_request_returned_count_ == bundles_to_schedule_.size(); } -bool LeaseStatusTracker::IsLeasingSucceed() const { - return IsAllLeaseRequestReturned() && - (bundle_locations_->size() == unplaced_bundles_.size()) && +bool LeaseStatusTracker::AllPrepareRequestsSuccessful() const { + return AllPrepareRequestsReturned() && + (preparing_bundle_locations_->size() == bundles_to_schedule_.size()) && (leasing_state_ != LeasingState::CANCELLED); } +void LeaseStatusTracker::MarkCommitRequestReturned( + const ClientID &node_id, const std::shared_ptr bundle, + const Status &status) { + commit_request_returned_count_ += 1; + // If the request succeeds, record it. + const auto &bundle_id = bundle->BundleId(); + if (!status.ok()) { + uncommitted_bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle)); + } +} + +bool LeaseStatusTracker::AllCommitRequestReturned() const { + return commit_request_returned_count_ == bundles_to_schedule_.size(); +} + +bool LeaseStatusTracker::AllCommitRequestsSuccessful() const { + // We don't check cancel state here because we shouldn't destroy bundles when + // commit requests failed. Cancel state should be treated separately. + return AllCommitRequestReturned() && + preparing_bundle_locations_->size() == bundles_to_schedule_.size(); +} + const std::shared_ptr &LeaseStatusTracker::GetPlacementGroup() const { return placement_group_; } -const std::shared_ptr &LeaseStatusTracker::GetBundleLocations() const { - return bundle_locations_; +const std::shared_ptr &LeaseStatusTracker::GetPreparedBundleLocations() + const { + return preparing_bundle_locations_; +} + +const std::shared_ptr + &LeaseStatusTracker::GetUnCommittedBundleLocations() const { + return uncommitted_bundle_locations_; } const std::vector> - &LeaseStatusTracker::GetUnplacedBundles() const { - return unplaced_bundles_; + &LeaseStatusTracker::GetBundlesToSchedule() const { + return bundles_to_schedule_; } const LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; } @@ -583,5 +719,9 @@ bool LeaseStatusTracker::UpdateLeasingState(LeasingState leasing_state) { return true; } +void LeaseStatusTracker::MarkCommitPhaseStarted() { + UpdateLeasingState(LeasingState::COMMITTING); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index d886ebb5a..be9b998ee 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -66,7 +66,11 @@ class GcsPlacementGroupSchedulerInterface { virtual void DestroyPlacementGroupBundleResourcesIfExists( const PlacementGroupID &placement_group_id) = 0; - /// Mark the placement group schedule as cancelled. Cancelled bundles will be destroyed. + /// Mark the placement group scheduling is cancelled. + /// This method will incur check failure if scheduling + /// is not actually going on to guarantee strong consistency. + /// + /// \param placement_group_id The placement group id scheduling is in progress. virtual void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) = 0; virtual ~GcsPlacementGroupSchedulerInterface() {} @@ -132,11 +136,11 @@ class GcsStrictSpreadStrategy : public GcsScheduleStrategy { }; enum class LeasingState { - // TODO(sang): Use prepare and commit instead for 2PC. - /// The phase where lease requests haven't been returned. - SCHEDULING, - /// The phase where lease requests have returned - ALL_RETURNED, + /// The first phase of 2PC. It means requests to nodes are sent to prepare resources. + PREPARING, + /// The second phase of 2PC. It means that all prepare requests succeed, and GCS is + /// committing resources to each node. + COMMITTING, /// Placement group has been removed, and this leasing is not valid. CANCELLED }; @@ -149,43 +153,122 @@ class LeaseStatusTracker { std::vector> &unplaced_bundles); ~LeaseStatusTracker() = default; - bool MarkLeaseStarted(const ClientID &node_id, - std::shared_ptr bundle); - void MarkLeaseReturned(const ClientID &node_id, - std::shared_ptr bundle, - const Status &status); - bool IsAllLeaseRequestReturned() const; - bool IsLeasingSucceed() const; + /// Indicate the tracker that prepare requests are sent to a specific node. + /// + /// \param node_id Id of a node where prepare request is sent. + /// \param bundle Bundle specification the node is supposed to prepare. + /// \return False if the prepare phase was already started. True otherwise. + bool MarkPreparePhaseStarted(const ClientID &node_id, + std::shared_ptr bundle); + + /// Indicate the tracker that all prepare requests are returned. + /// + /// \param node_id Id of a node where prepare request is returned. + /// \param bundle Bundle specification the node was supposed to schedule. + /// \param status Status of the prepare response. + /// \param void + void MarkPrepareRequestReturned(const ClientID &node_id, + std::shared_ptr bundle, + const Status &status); + + /// Used to know if all prepare requests are returned. + /// + /// \return True if all prepare requests are returned. False otherwise. + bool AllPrepareRequestsReturned() const; + + /// Used to know if the prepare phase succeed. + /// + /// \return True if all prepare requests were successful. + bool AllPrepareRequestsSuccessful() const; + + /// Indicate the tracker that the commit request of a bundle from a node has returned. + /// + /// \param node_id Id of a node where commit request is returned. + /// \param bundle Bundle specification the node was supposed to schedule. + /// \param status Status of the returned commit request. + void MarkCommitRequestReturned(const ClientID &node_id, + const std::shared_ptr bundle, + const Status &status); + + /// Used to know if all commit requests are returend. + /// + /// \return True if all commit requests are returned. False otherwise. + bool AllCommitRequestReturned() const; + + /// Used to know if the commit phase succeed. + /// + /// \return True if all commit requests were successful.. + bool AllCommitRequestsSuccessful() const; + + /// Return a placement group this status tracker is associated with. + /// + /// \return The placement group of this lease status tracker is tracking. const std::shared_ptr &GetPlacementGroup() const; - const std::vector> &GetUnplacedBundles() const; - const std::shared_ptr &GetBundleLocations() const; + + /// Return bundles that should be scheduled. + /// + /// \return List of bundle specification that are supposed to be scheduled. + const std::vector> &GetBundlesToSchedule() const; + + /// This method returns bundle locations that succeed to prepare resources. + /// + /// \return Location of bundles that succeed to prepare resources on a node. + const std::shared_ptr &GetPreparedBundleLocations() const; + + /// This method returns bundle locations that succeed to commit resources. + /// + /// \return Location of bundles that succeed to commit resources on a node. + const std::shared_ptr &GetUnCommittedBundleLocations() const; + + /// Return the leasing state. + /// + /// \return Leasing state. const LeasingState GetLeasingState() const; + + /// Mark that this leasing is cancelled. void MarkPlacementGroupScheduleCancelled(); + /// Mark that the commit phase is started. + /// There's no need to mark commit phase is done because in that case, we won't need the + /// status tracker anymore. + void MarkCommitPhaseStarted(); + private: /// Method to update leasing states. /// /// \param leasing_state The state to update. /// \return True if succeeds to update. False otherwise. bool UpdateLeasingState(LeasingState leasing_state); + /// Placement group of which this leasing context is associated with. std::shared_ptr placement_group_; - /// Location of bundles that lease requests were sent. - /// If schedule success, the decision will be set as schedule_map[bundles[pos]] + + /// Location of bundles that prepare requests were sent. + /// If prepare succeeds, the decision will be set as schedule_map[bundles[pos]] /// else will be set ClientID::Nil(). - std::shared_ptr bundle_locations_; - /// Number of lease requests that are returned. - size_t returned_count_ = 0; + std::shared_ptr preparing_bundle_locations_; + + /// Number of prepare requests that are returned. + size_t prepare_request_returned_count_ = 0; + + /// Number of commit requests that are returned. + size_t commit_request_returned_count_ = 0; + + /// Location of bundles that commit requests failed. + std::shared_ptr uncommitted_bundle_locations_; + /// The leasing stage. This is used to know the state of current leasing context. - LeasingState leasing_state_ = LeasingState::SCHEDULING; + LeasingState leasing_state_ = LeasingState::PREPARING; + /// Map from node ID to the set of bundles for whom we are trying to acquire a lease /// from that node. This is needed so that we can retry lease requests from the node /// until we receive a reply or the node is removed. /// TODO(sang): We don't currently handle retry. absl::flat_hash_map> - node_to_bundles_when_leasing_; - /// Unplaced bundle specification for this leasing context. - std::vector> unplaced_bundles_; + node_to_bundles_when_preparing_; + + /// Bundles to schedule. + std::vector> bundles_to_schedule_; }; /// A data structure that helps fast bundle location lookup. @@ -266,8 +349,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// Schedule unplaced bundles of the specified placement group. /// If there is no available nodes then the `schedule_failed_handler` will be - /// triggered, otherwise the bundle in placement_group will be add into a queue and - /// schedule all bundle by calling ReserveResourceFromNode(). + /// triggered, otherwise the bundle in placement_group will be added into a queue and + /// scheduled to all nodes. /// /// \param placement_group to be scheduled. /// \param failure_callback This function is called if the schedule is failed. @@ -277,18 +360,21 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { std::function)> failure_handler, std::function)> success_handler) override; - /// Destroy bundle resources from all nodes in the placement group. - /// This doesn't do anything if bundles are already destroyed. + /// Destroy the actual bundle resources or locked resources (for 2PC) + /// on all nodes associated with this placement group. + /// The method is idempotent, meaning if all bundles are already cancelled, + /// this method won't do anything. /// /// \param placement_group_id The id of a placement group to destroy all bundle - /// resources. + /// or locked resources. void DestroyPlacementGroupBundleResourcesIfExists( const PlacementGroupID &placement_group_id) override; - /// Mark the placement group schedule as cancelled. - /// Cancelled bundles will be destroyed. - /// \param placement_group_id The id of a placement group to mark that scheduling is - /// cancelled. + /// Mark the placement group scheduling is cancelled. + /// This method will incur check failure if scheduling + /// is not actually going on to guarantee strong consistency. + /// + /// \param placement_group_id The placement group id scheduling is in progress. void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) override; /// Get bundles belong to the specified node. @@ -299,29 +385,66 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { const ClientID &node_id) override; protected: - /// Lease resource from the specified node for the specified bundle. - void ReserveResourceFromNode(const std::shared_ptr &bundle, - const std::shared_ptr &node, - const StatusCallback &callback); + /// Send a bundle PREPARE request to a node. The PREPARE request will lock resources + /// on a node until COMMIT or CANCEL requests are sent to a node. + /// + /// \param bundle A bundle to schedule on a node. + /// \param node A node to prepare resources for a given bundle. + /// \param callback + void PrepareResources(const std::shared_ptr &bundle, + const std::shared_ptr &node, + const StatusCallback &callback); - /// return resource for the specified node for the specified bundle. + /// Send a bundle COMMIT request to a node. This means the placement group creation + /// is ready and GCS will commit resources on a given node. + /// + /// \param bundle A bundle to schedule on a node. + /// \param node A node to commit resources for a given bundle. + /// \param callback + void CommitResources(const std::shared_ptr &bundle, + const std::shared_ptr &node, + const StatusCallback callback); + + /// Cacnel prepared or committed resources from a node. + /// Nodes will be in charge of tracking state of a bundle. + /// This method is supposed to be idempotent. /// /// \param bundle A description of the bundle to return. /// \param node The node that the worker will be returned for. void CancelResourceReserve(const std::shared_ptr &bundle_spec, const std::shared_ptr &node); - /// Get an existing lease client or connect a new one. + /// Get an existing lease client or connect a new one or connect a new one. std::shared_ptr GetOrConnectLeaseClient( const rpc::Address &raylet_address); - void OnAllBundleSchedulingRequestReturned( + /// Get an existing lease client for a given node. + std::shared_ptr GetLeaseClientFromNode( + const std::shared_ptr &node); + + /// Called when all prepare requests are returned from nodes. + void OnAllBundlePrepareRequestReturned( const std::shared_ptr &lease_status_tracker, const std::function)> &schedule_failure_handler, const std::function)> &schedule_success_handler); + /// Called when all commit requests are returned from nodes. + void OnAllBundleCommitRequestReturned( + const std::shared_ptr &lease_status_tracker, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler); + + /// Commit all bundles recorded in lease status tracker. + void CommitAllBundles(const std::shared_ptr &lease_status_tracker, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler); + /// Generate schedule context. std::unique_ptr GetScheduleContext( const PlacementGroupID &placement_group_id); @@ -345,8 +468,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// A vector to store all the schedule strategy. std::vector> scheduler_strategies_; - /// Index to lookup bundle locations of node or placement group. - BundleLocationIndex bundle_location_index_; + /// Index to lookup committed bundle locations of node or placement group. + BundleLocationIndex committed_bundle_location_index_; /// Set of placement group that have lease requests in flight to nodes. /// It is required to know if placement group has been removed or not. diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index a02343003..28ddaf61e 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -120,8 +120,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { ASSERT_EQ(2, raylet_client_->num_lease_requested); ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); WaitPendingDone(failure_placement_groups_, 0); WaitPendingDone(success_placement_groups_, 1); ASSERT_EQ(placement_group, success_placement_groups_.front()); @@ -152,8 +152,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { AddNode(Mocker::GenNodeInfo(0), 2); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); WaitPendingDone(success_placement_groups_, 1); } @@ -229,8 +229,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) { ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); // Reply failure, so the placement group scheduling failed. - ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); - ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false)); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false)); WaitPendingDone(failure_placement_groups_, 1); WaitPendingDone(success_placement_groups_, 0); ASSERT_EQ(placement_group, failure_placement_groups_.front()); @@ -285,8 +285,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) ASSERT_EQ(2, raylet_client_->num_lease_requested); ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); // One bundle success and the other failed. - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources(false)); ASSERT_EQ(1, raylet_client_->num_return_requested); // Reply the placement_group creation request, then the placement_group should be // scheduled successfully. @@ -318,12 +318,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) success_handler); if (!raylet_client_->lease_callbacks.empty()) { - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); ++select_node0_count; } else { - ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources()); ++select_node1_count; } } @@ -351,8 +351,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); auto placement_group = std::make_shared(request); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); WaitPendingDone(success_placement_groups_, 1); // Node1 has less number of bundles, but it doesn't satisfy the resource @@ -364,8 +364,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { auto placement_group2 = std::make_shared(create_placement_group_request2); scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); WaitPendingDone(success_placement_groups_, 2); } @@ -390,15 +390,15 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { absl::MutexLock lock(&vector_mutex_); success_placement_groups_.emplace_back(std::move(placement_group)); }); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); WaitPendingDone(failure_placement_groups_, 0); WaitPendingDone(success_placement_groups_, 1); + RAY_LOG(ERROR) << "sanngbin1"; const auto &placement_group_id = placement_group->GetPlacementGroupID(); scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); - // Subsequent destroy request should not do anything. scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve()); @@ -429,9 +429,9 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { }); // Now, cancel the schedule request. - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); scheduler_->MarkScheduleCancelled(placement_group_id); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); WaitPendingDone(failure_placement_groups_, 1); @@ -462,10 +462,10 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) { RAY_CHECK(raylet_client_->num_lease_requested > 0); RAY_CHECK(raylet_client1_->num_lease_requested > 0); for (int index = 0; index < raylet_client_->num_lease_requested; ++index) { - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); } for (int index = 0; index < raylet_client1_->num_lease_requested; ++index) { - ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources()); } WaitPendingDone(success_placement_groups_, 1); } @@ -492,8 +492,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) { }; scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client1_->GrantPrepareBundleResources()); WaitPendingDone(success_placement_groups_, 1); auto bundles_on_node0 = @@ -509,8 +509,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) { // TODO(ffbin): We need to see which node the other bundles that have been placed are // deployed on, and spread them as far as possible. It will be implemented in the next // pr. - raylet_client_->GrantResourceReserve(); - raylet_client1_->GrantResourceReserve(); + raylet_client_->GrantPrepareBundleResources(); + raylet_client1_->GrantPrepareBundleResources(); WaitPendingDone(success_placement_groups_, 2); } @@ -543,8 +543,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) { auto node2 = Mocker::GenNodeInfo(2); AddNode(node2); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client2_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_client2_->GrantPrepareBundleResources()); WaitPendingDone(success_placement_groups_, 1); } diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 73020e90c..3c6755917 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -158,14 +158,22 @@ struct GcsServerMocker { class MockRayletResourceClient : public ResourceReserveInterface { public: - void RequestResourceReserve( + void PrepareBundleResources( const BundleSpecification &bundle_spec, - const ray::rpc::ClientCallback &callback) + const ray::rpc::ClientCallback &callback) override { num_lease_requested += 1; lease_callbacks.push_back(callback); } + void CommitBundleResources( + const BundleSpecification &bundle_spec, + const ray::rpc::ClientCallback &callback) + override { + rpc::CommitBundleResourcesReply reply; + callback(Status::OK(), reply); + } + void CancelResourceReserve( BundleSpecification &bundle_spec, const ray::rpc::ClientCallback &callback) @@ -175,9 +183,9 @@ struct GcsServerMocker { } // Trigger reply to RequestWorkerLease. - bool GrantResourceReserve(bool success = true) { + bool GrantPrepareBundleResources(bool success = true) { Status status = Status::OK(); - rpc::RequestResourceReserveReply reply; + rpc::PrepareBundleResourcesReply reply; reply.set_success(success); if (lease_callbacks.size() == 0) { return false; @@ -208,7 +216,7 @@ struct GcsServerMocker { int num_lease_requested = 0; int num_return_requested = 0; ClientID node_id = ClientID::FromRandom(); - std::list> lease_callbacks = {}; + std::list> lease_callbacks = {}; std::list> return_callbacks = {}; }; class MockedGcsActorScheduler : public gcs::GcsActorScheduler { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 45c22be96..167212301 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -37,16 +37,24 @@ message RequestWorkerLeaseReply { bool canceled = 4; } -message RequestResourceReserveRequest { +message PrepareBundleResourcesRequest { // Bundle containing the requested resources. Bundle bundle_spec = 1; } -message RequestResourceReserveReply { - // The status if resource reserve success. +message PrepareBundleResourcesReply { + // The status if prepare request was successful. bool success = 1; } +message CommitBundleResourcesRequest { + // Bundle containing the requested resources. + Bundle bundle_spec = 1; +} + +message CommitBundleResourcesReply { +} + message CancelResourceReserveRequest { // Bundle containing the requested resources. Bundle bundle_spec = 1; @@ -166,9 +174,14 @@ service NodeManagerService { // are still needed. And Raylet will release other leased workers. rpc ReleaseUnusedWorkers(ReleaseUnusedWorkersRequest) returns (ReleaseUnusedWorkersReply); - // Request resource from the raylet for a bundle. - rpc RequestResourceReserve(RequestResourceReserveRequest) - returns (RequestResourceReserveReply); + // Request a raylet to lock resources for a bundle. + // This is the first phase of 2PC protocol for atomic placement group creation. + rpc PrepareBundleResources(PrepareBundleResourcesRequest) + returns (PrepareBundleResourcesReply); + // Commit bundle resources to a raylet. + // This is the second phase of 2PC protocol for atomic placement group creation. + rpc CommitBundleResources(CommitBundleResourcesRequest) + returns (CommitBundleResourcesReply); // Return resource for the raylet. rpc CancelResourceReserve(CancelResourceReserveRequest) returns (CancelResourceReserveReply); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index bce98cba5..e719aba2c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1795,12 +1795,13 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest SubmitTask(task); } -void NodeManager::HandleRequestResourceReserve( - const rpc::RequestResourceReserveRequest &request, - rpc::RequestResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { - RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; +void NodeManager::HandlePrepareBundleResources( + const rpc::PrepareBundleResourcesRequest &request, + rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { + // TODO(sang): Port this onto the new scheduler. + RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet."; auto bundle_spec = BundleSpecification(request.bundle_spec()); - RAY_LOG(DEBUG) << "bundle lease request " << bundle_spec.BundleId().first + RAY_LOG(DEBUG) << "bundle prepare request " << bundle_spec.BundleId().first << bundle_spec.BundleId().second; auto resource_ids = ScheduleBundle(cluster_resource_map_, bundle_spec); if (resource_ids.AvailableResources().size() == 0) { @@ -1815,6 +1816,13 @@ void NodeManager::HandleRequestResourceReserve( DispatchTasks(local_queues_.GetReadyTasksByClass()); } +void NodeManager::HandleCommitBundleResources( + const rpc::CommitBundleResourcesRequest &request, + rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { + send_reply_callback(Status::OK(), nullptr, nullptr); + // TODO(sang): Implement this in the next PR. +} + void NodeManager::HandleCancelResourceReserve( const rpc::CancelResourceReserveRequest &request, rpc::CancelResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 74e1b9aaf..b79084eeb 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -552,11 +552,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Status indicating whether setup was successful. ray::Status SetupPlasmaSubscription(); - /// Handle a `ResourcesLease` request. - void HandleRequestResourceReserve(const rpc::RequestResourceReserveRequest &request, - rpc::RequestResourceReserveReply *reply, + /// Handle a `PrepareBundleResources` request. + void HandlePrepareBundleResources(const rpc::PrepareBundleResourcesRequest &request, + rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `CommitBundleResources` request. + void HandleCommitBundleResources(const rpc::CommitBundleResourcesRequest &request, + rpc::CommitBundleResourcesReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `ResourcesReturn` request. void HandleCancelResourceReserve(const rpc::CancelResourceReserveRequest &request, rpc::CancelResourceReserveReply *reply, diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index ec40b3a79..935b23897 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -378,12 +378,20 @@ void raylet::RayletClient::CancelWorkerLease( grpc_client_->CancelWorkerLease(request, callback); } -void raylet::RayletClient::RequestResourceReserve( +void raylet::RayletClient::PrepareBundleResources( const BundleSpecification &bundle_spec, - const ray::rpc::ClientCallback &callback) { - rpc::RequestResourceReserveRequest request; + const ray::rpc::ClientCallback &callback) { + rpc::PrepareBundleResourcesRequest request; request.mutable_bundle_spec()->CopyFrom(bundle_spec.GetMessage()); - grpc_client_->RequestResourceReserve(request, callback); + grpc_client_->PrepareBundleResources(request, callback); +} + +void raylet::RayletClient::CommitBundleResources( + const BundleSpecification &bundle_spec, + const ray::rpc::ClientCallback &callback) { + rpc::CommitBundleResourcesRequest request; + request.mutable_bundle_spec()->CopyFrom(bundle_spec.GetMessage()); + grpc_client_->CommitBundleResources(request, callback); } void raylet::RayletClient::CancelResourceReserve( diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index dc088fd45..061cd23f4 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -91,14 +91,27 @@ class WorkerLeaseInterface { /// Interface for leasing resource. class ResourceReserveInterface { public: - /// Requests a resource from the raylet. The callback will be sent via gRPC. - /// \param resource_spec Resources that should be allocated for the worker. + /// Request a raylet to prepare resources of a given bundle for atomic placement group + /// creation. This is used for the first phase of atomic placement group creation. The + /// callback will be sent via gRPC. + /// \param resource_spec Resources that should be + /// allocated for the worker. /// \return ray::Status - virtual void RequestResourceReserve( + virtual void PrepareBundleResources( const BundleSpecification &bundle_spec, - const ray::rpc::ClientCallback + const ray::rpc::ClientCallback &callback) = 0; + /// Request a raylet to commit resources of a given bundle for atomic placement group + /// creation. This is used for the first phase of atomic placement group creation. The + /// callback will be sent via gRPC. + /// \param resource_spec Resources that should be + /// allocated for the worker. + /// \return ray::Status + virtual void CommitBundleResources( + const BundleSpecification &bundle_spec, + const ray::rpc::ClientCallback &callback) = 0; + virtual void CancelResourceReserve( BundleSpecification &bundle_spec, const ray::rpc::ClientCallback &callback) = 0; @@ -351,13 +364,19 @@ class RayletClient : public PinObjectsInterface, const TaskID &task_id, const rpc::ClientCallback &callback) override; - /// Implements ResourceReserveInterface. - void RequestResourceReserve( + /// Implements PrepareBundleResourcesInterface. + void PrepareBundleResources( const BundleSpecification &bundle_spec, - const ray::rpc::ClientCallback &callback) + const ray::rpc::ClientCallback &callback) override; - /// Implements ResourceReserveInterface. + /// Implements CommitBundleResourcesInterface. + void CommitBundleResources( + const BundleSpecification &bundle_spec, + const ray::rpc::ClientCallback &callback) + override; + + /// Implements CancelResourceReserveInterface. void CancelResourceReserve( BundleSpecification &bundle_spec, const ray::rpc::ClientCallback &callback) diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index 3dc1cd4e6..e82c45b7e 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -82,8 +82,11 @@ class NodeManagerWorkerClient /// Cancel a pending worker lease request. VOID_RPC_CLIENT_METHOD(NodeManagerService, CancelWorkerLease, grpc_client_, ) - /// Request resource lease. - VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestResourceReserve, grpc_client_, ) + /// Request prepare resources for an atomic placement group creation. + VOID_RPC_CLIENT_METHOD(NodeManagerService, PrepareBundleResources, grpc_client_, ) + + /// Request commit resources for an atomic placement group creation. + VOID_RPC_CLIENT_METHOD(NodeManagerService, CommitBundleResources, grpc_client_, ) /// Return resource lease. VOID_RPC_CLIENT_METHOD(NodeManagerService, CancelResourceReserve, grpc_client_, ) diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index f383dc9d8..2db6706eb 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -32,7 +32,8 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \ RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) \ RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo) \ - RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReserve) \ + RPC_SERVICE_HANDLER(NodeManagerService, PrepareBundleResources) \ + RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources) \ RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \ RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) @@ -66,9 +67,14 @@ class NodeManagerServiceHandler { rpc::CancelWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) = 0; - virtual void HandleRequestResourceReserve( - const rpc::RequestResourceReserveRequest &request, - rpc::RequestResourceReserveReply *reply, + virtual void HandlePrepareBundleResources( + const rpc::PrepareBundleResourcesRequest &request, + rpc::PrepareBundleResourcesReply *reply, + rpc::SendReplyCallback send_reply_callback) = 0; + + virtual void HandleCommitBundleResources( + const rpc::CommitBundleResourcesRequest &request, + rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) = 0; virtual void HandleCancelResourceReserve(