diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 42b976c7e..06eaf5ad3 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -671,6 +671,15 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
     return;
   }
 
+  // Remove the node from the new resource scheduler.
+  if (new_scheduler_enabled_) {
+    if (!new_resource_scheduler_->RemoveNode(node_id.Binary())) {
+      RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
+                     << ".";
+      return;
+    }
+  }
+
   // Remove the node manager client.
   const auto client_entry = remote_node_manager_clients_.find(node_id);
   if (client_entry != remote_node_manager_clients_.end()) {
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 87d43ca1b..92405e26b 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -494,6 +494,16 @@ bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
   }
 }
 
+bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
+  // Translate the string ID to the scheduler's integer ID; -1 is handled as "unknown".
+  const auto node_id = string_to_int_map_.Get(node_id_string);
+  if (node_id == -1) {
+    return false;
+  }
+
+  return RemoveNode(node_id);
+}
+
 int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
                                                 int64_t node_id,
                                                 const NodeResources &resources) {
@@ -929,7 +939,7 @@ bool ClusterResourceScheduler::AllocateResourceInstances(
         (*allocation)[i] = remaining_demand;
         return true;
       } else {
-        (*allocation)[i] = available[i];
+        (*allocation)[i] += available[i];
         remaining_demand -= available[i];
         available[i] = 0;
       }
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
index 7d0bcfc33..e6fe3a75e 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -250,6 +250,13 @@ class ClusterResourceScheduler {
   ///
   /// \param ID of the node to be removed.
   bool RemoveNode(int64_t node_id);
+
+  /// Remove a node identified by its string ID.
+  ///
+  /// \param node_id_string String ID of the node to be removed.
+  /// \return false if the string ID resolves to -1 (no known node); otherwise the
+  /// result of RemoveNode(int64_t).
+  bool RemoveNode(const std::string &node_id_string);
 
   /// Check whether a task request can be scheduled given a node.
   ///
diff --git a/src/ray/raylet/scheduling/scheduling_test.cc b/src/ray/raylet/scheduling/scheduling_test.cc
index c8c964403..d7d9d9802 100644
--- a/src/ray/raylet/scheduling/scheduling_test.cc
+++ b/src/ray/raylet/scheduling/scheduling_test.cc
@@ -942,6 +942,34 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
   }
 }
 
+TEST_F(SchedulingTest, TaskResourceInstanceWithHardRequestTest) {
+  NodeResources node_resources;
+  vector<FixedPoint> pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */};
+  initNodeResources(node_resources, pred_capacities, EmptyIntVector,
+                    EmptyFixedPointVector);
+  ClusterResourceScheduler cluster_resources(0, node_resources);
+
+  TaskRequest task_req;
+  vector<FixedPoint> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
+  vector<bool> pred_soft = {false, false, true};
+  initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
+                  EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
+
+  std::shared_ptr<TaskResourceInstances> task_allocation =
+      std::make_shared<TaskResourceInstances>();
+  bool success =
+      cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
+
+  ASSERT_TRUE(success);
+
+  // Expect the soft 1.5 GPU demand to land as a full first instance plus half of
+  // the second one.
+  vector<FixedPoint> gpu_instances = task_allocation->GetGPUInstances();
+  vector<FixedPoint> expected_gpu_instances{1., 0.5, 0., 0.};
+
+  ASSERT_TRUE(EqualVectors(gpu_instances, expected_gpu_instances));
+}
+
 }  // namespace ray
 
 int main(int argc, char **argv) {