[New Scheduler] Heartbeat (#11024)

* .

* refactor

* .

* .

* done?

* .

* .

* .

* lint

* no light heartbeat, no tests, fields 2,3

* .

* manually clang format :(

* .

* .

* test

* .

* .

* task manager heartbeat

* lint

* .

* add reminder

* CR

* CR

* cleanup

* CR

* comment

* lint

* .
This commit is contained in:
Alex Wu
2020-10-01 15:54:53 -07:00
committed by GitHub
parent 1ad52bdfbc
commit a866be381c
13 changed files with 369 additions and 52 deletions
+47 -41
View File
@@ -391,64 +391,70 @@ void NodeManager::Heartbeat() {
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
heartbeat_data->set_client_id(self_node_id_.Binary());
// TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet directly.
// TODO(atumanov): implement a ResourceSet const_iterator.
// If light heartbeat enabled, we only set filed that represent resources changed.
if (light_heartbeat_enabled_) {
if (!last_heartbeat_resources_.GetTotalResources().IsEqual(
local_resources.GetTotalResources())) {
if (new_scheduler_enabled_) {
new_resource_scheduler_->Heartbeat(light_heartbeat_enabled_, heartbeat_data);
cluster_task_manager_->Heartbeat(light_heartbeat_enabled_, heartbeat_data);
} else {
// TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet
// directly.
// TODO(atumanov): implement a ResourceSet const_iterator.
// If light heartbeat enabled, we only set filed that represent resources changed.
if (light_heartbeat_enabled_) {
if (!last_heartbeat_resources_.GetTotalResources().IsEqual(
local_resources.GetTotalResources())) {
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources_.GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
heartbeat_data->set_resources_available_changed(true);
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
if (!last_heartbeat_resources_.GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
heartbeat_data->set_resource_load_changed(true);
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
} else {
// If light heartbeat disabled, we send whole resources information every time.
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources_.GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
heartbeat_data->set_resources_available_changed(true);
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
if (!last_heartbeat_resources_.GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
heartbeat_data->set_resource_load_changed(true);
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
} else {
// If light heartbeat disabled, we send whole resources information every time.
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
@@ -1,5 +1,18 @@
#include "ray/raylet/scheduling/cluster_resource_data.h"
const std::string resource_labels[] = {ray::kCPU_ResourceLabel,
ray::kMemory_ResourceLabel,
ray::kGPU_ResourceLabel, ray::kTPU_ResourceLabel};
const std::string ResourceEnumToString(PredefinedResources resource) {
// TODO (Alex): We should replace this with a protobuf enum.
RAY_CHECK(resource < PredefinedResources_MAX)
<< "Something went wrong. Please file a bug report with this stack "
"trace: https://github.com/ray-project/ray/issues/new.";
std::string label = resource_labels[resource];
return label;
}
std::string VectorToString(const std::vector<FixedPoint> &vector) {
std::stringstream buffer;
@@ -28,6 +28,8 @@
/// List of predefined resources.
enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX };
const std::string ResourceEnumToString(PredefinedResources resource);
/// Helper function to compare two vectors with FixedPoint values.
bool EqualVectors(const std::vector<FixedPoint> &v1, const std::vector<FixedPoint> &v2);
@@ -14,6 +14,8 @@
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
namespace ray {
ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources)
: local_node_id_(local_node_id) {
@@ -294,7 +296,7 @@ bool ClusterResourceScheduler::AddNodeAvailableResources(
}
bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
NodeResources *ret_resources) {
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
if (it != nodes_.end()) {
*ret_resources = it->second;
@@ -779,3 +781,45 @@ void ClusterResourceScheduler::FreeLocalTaskResources(
FreeTaskResourceInstances(task_allocation);
UpdateLocalAvailableResourcesFromResourceInstances();
}
void ClusterResourceScheduler::Heartbeat(
bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> heartbeat_data) const {
NodeResources resources;
RAY_CHECK(GetNodeResources(local_node_id_, &resources))
<< "Error: Populating heartbeat failed. Please file a bug report: "
"https://github.com/ray-project/ray/issues/new.";
if (light_heartbeat_enabled) {
// TODO
RAY_CHECK(false) << "TODO";
} else {
for (int i = 0; i < PredefinedResources_MAX; i++) {
const auto &label = ResourceEnumToString((PredefinedResources)i);
const auto &capacity = resources.predefined_resources[i];
if (capacity.available != 0) {
(*heartbeat_data->mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != 0) {
(*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
}
}
for (auto it = resources.custom_resources.begin();
it != resources.custom_resources.end(); it++) {
uint64_t custom_id = it->first;
const auto &capacity = it->second;
const auto &label = string_to_int_map_.Get(custom_id);
if (capacity.available != 0) {
(*heartbeat_data->mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != 0) {
(*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
}
}
}
}
} // namespace ray
@@ -26,6 +26,12 @@
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/util/logging.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
using rpc::HeartbeatTableData;
// Specify resources that consists of unit-size instances.
static std::unordered_set<int64_t> UnitInstanceResources{CPU, GPU, TPU};
@@ -185,7 +191,7 @@ class ClusterResourceScheduler {
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources);
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
/// Get number of nodes in the cluster.
int64_t NumNodes();
@@ -366,6 +372,18 @@ class ClusterResourceScheduler {
// resources availabile at that node is 0.2 + 0.3 + 0.1 + 1. = 1.6
void UpdateLocalAvailableResourcesFromResourceInstances();
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resources_available and resources_total.
///
/// \param light_heartbeat_enabled Only send changed fields if true.
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
void Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const;
/// Return human-readable string for this scheduler state.
std::string DebugString() const;
};
} // end namespace ray
@@ -18,6 +18,7 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
@@ -28,6 +29,7 @@
using namespace std;
namespace ray {
// Used to path empty vector argiuments.
vector<int64_t> EmptyIntVector;
vector<bool> EmptyBoolVector;
@@ -172,8 +174,6 @@ bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) {
return true;
}
namespace ray {
class ClusterResourceSchedulerTest : public ::testing::Test {
public:
void SetUp() {}
@@ -970,6 +970,82 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) {
ASSERT_TRUE(EqualVectors(cpu_instances, expect_cpu_instance));
}
TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) {
vector<int64_t> cust_ids{1, 2, 3, 4, 5};
NodeResources node_resources;
std::unordered_map<std::string, double> initial_resources(
{{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"1", 1}, {"2", 2}, {"3", 3}});
ClusterResourceScheduler cluster_resources("0", initial_resources);
NodeResources other_node_resources;
vector<FixedPoint> other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */};
vector<FixedPoint> other_cust_capacities{5., 4., 3., 2., 1.};
initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
other_cust_capacities);
cluster_resources.AddOrUpdateNode(12345, other_node_resources);
{ // Cluster is idle.
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(false, data);
auto available = data->resources_available();
auto total = data->resources_total();
ASSERT_EQ(available[kCPU_ResourceLabel], 1);
ASSERT_EQ(available[kGPU_ResourceLabel], 2);
ASSERT_EQ(available[kMemory_ResourceLabel], 3);
ASSERT_EQ(available["1"], 1);
ASSERT_EQ(available["2"], 2);
ASSERT_EQ(available["3"], 3);
ASSERT_EQ(total[kCPU_ResourceLabel], 1);
ASSERT_EQ(total[kGPU_ResourceLabel], 2);
ASSERT_EQ(total[kMemory_ResourceLabel], 3);
ASSERT_EQ(total["1"], 1);
ASSERT_EQ(total["2"], 2);
ASSERT_EQ(total["3"], 3);
// GCS doesn't like entries which are 0 (like TPU).
ASSERT_EQ(available.size(), 6);
ASSERT_EQ(total.size(), 6);
}
{ // Task running on node with {"CPU": 0.1, "1": 0.1}
std::shared_ptr<TaskResourceInstances> allocations =
std::make_shared<TaskResourceInstances>();
allocations->predefined_resources = {
{0.1}, // CPU
};
allocations->custom_resources = {
{1, {0.1}}, // "1"
};
std::unordered_map<std::string, double> allocation_map({
{"CPU", 0.1},
{"1", 0.1},
});
cluster_resources.AllocateLocalTaskResources(allocation_map, allocations);
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(false, data);
auto available = data->resources_available();
auto total = data->resources_total();
ASSERT_EQ(available[kCPU_ResourceLabel], 0.9);
ASSERT_EQ(available[kGPU_ResourceLabel], 2);
ASSERT_EQ(available[kMemory_ResourceLabel], 3);
ASSERT_EQ(available["1"], 0.9);
ASSERT_EQ(available["2"], 2);
ASSERT_EQ(available["3"], 3);
ASSERT_EQ(total[kCPU_ResourceLabel], 1);
ASSERT_EQ(total[kGPU_ResourceLabel], 2);
ASSERT_EQ(total[kMemory_ResourceLabel], 3);
ASSERT_EQ(total["1"], 1);
ASSERT_EQ(total["2"], 2);
ASSERT_EQ(total["3"], 3);
}
}
} // namespace ray
int main(int argc, char **argv) {
@@ -1,5 +1,6 @@
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include <google/protobuf/map.h>
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/util/logging.h"
namespace ray {
@@ -184,6 +185,73 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
return false;
}
void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const {
auto resource_loads = data->mutable_resource_load();
auto resource_load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
if (light_heartbeat_enabled) {
RAY_CHECK(false) << "TODO";
} else {
// TODO (Alex): Implement the 1-CPU task optimization.
for (const auto &work : tasks_to_schedule_) {
const auto &task = std::get<0>(work);
const auto &resources =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
auto by_shape_entry = resource_load_by_shape->Add();
for (const auto &resource : resources) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
const auto &entry = resource_loads->find(label);
if (entry == resource_loads->end()) {
(*resource_loads)[label] = quantity;
} else {
(*resource_loads)[label] = entry->second + quantity;
}
// TODO (Alex): Adding repeated entries with quantity 1 is fine, but inefficient.
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_infeasible_requests_queued(1);
}
}
for (const auto &work : tasks_to_dispatch_) {
const auto &task = std::get<0>(work);
const auto &resources =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
auto by_shape_entry = resource_load_by_shape->Add();
for (auto to_add_it = resources.begin(); to_add_it != resources.end();
to_add_it++) {
// Add to `resource_loads`.
const auto &label = to_add_it->first;
const auto &quantity = to_add_it->second;
const auto &entry = resource_loads->find(label);
if (entry == resource_loads->end()) {
(*resource_loads)[label] = quantity;
} else {
(*resource_loads)[label] = entry->second + quantity;
}
// TODO (Alex): Adding repeated entries with quantity 1 is fine, but inefficient.
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_ready_requests_queued(1);
}
}
}
}
std::string ClusterTaskManager::DebugString() {
std::stringstream buffer;
buffer << "========== Node: " << self_node_id_ << " =================\n";
@@ -96,6 +96,16 @@ class ClusterTaskManager {
/// false if the task is already running.
bool CancelTask(const TaskID &task_id);
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param light_heartbeat_enabled Only send changed fields if true.
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const;
std::string DebugString();
private:
@@ -483,10 +483,90 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
ASSERT_EQ(leased_workers_.size(), 1);
}
TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
{
Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}});
rpc::RequestWorkerLeaseReply reply;
bool callback_called = false;
bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; };
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_TRUE(callback_called);
// Now {CPU: 7, GPU: 4, MEM:128}
}
{
Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}});
rpc::RequestWorkerLeaseReply reply;
bool callback_called = false;
bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; };
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_called); // No worker available.
// Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task.
}
{
Task task = CreateTask({{ray::kCPU_ResourceLabel, 9}, {ray::kGPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply reply;
bool callback_called = false;
bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; };
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_called); // Infeasible.
// Now there is also an infeasible task {CPU: 9}.
}
{
auto data = std::make_shared<rpc::HeartbeatTableData>();
task_manager_.Heartbeat(false, data);
auto load = data->mutable_resource_load();
ASSERT_EQ(load->size(), 2);
ASSERT_EQ((*load)["CPU"], 10); // 9 + 1 = 10
ASSERT_EQ((*load)["GPU"], 5);
auto load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
ASSERT_EQ(load_by_shape->size(), 2);
auto load1 = (*load_by_shape)[0];
auto load2 = (*load_by_shape)[1];
ASSERT_EQ(load1.num_infeasible_requests_queued(), 1);
ASSERT_EQ(load1.num_ready_requests_queued(), 0);
ASSERT_EQ((*load1.mutable_shape())["CPU"], 9);
ASSERT_EQ((*load1.mutable_shape())["GPU"], 5);
ASSERT_EQ((*load1.mutable_shape()).size(), 2);
ASSERT_EQ(load2.num_infeasible_requests_queued(), 0);
ASSERT_EQ(load2.num_ready_requests_queued(), 1);
ASSERT_EQ((*load2.mutable_shape())["CPU"], 1);
ASSERT_EQ((*load2.mutable_shape()).size(), 1);
}
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
} // namespace raylet
} // namespace ray
+1 -1
View File
@@ -67,4 +67,4 @@ std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1) {
return out;
}
double FixedPoint::Double() { return round(i_) / RESOURCE_UNIT_SCALING; };
double FixedPoint::Double() const { return round(i_) / RESOURCE_UNIT_SCALING; };
+1 -1
View File
@@ -37,7 +37,7 @@ class FixedPoint {
bool operator==(FixedPoint const &ru1) const;
bool operator!=(FixedPoint const &ru1) const;
double Double();
double Double() const;
friend std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1);
};
+2 -2
View File
@@ -14,7 +14,7 @@
#include "ray/raylet/scheduling/scheduling_ids.h"
int64_t StringIdMap::Get(const std::string &string_id) {
int64_t StringIdMap::Get(const std::string &string_id) const {
auto it = string_to_int_.find(string_id);
if (it == string_to_int_.end()) {
return -1;
@@ -23,7 +23,7 @@ int64_t StringIdMap::Get(const std::string &string_id) {
}
};
std::string StringIdMap::Get(uint64_t id) {
std::string StringIdMap::Get(uint64_t id) const {
std::string id_string;
auto it = int_to_string_.find(id);
if (it == int_to_string_.end()) {
+2 -2
View File
@@ -36,13 +36,13 @@ class StringIdMap {
///
/// \param String ID.
/// \return The integer ID associated with the given string ID.
int64_t Get(const std::string &string_id);
int64_t Get(const std::string &string_id) const;
/// Get string ID associated with an existing integer ID.
///
/// \param Integre ID.
/// \return The string ID associated with the given integer ID.
std::string Get(uint64_t id);
std::string Get(uint64_t id) const;
/// Insert a string ID and get the associated integer ID.
///