diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 35219d944..60e1642e5 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -233,7 +233,7 @@ RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200) /// Duration to wait between retries for creating placement group in gcs server. RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200) /// Maximum number of destroyed actors in GCS server memory cache. -RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 10000) +RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000) /// Maximum number of dead nodes in GCS server memory cache. RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000) diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 9969b75f4..c1912fe5f 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -1324,21 +1324,36 @@ TEST_F(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) { << actor_count << " actors."; } -TEST_F(ServiceBasedGcsClientTest, TestRandomEvictDestroyedActors) { +TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) { // Register actors and the actors will be destroyed. JobID job_id = JobID::FromInt(1); - int actor_count = 20; + int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count(); for (int index = 0; index < actor_count; ++index) { auto actor_table_data = Mocker::GenActorTableData(job_id); RegisterActor(actor_table_data, false); } + // Restart GCS. + RestartGcsServer(); + + absl::flat_hash_set actor_ids; + for (int index = 0; index < actor_count; ++index) { + auto actor_table_data = Mocker::GenActorTableData(job_id); + RegisterActor(actor_table_data, false); + actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id())); + } + // Get all actors. auto condition = [this]() { return GetAllActors().size() == RayConfig::instance().maximum_gcs_destroyed_actor_cached_count(); }; EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + + auto actors = GetAllActors(); + for (const auto &actor : actors) { + EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id()))); + } } TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDeadNodes) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index bb4e83a67..d65ae6b89 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -569,6 +569,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { RAY_CHECK(it != registered_actors_.end()) << "Tried to destroy actor that does not exist " << actor_id; it->second->GetMutableActorTableData()->mutable_task_spec()->Clear(); + it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); AddDestroyedActorToCache(it->second); const auto actor = std::move(it->second); registered_actors_.erase(it); @@ -964,9 +965,15 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID()); } } else { - AddDestroyedActorToCache(actor); + destroyed_actors_.emplace(item.first, actor); + sorted_destroyed_actor_list_.emplace_back(item.first, + (int64_t)item.second.timestamp()); } } + sorted_destroyed_actor_list_.sort([](const std::pair &left, + const std::pair &right) { + return left.second < right.second; + }); // Notify raylets to release unused workers. gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers); @@ -1113,9 +1120,14 @@ void GcsActorManager::KillActor(const std::shared_ptr &actor) { void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr &actor) { if (destroyed_actors_.size() >= RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) { - destroyed_actors_.erase(destroyed_actors_.begin()); + const auto &actor_id = sorted_destroyed_actor_list_.begin()->first; + RAY_CHECK_OK(gcs_table_storage_->ActorTable().Delete(actor_id, nullptr)); + destroyed_actors_.erase(actor_id); + sorted_destroyed_actor_list_.erase(sorted_destroyed_actor_list_.begin()); } destroyed_actors_.emplace(actor->GetActorID(), actor); + sorted_destroyed_actor_list_.emplace_back( + actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp()); } } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index dabb81f61..995c3d4ce 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -374,6 +374,9 @@ class GcsActorManager : public rpc::ActorInfoHandler { absl::flat_hash_map> registered_actors_; /// All destroyed actors. absl::flat_hash_map> destroyed_actors_; + /// The actors are sorted according to the timestamp, and the oldest is at the head of + /// the list. + std::list> sorted_destroyed_actor_list_; /// Maps actor names to their actor ID for lookups by name. absl::flat_hash_map named_actors_; /// The actors which dependencies have not been resolved. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 299ebe4fb..4e23930d9 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -522,6 +522,8 @@ void GcsNodeManager::UpdatePlacementGroupLoad( void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr node) { if (dead_nodes_.size() >= RayConfig::instance().maximum_gcs_dead_node_cached_count()) { + const auto &node_id = sorted_dead_node_list_.begin()->first; + RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, nullptr)); dead_nodes_.erase(sorted_dead_node_list_.begin()->first); sorted_dead_node_list_.erase(sorted_dead_node_list_.begin()); } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index ad7aa56f5..e3d50f042 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -138,7 +138,7 @@ message ActorTableData { bool is_detached = 11; // Name of the actor. Only populated if is_detached is true. string name = 12; - // Timestamp that the actor is created or reconstructed. + // Last timestamp that the actor state was updated. double timestamp = 13; // The task specification of this actor's creation task. TaskSpec task_spec = 14;