From e90ecef2970f6f4bb71ef41376f879fa5eef0bb5 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 1 Aug 2018 00:23:02 -0700 Subject: [PATCH] [xray] Try to flush children of a task that is evicted from the lineage cache (#2531) --- src/ray/raylet/lineage_cache.cc | 101 ++++++++++++++------------- src/ray/raylet/lineage_cache.h | 16 +++-- src/ray/raylet/lineage_cache_test.cc | 47 +++++++++++++ 3 files changed, 110 insertions(+), 54 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 7d724cb6d..47daf769b 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -27,8 +27,8 @@ const TaskID LineageEntry::GetEntryId() const { return task_.GetTaskSpecification().TaskId(); } -const std::unordered_set LineageEntry::GetParentTaskIds() const { - std::unordered_set parent_ids; +const std::unordered_set LineageEntry::GetParentTaskIds() const { + std::unordered_set parent_ids; // A task's parents are the tasks that created its arguments. 
auto dependencies = task_.GetDependencies(); for (auto &dependency : dependencies) { @@ -52,7 +52,7 @@ Lineage::Lineage(const protocol::ForwardTaskRequest &task_request) { } } -boost::optional Lineage::GetEntry(const UniqueID &task_id) const { +boost::optional Lineage::GetEntry(const TaskID &task_id) const { auto entry = entries_.find(task_id); if (entry != entries_.end()) { return entry->second; @@ -61,7 +61,7 @@ boost::optional Lineage::GetEntry(const UniqueID &task_id) } } -boost::optional Lineage::GetEntryMutable(const UniqueID &task_id) { +boost::optional Lineage::GetEntryMutable(const TaskID &task_id) { auto entry = entries_.find(task_id); if (entry != entries_.end()) { return entry->second; @@ -89,7 +89,7 @@ bool Lineage::SetEntry(const Task &task, GcsStatus status) { } } -boost::optional Lineage::PopEntry(const UniqueID &task_id) { +boost::optional Lineage::PopEntry(const TaskID &task_id) { auto entry = entries_.find(task_id); if (entry != entries_.end()) { LineageEntry entry = std::move(entries_.at(task_id)); @@ -100,7 +100,7 @@ boost::optional Lineage::PopEntry(const UniqueID &task_id) { } } -const std::unordered_map &Lineage::GetEntries() const { +const std::unordered_map &Lineage::GetEntries() const { return entries_; } @@ -137,7 +137,7 @@ LineageCache::LineageCache(const ClientID &client_id, /// \param lineage_to The lineage to merge entries into. /// \param stopping_condition A stopping condition for the DFS over /// lineage_from. This should return true if the merge should stop. 
-void MergeLineageHelper(const UniqueID &task_id, const Lineage &lineage_from, +void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from, Lineage &lineage_to, std::function stopping_condition) { // If the entry is not found in the lineage to merge, then we stop since @@ -338,7 +338,7 @@ void LineageCache::Flush() { } } -bool LineageCache::SubscribeTask(const UniqueID &task_id) { +bool LineageCache::SubscribeTask(const TaskID &task_id) { auto inserted = subscribed_tasks_.insert(task_id); bool unsubscribed = inserted.second; if (unsubscribed) { @@ -351,7 +351,7 @@ bool LineageCache::SubscribeTask(const UniqueID &task_id) { return unsubscribed; } -bool LineageCache::UnsubscribeTask(const UniqueID &task_id) { +bool LineageCache::UnsubscribeTask(const TaskID &task_id) { auto it = subscribed_tasks_.find(task_id); bool subscribed = (it != subscribed_tasks_.end()); if (subscribed) { @@ -365,48 +365,10 @@ bool LineageCache::UnsubscribeTask(const UniqueID &task_id) { return subscribed; } -void LineageCache::EvictRemoteLineage(const UniqueID &task_id) { - // Remove the ancestor task. +boost::optional LineageCache::EvictTask(const TaskID &task_id) { auto entry = lineage_.PopEntry(task_id); if (!entry) { - return; - } - // Tasks are committed in data dependency order per node, so the only - // ancestors of a committed task should be other remote tasks. - auto status = entry->GetStatus(); - RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE); - // We are evicting the remote ancestors of a task, so there should not be - // any dependent tasks that need to be flushed. - RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); - // Unsubscribe from the remote ancestor task if we were subscribed to - // notifications. - UnsubscribeTask(task_id); - // Recurse and remove this task's ancestors. 
- for (const auto &parent_id : entry->GetParentTaskIds()) { - EvictRemoteLineage(parent_id); - } -} - -void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { - RAY_LOG(DEBUG) << "task committed: " << task_id; - auto entry = lineage_.PopEntry(task_id); - if (!entry) { - // The committed entry has already been evicted. Check that the committed - // entry does not have any dependent tasks, since we should've already - // attempted to flush these tasks on the first commit notification. - RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); - // Check that we already unsubscribed from the task when handling the - // first commit notification. - RAY_CHECK(subscribed_tasks_.count(task_id) == 0); - // Do nothing if the committed entry has already been evicted. - return; - } - - // Evict the committed task's uncommitted lineage. Since local tasks are - // written in data dependency order, the uncommitted lineage should only - // include remote tasks, i.e. tasks that were committed by a different node. - for (const auto &parent_id : entry->GetParentTaskIds()) { - EvictRemoteLineage(parent_id); + return entry; } // Stop listening for notifications about this task. @@ -431,6 +393,47 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { } } } + + return entry; +} + +void LineageCache::EvictRemoteLineage(const TaskID &task_id) { + // Remove the ancestor task. + auto entry = EvictTask(task_id); + if (!entry) { + return; + } + // Tasks are committed in data dependency order per node, so the only + // ancestors of a committed task should be other remote tasks. + auto status = entry->GetStatus(); + RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE); + // Recurse and remove this task's ancestors. 
+ for (const auto &parent_id : entry->GetParentTaskIds()) { + EvictRemoteLineage(parent_id); + } +} + +void LineageCache::HandleEntryCommitted(const TaskID &task_id) { + RAY_LOG(DEBUG) << "task committed: " << task_id; + auto entry = EvictTask(task_id); + if (!entry) { + // The committed entry has already been evicted. Check that the committed + // entry does not have any dependent tasks, since we should've already + // attempted to flush these tasks on the first commit notification. + RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); + // Check that we already unsubscribed from the task when handling the + // first commit notification. + RAY_CHECK(subscribed_tasks_.count(task_id) == 0); + // Do nothing if the committed entry has already been evicted. + return; + } + + // Evict the committed task's uncommitted lineage. Since local tasks are + // written in data dependency order, the uncommitted lineage should only + // include remote tasks, i.e. tasks that were committed by a different node. + for (const auto &parent_id : entry->GetParentTaskIds()) { + EvictRemoteLineage(parent_id); + } } } // namespace raylet diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 9ce55d033..7987953d0 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -113,7 +113,7 @@ class Lineage { /// \return An optional reference to the entry. If this is empty, then the /// entry ID is not in the lineage. boost::optional GetEntry(const TaskID &entry_id) const; - boost::optional GetEntryMutable(const UniqueID &task_id); + boost::optional GetEntryMutable(const TaskID &task_id); /// Set an entry in the lineage. If an entry with this ID already exists, /// then the entry is overwritten if and only if the new entry has a higher @@ -211,17 +211,23 @@ class LineageCache { /// parents that are not committed yet, then the child will be flushed once /// the parents have been committed. 
bool FlushTask(const TaskID &task_id); + /// Evict a single task. This should only be called if we are sure that the + /// task has been committed. Evicting the task triggers an attempt to flush + /// any of its children that are in UNCOMMITTED_READY state. Returns an + /// optional reference to the evicted task that is empty if the task was not + /// in the lineage cache. + boost::optional EvictTask(const TaskID &task_id); /// Evict a remote task and its lineage. This should only be called if we /// are sure that the remote task and its lineage are committed. - void EvictRemoteLineage(const UniqueID &task_id); + void EvictRemoteLineage(const TaskID &task_id); /// Subscribe to notifications for a task. Returns whether the operation /// was successful (whether we were not already subscribed). - bool SubscribeTask(const UniqueID &task_id); + bool SubscribeTask(const TaskID &task_id); /// Unsubscribe from notifications for a task. Returns whether the operation /// was successful (whether we were subscribed). - bool UnsubscribeTask(const UniqueID &task_id); + bool UnsubscribeTask(const TaskID &task_id); /// Count the size of unsubscribed and uncommitted lineage - uint64_t CountUnsubscribedLineage(const UniqueID &task_id) const; + uint64_t CountUnsubscribedLineage(const TaskID &task_id) const; /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index de26467bf..ba6f5367b 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -450,6 +450,53 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_); } +TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { + // Insert a chain of dependent tasks. 
+ size_t num_tasks_flushed = 0; + uint64_t lineage_size = max_lineage_size_ + 1; + std::vector tasks; + InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector(), 1); + + // Simulate forwarding the chain of tasks to a remote node. + for (const auto &task : tasks) { + auto task_id = task.GetTaskSpecification().TaskId(); + lineage_cache_.RemoveWaitingTask(task_id); + } + + // Add more tasks to the lineage cache that will remain local. Each of these + // tasks is dependent on one of the tasks that was forwarded above. + for (const auto &task : tasks) { + auto return_id = task.GetTaskSpecification().ReturnId(0); + auto dependent_task = ExampleTask({return_id}, 1); + lineage_cache_.AddWaitingTask(dependent_task, Lineage()); + lineage_cache_.AddReadyTask(dependent_task); + // Once the forwarded tasks are evicted from the lineage cache, we expect + // each of these dependent tasks to be flushed, since all of their + // dependencies have been committed. + num_tasks_flushed++; + } + + // Check that the last task in the chain still has all tasks in its + // uncommitted lineage. + const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); + auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id); + ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + + // Simulate executing the last task on a remote node and adding it to the + // GCS. + auto task_data = std::make_shared(); + auto it = tasks.rbegin(); + RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); + it++; + // We expect the task that was added remotely to be flushed. + num_tasks_flushed++; + // Check that once the last task in the forwarded chain is flushed, all local + // tasks are flushed, since all of their dependencies have been evicted and + // are therefore committed in the GCS. + mock_gcs_.Flush(); + CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); +} + } // namespace raylet } // namespace ray