From e86bc431ccae77daf523dd35af4650382fb672c2 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 31 Jul 2019 10:43:29 -0700 Subject: [PATCH] Lineage cache performance optimization to avoid duplicate GCS requests #5327 --- src/ray/raylet/lineage_cache.cc | 17 ++++++----------- src/ray/raylet/lineage_cache.h | 12 +++++------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 1bdd7d4a2..e000d7e55 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -313,16 +313,15 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { } void LineageCache::EvictTask(const TaskID &task_id) { - // If we haven't received a commit for this task yet, do not evict. - auto commit_it = committed_tasks_.find(task_id); - if (commit_it == committed_tasks_.end()) { - return; - } // If the entry has already been evicted, exit. auto entry = lineage_.GetEntry(task_id); if (!entry) { return; } + // If the entry has not yet been committed, exit. + if (entry->GetStatus() != GcsStatus::COMMITTED) { + return; + } // Entries cannot be safely evicted until their parents are all evicted. for (const auto &parent_id : entry->GetParentTaskIds()) { if (ContainsTask(parent_id)) { @@ -333,7 +332,6 @@ void LineageCache::EvictTask(const TaskID &task_id) { // Evict the task. RAY_LOG(DEBUG) << "Evicting task " << task_id << " on " << client_id_; lineage_.PopEntry(task_id); - committed_tasks_.erase(commit_it); // Try to evict the children of the evict task. These are the tasks that have // a dependency on the evicted task. const auto children = lineage_.GetChildren(task_id); @@ -344,13 +342,13 @@ void LineageCache::EvictTask(const TaskID &task_id) { void LineageCache::HandleEntryCommitted(const TaskID &task_id) { RAY_LOG(DEBUG) << "Task committed: " << task_id; - auto entry = lineage_.GetEntry(task_id); + auto entry = lineage_.GetEntryMutable(task_id); if (!entry) { // The task has already been evicted due to a previous commit notification. return; } // Record the commit acknowledgement and attempt to evict the task. - committed_tasks_.insert(task_id); + entry->SetStatus(GcsStatus::COMMITTED); EvictTask(task_id); // We got the notification about the task's commit, so no longer need any // more notifications. @@ -375,7 +373,6 @@ const Lineage &LineageCache::GetLineage() const { return lineage_; } std::string LineageCache::DebugString() const { std::stringstream result; result << "LineageCache:"; - result << "\n- committed tasks: " << committed_tasks_.size(); result << "\n- child map size: " << lineage_.GetChildrenSize(); result << "\n- num subscribed tasks: " << subscribed_tasks_.size(); result << "\n- lineage size: " << lineage_.GetEntries().size(); @@ -383,8 +380,6 @@ std::string LineageCache::DebugString() const { } void LineageCache::RecordMetrics() const { - stats::LineageCacheStats().Record(committed_tasks_.size(), - {{stats::ValueTypeKey, "num_committed_tasks"}}); stats::LineageCacheStats().Record(lineage_.GetChildrenSize(), {{stats::ValueTypeKey, "num_children"}}); stats::LineageCacheStats().Record(subscribed_tasks_.size(), diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index c30bd2da4..13d8b5598 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -28,11 +28,11 @@ enum class GcsStatus { UNCOMMITTED, /// We flushed this task and are waiting for the commit acknowledgement. COMMITTING, - // TODO(swang): Add a COMMITTED state for tasks for which we received a - // commit acknowledgement, but which we cannot evict yet (due to an ancestor - // that has not been evicted). This is to allow a performance optimization - // that avoids unnecessary subscribes when we receive tasks that were - // already COMMITTED at the sender. + // Tasks for which we received a commit acknowledgement, but which we cannot + // evict yet (due to an ancestor that has not been evicted). This is to allow + // a performance optimization that avoids unnecessary subscribes when we + // receive tasks that were already COMMITTED at the sender. + COMMITTED, }; /// \class LineageEntry @@ -306,8 +306,6 @@ class LineageCache { /// The pubsub storage system for task information. This can be used to /// request notifications for the commit of a task entry. gcs::PubsubInterface &task_pubsub_; - /// The set of tasks that have been committed but not evicted. - std::unordered_set committed_tasks_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_;