[core] Revert lineage pinning (#7499) (#7692)

* Revert "fix (#7681)"

This reverts commit 6a12a31b2e.

* Revert "[core] Pin lineage of plasma objects that are still in scope (#7499)"

This reverts commit 014929e658.
This commit is contained in:
Stephanie Wang
2020-03-21 18:35:43 -07:00
committed by GitHub
parent 89d959fd6a
commit ba86a02b37
15 changed files with 155 additions and 752 deletions
+32
View File
@@ -1014,6 +1014,38 @@ def test_eviction(ray_start_cluster):
ray.get(dependent_task.remote(obj))
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 1,
"num_cpus": 2,
}, {
"num_nodes": 2,
"num_cpus": 1,
}],
indirect=True)
def test_serialized_id_eviction(ray_start_cluster):
@ray.remote
def large_object():
return np.zeros(10 * 1024 * 1024)
@ray.remote
def get(obj_ids):
obj_id = obj_ids[0]
assert (isinstance(ray.get(obj_id), np.ndarray))
# Wait for the object to be evicted.
ray.internal.free(obj_id)
while ray.worker.global_worker.core_worker.object_exists(obj_id):
time.sleep(1)
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(obj_id)
print("get done", obj_ids)
obj = large_object.remote()
result = get.remote([obj])
ray.internal.free(obj)
ray.get(result)
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_nodes": 2,
-2
View File
@@ -101,8 +101,6 @@ RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000)
/// to -1.
RAY_CONFIG(size_t, free_objects_batch_size, 100)
RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// Whether to enable the new scheduler. The new scheduler is designed
/// only to work with direct calls. Once direct calls afre becoming
/// the default, this scheduler will also become the default.
+6 -10
View File
@@ -173,7 +173,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
reference_counter_ = std::make_shared<ReferenceCounter>(
rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(),
RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) {
[this](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
});
@@ -485,8 +485,7 @@ Status CoreWorker::Put(const RayObject &object,
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
}
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
return Status::OK();
return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
}
Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
@@ -530,8 +529,7 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object,
} else {
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
return Status::OK();
return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
}
Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
@@ -1243,8 +1241,8 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
// We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get()
// properly redirects to the plasma store.
if (task.ArgId(i, 0).IsDirectCallType()) {
RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
task.ArgId(i, 0)));
RAY_CHECK_OK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
task.ArgId(i, 0)));
}
const auto &arg_id = task.ArgId(i, 0);
by_ref_ids.insert(arg_id);
@@ -1466,9 +1464,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &
rpc::SendReplyCallback send_reply_callback) {
absl::MutexLock lock(&mutex_);
auto stats = reply->mutable_core_worker_stats();
// TODO(swang): Differentiate between tasks that are currently pending
// execution and tasks that have finished but may be retried.
stats->set_num_pending_tasks(task_manager_->NumSubmissibleTasks());
stats->set_num_pending_tasks(task_manager_->NumPendingTasks());
stats->set_task_queue_length(task_queue_length_);
stats->set_num_executed_tasks(num_executed_tasks_);
stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope());
+2 -2
View File
@@ -40,8 +40,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID
// Either the owner is gone or the owner replied that the object has
// been created. In both cases, we can now try to fetch the object via
// plasma.
RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id));
RAY_CHECK_OK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id));
}));
}
+13 -77
View File
@@ -23,8 +23,7 @@
<< (it->second.contained_in_borrowed_id.has_value() \
? *it->second.contained_in_borrowed_id \
: ObjectID::Nil()) \
<< " contains: " << it->second.contains.size() \
<< " lineage_ref_count: " << it->second.lineage_ref_count;
<< " contains: " << it->second.contains.size();
namespace {} // namespace
@@ -216,7 +215,6 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids_to_remove, std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const ObjectID &argument_id : argument_ids_to_add) {
RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id;
auto it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) {
// This happens if a large argument is transparently passed by reference
@@ -224,20 +222,13 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
it = object_id_refs_.emplace(argument_id, Reference()).first;
}
it->second.submitted_task_ref_count++;
// The lineage ref will get released once the task finishes and cannot be
// retried again.
it->second.lineage_ref_count++;
}
// Release the submitted task ref and the lineage ref for any argument IDs
// whose values were inlined.
RemoveSubmittedTaskReferences(argument_ids_to_remove, /*release_lineage=*/true,
deleted);
RemoveSubmittedTaskReferences(argument_ids_to_remove, deleted);
}
void ReferenceCounter::UpdateFinishedTaskReferences(
const std::vector<ObjectID> &argument_ids, bool release_lineage,
const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted) {
const std::vector<ObjectID> &argument_ids, const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs, std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
// Must merge the borrower refs before decrementing any ref counts. This is
// to make sure that for serialized IDs, we increment the borrower count for
@@ -251,63 +242,19 @@ void ReferenceCounter::UpdateFinishedTaskReferences(
MergeRemoteBorrowers(argument_id, worker_addr, refs);
}
RemoveSubmittedTaskReferences(argument_ids, release_lineage, deleted);
}
void ReferenceCounter::ReleaseLineageReferences(
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
ReleaseLineageReferencesInternal(argument_ids);
}
void ReferenceCounter::ReleaseLineageReferencesInternal(
const std::vector<ObjectID> &argument_ids) {
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
continue;
}
if (it->second.lineage_ref_count == 0) {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
continue;
}
RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id;
it->second.lineage_ref_count--;
if (it->second.lineage_ref_count == 0) {
// Don't have to pass in a deleted vector here because the reference
// cannot have gone out of scope here since we are only modifying the
// lineage ref count.
DeleteReferenceInternal(it, nullptr);
}
}
RemoveSubmittedTaskReferences(argument_ids, deleted);
}
void ReferenceCounter::RemoveSubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids, bool release_lineage,
std::vector<ObjectID> *deleted) {
const std::vector<ObjectID> &argument_ids, std::vector<ObjectID> *deleted) {
for (const ObjectID &argument_id : argument_ids) {
RAY_LOG(DEBUG) << "Releasing ref for submitted task argument " << argument_id;
auto it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) {
RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: "
<< argument_id;
return;
}
RAY_CHECK(it->second.submitted_task_ref_count > 0);
it->second.submitted_task_ref_count--;
if (release_lineage) {
if (it->second.lineage_ref_count > 0) {
it->second.lineage_ref_count--;
} else {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
}
}
if (it->second.RefCount() == 0) {
DeleteReferenceInternal(it, deleted);
}
@@ -340,7 +287,7 @@ void ReferenceCounter::DeleteReferences(const std::vector<ObjectID> &object_ids)
}
it->second.local_ref_count = 0;
it->second.submitted_task_ref_count = 0;
if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) {
if (distributed_ref_counting_enabled_ && !it->second.CanDelete()) {
RAY_LOG(ERROR)
<< "ray.internal.free does not currently work for objects that are still in "
"scope when distributed reference "
@@ -364,6 +311,8 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
// Whether it is safe to unpin the value.
bool should_delete_value = false;
// Whether it is safe to delete the Reference.
bool should_delete_reference = false;
// If distributed ref counting is not enabled, then delete the object as soon
// as its local ref count goes to 0.
@@ -373,10 +322,11 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
should_delete_value = true;
}
if (it->second.OutOfScope()) {
if (it->second.CanDelete()) {
// If distributed ref counting is enabled, then delete the object once its
// ref count across all processes is 0.
should_delete_value = true;
should_delete_reference = true;
for (const auto &inner_id : it->second.contains) {
auto inner_it = object_id_refs_.find(inner_id);
if (inner_it != object_id_refs_.end()) {
@@ -409,16 +359,8 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
deleted->push_back(id);
}
}
if (it->second.ShouldDelete(lineage_pinning_enabled_)) {
if (should_delete_reference) {
RAY_LOG(DEBUG) << "Deleting Reference to object " << id;
// TODO(swang): Update lineage_ref_count for nested objects?
if (on_lineage_released_ && it->second.owned_by_us) {
RAY_LOG(DEBUG) << "Releasing lineage for object " << id;
std::vector<ObjectID> ids_to_release;
on_lineage_released_(id, &ids_to_release);
ReleaseLineageReferencesInternal(ids_to_release);
}
object_id_refs_.erase(it);
ShutdownIfNeeded();
}
@@ -719,7 +661,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id,
// the object was zero. Also, we should have stripped all distributed ref
// count information and returned it to the owner. Therefore, it should be
// okay to delete the object, if it wasn't already deleted.
RAY_CHECK(it->second.OutOfScope());
RAY_CHECK(it->second.CanDelete());
}
// Send the owner information about any new borrowers.
ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs());
@@ -771,12 +713,6 @@ void ReferenceCounter::SetRefRemovedCallback(
}
}
void ReferenceCounter::SetReleaseLineageCallback(
const LineageReleasedCallback &callback) {
RAY_CHECK(on_lineage_released_ == nullptr);
on_lineage_released_ = callback;
}
ReferenceCounter::Reference ReferenceCounter::Reference::FromProto(
const rpc::ObjectReferenceCount &ref_count) {
Reference ref;
+8 -73
View File
@@ -36,16 +36,12 @@ class ReferenceCounter {
using ReferenceTableProto =
::google::protobuf::RepeatedPtrField<rpc::ObjectReferenceCount>;
using ReferenceRemovedCallback = std::function<void(const ObjectID &)>;
using LineageReleasedCallback =
std::function<void(const ObjectID &, std::vector<ObjectID> *)>;
ReferenceCounter(const rpc::WorkerAddress &rpc_address,
bool distributed_ref_counting_enabled = true,
bool lineage_pinning_enabled = false,
rpc::ClientFactoryFn client_factory = nullptr)
: rpc_address_(rpc_address),
distributed_ref_counting_enabled_(distributed_ref_counting_enabled),
lineage_pinning_enabled_(lineage_pinning_enabled),
client_factory_(client_factory) {}
~ReferenceCounter() {}
@@ -71,16 +67,9 @@ class ReferenceCounter {
LOCKS_EXCLUDED(mutex_);
/// Add references for the provided object IDs that correspond to them being
/// dependencies to a submitted task. If lineage pinning is enabled, then
/// this will also pin the Reference entry for each new argument until the
/// argument's lineage ref is released.
/// dependencies to a submitted task.
///
/// \param[in] argument_ids_to_add The arguments of the task to add
/// references for.
/// \param[out] argument_ids_to_remove The arguments of the task to remove
/// references for.
/// \param[out] deleted Any objects that are newly out of scope after this
/// function call.
/// \param[in] object_ids The object IDs to add references for.
void UpdateSubmittedTaskReferences(
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove = std::vector<ObjectID>(),
@@ -92,8 +81,6 @@ class ReferenceCounter {
/// when the task finishes for plasma dependencies.
///
/// \param[in] object_ids The object IDs to remove references for.
/// \param[in] release_lineage Whether to decrement the arguments' lineage
/// ref count.
/// \param[in] worker_addr The address of the worker that executed the task.
/// \param[in] borrowed_refs The references that the worker borrowed during
/// the task. This table includes all task arguments that were passed by
@@ -102,22 +89,11 @@ class ReferenceCounter {
/// worker and/or a task that the worker submitted.
/// \param[out] deleted The object IDs whos reference counts reached zero.
void UpdateFinishedTaskReferences(const std::vector<ObjectID> &argument_ids,
bool release_lineage, const rpc::Address &worker_addr,
const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted)
LOCKS_EXCLUDED(mutex_);
/// Release the lineage ref count for this list of object IDs. An object's
/// lineage ref count is the number of tasks that depend on the object that
/// may be retried in the future (pending execution or finished but
/// retryable). If the object is direct (not stored in plasma), then its
/// lineage ref count is 0.
///
/// \param[in] argument_ids The list of objects whose lineage ref counts we
/// should decrement.
void ReleaseLineageReferences(const std::vector<ObjectID> &argument_ids)
LOCKS_EXCLUDED(mutex_);
/// Add an object that we own. The object may depend on other objects.
/// Dependencies for each ObjectID must be set at most once. The local
/// reference count for the ObjectID is set to zero, which assumes that an
@@ -195,15 +171,6 @@ class ReferenceCounter {
const ReferenceRemovedCallback &ref_removed_callback)
LOCKS_EXCLUDED(mutex_);
/// Set a callback to call whenever a Reference that we own is deleted. A
/// Reference can only be deleted if:
/// 1. The ObjectID's ref count is 0 on all workers.
/// 2. There are no tasks that depend on the object that may be retried in
/// the future.
///
/// \param[in] callback The callback to call.
void SetReleaseLineageCallback(const LineageReleasedCallback &callback);
/// Respond to the object's owner once we are no longer borrowing it. The
/// sender is the owner of the object ID. We will send the reply when our
/// RefCount() for the object ID goes to 0.
@@ -309,13 +276,13 @@ class ReferenceCounter {
return local_ref_count + submitted_task_ref_count + contained_in_owned.size();
}
/// Whether this reference is no longer in scope. A reference is in scope
/// if any of the following are true:
/// Whether we can delete this reference. A reference can NOT be deleted if
/// any of the following are true:
/// - The reference is still being used by this process.
/// - The reference was contained in another ID that we were borrowing, and
/// we haven't told the process that gave us that ID yet.
/// - We gave the reference to at least one other process.
bool OutOfScope() const {
bool CanDelete() const {
bool in_scope = RefCount() > 0;
bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value();
bool has_borrowers = borrowers.size() > 0;
@@ -324,19 +291,6 @@ class ReferenceCounter {
was_stored_in_objects);
}
/// Whether the Reference can be deleted. A Reference can only be deleted
/// if:
/// 1. The ObjectID's ref count is 0 on all workers.
/// 2. If lineage pinning is enabled, there are no tasks that depend on
/// the object that may be retried in the future.
bool ShouldDelete(bool lineage_pinning_enabled) const {
if (lineage_pinning_enabled) {
return OutOfScope() && (lineage_ref_count == 0);
} else {
return OutOfScope();
}
}
/// Description of the call site where the reference was created.
std::string call_site = "<unknown>";
/// Object size if known, otherwise -1;
@@ -405,11 +359,6 @@ class ReferenceCounter {
/// task's caller is also a borrower. The key is the task's return ID, and
/// the value is the task ID and address of the task's caller.
absl::flat_hash_map<ObjectID, rpc::WorkerAddress> stored_in_objects;
/// The number of tasks that depend on this object that may be retried in
/// the future (pending execution or finished but retryable). If the object
/// is inlined (not stored in plasma), then its lineage ref count is 0
/// because any dependent task will already have the value of the object.
size_t lineage_ref_count = 0;
/// Callback that will be called when this ObjectID no longer has
/// references.
@@ -437,7 +386,7 @@ class ReferenceCounter {
/// inlined dependencies are inlined or when the task finishes for plasma
/// dependencies.
void RemoveSubmittedTaskReferences(const std::vector<ObjectID> &argument_ids,
bool release_lineage, std::vector<ObjectID> *deleted)
std::vector<ObjectID> *deleted)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Helper method to mark that this ObjectID contains another ObjectID(s).
@@ -523,10 +472,6 @@ class ReferenceCounter {
std::vector<ObjectID> *deleted)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Helper method to decrement the lineage ref count for a list of objects.
void ReleaseLineageReferencesInternal(const std::vector<ObjectID> &argument_ids)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Address of our RPC server. This is used to determine whether we own a
/// given object or not, by comparing our WorkerID with the WorkerID of the
/// object's owner.
@@ -535,13 +480,7 @@ class ReferenceCounter {
/// Feature flag for distributed ref counting. If this is false, then we will
/// keep the distributed ref count, but only the local ref count will be used
/// to decide when objects can be evicted.
const bool distributed_ref_counting_enabled_;
/// Feature flag for lineage pinning. If this is false, then we will keep the
/// lineage ref count, but this will not be used to decide when the object's
/// Reference can be deleted. The object's lineage ref count is the number of
/// tasks that depend on that object that may be retried in the future.
const bool lineage_pinning_enabled_;
bool distributed_ref_counting_enabled_;
/// Factory for producing new core worker clients.
rpc::ClientFactoryFn client_factory_;
@@ -558,10 +497,6 @@ class ReferenceCounter {
/// Holds all reference counts and dependency information for tracked ObjectIDs.
ReferenceTable object_id_refs_ GUARDED_BY(mutex_);
/// The callback to call once an object ID that we own is no longer in scope
/// and it has no tasks that depend on it that may be retried in the future.
/// The object's Reference will be erased after this callback.
LineageReleasedCallback on_lineage_released_;
/// Optional shutdown hook to call when all references have gone
/// out of scope.
std::function<void()> shutdown_hook_ GUARDED_BY(mutex_) = nullptr;
+10 -118
View File
@@ -36,20 +36,6 @@ class ReferenceCountTest : public ::testing::Test {
virtual void TearDown() {}
};
class ReferenceCountLineageEnabledTest : public ::testing::Test {
protected:
std::unique_ptr<ReferenceCounter> rc;
virtual void SetUp() {
rpc::Address addr;
rc = std::unique_ptr<ReferenceCounter>(
new ReferenceCounter(addr,
/*distributed_ref_counting_enabled=*/true,
/*lineage_pinning_enabled=*/true));
}
virtual void TearDown() {}
};
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
// Helper function to generate a random address.
@@ -64,9 +50,8 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
MockWorkerClient(const std::string &addr, rpc::ClientFactoryFn client_factory = nullptr)
: task_id_(TaskID::ForFakeTask()),
address_(CreateRandomAddress(addr)),
rc_(rpc::WorkerAddress(address_),
/*distributed_ref_counting_enabled=*/true,
/*lineage_pinning_enabled=*/false, client_factory) {}
rc_(rpc::WorkerAddress(address_), /*distributed_ref_counting_enabled=*/true,
client_factory) {}
ray::Status WaitForRefRemoved(
const rpc::WaitForRefRemovedRequest &request,
@@ -171,8 +156,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
if (!arg_id.IsNil()) {
arguments.push_back(arg_id);
}
rc_.UpdateFinishedTaskReferences(arguments, false, borrower_address, borrower_refs,
nullptr);
rc_.UpdateFinishedTaskReferences(arguments, borrower_address, borrower_refs, nullptr);
}
// Global map from Worker ID -> MockWorkerClient.
@@ -218,13 +202,13 @@ TEST_F(ReferenceCountTest, TestBasic) {
rc->UpdateSubmittedTaskReferences({id1});
rc->UpdateSubmittedTaskReferences({id1, id2});
ASSERT_EQ(rc->NumObjectIDsInScope(), 2);
rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 2);
ASSERT_EQ(out.size(), 0);
rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 1);
ASSERT_EQ(out.size(), 1);
rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
ASSERT_EQ(out.size(), 2);
out.clear();
@@ -237,26 +221,16 @@ TEST_F(ReferenceCountTest, TestBasic) {
rc->RemoveLocalReference(id1, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 2);
ASSERT_EQ(out.size(), 0);
rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 2);
ASSERT_EQ(out.size(), 0);
rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out);
rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 1);
ASSERT_EQ(out.size(), 1);
rc->RemoveLocalReference(id2, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
ASSERT_EQ(out.size(), 2);
out.clear();
// Submitted task with inlined references.
rc->UpdateSubmittedTaskReferences({id1});
rc->UpdateSubmittedTaskReferences({id2}, {id1}, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 1);
ASSERT_EQ(out.size(), 1);
rc->UpdateSubmittedTaskReferences({}, {id2}, &out);
ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
ASSERT_EQ(out.size(), 2);
out.clear();
}
// Tests call site tracking and ability to update object size.
@@ -332,12 +306,12 @@ TEST(MemoryStoreIntegrationTest, TestSimple) {
CoreWorkerMemoryStore store(nullptr, rc);
// Tests putting an object with no references is ignored.
RAY_CHECK(store.Put(buffer, id2));
RAY_CHECK_OK(store.Put(buffer, id2));
ASSERT_EQ(store.Size(), 0);
// Tests ref counting overrides remove after get option.
rc->AddLocalReference(id1, "");
RAY_CHECK(store.Put(buffer, id1));
RAY_CHECK_OK(store.Put(buffer, id1));
ASSERT_EQ(store.Size(), 1);
std::vector<std::shared_ptr<RayObject>> results;
WorkerContext ctx(WorkerType::WORKER, JobID::Nil());
@@ -1733,88 +1707,6 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) {
// TODO: Test Pop and Merge individually.
// Test to make sure that we call the lineage released callback correctly.
TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) {
std::vector<ObjectID> out;
std::vector<ObjectID> lineage_deleted;
ObjectID id = ObjectID::FromRandom();
rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id);
});
// We should not keep lineage for borrowed objects.
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveLocalReference(id, nullptr);
ASSERT_TRUE(lineage_deleted.empty());
// We should keep lineage for owned objects.
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->RemoveLocalReference(id, nullptr);
ASSERT_EQ(lineage_deleted.size(), 1);
}
// Test for pinning the lineage of an object, where the lineage is a chain of
// tasks that each depend on the previous. The previous objects should already
// have gone out of scope, but their Reference entry is pinned until the final
// object goes out of scope.
TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
std::vector<ObjectID> out;
std::vector<ObjectID> lineage_deleted;
std::vector<ObjectID> ids;
for (int i = 0; i < 3; i++) {
ObjectID id = ObjectID::FromRandom();
ids.push_back(id);
rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0);
}
rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id);
// Simulate releasing objects in downstream_id's lineage.
size_t i = 0;
for (; i < ids.size(); i++) {
if (ids[i] == object_id) {
break;
}
}
RAY_CHECK(i < ids.size());
if (i > 0) {
ids_to_release->push_back(ids[i - 1]);
}
});
for (size_t i = 0; i < ids.size() - 1; i++) {
auto id = ids[i];
// Submit a dependent task on id.
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->HasReference(id));
rc->UpdateSubmittedTaskReferences({id});
rc->RemoveLocalReference(id, nullptr);
// The task finishes but is retryable.
rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out);
ASSERT_EQ(out.size(), 1);
out.clear();
ASSERT_TRUE(lineage_deleted.empty());
ASSERT_TRUE(rc->HasReference(id));
}
// The task return ID goes out of scope.
rc->AddLocalReference(ids.back(), "");
rc->RemoveLocalReference(ids.back(), nullptr);
// The removal of the last return ID should recursively delete all
// references.
ASSERT_EQ(lineage_deleted.size(), ids.size());
ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
}
} // namespace ray
int main(int argc, char **argv) {
@@ -157,11 +157,10 @@ std::shared_ptr<RayObject> CoreWorkerMemoryStore::GetOrPromoteToPlasma(
return nullptr;
}
bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
auto object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(),
object.GetNestedIds(), true);
bool stored_in_direct_memory = true;
// TODO(edoakes): we should instead return a flag to the caller to put the object in
// plasma.
@@ -171,7 +170,7 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
auto iter = objects_.find(object_id);
if (iter != objects_.end()) {
return true; // Object already exists in the store, which is fine.
return Status::OK(); // Object already exists in the store, which is fine.
}
auto async_callback_it = object_async_get_requests_.find(object_id);
@@ -218,7 +217,6 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
// in-memory store (would cause deadlock).
if (should_put_in_plasma) {
store_in_plasma_(object, object_id);
stored_in_direct_memory = false;
}
// It's important for performance to run the callbacks outside the lock.
@@ -226,7 +224,7 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
cb(object_entry);
}
return stored_in_direct_memory;
return Status::OK();
}
Status CoreWorkerMemoryStore::Get(const std::vector<ObjectID> &object_ids,
@@ -42,9 +42,8 @@ class CoreWorkerMemoryStore {
///
/// \param[in] object The ray object.
/// \param[in] object_id Object ID specified by user.
/// \return Whether the object was put into the memory store. If false, then
/// this is because the object was promoted to and stored in plasma instead.
bool Put(const RayObject &object, const ObjectID &object_id);
/// \return Status.
Status Put(const RayObject &object, const ObjectID &object_id);
/// Get a list of objects from the object store.
///
+36 -126
View File
@@ -29,6 +29,9 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
const TaskSpecification &spec,
const std::string &call_site, int max_retries) {
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId();
absl::MutexLock lock(&mu_);
std::pair<TaskSpecification, int> entry = {spec, max_retries};
RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second);
// Add references for the dependencies to the task.
std::vector<ObjectID> task_deps;
@@ -68,23 +71,16 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
/*inner_ids=*/{}, caller_id, caller_address,
call_site, -1);
}
{
absl::MutexLock lock(&mu_);
RAY_CHECK(submissible_tasks_
.emplace(spec.TaskId(), TaskEntry(spec, max_retries, num_returns))
.second);
}
}
void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
bool has_pending_tasks = false;
{
absl::MutexLock lock(&mu_);
if (!submissible_tasks_.empty()) {
if (!pending_tasks_.empty()) {
has_pending_tasks = true;
RAY_LOG(WARNING)
<< "This worker is still managing " << submissible_tasks_.size()
<< "This worker is still managing " << pending_tasks_.size()
<< " in flight tasks, waiting for them to finish before shutting down.";
shutdown_hook_ = shutdown;
}
@@ -96,31 +92,27 @@ void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
}
}
bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
return submissible_tasks_.count(task_id) > 0;
}
bool TaskManager::IsTaskPending(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
const auto it = submissible_tasks_.find(task_id);
if (it == submissible_tasks_.end()) {
return false;
}
return it->second.pending;
}
int TaskManager::NumSubmissibleTasks() const {
absl::MutexLock lock(&mu_);
return submissible_tasks_.size();
return pending_tasks_.count(task_id) > 0;
}
void TaskManager::CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply,
const rpc::Address &worker_addr) {
RAY_LOG(DEBUG) << "Completing task " << task_id;
TaskSpecification spec;
{
absl::MutexLock lock(&mu_);
auto it = pending_tasks_.find(task_id);
RAY_CHECK(it != pending_tasks_.end())
<< "Tried to complete task that was not pending " << task_id;
spec = it->second.first;
pending_tasks_.erase(it);
}
RemoveFinishedTaskReferences(spec, worker_addr, reply.borrowed_refs());
std::vector<ObjectID> direct_return_ids;
for (int i = 0; i < reply.return_objects_size(); i++) {
const auto &return_object = reply.return_objects(i);
ObjectID object_id = ObjectID::FromBinary(return_object.object_id());
@@ -128,7 +120,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
if (return_object.in_plasma()) {
// Mark it as in plasma with a dummy object.
RAY_CHECK(
RAY_CHECK_OK(
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
} else {
std::shared_ptr<LocalMemoryBuffer> data_buffer;
@@ -145,53 +137,13 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
reinterpret_cast<const uint8_t *>(return_object.metadata().data())),
return_object.metadata().size());
}
bool stored_in_direct_memory = in_memory_store_->Put(
RAY_CHECK_OK(in_memory_store_->Put(
RayObject(data_buffer, metadata_buffer,
IdVectorFromProtobuf<ObjectID>(return_object.nested_inlined_ids())),
object_id);
if (stored_in_direct_memory) {
direct_return_ids.push_back(object_id);
}
object_id));
}
}
TaskSpecification spec;
bool release_lineage = true;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
RAY_CHECK(it != submissible_tasks_.end())
<< "Tried to complete task that was not pending " << task_id;
spec = it->second.spec;
// Release the lineage for any non-plasma return objects.
for (const auto &direct_return_id : direct_return_ids) {
RAY_LOG(DEBUG) << "Task " << it->first << " returned direct object "
<< direct_return_id << ", now has "
<< it->second.reconstructable_return_ids.size()
<< " plasma returns in scope";
it->second.reconstructable_return_ids.erase(direct_return_id);
}
RAY_LOG(DEBUG) << "Task " << it->first << " now has "
<< it->second.reconstructable_return_ids.size()
<< " plasma returns in scope";
it->second.pending = false;
// A finished task can be only be re-executed if it has some number of
// retries left and returned at least one object that is still in use and
// stored in plasma.
bool task_retryable =
it->second.num_retries_left > 0 && !it->second.reconstructable_return_ids.empty();
if (task_retryable) {
// Pin the task spec if it may be retried again.
release_lineage = false;
} else {
submissible_tasks_.erase(it);
}
}
RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs());
ShutdownIfNeeded();
}
@@ -203,20 +155,18 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
<< rpc::ErrorType_Name(error_type);
int num_retries_left = 0;
TaskSpecification spec;
bool release_lineage = true;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
RAY_CHECK(it != submissible_tasks_.end())
auto it = pending_tasks_.find(task_id);
RAY_CHECK(it != pending_tasks_.end())
<< "Tried to complete task that was not pending " << task_id;
spec = it->second.spec;
num_retries_left = it->second.num_retries_left;
spec = it->second.first;
num_retries_left = it->second.second;
if (num_retries_left == 0) {
submissible_tasks_.erase(it);
pending_tasks_.erase(it);
} else {
RAY_CHECK(it->second.num_retries_left > 0);
it->second.num_retries_left--;
release_lineage = false;
RAY_CHECK(num_retries_left > 0);
it->second.second--;
}
}
@@ -249,7 +199,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
}
// The worker failed to execute the task, so it cannot be borrowing any
// objects.
RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(),
RemoveFinishedTaskReferences(spec, rpc::Address(),
ReferenceCounter::ReferenceTableProto());
MarkPendingTaskFailed(task_id, spec, error_type);
}
@@ -261,7 +211,7 @@ void TaskManager::ShutdownIfNeeded() {
std::function<void()> shutdown_hook = nullptr;
{
absl::MutexLock lock(&mu_);
if (shutdown_hook_ && submissible_tasks_.empty()) {
if (shutdown_hook_ && pending_tasks_.empty()) {
RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after "
"draining references.";
std::swap(shutdown_hook_, shutdown_hook);
@@ -284,7 +234,7 @@ void TaskManager::OnTaskDependenciesInlined(
}
void TaskManager::RemoveFinishedTaskReferences(
TaskSpecification &spec, bool release_lineage, const rpc::Address &borrower_addr,
TaskSpecification &spec, const rpc::Address &borrower_addr,
const ReferenceCounter::ReferenceTableProto &borrowed_refs) {
std::vector<ObjectID> plasma_dependencies;
for (size_t i = 0; i < spec.NumArgs(); i++) {
@@ -305,51 +255,11 @@ void TaskManager::RemoveFinishedTaskReferences(
}
std::vector<ObjectID> deleted;
reference_counter_->UpdateFinishedTaskReferences(
plasma_dependencies, release_lineage, borrower_addr, borrowed_refs, &deleted);
reference_counter_->UpdateFinishedTaskReferences(plasma_dependencies, borrower_addr,
borrowed_refs, &deleted);
in_memory_store_->Delete(deleted);
}
void TaskManager::RemoveLineageReference(const ObjectID &object_id,
std::vector<ObjectID> *released_objects) {
absl::MutexLock lock(&mu_);
const TaskID &task_id = object_id.TaskId();
auto it = submissible_tasks_.find(task_id);
if (it == submissible_tasks_.end()) {
RAY_LOG(DEBUG) << "No lineage for object " << object_id;
return;
}
RAY_LOG(DEBUG) << "Plasma object " << object_id << " out of scope";
for (const auto &plasma_id : it->second.reconstructable_return_ids) {
RAY_LOG(DEBUG) << "Task " << task_id << " has " << plasma_id << " in scope";
}
it->second.reconstructable_return_ids.erase(object_id);
RAY_LOG(DEBUG) << "Task " << task_id << " now has "
<< it->second.reconstructable_return_ids.size()
<< " plasma returns in scope";
if (it->second.reconstructable_return_ids.empty() && !it->second.pending) {
// If the task can no longer be retried, decrement the lineage ref count
// for each of the task's args.
for (size_t i = 0; i < it->second.spec.NumArgs(); i++) {
if (it->second.spec.ArgByRef(i)) {
for (size_t j = 0; j < it->second.spec.ArgIdCount(i); j++) {
released_objects->push_back(it->second.spec.ArgId(i, j));
}
} else {
const auto &inlined_ids = it->second.spec.ArgInlinedIds(i);
released_objects->insert(released_objects->end(), inlined_ids.begin(),
inlined_ids.end());
}
}
// The task has finished and none of the return IDs are in scope anymore,
// so it is safe to remove the task spec.
submissible_tasks_.erase(it);
}
}
void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
const TaskSpecification &spec,
rpc::ErrorType error_type) {
@@ -360,7 +270,7 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
const auto object_id = ObjectID::ForTaskReturn(
task_id, /*index=*/i + 1,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT));
RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id));
RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id));
}
if (spec.IsActorCreationTask()) {
@@ -372,9 +282,9 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
RAY_CHECK(it != submissible_tasks_.end());
return it->second.spec;
auto it = pending_tasks_.find(task_id);
RAY_CHECK(it != pending_tasks_.end());
return it->second.first;
}
} // namespace ray
+19 -71
View File
@@ -53,12 +53,7 @@ class TaskManager : public TaskFinisherInterface {
: in_memory_store_(in_memory_store),
reference_counter_(reference_counter),
actor_manager_(actor_manager),
retry_task_callback_(retry_task_callback) {
reference_counter_->SetReleaseLineageCallback(
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
RemoveLineageReference(object_id, ids_to_release);
});
}
retry_task_callback_(retry_task_callback) {}
/// Add a task that is pending execution.
///
@@ -77,6 +72,12 @@ class TaskManager : public TaskFinisherInterface {
/// \param shutdown The shutdown callback to call.
void DrainAndShutdown(std::function<void()> shutdown);
/// Return whether the task is pending.
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task is pending.
bool IsTaskPending(const TaskID &task_id) const;
/// Write return objects for a pending task to the memory store.
///
/// \param[in] task_id ID of the pending task.
@@ -110,68 +111,10 @@ class TaskManager : public TaskFinisherInterface {
/// Return the spec for a pending task.
TaskSpecification GetTaskSpec(const TaskID &task_id) const;
/// Return whether this task can be submitted for execution.
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task can be submitted for execution.
bool IsTaskSubmissible(const TaskID &task_id) const;
/// Return whether the task is pending.
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task is pending.
bool IsTaskPending(const TaskID &task_id) const;
/// Return the number of pending tasks.
int NumSubmissibleTasks() const;
int NumPendingTasks() const { return pending_tasks_.size(); }
private:
struct TaskEntry {
TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg,
size_t num_returns)
: spec(spec_arg), num_retries_left(num_retries_left_arg) {
for (size_t i = 0; i < num_returns; i++) {
reconstructable_return_ids.insert(spec.ReturnId(i, TaskTransportType::DIRECT));
}
}
/// The task spec. This is pinned as long as the following are true:
/// - The task is still pending execution. This means that the task may
/// fail and so it may be retried in the future.
/// - The task finished execution, but it has num_retries_left > 0 and
/// reconstructable_return_ids is not empty. This means that the task may
/// be retried in the future to recreate its return objects.
/// TODO(swang): The TaskSpec protobuf must be copied into the
/// PushTaskRequest protobuf when sent to a worker so that we can retry it if
/// the worker fails. We could avoid this by either not caching the full
/// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by
/// storing a shared_ptr to a PushTaskRequest protobuf for all tasks.
const TaskSpecification spec;
// Number of times this task may be resubmitted. If this reaches 0, then
// the task entry may be erased.
int num_retries_left;
// Whether this task is currently pending execution. This is used to pin
// the task entry if the task is still pending but all of its return IDs
// are out of scope.
bool pending = true;
// Objects returned by this task that are reconstructable. This is set
// initially to the task's return objects, since if the task fails, these
// objects may be reconstructed by resubmitting the task. Once the task
// finishes its first execution, then the objects that the task returned by
// value are removed from this set because they can be inlined in any
// dependent tasks. Objects that the task returned through plasma are
// reconstructable, so they are only removed from this set once:
// 1) The language frontend no longer has a reference to the object ID.
// 2) There are no tasks that depend on the object. This includes both
// pending tasks and tasks that finished execution but that may be
// retried in the future.
absl::flat_hash_set<ObjectID> reconstructable_return_ids;
};
/// Remove a lineage reference to this object ID. This should be called
/// whenever a task that depended on this object ID can no longer be retried.
void RemoveLineageReference(const ObjectID &object_id,
std::vector<ObjectID> *ids_to_release) LOCKS_EXCLUDED(mu_);
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.
void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec,
@@ -182,7 +125,7 @@ class TaskManager : public TaskFinisherInterface {
/// failed. The remaining dependencies are plasma objects and any ObjectIDs
/// that were inlined in the task spec.
void RemoveFinishedTaskReferences(
TaskSpecification &spec, bool release_lineage, const rpc::Address &worker_addr,
TaskSpecification &spec, const rpc::Address &worker_addr,
const ReferenceCounter::ReferenceTableProto &borrowed_refs);
/// Shutdown if all tasks are finished and shutdown is scheduled.
@@ -211,11 +154,16 @@ class TaskManager : public TaskFinisherInterface {
/// Protects below fields.
mutable absl::Mutex mu_;
/// This map contains one entry per task that may be submitted for
/// execution. This includes both tasks that are currently pending execution
/// and tasks that finished execution but that may be retried again in the
/// future.
absl::flat_hash_map<TaskID, TaskEntry> submissible_tasks_ GUARDED_BY(mu_);
/// Map from task ID to a pair of:
/// {task spec, number of allowed retries left}
/// This map contains one entry per pending task that we submitted.
/// TODO(swang): The TaskSpec protobuf must be copied into the
/// PushTaskRequest protobuf when sent to a worker so that we can retry it if
/// the worker fails. We could avoid this by either not caching the full
/// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by
/// storing a shared_ptr to a PushTaskRequest protobuf for all tasks.
absl::flat_hash_map<TaskID, std::pair<TaskSpecification, int>> pending_tasks_
GUARDED_BY(mu_);
/// Optional shutdown hook to call when pending tasks all finish.
std::function<void()> shutdown_hook_ GUARDED_BY(mu_) = nullptr;
+3 -3
View File
@@ -786,7 +786,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
std::vector<ObjectID> ids(buffers.size());
for (size_t i = 0; i < ids.size(); i++) {
ids[i] = ObjectID::FromRandom().WithDirectTransportType();
RAY_CHECK(provider.Put(buffers[i], ids[i]));
RAY_CHECK_OK(provider.Put(buffers[i], ids[i]));
}
absl::flat_hash_set<ObjectID> wait_ids(ids.begin(), ids.end());
@@ -843,7 +843,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
std::vector<ObjectID> unready_ids(buffers.size());
for (size_t i = 0; i < unready_ids.size(); i++) {
ready_ids[i] = ObjectID::FromRandom().WithDirectTransportType();
RAY_CHECK(provider.Put(buffers[i], ready_ids[i]));
RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i]));
unready_ids[i] = ObjectID::FromRandom().WithDirectTransportType();
}
@@ -851,7 +851,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) {
sleep(1);
for (size_t i = 0; i < unready_ids.size(); i++) {
RAY_CHECK(provider.Put(buffers[i], unready_ids[i]));
RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i]));
}
};
@@ -136,9 +136,9 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) {
// Put the dependencies in the store in the same order as task submission.
auto data = GenerateRandomObject();
ASSERT_TRUE(store_->Put(*data, obj1));
ASSERT_TRUE(store_->Put(*data, obj1).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 1);
ASSERT_TRUE(store_->Put(*data, obj2));
ASSERT_TRUE(store_->Put(*data, obj2).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 2);
}
@@ -165,9 +165,9 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
// Put the dependencies in the store in the opposite order of task
// submission.
auto data = GenerateRandomObject();
ASSERT_TRUE(store_->Put(*data, obj2));
ASSERT_TRUE(store_->Put(*data, obj2).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 0);
ASSERT_TRUE(store_->Put(*data, obj1));
ASSERT_TRUE(store_->Put(*data, obj1).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 2);
}
@@ -139,7 +139,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) {
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto data = GenerateRandomObject();
ASSERT_TRUE(mem->Put(*data, obj1));
ASSERT_TRUE(mem->Put(*data, obj1).ok());
// Test getting an already existing object.
ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj1) != nullptr);
@@ -148,7 +148,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) {
// Testing getting an object that doesn't exist yet causes promotion.
ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj2) == nullptr);
ASSERT_TRUE(num_plasma_puts == 0);
ASSERT_FALSE(mem->Put(*data, obj2));
ASSERT_TRUE(mem->Put(*data, obj2).ok());
ASSERT_TRUE(num_plasma_puts == 1);
// The next time you get it, it's already there so no need to promote.
@@ -191,7 +191,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) {
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
ASSERT_TRUE(store->Put(data, obj1));
ASSERT_TRUE(store->Put(data, obj1).ok());
TaskSpecification task;
task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType());
@@ -213,8 +213,8 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) {
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto data = GenerateRandomObject();
// Ensure the data is already present in the local store.
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
TaskSpecification task;
task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
task.GetMutableMessage().add_args()->add_object_ids(obj2.Binary());
@@ -244,8 +244,8 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) {
resolver.ResolveDependencies(task, [&ok]() { ok = true; });
ASSERT_EQ(resolver.NumPendingTasks(), 1);
ASSERT_TRUE(!ok);
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
// Tests that the task proto was rewritten to have inline argument values after
// resolution completes.
ASSERT_TRUE(ok);
@@ -273,8 +273,8 @@ TEST(LocalDependencyResolverTest, TestInlinedObjectIds) {
resolver.ResolveDependencies(task, [&ok]() { ok = true; });
ASSERT_EQ(resolver.NumPendingTasks(), 1);
ASSERT_TRUE(!ok);
ASSERT_TRUE(store->Put(*data, obj1));
ASSERT_TRUE(store->Put(*data, obj2));
ASSERT_TRUE(store->Put(*data, obj1).ok());
ASSERT_TRUE(store->Put(*data, obj2).ok());
// Tests that the task proto was rewritten to have inline argument values after
// resolution completes.
ASSERT_TRUE(ok);
@@ -698,16 +698,16 @@ TEST(DirectTaskTransportTest, TestSchedulingKeys) {
ObjectID plasma2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
// Ensure the data is already present in the local store for direct call objects.
auto data = GenerateRandomObject();
ASSERT_TRUE(store->Put(*data, direct1));
ASSERT_TRUE(store->Put(*data, direct2));
ASSERT_TRUE(store->Put(*data, direct1).ok());
ASSERT_TRUE(store->Put(*data, direct2).ok());
// Force plasma objects to be promoted.
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto plasma_data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
ASSERT_TRUE(store->Put(plasma_data, plasma1));
ASSERT_TRUE(store->Put(plasma_data, plasma2));
ASSERT_TRUE(store->Put(plasma_data, plasma1).ok());
ASSERT_TRUE(store->Put(plasma_data, plasma2).ok());
TaskSpecification same_deps_1 = BuildTaskSpec(resources1, descriptor1);
same_deps_1.GetMutableMessage().add_args()->add_object_ids(direct1.Binary());
+4 -245
View File
@@ -44,11 +44,10 @@ class MockActorManager : public ActorManagerInterface {
class TaskManagerTest : public ::testing::Test {
public:
TaskManagerTest(bool lineage_pinning_enabled = false)
TaskManagerTest()
: store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())),
reference_counter_(std::shared_ptr<ReferenceCounter>(new ReferenceCounter(
rpc::Address(),
/*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))),
reference_counter_(
std::shared_ptr<ReferenceCounter>(new ReferenceCounter(rpc::Address()))),
actor_manager_(std::shared_ptr<ActorManagerInterface>(new MockActorManager())),
manager_(store_, reference_counter_, actor_manager_,
[this](const TaskSpecification &spec) {
@@ -63,11 +62,6 @@ class TaskManagerTest : public ::testing::Test {
int num_retries_ = 0;
};
class TaskManagerLineageTest : public TaskManagerTest {
public:
TaskManagerLineageTest() : TaskManagerTest(true) {}
};
TEST_F(TaskManagerTest, TestTaskSuccess) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
@@ -159,7 +153,6 @@ TEST_F(TaskManagerTest, TestTaskRetry) {
auto error = rpc::ErrorType::WORKER_DIED;
for (int i = 0; i < num_retries; i++) {
RAY_LOG(INFO) << "Retry " << i;
manager_.PendingTaskFailed(spec.TaskId(), error);
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
@@ -174,7 +167,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results));
RAY_CHECK_OK(store_->Get({return_id}, 1, -0, ctx, false, &results));
ASSERT_EQ(results.size(), 1);
rpc::ErrorType stored_error;
ASSERT_TRUE(results[0]->IsException(&stored_error));
@@ -187,240 +180,6 @@ TEST_F(TaskManagerTest, TestTaskRetry) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
}
// Test to make sure that the task spec and dependencies for an object are
// evicted when lineage pinning is disabled in the ReferenceCounter.
TEST_F(TaskManagerTest, TestLineageEvicted) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
ObjectID dep1 = ObjectID::FromRandom();
ObjectID dep2 = ObjectID::FromRandom();
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
auto spec = CreateTaskHelper(1, {dep1, dep2});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// The task is still pinned because its return ID is still in scope.
ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
// The dependencies should not be pinned because lineage pinning is
// disabled.
ASSERT_FALSE(reference_counter_->HasReference(dep1));
ASSERT_FALSE(reference_counter_->HasReference(dep2));
ASSERT_TRUE(reference_counter_->HasReference(return_id));
// Once the return ID goes out of scope, the task spec and its dependencies
// are released.
reference_counter_->AddLocalReference(return_id, "");
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
}
// Test to make sure that the task spec and dependencies for an object are
// pinned when lineage pinning is enabled in the ReferenceCounter.
TEST_F(TaskManagerLineageTest, TestLineagePinned) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
// Submit a task with 2 arguments.
ObjectID dep1 = ObjectID::FromRandom();
ObjectID dep2 = ObjectID::FromRandom();
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// The task should still be in the lineage because its return ID is in scope.
ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_TRUE(reference_counter_->HasReference(dep1));
ASSERT_TRUE(reference_counter_->HasReference(dep2));
ASSERT_TRUE(reference_counter_->HasReference(return_id));
// All lineage should be erased.
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(dep1));
ASSERT_FALSE(reference_counter_->HasReference(dep2));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
}
// Test to make sure that the task spec and dependencies for an object are
// evicted if the object is returned by value, instead of stored in plasma.
TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
// Submit a task with 2 arguments.
ObjectID dep1 = ObjectID::FromRandom();
ObjectID dep2 = ObjectID::FromRandom();
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(false);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// All lineage should be erased because the return object was not stored in
// plasma.
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(dep1));
ASSERT_FALSE(reference_counter_->HasReference(dep2));
ASSERT_TRUE(reference_counter_->HasReference(return_id));
}
// Test to make sure that the task spec and dependencies for an object are
// pinned if the object goes out of scope before the task finishes. This is
// needed in case the pending task fails and needs to be retried.
TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
// Submit a task with 2 arguments.
ObjectID dep1 = ObjectID::FromRandom();
ObjectID dep2 = ObjectID::FromRandom();
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
auto spec = CreateTaskHelper(1, {dep1, dep2});
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3);
// The return ID goes out of scope. The lineage should still be pinned
// because the task has not completed yet.
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_TRUE(reference_counter_->HasReference(dep1));
ASSERT_TRUE(reference_counter_->HasReference(dep2));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// All lineage should be erased.
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(dep1));
ASSERT_FALSE(reference_counter_->HasReference(dep2));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
}
// Test for pinning the lineage of an object, where the lineage is a chain of
// tasks that each depend on the previous. All tasks should be pinned until the
// final object goes out of scope.
TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
ObjectID dep = ObjectID::FromRandom();
reference_counter_->AddLocalReference(dep, "");
for (int i = 0; i < 3; i++) {
auto spec = CreateTaskHelper(1, {dep});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// All tasks should be pinned in the lineage.
ASSERT_EQ(manager_.NumSubmissibleTasks(), i + 1);
// All objects in the lineage of the newest return ID, plus the return ID
// itself, should be pinned.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), i + 2);
reference_counter_->RemoveLocalReference(dep, nullptr);
dep = return_id;
}
// The task's return ID goes out of scope before the task finishes.
reference_counter_->RemoveLocalReference(dep, nullptr);
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
}
// Test for evicting the lineage of an object passed by value, where the
// lineage is a chain of tasks that each depend on the previous and each return
// a direct value. All tasks should be evicted as soon as they complete, even
// though the final object is still in scope.
TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
ObjectID dep = ObjectID::FromRandom();
reference_counter_->AddLocalReference(dep, "");
for (int i = 0; i < 3; i++) {
auto spec = CreateTaskHelper(1, {dep});
int num_retries = 3;
manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT);
reference_counter_->AddLocalReference(return_id, "");
// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(false);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
// No tasks should be pinned because they returned direct objects.
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
// Only the dependency and the newest return ID should be in scope because
// all objects in the lineage were direct.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2);
reference_counter_->RemoveLocalReference(dep, nullptr);
dep = return_id;
}
// The task's return ID goes out of scope before the task finishes.
reference_counter_->RemoveLocalReference(dep, nullptr);
ASSERT_EQ(manager_.NumSubmissibleTasks(), 0);
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
}
} // namespace ray
int main(int argc, char **argv) {