mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 21:46:57 +08:00
[GCS]Eviction of destroyed actors cached in GCS (#11338)
This commit is contained in:
@@ -233,7 +233,7 @@ RAY_CONFIG(uint32_t, gcs_create_actor_retry_interval_ms, 200)
|
||||
/// Duration to wait between retries for creating placement group in gcs server.
|
||||
RAY_CONFIG(uint32_t, gcs_create_placement_group_retry_interval_ms, 200)
|
||||
/// Maximum number of destroyed actors in GCS server memory cache.
|
||||
RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 10000)
|
||||
RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
|
||||
/// Maximum number of dead nodes in GCS server memory cache.
|
||||
RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
|
||||
|
||||
|
||||
@@ -1324,21 +1324,36 @@ TEST_F(ServiceBasedGcsClientTest, DISABLED_TestGetActorPerf) {
|
||||
<< actor_count << " actors.";
|
||||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestRandomEvictDestroyedActors) {
|
||||
TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDestroyedActors) {
|
||||
// Register actors and the actors will be destroyed.
|
||||
JobID job_id = JobID::FromInt(1);
|
||||
int actor_count = 20;
|
||||
int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
|
||||
for (int index = 0; index < actor_count; ++index) {
|
||||
auto actor_table_data = Mocker::GenActorTableData(job_id);
|
||||
RegisterActor(actor_table_data, false);
|
||||
}
|
||||
|
||||
// Restart GCS.
|
||||
RestartGcsServer();
|
||||
|
||||
absl::flat_hash_set<ActorID> actor_ids;
|
||||
for (int index = 0; index < actor_count; ++index) {
|
||||
auto actor_table_data = Mocker::GenActorTableData(job_id);
|
||||
RegisterActor(actor_table_data, false);
|
||||
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
|
||||
}
|
||||
|
||||
// Get all actors.
|
||||
auto condition = [this]() {
|
||||
return GetAllActors().size() ==
|
||||
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
|
||||
};
|
||||
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
|
||||
|
||||
auto actors = GetAllActors();
|
||||
for (const auto &actor : actors) {
|
||||
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestEvictExpiredDeadNodes) {
|
||||
|
||||
@@ -569,6 +569,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
|
||||
RAY_CHECK(it != registered_actors_.end())
|
||||
<< "Tried to destroy actor that does not exist " << actor_id;
|
||||
it->second->GetMutableActorTableData()->mutable_task_spec()->Clear();
|
||||
it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms());
|
||||
AddDestroyedActorToCache(it->second);
|
||||
const auto actor = std::move(it->second);
|
||||
registered_actors_.erase(it);
|
||||
@@ -964,9 +965,15 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
|
||||
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
|
||||
}
|
||||
} else {
|
||||
AddDestroyedActorToCache(actor);
|
||||
destroyed_actors_.emplace(item.first, actor);
|
||||
sorted_destroyed_actor_list_.emplace_back(item.first,
|
||||
(int64_t)item.second.timestamp());
|
||||
}
|
||||
}
|
||||
sorted_destroyed_actor_list_.sort([](const std::pair<ActorID, int64_t> &left,
|
||||
const std::pair<ActorID, int64_t> &right) {
|
||||
return left.second < right.second;
|
||||
});
|
||||
|
||||
// Notify raylets to release unused workers.
|
||||
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
|
||||
@@ -1113,9 +1120,14 @@ void GcsActorManager::KillActor(const std::shared_ptr<GcsActor> &actor) {
|
||||
void GcsActorManager::AddDestroyedActorToCache(const std::shared_ptr<GcsActor> &actor) {
|
||||
if (destroyed_actors_.size() >=
|
||||
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count()) {
|
||||
destroyed_actors_.erase(destroyed_actors_.begin());
|
||||
const auto &actor_id = sorted_destroyed_actor_list_.begin()->first;
|
||||
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Delete(actor_id, nullptr));
|
||||
destroyed_actors_.erase(actor_id);
|
||||
sorted_destroyed_actor_list_.erase(sorted_destroyed_actor_list_.begin());
|
||||
}
|
||||
destroyed_actors_.emplace(actor->GetActorID(), actor);
|
||||
sorted_destroyed_actor_list_.emplace_back(
|
||||
actor->GetActorID(), (int64_t)actor->GetActorTableData().timestamp());
|
||||
}
|
||||
|
||||
} // namespace gcs
|
||||
|
||||
@@ -374,6 +374,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
|
||||
absl::flat_hash_map<ActorID, std::shared_ptr<GcsActor>> registered_actors_;
|
||||
/// All destroyed actors.
|
||||
absl::flat_hash_map<ActorID, std::shared_ptr<GcsActor>> destroyed_actors_;
|
||||
/// The actors are sorted according to the timestamp, and the oldest is at the head of
|
||||
/// the list.
|
||||
std::list<std::pair<ActorID, int64_t>> sorted_destroyed_actor_list_;
|
||||
/// Maps actor names to their actor ID for lookups by name.
|
||||
absl::flat_hash_map<std::string, ActorID> named_actors_;
|
||||
/// The actors which dependencies have not been resolved.
|
||||
|
||||
@@ -522,6 +522,8 @@ void GcsNodeManager::UpdatePlacementGroupLoad(
|
||||
|
||||
void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node) {
|
||||
if (dead_nodes_.size() >= RayConfig::instance().maximum_gcs_dead_node_cached_count()) {
|
||||
const auto &node_id = sorted_dead_node_list_.begin()->first;
|
||||
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, nullptr));
|
||||
dead_nodes_.erase(sorted_dead_node_list_.begin()->first);
|
||||
sorted_dead_node_list_.erase(sorted_dead_node_list_.begin());
|
||||
}
|
||||
|
||||
@@ -138,7 +138,7 @@ message ActorTableData {
|
||||
bool is_detached = 11;
|
||||
// Name of the actor. Only populated if is_detached is true.
|
||||
string name = 12;
|
||||
// Timestamp that the actor is created or reconstructed.
|
||||
// Last timestamp that the actor state was updated.
|
||||
double timestamp = 13;
|
||||
// The task specification of this actor's creation task.
|
||||
TaskSpec task_spec = 14;
|
||||
|
||||
Reference in New Issue
Block a user