diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index fd59fad7d..e372d7a42 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -7,6 +7,7 @@ import subprocess import sys import time import socket +import math import ray import ray.services @@ -277,6 +278,22 @@ class Semaphore: return self._sema.locked() +def dicts_equal(dict1, dict2, abs_tol=1e-4): + """Compares to dicts whose values may be floating point numbers.""" + + if dict1.keys() != dict2.keys(): + return False + + for k, v in dict1.items(): + if isinstance(v, float) and \ + isinstance(dict2[k], float) and \ + math.isclose(v, dict2[k], abs_tol=abs_tol): + continue + if v != dict2[k]: + return False + return True + + @ray.remote def _put(obj): return obj diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index ded9b6123..fe6178839 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -127,7 +127,7 @@ def test_many_fractional_resources(shutdown_only): } if block: ray.get(g.remote()) - return true_resources == accepted_resources + return ray.test_utils.dicts_equal(true_resources, accepted_resources) # Check that the resource are assigned correctly. result_ids = [] diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/common/scheduling/cluster_resource_scheduler.cc index b97a09081..58e13c1d2 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/common/scheduling/cluster_resource_scheduler.cc @@ -222,18 +222,33 @@ bool NodeResources::operator==(const NodeResources &other) { std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { std::stringstream buffer; - buffer << " {"; - for (size_t i = 0; i < static_cast(this->predefined_resources.size()); i++) { + buffer << " {\n"; + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << "\t"; + switch (i) { + case CPU: + buffer << "CPU: "; + break; + case MEM: + buffer << "MEM: "; + break; + case GPU: + buffer << "GPU: "; + break; + case TPU: + buffer << "TPU: "; + break; + default: + RAY_CHECK(false) << "This should never happen."; + break; + } buffer << "(" << this->predefined_resources[i].total << ":" - << this->predefined_resources[i].available << ") "; + << this->predefined_resources[i].available << ")\n"; } - buffer << "}"; - - buffer << " {"; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" - << it->second.available << ") "; + buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" + << it->second.available << ")\n"; } buffer << "}" << std::endl; return buffer.str(); @@ -273,18 +288,34 @@ bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const { std::stringstream buffer; - buffer << " {"; + buffer << "{\n"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << "\t"; + switch (i) { + case CPU: + buffer << "CPU: "; + break; + case MEM: + buffer << "MEM: "; + break; + case GPU: + buffer << "GPU: "; + break; + case TPU: + buffer << "TPU: "; + break; + default: + RAY_CHECK(false) << "This should never happen."; + break; + } buffer << "(" << VectorToString(predefined_resources[i].total) << ":" - << VectorToString(this->predefined_resources[i].available) << ") "; + << VectorToString(this->predefined_resources[i].available) << ")\n"; } - buffer << "}"; - - buffer << " {"; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << string_to_int_map.Get(it->first) << ":(" << VectorToString(it->second.total) - << ":" << VectorToString(it->second.available) << ") "; + buffer << "\t" << string_to_int_map.Get(it->first) << ":(" + << VectorToString(it->second.total) << ":" + << VectorToString(it->second.available) << ")\n"; } buffer << "}" << std::endl; return buffer.str(); @@ -763,10 +794,10 @@ void ClusterResourceScheduler::DeleteResource(const std::string &client_id_strin std::string ClusterResourceScheduler::DebugString(void) const { std::stringstream buffer; - buffer << "\n Local id: " << local_node_id_; + buffer << "\nLocal id: " << local_node_id_; buffer << " Local resources: " << local_resources_.DebugString(string_to_int_map_); for (auto &node : nodes_) { - buffer << " node id: " << node.first; + buffer << "node id: " << node.first; buffer << node.second.DebugString(string_to_int_map_); } return buffer.str(); @@ -907,7 +938,7 @@ bool ClusterResourceScheduler::AllocateResourceInstances( } if (remaining_demand >= 1.) { - // Cannot satisfy a demand greater than one if no unit caapcity resource is available. + // Cannot satisfy a demand greater than one if no unit capacity resource is available. return false; } @@ -1028,7 +1059,7 @@ std::vector ClusterResourceScheduler::AddCPUResourceInstances( VectorDoubleToVectorFixedPoint(cpu_instances); if (cpu_instances.size() == 0) { - return cpu_instances; // No oveerflow. + return cpu_instances; // No overflow. } RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); @@ -1056,6 +1087,40 @@ std::vector ClusterResourceScheduler::SubtractCPUResourceInstances( return VectorFixedPointToVectorDouble(underflow); } +std::vector ClusterResourceScheduler::AddGPUResourceInstances( + std::vector &gpu_instances) { + std::vector gpu_instances_fp = + VectorDoubleToVectorFixedPoint(gpu_instances); + + if (gpu_instances.size() == 0) { + return gpu_instances; // No overflow. + } + RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); + + auto overflow = AddAvailableResourceInstances( + gpu_instances_fp, &local_resources_.predefined_resources[GPU]); + UpdateLocalAvailableResourcesFromResourceInstances(); + + return VectorFixedPointToVectorDouble(overflow); +} + +std::vector ClusterResourceScheduler::SubtractGPUResourceInstances( + std::vector &gpu_instances) { + std::vector gpu_instances_fp = + VectorDoubleToVectorFixedPoint(gpu_instances); + + if (gpu_instances.size() == 0) { + return gpu_instances; // No underflow. + } + RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); + + auto underflow = SubtractAvailableResourceInstances( + gpu_instances_fp, &local_resources_.predefined_resources[GPU]); + UpdateLocalAvailableResourcesFromResourceInstances(); + + return VectorFixedPointToVectorDouble(underflow); +} + bool ClusterResourceScheduler::AllocateTaskResources( int64_t node_id, const TaskRequest &task_req, std::shared_ptr task_allocation) { diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.h b/src/ray/common/scheduling/cluster_resource_scheduler.h index a3128374c..a1f04e648 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.h +++ b/src/ray/common/scheduling/cluster_resource_scheduler.h @@ -14,87 +14,21 @@ #pragma once -#include "absl/container/flat_hash_map.h" -#include "absl/container/flat_hash_set.h" -#include "ray/common/scheduling/scheduling_ids.h" -#include "ray/common/task/scheduling_resources.h" -#include "ray/util/logging.h" - #include #include #include +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "ray/common/scheduling/fixed_point.h" +#include "ray/common/scheduling/scheduling_ids.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/util/logging.h" + /// List of predefined resources. enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; // Specify resources that consists of unit-size instances. -static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; - -/// Fixed point data type. -class FixedPoint { -#define RESOURCE_UNIT_SCALING 1000 - private: - int64_t i_; - - public: - FixedPoint(double d = 0) { i_ = (int64_t)(d * RESOURCE_UNIT_SCALING); } - - FixedPoint operator+(FixedPoint const &ru) { - FixedPoint res; - res.i_ = i_ + ru.i_; - return res; - } - - FixedPoint operator+=(FixedPoint const &ru) { - i_ += ru.i_; - return *this; - } - - FixedPoint operator-(FixedPoint const &ru) { - FixedPoint res; - res.i_ = i_ - ru.i_; - return res; - } - - FixedPoint operator-=(FixedPoint const &ru) { - i_ -= ru.i_; - return *this; - } - - FixedPoint operator-() const { - FixedPoint res; - res.i_ = -i_; - return res; - } - - FixedPoint operator+(double const d) { - FixedPoint res; - res.i_ = i_ + (int64_t)(d * RESOURCE_UNIT_SCALING); - return res; - } - - FixedPoint operator-(double const d) { - FixedPoint res; - res.i_ = i_ - (int64_t)(d * RESOURCE_UNIT_SCALING); - return res; - } - - FixedPoint operator=(double const d) { - i_ = (int64_t)(d * RESOURCE_UNIT_SCALING); - ; - return *this; - } - - friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2); - - double Double() { return (double)i_ / RESOURCE_UNIT_SCALING; }; - - friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru); -}; +static std::unordered_set UnitInstanceResources{GPU, TPU}; /// Helper function to compare two vectors with FixedPoint values. bool EqualVectors(const std::vector &v1, const std::vector &v2); @@ -175,6 +109,36 @@ class TaskResourceInstances { return {}; } }; + /// Get GPU instances only. + std::vector GetGPUInstances() const { + if (!this->predefined_resources.empty()) { + return this->predefined_resources[GPU]; + } else { + return {}; + } + }; + std::vector GetGPUInstancesDouble() const { + if (!this->predefined_resources.empty()) { + return VectorFixedPointToVectorDouble(this->predefined_resources[GPU]); + } else { + return {}; + } + }; + /// Get mem instances only. + std::vector GetMemInstances() const { + if (!this->predefined_resources.empty()) { + return this->predefined_resources[MEM]; + } else { + return {}; + } + }; + std::vector GetMemInstancesDouble() const { + if (!this->predefined_resources.empty()) { + return VectorFixedPointToVectorDouble(this->predefined_resources[MEM]); + } else { + return {}; + } + }; /// Check whether there are no resource instances. bool IsEmpty() const; /// Returns human-readable string for these resources. @@ -496,6 +460,22 @@ class ClusterResourceScheduler { /// capacities in cpu_instances. std::vector SubtractCPUResourceInstances(std::vector &cpu_instances); + /// Increase the available GPU instances of this node. + /// + /// \param gpu_instances GPU instances to be added to available gpus. + /// + /// \return Overflow capacities of GPU instances after adding GPU + /// capacities in gpu_instances. + std::vector AddGPUResourceInstances(std::vector &gpu_instances); + + /// Decrease the available GPU instances of this node. + /// + /// \param gpu_instances GPU instances to be removed from available gpus. + /// + /// \return Underflow capacities of GPU instances after subtracting GPU + /// capacities in gpu_instances. + std::vector SubtractGPUResourceInstances(std::vector &gpu_instances); + /// Subtract the resources required by a given task request (task_req) from the /// local node. This function also updates the local node resources /// at the instance granularity. diff --git a/src/ray/common/scheduling/fixed_point.cc b/src/ray/common/scheduling/fixed_point.cc new file mode 100644 index 000000000..24420ada3 --- /dev/null +++ b/src/ray/common/scheduling/fixed_point.cc @@ -0,0 +1,56 @@ +#include "fixed_point.h" + +#include + +FixedPoint::FixedPoint(double d) { + // We need to round, not truncate because floating point multiplication can + // leave a number slightly smaller than the intended whole number. + i_ = (uint64_t)((d * RESOURCE_UNIT_SCALING) + 0.5); +} + +FixedPoint FixedPoint::operator+(FixedPoint const &ru) { + FixedPoint res; + res.i_ = i_ + ru.i_; + return res; +} + +FixedPoint FixedPoint::operator+=(FixedPoint const &ru) { + i_ += ru.i_; + return *this; +} + +FixedPoint FixedPoint::operator-(FixedPoint const &ru) { + FixedPoint res; + res.i_ = i_ - ru.i_; + return res; +} + +FixedPoint FixedPoint::operator-=(FixedPoint const &ru) { + i_ -= ru.i_; + return *this; +} + +FixedPoint FixedPoint::operator-() const { + FixedPoint res; + res.i_ = -i_; + return res; +} + +FixedPoint FixedPoint::operator+(double const d) { + FixedPoint res; + res.i_ = i_ + (int64_t)(d * RESOURCE_UNIT_SCALING); + return res; +} + +FixedPoint FixedPoint::operator-(double const d) { + FixedPoint res; + res.i_ = i_ - (int64_t)(d * RESOURCE_UNIT_SCALING); + return res; +} + +FixedPoint FixedPoint::operator=(double const d) { + i_ = (int64_t)(d * RESOURCE_UNIT_SCALING); + return *this; +} + +double FixedPoint::Double() { return round(i_) / RESOURCE_UNIT_SCALING; }; diff --git a/src/ray/common/scheduling/fixed_point.h b/src/ray/common/scheduling/fixed_point.h new file mode 100644 index 000000000..dbdad32ef --- /dev/null +++ b/src/ray/common/scheduling/fixed_point.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +#define RESOURCE_UNIT_SCALING 10000 + +/// Fixed point data type. +class FixedPoint { + private: + int64_t i_; + + public: + FixedPoint(double d = 0); + + FixedPoint operator+(FixedPoint const &ru); + + FixedPoint operator+=(FixedPoint const &ru); + + FixedPoint operator-(FixedPoint const &ru); + + FixedPoint operator-=(FixedPoint const &ru); + + FixedPoint operator-() const; + + FixedPoint operator+(double const d); + + FixedPoint operator-(double const d); + + FixedPoint operator=(double const d); + + friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2); + friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2); + friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2); + friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2); + friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2); + friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2); + + double Double(); + + friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru); +}; diff --git a/src/ray/common/scheduling/scheduling_test.cc b/src/ray/common/scheduling/scheduling_test.cc index 14a1f94d5..89e3152b0 100644 --- a/src/ray/common/scheduling/scheduling_test.cc +++ b/src/ray/common/scheduling/scheduling_test.cc @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "gmock/gmock.h" -#include "gtest/gtest.h" - #include +#include "gmock/gmock.h" +#include "gtest/gtest.h" #include "ray/common/scheduling/cluster_resource_scheduler.h" #include "ray/common/scheduling/scheduling_ids.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION #include + #include "absl/container/flat_hash_map.h" #endif // UNORDERED_VS_ABSL_MAPS_EVALUATION @@ -575,7 +575,7 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) { cluster_resources.GetLocalResources().GetAvailableResourceInstances(); TaskResourceInstances expected_cluster_resources; - addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources); + addTaskResourceInstances(true, {3.}, 0, &expected_cluster_resources); addTaskResourceInstances(true, {4.}, 1, &expected_cluster_resources); addTaskResourceInstances(true, {1., 1., 1., 1., 1.}, 2, &expected_cluster_resources); @@ -704,7 +704,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { ASSERT_EQ(success, true); TaskResourceInstances expected_task_allocation; - addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation); + addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation); addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation); addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation); @@ -797,7 +797,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { ASSERT_EQ(success, true); TaskResourceInstances expected_task_allocation; - addTaskResourceInstances(true, {0., 0., 0.}, CPU, &expected_task_allocation); + addTaskResourceInstances(true, {0.}, CPU, &expected_task_allocation); addTaskResourceInstances(true, {2.}, MEM, &expected_task_allocation); addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation); addTaskResourceInstances(false, {1.}, 1, &expected_task_allocation); @@ -842,102 +842,102 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) { } } -TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) { +TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) { { NodeResources node_resources; - vector pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */}; + vector pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */}; vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); ClusterResourceScheduler cluster_resources(0, node_resources); - std::vector allocate_cpu_instances{0.5, 0.5, 0.5, 0.5}; - cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() + std::vector allocate_gpu_instances{0.5, 0.5, 0.5, 0.5}; + cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - std::vector expected_available_cpu_instances{0.5, 0.5, 0.5, 0.5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + std::vector expected_available_gpu_instances{0.5, 0.5, 0.5, 0.5}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), + expected_available_gpu_instances.begin())); - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); - available_cpu_instances = cluster_resources.GetLocalResources() + cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); + available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - expected_available_cpu_instances = {1., 1., 1., 1.}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + expected_available_gpu_instances = {1., 1., 1., 1.}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), + expected_available_gpu_instances.begin())); - allocate_cpu_instances = {1.5, 1.5, .5, 1.5}; + allocate_gpu_instances = {1.5, 1.5, .5, 1.5}; std::vector underflow = - cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); + cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); std::vector expected_underflow{.5, .5, 0., .5}; ASSERT_TRUE( std::equal(underflow.begin(), underflow.end(), expected_underflow.begin())); - available_cpu_instances = cluster_resources.GetLocalResources() + available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - expected_available_cpu_instances = {0., 0., 0.5, 0.}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + expected_available_gpu_instances = {0., 0., 0.5, 0.}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), + expected_available_gpu_instances.begin())); - allocate_cpu_instances = {1.0, .5, 1., .5}; + allocate_gpu_instances = {1.0, .5, 1., .5}; std::vector overflow = - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); + cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); std::vector expected_overflow{.0, .0, .5, 0.}; ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin())); - available_cpu_instances = cluster_resources.GetLocalResources() + available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - expected_available_cpu_instances = {1., .5, 1., .5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + expected_available_gpu_instances = {1., .5, 1., .5}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), + expected_available_gpu_instances.begin())); } } TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) { { NodeResources node_resources; - vector pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */}; + vector pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */}; vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); ClusterResourceScheduler cluster_resources(0, node_resources); { - std::vector allocate_cpu_instances{0.5, 0.5, 2, 0.5}; - // SubtractCPUResourceInstances() calls + std::vector allocate_gpu_instances{0.5, 0.5, 2, 0.5}; + // SubtractGPUResourceInstances() calls // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. - cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() + cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - std::vector expected_available_cpu_instances{0.5, 0.5, 0., 0.5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), - available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + std::vector expected_available_gpu_instances{0.5, 0.5, 0., 0.5}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), + available_gpu_instances.end(), + expected_available_gpu_instances.begin())); NodeResources nr; cluster_resources.GetNodeResources(0, &nr); - ASSERT_TRUE(nr.predefined_resources[0].available == 1.5); + ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5); } { - std::vector allocate_cpu_instances{1.5, 0.5, 2, 0.3}; - // SubtractCPUResourceInstances() calls + std::vector allocate_gpu_instances{1.5, 0.5, 2, 0.3}; + // SubtractGPUResourceInstances() calls // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() + cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = cluster_resources.GetLocalResources() .GetAvailableResourceInstances() - .GetCPUInstancesDouble(); - std::vector expected_available_cpu_instances{1., 1., 1., 0.8}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), - available_cpu_instances.end(), - expected_available_cpu_instances.begin())); + .GetGPUInstancesDouble(); + std::vector expected_available_gpu_instances{1., 1., 1., 0.8}; + ASSERT_TRUE(std::equal(available_gpu_instances.begin(), + available_gpu_instances.end(), + expected_available_gpu_instances.begin())); NodeResources nr; cluster_resources.GetNodeResources(0, &nr); - ASSERT_TRUE(nr.predefined_resources[0].available == 3.8); + ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8); } } } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index f77851bca..572ba58ec 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -272,8 +272,19 @@ bool TaskSpecification::IsDetachedActor() const { std::string TaskSpecification::DebugString() const { std::ostringstream stream; stream << "Type=" << TaskType_Name(message_->type()) - << ", Language=" << Language_Name(message_->language()) - << ", function_descriptor="; + << ", Language=" << Language_Name(message_->language()); + + if (required_resources_ != nullptr) { + stream << ", Resources: {"; + + // Print resource description. + for (auto entry : GetRequiredResources().GetResourceMap()) { + stream << entry.first << ": " << entry.second << ", "; + } + stream << "}"; + } + + stream << ", function_descriptor="; // Print function descriptor. stream << FunctionDescriptor()->ToString(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f8a9b7868..3bb0c0e1e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1699,6 +1699,7 @@ void NodeManager::DispatchScheduledTasksToWorkers() { // Try next task in the dispatch queue. continue; } + worker->SetOwnerAddress(spec.CallerAddress()); if (spec.IsActorCreationTask()) { // The actor belongs to this worker now. @@ -1723,13 +1724,9 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { // blocking where a task which cannot be scheduled because // there are not enough available resources blocks other // tasks from being scheduled. - while (queue_size > 0) { - if (queue_size == 0) { - return; - } else { - queue_size--; - } + while (queue_size-- > 0) { auto work = tasks_to_schedule_.front(); + tasks_to_schedule_.pop_front(); auto task = work.second; auto request_resources = task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); @@ -1738,13 +1735,13 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations); if (node_id_string.empty()) { /// There is no node that has available resources to run the request. - tasks_to_schedule_.pop_front(); tasks_to_schedule_.push_back(work); continue; } else { if (node_id_string == self_node_id_.Binary()) { WaitForTaskArgsRequests(work); } else { + // Should spill over to a different node. new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string, request_resources); @@ -1756,7 +1753,6 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { work.first(nullptr, node_id, node_info_opt->node_manager_address(), node_info_opt->node_manager_port()); } - tasks_to_schedule_.pop_front(); } } DispatchScheduledTasksToWorkers(); @@ -1875,7 +1871,6 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest // Override the task dispatch to call back to the client instead of executing the // task directly on the worker. - RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId(); TaskID task_id = task.GetTaskSpecification().TaskId(); rpc::Address owner_address = task.GetTaskSpecification().CallerAddress(); task.OnDispatchInstead(