From dede80f3df880ccfbe2abc942791271395c5403d Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Tue, 14 Aug 2018 15:25:32 -0700 Subject: [PATCH] [xray] Reduce fatal checks in the lineage cache that fail during reconstruction (#2642) * Loosen checks in the lineage cache and log appropriate warnings in the node manager * revert test --- src/ray/raylet/lineage_cache.cc | 151 ++++++++++++++++----------- src/ray/raylet/lineage_cache.h | 17 ++- src/ray/raylet/lineage_cache_test.cc | 44 ++++---- src/ray/raylet/node_manager.cc | 31 ++++-- 4 files changed, 148 insertions(+), 95 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 0b49adb88..72aa7b045 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -171,8 +171,9 @@ void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from, } } -void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) { +bool LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) { auto task_id = task.GetTaskSpecification().TaskId(); + RAY_LOG(DEBUG) << "add waiting task " << task_id << " on " << client_id_; // Merge the uncommitted lineage into the lineage cache. MergeLineageHelper(task_id, uncommitted_lineage, lineage_, [](const LineageEntry &entry) { @@ -186,36 +187,38 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l return false; }); - // If the task was previously remote, then we may have been subscribed to - // it. Unsubscribe since we are now responsible for committing the task. auto entry = lineage_.GetEntry(task_id); if (entry) { - RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE); - UnsubscribeTask(task_id); + if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) { + // The task was previously remote, so we may have been subscribed to it. + // Unsubscribe since we are now responsible for committing the task. 
+ UnsubscribeTask(task_id); + } } // Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It // should be marked as UNCOMMITTED_READY once the task starts execution. - RAY_CHECK(lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING)); + return lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING); } -void LineageCache::AddReadyTask(const Task &task) { +bool LineageCache::AddReadyTask(const Task &task) { const TaskID task_id = task.GetTaskSpecification().TaskId(); + RAY_LOG(DEBUG) << "add ready task " << task_id << " on " << client_id_; - // Tasks can only become READY if they were in WAITING. - auto entry = lineage_.GetEntryMutable(task_id); - RAY_CHECK(entry); - RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING); - - entry->SetStatus(GcsStatus::UNCOMMITTED_READY); - // TaskSepc is immutable, just update TaskExecSpec. - entry->TaskDataMutable().CopyTaskExecutionSpec(task); - // Attempt to flush the task. - bool flushed = FlushTask(task_id); - if (!flushed) { - // If we fail to flush the task here, due to uncommitted parents, then add - // the task to a cache to be flushed in the future. - uncommitted_ready_tasks_.insert(task_id); + // Set the task to READY. + if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_READY)) { + // Attempt to flush the task. + bool flushed = FlushTask(task_id); + if (!flushed) { + // If we fail to flush the task here, due to uncommitted parents, then add + // the task to a cache to be flushed in the future. + uncommitted_ready_tasks_.insert(task_id); + } + return true; + } else { + // The task was already ready to be committed (UNCOMMITTED_READY) or + // committing (COMMITTING). + return false; } } @@ -229,7 +232,9 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id, return 0; } auto entry = lineage_.GetEntry(task_id); - if (!entry) { + // Only count tasks that are remote. Tasks that are local will be evicted + // once they are committed in the GCS, along with their lineage. 
+ if (!entry || entry->GetStatus() != GcsStatus::UNCOMMITTED_REMOTE) { return 0; } uint64_t cnt = 1; @@ -239,36 +244,58 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id, return cnt; } -void LineageCache::RemoveWaitingTask(const TaskID &task_id) { +bool LineageCache::RemoveWaitingTask(const TaskID &task_id) { + RAY_LOG(DEBUG) << "remove waiting task " << task_id << " on " << client_id_; auto entry = lineage_.GetEntryMutable(task_id); if (!entry) { - return; + // The task was already evicted. + return false; + } + + // If the task is already not in WAITING status, then exit. This should only + // happen when there are two copies of the task executing at the node, due to + // a spurious reconstruction. Then, either the task is already past WAITING + // status, in which case it will be committed, or it is in + // UNCOMMITTED_REMOTE, in which case it was already removed. + if (entry->GetStatus() != GcsStatus::UNCOMMITTED_WAITING) { + return false; } - // It's only okay to remove a task that is waiting for execution. - // TODO(swang): Is this necessarily true when there is reconstruction? - RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING); // Reset the status to REMOTE. We keep the task instead of removing it // completely in case another task is submitted locally that depends on this // one. entry->ResetStatus(GcsStatus::UNCOMMITTED_REMOTE); - // Request a notification for every max_lineage_size_ tasks, - // so that the task and its uncommitted lineage can be evicted - // once the commit notification is received. - // By doing this, we make sure that the unevicted lineage won't be more than - // max_lineage_size_, and the number of subscribed tasks won't be more than - // N / max_lineage_size_, where N is the size of the task chain. - // NOTE(swang): The number of entries in the uncommitted lineage also - // includes local tasks that haven't been committed yet, not just remote - // tasks, so this is an overestimate. 
- std::unordered_set seen; - auto count = CountUnsubscribedLineage(task_id, seen); - if (count > max_lineage_size_) { - // Since this task was in state WAITING, check that we were not - // already subscribed to the task. + // Subscribe to the task if necessary. We do this if it has any local + // children that must be written to the GCS, or if its uncommitted remote + // lineage is too large. + if (uncommitted_ready_children_.find(task_id) != uncommitted_ready_children_.end()) { + // Subscribe to the task if it has any children in UNCOMMITTED_READY. We + // will attempt to flush its children once we receive a notification for + // this task's commit. Since this task was in state WAITING, check that we + // were not already subscribed to the task. RAY_CHECK(SubscribeTask(task_id)); + } else { + // Check if the uncommitted remote lineage is too large. Request a + // notification for every max_lineage_size_ tasks, so that the task and its + // uncommitted lineage can be evicted once the commit notification is + // received. By doing this, we make sure that the unevicted lineage won't + // be more than max_lineage_size_, and the number of subscribed tasks won't + // be more than N / max_lineage_size_, where N is the size of the task + // chain. + // NOTE(swang): The number of entries in the uncommitted lineage also + // includes local tasks that haven't been committed yet, not just remote + // tasks, so this is an overestimate. + std::unordered_set seen; + auto count = CountUnsubscribedLineage(task_id, seen); + if (count >= max_lineage_size_) { + // Since this task was in state WAITING, check that we were not + // already subscribed to the task. + RAY_CHECK(SubscribeTask(task_id)); + } } + // The task was successfully reset to UNCOMMITTED_REMOTE. 
+ return true; } void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &node_id) { @@ -305,8 +332,6 @@ bool LineageCache::FlushTask(const TaskID &task_id) { // committed yet, then as far as we know, it's still in flight to the // GCS. Skip this task for now. if (parent) { - RAY_CHECK(parent->GetStatus() != GcsStatus::UNCOMMITTED_WAITING) - << "Children should not become ready to flush before their parents."; // Request notifications about the parent entry's commit in the GCS if // the parent is remote. Otherwise, the parent is local and will // eventually be flushed. In either case, once we receive a @@ -389,8 +414,17 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { } boost::optional LineageCache::EvictTask(const TaskID &task_id) { + RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_; auto entry = lineage_.PopEntry(task_id); if (!entry) { + // The entry has already been evicted. Check that the entry does not have + // any dependent tasks, since we should've already attempted to flush these + // tasks on the first eviction. + RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); + // Check that we already unsubscribed from the task when handling the + // first eviction. + RAY_CHECK(subscribed_tasks_.count(task_id) == 0); + // Do nothing if the entry has already been evicted. return entry; } @@ -421,18 +455,19 @@ boost::optional LineageCache::EvictTask(const TaskID &task_id) { } void LineageCache::EvictRemoteLineage(const TaskID &task_id) { - // Remove the ancestor task. - auto entry = EvictTask(task_id); + auto entry = lineage_.GetEntry(task_id); if (!entry) { return; } - // Tasks are committed in data dependency order per node, so the only - // ancestors of a committed task should be other remote tasks. - auto status = entry->GetStatus(); - RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE); - // Recurse and remove this task's ancestors. 
- for (const auto &parent_id : entry->GetParentTaskIds()) { - EvictRemoteLineage(parent_id); + // Only evict tasks that are remote. Other tasks, and their lineage, will be + // evicted once they are committed. + if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) { + // Remove the ancestor task. + auto evicted_entry = EvictTask(task_id); + // Recurse and remove this task's ancestors. + for (const auto &parent_id : evicted_entry->GetParentTaskIds()) { + EvictRemoteLineage(parent_id); + } } } @@ -440,20 +475,16 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) { RAY_LOG(DEBUG) << "task committed: " << task_id; auto entry = EvictTask(task_id); if (!entry) { - // The committed entry has already been evicted. Check that the committed - // entry does not have any dependent tasks, since we should've already - // attempted to flush these tasks on the first commit notification. - RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); - // Check that we already unsubscribed from the task when handling the - // first commit notification. - RAY_CHECK(subscribed_tasks_.count(task_id) == 0); - // Do nothing if the committed entry has already been evicted. + // The task has already been evicted due to a previous commit notification, + // or because one of its descendants was committed. return; } // Evict the committed task's uncommitted lineage. Since local tasks are // written in data dependency order, the uncommitted lineage should only // include remote tasks, i.e. tasks that were committed by a different node. + // In case of reconstruction, the uncommitted lineage may also include local + // tasks that were resubmitted. These tasks are not evicted. 
+ for (const auto &parent_id : entry->GetParentTaskIds()) { EvictRemoteLineage(parent_id); } diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 97ee6dd61..d5d72027f 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -184,19 +184,30 @@ class LineageCache { /// \param uncommitted_lineage The task's uncommitted lineage. These are the /// tasks that the given task is data-dependent on, but that have not /// been made durable in the GCS, as far the task's submitter knows. - void AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage); + /// \return Whether the task was successfully marked as waiting to be + /// committed. This will return false if the task is already waiting to be + /// committed (UNCOMMITTED_WAITING), ready to be committed + /// (UNCOMMITTED_READY), or committing (COMMITTING). + bool AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage); /// Add a task that is ready for GCS writeback. This overwrites the task’s /// mutable fields in the execution specification. /// /// \param task The task to set as ready. - void AddReadyTask(const Task &task); + /// \return Whether the task was successfully marked as ready to be + /// committed. This will return false if the task is already ready to be + /// committed (UNCOMMITTED_READY) or committing (COMMITTING). + bool AddReadyTask(const Task &task); /// Remove a task that was waiting for execution. Its uncommitted lineage /// will remain unchanged. /// /// \param task_id The ID of the waiting task to remove. - void RemoveWaitingTask(const TaskID &task_id); + /// \return Whether the task was successfully removed. This will return false + /// if the task is not waiting to be committed: either the waiting task has + /// already been removed (UNCOMMITTED_REMOTE), or it is ready to be + /// committed (UNCOMMITTED_READY) or committing (COMMITTING). 
+ bool RemoveWaitingTask(const TaskID &task_id); /// Mark a task as having been explicitly forwarded to a node. /// The lineage of the task is implicitly assumed to have also been forwarded. diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 240d622ec..c1bfe2517 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -129,7 +129,7 @@ std::vector InsertTaskChain(LineageCache &lineage_cache, std::vector arguments = initial_arguments; for (int i = 0; i < chain_size; i++) { auto task = ExampleTask(arguments, num_returns); - lineage_cache.AddWaitingTask(task, empty_lineage); + RAY_CHECK(lineage_cache.AddWaitingTask(task, empty_lineage)); inserted_tasks.push_back(task); arguments.clear(); for (int j = 0; j < task.GetTaskSpecification().NumReturns(); j++) { @@ -245,7 +245,7 @@ TEST_F(LineageCacheTest, TestWritebackReady) { InsertTaskChain(lineage_cache_, tasks, 3, std::vector(), 1); // Check that after marking the first task as ready, we flush only that task. - lineage_cache_.AddReadyTask(tasks.front()); + ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.front())); num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); } @@ -259,7 +259,7 @@ TEST_F(LineageCacheTest, TestWritebackOrder) { // Mark all tasks as ready. The first task, which has no dependencies, should // be flushed. for (const auto &task : tasks) { - lineage_cache_.AddReadyTask(task); + ASSERT_TRUE(lineage_cache_.AddReadyTask(task)); } // Check that we write back the tasks in order of data dependencies. for (size_t i = 0; i < tasks.size(); i++) { @@ -287,20 +287,20 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { auto dependencies = dependent_task.GetDependencies(); // Insert all tasks as waiting for execution. 
- lineage_cache_.AddWaitingTask(task1, Lineage()); - lineage_cache_.AddWaitingTask(task2, Lineage()); - lineage_cache_.AddWaitingTask(dependent_task, Lineage()); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(task1, Lineage())); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(task2, Lineage())); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage())); // Flush one of the independent tasks. - lineage_cache_.AddReadyTask(task1); + ASSERT_TRUE(lineage_cache_.AddReadyTask(task1)); num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); // Flush acknowledgements. The lineage cache should receive the commit for // the first task. mock_gcs_.Flush(); // Mark the other independent task and the dependent as ready. - lineage_cache_.AddReadyTask(task2); - lineage_cache_.AddReadyTask(dependent_task); + ASSERT_TRUE(lineage_cache_.AddReadyTask(task2)); + ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task)); // Two tasks are ready, but only the independent task should be flushed. The // dependent task should only be flushed once commits for both independent // tasks are received. @@ -326,14 +326,14 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { // Simulate removing the task and forwarding it to another node. auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil()); - lineage_cache_.RemoveWaitingTask(task_id); + ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); // Simulate receiving the task again. Make sure we can add the task back. 
flatbuffers::FlatBufferBuilder fbb; auto uncommitted_lineage_message = uncommitted_lineage.ToFlatbuffer(fbb, task_id); fbb.Finish(uncommitted_lineage_message); uncommitted_lineage = Lineage( *flatbuffers::GetRoot(fbb.GetBufferPointer())); - lineage_cache_.AddWaitingTask(*it, uncommitted_lineage); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(*it, uncommitted_lineage)); } } @@ -350,11 +350,11 @@ TEST_F(LineageCacheTest, TestForwardTask) { auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId(); auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil()); - lineage_cache_.RemoveWaitingTask(task_id_to_remove); + ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove)); // Simulate executing the remaining tasks. for (const auto &task : tasks) { - lineage_cache_.AddReadyTask(task); + ASSERT_TRUE(lineage_cache_.AddReadyTask(task)); } // Check that the first task, which has no dependencies can be flushed. The // last task cannot be flushed since one of its dependencies has not been @@ -389,7 +389,7 @@ TEST_F(LineageCacheTest, TestEviction) { // Simulate forwarding the chain of tasks to a remote node. for (const auto &task : tasks) { auto task_id = task.GetTaskSpecification().TaskId(); - lineage_cache_.RemoveWaitingTask(task_id); + ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); } // Check that the last task in the chain still has all tasks in its @@ -441,7 +441,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { // Simulate forwarding the chain of tasks to a remote node. 
for (const auto &task : tasks) { auto task_id = task.GetTaskSpecification().TaskId(); - lineage_cache_.RemoveWaitingTask(task_id); + ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); } // Check that we requested at most 2 notifications ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2); @@ -493,7 +493,7 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { // Simulate forwarding the chain of tasks to a remote node. for (const auto &task : tasks) { auto task_id = task.GetTaskSpecification().TaskId(); - lineage_cache_.RemoveWaitingTask(task_id); + ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); } // Add more tasks to the lineage cache that will remain local. Each of these @@ -501,27 +501,19 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { for (const auto &task : tasks) { auto return_id = task.GetTaskSpecification().ReturnId(0); auto dependent_task = ExampleTask({return_id}, 1); - lineage_cache_.AddWaitingTask(dependent_task, Lineage()); - lineage_cache_.AddReadyTask(dependent_task); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage())); + ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task)); // Once the forwarded tasks are evicted from the lineage cache, we expect // each of these dependent tasks to be flushed, since all of their // dependencies have been committed. num_tasks_flushed++; } - // Check that the last task in the chain still has all tasks in its - // uncommitted lineage. - const auto last_task_id = tasks.back().GetTaskSpecification().TaskId(); - auto uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); - ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); - // Simulate executing the last task on a remote node and adding it to the // GCS. 
auto task_data = std::make_shared(); auto it = tasks.rbegin(); RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); - it++; // We expect the task that was added remotely to be flushed. num_tasks_flushed++; // Check that once the last task in the forwarded chain is flushed, all local diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fa88400eb..fb118a955 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -401,7 +401,11 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id, // known. auto created_actor_methods = local_queues_.RemoveTasks(created_actor_method_ids); for (const auto &method : created_actor_methods) { - lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId()); + if (!lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId())) { + RAY_LOG(WARNING) << "Task " << method.GetTaskSpecification().TaskId() + << " already removed from the lineage cache. This is most " + "likely due to reconstruction."; + } // The task's uncommitted lineage was already added to the local lineage // cache upon the initial submission, so it's okay to resubmit it with an // empty lineage this time. @@ -824,15 +828,20 @@ void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded) { - if (local_queues_.HasTask(task.GetTaskSpecification().TaskId())) { - RAY_LOG(WARNING) << "Submitted task " << task.GetTaskSpecification().TaskId() + const TaskID task_id = task.GetTaskSpecification().TaskId(); + if (local_queues_.HasTask(task_id)) { + RAY_LOG(WARNING) << "Submitted task " << task_id << " is already queued and will not be reconstructed. This is most " "likely due to spurious reconstruction."; return; } // Add the task and its uncommitted lineage to the lineage cache. 
- lineage_cache_.AddWaitingTask(task, uncommitted_lineage); + if (!lineage_cache_.AddWaitingTask(task, uncommitted_lineage)) { + RAY_LOG(WARNING) + << "Task " << task_id + << " already in lineage cache. This is most likely due to reconstruction."; + } const TaskSpecification &spec = task.GetTaskSpecification(); if (spec.IsActorTask()) { @@ -1082,7 +1091,11 @@ void NodeManager::AssignTask(Task &task) { actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject()); } // We started running the task, so the task is ready to write to GCS. - lineage_cache_.AddReadyTask(task); + if (!lineage_cache_.AddReadyTask(task)) { + RAY_LOG(WARNING) + << "Task " << spec.TaskId() + << " already in lineage cache. This is most likely due to reconstruction."; + } // Mark the task as running. // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueRunningTasks(std::vector({task})); @@ -1158,6 +1171,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) { } void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { + RAY_LOG(INFO) << "Reconstructing task " << task_id << " on client " + << gcs_client_->client_table().GetLocalClientId(); // Retrieve the task spec in order to re-execute the task. RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( JobID::nil(), task_id, @@ -1335,7 +1350,11 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) // If we were able to forward the task, remove the forwarded task from the // lineage cache since the receiving node is now responsible for writing // the task to the GCS. - lineage_cache_.RemoveWaitingTask(task_id); + if (!lineage_cache_.RemoveWaitingTask(task_id)) { + RAY_LOG(WARNING) << "Task " << task_id << " already removed from the lineage " + "cache. This is most likely due to " + "reconstruction."; + } // Mark as forwarded so that the task and its lineage is not re-forwarded // in the future to the receiving node. 
lineage_cache_.MarkTaskAsForwarded(task_id, node_id);