From ab2229dcb748e50851e2670e56f690f98acfea61 Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Fri, 8 Jan 2021 10:30:00 +0800 Subject: [PATCH] [GCS] Remove old lightweight resource usage report code path (#13192) --- python/ray/includes/ray_config.pxd | 2 - python/ray/includes/ray_config.pxi | 4 - src/ray/common/ray_config_def.h | 3 - .../gcs/gcs_client/service_based_accessor.cc | 43 ++- .../gcs/gcs_server/gcs_resource_manager.cc | 10 +- src/ray/gcs/gcs_server/gcs_resource_manager.h | 2 - src/ray/raylet/node_manager.cc | 112 +++---- src/ray/raylet/node_manager.h | 7 +- .../scheduling/cluster_resource_scheduler.cc | 103 +++--- .../scheduling/cluster_resource_scheduler.h | 9 +- .../cluster_resource_scheduler_test.cc | 295 ++++++++---------- .../raylet/scheduling/cluster_task_manager.cc | 1 - .../raylet/scheduling/cluster_task_manager.h | 6 +- .../scheduling/cluster_task_manager_test.cc | 6 +- 14 files changed, 239 insertions(+), 364 deletions(-) diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 03979863d..079f30690 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -15,8 +15,6 @@ cdef extern from "ray/common/ray_config.h" nogil: int64_t raylet_heartbeat_timeout_milliseconds() const - c_bool light_report_resource_usage_enabled() const - int64_t debug_dump_period_milliseconds() const int64_t num_heartbeats_timeout() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index b9e26cf08..96a2a14f2 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -13,10 +13,6 @@ cdef class Config: def raylet_heartbeat_timeout_milliseconds(): return RayConfig.instance().raylet_heartbeat_timeout_milliseconds() - @staticmethod - def light_report_resource_usage_enabled(): - return RayConfig.instance().light_report_resource_usage_enabled() - @staticmethod def debug_dump_period_milliseconds(): return 
RayConfig.instance().debug_dump_period_milliseconds() diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0e470b45d..d5c4386e4 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -47,9 +47,6 @@ RAY_CONFIG(uint64_t, num_heartbeats_warning, 5) /// The duration between reporting resources sent by the raylets. RAY_CONFIG(int64_t, raylet_report_resources_period_milliseconds, 100) -/// Whether to report resource usage lightly. When it is enalbed, only changed part, -/// like should_global_gc or changed resources, will be included in the message. -RAY_CONFIG(bool, light_report_resource_usage_enabled, true) /// The duration between dumping debug info to logs, or -1 to disable. RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index c276a7c5f..0f30d748d 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -731,32 +731,29 @@ void ServiceBasedNodeResourceInfoAccessor::AsyncReReportResourceUsage() { void ServiceBasedNodeResourceInfoAccessor::FillResourceUsageRequest( rpc::ReportResourceUsageRequest &resources) { - if (RayConfig::instance().light_report_resource_usage_enabled()) { - SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); + SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); - auto resources_data = resources.mutable_resources(); - resources_data->clear_resources_total(); - for (const auto &resource_pair : - cached_resources.GetTotalResources().GetResourceMap()) { - (*resources_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } + auto resources_data = resources.mutable_resources(); + resources_data->clear_resources_total(); + for (const auto &resource_pair : + cached_resources.GetTotalResources().GetResourceMap()) { + 
(*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } - resources_data->clear_resources_available(); - resources_data->set_resources_available_changed(true); - for (const auto &resource_pair : - cached_resources.GetAvailableResources().GetResourceMap()) { - (*resources_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } + resources_data->clear_resources_available(); + resources_data->set_resources_available_changed(true); + for (const auto &resource_pair : + cached_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } - resources_data->clear_resource_load(); - resources_data->set_resource_load_changed(true); - for (const auto &resource_pair : - cached_resources.GetLoadResources().GetResourceMap()) { - (*resources_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } + resources_data->clear_resource_load(); + resources_data->set_resource_load_changed(true); + for (const auto &resource_pair : cached_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; } } diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 483b915a9..15662e5b8 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -23,8 +23,6 @@ GcsResourceManager::GcsResourceManager( boost::asio::io_service &main_io_service, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage) : resource_timer_(main_io_service), - light_report_resource_usage_enabled_( - RayConfig::instance().light_report_resource_usage_enabled()), gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) { SendBatchedResourceUsage(); @@ -171,8 +169,7 @@ void GcsResourceManager::HandleReportResourceUsage( // We use 
`node_resource_usages_` to filter out the nodes that report resource // information for the first time. `UpdateNodeResourceUsage` will modify // `node_resource_usages_`, so we need to do it before `UpdateNodeResourceUsage`. - if (!light_report_resource_usage_enabled_ || - node_resource_usages_.count(node_id) == 0 || + if (node_resource_usages_.count(node_id) == 0 || resources_data->resources_available_changed()) { const auto &resource_changed = MapFromProtobuf(resources_data->resources_available()); SetAvailableResources(node_id, ResourceSet(resource_changed)); @@ -180,8 +177,7 @@ void GcsResourceManager::HandleReportResourceUsage( UpdateNodeResourceUsage(node_id, request); - if (!light_report_resource_usage_enabled_ || resources_data->should_global_gc() || - resources_data->resources_total_size() > 0 || + if (resources_data->should_global_gc() || resources_data->resources_total_size() > 0 || resources_data->resources_available_changed() || resources_data->resource_load_changed()) { resources_buffer_[node_id] = *resources_data; @@ -243,7 +239,7 @@ void GcsResourceManager::HandleGetAllResourceUsage( void GcsResourceManager::UpdateNodeResourceUsage( const NodeID node_id, const rpc::ReportResourceUsageRequest &request) { auto iter = node_resource_usages_.find(node_id); - if (!light_report_resource_usage_enabled_ || iter == node_resource_usages_.end()) { + if (iter == node_resource_usages_.end()) { auto resources_data = std::make_shared(); resources_data->CopyFrom(request.resources()); node_resource_usages_[node_id] = *resources_data; diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index f606c9823..792a8b3e4 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -157,8 +157,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { /// A timer that ticks every raylet_report_resources_period_milliseconds. 
boost::asio::deadline_timer resource_timer_; - // Only the changed part will be reported if this is true. - const bool light_report_resource_usage_enabled_; /// Newest resource usage of all nodes. absl::flat_hash_map node_resource_usages_; /// A buffer containing resource usage received from node managers in the last tick. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 356b5fa34..4df7a65b5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -133,8 +133,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self object_pinning_enabled_(config.object_pinning_enabled), temp_dir_(config.temp_dir), object_manager_profile_timer_(io_service), - light_report_resource_usage_enabled_( - RayConfig::instance().light_report_resource_usage_enabled()), initial_config_(config), local_available_resources_(config.resource_config), worker_pool_(io_service, config.num_workers_soft_limit, @@ -286,7 +284,7 @@ ray::Status NodeManager::RegisterGcs() { // Subscribe to resource usage batches from the monitor. const auto &resource_usage_batch_added = [this](const ResourceUsageBatchData &resource_usage_batch) { - ResourceUsageBatchAdded(resource_usage_batch); + ResourceUsageBatchReceived(resource_usage_batch); }; RAY_RETURN_NOT_OK(gcs_client_->NodeResources().AsyncSubscribeBatchedResourceUsage( resource_usage_batch_added, /*done*/ nullptr)); @@ -447,76 +445,50 @@ void NodeManager::ReportResourceUsage() { if (new_scheduler_enabled_) { // Update local chche from gcs remote cache, this is needed when gcs restart. // We should always keep the cache view consistent. 
- new_resource_scheduler_->UpdateLastReportResourcesFromGcs( + new_resource_scheduler_->UpdateLastResourceUsage( gcs_client_->NodeResources().GetLastResourceUsage()); - new_resource_scheduler_->FillResourceUsage(light_report_resource_usage_enabled_, - resources_data); - cluster_task_manager_->FillResourceUsage(light_report_resource_usage_enabled_, - resources_data); + new_resource_scheduler_->FillResourceUsage(resources_data); + cluster_task_manager_->FillResourceUsage(resources_data); } else { // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet // directly. // TODO(atumanov): implement a ResourceSet const_iterator. - // If light resource usage report enabled, we only set filed that represent resources - // changed. - if (light_report_resource_usage_enabled_) { - auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); - if (!last_heartbeat_resources->GetTotalResources().IsEqual( - local_resources.GetTotalResources())) { - for (const auto &resource_pair : - local_resources.GetTotalResources().GetResourceMap()) { - (*resources_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetTotalResources( - ResourceSet(local_resources.GetTotalResources())); - } - - if (!last_heartbeat_resources->GetAvailableResources().IsEqual( - local_resources.GetAvailableResources())) { - resources_data->set_resources_available_changed(true); - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*resources_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - } - - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - if (!last_heartbeat_resources->GetLoadResources().IsEqual( - local_resources.GetLoadResources())) { - resources_data->set_resource_load_changed(true); - 
- for (const auto &resource_pair : - local_resources.GetLoadResources().GetResourceMap()) { - (*resources_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetLoadResources( - ResourceSet(local_resources.GetLoadResources())); - } - } else { - // If light resource usage report disabled, we send whole resources information - // every time. + // We only set fields that changed. + auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); + if (!last_heartbeat_resources->GetTotalResources().IsEqual( + local_resources.GetTotalResources())) { for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { (*resources_data->mutable_resources_total())[resource_pair.first] = resource_pair.second; } + last_heartbeat_resources->SetTotalResources( + ResourceSet(local_resources.GetTotalResources())); + } + if (!last_heartbeat_resources->GetAvailableResources().IsEqual( + local_resources.GetAvailableResources())) { + resources_data->set_resources_available_changed(true); for (const auto &resource_pair : local_resources.GetAvailableResources().GetResourceMap()) { (*resources_data->mutable_resources_available())[resource_pair.first] = resource_pair.second; } + last_heartbeat_resources->SetAvailableResources( + ResourceSet(local_resources.GetAvailableResources())); + } - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + if (!last_heartbeat_resources->GetLoadResources().IsEqual( + local_resources.GetLoadResources())) { + resources_data->set_resource_load_changed(true); for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { (*resources_data->mutable_resource_load())[resource_pair.first] = resource_pair.second; } + last_heartbeat_resources->SetLoadResources( + ResourceSet(local_resources.GetLoadResources())); } } @@ -951,15 +923,16 @@ void 
NodeManager::TryLocalInfeasibleTaskScheduling() { } } -void NodeManager::ResourceUsageAdded(const NodeID &node_id, - const rpc::ResourcesData &resource_data) { +void NodeManager::UpdateResourceUsage(const NodeID &node_id, + const rpc::ResourcesData &resource_data) { // Locate the node id in remote node table and update available resources based on // the received resource usage information. auto it = cluster_resource_map_.find(node_id); if (it == cluster_resource_map_.end()) { // Haven't received the node registration for this node yet, skip this message. - RAY_LOG(INFO) << "[ResourceUsageAdded]: received resource usage from unknown node id " - << node_id; + RAY_LOG(INFO) + << "[UpdateResourceUsage]: received resource usage from unknown node id " + << node_id; return; } // Trigger local GC at the next heartbeat interval. @@ -969,28 +942,17 @@ void NodeManager::ResourceUsageAdded(const NodeID &node_id, SchedulingResources &remote_resources = it->second; - // If light resource usage report enabled, we update remote resources only when related - // resources map in heartbeat is not empty. - if (light_report_resource_usage_enabled_) { - if (resource_data.resources_total_size() > 0) { - ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); - remote_resources.SetTotalResources(std::move(remote_total)); - } - if (resource_data.resources_available_changed()) { - ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); - remote_resources.SetAvailableResources(std::move(remote_available)); - } - if (resource_data.resource_load_changed()) { - ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); - // Extract the load information and save it locally. - remote_resources.SetLoadResources(std::move(remote_load)); - } - } else { - // If light resource usage report disabled, we update remote resources every time. + // We update remote resources only when related + // resources map in message changed. 
+ if (resource_data.resources_total_size() > 0) { ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); + } + if (resource_data.resources_available_changed()) { ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); + } + if (resource_data.resource_load_changed()) { ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); @@ -1030,7 +992,7 @@ void NodeManager::ResourceUsageAdded(const NodeID &node_id, } } -void NodeManager::ResourceUsageBatchAdded( +void NodeManager::ResourceUsageBatchReceived( const ResourceUsageBatchData &resource_usage_batch) { // Update load information provided by each message. for (const auto &resource_usage : resource_usage_batch.batch()) { @@ -1039,7 +1001,7 @@ void NodeManager::ResourceUsageBatchAdded( // Skip messages from self. continue; } - ResourceUsageAdded(node_id, resource_usage); + UpdateResourceUsage(node_id, resource_usage); } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 63f741af0..402c11c1e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -225,11 +225,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param id The ID of the node manager that sent the resources data. /// \param data The resources data including load information. /// \return Void. - void ResourceUsageAdded(const NodeID &id, const rpc::ResourcesData &data); + void UpdateResourceUsage(const NodeID &id, const rpc::ResourcesData &data); + /// Handler for a resource usage batch notification from the GCS /// /// \param resource_usage_batch The batch of resource usage data. 
- void ResourceUsageBatchAdded(const ResourceUsageBatchData &resource_usage_batch); + void ResourceUsageBatchReceived(const ResourceUsageBatchData &resource_usage_batch); /// Methods for task scheduling. @@ -673,8 +674,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The time that the last heartbeat was sent at. Used to make sure we are /// keeping up with heartbeats. uint64_t last_heartbeat_at_ms_; - /// Only the changed part will be included in resource usage if this is true. - const bool light_report_resource_usage_enabled_; /// The time that the last debug string was logged to the console. uint64_t last_debug_dump_at_ms_; /// The number of heartbeats that we should wait before sending the diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 10eae694c..5e60c453a 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -888,7 +888,7 @@ void ClusterResourceScheduler::FreeLocalTaskResources( UpdateLocalAvailableResourcesFromResourceInstances(); } -void ClusterResourceScheduler::UpdateLastReportResourcesFromGcs( +void ClusterResourceScheduler::UpdateLastResourceUsage( std::shared_ptr gcs_resources) { NodeResources node_resources = ResourceMapToNodeResources( string_to_int_map_, gcs_resources->GetTotalResources().GetResourceMap(), @@ -897,7 +897,6 @@ void ClusterResourceScheduler::UpdateLastReportResourcesFromGcs( } void ClusterResourceScheduler::FillResourceUsage( - bool light_report_resource_usage_enabled, std::shared_ptr resources_data) { NodeResources resources; @@ -912,78 +911,48 @@ void ClusterResourceScheduler::FillResourceUsage( last_report_resources_.reset(new NodeResources(node_resources)); } - if (light_report_resource_usage_enabled) { - // Reset all local views for remote nodes. 
This is needed in case tasks that - // we spilled back to a remote node were not actually scheduled on the - // node. Then, the remote node's resource availability may not change and - // so it may not send us another update. - for (auto &node : nodes_) { - if (node.first != local_node_id_) { - node.second.ResetLocalView(); - } + // Reset all local views for remote nodes. This is needed in case tasks that + // we spilled back to a remote node were not actually scheduled on the + // node. Then, the remote node's resource availability may not change and + // so it may not send us another update. + for (auto &node : nodes_) { + if (node.first != local_node_id_) { + node.second.ResetLocalView(); } + } - for (int i = 0; i < PredefinedResources_MAX; i++) { - const auto &label = ResourceEnumToString((PredefinedResources)i); - const auto &capacity = resources.predefined_resources[i]; - const auto &last_capacity = last_report_resources_->predefined_resources[i]; - // Note: available may be negative, but only report positive to GCS. - if (capacity.available != last_capacity.available && capacity.available > 0) { - resources_data->set_resources_available_changed(true); - (*resources_data->mutable_resources_available())[label] = - capacity.available.Double(); - } - if (capacity.total != last_capacity.total) { - (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); - } + for (int i = 0; i < PredefinedResources_MAX; i++) { + const auto &label = ResourceEnumToString((PredefinedResources)i); + const auto &capacity = resources.predefined_resources[i]; + const auto &last_capacity = last_report_resources_->predefined_resources[i]; + // Note: available may be negative, but only report positive to GCS. 
+ if (capacity.available != last_capacity.available && capacity.available > 0) { + resources_data->set_resources_available_changed(true); + (*resources_data->mutable_resources_available())[label] = + capacity.available.Double(); } - for (auto it = resources.custom_resources.begin(); - it != resources.custom_resources.end(); it++) { - uint64_t custom_id = it->first; - const auto &capacity = it->second; - const auto &last_capacity = last_report_resources_->custom_resources[custom_id]; - const auto &label = string_to_int_map_.Get(custom_id); - // Note: available may be negative, but only report positive to GCS. - if (capacity.available != last_capacity.available && capacity.available > 0) { - resources_data->set_resources_available_changed(true); - (*resources_data->mutable_resources_available())[label] = - capacity.available.Double(); - } - if (capacity.total != last_capacity.total) { - (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); - } + if (capacity.total != last_capacity.total) { + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); } - if (resources != *last_report_resources_.get()) { - last_report_resources_.reset(new NodeResources(resources)); + } + for (const auto &it : resources.custom_resources) { + uint64_t custom_id = it.first; + const auto &capacity = it.second; + const auto &last_capacity = last_report_resources_->custom_resources[custom_id]; + const auto &label = string_to_int_map_.Get(custom_id); + // Note: available may be negative, but only report positive to GCS. 
+ if (capacity.available != last_capacity.available && capacity.available > 0) { + resources_data->set_resources_available_changed(true); + (*resources_data->mutable_resources_available())[label] = + capacity.available.Double(); } - } else { - for (int i = 0; i < PredefinedResources_MAX; i++) { - const auto &label = ResourceEnumToString((PredefinedResources)i); - const auto &capacity = resources.predefined_resources[i]; - // Note: available may be negative, but only report positive to GCS. - if (capacity.available > 0) { - (*resources_data->mutable_resources_available())[label] = - capacity.available.Double(); - } - if (capacity.total != 0) { - (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); - } - } - for (auto it = resources.custom_resources.begin(); - it != resources.custom_resources.end(); it++) { - uint64_t custom_id = it->first; - const auto &capacity = it->second; - const auto &label = string_to_int_map_.Get(custom_id); - // Note: available may be negative, but only report positive to GCS. 
- if (capacity.available > 0) { - (*resources_data->mutable_resources_available())[label] = - capacity.available.Double(); - } - if (capacity.total != 0) { - (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); - } + if (capacity.total != last_capacity.total) { + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); } } + if (resources != *last_report_resources_.get()) { + last_report_resources_.reset(new NodeResources(resources)); + } } } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 7d1f5253c..bce8b760c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -363,21 +363,18 @@ class ClusterResourceScheduler { void UpdateLocalAvailableResourcesFromResourceInstances(); /// Populate the relevant parts of the heartbeat table. This is intended for - /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// sending resource usage of raylet to gcs. In particular, this should fill in /// resources_available and resources_total. /// - /// \param light_report_resource_usage_enabled Only send changed fields if true. /// \param Output parameter. `resources_available` and `resources_total` are the only /// fields used. - void FillResourceUsage(bool light_report_resource_usage_enabled, - std::shared_ptr resources_data); + void FillResourceUsage(std::shared_ptr resources_data); /// Update last report resources local cache from gcs cache, /// this is needed when gcs fo. /// /// \param gcs_resources: The remote cache from gcs. - void UpdateLastReportResourcesFromGcs( - std::shared_ptr gcs_resources); + void UpdateLastResourceUsage(std::shared_ptr gcs_resources); /// Return human-readable string for this scheduler state. 
std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index db8fa44ed..317acde31 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -119,7 +119,7 @@ void initNodeResources(NodeResources &node, vector &pred_capacities, } } -void initCluster(ClusterResourceScheduler &cluster_resources, int n) { +void initCluster(ClusterResourceScheduler &resource_scheduler, int n) { vector pred_capacities; vector cust_ids; vector cust_capacities; @@ -146,7 +146,7 @@ void initCluster(ClusterResourceScheduler &cluster_resources, int n) { initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - cluster_resources.AddOrUpdateNode(i, node_resources); + resource_scheduler.AddOrUpdateNode(i, node_resources); node_resources.custom_resources.clear(); } @@ -273,31 +273,31 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingIdTest) { TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) { int num_nodes = 10; - ClusterResourceScheduler cluster_resources; + ClusterResourceScheduler resource_scheduler; - initCluster(cluster_resources, num_nodes); + initCluster(resource_scheduler, num_nodes); - ASSERT_EQ(cluster_resources.NumNodes(), num_nodes); + ASSERT_EQ(resource_scheduler.NumNodes(), num_nodes); } TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) { int num_nodes = 4; int64_t remove_id = 2; - ClusterResourceScheduler cluster_resources; + ClusterResourceScheduler resource_scheduler; - initCluster(cluster_resources, num_nodes); - cluster_resources.RemoveNode(remove_id); + initCluster(resource_scheduler, num_nodes); + resource_scheduler.RemoveNode(remove_id); - ASSERT_TRUE(num_nodes - 1 == cluster_resources.NumNodes()); + ASSERT_TRUE(num_nodes - 1 == resource_scheduler.NumNodes()); } TEST_F(ClusterResourceSchedulerTest, 
SchedulingModifyClusterNodeTest) { int num_nodes = 4; int64_t update_id = 2; - ClusterResourceScheduler cluster_resources; + ClusterResourceScheduler resource_scheduler; - initCluster(cluster_resources, num_nodes); + initCluster(resource_scheduler, num_nodes); NodeResources node_resources; vector pred_capacities; @@ -321,9 +321,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) { cust_capacities.push_back(rand() % 10); initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - cluster_resources.AddOrUpdateNode(update_id, node_resources); + resource_scheduler.AddOrUpdateNode(update_id, node_resources); } - ASSERT_TRUE(num_nodes == cluster_resources.NumNodes()); + ASSERT_TRUE(num_nodes == resource_scheduler.NumNodes()); } TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { @@ -333,7 +333,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(1, node_resources); + ClusterResourceScheduler resource_scheduler(1, node_resources); { TaskRequest task_req; @@ -347,17 +347,17 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_EQ(node_id, 1); ASSERT_TRUE(violations > 0); NodeResources nr1, nr2; - ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr1)); + ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr1)); auto task_allocation = std::make_shared(); - ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_req, task_allocation)); - ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, 
&nr2)); + ASSERT_TRUE(resource_scheduler.AllocateLocalTaskResources(task_req, task_allocation)); + ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr2)); for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) { auto t = @@ -381,7 +381,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { } TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { - ClusterResourceScheduler cluster_resources; + ClusterResourceScheduler resource_scheduler; NodeResources nr, nr_out; int64_t node_id = 1; @@ -392,12 +392,12 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - cluster_resources.AddOrUpdateNode(node_id, node_resources); + resource_scheduler.AddOrUpdateNode(node_id, node_resources); nr = node_resources; } // Check whether node resources were correctly added. - if (cluster_resources.GetNodeResources(node_id, &nr_out)) { + if (resource_scheduler.GetNodeResources(node_id, &nr_out)) { ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); } else { ASSERT_TRUE(false); @@ -410,10 +410,10 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { vector cust_ids{2, 3}; vector cust_capacities{6, 6}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - cluster_resources.AddOrUpdateNode(node_id, node_resources); + resource_scheduler.AddOrUpdateNode(node_id, node_resources); nr = node_resources; } - if (cluster_resources.GetNodeResources(node_id, &nr_out)) { + if (resource_scheduler.GetNodeResources(node_id, &nr_out)) { ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); } else { ASSERT_TRUE(false); @@ -427,9 +427,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { vector cust_ids{1}; vector cust_capacities{10}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, 
node_resources); - - std::cerr << "XXXXXXXXXXX" << std::endl; + ClusterResourceScheduler resource_scheduler(0, node_resources); { NodeResources node_resources; @@ -437,7 +435,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { vector cust_ids{1, 2}; vector cust_capacities{5, 5}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - cluster_resources.AddOrUpdateNode(1, node_resources); + resource_scheduler.AddOrUpdateNode(1, node_resources); } // Predefined resources, hard constraint violation { @@ -448,7 +446,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_EQ(node_id, -1); } @@ -461,7 +459,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); @@ -476,7 +474,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); @@ -493,7 +491,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = 
resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id == -1); } @@ -509,7 +507,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); @@ -526,7 +524,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); @@ -543,7 +541,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id == -1); } @@ -559,7 +557,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { EmptyIntVector); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); @@ -577,7 +575,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { placement_hints); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); @@ -595,7 +593,7 @@ 
TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { placement_hints); int64_t violations; bool is_infeasible; - int64_t node_id = cluster_resources.GetBestSchedulableNode( + int64_t node_id = resource_scheduler.GetBestSchedulableNode( task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); @@ -609,10 +607,10 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) { vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskResourceInstances available_cluster_resources = - cluster_resources.GetLocalResources().GetAvailableResourceInstances(); + resource_scheduler.GetLocalResources().GetAvailableResourceInstances(); TaskResourceInstances expected_cluster_resources; addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources); @@ -676,7 +674,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector pred_capacities{3. /* CPU */, 4. /* MEM */, 5. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {3. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -684,18 +682,18 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, true); - cluster_resources.FreeTaskResourceInstances(task_allocation); + resource_scheduler.FreeTaskResourceInstances(task_allocation); - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); } // Try to allocate resources for a task request that overallocates a hard constrained // resource. @@ -704,7 +702,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {4. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -712,14 +710,14 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, false); - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); } // Allocate resources for a task request that overallocates a soft constrained resource. { @@ -727,7 +725,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {4. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -735,11 +733,11 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, true); @@ -749,7 +747,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { addTaskResourceInstances(true, {0., 0.5, 1., 1., 1.}, GPU, &expected_task_allocation); TaskResourceInstances local_available_resources = - cluster_resources.GetLocalResources().GetAvailableResourceInstances(); + resource_scheduler.GetLocalResources().GetAvailableResourceInstances(); ASSERT_EQ((local_available_resources == expected_task_allocation), true); } @@ -761,7 +759,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_ids{1, 2}; vector cust_capacities{4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {3. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -771,17 +769,17 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, true); - cluster_resources.FreeTaskResourceInstances(task_allocation); + resource_scheduler.FreeTaskResourceInstances(task_allocation); - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); } // Allocate resources for a task request specifying both predefined and custom // resources, but overallocates a hard-constrained custom resource. @@ -791,7 +789,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_ids{1, 2}; vector cust_capacities{4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {3. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -801,14 +799,14 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, false); - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); } // Allocate resources for a task request specifying both predefined and custom // resources, but overallocates a soft-constrained custom resource. @@ -818,7 +816,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_ids{1, 2}; vector cust_capacities{4, 4}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {3. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -828,11 +826,11 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, true); @@ -844,7 +842,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { addTaskResourceInstances(false, {0.}, 2, &expected_task_allocation); TaskResourceInstances local_available_resources = - cluster_resources.GetLocalResources().GetAvailableResourceInstances(); + resource_scheduler.GetLocalResources().GetAvailableResourceInstances(); ASSERT_EQ((local_available_resources == expected_task_allocation), true); } @@ -857,7 +855,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { vector cust_ids{1, 2}; vector cust_capacities{4., 4.}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {2. /* CPU */, 2. 
/* MEM */, 1.5 /* GPU */}; @@ -870,15 +868,15 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); + NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); ASSERT_EQ(success, true); std::vector cpu_instances = task_allocation->GetCPUInstancesDouble(); - cluster_resources.AddCPUResourceInstances(cpu_instances); - cluster_resources.SubtractCPUResourceInstances(cpu_instances); + resource_scheduler.AddCPUResourceInstances(cpu_instances); + resource_scheduler.SubtractCPUResourceInstances(cpu_instances); - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); } } @@ -889,19 +887,19 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); std::vector allocate_gpu_instances{0.5, 0.5, 0.5, 0.5}; - cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); - std::vector available_gpu_instances = cluster_resources.GetLocalResources() + resource_scheduler.SubtractGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{0.5, 0.5, 0.5, 0.5}; ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), expected_available_gpu_instances.begin())); - 
cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); - available_gpu_instances = cluster_resources.GetLocalResources() + resource_scheduler.AddGPUResourceInstances(allocate_gpu_instances); + available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {1., 1., 1., 1.}; @@ -910,11 +908,11 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { allocate_gpu_instances = {1.5, 1.5, .5, 1.5}; std::vector underflow = - cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); + resource_scheduler.SubtractGPUResourceInstances(allocate_gpu_instances); std::vector expected_underflow{.5, .5, 0., .5}; ASSERT_TRUE( std::equal(underflow.begin(), underflow.end(), expected_underflow.begin())); - available_gpu_instances = cluster_resources.GetLocalResources() + available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {0., 0., 0.5, 0.}; @@ -923,10 +921,10 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { allocate_gpu_instances = {1.0, .5, 1., .5}; std::vector overflow = - cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); + resource_scheduler.AddGPUResourceInstances(allocate_gpu_instances); std::vector expected_overflow{.0, .0, .5, 0.}; ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin())); - available_gpu_instances = cluster_resources.GetLocalResources() + available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {1., .5, 1., .5}; @@ -943,14 +941,14 @@ TEST_F(ClusterResourceSchedulerTest, vector cust_ids{1}; vector cust_capacities{8}; initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, 
node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); { std::vector allocate_gpu_instances{0.5, 0.5, 2, 0.5}; // SubtractGPUResourceInstances() calls // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. - cluster_resources.SubtractGPUResourceInstances(allocate_gpu_instances); - std::vector available_gpu_instances = cluster_resources.GetLocalResources() + resource_scheduler.SubtractGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{0.5, 0.5, 0., 0.5}; @@ -959,7 +957,7 @@ TEST_F(ClusterResourceSchedulerTest, expected_available_gpu_instances.begin())); NodeResources nr; - cluster_resources.GetNodeResources(0, &nr); + resource_scheduler.GetNodeResources(0, &nr); ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5); } @@ -967,8 +965,8 @@ TEST_F(ClusterResourceSchedulerTest, std::vector allocate_gpu_instances{1.5, 0.5, 2, 0.3}; // SubtractGPUResourceInstances() calls // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. - cluster_resources.AddGPUResourceInstances(allocate_gpu_instances); - std::vector available_gpu_instances = cluster_resources.GetLocalResources() + resource_scheduler.AddGPUResourceInstances(allocate_gpu_instances); + std::vector available_gpu_instances = resource_scheduler.GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{1., 1., 1., 0.8}; @@ -977,7 +975,7 @@ TEST_F(ClusterResourceSchedulerTest, expected_available_gpu_instances.begin())); NodeResources nr; - cluster_resources.GetNodeResources(0, &nr); + resource_scheduler.GetNodeResources(0, &nr); ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8); } } @@ -988,7 +986,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { vector pred_capacities{4. 
/* CPU */, 2. /* MEM */, 4. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyFixedPointVector); - ClusterResourceScheduler cluster_resources(0, node_resources); + ClusterResourceScheduler resource_scheduler(0, node_resources); TaskRequest task_req; vector pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; @@ -999,7 +997,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { std::shared_ptr task_allocation = std::make_shared(); bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + resource_scheduler.AllocateTaskResourceInstances(task_req, task_allocation); ASSERT_EQ(success, true); @@ -1011,51 +1009,51 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { std::unordered_map resource_spec({{"CPU", 1}}); - ClusterResourceScheduler cluster_resources("local", {}); + ClusterResourceScheduler resource_scheduler("local", {}); for (int i = 0; i < 100; i++) { - cluster_resources.AddOrUpdateNode(std::to_string(i), {}, {}); + resource_scheduler.AddOrUpdateNode(std::to_string(i), {}, {}); } // No feasible nodes. int64_t total_violations; bool is_infeasible; - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, - &total_violations, &is_infeasible), + ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), ""); // Feasible remote node, but doesn't currently have resources available. We // should spill there. 
- cluster_resources.AddOrUpdateNode("remote_feasible", resource_spec, {{"CPU", 0.}}); - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, - &total_violations, &is_infeasible), + resource_scheduler.AddOrUpdateNode("remote_feasible", resource_spec, {{"CPU", 0.}}); + ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), "remote_feasible"); // Feasible remote node, and it currently has resources available. We should // prefer to spill there. - cluster_resources.AddOrUpdateNode("remote_available", resource_spec, resource_spec); - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, - &total_violations, &is_infeasible), + resource_scheduler.AddOrUpdateNode("remote_available", resource_spec, resource_spec); + ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), "remote_available"); } -TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { +TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) { vector cust_ids{1, 2, 3, 4, 5}; NodeResources node_resources; std::unordered_map initial_resources( {{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"1", 1}, {"2", 2}, {"3", 3}}); - ClusterResourceScheduler cluster_resources("0", initial_resources); + ClusterResourceScheduler resource_scheduler("0", initial_resources); NodeResources other_node_resources; vector other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */}; vector other_cust_capacities{5., 4., 3., 2., 1.}; initNodeResources(other_node_resources, other_pred_capacities, cust_ids, other_cust_capacities); - cluster_resources.AddOrUpdateNode(12345, other_node_resources); + resource_scheduler.AddOrUpdateNode(12345, other_node_resources); { // Cluster is idle. 
auto data = std::make_shared(); - cluster_resources.FillResourceUsage(false, data); + resource_scheduler.FillResourceUsage(data); auto available = data->resources_available(); auto total = data->resources_total(); @@ -1091,9 +1089,10 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { {"CPU", 0.1}, {"1", 0.1}, }); - cluster_resources.AllocateLocalTaskResources(allocation_map, allocations); + resource_scheduler.AllocateLocalTaskResources(allocation_map, allocations); auto data = std::make_shared(); - cluster_resources.FillResourceUsage(false, data); + resource_scheduler.UpdateLastResourceUsage(std::make_shared()); + resource_scheduler.FillResourceUsage(data); auto available = data->resources_available(); auto total = data->resources_total(); @@ -1114,125 +1113,95 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { } } -TEST_F(ClusterResourceSchedulerTest, TestLightResourceUsageReport) { +TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) { std::unordered_map initial_resources({{"CPU", 1}}); - ClusterResourceScheduler cluster_resources("local", initial_resources); - - // Fill resource usage usage on initialization. - auto data = std::make_shared(); - cluster_resources.FillResourceUsage(true, data); - ASSERT_RESOURCES_EQ(data, 1, 1); - - // Don't report resource usage if resource availability hasn't changed. - for (int i = 0; i < 3; i++) { - data->Clear(); - cluster_resources.FillResourceUsage(true, data); - ASSERT_RESOURCES_EMPTY(data); - } - - // Report resource usage if resource availability has changed. - cluster_resources.AddOrUpdateNode("local", {{"CPU", 2.}}, {{"CPU", 0.}}); - data->Clear(); - cluster_resources.FillResourceUsage(true, data); - ASSERT_RESOURCES_EQ(data, 0, 2); - - // Don't report resource usage if resource availability hasn't changed. 
- for (int i = 0; i < 3; i++) { - data->Clear(); - cluster_resources.FillResourceUsage(true, data); - ASSERT_RESOURCES_EMPTY(data); - } -} - -TEST_F(ClusterResourceSchedulerTest, TestDirtyLocalView) { - std::unordered_map initial_resources({{"CPU", 1}}); - ClusterResourceScheduler cluster_resources("local", initial_resources); - cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}}, {{"CPU", 2.}}); + ClusterResourceScheduler resource_scheduler("local", initial_resources); + resource_scheduler.AddOrUpdateNode("remote", {{"CPU", 2.}}, {{"CPU", 2.}}); const std::unordered_map task_spec = {{"CPU", 1.}}; // Allocate local resources to force tasks onto the remote node when // resources are available. std::shared_ptr task_allocation = std::make_shared(); - ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + ASSERT_TRUE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation)); task_allocation = std::make_shared(); - ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation)); // View of local resources is not affected by resource usage report. auto data = std::make_shared(); - cluster_resources.FillResourceUsage(true, data); - ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); + resource_scheduler.FillResourceUsage(data); + ASSERT_FALSE(resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation)); for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) { // Remote node reports updated resource availability. 
- cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}}, - {{"CPU", num_slots_available}}); + resource_scheduler.AddOrUpdateNode("remote", {{"CPU", 2.}}, + {{"CPU", num_slots_available}}); auto data = std::make_shared(); int64_t t; bool is_infeasible; for (int i = 0; i < 3; i++) { // Resource usage report tick should reset the remote node's resources. - cluster_resources.FillResourceUsage(true, data); + resource_scheduler.FillResourceUsage(data); for (int j = 0; j < num_slots_available; j++) { - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t, - &is_infeasible), + ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, false, &t, + &is_infeasible), "remote"); // Allocate remote resources. - ASSERT_TRUE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); + ASSERT_TRUE(resource_scheduler.AllocateRemoteTaskResources("remote", task_spec)); } // Our local view says there are not enough resources on the remote node to // schedule another task. 
ASSERT_EQ( - cluster_resources.GetBestSchedulableNode(task_spec, false, &t, &is_infeasible), + resource_scheduler.GetBestSchedulableNode(task_spec, false, &t, &is_infeasible), ""); ASSERT_FALSE( - cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); - ASSERT_FALSE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); + resource_scheduler.AllocateLocalTaskResources(task_spec, task_allocation)); + ASSERT_FALSE(resource_scheduler.AllocateRemoteTaskResources("remote", task_spec)); } } } TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { - ClusterResourceScheduler cluster_resources("local", {{"CPU", 2}}); + ClusterResourceScheduler resource_scheduler("local", {{"CPU", 2}}); std::unordered_map task_request = {{"CPU", 1}, {"custom123", 2}}; int64_t t; bool is_infeasible; std::string result = - cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); + resource_scheduler.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); - cluster_resources.AddLocalResource("custom123", 5); + resource_scheduler.AddLocalResource("custom123", 5); result = - cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); + resource_scheduler.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_FALSE(result.empty()); task_request["custom123"] = 6; result = - cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); + resource_scheduler.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); - cluster_resources.AddLocalResource("custom123", 5); + resource_scheduler.AddLocalResource("custom123", 5); result = - cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); + resource_scheduler.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_FALSE(result.empty()); - cluster_resources.DeleteLocalResource("custom123"); + 
resource_scheduler.DeleteLocalResource("custom123"); result = - cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); + resource_scheduler.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); } TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) { - ClusterResourceScheduler cluster_resources("local", {{"custom123", 5}}); + ClusterResourceScheduler resource_scheduler("local", {{"custom123", 5}}); std::shared_ptr resource_instances = std::make_shared(); std::unordered_map task_request = {{"custom123", 5}}; bool allocated = - cluster_resources.AllocateLocalTaskResources(task_request, resource_instances); + resource_scheduler.AllocateLocalTaskResources(task_request, resource_instances); ASSERT_TRUE(allocated); - ASSERT_TRUE(cluster_resources.IsAvailableResourceEmpty("custom123")); + ASSERT_TRUE(resource_scheduler.IsAvailableResourceEmpty("custom123")); } } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index fc03a2a77..35ef056b0 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -395,7 +395,6 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con } void ClusterTaskManager::FillResourceUsage( - bool light_report_resource_usage_enabled, std::shared_ptr data) const { if (max_resource_shapes_per_load_report_ == 0) { return; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 269954eb9..ebbf4770c 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -111,14 +111,12 @@ class ClusterTaskManager { void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const; /// Populate the relevant parts of the heartbeat table. This is intended for - /// sending raylet <-> gcs heartbeats. 
In particular, this should fill in + /// sending resource usage of raylet to gcs. In particular, this should fill in /// resource_load and resource_load_by_shape. /// - /// \param light_report_resource_usage_enabled Only send changed fields if true. /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only /// fields used. - void FillResourceUsage(bool light_report_resource_usage_enabled, - std::shared_ptr data) const; + void FillResourceUsage(std::shared_ptr data) const; /// Return if any tasks are pending resource acquisition. /// diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index e2ca4c5b7..604ca1b1d 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -512,7 +512,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { { auto data = std::make_shared(); - task_manager_.FillResourceUsage(false, data); + task_manager_.FillResourceUsage(data); auto load_by_shape = data->mutable_resource_load_by_shape()->mutable_resource_demands(); @@ -586,7 +586,7 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { ASSERT_EQ(node_info_calls_, 0); auto data = std::make_shared(); - task_manager_.FillResourceUsage(false, data); + task_manager_.FillResourceUsage(data); auto resource_load_by_shape = data->resource_load_by_shape(); auto shape1 = resource_load_by_shape.resource_demands()[0]; @@ -600,7 +600,7 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { } data = std::make_shared(); - task_manager_.FillResourceUsage(false, data); + task_manager_.FillResourceUsage(data); resource_load_by_shape = data->resource_load_by_shape(); shape1 = resource_load_by_shape.resource_demands()[0];