From dcb9e03fde3116f7c43787947ea6f0b37ddb3210 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 8 Sep 2020 13:11:11 -0700 Subject: [PATCH] [Placement Group] Atomic Creation using 2 phase protocol part 2. (#10599) * In progress. * In Progress * Basic done. * Fix build issues. * Addressed code review. * Change the confusing test name. * Fix comments. * Addressed code review. --- python/ray/tests/test_placement_group.py | 152 +++++++++++++++--- src/ray/common/task/scheduling_resources.cc | 28 ++-- src/ray/common/task/scheduling_resources.h | 40 +++-- .../common/task/scheduling_resources_test.cc | 20 +-- src/ray/raylet/node_manager.cc | 87 ++++++++-- src/ray/raylet/node_manager.h | 46 +++++- 6 files changed, 305 insertions(+), 68 deletions(-) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 17bad8acf..47c0e1787 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -616,15 +616,14 @@ def test_schedule_placement_group_when_node_add(ray_start_cluster): wait_for_condition(is_placement_group_created) -@pytest.mark.skip(reason="Not working yet") def test_atomic_creation(ray_start_cluster): # Setup cluster. cluster = ray_start_cluster bundle_cpu_size = 2 bundle_per_node = 2 - num_nodes = 5 + num_nodes = 2 - nodes = [ + [ cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node) for _ in range(num_nodes) ] @@ -635,24 +634,29 @@ def test_atomic_creation(ray_start_cluster): def ping(self): pass + @ray.remote(num_cpus=3) + def bothering_task(): + import time + time.sleep(1) + return True + + # Schedule tasks to fail initial placement group creation. + tasks = [bothering_task.remote() for _ in range(2)] # Create an actor that will fail bundle scheduling. # It is important to use pack strategy to make test less flaky. pg = ray.util.placement_group( name="name", - strategy="PACK", + strategy="SPREAD", bundles=[{ "CPU": bundle_cpu_size } for _ in range(num_nodes * bundle_per_node)]) # Create a placement group actor. - # This shouldn't be scheduled until placement group creation is done. + # This shouldn't be scheduled because atomic + # placement group creation should've failed. pg_actor = NormalActor.options( placement_group=pg, placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote() - # Destroy some nodes to fail placement group creation. - nodes_to_kill = get_other_nodes(cluster, exclude_head=True) - for node_to_kill in nodes_to_kill: - cluster.remove_node(node_to_kill) # Wait on the placement group now. It should be unready # because normal actor takes resources that are required @@ -660,23 +664,131 @@ def test_atomic_creation(ray_start_cluster): ready, unready = ray.wait([pg.ready()], timeout=0) assert len(ready) == 0 assert len(unready) == 1 + # Wait until all tasks are done. + assert all(ray.get(tasks)) - # Add a node back to schedule placement group. - for _ in range(len(nodes_to_kill)): - nodes.append( - cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node)) - # Wait on the placement group creation. + # Wait on the placement group creation. Since resources are now available, + # it should be ready soon. ready, unready = ray.wait([pg.ready()]) assert len(ready) == 1 assert len(unready) == 0 # Confirm that the placement group actor is created. It will - # raise an exception if actor was scheduled before placement group was - # created. - # TODO(sang): This with statement should be removed after atomic creation - # is implemented. It will be done in the next PR. - with pytest.raises(ray.exceptions.RayActorError): - ray.get(pg_actor.ping.remote(), timeout=3.0) + # raise an exception if actor was scheduled before placement + # group was created thus it checks atomicity. + ray.get(pg_actor.ping.remote(), timeout=3.0) + ray.kill(pg_actor) + + # Make sure atomic creation failure didn't impact resources. + @ray.remote(num_cpus=bundle_cpu_size) + def resource_check(): + return True + + # This should hang because every resources + # are claimed by placement group. + check_without_pg = [ + resource_check.remote() for _ in range(bundle_per_node * num_nodes) + ] + + # This all should scheduled on each bundle. + check_with_pg = [ + resource_check.options( + placement_group=pg, placement_group_bundle_index=i).remote() + for i in range(bundle_per_node * num_nodes) + ] + + # Make sure these are hanging. + ready, unready = ray.wait(check_without_pg, timeout=0) + assert len(ready) == 0 + assert len(unready) == bundle_per_node * num_nodes + + # Make sure these are all scheduled. + assert all(ray.get(check_with_pg)) + + ray.util.remove_placement_group(pg) + + def pg_removed(): + return ray.util.placement_group_table(pg)["state"] == "REMOVED" + + wait_for_condition(pg_removed) + + # Make sure check without pgs are all + # scheduled properly because resources are cleaned up. + assert all(ray.get(check_without_pg)) + + +def test_mini_integration(ray_start_cluster): + # Create bundles as many as number of gpus in the cluster. + # Do some random work and make sure all resources are properly recovered. + + cluster = ray_start_cluster + + num_nodes = 5 + per_bundle_gpus = 2 + gpu_per_node = 4 + total_gpus = num_nodes * per_bundle_gpus * gpu_per_node + per_node_gpus = per_bundle_gpus * gpu_per_node + + bundles_per_pg = 2 + total_num_pg = total_gpus // (bundles_per_pg * per_bundle_gpus) + + [ + cluster.add_node(num_cpus=2, num_gpus=per_bundle_gpus * gpu_per_node) + for _ in range(num_nodes) + ] + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + @ray.remote(num_cpus=0, num_gpus=1) + def random_tasks(): + import time + import random + sleep_time = random.uniform(0.1, 0.2) + time.sleep(sleep_time) + return True + + pgs = [] + pg_tasks = [] + # total bundle gpu usage = bundles_per_pg * total_num_pg * per_bundle_gpus + # Note this is half of total + for _ in range(total_num_pg): + pgs.append( + ray.util.placement_group( + name="name", + strategy="PACK", + bundles=[{ + "GPU": per_bundle_gpus + } for _ in range(bundles_per_pg)])) + + # Schedule tasks. + for i in range(total_num_pg): + pg = pgs[i] + pg_tasks.append([ + random_tasks.options( + placement_group=pg, + placement_group_bundle_index=bundle_index).remote() + for bundle_index in range(bundles_per_pg) + ]) + + # Make sure tasks are done and we remove placement groups. + num_removed_pg = 0 + pg_indexes = [2, 3, 1, 7, 8, 9, 0, 6, 4, 5] + while num_removed_pg < total_num_pg: + index = pg_indexes[num_removed_pg] + pg = pgs[index] + assert all(ray.get(pg_tasks[index])) + ray.util.remove_placement_group(pg) + num_removed_pg += 1 + + @ray.remote(num_cpus=2, num_gpus=per_node_gpus) + class A: + def ping(self): + return True + + # Make sure all resources are properly returned by scheduling + # actors that take up all existing resources. + actors = [A.remote() for _ in range(num_nodes)] + assert all(ray.get([a.ping.remote() for a in actors])) if __name__ == "__main__": diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index 68503e846..e5080cab5 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -228,8 +228,9 @@ void ResourceSet::AddResources(const ResourceSet &other) { } } -void ResourceSet::AddBundleResources(const PlacementGroupID &group_id, - const int bundle_index, const ResourceSet &other) { +void ResourceSet::CommitBundleResources(const PlacementGroupID &group_id, + const int bundle_index, + const ResourceSet &other) { for (const auto &resource_pair : other.GetResourceAmountMap()) { // With bundle index (e.g., CPU_group_i_zzz). const std::string &resource_label = @@ -685,10 +686,10 @@ void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name, } } -void ResourceIdSet::AddBundleResourceIds(const PlacementGroupID &group_id, - const int bundle_index, - const std::string &resource_name, - ResourceIds &resource_ids) { +void ResourceIdSet::CommitBundleResourceIds(const PlacementGroupID &group_id, + const int bundle_index, + const std::string &resource_name, + ResourceIds &resource_ids) { auto index_name = FormatPlacementGroupResource(resource_name, group_id, bundle_index); auto wildcard_name = FormatPlacementGroupResource(resource_name, group_id, -1); available_resources_[index_name] = available_resources_[index_name].Plus(resource_ids); @@ -872,13 +873,18 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam } } -void SchedulingResources::TransferToBundleResources(const PlacementGroupID &group, - const int bundle_index, - const ResourceSet &resource_set) { +void SchedulingResources::PrepareBundleResources(const PlacementGroupID &group, + const int bundle_index, + const ResourceSet &resource_set) { resources_available_.SubtractResourcesStrict(resource_set); - resources_available_.AddBundleResources(group, bundle_index, resource_set); resources_total_.SubtractResourcesStrict(resource_set); - resources_total_.AddBundleResources(group, bundle_index, resource_set); +} + +void SchedulingResources::CommitBundleResources(const PlacementGroupID &group, + const int bundle_index, + const ResourceSet &resource_set) { + resources_available_.CommitBundleResources(group, bundle_index, resource_set); + resources_total_.CommitBundleResources(group, bundle_index, resource_set); } void SchedulingResources::ReturnBundleResources(const PlacementGroupID &group_id, diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index 491d25bcf..44fecfe40 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -154,17 +154,21 @@ class ResourceSet { /// This adds both the the indexed and wildcard resources (e.g., both /// CPU_group_i_zzz and CPU_group_zzz). /// + /// NOTE: This method should be used AFTER resources are COMMITTED. + /// It can have unexpected behavior if you call this method on PREPARED resources. + /// /// \param group_id: The placement group id. /// \param bundle_index: The index of the bundle. /// \param other: The other resource set to add. /// \return Void. - void AddBundleResources(const PlacementGroupID &group_id, const int bundle_index, - const ResourceSet &other); + void CommitBundleResources(const PlacementGroupID &group_id, const int bundle_index, + const ResourceSet &other); /// \brief Return back all the bundle resource. Changing the resource name and adding /// any missing resource labels to this set. /// - /// This is the inverse of AddBundleResources(). + /// Note that this method assumes bundle resources are COMMITTED. + /// Please make sure to commit bundle resources before calling this method. /// /// \param group_id: The placement group id. /// \param bundle_index: The bundle index to return resources for. @@ -427,7 +431,7 @@ class ResourceIdSet { /// \param capacity capacity of the resource being added void AddOrUpdateResource(const std::string &resource_name, int64_t capacity); - /// \brief Add a Bundle resource in the ResourceIdSet. + /// \brief Commit a Bundle resource in the ResourceIdSet. /// /// This adds both the the indexed and wildcard resources (e.g., both /// CPU_group_i_zzz and CPU_group_zzz). @@ -436,12 +440,15 @@ class ResourceIdSet { /// \param bundle_index: The index of the bundle. /// \param resource_name the name of the resource to create/update (e.g., "CPU"). /// \param resource_ids resource_ids of the resource being added - void AddBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index, - const std::string &resource_name, ResourceIds &resource_ids); + void CommitBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index, + const std::string &resource_name, + ResourceIds &resource_ids); /// \brief remove a Bundle resource in the ResourceIdSet. /// /// The bundle resources will be returned to their original resource names. + /// Note that the bundle resources should've been COMMITTED before this method is + /// called. /// /// \param group_id: The placement group id. /// \param bundle_index: The index of the bundle. @@ -571,7 +578,22 @@ class SchedulingResources { void UpdateResourceCapacity(const std::string &resource_name, int64_t capacity); /// \brief Update total, available and load resources with the ResourceIds. - /// Create if not exists. + /// Create if not exists. This will only update resources, but it won't + /// create placement group resources. That'll be done when resources are + /// COMMITTED. Commit should be done by CommitBundleResources. + /// + /// We need this step for running 2PC protocol for atomic placement group creation. + /// + /// \param resource_name: Name of the resource to be modified. + /// \param resource_set: New resource_set of the resource. + void PrepareBundleResources(const PlacementGroupID &group, const int bundle_index, + const ResourceSet &resource_set); + + /// \brief Commit placement group resources. It means this method'll create + /// placement group resources. The original resources should've been updated + /// by PrepareBundleResources. + /// + /// We need this step for running 2PC protocol for atomic placement group creation. /// /// The resources will be transfered from their original resource names. /// This includes both the the indexed and wildcard resources (e.g., both @@ -579,8 +601,8 @@ class SchedulingResources { /// /// \param resource_name: Name of the resource to be modified /// \param resource_set: New resource_set of the resource. - void TransferToBundleResources(const PlacementGroupID &group, const int bundle_index, - const ResourceSet &resource_set); + void CommitBundleResources(const PlacementGroupID &group, const int bundle_index, + const ResourceSet &resource_set); /// \brief delete total, available and load resources with the ResourceIds. /// diff --git a/src/ray/common/task/scheduling_resources_test.cc b/src/ray/common/task/scheduling_resources_test.cc index ca919cdff..120f9f124 100644 --- a/src/ray/common/task/scheduling_resources_test.cc +++ b/src/ray/common/task/scheduling_resources_test.cc @@ -32,12 +32,12 @@ class SchedulingResourcesTest : public ::testing::Test { std::shared_ptr resource_id_set; }; -TEST_F(SchedulingResourcesTest, AddBundleResources) { +TEST_F(SchedulingResourcesTest, CommitBundleResources) { PlacementGroupID group_id = PlacementGroupID::FromRandom(); std::vector resource_labels = {"CPU"}; std::vector resource_capacity = {1.0}; ResourceSet resource(resource_labels, resource_capacity); - resource_set->AddBundleResources(group_id, 1, resource); + resource_set->CommitBundleResources(group_id, 1, resource); resource_labels.pop_back(); resource_labels.push_back("CPU_group_1_" + group_id.Hex()); resource_labels.push_back("CPU_group_" + group_id.Hex()); @@ -52,7 +52,7 @@ TEST_F(SchedulingResourcesTest, AddBundleResource) { std::string index_name = "CPU_group_1_" + group_id.Hex(); std::vector whole_ids = {1, 2, 3}; ResourceIds resource_ids(whole_ids); - resource_id_set->AddBundleResourceIds(group_id, 1, "CPU", resource_ids); + resource_id_set->CommitBundleResourceIds(group_id, 1, "CPU", resource_ids); ASSERT_EQ(2, resource_id_set->AvailableResources().size()); for (auto res : resource_id_set->AvailableResources()) { ASSERT_TRUE(res.first == wild_name || res.first == index_name) << res.first; @@ -64,7 +64,7 @@ TEST_F(SchedulingResourcesTest, ReturnBundleResources) { std::vector resource_labels = {"CPU"}; std::vector resource_capacity = {1.0}; ResourceSet resource(resource_labels, resource_capacity); - resource_set->AddBundleResources(group_id, 1, resource); + resource_set->CommitBundleResources(group_id, 1, resource); resource_labels.pop_back(); resource_labels.push_back("CPU_group_" + group_id.Hex()); resource_labels.push_back("CPU_group_1_" + group_id.Hex()); @@ -83,8 +83,8 @@ TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemove) { ResourceSet resource(resource_labels, resource_capacity); // Construct resource set containing two bundles. - resource_set->AddBundleResources(group_id, 1, resource); - resource_set->AddBundleResources(group_id, 2, resource); + resource_set->CommitBundleResources(group_id, 1, resource); + resource_set->CommitBundleResources(group_id, 2, resource); resource_labels = { "CPU_group_" + group_id.Hex(), "CPU_group_1_" + group_id.Hex(), @@ -120,10 +120,10 @@ TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemoveIdSet) { // Construct resource set containing two bundles. auto rid1 = ResourceIds({1, 2}); auto rid2 = ResourceIds({3, 4}); - resource_ids.AddBundleResourceIds(group_id, 1, "CPU", rid1); - resource_ids.AddBundleResourceIds(group_id, 2, "CPU", rid2); - resource_ids.AddBundleResourceIds(group_id, 1, "GPU", rid1); - resource_ids.AddBundleResourceIds(group_id, 2, "GPU", rid2); + resource_ids.CommitBundleResourceIds(group_id, 1, "CPU", rid1); + resource_ids.CommitBundleResourceIds(group_id, 2, "CPU", rid2); + resource_ids.CommitBundleResourceIds(group_id, 1, "GPU", rid1); + resource_ids.CommitBundleResourceIds(group_id, 2, "GPU", rid2); auto result = ResourceSet( { "CPU_group_" + group_id.Hex(), diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 73527e0f2..7d4fa0961 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1808,8 +1808,8 @@ void NodeManager::HandlePrepareBundleResources( auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "bundle prepare request " << bundle_spec.BundleId().first << bundle_spec.BundleId().second; - auto resource_ids = ScheduleBundle(cluster_resource_map_, bundle_spec); - if (resource_ids.AvailableResources().size() == 0) { + auto prepared = PrepareBundle(cluster_resource_map_, bundle_spec); + if (!prepared) { reply->set_success(false); send_reply_callback(Status::OK(), nullptr, nullptr); } else { @@ -1824,8 +1824,17 @@ void NodeManager::HandlePrepareBundleResources( void NodeManager::HandleCommitBundleResources( const rpc::CommitBundleResourcesRequest &request, rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet."; + + auto bundle_spec = BundleSpecification(request.bundle_spec()); + RAY_LOG(DEBUG) << "Received bundle commit request " << bundle_spec.BundleId().first + << bundle_spec.BundleId().second; + CommitBundle(cluster_resource_map_, bundle_spec); send_reply_callback(Status::OK(), nullptr, nullptr); - // TODO(sang): Implement this in the next PR. + + // Call task dispatch to assign work to the new group. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } void NodeManager::HandleCancelResourceReserve( @@ -1860,6 +1869,20 @@ void NodeManager::HandleCancelResourceReserve( worker->MarkDead(); KillWorker(worker); } + + // We should commit resources if it weren't because + // ReturnBundleResources requires resources to be committed when it is called. + auto it = bundle_state_map_.find(bundle_spec.BundleId()); + RAY_CHECK(it != bundle_state_map_.end()) + << "Cancel requests are received to raylet although it hasn't received any prepare " + "or commit requests. This must be an anomaly."; + const auto &bundle_state = it->second; + if (bundle_state->state == CommitState::PREPARED) { + CommitBundle(cluster_resource_map_, bundle_spec); + } + bundle_state_map_.erase(it); + + // Return resources. for (auto resource : resource_set.GetResourceMap()) { local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), bundle_spec.Index(), resource.first); @@ -2014,30 +2037,66 @@ void NodeManager::ProcessSetResourceRequest( } } -ResourceIdSet NodeManager::ScheduleBundle( +bool NodeManager::PrepareBundle( std::unordered_map &resource_map, const BundleSpecification &bundle_spec) { - // If the resource map contains the local raylet, update load before calling policy. + // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent + // once retry is implemented. If the resource map contains the local raylet, update load + // before calling policy. if (resource_map.count(self_node_id_) > 0) { resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); } // Invoke the scheduling policy. auto reserve_resource_success = scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec); - ResourceIdSet acquired_resources; + + auto bundle_state = std::make_shared(); if (reserve_resource_success) { - acquired_resources = + // Register states. + const auto &bundle_id = bundle_spec.BundleId(); + auto it = bundle_state_map_.find(bundle_id); + // Same bundle cannot be rescheduled. + RAY_CHECK(it == bundle_state_map_.end()); + + // Prepare resources. This shouldn't create formatted placement group resources + // because that'll be done at the commit phase. + bundle_state->acquired_resources = local_available_resources_.Acquire(bundle_spec.GetRequiredResources()); - for (auto resource : acquired_resources.AvailableResources()) { - local_available_resources_.AddBundleResourceIds(bundle_spec.PlacementGroupId(), - bundle_spec.Index(), resource.first, - resource.second); - } - resource_map[self_node_id_].TransferToBundleResources( + resource_map[self_node_id_].PrepareBundleResources( bundle_spec.PlacementGroupId(), bundle_spec.Index(), bundle_spec.GetRequiredResources()); + + // Register bundle state. + bundle_state->state = CommitState::PREPARED; + bundle_state_map_.emplace(bundle_id, bundle_state); } - return acquired_resources; + return bundle_state->acquired_resources.AvailableResources().size() > 0; +} + +void NodeManager::CommitBundle( + std::unordered_map &resource_map, + const BundleSpecification &bundle_spec) { + // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent + // once retry is implemented. + const auto &bundle_id = bundle_spec.BundleId(); + auto it = bundle_state_map_.find(bundle_id); + // When bundle is committed, it should've been prepared already. + // We don't need this check if commit becomes idempotent. + RAY_CHECK(it != bundle_state_map_.end()); + const auto &bundle_state = it->second; + bundle_state->state = CommitState::COMMITTED; + const auto &acquired_resources = bundle_state->acquired_resources; + for (auto resource : acquired_resources.AvailableResources()) { + local_available_resources_.CommitBundleResourceIds(bundle_spec.PlacementGroupId(), + bundle_spec.Index(), + resource.first, resource.second); + } + + resource_map[self_node_id_].CommitBundleResources(bundle_spec.PlacementGroupId(), + bundle_spec.Index(), + bundle_spec.GetRequiredResources()); + RAY_CHECK(bundle_state->acquired_resources.AvailableResources().size() > 0) + << "Prepare should've been failed if there were no acquireable resources."; } void NodeManager::ScheduleTasks( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 025b7e7f9..c96db6d19 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -99,6 +99,28 @@ struct NodeManagerConfig { std::unordered_map raylet_config; }; +typedef std::pair BundleID; +struct pair_hash { + template + std::size_t operator()(const std::pair &pair) const { + return std::hash()(pair.first) ^ std::hash()(pair.second); + } +}; + +enum CommitState { + /// Resources are prepared. + PREPARED, + /// Resources are COMMITTED. + COMMITTED +}; + +struct BundleState { + /// Leasing state for 2PC protocol. + CommitState state; + /// Resources that are acquired at preparation stage. + ResourceIdSet acquired_resources; +}; + class NodeManager : public rpc::NodeManagerServiceHandler { public: /// Create a node manager. @@ -288,16 +310,27 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void ScheduleTasks(std::unordered_map &resource_map); + /// Make a placement decision for the resource_map and subtract original resources so + /// that the node is ready to commit (create) placement group resources. + /// + /// \param resource_map A mapping from node manager ID to an estimate of the + /// resources available to that node manager. Scheduling decisions will only + /// consider the local node manager and the node managers in the keys of the + /// resource_map argument. + /// \param bundle_spec Specification of bundle that will be prepared. + /// \return True is resources were successfully prepared. False otherwise. + bool PrepareBundle(std::unordered_map &resource_map, + const BundleSpecification &bundle_spec); + /// Make a placement decision for the resource_map. /// /// \param resource_map A mapping from node manager ID to an estimate of the /// resources available to that node manager. Scheduling decisions will only /// consider the local node manager and the node managers in the keys of the /// resource_map argument. - /// \return ResourceIdSet. - ResourceIdSet ScheduleBundle( - std::unordered_map &resource_map, - const BundleSpecification &bundle_spec); + /// \param bundle_spec Specification of bundle that will be prepared. + void CommitBundle(std::unordered_map &resource_map, + const BundleSpecification &bundle_spec); /// Handle a task whose return value(s) must be reconstructed. /// @@ -794,6 +827,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// free_objects_batch_size, or if objects have been in the cache for longer /// than the config's free_objects_period, whichever occurs first. std::vector objects_to_free_; + + /// This map represents the commit state of 2PC protocol for atomic placement group + /// creation. + absl::flat_hash_map, pair_hash> + bundle_state_map_; }; } // namespace raylet