mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 21:48:43 +08:00
[xray] Reduce fatal checks in the lineage cache that fail during reconstruction (#2642)
* Loosen checks in the lineage cache and log appropriate warnings in the node manager
* revert test
This commit is contained in:
committed by
Philipp Moritz
parent
4bd98eed45
commit
dede80f3df
@@ -171,8 +171,9 @@ void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from,
|
||||
}
|
||||
}
|
||||
|
||||
void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) {
|
||||
bool LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) {
|
||||
auto task_id = task.GetTaskSpecification().TaskId();
|
||||
RAY_LOG(DEBUG) << "add waiting task " << task_id << " on " << client_id_;
|
||||
// Merge the uncommitted lineage into the lineage cache.
|
||||
MergeLineageHelper(task_id, uncommitted_lineage, lineage_,
|
||||
[](const LineageEntry &entry) {
|
||||
@@ -186,36 +187,38 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
|
||||
return false;
|
||||
});
|
||||
|
||||
// If the task was previously remote, then we may have been subscribed to
|
||||
// it. Unsubscribe since we are now responsible for committing the task.
|
||||
auto entry = lineage_.GetEntry(task_id);
|
||||
if (entry) {
|
||||
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE);
|
||||
UnsubscribeTask(task_id);
|
||||
if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
|
||||
// The task was previously remote, so we may have been subscribed to it.
|
||||
// Unsubscribe since we are now responsible for committing the task.
|
||||
UnsubscribeTask(task_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It
|
||||
// should be marked as UNCOMMITTED_READY once the task starts execution.
|
||||
RAY_CHECK(lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING));
|
||||
return lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING);
|
||||
}
|
||||
|
||||
void LineageCache::AddReadyTask(const Task &task) {
|
||||
bool LineageCache::AddReadyTask(const Task &task) {
|
||||
const TaskID task_id = task.GetTaskSpecification().TaskId();
|
||||
RAY_LOG(DEBUG) << "add ready task " << task_id << " on " << client_id_;
|
||||
|
||||
// Tasks can only become READY if they were in WAITING.
|
||||
auto entry = lineage_.GetEntryMutable(task_id);
|
||||
RAY_CHECK(entry);
|
||||
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING);
|
||||
|
||||
entry->SetStatus(GcsStatus::UNCOMMITTED_READY);
|
||||
// TaskSpec is immutable, just update TaskExecSpec.
|
||||
entry->TaskDataMutable().CopyTaskExecutionSpec(task);
|
||||
// Attempt to flush the task.
|
||||
bool flushed = FlushTask(task_id);
|
||||
if (!flushed) {
|
||||
// If we fail to flush the task here, due to uncommitted parents, then add
|
||||
// the task to a cache to be flushed in the future.
|
||||
uncommitted_ready_tasks_.insert(task_id);
|
||||
// Set the task to READY.
|
||||
if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_READY)) {
|
||||
// Attempt to flush the task.
|
||||
bool flushed = FlushTask(task_id);
|
||||
if (!flushed) {
|
||||
// If we fail to flush the task here, due to uncommitted parents, then add
|
||||
// the task to a cache to be flushed in the future.
|
||||
uncommitted_ready_tasks_.insert(task_id);
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
// The task was already ready to be committed (UNCOMMITTED_READY) or
|
||||
// committing (COMMITTING).
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,7 +232,9 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id,
|
||||
return 0;
|
||||
}
|
||||
auto entry = lineage_.GetEntry(task_id);
|
||||
if (!entry) {
|
||||
// Only count tasks that are remote. Tasks that are local will be evicted
|
||||
// once they are committed in the GCS, along with their lineage.
|
||||
if (!entry || entry->GetStatus() != GcsStatus::UNCOMMITTED_REMOTE) {
|
||||
return 0;
|
||||
}
|
||||
uint64_t cnt = 1;
|
||||
@@ -239,36 +244,58 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id,
|
||||
return cnt;
|
||||
}
|
||||
|
||||
void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
|
||||
bool LineageCache::RemoveWaitingTask(const TaskID &task_id) {
|
||||
RAY_LOG(DEBUG) << "remove waiting task " << task_id << " on " << client_id_;
|
||||
auto entry = lineage_.GetEntryMutable(task_id);
|
||||
if (!entry) {
|
||||
return;
|
||||
// The task was already evicted.
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the task is already not in WAITING status, then exit. This should only
|
||||
// happen when there are two copies of the task executing at the node, due to
|
||||
// a spurious reconstruction. Then, either the task is already past WAITING
|
||||
// status, in which case it will be committed, or it is in
|
||||
// UNCOMMITTED_REMOTE, in which case it was already removed.
|
||||
if (entry->GetStatus() != GcsStatus::UNCOMMITTED_WAITING) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// It's only okay to remove a task that is waiting for execution.
|
||||
// TODO(swang): Is this necessarily true when there is reconstruction?
|
||||
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING);
|
||||
// Reset the status to REMOTE. We keep the task instead of removing it
|
||||
// completely in case another task is submitted locally that depends on this
|
||||
// one.
|
||||
entry->ResetStatus(GcsStatus::UNCOMMITTED_REMOTE);
|
||||
|
||||
// Request a notification for every max_lineage_size_ tasks,
|
||||
// so that the task and its uncommitted lineage can be evicted
|
||||
// once the commit notification is received.
|
||||
// By doing this, we make sure that the unevicted lineage won't be more than
|
||||
// max_lineage_size_, and the number of subscribed tasks won't be more than
|
||||
// N / max_lineage_size_, where N is the size of the task chain.
|
||||
// NOTE(swang): The number of entries in the uncommitted lineage also
|
||||
// includes local tasks that haven't been committed yet, not just remote
|
||||
// tasks, so this is an overestimate.
|
||||
std::unordered_set<TaskID> seen;
|
||||
auto count = CountUnsubscribedLineage(task_id, seen);
|
||||
if (count > max_lineage_size_) {
|
||||
// Since this task was in state WAITING, check that we were not
|
||||
// already subscribed to the task.
|
||||
// Subscribe to the task if necessary. We do this if it has any local
|
||||
// children that must be written to the GCS, or if its uncommitted remote
|
||||
// lineage is too large.
|
||||
if (uncommitted_ready_children_.find(task_id) != uncommitted_ready_children_.end()) {
|
||||
// Subscribe to the task if it has any children in UNCOMMITTED_READY. We
|
||||
// will attempt to flush its children once we receive a notification for
|
||||
// this task's commit. Since this task was in state WAITING, check that we
|
||||
// were not already subscribed to the task.
|
||||
RAY_CHECK(SubscribeTask(task_id));
|
||||
} else {
|
||||
// Check if the uncommitted remote lineage is too large. Request a
|
||||
// notification for every max_lineage_size_ tasks, so that the task and its
|
||||
// uncommitted lineage can be evicted once the commit notification is
|
||||
// received. By doing this, we make sure that the unevicted lineage won't
|
||||
// be more than max_lineage_size_, and the number of subscribed tasks won't
|
||||
// be more than N / max_lineage_size_, where N is the size of the task
|
||||
// chain.
|
||||
// NOTE(swang): The number of entries in the uncommitted lineage also
|
||||
// includes local tasks that haven't been committed yet, not just remote
|
||||
// tasks, so this is an overestimate.
|
||||
std::unordered_set<TaskID> seen;
|
||||
auto count = CountUnsubscribedLineage(task_id, seen);
|
||||
if (count >= max_lineage_size_) {
|
||||
// Since this task was in state WAITING, check that we were not
|
||||
// already subscribed to the task.
|
||||
RAY_CHECK(SubscribeTask(task_id));
|
||||
}
|
||||
}
|
||||
// The task was successfully reset to UNCOMMITTED_REMOTE.
|
||||
return true;
|
||||
}
|
||||
|
||||
void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &node_id) {
|
||||
@@ -305,8 +332,6 @@ bool LineageCache::FlushTask(const TaskID &task_id) {
|
||||
// committed yet, then as far as we know, it's still in flight to the
|
||||
// GCS. Skip this task for now.
|
||||
if (parent) {
|
||||
RAY_CHECK(parent->GetStatus() != GcsStatus::UNCOMMITTED_WAITING)
|
||||
<< "Children should not become ready to flush before their parents.";
|
||||
// Request notifications about the parent entry's commit in the GCS if
|
||||
// the parent is remote. Otherwise, the parent is local and will
|
||||
// eventually be flushed. In either case, once we receive a
|
||||
@@ -389,8 +414,17 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
|
||||
}
|
||||
|
||||
boost::optional<LineageEntry> LineageCache::EvictTask(const TaskID &task_id) {
|
||||
RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_;
|
||||
auto entry = lineage_.PopEntry(task_id);
|
||||
if (!entry) {
|
||||
// The entry has already been evicted. Check that the entry does not have
|
||||
// any dependent tasks, since we should've already attempted to flush these
|
||||
// tasks on the first eviction.
|
||||
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
|
||||
// Check that we already unsubscribed from the task when handling the
|
||||
// first eviction.
|
||||
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
|
||||
// Do nothing if the entry has already been evicted.
|
||||
return entry;
|
||||
}
|
||||
|
||||
@@ -421,18 +455,19 @@ boost::optional<LineageEntry> LineageCache::EvictTask(const TaskID &task_id) {
|
||||
}
|
||||
|
||||
void LineageCache::EvictRemoteLineage(const TaskID &task_id) {
|
||||
// Remove the ancestor task.
|
||||
auto entry = EvictTask(task_id);
|
||||
auto entry = lineage_.GetEntry(task_id);
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
// Tasks are committed in data dependency order per node, so the only
|
||||
// ancestors of a committed task should be other remote tasks.
|
||||
auto status = entry->GetStatus();
|
||||
RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE);
|
||||
// Recurse and remove this task's ancestors.
|
||||
for (const auto &parent_id : entry->GetParentTaskIds()) {
|
||||
EvictRemoteLineage(parent_id);
|
||||
// Only evict tasks that are remote. Other tasks, and their lineage, will be
|
||||
// evicted once they are committed.
|
||||
if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
|
||||
// Remove the ancestor task.
|
||||
auto evicted_entry = EvictTask(task_id);
|
||||
// Recurse and remove this task's ancestors.
|
||||
for (const auto &parent_id : evicted_entry->GetParentTaskIds()) {
|
||||
EvictRemoteLineage(parent_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -440,20 +475,16 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
|
||||
RAY_LOG(DEBUG) << "task committed: " << task_id;
|
||||
auto entry = EvictTask(task_id);
|
||||
if (!entry) {
|
||||
// The committed entry has already been evicted. Check that the committed
|
||||
// entry does not have any dependent tasks, since we should've already
|
||||
// attempted to flush these tasks on the first commit notification.
|
||||
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
|
||||
// Check that we already unsubscribed from the task when handling the
|
||||
// first commit notification.
|
||||
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
|
||||
// Do nothing if the committed entry has already been evicted.
|
||||
// The task has already been evicted due to a previous commit notification,
|
||||
// or because one of its descendants was committed.
|
||||
return;
|
||||
}
|
||||
|
||||
// Evict the committed task's uncommitted lineage. Since local tasks are
|
||||
// written in data dependency order, the uncommitted lineage should only
|
||||
// include remote tasks, i.e. tasks that were committed by a different node.
|
||||
// In case of reconstruction, the uncommitted lineage may also include local
|
||||
// tasks that were resubmitted. These tasks are not evicted.
|
||||
for (const auto &parent_id : entry->GetParentTaskIds()) {
|
||||
EvictRemoteLineage(parent_id);
|
||||
}
|
||||
|
||||
@@ -184,19 +184,30 @@ class LineageCache {
|
||||
/// \param uncommitted_lineage The task's uncommitted lineage. These are the
|
||||
/// tasks that the given task is data-dependent on, but that have not
|
||||
/// been made durable in the GCS, as far as the task's submitter knows.
|
||||
void AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage);
|
||||
/// \return Whether the task was successfully marked as waiting to be
|
||||
/// committed. This will return false if the task is already waiting to be
|
||||
/// committed (UNCOMMITTED_WAITING), ready to be committed
|
||||
/// (UNCOMMITTED_READY), or committing (COMMITTING).
|
||||
bool AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage);
|
||||
|
||||
/// Add a task that is ready for GCS writeback. This overwrites the task’s
|
||||
/// mutable fields in the execution specification.
|
||||
///
|
||||
/// \param task The task to set as ready.
|
||||
void AddReadyTask(const Task &task);
|
||||
/// \return Whether the task was successfully marked as ready to be
|
||||
/// committed. This will return false if the task is already ready to be
|
||||
/// committed (UNCOMMITTED_READY) or committing (COMMITTING).
|
||||
bool AddReadyTask(const Task &task);
|
||||
|
||||
/// Remove a task that was waiting for execution. Its uncommitted lineage
|
||||
/// will remain unchanged.
|
||||
///
|
||||
/// \param task_id The ID of the waiting task to remove.
|
||||
void RemoveWaitingTask(const TaskID &task_id);
|
||||
/// \return Whether the task was successfully removed. This will return false
|
||||
/// if the task is not waiting to be committed. Then, the waiting task has
|
||||
/// already been removed (UNCOMMITTED_REMOTE), or it is ready to be
|
||||
/// committed (UNCOMMITTED_READY) or committing (COMMITTING).
|
||||
bool RemoveWaitingTask(const TaskID &task_id);
|
||||
|
||||
/// Mark a task as having been explicitly forwarded to a node.
|
||||
/// The lineage of the task is implicitly assumed to have also been forwarded.
|
||||
|
||||
@@ -129,7 +129,7 @@ std::vector<ObjectID> InsertTaskChain(LineageCache &lineage_cache,
|
||||
std::vector<ObjectID> arguments = initial_arguments;
|
||||
for (int i = 0; i < chain_size; i++) {
|
||||
auto task = ExampleTask(arguments, num_returns);
|
||||
lineage_cache.AddWaitingTask(task, empty_lineage);
|
||||
RAY_CHECK(lineage_cache.AddWaitingTask(task, empty_lineage));
|
||||
inserted_tasks.push_back(task);
|
||||
arguments.clear();
|
||||
for (int j = 0; j < task.GetTaskSpecification().NumReturns(); j++) {
|
||||
@@ -245,7 +245,7 @@ TEST_F(LineageCacheTest, TestWritebackReady) {
|
||||
InsertTaskChain(lineage_cache_, tasks, 3, std::vector<ObjectID>(), 1);
|
||||
|
||||
// Check that after marking the first task as ready, we flush only that task.
|
||||
lineage_cache_.AddReadyTask(tasks.front());
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.front()));
|
||||
num_tasks_flushed++;
|
||||
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
|
||||
}
|
||||
@@ -259,7 +259,7 @@ TEST_F(LineageCacheTest, TestWritebackOrder) {
|
||||
// Mark all tasks as ready. The first task, which has no dependencies, should
|
||||
// be flushed.
|
||||
for (const auto &task : tasks) {
|
||||
lineage_cache_.AddReadyTask(task);
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(task));
|
||||
}
|
||||
// Check that we write back the tasks in order of data dependencies.
|
||||
for (size_t i = 0; i < tasks.size(); i++) {
|
||||
@@ -287,20 +287,20 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) {
|
||||
auto dependencies = dependent_task.GetDependencies();
|
||||
|
||||
// Insert all tasks as waiting for execution.
|
||||
lineage_cache_.AddWaitingTask(task1, Lineage());
|
||||
lineage_cache_.AddWaitingTask(task2, Lineage());
|
||||
lineage_cache_.AddWaitingTask(dependent_task, Lineage());
|
||||
ASSERT_TRUE(lineage_cache_.AddWaitingTask(task1, Lineage()));
|
||||
ASSERT_TRUE(lineage_cache_.AddWaitingTask(task2, Lineage()));
|
||||
ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage()));
|
||||
|
||||
// Flush one of the independent tasks.
|
||||
lineage_cache_.AddReadyTask(task1);
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(task1));
|
||||
num_tasks_flushed++;
|
||||
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
|
||||
// Flush acknowledgements. The lineage cache should receive the commit for
|
||||
// the first task.
|
||||
mock_gcs_.Flush();
|
||||
// Mark the other independent task and the dependent as ready.
|
||||
lineage_cache_.AddReadyTask(task2);
|
||||
lineage_cache_.AddReadyTask(dependent_task);
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(task2));
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task));
|
||||
// Two tasks are ready, but only the independent task should be flushed. The
|
||||
// dependent task should only be flushed once commits for both independent
|
||||
// tasks are received.
|
||||
@@ -326,14 +326,14 @@ TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) {
|
||||
// Simulate removing the task and forwarding it to another node.
|
||||
auto uncommitted_lineage =
|
||||
lineage_cache_.GetUncommittedLineage(task_id, ClientID::nil());
|
||||
lineage_cache_.RemoveWaitingTask(task_id);
|
||||
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
|
||||
// Simulate receiving the task again. Make sure we can add the task back.
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
auto uncommitted_lineage_message = uncommitted_lineage.ToFlatbuffer(fbb, task_id);
|
||||
fbb.Finish(uncommitted_lineage_message);
|
||||
uncommitted_lineage = Lineage(
|
||||
*flatbuffers::GetRoot<protocol::ForwardTaskRequest>(fbb.GetBufferPointer()));
|
||||
lineage_cache_.AddWaitingTask(*it, uncommitted_lineage);
|
||||
ASSERT_TRUE(lineage_cache_.AddWaitingTask(*it, uncommitted_lineage));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -350,11 +350,11 @@ TEST_F(LineageCacheTest, TestForwardTask) {
|
||||
auto task_id_to_remove = forwarded_task.GetTaskSpecification().TaskId();
|
||||
auto uncommitted_lineage =
|
||||
lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil());
|
||||
lineage_cache_.RemoveWaitingTask(task_id_to_remove);
|
||||
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove));
|
||||
|
||||
// Simulate executing the remaining tasks.
|
||||
for (const auto &task : tasks) {
|
||||
lineage_cache_.AddReadyTask(task);
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(task));
|
||||
}
|
||||
// Check that the first task, which has no dependencies can be flushed. The
|
||||
// last task cannot be flushed since one of its dependencies has not been
|
||||
@@ -389,7 +389,7 @@ TEST_F(LineageCacheTest, TestEviction) {
|
||||
// Simulate forwarding the chain of tasks to a remote node.
|
||||
for (const auto &task : tasks) {
|
||||
auto task_id = task.GetTaskSpecification().TaskId();
|
||||
lineage_cache_.RemoveWaitingTask(task_id);
|
||||
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
|
||||
}
|
||||
|
||||
// Check that the last task in the chain still has all tasks in its
|
||||
@@ -441,7 +441,7 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
|
||||
// Simulate forwarding the chain of tasks to a remote node.
|
||||
for (const auto &task : tasks) {
|
||||
auto task_id = task.GetTaskSpecification().TaskId();
|
||||
lineage_cache_.RemoveWaitingTask(task_id);
|
||||
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
|
||||
}
|
||||
// Check that we requested at most 2 notifications
|
||||
ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2);
|
||||
@@ -493,7 +493,7 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) {
|
||||
// Simulate forwarding the chain of tasks to a remote node.
|
||||
for (const auto &task : tasks) {
|
||||
auto task_id = task.GetTaskSpecification().TaskId();
|
||||
lineage_cache_.RemoveWaitingTask(task_id);
|
||||
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
|
||||
}
|
||||
|
||||
// Add more tasks to the lineage cache that will remain local. Each of these
|
||||
@@ -501,27 +501,19 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) {
|
||||
for (const auto &task : tasks) {
|
||||
auto return_id = task.GetTaskSpecification().ReturnId(0);
|
||||
auto dependent_task = ExampleTask({return_id}, 1);
|
||||
lineage_cache_.AddWaitingTask(dependent_task, Lineage());
|
||||
lineage_cache_.AddReadyTask(dependent_task);
|
||||
ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage()));
|
||||
ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task));
|
||||
// Once the forwarded tasks are evicted from the lineage cache, we expect
|
||||
// each of these dependent tasks to be flushed, since all of their
|
||||
// dependencies have been committed.
|
||||
num_tasks_flushed++;
|
||||
}
|
||||
|
||||
// Check that the last task in the chain still has all tasks in its
|
||||
// uncommitted lineage.
|
||||
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
|
||||
auto uncommitted_lineage =
|
||||
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
|
||||
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
|
||||
|
||||
// Simulate executing the last task on a remote node and adding it to the
|
||||
// GCS.
|
||||
auto task_data = std::make_shared<protocol::TaskT>();
|
||||
auto it = tasks.rbegin();
|
||||
RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data));
|
||||
it++;
|
||||
// We expect the task that was added remotely to be flushed.
|
||||
num_tasks_flushed++;
|
||||
// Check that once the last task in the forwarded chain is flushed, all local
|
||||
|
||||
@@ -401,7 +401,11 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id,
|
||||
// known.
|
||||
auto created_actor_methods = local_queues_.RemoveTasks(created_actor_method_ids);
|
||||
for (const auto &method : created_actor_methods) {
|
||||
lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId());
|
||||
if (!lineage_cache_.RemoveWaitingTask(method.GetTaskSpecification().TaskId())) {
|
||||
RAY_LOG(WARNING) << "Task " << method.GetTaskSpecification().TaskId()
|
||||
<< " already removed from the lineage cache. This is most "
|
||||
"likely due to reconstruction.";
|
||||
}
|
||||
// The task's uncommitted lineage was already added to the local lineage
|
||||
// cache upon the initial submission, so it's okay to resubmit it with an
|
||||
// empty lineage this time.
|
||||
@@ -824,15 +828,20 @@ void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) {
|
||||
|
||||
void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
|
||||
bool forwarded) {
|
||||
if (local_queues_.HasTask(task.GetTaskSpecification().TaskId())) {
|
||||
RAY_LOG(WARNING) << "Submitted task " << task.GetTaskSpecification().TaskId()
|
||||
const TaskID task_id = task.GetTaskSpecification().TaskId();
|
||||
if (local_queues_.HasTask(task_id)) {
|
||||
RAY_LOG(WARNING) << "Submitted task " << task_id
|
||||
<< " is already queued and will not be reconstructed. This is most "
|
||||
"likely due to spurious reconstruction.";
|
||||
return;
|
||||
}
|
||||
|
||||
// Add the task and its uncommitted lineage to the lineage cache.
|
||||
lineage_cache_.AddWaitingTask(task, uncommitted_lineage);
|
||||
if (!lineage_cache_.AddWaitingTask(task, uncommitted_lineage)) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "Task " << task_id
|
||||
<< " already in lineage cache. This is most likely due to reconstruction.";
|
||||
}
|
||||
|
||||
const TaskSpecification &spec = task.GetTaskSpecification();
|
||||
if (spec.IsActorTask()) {
|
||||
@@ -1082,7 +1091,11 @@ void NodeManager::AssignTask(Task &task) {
|
||||
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject());
|
||||
}
|
||||
// We started running the task, so the task is ready to write to GCS.
|
||||
lineage_cache_.AddReadyTask(task);
|
||||
if (!lineage_cache_.AddReadyTask(task)) {
|
||||
RAY_LOG(WARNING)
|
||||
<< "Task " << spec.TaskId()
|
||||
<< " already in lineage cache. This is most likely due to reconstruction.";
|
||||
}
|
||||
// Mark the task as running.
|
||||
// (See design_docs/task_states.rst for the state transition diagram.)
|
||||
local_queues_.QueueRunningTasks(std::vector<Task>({task}));
|
||||
@@ -1158,6 +1171,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
|
||||
}
|
||||
|
||||
void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
|
||||
RAY_LOG(INFO) << "Reconstructing task " << task_id << " on client "
|
||||
<< gcs_client_->client_table().GetLocalClientId();
|
||||
// Retrieve the task spec in order to re-execute the task.
|
||||
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
|
||||
JobID::nil(), task_id,
|
||||
@@ -1335,7 +1350,11 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
|
||||
// If we were able to forward the task, remove the forwarded task from the
|
||||
// lineage cache since the receiving node is now responsible for writing
|
||||
// the task to the GCS.
|
||||
lineage_cache_.RemoveWaitingTask(task_id);
|
||||
if (!lineage_cache_.RemoveWaitingTask(task_id)) {
|
||||
RAY_LOG(WARNING) << "Task " << task_id << " already removed from the lineage "
|
||||
"cache. This is most likely due to "
|
||||
"reconstruction.";
|
||||
}
|
||||
// Mark as forwarded so that the task and its lineage are not re-forwarded
|
||||
// in the future to the receiving node.
|
||||
lineage_cache_.MarkTaskAsForwarded(task_id, node_id);
|
||||
|
||||
Reference in New Issue
Block a user