[xray] Try to flush children of a task that is evicted from the lineage cache (#2531)

This commit is contained in:
Stephanie Wang
2018-08-01 00:23:02 -07:00
committed by Robert Nishihara
parent 909d7172b1
commit e90ecef297
3 changed files with 110 additions and 54 deletions
+52 -49
View File
@@ -27,8 +27,8 @@ const TaskID LineageEntry::GetEntryId() const {
return task_.GetTaskSpecification().TaskId();
}
const std::unordered_set<UniqueID> LineageEntry::GetParentTaskIds() const {
std::unordered_set<UniqueID> parent_ids;
const std::unordered_set<TaskID> LineageEntry::GetParentTaskIds() const {
std::unordered_set<TaskID> parent_ids;
// A task's parents are the tasks that created its arguments.
auto dependencies = task_.GetDependencies();
for (auto &dependency : dependencies) {
@@ -52,7 +52,7 @@ Lineage::Lineage(const protocol::ForwardTaskRequest &task_request) {
}
}
boost::optional<const LineageEntry &> Lineage::GetEntry(const UniqueID &task_id) const {
boost::optional<const LineageEntry &> Lineage::GetEntry(const TaskID &task_id) const {
auto entry = entries_.find(task_id);
if (entry != entries_.end()) {
return entry->second;
@@ -61,7 +61,7 @@ boost::optional<const LineageEntry &> Lineage::GetEntry(const UniqueID &task_id)
}
}
boost::optional<LineageEntry &> Lineage::GetEntryMutable(const UniqueID &task_id) {
boost::optional<LineageEntry &> Lineage::GetEntryMutable(const TaskID &task_id) {
auto entry = entries_.find(task_id);
if (entry != entries_.end()) {
return entry->second;
@@ -89,7 +89,7 @@ bool Lineage::SetEntry(const Task &task, GcsStatus status) {
}
}
boost::optional<LineageEntry> Lineage::PopEntry(const UniqueID &task_id) {
boost::optional<LineageEntry> Lineage::PopEntry(const TaskID &task_id) {
auto entry = entries_.find(task_id);
if (entry != entries_.end()) {
LineageEntry entry = std::move(entries_.at(task_id));
@@ -100,7 +100,7 @@ boost::optional<LineageEntry> Lineage::PopEntry(const UniqueID &task_id) {
}
}
const std::unordered_map<const UniqueID, LineageEntry> &Lineage::GetEntries() const {
const std::unordered_map<const TaskID, LineageEntry> &Lineage::GetEntries() const {
return entries_;
}
@@ -137,7 +137,7 @@ LineageCache::LineageCache(const ClientID &client_id,
/// \param lineage_to The lineage to merge entries into.
/// \param stopping_condition A stopping condition for the DFS over
/// lineage_from. This should return true if the merge should stop.
void MergeLineageHelper(const UniqueID &task_id, const Lineage &lineage_from,
void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from,
Lineage &lineage_to,
std::function<bool(GcsStatus)> stopping_condition) {
// If the entry is not found in the lineage to merge, then we stop since
@@ -338,7 +338,7 @@ void LineageCache::Flush() {
}
}
bool LineageCache::SubscribeTask(const UniqueID &task_id) {
bool LineageCache::SubscribeTask(const TaskID &task_id) {
auto inserted = subscribed_tasks_.insert(task_id);
bool unsubscribed = inserted.second;
if (unsubscribed) {
@@ -351,7 +351,7 @@ bool LineageCache::SubscribeTask(const UniqueID &task_id) {
return unsubscribed;
}
bool LineageCache::UnsubscribeTask(const UniqueID &task_id) {
bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
auto it = subscribed_tasks_.find(task_id);
bool subscribed = (it != subscribed_tasks_.end());
if (subscribed) {
@@ -365,48 +365,10 @@ bool LineageCache::UnsubscribeTask(const UniqueID &task_id) {
return subscribed;
}
void LineageCache::EvictRemoteLineage(const UniqueID &task_id) {
// Remove the ancestor task.
boost::optional<LineageEntry> LineageCache::EvictTask(const TaskID &task_id) {
auto entry = lineage_.PopEntry(task_id);
if (!entry) {
return;
}
// Tasks are committed in data dependency order per node, so the only
// ancestors of a committed task should be other remote tasks.
auto status = entry->GetStatus();
RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE);
// We are evicting the remote ancestors of a task, so there should not be
// any dependent tasks that need to be flushed.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Unsubscribe from the remote ancestor task if we were subscribed to
// notifications.
UnsubscribeTask(task_id);
// Recurse and remove this task's ancestors.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
}
}
void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
RAY_LOG(DEBUG) << "task committed: " << task_id;
auto entry = lineage_.PopEntry(task_id);
if (!entry) {
// The committed entry has already been evicted. Check that the committed
// entry does not have any dependent tasks, since we should've already
// attempted to flush these tasks on the first commit notification.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Check that we already unsubscribed from the task when handling the
// first commit notification.
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
// Do nothing if the committed entry has already been evicted.
return;
}
// Evict the committed task's uncommitted lineage. Since local tasks are
// written in data dependency order, the uncommitted lineage should only
// include remote tasks, i.e. tasks that were committed by a different node.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
return entry;
}
// Stop listening for notifications about this task.
@@ -431,6 +393,47 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
}
}
}
return entry;
}
void LineageCache::EvictRemoteLineage(const TaskID &task_id) {
// Remove the ancestor task.
auto entry = EvictTask(task_id);
if (!entry) {
return;
}
// Tasks are committed in data dependency order per node, so the only
// ancestors of a committed task should be other remote tasks.
auto status = entry->GetStatus();
RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE);
// Recurse and remove this task's ancestors.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
}
}
void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
RAY_LOG(DEBUG) << "task committed: " << task_id;
auto entry = EvictTask(task_id);
if (!entry) {
// The committed entry has already been evicted. Check that the committed
// entry does not have any dependent tasks, since we should've already
// attempted to flush these tasks on the first commit notification.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Check that we already unsubscribed from the task when handling the
// first commit notification.
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
// Do nothing if the committed entry has already been evicted.
return;
}
// Evict the committed task's uncommitted lineage. Since local tasks are
// written in data dependency order, the uncommitted lineage should only
// include remote tasks, i.e. tasks that were committed by a different node.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
}
}
} // namespace raylet
+11 -5
View File
@@ -113,7 +113,7 @@ class Lineage {
/// \return An optional reference to the entry. If this is empty, then the
/// entry ID is not in the lineage.
boost::optional<const LineageEntry &> GetEntry(const TaskID &entry_id) const;
boost::optional<LineageEntry &> GetEntryMutable(const UniqueID &task_id);
boost::optional<LineageEntry &> GetEntryMutable(const TaskID &task_id);
/// Set an entry in the lineage. If an entry with this ID already exists,
/// then the entry is overwritten if and only if the new entry has a higher
@@ -211,17 +211,23 @@ class LineageCache {
/// parents that are not committed yet, then the child will be flushed once
/// the parents have been committed.
bool FlushTask(const TaskID &task_id);
/// Evict a single task. This should only be called if we are sure that the
/// task has been committed and will trigger an attempt to flush any of the
/// evicted task's children that are in UNCOMMITTED_READY state. Returns an
/// optional reference to the evicted task that is empty if the task was not
/// in the lineage cache.
boost::optional<LineageEntry> EvictTask(const TaskID &task_id);
/// Evict a remote task and its lineage. This should only be called if we
/// are sure that the remote task and its lineage are committed.
void EvictRemoteLineage(const UniqueID &task_id);
void EvictRemoteLineage(const TaskID &task_id);
/// Subscribe to notifications for a task. Returns whether the operation
/// was successful (whether we were not already subscribed).
bool SubscribeTask(const UniqueID &task_id);
bool SubscribeTask(const TaskID &task_id);
/// Unsubscribe from notifications for a task. Returns whether the operation
/// was successful (whether we were subscribed).
bool UnsubscribeTask(const UniqueID &task_id);
bool UnsubscribeTask(const TaskID &task_id);
/// Count the size of unsubscribed and uncommitted lineage
uint64_t CountUnsubscribedLineage(const UniqueID &task_id) const;
uint64_t CountUnsubscribedLineage(const TaskID &task_id) const;
/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
+47
View File
@@ -450,6 +450,53 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() <= max_lineage_size_);
}
TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) {
// Insert a chain of dependent tasks.
size_t num_tasks_flushed = 0;
uint64_t lineage_size = max_lineage_size_ + 1;
std::vector<Task> tasks;
InsertTaskChain(lineage_cache_, tasks, lineage_size, std::vector<ObjectID>(), 1);
// Simulate forwarding the chain of tasks to a remote node.
for (const auto &task : tasks) {
auto task_id = task.GetTaskSpecification().TaskId();
lineage_cache_.RemoveWaitingTask(task_id);
}
// Add more tasks to the lineage cache that will remain local. Each of these
// tasks is dependent one of the tasks that was forwarded above.
for (const auto &task : tasks) {
auto return_id = task.GetTaskSpecification().ReturnId(0);
auto dependent_task = ExampleTask({return_id}, 1);
lineage_cache_.AddWaitingTask(dependent_task, Lineage());
lineage_cache_.AddReadyTask(dependent_task);
// Once the forwarded tasks are evicted from the lineage cache, we expect
// each of these dependent tasks to be flushed, since all of their
// dependencies have been committed.
num_tasks_flushed++;
}
// Check that the last task in the chain still has all tasks in its
// uncommitted lineage.
const auto last_task_id = tasks.back().GetTaskSpecification().TaskId();
auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id);
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
// Simulate executing the last task on a remote node and adding it to the
// GCS.
auto task_data = std::make_shared<protocol::TaskT>();
auto it = tasks.rbegin();
RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data));
it++;
// We expect the task that was added remotely to be flushed.
num_tasks_flushed++;
// Check that once the last task in the forwarded chain is flushed, all local
// tasks are flushed, since all of their dependencies have been evicted and
// are therefore committed in the GCS.
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
}
} // namespace raylet
} // namespace ray