diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index b9b33df36..7559e5392 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -39,7 +39,7 @@ RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 100) /// Whether to send heartbeat lightly. When it is enalbed, only changed part, /// like should_global_gc or changed resources, will be included in the heartbeat, /// and gcs only broadcast the changed heartbeat. -RAY_CONFIG(bool, light_heartbeat_enabled, false) +RAY_CONFIG(bool, light_heartbeat_enabled, true) /// If a component has not sent a heartbeat in the last num_heartbeats_timeout /// heartbeat intervals, the raylet monitor process will report /// it as dead to the db_client table. diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 3d5985afc..1198c0908 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -206,17 +206,59 @@ TEST_F(GlobalStateAccessorTest, TestGetAllHeartbeat) { auto node_table = global_state_->GetAllNodeInfo(); ASSERT_EQ(node_table.size(), 1); - // Report heartbeat + // Report heartbeat first time. std::promise promise1; - auto heartbeat = std::make_shared(); - heartbeat->set_client_id(node_table_data->node_id()); + auto heartbeat1 = std::make_shared(); + heartbeat1->set_client_id(node_table_data->node_id()); RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( - heartbeat, [&promise1](Status status) { promise1.set_value(status.ok()); })); + heartbeat1, [&promise1](Status status) { promise1.set_value(status.ok()); })); WaitReady(promise1.get_future(), timeout_ms_); heartbeats = global_state_->GetAllHeartbeat(); heartbeat_batch_data.ParseFromString(*heartbeats.get()); ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); + + // Report heartbeat with resources changed. 
+ std::promise promise2; + auto heartbeat2 = std::make_shared(); + heartbeat2->set_client_id(node_table_data->node_id()); + (*heartbeat2->mutable_resources_total())["CPU"] = 1; + (*heartbeat2->mutable_resources_total())["GPU"] = 10; + heartbeat2->set_resources_available_changed(true); + (*heartbeat2->mutable_resources_available())["GPU"] = 5; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( + heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); })); + WaitReady(promise2.get_future(), timeout_ms_); + + heartbeats = global_state_->GetAllHeartbeat(); + heartbeat_batch_data.ParseFromString(*heartbeats.get()); + ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); + auto heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0); + ASSERT_EQ(heartbeat_data.resources_total_size(), 2); + ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0); + ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0); + ASSERT_EQ(heartbeat_data.resources_available_size(), 1); + ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0); + + // Report heartbeat with resources unchanged. 
(Only works with light heartbeat enabled) + std::promise promise3; + auto heartbeat3 = std::make_shared(); + heartbeat3->set_client_id(node_table_data->node_id()); + (*heartbeat3->mutable_resources_available())["CPU"] = 1; + (*heartbeat3->mutable_resources_available())["GPU"] = 6; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( + heartbeat3, [&promise3](Status status) { promise3.set_value(status.ok()); })); + WaitReady(promise3.get_future(), timeout_ms_); + + heartbeats = global_state_->GetAllHeartbeat(); + heartbeat_batch_data.ParseFromString(*heartbeats.get()); + ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); + heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0); + ASSERT_EQ(heartbeat_data.resources_total_size(), 2); + ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0); + ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0); + ASSERT_EQ(heartbeat_data.resources_available_size(), 1); + ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0); } TEST_F(GlobalStateAccessorTest, TestProfileTable) { diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index d59ec24d8..4fbeb1cf5 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -808,6 +808,34 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeat) { WaitForExpectedCount(heartbeat_batch_count, 1); } +TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeatWithLightHeartbeat) { + // Subscribe batched state of all nodes from GCS. + std::atomic heartbeat_batch_count(0); + auto on_subscribe = + [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { + ++heartbeat_batch_count; + }; + ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + + // Register node. 
+ auto node_info = Mocker::GenNodeInfo(); + RAY_CHECK(RegisterNode(*node_info)); + + // Report unchanged heartbeat of a node to GCS. + NodeID node_id = NodeID::FromBinary(node_info->node_id()); + auto heartbeat = std::make_shared(); + heartbeat->set_client_id(node_id.Binary()); + ASSERT_TRUE(ReportHeartbeat(heartbeat)); + WaitForExpectedCount(heartbeat_batch_count, 0); + + // Report changed heartbeat of a node to GCS. + auto heartbeat1 = std::make_shared(); + heartbeat1->set_client_id(node_id.Binary()); + heartbeat1->set_resources_available_changed(true); + ASSERT_TRUE(ReportHeartbeat(heartbeat1)); + WaitForExpectedCount(heartbeat_batch_count, 1); +} + TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { // Subscribe batched state of all nodes from GCS. std::atomic heartbeat_batch_count(0); @@ -825,8 +853,8 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { NodeID node_id = NodeID::FromBinary(node_info->node_id()); auto heartbeat = std::make_shared(); heartbeat->set_client_id(node_id.Binary()); - // Set this flag because GCS won't publish unchanged heartbeat. - heartbeat->set_should_global_gc(true); + // Set this flag to indicate resources have changed. + heartbeat->set_resources_available_changed(true); (*heartbeat->mutable_resources_available())["CPU"] = 1.0; (*heartbeat->mutable_resources_available())["GPU"] = 10.0; ASSERT_TRUE(ReportHeartbeat(heartbeat)); @@ -840,6 +868,51 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); } +TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResourcesWithLightHeartbeat) { + // Subscribe batched state of all nodes from GCS. + std::atomic heartbeat_batch_count(0); + auto on_subscribe = + [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { + ++heartbeat_batch_count; + }; + ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + + // Register node. 
+ auto node_info = Mocker::GenNodeInfo(); + RAY_CHECK(RegisterNode(*node_info)); + + // Report heartbeat of a node to GCS. + NodeID node_id = NodeID::FromBinary(node_info->node_id()); + auto heartbeat = std::make_shared(); + heartbeat->set_client_id(node_id.Binary()); + heartbeat->set_resources_available_changed(true); + (*heartbeat->mutable_resources_available())["CPU"] = 1.0; + (*heartbeat->mutable_resources_available())["GPU"] = 10.0; + ASSERT_TRUE(ReportHeartbeat(heartbeat)); + WaitForExpectedCount(heartbeat_batch_count, 1); + + // Assert that all available resources are reported correctly. + std::vector resources = GetAllAvailableResources(); + EXPECT_EQ(resources.size(), 1); + EXPECT_EQ(resources[0].resources_available_size(), 2); + EXPECT_EQ((*resources[0].mutable_resources_available())["CPU"], 1.0); + EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); + + // Report a heartbeat without resources_available_changed set. + auto heartbeat1 = std::make_shared(); + heartbeat1->set_client_id(node_id.Binary()); + (*heartbeat1->mutable_resources_available())["GPU"] = 8.0; + ASSERT_TRUE(ReportHeartbeat(heartbeat1)); + WaitForExpectedCount(heartbeat_batch_count, 1); + + // The reported values should remain unchanged. 
+ std::vector resources1 = GetAllAvailableResources(); + EXPECT_EQ(resources1.size(), 1); + EXPECT_EQ(resources1[0].resources_available_size(), 2); + EXPECT_EQ((*resources1[0].mutable_resources_available())["CPU"], 1.0); + EXPECT_EQ((*resources1[0].mutable_resources_available())["GPU"], 10.0); +} + TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { JobID job_id = JobID::FromInt(1); TaskID task_id = TaskID::ForDriverTask(job_id); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 954f16bd6..8d2577bdb 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -352,7 +352,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re if (!node_heartbeats_.empty()) { auto batch = std::make_shared(); absl::flat_hash_map aggregate_load; - for (auto &heartbeat : node_heartbeats_) { + for (const auto &heartbeat : node_heartbeats_) { // Aggregate the load reported by each raylet. 
auto load = heartbeat.second.resource_load_by_shape(); for (const auto &demand : load.resource_demands()) { @@ -369,14 +369,13 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re demand.backlog_size()); } } - heartbeat.second.clear_resource_load_by_shape(); - batch->add_batch()->Swap(&heartbeat.second); + batch->add_batch()->CopyFrom(heartbeat.second); } - for (auto &demand : aggregate_load) { + for (const auto &demand : aggregate_load) { auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands(); - demand_proto->Swap(&demand.second); + demand_proto->CopyFrom(demand.second); for (const auto &resource_pair : demand.first.GetResourceMap()) { (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; } @@ -415,6 +414,8 @@ void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id, if (request.heartbeat().resource_load_changed()) { (*iter->second.mutable_resource_load()) = request.heartbeat().resource_load(); } + (*iter->second.mutable_resource_load_by_shape()) = + request.heartbeat().resource_load_by_shape(); } } @@ -533,6 +534,7 @@ void GcsNodeManager::StartNodeFailureDetector() { void GcsNodeManager::UpdateNodeRealtimeResources( const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) { if (!RayConfig::instance().light_heartbeat_enabled() || + cluster_realtime_resources_.count(node_id) == 0 || heartbeat.resources_available_changed()) { cluster_realtime_resources_[node_id] = ResourceSet(MapFromProtobuf(heartbeat.resources_available())); @@ -564,7 +566,6 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr node) void GcsNodeManager::SendBatchedHeartbeat() { if (!heartbeat_buffer_.empty()) { auto batch = std::make_shared(); - std::unordered_map aggregate_load; for (auto &heartbeat : heartbeat_buffer_) { batch->add_batch()->Swap(&heartbeat.second); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 07008917f..bb247f28e 100644 --- 
a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -485,10 +485,12 @@ void NodeManager::Heartbeat() { } } - // Add resource load by shape. This will be used by the new autoscaler. - auto resource_load = local_queues_.GetResourceLoadByShape( - RayConfig::instance().max_resource_shapes_per_load_report()); - heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load); + if (!new_scheduler_enabled_) { + // Add resource load by shape. This will be used by the new autoscaler. + auto resource_load = local_queues_.GetResourceLoadByShape( + RayConfig::instance().max_resource_shapes_per_load_report()); + heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load); + } // Set the global gc bit on the outgoing heartbeat message. if (should_global_gc_) { @@ -884,7 +886,7 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id, ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); } - if (heartbeat_data.resource_load_changed()) { + if (heartbeat_data.resources_available_changed()) { ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); } diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index 1d1f8c592..9c769ecf4 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -150,6 +150,10 @@ class TaskResourceInstances { /// Total and available capacities of each resource of a node. class NodeResources { public: + NodeResources() {} + NodeResources(const NodeResources &other) + : predefined_resources(other.predefined_resources), + custom_resources(other.custom_resources) {} /// Available and total capacities for predefined resources. std::vector predefined_resources; /// Map containing custom resources. 
The key of each entry represents the diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 3646fabae..86ffb2e82 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -804,17 +804,16 @@ void ClusterResourceScheduler::FreeLocalTaskResources( } void ClusterResourceScheduler::Heartbeat( - bool light_heartbeat_enabled, - std::shared_ptr heartbeat_data) const { + bool light_heartbeat_enabled, std::shared_ptr heartbeat_data) { NodeResources resources; RAY_CHECK(GetNodeResources(local_node_id_, &resources)) << "Error: Populating heartbeat failed. Please file a bug report: " "https://github.com/ray-project/ray/issues/new."; - if (light_heartbeat_enabled) { - // TODO - RAY_CHECK(false) << "TODO"; + if (light_heartbeat_enabled && last_report_resources_ && + resources == *last_report_resources_.get()) { + return; } else { for (int i = 0; i < PredefinedResources_MAX; i++) { const auto &label = ResourceEnumToString((PredefinedResources)i); @@ -840,6 +839,10 @@ void ClusterResourceScheduler::Heartbeat( (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double(); } } + heartbeat_data->set_resources_available_changed(true); + if (light_heartbeat_enabled) { + last_report_resources_.reset(new NodeResources(resources)); + } } } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index c71859f3e..15375bd10 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -49,6 +49,8 @@ class ClusterResourceScheduler { /// Keep the mapping between node and resource IDs in string representation /// to integer representation. Used for improving map performance. StringIdMap string_to_int_map_; + /// Cached resources, used to compare with newest one in light heartbeat mode. 
+ std::unique_ptr last_report_resources_; /// Set predefined resources. /// @@ -382,8 +384,7 @@ class ClusterResourceScheduler { /// \param light_heartbeat_enabled Only send changed fields if true. /// \param Output parameter. `resources_available` and `resources_total` are the only /// fields used. - void Heartbeat(bool light_heartbeat_enabled, - std::shared_ptr data) const; + void Heartbeat(bool light_heartbeat_enabled, std::shared_ptr data); /// Return human-readable string for this scheduler state. std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index e0487cbdd..482cf6eb4 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -229,60 +229,59 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled, std::shared_ptr data) const { + // TODO (WangTao): Find a way to check if load changed and combine it with light + // heartbeat. Now we just report it every time. + data->set_resource_load_changed(true); auto resource_loads = data->mutable_resource_load(); auto resource_load_by_shape = data->mutable_resource_load_by_shape()->mutable_resource_demands(); - if (light_heartbeat_enabled) { - RAY_CHECK(false) << "TODO"; - } else { - // TODO (Alex): Implement the 1-CPU task optimization. - for (const auto &pair : tasks_to_schedule_) { - const auto &scheduling_class = pair.first; - const auto &resources = - TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) - .GetResourceMap(); - const auto &queue = pair.second; - const auto &count = queue.size(); + // TODO (Alex): Implement the 1-CPU task optimization. 
+ for (const auto &pair : tasks_to_schedule_) { + const auto &scheduling_class = pair.first; + const auto &resources = + TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) + .GetResourceMap(); + const auto &queue = pair.second; + const auto &count = queue.size(); - auto by_shape_entry = resource_load_by_shape->Add(); + auto by_shape_entry = resource_load_by_shape->Add(); - for (const auto &resource : resources) { - // Add to `resource_loads`. - const auto &label = resource.first; - const auto &quantity = resource.second; - (*resource_loads)[label] += quantity * count; + for (const auto &resource : resources) { + // Add to `resource_loads`. + const auto &label = resource.first; + const auto &quantity = resource.second; + (*resource_loads)[label] += quantity * count; - // Add to `resource_load_by_shape`. - (*by_shape_entry->mutable_shape())[label] = quantity; - // TODO (Alex): Technically being on `tasks_to_schedule` could also mean - // that the entire cluster is utilized. - by_shape_entry->set_num_infeasible_requests_queued(count); - } + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + // TODO (Alex): Technically being on `tasks_to_schedule` could also mean + // that the entire cluster is utilized. 
+ by_shape_entry->set_num_infeasible_requests_queued(count); } + } - for (const auto &pair : tasks_to_dispatch_) { - const auto &scheduling_class = pair.first; - const auto &resources = - TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) - .GetResourceMap(); - const auto &queue = pair.second; - const auto &count = queue.size(); + for (const auto &pair : tasks_to_dispatch_) { + const auto &scheduling_class = pair.first; + const auto &resources = + TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) + .GetResourceMap(); + const auto &queue = pair.second; + const auto &count = queue.size(); - auto by_shape_entry = resource_load_by_shape->Add(); + auto by_shape_entry = resource_load_by_shape->Add(); - for (const auto &resource : resources) { - // Add to `resource_loads`. - const auto &label = resource.first; - const auto &quantity = resource.second; - (*resource_loads)[label] += quantity * count; + for (const auto &resource : resources) { + // Add to `resource_loads`. + const auto &label = resource.first; + const auto &quantity = resource.second; + (*resource_loads)[label] += quantity * count; - // Add to `resource_load_by_shape`. - (*by_shape_entry->mutable_shape())[label] = quantity; - // TODO (Alex): Technically being on `tasks_to_schedule` could also mean - // that the entire cluster is utilized. - by_shape_entry->set_num_ready_requests_queued(count); - } + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + // TODO (Alex): Technically being on `tasks_to_schedule` could also mean + // that the entire cluster is utilized. + by_shape_entry->set_num_ready_requests_queued(count); } } }