diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index 91856c5b8..566c2a4d3 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -83,7 +83,8 @@ TaskRequest ResourceMapToTaskRequest( } else if (resource.first == ray::kMemory_ResourceLabel) { task_request.predefined_resources[MEM].demand = resource.second; } else { - task_request.custom_resources[i].id = string_to_int_map.Insert(resource.first); + string_to_int_map.Insert(resource.first); + task_request.custom_resources[i].id = string_to_int_map.Get(resource.first); task_request.custom_resources[i].demand = resource.second; task_request.custom_resources[i].soft = false; i++; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 6b53f212b..4e180802f 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -366,6 +366,33 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); } +void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name, + double resource_total) { + string_to_int_map_.Insert(resource_name); + int64_t resource_id = string_to_int_map_.Get(resource_name); + + if (local_resources_.custom_resources.contains(resource_id)) { + FixedPoint total(resource_total); + auto &instances = local_resources_.custom_resources[resource_id]; + instances.total[0] += total; + instances.available[0] += total; + auto &capacity = nodes_[local_node_id_].custom_resources[resource_id]; + capacity.available += total; + capacity.total += total; + } else { + ResourceInstanceCapacities capacity; + capacity.total.resize(1); + capacity.total[0] = resource_total; + capacity.available.resize(1); + capacity.available[0] = resource_total; + local_resources_.custom_resources.emplace(resource_id, capacity); + std::string node_id_string = string_to_int_map_.Get(local_node_id_); + RAY_CHECK(string_to_int_map_.Get(node_id_string) == local_node_id_); + UpdateResourceCapacity(node_id_string, resource_name, resource_total); + UpdateLocalAvailableResourcesFromResourceInstances(); + } +} + void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id_string, const std::string &resource_name, double resource_total) { @@ -404,7 +431,8 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id it->second.predefined_resources[idx].total = 0; } } else { - int64_t resource_id = string_to_int_map_.Insert(resource_name); + string_to_int_map_.Insert(resource_name); + int64_t resource_id = string_to_int_map_.Get(resource_name); auto itr = it->second.custom_resources.find(resource_id); if (itr != it->second.custom_resources.end()) { auto diff_capacity = resource_total_fp - itr->second.total; @@ -424,6 +452,10 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id } } +void ClusterResourceScheduler::DeleteLocalResource(const std::string &resource_name) { + DeleteResource(string_to_int_map_.Get(local_node_id_), resource_name); +} + void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, const std::string &resource_name) { int64_t node_id = string_to_int_map_.Get(node_id_string); @@ -444,6 +476,11 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, }; if (idx != -1) { it->second.predefined_resources[idx].total = 0; + + if (node_id == local_node_id_) { + local_resources_.predefined_resources[idx].total.clear(); + local_resources_.predefined_resources[idx].available.clear(); + } } else { int64_t resource_id = string_to_int_map_.Get(resource_name); auto itr = it->second.custom_resources.find(resource_id); @@ -451,6 +488,11 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, string_to_int_map_.Remove(resource_id); it->second.custom_resources.erase(itr); } + + if (node_id == local_node_id_) { + local_resources_.custom_resources[resource_id].total.clear(); + local_resources_.custom_resources[resource_id].available.clear(); + } } } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index f8ef7307e..1adb2aa70 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -215,6 +215,12 @@ class ClusterResourceScheduler { /// Get number of nodes in the cluster. int64_t NumNodes(); + /// Add a local resource that is available. + /// + /// \param resource_name: Resource which we want to update. + /// \param resource_total: New capacity of the resource. + void AddLocalResource(const std::string &resource_name, double resource_total); + /// Update total capacity of a given resource of a given node. /// /// \param node_name: Node whose resource we want to update. @@ -223,6 +229,11 @@ class ClusterResourceScheduler { void UpdateResourceCapacity(const std::string &node_name, const std::string &resource_name, double resource_total); + /// Delete a given resource from the local node. + /// + /// \param resource_name: Resource we want to delete + void DeleteLocalResource(const std::string &resource_name); + /// Delete a given resource from a given node. /// /// \param node_name: Node whose resource we want to delete. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index fbd24185e..f0edc6358 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -1085,6 +1085,33 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { } } +TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { + ClusterResourceScheduler cluster_resources("local", {{"CPU", 2}}); + + std::unordered_map task_request = {{"CPU", 1}, {"custom123", 2}}; + int64_t t; + + std::string result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + ASSERT_TRUE(result.empty()); + + cluster_resources.AddLocalResource("custom123", 5); + + result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + ASSERT_FALSE(result.empty()); + + task_request["custom123"] = 6; + result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + ASSERT_TRUE(result.empty()); + + cluster_resources.AddLocalResource("custom123", 5); + result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + ASSERT_FALSE(result.empty()); + + cluster_resources.DeleteLocalResource("custom123"); + result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + ASSERT_TRUE(result.empty()); +} + } // namespace ray int main(int argc, char **argv) {