diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index a93acc629..820372076 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -68,7 +68,7 @@ logging_options = [ default="auto", help=("Use color logging. " "Auto enables color logging if stdout is a TTY.")), - click.option("-v", "--verbose", count=True) + click.option("-v", "--verbose", default=None, count=True) ] diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 348e5e9cc..9d6b83af6 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1162,14 +1162,13 @@ ray.shutdown() wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node)) -@pytest.mark.skip("This test is flaky.") @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) ], indirect=True) -def test_create_placement_group_after_gcs_server_restarts( +def test_create_placement_group_after_gcs_server_restart( ray_start_cluster_head): cluster = ray_start_cluster_head cluster.add_node(num_cpus=2) @@ -1201,7 +1200,6 @@ def test_create_placement_group_after_gcs_server_restarts( assert table["state"] == "PENDING" -@pytest.mark.skip("This test is flaky.") @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( @@ -1226,7 +1224,6 @@ def test_create_actor_with_placement_group_after_gcs_server_restart( assert ray.get(actor_2.method.remote(1)) == 3 -@pytest.mark.skip("This test is flaky.") @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 713887e3b..15511012f 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -576,6 +576,11 @@ class NodeInfoAccessor { /// Resend heartbeat when GCS restarts from a failure. virtual void AsyncReReportHeartbeat() = 0; + /// Return resources in last report. Used by light heartbeat. + std::shared_ptr &GetLastHeartbeatResources() { + return last_heartbeat_resources_; + } + /// Get newest heartbeat of all nodes from GCS asynchronously. Only used when light /// heartbeat enabled. /// @@ -621,6 +626,12 @@ class NodeInfoAccessor { protected: NodeInfoAccessor() = default; + + private: + /// Cache which stores resources in last heartbeat used to check if they are changed. + /// Used by light heartbeat. + std::shared_ptr last_heartbeat_resources_ = + std::make_shared(); }; /// \class ErrorInfoAccessor diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 7b33a9489..30fc67a7d 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -703,12 +703,45 @@ void ServiceBasedNodeInfoAccessor::AsyncReReportHeartbeat() { absl::MutexLock lock(&mutex_); if (cached_heartbeat_.has_heartbeat()) { RAY_LOG(INFO) << "Rereport heartbeat."; + FillHeartbeatRequest(cached_heartbeat_); client_impl_->GetGcsRpcClient().ReportHeartbeat( cached_heartbeat_, [](const Status &status, const rpc::ReportHeartbeatReply &reply) {}); } } +void ServiceBasedNodeInfoAccessor::FillHeartbeatRequest( + rpc::ReportHeartbeatRequest &heartbeat) { + if (RayConfig::instance().light_heartbeat_enabled()) { + SchedulingResources cached_resources = + SchedulingResources(*GetLastHeartbeatResources()); + + auto heartbeat_data = heartbeat.mutable_heartbeat(); + heartbeat_data->clear_resources_total(); + for (const auto &resource_pair : + cached_resources.GetTotalResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + + heartbeat_data->clear_resources_available(); + heartbeat_data->set_resources_available_changed(true); + for (const auto &resource_pair : + cached_resources.GetAvailableResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + + heartbeat_data->clear_resource_load(); + heartbeat_data->set_resource_load_changed(true); + for (const auto &resource_pair : + cached_resources.GetLoadResources().GetResourceMap()) { + (*heartbeat_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + } +} + Status ServiceBasedNodeInfoAccessor::AsyncGetAllHeartbeat( const ItemCallback &callback) { rpc::GetAllHeartbeatRequest request; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 6b53c415f..180f8f46b 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -194,6 +194,9 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { void AsyncReReportHeartbeat() override; + /// Fill resource fields with cached resources. Used by light heartbeat. + void FillHeartbeatRequest(rpc::ReportHeartbeatRequest &heartbeat); + Status AsyncGetAllHeartbeat( const ItemCallback &callback) override; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3ce5a4da8..079271ebf 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -426,18 +426,19 @@ void NodeManager::Heartbeat() { // TODO(atumanov): implement a ResourceSet const_iterator. // If light heartbeat enabled, we only set filed that represent resources changed. if (light_heartbeat_enabled_) { - if (!last_heartbeat_resources_.GetTotalResources().IsEqual( + auto last_heartbeat_resources = gcs_client_->Nodes().GetLastHeartbeatResources(); + if (!last_heartbeat_resources->GetTotalResources().IsEqual( local_resources.GetTotalResources())) { for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { (*heartbeat_data->mutable_resources_total())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetTotalResources( + last_heartbeat_resources->SetTotalResources( ResourceSet(local_resources.GetTotalResources())); } - if (!last_heartbeat_resources_.GetAvailableResources().IsEqual( + if (!last_heartbeat_resources->GetAvailableResources().IsEqual( local_resources.GetAvailableResources())) { heartbeat_data->set_resources_available_changed(true); for (const auto &resource_pair : @@ -445,12 +446,12 @@ void NodeManager::Heartbeat() { (*heartbeat_data->mutable_resources_available())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetAvailableResources( + last_heartbeat_resources->SetAvailableResources( ResourceSet(local_resources.GetAvailableResources())); } local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - if (!last_heartbeat_resources_.GetLoadResources().IsEqual( + if (!last_heartbeat_resources->GetLoadResources().IsEqual( local_resources.GetLoadResources())) { heartbeat_data->set_resource_load_changed(true); for (const auto &resource_pair : @@ -458,7 +459,7 @@ void NodeManager::Heartbeat() { (*heartbeat_data->mutable_resource_load())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetLoadResources( + last_heartbeat_resources->SetLoadResources( ResourceSet(local_resources.GetLoadResources())); } } else { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 536892830..552977de6 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -726,9 +726,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { uint64_t last_heartbeat_at_ms_; /// Only the changed part will be included in heartbeat if this is true. const bool light_heartbeat_enabled_; - /// Cache which stores resources in last heartbeat used to check if they are changed. - /// Used by light heartbeat. - SchedulingResources last_heartbeat_resources_; /// The time that the last debug string was logged to the console. uint64_t last_debug_dump_at_ms_; /// The number of heartbeats that we should wait before sending the