diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 420c0c594..7f1e8e639 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -94,8 +94,13 @@ def test_local_scheduling_first(ray_start_cluster): assert local() -@pytest.mark.skipif(new_scheduler_enabled(), reason="flakes more often") -def test_load_balancing_with_dependencies(ray_start_cluster): +@pytest.mark.parametrize("fast", [True, False]) +def test_load_balancing_with_dependencies(ray_start_cluster, fast): + if fast and new_scheduler_enabled(): + # Load-balancing on new scheduler can be inefficient if (task + # duration:heartbeat interval) is small enough. + pytest.skip() + # This test ensures that tasks are being assigned to all raylets in a # roughly equal manner even when the tasks have dependencies. cluster = ray_start_cluster @@ -106,7 +111,10 @@ def test_load_balancing_with_dependencies(ray_start_cluster): @ray.remote def f(x): - time.sleep(0.010) + if fast: + time.sleep(0.010) + else: + time.sleep(0.1) return ray.worker.global_worker.node.unique_id # This object will be local to one of the raylets. Make sure diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 580623185..f24eaf8af 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -304,8 +304,8 @@ message HeartbeatTableData { bytes node_id = 1; // Resource capacity currently available on this node manager. map resources_available = 2; - // Indicates whether avaialbe resources is changed. Only used when - // light heartbeat enabled. + // Indicates whether available resources is changed. Only used when light + // heartbeat enabled. bool resources_available_changed = 3; // Total resource capacity configured for this node manager. 
map resources_total = 4; diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index 566c2a4d3..cb0214dab 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -196,6 +196,8 @@ bool NodeResources::operator==(const NodeResources &other) { return true; } +bool NodeResources::operator!=(const NodeResources &other) { return !(*this == other); } + std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { std::stringstream buffer; buffer << " {\n"; diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index 9c769ecf4..96b4c4359 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -161,6 +161,7 @@ class NodeResources { absl::flat_hash_map custom_resources; /// Returns if this equals another node resources. bool operator==(const NodeResources &other); + bool operator!=(const NodeResources &other); /// Returns human-readable string for these resources. 
std::string DebugString(StringIdMap string_to_int_map) const; }; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 4e180802f..97f00bdd7 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -43,25 +43,6 @@ void ClusterResourceScheduler::AddOrUpdateNode( AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources); } -void ClusterResourceScheduler::SetPredefinedResources(const NodeResources &new_resources, - NodeResources *old_resources) { - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - old_resources->predefined_resources[i].total = - new_resources.predefined_resources[i].total; - old_resources->predefined_resources[i].available = - new_resources.predefined_resources[i].available; - } -} - -void ClusterResourceScheduler::SetCustomResources( - const absl::flat_hash_map &new_custom_resources, - absl::flat_hash_map *old_custom_resources) { - old_custom_resources->clear(); - for (auto &elem : new_custom_resources) { - old_custom_resources->insert(elem); - } -} - void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources) { auto it = nodes_.find(node_id); @@ -70,9 +51,7 @@ void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id, nodes_.emplace(node_id, node_resources); } else { // This node exists, so update its resources. - NodeResources &resources = it->second; - SetPredefinedResources(node_resources, &resources); - SetCustomResources(node_resources.custom_resources, &resources.custom_resources); + it->second = Node(node_resources); } } @@ -82,7 +61,6 @@ bool ClusterResourceScheduler::RemoveNode(int64_t node_id) { // Node not found. 
return false; } else { - it->second.custom_resources.clear(); nodes_.erase(it); string_to_int_map_.Remove(node_id); return true; @@ -216,7 +194,8 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task // the local node only if there are zero violations. const auto local_node_it = nodes_.find(local_node_id_); if (local_node_it != nodes_.end()) { - if (IsSchedulable(task_req, local_node_it->first, local_node_it->second) == 0) { + if (IsSchedulable(task_req, local_node_it->first, + local_node_it->second.GetLocalView()) == 0) { return local_node_id_; } } @@ -226,20 +205,22 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task for (const auto &task_req_placement_hint : task_req.placement_hints) { auto it = nodes_.find(task_req_placement_hint); if (it != nodes_.end()) { - if (IsSchedulable(task_req, it->first, it->second) == 0) { + if (IsSchedulable(task_req, it->first, it->second.GetLocalView()) == 0) { return it->first; } } } - bool local_node_feasible = IsFeasible(task_req, local_node_it->second); + bool local_node_feasible = local_node_it != nodes_.end() && + IsFeasible(task_req, local_node_it->second.GetLocalView()); for (const auto &node : nodes_) { // Return -1 if node not schedulable. otherwise return the number // of soft constraint violations. - int64_t violations = IsSchedulable(task_req, node.first, node.second); + int64_t violations = IsSchedulable(task_req, node.first, node.second.GetLocalView()); if (violations == -1) { - if (!local_node_feasible && best_node == -1 && IsFeasible(task_req, node.second)) { + if (!local_node_feasible && best_node == -1 && + IsFeasible(task_req, node.second.GetLocalView())) { // If the local node is not feasible, and a better node has not yet // been found, and this node does not currently have the resources // available but is feasible, then schedule to this node. 
@@ -283,30 +263,32 @@ std::string ClusterResourceScheduler::GetBestSchedulableNode( return string_to_int_map_.Get(node_id); } -bool ClusterResourceScheduler::SubtractNodeAvailableResources( +bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources( int64_t node_id, const TaskRequest &task_req) { + RAY_CHECK(node_id != local_node_id_); + auto it = nodes_.find(node_id); if (it == nodes_.end()) { return false; } - NodeResources &resources = it->second; + NodeResources *resources = it->second.GetMutableLocalView(); // Just double check this node can still schedule the task request. - if (IsSchedulable(task_req, node_id, resources) == -1) { + if (IsSchedulable(task_req, node_id, *resources) == -1) { return false; } FixedPoint zero(0.); for (size_t i = 0; i < PredefinedResources_MAX; i++) { - resources.predefined_resources[i].available = - std::max(FixedPoint(0), resources.predefined_resources[i].available - + resources->predefined_resources[i].available = + std::max(FixedPoint(0), resources->predefined_resources[i].available - task_req.predefined_resources[i].demand); } for (const auto &task_req_custom_resource : task_req.custom_resources) { - auto it = resources.custom_resources.find(task_req_custom_resource.id); - if (it != resources.custom_resources.end()) { + auto it = resources->custom_resources.find(task_req_custom_resource.id); + if (it != resources->custom_resources.end()) { it->second.available = std::max(FixedPoint(0), it->second.available - task_req_custom_resource.demand); } @@ -314,50 +296,11 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources( return true; } -bool ClusterResourceScheduler::SubtractNodeAvailableResources( - const std::string &node_id, - const std::unordered_map &resource_map) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map); - return SubtractNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); -} - -bool 
ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id, - const TaskRequest &task_req) { - auto it = nodes_.find(node_id); - if (it == nodes_.end()) { - return false; - } - NodeResources &resources = it->second; - - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - resources.predefined_resources[i].available = - std::min(resources.predefined_resources[i].available + - task_req.predefined_resources[i].demand, - resources.predefined_resources[i].total); - } - - for (const auto &task_req_custom_resource : task_req.custom_resources) { - auto it = resources.custom_resources.find(task_req_custom_resource.id); - if (it != resources.custom_resources.end()) { - it->second.available = std::min( - it->second.available + task_req_custom_resource.demand, it->second.total); - } - } - return true; -} - -bool ClusterResourceScheduler::AddNodeAvailableResources( - const std::string &node_id, - const std::unordered_map &resource_map) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map); - return AddNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); -} - bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, NodeResources *ret_resources) const { auto it = nodes_.find(node_id); if (it != nodes_.end()) { - *ret_resources = it->second; + *ret_resources = it->second.GetLocalView(); return true; } else { return false; @@ -376,7 +319,10 @@ void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name auto &instances = local_resources_.custom_resources[resource_id]; instances.total[0] += total; instances.available[0] += total; - auto &capacity = nodes_[local_node_id_].custom_resources[resource_id]; + auto local_node_it = nodes_.find(local_node_id_); + RAY_CHECK(local_node_it != nodes_.end()); + auto &capacity = + local_node_it->second.GetMutableLocalView()->custom_resources[resource_id]; capacity.available += total; capacity.total += total; } else { @@ -403,9 +349,7 @@ void 
ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id NodeResources node_resources; node_resources.predefined_resources.resize(PredefinedResources_MAX); node_id = string_to_int_map_.Insert(node_id_string); - RAY_CHECK(nodes_.emplace(node_id, node_resources).second); - it = nodes_.find(node_id); - RAY_CHECK(it != nodes_.end()); + it = nodes_.emplace(node_id, node_resources).first; } int idx = -1; @@ -419,22 +363,23 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id idx = (int)MEM; }; + auto local_view = it->second.GetMutableLocalView(); FixedPoint resource_total_fp(resource_total); if (idx != -1) { - auto diff_capacity = resource_total_fp - it->second.predefined_resources[idx].total; - it->second.predefined_resources[idx].total += diff_capacity; - it->second.predefined_resources[idx].available += diff_capacity; - if (it->second.predefined_resources[idx].available < 0) { - it->second.predefined_resources[idx].available = 0; + auto diff_capacity = resource_total_fp - local_view->predefined_resources[idx].total; + local_view->predefined_resources[idx].total += diff_capacity; + local_view->predefined_resources[idx].available += diff_capacity; + if (local_view->predefined_resources[idx].available < 0) { + local_view->predefined_resources[idx].available = 0; } - if (it->second.predefined_resources[idx].total < 0) { - it->second.predefined_resources[idx].total = 0; + if (local_view->predefined_resources[idx].total < 0) { + local_view->predefined_resources[idx].total = 0; } } else { string_to_int_map_.Insert(resource_name); int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = it->second.custom_resources.find(resource_id); - if (itr != it->second.custom_resources.end()) { + auto itr = local_view->custom_resources.find(resource_id); + if (itr != local_view->custom_resources.end()) { auto diff_capacity = resource_total_fp - itr->second.total; itr->second.total += diff_capacity; itr->second.available += 
diff_capacity; @@ -447,7 +392,7 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id } else { ResourceCapacity resource_capacity; resource_capacity.total = resource_capacity.available = resource_total_fp; - it->second.custom_resources.emplace(resource_id, resource_capacity); + local_view->custom_resources.emplace(resource_id, resource_capacity); } } } @@ -474,8 +419,9 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, } else if (resource_name == ray::kMemory_ResourceLabel) { idx = (int)MEM; }; + auto local_view = it->second.GetMutableLocalView(); if (idx != -1) { - it->second.predefined_resources[idx].total = 0; + local_view->predefined_resources[idx].total = 0; if (node_id == local_node_id_) { local_resources_.predefined_resources[idx].total.clear(); @@ -483,10 +429,10 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, } } else { int64_t resource_id = string_to_int_map_.Get(resource_name); - auto itr = it->second.custom_resources.find(resource_id); - if (itr != it->second.custom_resources.end()) { + auto itr = local_view->custom_resources.find(resource_id); + if (itr != local_view->custom_resources.end()) { string_to_int_map_.Remove(resource_id); - it->second.custom_resources.erase(itr); + local_view->custom_resources.erase(itr); } if (node_id == local_node_id_) { @@ -502,7 +448,7 @@ std::string ClusterResourceScheduler::DebugString(void) const { buffer << " Local resources: " << local_resources_.DebugString(string_to_int_map_); for (auto &node : nodes_) { buffer << "node id: " << node.first; - buffer << node.second.DebugString(string_to_int_map_); + buffer << node.second.GetLocalView().DebugString(string_to_int_map_); } return buffer.str(); } @@ -720,16 +666,17 @@ void ClusterResourceScheduler::UpdateLocalAvailableResourcesFromResourceInstance auto it_local_node = nodes_.find(local_node_id_); RAY_CHECK(it_local_node != nodes_.end()); + auto local_view = 
it_local_node->second.GetMutableLocalView(); for (size_t i = 0; i < PredefinedResources_MAX; i++) { - it_local_node->second.predefined_resources[i].available = 0; + local_view->predefined_resources[i].available = 0; for (size_t j = 0; j < local_resources_.predefined_resources[i].available.size(); j++) { - it_local_node->second.predefined_resources[i].available += + local_view->predefined_resources[i].available += local_resources_.predefined_resources[i].available[j]; } } - for (auto &custom_resource : it_local_node->second.custom_resources) { + for (auto &custom_resource : local_view->custom_resources) { auto it = local_resources_.custom_resources.find(custom_resource.first); if (it != local_resources_.custom_resources.end()) { custom_resource.second.available = 0; @@ -825,19 +772,13 @@ std::vector ClusterResourceScheduler::SubtractGPUResourceInstances( return VectorFixedPointToVectorDouble(underflow); } -bool ClusterResourceScheduler::AllocateTaskResources( - int64_t node_id, const TaskRequest &task_req, +bool ClusterResourceScheduler::AllocateLocalTaskResources( + const TaskRequest &task_request, std::shared_ptr task_allocation) { - if (node_id == local_node_id_) { - RAY_CHECK(task_allocation != nullptr); - if (AllocateTaskResourceInstances(task_req, task_allocation)) { - UpdateLocalAvailableResourcesFromResourceInstances(); - return true; - } - } else { - if (SubtractNodeAvailableResources(node_id, task_req)) { - return true; - } + RAY_CHECK(task_allocation != nullptr); + if (AllocateTaskResourceInstances(task_request, task_allocation)) { + UpdateLocalAvailableResourcesFromResourceInstances(); + return true; } return false; } @@ -847,7 +787,7 @@ bool ClusterResourceScheduler::AllocateLocalTaskResources( std::shared_ptr task_allocation) { RAY_CHECK(task_allocation != nullptr); TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); - return AllocateTaskResources(local_node_id_, task_request, task_allocation); + return AllocateLocalTaskResources(task_request, 
task_allocation); } std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) { @@ -864,13 +804,13 @@ std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) } } -void ClusterResourceScheduler::AllocateRemoteTaskResources( +bool ClusterResourceScheduler::AllocateRemoteTaskResources( const std::string &node_string, const std::unordered_map &task_resources) { TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); auto node_id = string_to_int_map_.Insert(node_string); RAY_CHECK(node_id != local_node_id_); - AllocateTaskResources(node_id, task_request, nullptr); + return SubtractRemoteNodeAvailableResources(node_id, task_request); } void ClusterResourceScheduler::FreeLocalTaskResources( @@ -890,10 +830,8 @@ void ClusterResourceScheduler::Heartbeat( << "Error: Populating heartbeat failed. Please file a bug report: " "https://github.com/ray-project/ray/issues/new."; - if (light_heartbeat_enabled && last_report_resources_ && - resources == *last_report_resources_.get()) { - return; - } else { + if (!light_heartbeat_enabled || !last_report_resources_ || + resources != *last_report_resources_.get()) { for (int i = 0; i < PredefinedResources_MAX; i++) { const auto &label = ResourceEnumToString((PredefinedResources)i); const auto &capacity = resources.predefined_resources[i]; @@ -923,6 +861,18 @@ void ClusterResourceScheduler::Heartbeat( last_report_resources_.reset(new NodeResources(resources)); } } + + if (light_heartbeat_enabled) { + // Reset all local views for remote nodes. This is needed in case tasks that + // we spilled back to a remote node were not actually scheduled on the + // node. Then, the remote node's resource availability may not change and + // so it may not send us another update. 
+ for (auto &node : nodes_) { + if (node.first != local_node_id_) { + node.second.ResetLocalView(); + } + } + } } } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 1adb2aa70..5984d444f 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -39,46 +39,6 @@ static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; /// tasks to nodes based on the task's constraints and the available /// resources at those nodes. class ClusterResourceScheduler { - /// List of nodes in the clusters and their resources organized as a map. - /// The key of the map is the node ID. - absl::flat_hash_map nodes_; - /// Identifier of local node. - int64_t local_node_id_; - /// Resources of local node. - NodeResourceInstances local_resources_; - /// Keep the mapping between node and resource IDs in string representation - /// to integer representation. Used for improving map performance. - StringIdMap string_to_int_map_; - /// Cached resources, used to compare with newest one in light heartbeat mode. - std::unique_ptr last_report_resources_; - - /// Set predefined resources. - /// - /// \param[in] new_resources: New predefined resources. - /// \param[out] old_resources: Predefined resources to be updated. - void SetPredefinedResources(const NodeResources &new_resources, - NodeResources *old_resources); - /// Set custom resources. - /// - /// \param[in] new_resources: New custom resources. - /// \param[out] old_resources: Custom resources to be updated. - void SetCustomResources( - const absl::flat_hash_map &new_custom_resources, - absl::flat_hash_map *old_custom_resources); - - /// Subtract the resources required by a given task request (task_req) from - /// a given node (node_id). - /// - /// \param node_id Node whose resources we allocate. Can be the local or a remote node. 
- /// \param task_req Task for which we allocate resources. - /// \param task_allocation Resources allocated to the task at instance granularity. - /// This is a return parameter. - /// - /// \return True if the node has enough resources to satisfy the task request. - /// False otherwise. - bool AllocateTaskResources(int64_t node_id, const TaskRequest &task_req, - std::shared_ptr task_allocation); - public: ClusterResourceScheduler(void){}; @@ -182,32 +142,6 @@ class ClusterResourceScheduler { const std::unordered_map &task_request, bool actor_creation, int64_t *violations); - /// Decrease the available resources of a node when a task request is - /// scheduled on the given node. - /// - /// \param node_id: ID of node on which request is being scheduled. - /// \param task_req: task request being scheduled. - /// - /// \return true, if task_req can be indeed scheduled on the node, - /// and false otherwise. - bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_request); - bool SubtractNodeAvailableResources( - const std::string &node_id, - const std::unordered_map &task_request); - - /// Increase available resources of a node when a worker has finished - /// a task. - /// - /// \param node_id: ID of node on which request is being scheduled. - /// \param task_request: resource requests of the task finishing execution. - /// - /// \return true, if task_req can be indeed scheduled on the node, - /// and false otherwise. - bool AddNodeAvailableResources(int64_t node_id, const TaskRequest &task_request); - bool AddNodeAvailableResources( - const std::string &node_id, - const std::unordered_map &task_request); - /// Return resources associated to the given node_id in ret_resources. /// If node_id not found, return false; otherwise return true. 
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const; @@ -380,12 +314,17 @@ class ClusterResourceScheduler { const std::unordered_map &task_resources, std::shared_ptr task_allocation); + bool AllocateLocalTaskResources(const TaskRequest &task_request, + std::shared_ptr task_allocation); + /// Subtract the resources required by a given task request (task_req) from a given /// remote node. /// /// \param node_id Remote node whose resources we allocate. /// \param task_req Task for which we allocate resources. - void AllocateRemoteTaskResources( + /// \return True if remote node has enough resources to satisfy the task request. + /// False otherwise. + bool AllocateRemoteTaskResources( const std::string &node_id, const std::unordered_map &task_resources); @@ -413,6 +352,57 @@ class ClusterResourceScheduler { /// Return human-readable string for this scheduler state. std::string DebugString() const; + + private: + struct Node { + Node(const NodeResources &resources) + : last_reported_(resources), local_view_(resources) {} + + void ResetLocalView() { local_view_ = last_reported_; } + + NodeResources *GetMutableLocalView() { return &local_view_; } + + const NodeResources &GetLocalView() const { return local_view_; } + + private: + /// The resource information according to the last heartbeat reported by + /// this node. + /// NOTE(swang): For the local node, this field should be ignored because + /// we do not receive heartbeats from ourselves and the local view is + /// therefore always the most up-to-date. + NodeResources last_reported_; + /// Our local view of the remote node's resources. This may be dirty + /// because it includes any resource requests that we allocated to this + /// node through spillback since our last heartbeat tick. 
This view will + /// get overwritten by the last reported view on each heartbeat tick, to + /// make sure that our local view does not skew too much from the actual + /// resources when light heartbeats are enabled. + NodeResources local_view_; + }; + + /// Decrease the available resources of a node when a task request is + /// scheduled on the given node. + /// + /// \param node_id: ID of node on which request is being scheduled. + /// \param task_req: task request being scheduled. + /// + /// \return true, if task_req can be indeed scheduled on the node, + /// and false otherwise. + bool SubtractRemoteNodeAvailableResources(int64_t node_id, + const TaskRequest &task_request); + + /// List of nodes in the clusters and their resources organized as a map. + /// The key of the map is the node ID. + absl::flat_hash_map nodes_; + /// Identifier of local node. + int64_t local_node_id_; + /// Resources of local node. + NodeResourceInstances local_resources_; + /// Keep the mapping between node and resource IDs in string representation + /// to integer representation. Used for improving map performance. + StringIdMap string_to_int_map_; + /// Cached resources, used to compare with newest one in light heartbeat mode. 
+ std::unique_ptr last_report_resources_; }; } // end namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index f0edc6358..5de3172fc 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -29,6 +29,21 @@ using namespace std; +#define ASSERT_RESOURCES_EQ(data, expected_available, expected_total) \ + { \ + auto available = data->resources_available(); \ + ASSERT_EQ(available[kCPU_ResourceLabel], expected_available); \ + auto total = data->resources_total(); \ + ASSERT_EQ(total[kCPU_ResourceLabel], expected_total); \ + } + +#define ASSERT_RESOURCES_EMPTY(data) \ + { \ + ASSERT_FALSE(data->resources_available_changed()); \ + ASSERT_TRUE(data->resources_available().empty()); \ + ASSERT_TRUE(data->resources_total().empty()); \ + } + namespace ray { // Used to path empty vector argiuments. vector EmptyIntVector; @@ -334,11 +349,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); + ASSERT_EQ(node_id, 1); ASSERT_TRUE(violations > 0); NodeResources nr1, nr2; ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr1)); - cluster_resources.SubtractNodeAvailableResources(node_id, task_req); + auto task_allocation = std::make_shared(); + ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_req, task_allocation)); ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr2)); for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) { @@ -1085,6 +1102,79 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { } } +TEST_F(ClusterResourceSchedulerTest, TestLightHeartbeat) { + std::unordered_map initial_resources({{"CPU", 1}}); + ClusterResourceScheduler cluster_resources("local", initial_resources); + + // Report heartbeat on initialization. 
+ auto data = std::make_shared(); + cluster_resources.Heartbeat(true, data); + ASSERT_RESOURCES_EQ(data, 1, 1); + + // Don't report heartbeats if resource availability hasn't changed. + for (int i = 0; i < 3; i++) { + data->Clear(); + cluster_resources.Heartbeat(true, data); + ASSERT_RESOURCES_EMPTY(data); + } + + // Report heartbeat if resource availability has changed. + cluster_resources.AddOrUpdateNode("local", {{"CPU", 1.}}, {{"CPU", 0.}}); + data->Clear(); + cluster_resources.Heartbeat(true, data); + ASSERT_RESOURCES_EQ(data, 0, 1); + + // Don't report heartbeats if resource availability hasn't changed. + for (int i = 0; i < 3; i++) { + data->Clear(); + cluster_resources.Heartbeat(true, data); + ASSERT_RESOURCES_EMPTY(data); + } +} + +TEST_F(ClusterResourceSchedulerTest, TestDirtyLocalView) { + std::unordered_map initial_resources({{"CPU", 1}}); + ClusterResourceScheduler cluster_resources("local", initial_resources); + cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}}, {{"CPU", 2.}}); + const std::unordered_map task_spec = {{"CPU", 1.}}; + + // Allocate local resources to force tasks onto the remote node when + // resources are available. + std::shared_ptr task_allocation = + std::make_shared(); + ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + task_allocation = std::make_shared(); + ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + // View of local resources is not affected by heartbeats. + auto data = std::make_shared(); + cluster_resources.Heartbeat(true, data); + ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + + for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) { + // Remote node reports updated resource availability. 
+ cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}}, + {{"CPU", num_slots_available}}); + auto data = std::make_shared(); + int64_t t; + for (int i = 0; i < 3; i++) { + // Heartbeat tick should reset the remote node's resources. + cluster_resources.Heartbeat(true, data); + for (int j = 0; j < num_slots_available; j++) { + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t), + "remote"); + // Allocate remote resources. + ASSERT_TRUE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); + } + // Our local view says there are not enough resources on the remote node to + // schedule another task. + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t), ""); + ASSERT_FALSE( + cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + ASSERT_FALSE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); + } + } +} + TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { ClusterResourceScheduler cluster_resources("local", {{"CPU", 2}}); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index a62eaabba..64e40b457 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -411,8 +411,12 @@ void ClusterTaskManager::Dispatch( void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) { const auto &task_spec = std::get<0>(work).GetTaskSpecification(); RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; - cluster_resource_scheduler_->AllocateRemoteTaskResources( - spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap()); + + if (!cluster_resource_scheduler_->AllocateRemoteTaskResources( + spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap())) { + RAY_LOG(INFO) << "Tried to allocate resources for request " << task_spec.TaskId() + << " on a remote node that are no 
longer available"; + } auto node_info_opt = get_node_info_(spillback_to); RAY_CHECK(node_info_opt)