[New scheduler] Cluster Resource Scheduler dynamic resources (for placement groups) (#12518)

* prepare implemented

* dynamic resources

* .

* commit

* .

* .

* Still needs to be cleaned up

* Passes basic tests + cleanup

* .

* .

* .

* Apply suggestions from code review

Co-authored-by: SangBin Cho <rkooo567@gmail.com>

* fix

* lint

Co-authored-by: Alex <alex@anyscale.com>
Co-authored-by: SangBin Cho <rkooo567@gmail.com>
This commit is contained in:
Alex Wu
2020-12-09 12:05:31 -08:00
committed by GitHub
parent ef9ebbc636
commit 0b6e44efb8
4 changed files with 83 additions and 2 deletions
@@ -83,7 +83,8 @@ TaskRequest ResourceMapToTaskRequest(
} else if (resource.first == ray::kMemory_ResourceLabel) {
task_request.predefined_resources[MEM].demand = resource.second;
} else {
task_request.custom_resources[i].id = string_to_int_map.Insert(resource.first);
string_to_int_map.Insert(resource.first);
task_request.custom_resources[i].id = string_to_int_map.Get(resource.first);
task_request.custom_resources[i].demand = resource.second;
task_request.custom_resources[i].soft = false;
i++;
@@ -366,6 +366,33 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }
void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name,
double resource_total) {
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
if (local_resources_.custom_resources.contains(resource_id)) {
FixedPoint total(resource_total);
auto &instances = local_resources_.custom_resources[resource_id];
instances.total[0] += total;
instances.available[0] += total;
auto &capacity = nodes_[local_node_id_].custom_resources[resource_id];
capacity.available += total;
capacity.total += total;
} else {
ResourceInstanceCapacities capacity;
capacity.total.resize(1);
capacity.total[0] = resource_total;
capacity.available.resize(1);
capacity.available[0] = resource_total;
local_resources_.custom_resources.emplace(resource_id, capacity);
std::string node_id_string = string_to_int_map_.Get(local_node_id_);
RAY_CHECK(string_to_int_map_.Get(node_id_string) == local_node_id_);
UpdateResourceCapacity(node_id_string, resource_name, resource_total);
UpdateLocalAvailableResourcesFromResourceInstances();
}
}
void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) {
@@ -404,7 +431,8 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id
it->second.predefined_resources[idx].total = 0;
}
} else {
int64_t resource_id = string_to_int_map_.Insert(resource_name);
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = it->second.custom_resources.find(resource_id);
if (itr != it->second.custom_resources.end()) {
auto diff_capacity = resource_total_fp - itr->second.total;
@@ -424,6 +452,10 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id
}
}
void ClusterResourceScheduler::DeleteLocalResource(const std::string &resource_name) {
DeleteResource(string_to_int_map_.Get(local_node_id_), resource_name);
}
void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string,
const std::string &resource_name) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
@@ -444,6 +476,11 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string,
};
if (idx != -1) {
it->second.predefined_resources[idx].total = 0;
if (node_id == local_node_id_) {
local_resources_.predefined_resources[idx].total.clear();
local_resources_.predefined_resources[idx].available.clear();
}
} else {
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = it->second.custom_resources.find(resource_id);
@@ -451,6 +488,11 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string,
string_to_int_map_.Remove(resource_id);
it->second.custom_resources.erase(itr);
}
if (node_id == local_node_id_) {
local_resources_.custom_resources[resource_id].total.clear();
local_resources_.custom_resources[resource_id].available.clear();
}
}
}
@@ -215,6 +215,12 @@ class ClusterResourceScheduler {
/// Get number of nodes in the cluster.
int64_t NumNodes();
/// Add a local resource that is available.
///
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void AddLocalResource(const std::string &resource_name, double resource_total);
/// Update total capacity of a given resource of a given node.
///
/// \param node_name: Node whose resource we want to update.
@@ -223,6 +229,11 @@ class ClusterResourceScheduler {
void UpdateResourceCapacity(const std::string &node_name,
const std::string &resource_name, double resource_total);
/// Delete a given resource from the local node.
///
/// \param resource_name: Resource we want to delete
void DeleteLocalResource(const std::string &resource_name);
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
@@ -1085,6 +1085,33 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) {
}
}
TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
ClusterResourceScheduler cluster_resources("local", {{"CPU", 2}});
std::unordered_map<std::string, double> task_request = {{"CPU", 1}, {"custom123", 2}};
int64_t t;
std::string result = cluster_resources.GetBestSchedulableNode(task_request, false, &t);
ASSERT_TRUE(result.empty());
cluster_resources.AddLocalResource("custom123", 5);
result = cluster_resources.GetBestSchedulableNode(task_request, false, &t);
ASSERT_FALSE(result.empty());
task_request["custom123"] = 6;
result = cluster_resources.GetBestSchedulableNode(task_request, false, &t);
ASSERT_TRUE(result.empty());
cluster_resources.AddLocalResource("custom123", 5);
result = cluster_resources.GetBestSchedulableNode(task_request, false, &t);
ASSERT_FALSE(result.empty());
cluster_resources.DeleteLocalResource("custom123");
result = cluster_resources.GetBestSchedulableNode(task_request, false, &t);
ASSERT_TRUE(result.empty());
}
} // namespace ray
int main(int argc, char **argv) {