diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index b9ee6aa2b..306408c47 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -29,6 +29,7 @@ py_test_module_list( "test_error_ray_not_initialized.py", "test_gcs_fault_tolerance.py", "test_global_gc.py", + "test_global_state.py", "test_iter.py", "test_joblib.py", "test_resource_demand_scheduler.py", @@ -81,7 +82,6 @@ py_test_module_list( "test_dask_scheduler.py", "test_dask_callback.py", "test_debug_tools.py", - "test_global_state.py", "test_job.py", "test_memstat.py", "test_metrics_agent.py", diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 40b4168a9..e02e372b2 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -212,6 +212,87 @@ def test_load_report(shutdown_only, max_shapes): else: assert demand.num_ready_requests_queued > 0 assert demand.num_infeasible_requests_queued == 0 + client.close() + + +def test_placement_group_load_report(ray_start_cluster): + cluster = ray_start_cluster + # Add a head node that doesn't have gpu resource. + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + redis = ray._private.services.create_redis_client( + cluster.address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) + redis = ray._private.services.create_redis_client( + cluster.address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) + client = redis.pubsub(ignore_subscribe_messages=True) + client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN) + + class PgLoadChecker: + def nothing_is_ready(self): + heartbeat = self._read_heartbeat() + if not heartbeat: + return False + if heartbeat.HasField("placement_group_load"): + pg_load = heartbeat.placement_group_load + return len(pg_load.placement_group_data) == 2 + return False + + def only_first_one_ready(self): + heartbeat = self._read_heartbeat() + if not heartbeat: + return False + if heartbeat.HasField("placement_group_load"): + pg_load = heartbeat.placement_group_load + return len(pg_load.placement_group_data) == 1 + return False + + def two_infeasible_pg(self): + heartbeat = self._read_heartbeat() + if not heartbeat: + return False + if heartbeat.HasField("placement_group_load"): + pg_load = heartbeat.placement_group_load + return len(pg_load.placement_group_data) == 2 + return False + + def _read_heartbeat(self): + try: + message = client.get_message() + except redis.exceptions.ConnectionError: + pass + if message is None: + return None + + pattern = message["pattern"] + data = message["data"] + if pattern != ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN: + return None + pub_message = ray.gcs_utils.PubSubMessage.FromString(data) + heartbeat_data = pub_message.data + heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString( + heartbeat_data) + return heartbeat + + checker = PgLoadChecker() + + # Create 2 placement groups that are infeasible. + pg_feasible = ray.util.placement_group([{"A": 1}]) + pg_infeasible = ray.util.placement_group([{"B": 1}]) + _, unready = ray.wait( + [pg_feasible.ready(), pg_infeasible.ready()], timeout=0) + assert len(unready) == 2 + ray.test_utils.wait_for_condition(checker.nothing_is_ready) + + # Add a node that makes pg feasible. Make sure load include this change. + cluster.add_node(resources={"A": 1}) + ray.get(pg_feasible.ready()) + ray.test_utils.wait_for_condition(checker.only_first_one_ready) + # Create one more infeasible pg and make sure load is properly updated. + pg_infeasible_second = ray.util.placement_group([{"C": 1}]) + _, unready = ray.wait([pg_infeasible_second.ready()], timeout=0) + assert len(unready) == 1 + ray.test_utils.wait_for_condition(checker.two_infeasible_pg) + client.close() if __name__ == "__main__": diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 83c1f1800..7e3c367f5 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -310,3 +310,7 @@ RAY_CONFIG(int, max_io_workers, 1) /// Enable the task timeline. If this is enabled, certain events such as task /// execution are profiled and sent to the GCS. RAY_CONFIG(bool, enable_timeline, true) + +/// The maximum number of pending placement group entries that are reported to monitor to +/// autoscale the cluster. +RAY_CONFIG(int64_t, max_placement_group_load_report_size, 100) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 910f17fd5..53d38bec7 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -63,6 +63,11 @@ void GcsNodeManager::NodeFailureDetector::HandleHeartbeat( } } +void GcsNodeManager::NodeFailureDetector::UpdatePlacementGroupLoad( + std::shared_ptr placement_group_load) { + placement_group_load_ = absl::make_optional(placement_group_load); +} + /// A periodic timer that checks for timed out clients. void GcsNodeManager::NodeFailureDetector::Tick() { DetectDeadNodes(); @@ -116,6 +121,15 @@ void GcsNodeManager::NodeFailureDetector::SendBatchedHeartbeat() { } } + // Update placement group load to heartbeat batch. + // This is updated only one per second. + if (placement_group_load_.has_value()) { + auto placement_group_load = placement_group_load_.value(); + auto placement_group_load_proto = batch->mutable_placement_group_load(); + placement_group_load_proto->Swap(placement_group_load.get()); + placement_group_load_.reset(); + } + RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "", batch->SerializeAsString(), nullptr)); heartbeat_buffer_.clear(); @@ -486,5 +500,14 @@ const absl::flat_hash_map> return cluster_realtime_resources_; } +void GcsNodeManager::UpdatePlacementGroupLoad( + std::shared_ptr placement_group_load) const { + // Node failure detector code should be running in a separate thread to avoid heartbeat + // lagging. + node_failure_detector_service_.post([this, placement_group_load] { + node_failure_detector_->UpdatePlacementGroupLoad(move(placement_group_load)); + }); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index db4278a35..f0c38eb18 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -157,6 +157,13 @@ class GcsNodeManager : public rpc::NodeInfoHandler { const absl::flat_hash_map> &GetClusterRealtimeResources() const; + /// Update the placement group load information so that it will be reported through + /// heartbeat. + /// + /// \param placement_group_load placement group load protobuf. + void UpdatePlacementGroupLoad( + std::shared_ptr placement_group_load) const; + protected: class NodeFailureDetector { public: @@ -192,6 +199,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler { void HandleHeartbeat(const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat_data); + /// Public interface to update placement group load information. + /// + /// \param placement_group_load placement group load protobuf. + void UpdatePlacementGroupLoad( + std::shared_ptr placement_group_load); + protected: /// A periodic timer that fires on every heartbeat period. Raylets that have /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be @@ -228,6 +241,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::shared_ptr gcs_pub_sub_; /// Is the detect started. bool is_started_ = false; + /// Placement group load information that is used for autoscaler. + absl::optional> placement_group_load_; }; private: diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index a6b83b419..a9756747d 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -84,10 +84,14 @@ rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) { GcsPlacementGroupManager::GcsPlacementGroupManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage) + std::shared_ptr gcs_table_storage, + GcsNodeManager &gcs_node_manager) : io_context_(io_context), gcs_placement_group_scheduler_(std::move(scheduler)), - gcs_table_storage_(std::move(gcs_table_storage)) {} + gcs_table_storage_(std::move(gcs_table_storage)), + gcs_node_manager_(gcs_node_manager) { + Tick(); +} void GcsPlacementGroupManager::RegisterPlacementGroup( const std::shared_ptr &placement_group, StatusCallback callback) { @@ -165,6 +169,7 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess( } void GcsPlacementGroupManager::SchedulePendingPlacementGroups() { + // Update the placement group load to report load information to the autoscaler. if (pending_placement_groups_.empty() || IsSchedulingInProgress()) { return; } @@ -349,5 +354,26 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) { SchedulePendingPlacementGroups(); } +void GcsPlacementGroupManager::Tick() const { + UpdatePlacementGroupLoad(); + execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */); +} + +void GcsPlacementGroupManager::UpdatePlacementGroupLoad() const { + std::shared_ptr placement_group_load = + std::make_shared(); + int total_cnt = 0; + for (const auto pending_pg_spec : pending_placement_groups_) { + auto placement_group_data = placement_group_load->add_placement_group_data(); + auto placement_group_table_data = pending_pg_spec->GetPlacementGroupTableData(); + placement_group_data->Swap(&placement_group_table_data); + total_cnt += 1; + if (total_cnt >= RayConfig::instance().max_placement_group_load_report_size()) { + break; + } + } + gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index b5e275739..a6fb807ce 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -20,6 +20,7 @@ #include "ray/common/id.h" #include "ray/common/task/task_execution_spec.h" #include "ray/common/task/task_spec.h" +#include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_scheduler.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" @@ -107,10 +108,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// \param io_context The event loop to run the monitor on. /// \param scheduler Used to schedule placement group creation tasks. /// \param gcs_table_storage Used to flush placement group data to storage. + /// \param gcs_node_manager Reference of GcsNodeManager. explicit GcsPlacementGroupManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage); + std::shared_ptr gcs_table_storage, + GcsNodeManager &gcs_node_manager); ~GcsPlacementGroupManager() = default; @@ -191,6 +194,12 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { return scheduling_in_progress_id_ != PlacementGroupID::Nil(); } + // Method that is invoked every second. + void Tick() const; + + // Update placement group load information so that the autoscaler can use it. + void UpdatePlacementGroupLoad() const; + /// The io loop that is used to delay execution of tasks (e.g., /// execute_after). boost::asio::io_context &io_context_; @@ -221,6 +230,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// TODO(sang): Currently, only one placement group can be scheduled at a time. /// We should probably support concurrenet creation (or batching). PlacementGroupID scheduling_in_progress_id_ = PlacementGroupID::Nil(); + + /// Reference of GcsNodeManager. + const GcsNodeManager &gcs_node_manager_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index b0f8a9bbb..7cbe050e4 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -243,7 +243,7 @@ void GcsServer::InitGcsPlacementGroupManager() { }); gcs_placement_group_manager_ = std::make_shared( - main_service_, scheduler, gcs_table_storage_); + main_service_, scheduler, gcs_table_storage_, *gcs_node_manager_); } std::unique_ptr GcsServer::InitObjectManager() { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index ca58ee16c..b5b357896 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -55,9 +55,13 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { public: GcsPlacementGroupManagerTest() : mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) { + gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(io_service_); - gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager( - io_service_, mock_placement_group_scheduler_, gcs_table_storage_)); + gcs_node_manager_ = std::make_shared( + io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_placement_group_manager_.reset( + new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_, + gcs_table_storage_, *gcs_node_manager_)); } void SetUp() override { @@ -81,6 +85,9 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { std::unique_ptr thread_io_service_; boost::asio::io_service io_service_; std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_node_manager_; + std::shared_ptr gcs_pub_sub_; + std::shared_ptr redis_client_; }; TEST_F(GcsPlacementGroupManagerTest, TestBasic) { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index a85f2be84..6e34762b2 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -281,6 +281,11 @@ message ResourceLoad { repeated ResourceDemand resource_demands = 1; } +message PlacementGroupLoad { + // The list of pending placement group specifications. + repeated PlacementGroupTableData placement_group_data = 1; +} + message HeartbeatTableData { // Node manager client id bytes client_id = 1; @@ -309,6 +314,8 @@ message HeartbeatBatchTableData { // The total resource demand on all nodes included in the batch, sorted by // resource shape. ResourceLoad resource_load_by_shape = 2; + // The pending list of placement groups. + PlacementGroupLoad placement_group_load = 3; } // Data for a lease on task execution.