[New scheduler] Fix new scheduler bug (#9467)

* fix new scheduler bug

* add testcase for soft resource allocation

* modify RemoveNode
This commit is contained in:
kisuke95
2020-07-21 04:09:53 +08:00
committed by GitHub
parent f3ef9060e4
commit 4e2e3bd348
4 changed files with 46 additions and 1 deletion
+9
View File
@@ -671,6 +671,15 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
return;
}
// Remove the client from the resource map.
if (new_scheduler_enabled_) {
if (!new_resource_scheduler_->RemoveNode(node_id.Binary())) {
RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
<< ".";
return;
}
}
// Remove the node manager client.
const auto client_entry = remote_node_manager_clients_.find(node_id);
if (client_entry != remote_node_manager_clients_.end()) {
@@ -494,6 +494,15 @@ bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
}
}
/// Remove a node identified by its string ID.
///
/// The string ID is translated through string_to_int_map_ into the integer
/// ID used by the RemoveNode(int64_t) overload. A lookup result of -1 is
/// treated as "nothing to remove".
///
/// \param node_id_string String ID of the node to be removed.
/// \return false if the lookup yields -1; otherwise whatever
/// RemoveNode(int64_t) returns for the mapped ID.
bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
  const auto mapped_id = string_to_int_map_.Get(node_id_string);
  // Short-circuit: the integer overload is only invoked for a mapped ID.
  return mapped_id != -1 && RemoveNode(mapped_id);
}
int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
int64_t node_id,
const NodeResources &resources) {
@@ -929,7 +938,7 @@ bool ClusterResourceScheduler::AllocateResourceInstances(
(*allocation)[i] = remaining_demand;
return true;
} else {
(*allocation)[i] = available[i];
(*allocation)[i] += available[i];
remaining_demand -= available[i];
available[i] = 0;
}
@@ -250,6 +250,7 @@ class ClusterResourceScheduler {
///
/// \param ID of the node to be removed.
bool RemoveNode(int64_t node_id);
/// Remove a node given its string ID.
///
/// The string ID is looked up in string_to_int_map_ and the resulting
/// integer ID is forwarded to RemoveNode(int64_t).
///
/// \param node_id_string String ID of the node to be removed.
/// \return false if the lookup returns -1 (presumably: the ID was never
/// registered with this scheduler -- confirm against the map's contract);
/// otherwise the result of RemoveNode(int64_t).
bool RemoveNode(const std::string &node_id_string);
/// Check whether a task request can be scheduled given a node.
///
@@ -942,6 +942,32 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
}
}
// Allocates a task whose CPU and MEM demands are hard constraints and whose
// GPU demand (1.5) is marked soft, on a node with 4 GPU instances, then
// checks how the GPU demand was spread over the individual instances.
TEST_F(SchedulingTest, TaskResourceInstanceWithHardRequestTest) {
  // Node with predefined capacities only; no custom resources.
  NodeResources node_resources;
  vector<FixedPoint> node_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */};
  initNodeResources(node_resources, node_capacities, EmptyIntVector,
                    EmptyFixedPointVector);
  ClusterResourceScheduler cluster_resources(0, node_resources);

  // Request: the third flag marks the GPU demand as a soft constraint.
  TaskRequest task_req;
  vector<FixedPoint> task_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
  vector<bool> soft_flags = {false, false, true};
  initTaskRequest(task_req, task_demands, soft_flags, EmptyIntVector,
                  EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);

  auto task_allocation = std::make_shared<TaskResourceInstances>();
  const bool allocated =
      cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
  ASSERT_TRUE(allocated);

  // The 1.5 GPU demand is expected as one full instance plus half of the
  // next one; the remaining two instances stay untouched.
  vector<FixedPoint> gpu_instances = task_allocation->GetGPUInstances();
  vector<FixedPoint> expected_gpu_instances{1., 0.5, 0., 0.};
  ASSERT_TRUE(EqualVectors(gpu_instances, expected_gpu_instances));
}
} // namespace ray
int main(int argc, char **argv) {