mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 03:10:54 +08:00
Lineage cache performance optimization to avoid duplicate GCS requests #5327
This commit is contained in:
@@ -313,16 +313,15 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
|
||||
}
|
||||
|
||||
void LineageCache::EvictTask(const TaskID &task_id) {
|
||||
// If we haven't received a commit for this task yet, do not evict.
|
||||
auto commit_it = committed_tasks_.find(task_id);
|
||||
if (commit_it == committed_tasks_.end()) {
|
||||
return;
|
||||
}
|
||||
// If the entry has already been evicted, exit.
|
||||
auto entry = lineage_.GetEntry(task_id);
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
// If the entry has not yet been committed, exit.
|
||||
if (entry->GetStatus() != GcsStatus::COMMITTED) {
|
||||
return;
|
||||
}
|
||||
// Entries cannot be safely evicted until their parents are all evicted.
|
||||
for (const auto &parent_id : entry->GetParentTaskIds()) {
|
||||
if (ContainsTask(parent_id)) {
|
||||
@@ -333,7 +332,6 @@ void LineageCache::EvictTask(const TaskID &task_id) {
|
||||
// Evict the task.
|
||||
RAY_LOG(DEBUG) << "Evicting task " << task_id << " on " << client_id_;
|
||||
lineage_.PopEntry(task_id);
|
||||
committed_tasks_.erase(commit_it);
|
||||
// Try to evict the children of the evict task. These are the tasks that have
|
||||
// a dependency on the evicted task.
|
||||
const auto children = lineage_.GetChildren(task_id);
|
||||
@@ -344,13 +342,13 @@ void LineageCache::EvictTask(const TaskID &task_id) {
|
||||
|
||||
void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
|
||||
RAY_LOG(DEBUG) << "Task committed: " << task_id;
|
||||
auto entry = lineage_.GetEntry(task_id);
|
||||
auto entry = lineage_.GetEntryMutable(task_id);
|
||||
if (!entry) {
|
||||
// The task has already been evicted due to a previous commit notification.
|
||||
return;
|
||||
}
|
||||
// Record the commit acknowledgement and attempt to evict the task.
|
||||
committed_tasks_.insert(task_id);
|
||||
entry->SetStatus(GcsStatus::COMMITTED);
|
||||
EvictTask(task_id);
|
||||
// We got the notification about the task's commit, so no longer need any
|
||||
// more notifications.
|
||||
@@ -375,7 +373,6 @@ const Lineage &LineageCache::GetLineage() const { return lineage_; }
|
||||
std::string LineageCache::DebugString() const {
|
||||
std::stringstream result;
|
||||
result << "LineageCache:";
|
||||
result << "\n- committed tasks: " << committed_tasks_.size();
|
||||
result << "\n- child map size: " << lineage_.GetChildrenSize();
|
||||
result << "\n- num subscribed tasks: " << subscribed_tasks_.size();
|
||||
result << "\n- lineage size: " << lineage_.GetEntries().size();
|
||||
@@ -383,8 +380,6 @@ std::string LineageCache::DebugString() const {
|
||||
}
|
||||
|
||||
void LineageCache::RecordMetrics() const {
|
||||
stats::LineageCacheStats().Record(committed_tasks_.size(),
|
||||
{{stats::ValueTypeKey, "num_committed_tasks"}});
|
||||
stats::LineageCacheStats().Record(lineage_.GetChildrenSize(),
|
||||
{{stats::ValueTypeKey, "num_children"}});
|
||||
stats::LineageCacheStats().Record(subscribed_tasks_.size(),
|
||||
|
||||
@@ -28,11 +28,11 @@ enum class GcsStatus {
|
||||
UNCOMMITTED,
|
||||
/// We flushed this task and are waiting for the commit acknowledgement.
|
||||
COMMITTING,
|
||||
// TODO(swang): Add a COMMITTED state for tasks for which we received a
|
||||
// commit acknowledgement, but which we cannot evict yet (due to an ancestor
|
||||
// that has not been evicted). This is to allow a performance optimization
|
||||
// that avoids unnecessary subscribes when we receive tasks that were
|
||||
// already COMMITTED at the sender.
|
||||
// Tasks for which we received a commit acknowledgement, but which we cannot
|
||||
// evict yet (due to an ancestor that has not been evicted). This is to allow
|
||||
// a performance optimization that avoids unnecessary subscribes when we
|
||||
// receive tasks that were already COMMITTED at the sender.
|
||||
COMMITTED,
|
||||
};
|
||||
|
||||
/// \class LineageEntry
|
||||
@@ -306,8 +306,6 @@ class LineageCache {
|
||||
/// The pubsub storage system for task information. This can be used to
|
||||
/// request notifications for the commit of a task entry.
|
||||
gcs::PubsubInterface<TaskID> &task_pubsub_;
|
||||
/// The set of tasks that have been committed but not evicted.
|
||||
std::unordered_set<TaskID> committed_tasks_;
|
||||
/// All tasks and objects that we are responsible for writing back to the
|
||||
/// GCS, and the tasks and objects in their lineage.
|
||||
Lineage lineage_;
|
||||
|
||||
Reference in New Issue
Block a user