[Core] New scheduler fixes (#9186)

* .

* test_args passes

* .

* test_basic.py::test_many_fractional_resources causes ray to hang

* test_basic.py::test_many_fractional_resources causes ray to hang

* .

* .

* useful

* test_many_fractional_resources fails instead of hanging now :)

* Passes test_fractional_resources

* .

* .

* Some cleanup

* git is hard

* cleanup

* Fixed scheduling tests

* .

* .
This commit is contained in:
Alex Wu
2020-07-09 15:37:51 -07:00
committed by GitHub
parent f0a72ad985
commit 34b85659d4
9 changed files with 327 additions and 161 deletions
+17
View File
@@ -7,6 +7,7 @@ import subprocess
import sys
import time
import socket
import math
import ray
import ray.services
@@ -277,6 +278,22 @@ class Semaphore:
return self._sema.locked()
def dicts_equal(dict1, dict2, abs_tol=1e-4):
"""Compares to dicts whose values may be floating point numbers."""
if dict1.keys() != dict2.keys():
return False
for k, v in dict1.items():
if isinstance(v, float) and \
isinstance(dict2[k], float) and \
math.isclose(v, dict2[k], abs_tol=abs_tol):
continue
if v != dict2[k]:
return False
return True
@ray.remote
def _put(obj):
return obj
+1 -1
View File
@@ -127,7 +127,7 @@ def test_many_fractional_resources(shutdown_only):
}
if block:
ray.get(g.remote())
return true_resources == accepted_resources
return ray.test_utils.dicts_equal(true_resources, accepted_resources)
# Check that the resource are assigned correctly.
result_ids = []
@@ -222,18 +222,33 @@ bool NodeResources::operator==(const NodeResources &other) {
std::string NodeResources::DebugString(StringIdMap string_to_in_map) const {
std::stringstream buffer;
buffer << " {";
for (size_t i = 0; i < static_cast<size_t>(this->predefined_resources.size()); i++) {
buffer << " {\n";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "\t";
switch (i) {
case CPU:
buffer << "CPU: ";
break;
case MEM:
buffer << "MEM: ";
break;
case GPU:
buffer << "GPU: ";
break;
case TPU:
buffer << "TPU: ";
break;
default:
RAY_CHECK(false) << "This should never happen.";
break;
}
buffer << "(" << this->predefined_resources[i].total << ":"
<< this->predefined_resources[i].available << ") ";
<< this->predefined_resources[i].available << ")\n";
}
buffer << "}";
buffer << " {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":"
<< it->second.available << ") ";
buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":"
<< it->second.available << ")\n";
}
buffer << "}" << std::endl;
return buffer.str();
@@ -273,18 +288,34 @@ bool NodeResourceInstances::operator==(const NodeResourceInstances &other) {
std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const {
std::stringstream buffer;
buffer << " {";
buffer << "{\n";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "\t";
switch (i) {
case CPU:
buffer << "CPU: ";
break;
case MEM:
buffer << "MEM: ";
break;
case GPU:
buffer << "GPU: ";
break;
case TPU:
buffer << "TPU: ";
break;
default:
RAY_CHECK(false) << "This should never happen.";
break;
}
buffer << "(" << VectorToString(predefined_resources[i].total) << ":"
<< VectorToString(this->predefined_resources[i].available) << ") ";
<< VectorToString(this->predefined_resources[i].available) << ")\n";
}
buffer << "}";
buffer << " {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << string_to_int_map.Get(it->first) << ":(" << VectorToString(it->second.total)
<< ":" << VectorToString(it->second.available) << ") ";
buffer << "\t" << string_to_int_map.Get(it->first) << ":("
<< VectorToString(it->second.total) << ":"
<< VectorToString(it->second.available) << ")\n";
}
buffer << "}" << std::endl;
return buffer.str();
@@ -763,10 +794,10 @@ void ClusterResourceScheduler::DeleteResource(const std::string &client_id_strin
std::string ClusterResourceScheduler::DebugString(void) const {
std::stringstream buffer;
buffer << "\n Local id: " << local_node_id_;
buffer << "\nLocal id: " << local_node_id_;
buffer << " Local resources: " << local_resources_.DebugString(string_to_int_map_);
for (auto &node : nodes_) {
buffer << " node id: " << node.first;
buffer << "node id: " << node.first;
buffer << node.second.DebugString(string_to_int_map_);
}
return buffer.str();
@@ -907,7 +938,7 @@ bool ClusterResourceScheduler::AllocateResourceInstances(
}
if (remaining_demand >= 1.) {
// Cannot satisfy a demand greater than one if no unit caapcity resource is available.
// Cannot satisfy a demand greater than one if no unit capacity resource is available.
return false;
}
@@ -1028,7 +1059,7 @@ std::vector<double> ClusterResourceScheduler::AddCPUResourceInstances(
VectorDoubleToVectorFixedPoint(cpu_instances);
if (cpu_instances.size() == 0) {
return cpu_instances; // No oveerflow.
return cpu_instances; // No overflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
@@ -1056,6 +1087,40 @@ std::vector<double> ClusterResourceScheduler::SubtractCPUResourceInstances(
return VectorFixedPointToVectorDouble(underflow);
}
std::vector<double> ClusterResourceScheduler::AddGPUResourceInstances(
std::vector<double> &gpu_instances) {
std::vector<FixedPoint> gpu_instances_fp =
VectorDoubleToVectorFixedPoint(gpu_instances);
if (gpu_instances.size() == 0) {
return gpu_instances; // No overflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto overflow = AddAvailableResourceInstances(
gpu_instances_fp, &local_resources_.predefined_resources[GPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return VectorFixedPointToVectorDouble(overflow);
}
std::vector<double> ClusterResourceScheduler::SubtractGPUResourceInstances(
std::vector<double> &gpu_instances) {
std::vector<FixedPoint> gpu_instances_fp =
VectorDoubleToVectorFixedPoint(gpu_instances);
if (gpu_instances.size() == 0) {
return gpu_instances; // No underflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto underflow = SubtractAvailableResourceInstances(
gpu_instances_fp, &local_resources_.predefined_resources[GPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return VectorFixedPointToVectorDouble(underflow);
}
bool ClusterResourceScheduler::AllocateTaskResources(
int64_t node_id, const TaskRequest &task_req,
std::shared_ptr<TaskResourceInstances> task_allocation) {
@@ -14,87 +14,21 @@
#pragma once
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/util/logging.h"
#include <iostream>
#include <sstream>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/scheduling/fixed_point.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/util/logging.h"
/// List of predefined resources.
enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX };
// Specify resources that consists of unit-size instances.
static std::unordered_set<int64_t> UnitInstanceResources{CPU, GPU, TPU};
/// Fixed point data type.
class FixedPoint {
#define RESOURCE_UNIT_SCALING 1000
private:
int64_t i_;
public:
FixedPoint(double d = 0) { i_ = (int64_t)(d * RESOURCE_UNIT_SCALING); }
FixedPoint operator+(FixedPoint const &ru) {
FixedPoint res;
res.i_ = i_ + ru.i_;
return res;
}
FixedPoint operator+=(FixedPoint const &ru) {
i_ += ru.i_;
return *this;
}
FixedPoint operator-(FixedPoint const &ru) {
FixedPoint res;
res.i_ = i_ - ru.i_;
return res;
}
FixedPoint operator-=(FixedPoint const &ru) {
i_ -= ru.i_;
return *this;
}
FixedPoint operator-() const {
FixedPoint res;
res.i_ = -i_;
return res;
}
FixedPoint operator+(double const d) {
FixedPoint res;
res.i_ = i_ + (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint operator-(double const d) {
FixedPoint res;
res.i_ = i_ - (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint operator=(double const d) {
i_ = (int64_t)(d * RESOURCE_UNIT_SCALING);
;
return *this;
}
friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2);
double Double() { return (double)i_ / RESOURCE_UNIT_SCALING; };
friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru);
};
static std::unordered_set<int64_t> UnitInstanceResources{GPU, TPU};
/// Helper function to compare two vectors with FixedPoint values.
bool EqualVectors(const std::vector<FixedPoint> &v1, const std::vector<FixedPoint> &v2);
@@ -175,6 +109,36 @@ class TaskResourceInstances {
return {};
}
};
/// Get GPU instances only.
std::vector<FixedPoint> GetGPUInstances() const {
if (!this->predefined_resources.empty()) {
return this->predefined_resources[GPU];
} else {
return {};
}
};
std::vector<double> GetGPUInstancesDouble() const {
if (!this->predefined_resources.empty()) {
return VectorFixedPointToVectorDouble(this->predefined_resources[GPU]);
} else {
return {};
}
};
/// Get mem instances only.
std::vector<FixedPoint> GetMemInstances() const {
if (!this->predefined_resources.empty()) {
return this->predefined_resources[MEM];
} else {
return {};
}
};
std::vector<double> GetMemInstancesDouble() const {
if (!this->predefined_resources.empty()) {
return VectorFixedPointToVectorDouble(this->predefined_resources[MEM]);
} else {
return {};
}
};
/// Check whether there are no resource instances.
bool IsEmpty() const;
/// Returns human-readable string for these resources.
@@ -496,6 +460,22 @@ class ClusterResourceScheduler {
/// capacities in cpu_instances.
std::vector<double> SubtractCPUResourceInstances(std::vector<double> &cpu_instances);
/// Increase the available GPU instances of this node.
///
/// \param gpu_instances GPU instances to be added to available gpus.
///
/// \return Overflow capacities of GPU instances after adding GPU
/// capacities in gpu_instances.
std::vector<double> AddGPUResourceInstances(std::vector<double> &gpu_instances);
/// Decrease the available GPU instances of this node.
///
/// \param gpu_instances GPU instances to be removed from available gpus.
///
/// \return Underflow capacities of GPU instances after subtracting GPU
/// capacities in gpu_instances.
std::vector<double> SubtractGPUResourceInstances(std::vector<double> &gpu_instances);
/// Subtract the resources required by a given task request (task_req) from the
/// local node. This function also updates the local node resources
/// at the instance granularity.
+56
View File
@@ -0,0 +1,56 @@
#include "fixed_point.h"
#include <cmath>
FixedPoint::FixedPoint(double d) {
// We need to round, not truncate because floating point multiplication can
// leave a number slightly smaller than the intended whole number.
i_ = (uint64_t)((d * RESOURCE_UNIT_SCALING) + 0.5);
}
FixedPoint FixedPoint::operator+(FixedPoint const &ru) {
FixedPoint res;
res.i_ = i_ + ru.i_;
return res;
}
FixedPoint FixedPoint::operator+=(FixedPoint const &ru) {
i_ += ru.i_;
return *this;
}
FixedPoint FixedPoint::operator-(FixedPoint const &ru) {
FixedPoint res;
res.i_ = i_ - ru.i_;
return res;
}
FixedPoint FixedPoint::operator-=(FixedPoint const &ru) {
i_ -= ru.i_;
return *this;
}
FixedPoint FixedPoint::operator-() const {
FixedPoint res;
res.i_ = -i_;
return res;
}
FixedPoint FixedPoint::operator+(double const d) {
FixedPoint res;
res.i_ = i_ + (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint FixedPoint::operator-(double const d) {
FixedPoint res;
res.i_ = i_ - (int64_t)(d * RESOURCE_UNIT_SCALING);
return res;
}
FixedPoint FixedPoint::operator=(double const d) {
i_ = (int64_t)(d * RESOURCE_UNIT_SCALING);
return *this;
}
double FixedPoint::Double() { return round(i_) / RESOURCE_UNIT_SCALING; };
+42
View File
@@ -0,0 +1,42 @@
#pragma once
#include <cstdint>
#include <iostream>
#define RESOURCE_UNIT_SCALING 10000
/// Fixed point data type.
class FixedPoint {
private:
int64_t i_;
public:
FixedPoint(double d = 0);
FixedPoint operator+(FixedPoint const &ru);
FixedPoint operator+=(FixedPoint const &ru);
FixedPoint operator-(FixedPoint const &ru);
FixedPoint operator-=(FixedPoint const &ru);
FixedPoint operator-() const;
FixedPoint operator+(double const d);
FixedPoint operator-(double const d);
FixedPoint operator=(double const d);
friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2);
friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2);
double Double();
friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru);
};
+56 -56
View File
@@ -12,16 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <string>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/common/scheduling/cluster_resource_scheduler.h"
#include "ray/common/scheduling/scheduling_ids.h"
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
#include <chrono>
#include "absl/container/flat_hash_map.h"
#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION
@@ -575,7 +575,7 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) {
cluster_resources.GetLocalResources().GetAvailableResourceInstances();
TaskResourceInstances expected_cluster_resources;
addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources);
addTaskResourceInstances(true, {3.}, 0, &expected_cluster_resources);
addTaskResourceInstances(true, {4.}, 1, &expected_cluster_resources);
addTaskResourceInstances(true, {1., 1., 1., 1., 1.}, 2, &expected_cluster_resources);
@@ -704,7 +704,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
ASSERT_EQ(success, true);
TaskResourceInstances expected_task_allocation;
addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation);
addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation);
addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation);
addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation);
@@ -797,7 +797,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
ASSERT_EQ(success, true);
TaskResourceInstances expected_task_allocation;
addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation);
addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation);
addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation);
addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation);
addTaskResourceInstances(false, {1.}, 1, &expected_task_allocation);
@@ -842,102 +842,102 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
}
}
TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) {
{
NodeResources node_resources;
vector<FixedPoint> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<FixedPoint> pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */};
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
std::vector<double> allocate_cpu_instances{0.5, 0.5, 0.5, 0.5};
cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
std::vector<double> allocate_gpu_instances{0.5, 0.5, 0.5, 0.5};
cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances);
std::vector<double> available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{0.5, 0.5, 0.5, 0.5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{0.5, 0.5, 0.5, 0.5};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
cluster_resources.AddCPUResourceInstances(allocate_cpu_instances);
available_cpu_instances = cluster_resources.GetLocalResources()
cluster_resources.AddGPUResourceInstances(allocate_gpu_instances);
available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
expected_available_cpu_instances = {1., 1., 1., 1.};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
expected_available_gpu_instances = {1., 1., 1., 1.};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
allocate_cpu_instances = {1.5, 1.5, .5, 1.5};
allocate_gpu_instances = {1.5, 1.5, .5, 1.5};
std::vector<double> underflow =
cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances);
cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances);
std::vector<double> expected_underflow{.5, .5, 0., .5};
ASSERT_TRUE(
std::equal(underflow.begin(), underflow.end(), expected_underflow.begin()));
available_cpu_instances = cluster_resources.GetLocalResources()
available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
expected_available_cpu_instances = {0., 0., 0.5, 0.};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
expected_available_gpu_instances = {0., 0., 0.5, 0.};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
allocate_cpu_instances = {1.0, .5, 1., .5};
allocate_gpu_instances = {1.0, .5, 1., .5};
std::vector<double> overflow =
cluster_resources.AddCPUResourceInstances(allocate_cpu_instances);
cluster_resources.AddGPUResourceInstances(allocate_gpu_instances);
std::vector<double> expected_overflow{.0, .0, .5, 0.};
ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin()));
available_cpu_instances = cluster_resources.GetLocalResources()
available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
expected_available_cpu_instances = {1., .5, 1., .5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
expected_available_gpu_instances = {1., .5, 1., .5};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
}
}
TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
{
NodeResources node_resources;
vector<FixedPoint> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<FixedPoint> pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */};
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
{
std::vector<double> allocate_cpu_instances{0.5, 0.5, 2, 0.5};
// SubtractCPUResourceInstances() calls
std::vector<double> allocate_gpu_instances{0.5, 0.5, 2, 0.5};
// SubtractGPUResourceInstances() calls
// UpdateLocalAvailableResourcesFromResourceInstances() under the hood.
cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances);
std::vector<double> available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{0.5, 0.5, 0., 0.5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(),
available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{0.5, 0.5, 0., 0.5};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(),
available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
NodeResources nr;
cluster_resources.GetNodeResources(0, &nr);
ASSERT_TRUE(nr.predefined_resources[0].available == 1.5);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5);
}
{
std::vector<double> allocate_cpu_instances{1.5, 0.5, 2, 0.3};
// SubtractCPUResourceInstances() calls
std::vector<double> allocate_gpu_instances{1.5, 0.5, 2, 0.3};
// SubtractGPUResourceInstances() calls
// UpdateLocalAvailableResourcesFromResourceInstances() under the hood.
cluster_resources.AddCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
cluster_resources.AddGPUResourceInstances(allocate_gpu_instances);
std::vector<double> available_gpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{1., 1., 1., 0.8};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(),
available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{1., 1., 1., 0.8};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(),
available_gpu_instances.end(),
expected_available_gpu_instances.begin()));
NodeResources nr;
cluster_resources.GetNodeResources(0, &nr);
ASSERT_TRUE(nr.predefined_resources[0].available == 3.8);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8);
}
}
}
+13 -2
View File
@@ -272,8 +272,19 @@ bool TaskSpecification::IsDetachedActor() const {
std::string TaskSpecification::DebugString() const {
std::ostringstream stream;
stream << "Type=" << TaskType_Name(message_->type())
<< ", Language=" << Language_Name(message_->language())
<< ", function_descriptor=";
<< ", Language=" << Language_Name(message_->language());
if (required_resources_ != nullptr) {
stream << ", Resources: {";
// Print resource description.
for (auto entry : GetRequiredResources().GetResourceMap()) {
stream << entry.first << ": " << entry.second << ", ";
}
stream << "}";
}
stream << ", function_descriptor=";
// Print function descriptor.
stream << FunctionDescriptor()->ToString();
+4 -9
View File
@@ -1699,6 +1699,7 @@ void NodeManager::DispatchScheduledTasksToWorkers() {
// Try next task in the dispatch queue.
continue;
}
worker->SetOwnerAddress(spec.CallerAddress());
if (spec.IsActorCreationTask()) {
// The actor belongs to this worker now.
@@ -1723,13 +1724,9 @@ void NodeManager::NewSchedulerSchedulePendingTasks() {
// blocking where a task which cannot be scheduled because
// there are not enough available resources blocks other
// tasks from being scheduled.
while (queue_size > 0) {
if (queue_size == 0) {
return;
} else {
queue_size--;
}
while (queue_size-- > 0) {
auto work = tasks_to_schedule_.front();
tasks_to_schedule_.pop_front();
auto task = work.second;
auto request_resources =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
@@ -1738,13 +1735,13 @@ void NodeManager::NewSchedulerSchedulePendingTasks() {
new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations);
if (node_id_string.empty()) {
/// There is no node that has available resources to run the request.
tasks_to_schedule_.pop_front();
tasks_to_schedule_.push_back(work);
continue;
} else {
if (node_id_string == self_node_id_.Binary()) {
WaitForTaskArgsRequests(work);
} else {
// Should spill over to a different node.
new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string,
request_resources);
@@ -1756,7 +1753,6 @@ void NodeManager::NewSchedulerSchedulePendingTasks() {
work.first(nullptr, node_id, node_info_opt->node_manager_address(),
node_info_opt->node_manager_port());
}
tasks_to_schedule_.pop_front();
}
}
DispatchScheduledTasksToWorkers();
@@ -1875,7 +1871,6 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
// Override the task dispatch to call back to the client instead of executing the
// task directly on the worker.
RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId();
TaskID task_id = task.GetTaskSpecification().TaskId();
rpc::Address owner_address = task.GetTaskSpecification().CallerAddress();
task.OnDispatchInstead(