diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 26c2f515a..a424fc3e3 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -154,6 +154,17 @@ def test_background_tasks_with_max_calls(shutdown_only): # wait for g to finish before exiting. ray.get([x[0] for x in nested]) + @ray.remote(max_calls=1, max_retries=0) + def f(): + return os.getpid(), g.remote() + + nested = ray.get([f.remote() for _ in range(10)]) + while nested: + pid, g_id = nested.pop(0) + ray.get(g_id) + del g_id + ray.test_utils.wait_for_pid_to_exit(pid) + def test_fair_queueing(shutdown_only): ray.init( diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 171a36c1d..ca91c50da 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1014,38 +1014,6 @@ def test_eviction(ray_start_cluster): ray.get(dependent_task.remote(obj)) -@pytest.mark.parametrize( - "ray_start_cluster", [{ - "num_nodes": 1, - "num_cpus": 2, - }, { - "num_nodes": 2, - "num_cpus": 1, - }], - indirect=True) -def test_serialized_id_eviction(ray_start_cluster): - @ray.remote - def large_object(): - return np.zeros(10 * 1024 * 1024) - - @ray.remote - def get(obj_ids): - obj_id = obj_ids[0] - assert (isinstance(ray.get(obj_id), np.ndarray)) - # Wait for the object to be evicted. - ray.internal.free(obj_id) - while ray.worker.global_worker.core_worker.object_exists(obj_id): - time.sleep(1) - with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(obj_id) - print("get done", obj_ids) - - obj = large_object.remote() - result = get.remote([obj]) - ray.internal.free(obj) - ray.get(result) - - @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 2, diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7b6ada54c..fd035e4ec 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -101,6 +101,8 @@ RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000) /// to -1. RAY_CONFIG(size_t, free_objects_batch_size, 100) +RAY_CONFIG(bool, lineage_pinning_enabled, false) + /// Whether to enable the new scheduler. The new scheduler is designed /// only to work with direct calls. Once direct calls afre becoming /// the default, this scheduler will also become the default. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index cadb5c42f..279edb9ec 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -174,7 +174,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, reference_counter_ = std::make_shared( rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(), - [this](const rpc::Address &addr) { + RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) { return std::shared_ptr( new rpc::CoreWorkerClient(addr, *client_call_manager_)); }); @@ -311,25 +311,32 @@ void CoreWorker::Exit(bool intentional) { // Callback to drain objects once all pending tasks have been drained. auto drain_references_callback = [this, shutdown]() { - bool not_actor_task = false; - { - absl::MutexLock lock(&mutex_); - not_actor_task = actor_id_.IsNil(); - } - if (not_actor_task) { - // If we are a task, then we cannot hold any object references in the - // heap. Therefore, any active object references are being held by other - // processes. Wait for these processes to release their references before - // we shutdown. - // NOTE(swang): This could still cause this worker process to stay alive - // forever if another process holds a reference forever. - reference_counter_->DrainAndShutdown(shutdown); - } else { - // If we are an actor, then we may be holding object references in the - // heap. Then, we should not wait to drain the object references before - // shutdown since this could hang. - shutdown(); - } + // Post to the event loop to avoid a deadlock between the TaskManager and + // the ReferenceCounter. The deadlock can occur because this callback may + // get called by the TaskManager while the ReferenceCounter's lock is held, + // but the callback itself must acquire the ReferenceCounter's lock to + // drain the object references. + task_execution_service_.post([this, shutdown]() { + bool not_actor_task = false; + { + absl::MutexLock lock(&mutex_); + not_actor_task = actor_id_.IsNil(); + } + if (not_actor_task) { + // If we are a task, then we cannot hold any object references in the + // heap. Therefore, any active object references are being held by other + // processes. Wait for these processes to release their references before + // we shutdown. + // NOTE(swang): This could still cause this worker process to stay alive + // forever if another process holds a reference forever. + reference_counter_->DrainAndShutdown(shutdown); + } else { + // If we are an actor, then we may be holding object references in the + // heap. Then, we should not wait to drain the object references before + // shutdown since this could hang. + shutdown(); + } + }); }; task_manager_->DrainAndShutdown(drain_references_callback); @@ -486,7 +493,8 @@ Status CoreWorker::Put(const RayObject &object, RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } } - return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); + RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); + return Status::OK(); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, @@ -530,7 +538,8 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, } else { RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } - return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); + RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); + return Status::OK(); } Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, @@ -1203,8 +1212,8 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, // We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get() // properly redirects to the plasma store. if (task.ArgId(i, 0).IsDirectCallType()) { - RAY_CHECK_OK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - task.ArgId(i, 0))); + RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + task.ArgId(i, 0))); } const auto &arg_id = task.ArgId(i, 0); by_ref_ids.insert(arg_id); diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index f26b34c69..22bb7ca0e 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -40,8 +40,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID // Either the owner is gone or the owner replied that the object has // been created. In both cases, we can now try to fetch the object via // plasma. - RAY_CHECK_OK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - object_id)); + RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + object_id)); })); } diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 3b88de9f3..b3b86dae3 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -23,7 +23,8 @@ << (it->second.contained_in_borrowed_id.has_value() \ ? *it->second.contained_in_borrowed_id \ : ObjectID::Nil()) \ - << " contains: " << it->second.contains.size(); + << " contains: " << it->second.contains.size() \ + << " lineage_ref_count: " << it->second.lineage_ref_count; namespace {} // namespace @@ -37,8 +38,8 @@ void ReferenceCounter::DrainAndShutdown(std::function shutdown) { RAY_LOG(WARNING) << "This worker is still managing " << object_id_refs_.size() << " objects, waiting for them to go out of scope before shutting down."; + shutdown_hook_ = shutdown; } - shutdown_hook_ = shutdown; } void ReferenceCounter::ShutdownIfNeeded() { @@ -215,6 +216,7 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( const std::vector &argument_ids_to_remove, std::vector *deleted) { absl::MutexLock lock(&mutex_); for (const ObjectID &argument_id : argument_ids_to_add) { + RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id; auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { // This happens if a large argument is transparently passed by reference @@ -222,13 +224,20 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( it = object_id_refs_.emplace(argument_id, Reference()).first; } it->second.submitted_task_ref_count++; + // The lineage ref will get released once the task finishes and cannot be + // retried again. + it->second.lineage_ref_count++; } - RemoveSubmittedTaskReferences(argument_ids_to_remove, deleted); + // Release the submitted task ref and the lineage ref for any argument IDs + // whose values were inlined. + RemoveSubmittedTaskReferences(argument_ids_to_remove, /*release_lineage=*/true, + deleted); } void ReferenceCounter::UpdateFinishedTaskReferences( - const std::vector &argument_ids, const rpc::Address &worker_addr, - const ReferenceTableProto &borrowed_refs, std::vector *deleted) { + const std::vector &argument_ids, bool release_lineage, + const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, + std::vector *deleted) { absl::MutexLock lock(&mutex_); // Must merge the borrower refs before decrementing any ref counts. This is // to make sure that for serialized IDs, we increment the borrower count for @@ -242,19 +251,63 @@ void ReferenceCounter::UpdateFinishedTaskReferences( MergeRemoteBorrowers(argument_id, worker_addr, refs); } - RemoveSubmittedTaskReferences(argument_ids, deleted); + RemoveSubmittedTaskReferences(argument_ids, release_lineage, deleted); +} + +void ReferenceCounter::ReleaseLineageReferences( + const std::vector &argument_ids) { + absl::MutexLock lock(&mutex_); + ReleaseLineageReferencesInternal(argument_ids); +} + +void ReferenceCounter::ReleaseLineageReferencesInternal( + const std::vector &argument_ids) { + for (const ObjectID &argument_id : argument_ids) { + auto it = object_id_refs_.find(argument_id); + if (it == object_id_refs_.end()) { + // References can get evicted early when lineage pinning is disabled. + RAY_CHECK(!lineage_pinning_enabled_); + continue; + } + + if (it->second.lineage_ref_count == 0) { + // References can get evicted early when lineage pinning is disabled. + RAY_CHECK(!lineage_pinning_enabled_); + continue; + } + + RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id; + it->second.lineage_ref_count--; + if (it->second.lineage_ref_count == 0) { + // Don't have to pass in a deleted vector here because the reference + // cannot have gone out of scope here since we are only modifying the + // lineage ref count. + DeleteReferenceInternal(it, nullptr); + } + } } void ReferenceCounter::RemoveSubmittedTaskReferences( - const std::vector &argument_ids, std::vector *deleted) { + const std::vector &argument_ids, bool release_lineage, + std::vector *deleted) { for (const ObjectID &argument_id : argument_ids) { + RAY_LOG(DEBUG) << "Releasing ref for submitted task argument " << argument_id; auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: " << argument_id; return; } + RAY_CHECK(it->second.submitted_task_ref_count > 0); it->second.submitted_task_ref_count--; + if (release_lineage) { + if (it->second.lineage_ref_count > 0) { + it->second.lineage_ref_count--; + } else { + // References can get evicted early when lineage pinning is disabled. + RAY_CHECK(!lineage_pinning_enabled_); + } + } if (it->second.RefCount() == 0) { DeleteReferenceInternal(it, deleted); } @@ -287,7 +340,7 @@ void ReferenceCounter::DeleteReferences(const std::vector &object_ids) } it->second.local_ref_count = 0; it->second.submitted_task_ref_count = 0; - if (distributed_ref_counting_enabled_ && !it->second.CanDelete()) { + if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) { RAY_LOG(ERROR) << "ray.internal.free does not currently work for objects that are still in " "scope when distributed reference " @@ -311,8 +364,6 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, // Whether it is safe to unpin the value. bool should_delete_value = false; - // Whether it is safe to delete the Reference. - bool should_delete_reference = false; // If distributed ref counting is not enabled, then delete the object as soon // as its local ref count goes to 0. @@ -322,11 +373,10 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, should_delete_value = true; } - if (it->second.CanDelete()) { + if (it->second.OutOfScope()) { // If distributed ref counting is enabled, then delete the object once its // ref count across all processes is 0. should_delete_value = true; - should_delete_reference = true; for (const auto &inner_id : it->second.contains) { auto inner_it = object_id_refs_.find(inner_id); if (inner_it != object_id_refs_.end()) { @@ -359,8 +409,16 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, deleted->push_back(id); } } - if (should_delete_reference) { + if (it->second.ShouldDelete(lineage_pinning_enabled_)) { RAY_LOG(DEBUG) << "Deleting Reference to object " << id; + // TODO(swang): Update lineage_ref_count for nested objects? + if (on_lineage_released_ && it->second.owned_by_us) { + RAY_LOG(DEBUG) << "Releasing lineage for object " << id; + std::vector ids_to_release; + on_lineage_released_(id, &ids_to_release); + ReleaseLineageReferencesInternal(ids_to_release); + } + object_id_refs_.erase(it); ShutdownIfNeeded(); } @@ -662,7 +720,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id, // the object was zero. Also, we should have stripped all distributed ref // count information and returned it to the owner. Therefore, it should be // okay to delete the object, if it wasn't already deleted. - RAY_CHECK(it->second.CanDelete()); + RAY_CHECK(it->second.OutOfScope()); } // Send the owner information about any new borrowers. ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs()); @@ -716,6 +774,12 @@ void ReferenceCounter::SetRefRemovedCallback( } } +void ReferenceCounter::SetReleaseLineageCallback( + const LineageReleasedCallback &callback) { + RAY_CHECK(on_lineage_released_ == nullptr); + on_lineage_released_ = callback; +} + ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 97957f909..30a9fcb11 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -36,12 +36,16 @@ class ReferenceCounter { using ReferenceTableProto = ::google::protobuf::RepeatedPtrField; using ReferenceRemovedCallback = std::function; + using LineageReleasedCallback = + std::function *)>; ReferenceCounter(const rpc::WorkerAddress &rpc_address, bool distributed_ref_counting_enabled = true, + bool lineage_pinning_enabled = false, rpc::ClientFactoryFn client_factory = nullptr) : rpc_address_(rpc_address), distributed_ref_counting_enabled_(distributed_ref_counting_enabled), + lineage_pinning_enabled_(lineage_pinning_enabled), client_factory_(client_factory) {} ~ReferenceCounter() {} @@ -67,9 +71,16 @@ class ReferenceCounter { LOCKS_EXCLUDED(mutex_); /// Add references for the provided object IDs that correspond to them being - /// dependencies to a submitted task. + /// dependencies to a submitted task. If lineage pinning is enabled, then + /// this will also pin the Reference entry for each new argument until the + /// argument's lineage ref is released. /// - /// \param[in] object_ids The object IDs to add references for. + /// \param[in] argument_ids_to_add The arguments of the task to add + /// references for. + /// \param[out] argument_ids_to_remove The arguments of the task to remove + /// references for. + /// \param[out] deleted Any objects that are newly out of scope after this + /// function call. void UpdateSubmittedTaskReferences( const std::vector &argument_ids_to_add, const std::vector &argument_ids_to_remove = std::vector(), @@ -81,6 +92,8 @@ class ReferenceCounter { /// when the task finishes for plasma dependencies. /// /// \param[in] object_ids The object IDs to remove references for. + /// \param[in] release_lineage Whether to decrement the arguments' lineage + /// ref count. /// \param[in] worker_addr The address of the worker that executed the task. /// \param[in] borrowed_refs The references that the worker borrowed during /// the task. This table includes all task arguments that were passed by @@ -89,11 +102,22 @@ class ReferenceCounter { /// worker and/or a task that the worker submitted. /// \param[out] deleted The object IDs whos reference counts reached zero. void UpdateFinishedTaskReferences(const std::vector &argument_ids, - const rpc::Address &worker_addr, + bool release_lineage, const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, std::vector *deleted) LOCKS_EXCLUDED(mutex_); + /// Release the lineage ref count for this list of object IDs. An object's + /// lineage ref count is the number of tasks that depend on the object that + /// may be retried in the future (pending execution or finished but + /// retryable). If the object is direct (not stored in plasma), then its + /// lineage ref count is 0. + /// + /// \param[in] argument_ids The list of objects whose lineage ref counts we + /// should decrement. + void ReleaseLineageReferences(const std::vector &argument_ids) + LOCKS_EXCLUDED(mutex_); + /// Add an object that we own. The object may depend on other objects. /// Dependencies for each ObjectID must be set at most once. The local /// reference count for the ObjectID is set to zero, which assumes that an @@ -171,6 +195,15 @@ class ReferenceCounter { const ReferenceRemovedCallback &ref_removed_callback) LOCKS_EXCLUDED(mutex_); + /// Set a callback to call whenever a Reference that we own is deleted. A + /// Reference can only be deleted if: + /// 1. The ObjectID's ref count is 0 on all workers. + /// 2. There are no tasks that depend on the object that may be retried in + /// the future. + /// + /// \param[in] callback The callback to call. + void SetReleaseLineageCallback(const LineageReleasedCallback &callback); + /// Respond to the object's owner once we are no longer borrowing it. The /// sender is the owner of the object ID. We will send the reply when our /// RefCount() for the object ID goes to 0. @@ -276,13 +309,13 @@ class ReferenceCounter { return local_ref_count + submitted_task_ref_count + contained_in_owned.size(); } - /// Whether we can delete this reference. A reference can NOT be deleted if - /// any of the following are true: + /// Whether this reference is no longer in scope. A reference is in scope + /// if any of the following are true: /// - The reference is still being used by this process. /// - The reference was contained in another ID that we were borrowing, and /// we haven't told the process that gave us that ID yet. /// - We gave the reference to at least one other process. - bool CanDelete() const { + bool OutOfScope() const { bool in_scope = RefCount() > 0; bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value(); bool has_borrowers = borrowers.size() > 0; @@ -291,6 +324,19 @@ class ReferenceCounter { was_stored_in_objects); } + /// Whether the Reference can be deleted. A Reference can only be deleted + /// if: + /// 1. The ObjectID's ref count is 0 on all workers. + /// 2. If lineage pinning is enabled, there are no tasks that depend on + /// the object that may be retried in the future. + bool ShouldDelete(bool lineage_pinning_enabled) const { + if (lineage_pinning_enabled) { + return OutOfScope() && (lineage_ref_count == 0); + } else { + return OutOfScope(); + } + } + /// Description of the call site where the reference was created. std::string call_site = ""; /// Object size if known, otherwise -1; @@ -359,6 +405,11 @@ class ReferenceCounter { /// task's caller is also a borrower. The key is the task's return ID, and /// the value is the task ID and address of the task's caller. absl::flat_hash_map stored_in_objects; + /// The number of tasks that depend on this object that may be retried in + /// the future (pending execution or finished but retryable). If the object + /// is inlined (not stored in plasma), then its lineage ref count is 0 + /// because any dependent task will already have the value of the object. + size_t lineage_ref_count = 0; /// Callback that will be called when this ObjectID no longer has /// references. @@ -386,7 +437,7 @@ class ReferenceCounter { /// inlined dependencies are inlined or when the task finishes for plasma /// dependencies. void RemoveSubmittedTaskReferences(const std::vector &argument_ids, - std::vector *deleted) + bool release_lineage, std::vector *deleted) EXCLUSIVE_LOCKS_REQUIRED(mutex_); /// Helper method to mark that this ObjectID contains another ObjectID(s). @@ -472,6 +523,10 @@ class ReferenceCounter { std::vector *deleted) EXCLUSIVE_LOCKS_REQUIRED(mutex_); + /// Helper method to decrement the lineage ref count for a list of objects. + void ReleaseLineageReferencesInternal(const std::vector &argument_ids) + EXCLUSIVE_LOCKS_REQUIRED(mutex_); + /// Address of our RPC server. This is used to determine whether we own a /// given object or not, by comparing our WorkerID with the WorkerID of the /// object's owner. @@ -480,7 +535,13 @@ class ReferenceCounter { /// Feature flag for distributed ref counting. If this is false, then we will /// keep the distributed ref count, but only the local ref count will be used /// to decide when objects can be evicted. - bool distributed_ref_counting_enabled_; + const bool distributed_ref_counting_enabled_; + + /// Feature flag for lineage pinning. If this is false, then we will keep the + /// lineage ref count, but this will not be used to decide when the object's + /// Reference can be deleted. The object's lineage ref count is the number of + /// tasks that depend on that object that may be retried in the future. + const bool lineage_pinning_enabled_; /// Factory for producing new core worker clients. rpc::ClientFactoryFn client_factory_; @@ -497,6 +558,10 @@ class ReferenceCounter { /// Holds all reference counts and dependency information for tracked ObjectIDs. ReferenceTable object_id_refs_ GUARDED_BY(mutex_); + /// The callback to call once an object ID that we own is no longer in scope + /// and it has no tasks that depend on it that may be retried in the future. + /// The object's Reference will be erased after this callback. + LineageReleasedCallback on_lineage_released_; /// Optional shutdown hook to call when all references have gone /// out of scope. std::function shutdown_hook_ GUARDED_BY(mutex_) = nullptr; diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 9a4244647..0e275444e 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -36,6 +36,20 @@ class ReferenceCountTest : public ::testing::Test { virtual void TearDown() {} }; +class ReferenceCountLineageEnabledTest : public ::testing::Test { + protected: + std::unique_ptr rc; + virtual void SetUp() { + rpc::Address addr; + rc = std::unique_ptr( + new ReferenceCounter(addr, + /*distributed_ref_counting_enabled=*/true, + /*lineage_pinning_enabled=*/true)); + } + + virtual void TearDown() {} +}; + class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: // Helper function to generate a random address. @@ -50,8 +64,9 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { MockWorkerClient(const std::string &addr, rpc::ClientFactoryFn client_factory = nullptr) : task_id_(TaskID::ForFakeTask()), address_(CreateRandomAddress(addr)), - rc_(rpc::WorkerAddress(address_), /*distributed_ref_counting_enabled=*/true, - client_factory) {} + rc_(rpc::WorkerAddress(address_), + /*distributed_ref_counting_enabled=*/true, + /*lineage_pinning_enabled=*/false, client_factory) {} ray::Status WaitForRefRemoved( const rpc::WaitForRefRemovedRequest &request, @@ -156,7 +171,8 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { if (!arg_id.IsNil()) { arguments.push_back(arg_id); } - rc_.UpdateFinishedTaskReferences(arguments, borrower_address, borrower_refs, nullptr); + rc_.UpdateFinishedTaskReferences(arguments, false, borrower_address, borrower_refs, + nullptr); } // Global map from Worker ID -> MockWorkerClient. @@ -202,13 +218,13 @@ TEST_F(ReferenceCountTest, TestBasic) { rc->UpdateSubmittedTaskReferences({id1}); rc->UpdateSubmittedTaskReferences({id1, id2}); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); - rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); - rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(out.size(), 2); out.clear(); @@ -221,16 +237,26 @@ TEST_F(ReferenceCountTest, TestBasic) { rc->RemoveLocalReference(id1, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); rc->RemoveLocalReference(id2, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(out.size(), 2); out.clear(); + + // Submitted task with inlined references. + rc->UpdateSubmittedTaskReferences({id1}); + rc->UpdateSubmittedTaskReferences({id2}, {id1}, &out); + ASSERT_EQ(rc->NumObjectIDsInScope(), 1); + ASSERT_EQ(out.size(), 1); + rc->UpdateSubmittedTaskReferences({}, {id2}, &out); + ASSERT_EQ(rc->NumObjectIDsInScope(), 0); + ASSERT_EQ(out.size(), 2); + out.clear(); } // Tests call site tracking and ability to update object size. @@ -306,12 +332,12 @@ TEST(MemoryStoreIntegrationTest, TestSimple) { CoreWorkerMemoryStore store(nullptr, rc); // Tests putting an object with no references is ignored. - RAY_CHECK_OK(store.Put(buffer, id2)); + RAY_CHECK(store.Put(buffer, id2)); ASSERT_EQ(store.Size(), 0); // Tests ref counting overrides remove after get option. rc->AddLocalReference(id1, ""); - RAY_CHECK_OK(store.Put(buffer, id1)); + RAY_CHECK(store.Put(buffer, id1)); ASSERT_EQ(store.Size(), 1); std::vector> results; WorkerContext ctx(WorkerType::WORKER, JobID::Nil()); @@ -1707,6 +1733,88 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { // TODO: Test Pop and Merge individually. +// Test to make sure that we call the lineage released callback correctly. +TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { + std::vector out; + std::vector lineage_deleted; + + ObjectID id = ObjectID::FromRandom(); + + rc->SetReleaseLineageCallback( + [&](const ObjectID &object_id, std::vector *ids_to_release) { + lineage_deleted.push_back(object_id); + }); + + // We should not keep lineage for borrowed objects. + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->HasReference(id)); + rc->RemoveLocalReference(id, nullptr); + ASSERT_TRUE(lineage_deleted.empty()); + + // We should keep lineage for owned objects. + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->HasReference(id)); + rc->RemoveLocalReference(id, nullptr); + ASSERT_EQ(lineage_deleted.size(), 1); +} + +// Test for pinning the lineage of an object, where the lineage is a chain of +// tasks that each depend on the previous. The previous objects should already +// have gone out of scope, but their Reference entry is pinned until the final +// object goes out of scope. +TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { + std::vector out; + std::vector lineage_deleted; + + std::vector ids; + for (int i = 0; i < 3; i++) { + ObjectID id = ObjectID::FromRandom(); + ids.push_back(id); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + } + + rc->SetReleaseLineageCallback( + [&](const ObjectID &object_id, std::vector *ids_to_release) { + lineage_deleted.push_back(object_id); + // Simulate releasing objects in downstream_id's lineage. + size_t i = 0; + for (; i < ids.size(); i++) { + if (ids[i] == object_id) { + break; + } + } + RAY_CHECK(i < ids.size()); + if (i > 0) { + ids_to_release->push_back(ids[i - 1]); + } + }); + + for (size_t i = 0; i < ids.size() - 1; i++) { + auto id = ids[i]; + // Submit a dependent task on id. + rc->AddLocalReference(id, ""); + ASSERT_TRUE(rc->HasReference(id)); + rc->UpdateSubmittedTaskReferences({id}); + rc->RemoveLocalReference(id, nullptr); + + // The task finishes but is retryable. + rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); + ASSERT_EQ(out.size(), 1); + out.clear(); + ASSERT_TRUE(lineage_deleted.empty()); + ASSERT_TRUE(rc->HasReference(id)); + } + + // The task return ID goes out of scope. + rc->AddLocalReference(ids.back(), ""); + rc->RemoveLocalReference(ids.back(), nullptr); + // The removal of the last return ID should recursively delete all + // references. + ASSERT_EQ(lineage_deleted.size(), ids.size()); + ASSERT_EQ(rc->NumObjectIDsInScope(), 0); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 9391f5eb6..9260c9199 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -157,10 +157,11 @@ std::shared_ptr CoreWorkerMemoryStore::GetOrPromoteToPlasma( return nullptr; } -Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { +bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { std::vector)>> async_callbacks; auto object_entry = std::make_shared(object.GetData(), object.GetMetadata(), object.GetNestedIds(), true); + bool stored_in_direct_memory = true; // TODO(edoakes): we should instead return a flag to the caller to put the object in // plasma. @@ -170,7 +171,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec auto iter = objects_.find(object_id); if (iter != objects_.end()) { - return Status::OK(); // Object already exists in the store, which is fine. + return true; // Object already exists in the store, which is fine. } auto async_callback_it = object_async_get_requests_.find(object_id); @@ -217,6 +218,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec // in-memory store (would cause deadlock). if (should_put_in_plasma) { store_in_plasma_(object, object_id); + stored_in_direct_memory = false; } // It's important for performance to run the callbacks outside the lock. @@ -224,7 +226,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec cb(object_entry); } - return Status::OK(); + return stored_in_direct_memory; } Status CoreWorkerMemoryStore::Get(const std::vector &object_ids, diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 4db25d835..7967fa6d9 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -42,8 +42,9 @@ class CoreWorkerMemoryStore { /// /// \param[in] object The ray object. /// \param[in] object_id Object ID specified by user. - /// \return Status. - Status Put(const RayObject &object, const ObjectID &object_id); + /// \return Whether the object was put into the memory store. If false, then + /// this is because the object was promoted to and stored in plasma instead. + bool Put(const RayObject &object, const ObjectID &object_id); /// Get a list of objects from the object store. /// diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 7b369162a..0a7c9da3a 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -29,9 +29,6 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, const TaskSpecification &spec, const std::string &call_site, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId(); - absl::MutexLock lock(&mu_); - std::pair entry = {spec, max_retries}; - RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second); // Add references for the dependencies to the task. std::vector task_deps; @@ -71,16 +68,24 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, /*inner_ids=*/{}, caller_id, caller_address, call_site, -1); } + + { + absl::MutexLock lock(&mu_); + RAY_CHECK(submissible_tasks_ + .emplace(spec.TaskId(), TaskEntry(spec, max_retries, num_returns)) + .second); + num_pending_tasks_++; + } } void TaskManager::DrainAndShutdown(std::function shutdown) { bool has_pending_tasks = false; { absl::MutexLock lock(&mu_); - if (!pending_tasks_.empty()) { + if (num_pending_tasks_ > 0) { has_pending_tasks = true; RAY_LOG(WARNING) - << "This worker is still managing " << pending_tasks_.size() + << "This worker is still managing " << submissible_tasks_.size() << " in flight tasks, waiting for them to finish before shutting down."; shutdown_hook_ = shutdown; } @@ -92,27 +97,36 @@ void TaskManager::DrainAndShutdown(std::function shutdown) { } } +bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const { + absl::MutexLock lock(&mu_); + return submissible_tasks_.count(task_id) > 0; +} + bool TaskManager::IsTaskPending(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - return pending_tasks_.count(task_id) > 0; + const auto it = submissible_tasks_.find(task_id); + if (it == submissible_tasks_.end()) { + return false; + } + return it->second.pending; +} + +size_t TaskManager::NumSubmissibleTasks() const { + absl::MutexLock lock(&mu_); + return submissible_tasks_.size(); +} + +size_t TaskManager::NumPendingTasks() const { + absl::MutexLock lock(&mu_); + return num_pending_tasks_; } void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &worker_addr) { RAY_LOG(DEBUG) << "Completing task " << task_id; - TaskSpecification spec; - { - absl::MutexLock lock(&mu_); - auto it = pending_tasks_.find(task_id); - RAY_CHECK(it != pending_tasks_.end()) - << "Tried to complete task that was not pending " << task_id; - spec = it->second.first; - pending_tasks_.erase(it); - } - - RemoveFinishedTaskReferences(spec, worker_addr, reply.borrowed_refs()); + std::vector direct_return_ids; for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); @@ -120,7 +134,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (return_object.in_plasma()) { // Mark it as in plasma with a dummy object. - RAY_CHECK_OK( + RAY_CHECK( in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); } else { std::shared_ptr data_buffer; @@ -137,13 +151,54 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reinterpret_cast(return_object.metadata().data())), return_object.metadata().size()); } - RAY_CHECK_OK(in_memory_store_->Put( + bool stored_in_direct_memory = in_memory_store_->Put( RayObject(data_buffer, metadata_buffer, IdVectorFromProtobuf(return_object.nested_inlined_ids())), - object_id)); + object_id); + if (stored_in_direct_memory) { + direct_return_ids.push_back(object_id); + } } } + TaskSpecification spec; + bool release_lineage = true; + { + absl::MutexLock lock(&mu_); + auto it = submissible_tasks_.find(task_id); + RAY_CHECK(it != submissible_tasks_.end()) + << "Tried to complete task that was not pending " << task_id; + spec = it->second.spec; + + // Release the lineage for any non-plasma return objects. + for (const auto &direct_return_id : direct_return_ids) { + RAY_LOG(DEBUG) << "Task " << it->first << " returned direct object " + << direct_return_id << ", now has " + << it->second.reconstructable_return_ids.size() + << " plasma returns in scope"; + it->second.reconstructable_return_ids.erase(direct_return_id); + } + RAY_LOG(DEBUG) << "Task " << it->first << " now has " + << it->second.reconstructable_return_ids.size() + << " plasma returns in scope"; + it->second.pending = false; + num_pending_tasks_--; + + // A finished task can be only be re-executed if it has some number of + // retries left and returned at least one object that is still in use and + // stored in plasma. + bool task_retryable = + it->second.num_retries_left > 0 && !it->second.reconstructable_return_ids.empty(); + if (task_retryable) { + // Pin the task spec if it may be retried again. + release_lineage = false; + } else { + submissible_tasks_.erase(it); + } + } + + RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs()); + ShutdownIfNeeded(); } @@ -155,18 +210,23 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ << rpc::ErrorType_Name(error_type); int num_retries_left = 0; TaskSpecification spec; + bool release_lineage = true; { absl::MutexLock lock(&mu_); - auto it = pending_tasks_.find(task_id); - RAY_CHECK(it != pending_tasks_.end()) + auto it = submissible_tasks_.find(task_id); + RAY_CHECK(it != submissible_tasks_.end()) << "Tried to complete task that was not pending " << task_id; - spec = it->second.first; - num_retries_left = it->second.second; + RAY_CHECK(it->second.pending) + << "Tried to complete task that was not pending " << task_id; + spec = it->second.spec; + num_retries_left = it->second.num_retries_left; if (num_retries_left == 0) { - pending_tasks_.erase(it); + submissible_tasks_.erase(it); + num_pending_tasks_--; } else { - RAY_CHECK(num_retries_left > 0); - it->second.second--; + RAY_CHECK(it->second.num_retries_left > 0); + it->second.num_retries_left--; + release_lineage = false; } } @@ -199,7 +259,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } // The worker failed to execute the task, so it cannot be borrowing any // objects. - RemoveFinishedTaskReferences(spec, rpc::Address(), + RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(), ReferenceCounter::ReferenceTableProto()); MarkPendingTaskFailed(task_id, spec, error_type); } @@ -211,7 +271,7 @@ void TaskManager::ShutdownIfNeeded() { std::function shutdown_hook = nullptr; { absl::MutexLock lock(&mu_); - if (shutdown_hook_ && pending_tasks_.empty()) { + if (shutdown_hook_ && num_pending_tasks_ == 0) { RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after " "draining references."; std::swap(shutdown_hook_, shutdown_hook); @@ -234,7 +294,7 @@ void TaskManager::OnTaskDependenciesInlined( } void TaskManager::RemoveFinishedTaskReferences( - TaskSpecification &spec, const rpc::Address &borrower_addr, + TaskSpecification &spec, bool release_lineage, const rpc::Address &borrower_addr, const ReferenceCounter::ReferenceTableProto &borrowed_refs) { std::vector plasma_dependencies; for (size_t i = 0; i < spec.NumArgs(); i++) { @@ -255,11 +315,51 @@ void TaskManager::RemoveFinishedTaskReferences( } std::vector deleted; - reference_counter_->UpdateFinishedTaskReferences(plasma_dependencies, borrower_addr, - borrowed_refs, &deleted); + reference_counter_->UpdateFinishedTaskReferences( + plasma_dependencies, release_lineage, borrower_addr, borrowed_refs, &deleted); in_memory_store_->Delete(deleted); } +void TaskManager::RemoveLineageReference(const ObjectID &object_id, + std::vector *released_objects) { + absl::MutexLock lock(&mu_); + const TaskID &task_id = object_id.TaskId(); + auto it = submissible_tasks_.find(task_id); + if (it == submissible_tasks_.end()) { + RAY_LOG(DEBUG) << "No lineage for object " << object_id; + return; + } + + RAY_LOG(DEBUG) << "Plasma object " << object_id << " out of scope"; + for (const auto &plasma_id : it->second.reconstructable_return_ids) { + RAY_LOG(DEBUG) << "Task " << task_id << " has " << plasma_id << " in scope"; + } + it->second.reconstructable_return_ids.erase(object_id); + RAY_LOG(DEBUG) << "Task " << task_id << " now has " + << it->second.reconstructable_return_ids.size() + << " plasma returns in scope"; + + if (it->second.reconstructable_return_ids.empty() && !it->second.pending) { + // If the task can no longer be retried, decrement the lineage ref count + // for each of the task's args. + for (size_t i = 0; i < it->second.spec.NumArgs(); i++) { + if (it->second.spec.ArgByRef(i)) { + for (size_t j = 0; j < it->second.spec.ArgIdCount(i); j++) { + released_objects->push_back(it->second.spec.ArgId(i, j)); + } + } else { + const auto &inlined_ids = it->second.spec.ArgInlinedIds(i); + released_objects->insert(released_objects->end(), inlined_ids.begin(), + inlined_ids.end()); + } + } + + // The task has finished and none of the return IDs are in scope anymore, + // so it is safe to remove the task spec. + submissible_tasks_.erase(it); + } +} + void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type) { @@ -270,7 +370,7 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, const auto object_id = ObjectID::ForTaskReturn( task_id, /*index=*/i + 1, /*transport_type=*/static_cast(TaskTransportType::DIRECT)); - RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id)); + RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id)); } if (spec.IsActorCreationTask()) { @@ -282,9 +382,9 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - auto it = pending_tasks_.find(task_id); - RAY_CHECK(it != pending_tasks_.end()); - return it->second.first; + auto it = submissible_tasks_.find(task_id); + RAY_CHECK(it != submissible_tasks_.end()); + return it->second.spec; } } // namespace ray diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index cea06bc98..1743a93ff 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -53,7 +53,13 @@ class TaskManager : public TaskFinisherInterface { : in_memory_store_(in_memory_store), reference_counter_(reference_counter), actor_manager_(actor_manager), - retry_task_callback_(retry_task_callback) {} + retry_task_callback_(retry_task_callback) { + reference_counter_->SetReleaseLineageCallback( + [this](const ObjectID &object_id, std::vector *ids_to_release) { + RemoveLineageReference(object_id, ids_to_release); + ShutdownIfNeeded(); + }); + } /// Add a task that is pending execution. /// @@ -72,12 +78,6 @@ class TaskManager : public TaskFinisherInterface { /// \param shutdown The shutdown callback to call. void DrainAndShutdown(std::function shutdown); - /// Return whether the task is pending. - /// - /// \param[in] task_id ID of the task to query. - /// \return Whether the task is pending. - bool IsTaskPending(const TaskID &task_id) const; - /// Write return objects for a pending task to the memory store. /// /// \param[in] task_id ID of the pending task. @@ -111,10 +111,73 @@ class TaskManager : public TaskFinisherInterface { /// Return the spec for a pending task. TaskSpecification GetTaskSpec(const TaskID &task_id) const; + /// Return whether this task can be submitted for execution. + /// + /// \param[in] task_id ID of the task to query. + /// \return Whether the task can be submitted for execution. + bool IsTaskSubmissible(const TaskID &task_id) const; + + /// Return whether the task is pending. + /// + /// \param[in] task_id ID of the task to query. + /// \return Whether the task is pending. + bool IsTaskPending(const TaskID &task_id) const; + + /// Return the number of submissible tasks. This includes both tasks that are + /// pending execution and tasks that have finished but that may be + /// re-executed to recover from a failure. + size_t NumSubmissibleTasks() const; + /// Return the number of pending tasks. - int NumPendingTasks() const { return pending_tasks_.size(); } + size_t NumPendingTasks() const; private: + struct TaskEntry { + TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg, + size_t num_returns) + : spec(spec_arg), num_retries_left(num_retries_left_arg) { + for (size_t i = 0; i < num_returns; i++) { + reconstructable_return_ids.insert(spec.ReturnId(i, TaskTransportType::DIRECT)); + } + } + /// The task spec. This is pinned as long as the following are true: + /// - The task is still pending execution. This means that the task may + /// fail and so it may be retried in the future. + /// - The task finished execution, but it has num_retries_left > 0 and + /// reconstructable_return_ids is not empty. This means that the task may + /// be retried in the future to recreate its return objects. + /// TODO(swang): The TaskSpec protobuf must be copied into the + /// PushTaskRequest protobuf when sent to a worker so that we can retry it if + /// the worker fails. We could avoid this by either not caching the full + /// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by + /// storing a shared_ptr to a PushTaskRequest protobuf for all tasks. + const TaskSpecification spec; + // Number of times this task may be resubmitted. If this reaches 0, then + // the task entry may be erased. + int num_retries_left; + // Whether this task is currently pending execution. This is used to pin + // the task entry if the task is still pending but all of its return IDs + // are out of scope. + bool pending = true; + // Objects returned by this task that are reconstructable. This is set + // initially to the task's return objects, since if the task fails, these + // objects may be reconstructed by resubmitting the task. Once the task + // finishes its first execution, then the objects that the task returned by + // value are removed from this set because they can be inlined in any + // dependent tasks. Objects that the task returned through plasma are + // reconstructable, so they are only removed from this set once: + // 1) The language frontend no longer has a reference to the object ID. + // 2) There are no tasks that depend on the object. This includes both + // pending tasks and tasks that finished execution but that may be + // retried in the future. + absl::flat_hash_set reconstructable_return_ids; + }; + + /// Remove a lineage reference to this object ID. This should be called + /// whenever a task that depended on this object ID can no longer be retried. + void RemoveLineageReference(const ObjectID &object_id, + std::vector *ids_to_release) LOCKS_EXCLUDED(mu_); + /// Treat a pending task as failed. The lock should not be held when calling /// this method because it may trigger callbacks in this or other classes. void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, @@ -125,7 +188,7 @@ class TaskManager : public TaskFinisherInterface { /// failed. The remaining dependencies are plasma objects and any ObjectIDs /// that were inlined in the task spec. void RemoveFinishedTaskReferences( - TaskSpecification &spec, const rpc::Address &worker_addr, + TaskSpecification &spec, bool release_lineage, const rpc::Address &worker_addr, const ReferenceCounter::ReferenceTableProto &borrowed_refs); /// Shutdown if all tasks are finished and shutdown is scheduled. @@ -154,16 +217,16 @@ class TaskManager : public TaskFinisherInterface { /// Protects below fields. mutable absl::Mutex mu_; - /// Map from task ID to a pair of: - /// {task spec, number of allowed retries left} - /// This map contains one entry per pending task that we submitted. - /// TODO(swang): The TaskSpec protobuf must be copied into the - /// PushTaskRequest protobuf when sent to a worker so that we can retry it if - /// the worker fails. We could avoid this by either not caching the full - /// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by - /// storing a shared_ptr to a PushTaskRequest protobuf for all tasks. - absl::flat_hash_map> pending_tasks_ - GUARDED_BY(mu_); + /// This map contains one entry per task that may be submitted for + /// execution. This includes both tasks that are currently pending execution + /// and tasks that finished execution but that may be retried again in the + /// future. + absl::flat_hash_map submissible_tasks_ GUARDED_BY(mu_); + + /// Number of tasks that are pending. This is a count of all tasks in + /// submissible_tasks_ that have been submitted and are currently pending + /// execution. + size_t num_pending_tasks_ = 0; /// Optional shutdown hook to call when pending tasks all finish. std::function shutdown_hook_ GUARDED_BY(mu_) = nullptr; diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 57f3f0343..7ebb88302 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -773,7 +773,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { ids[i] = ObjectID::FromRandom().WithDirectTransportType(); - RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); + RAY_CHECK(provider.Put(buffers[i], ids[i])); } absl::flat_hash_set wait_ids(ids.begin(), ids.end()); @@ -830,7 +830,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { std::vector unready_ids(buffers.size()); for (size_t i = 0; i < unready_ids.size(); i++) { ready_ids[i] = ObjectID::FromRandom().WithDirectTransportType(); - RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i])); + RAY_CHECK(provider.Put(buffers[i], ready_ids[i])); unready_ids[i] = ObjectID::FromRandom().WithDirectTransportType(); } @@ -838,7 +838,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { sleep(1); for (size_t i = 0; i < unready_ids.size(); i++) { - RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i])); + RAY_CHECK(provider.Put(buffers[i], unready_ids[i])); } }; diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 0caa9004a..c68d84d08 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -136,9 +136,9 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) { // Put the dependencies in the store in the same order as task submission. auto data = GenerateRandomObject(); - ASSERT_TRUE(store_->Put(*data, obj1).ok()); + ASSERT_TRUE(store_->Put(*data, obj1)); ASSERT_EQ(worker_client_->callbacks.size(), 1); - ASSERT_TRUE(store_->Put(*data, obj2).ok()); + ASSERT_TRUE(store_->Put(*data, obj2)); ASSERT_EQ(worker_client_->callbacks.size(), 2); } @@ -165,9 +165,9 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) { // Put the dependencies in the store in the opposite order of task // submission. auto data = GenerateRandomObject(); - ASSERT_TRUE(store_->Put(*data, obj2).ok()); + ASSERT_TRUE(store_->Put(*data, obj2)); ASSERT_EQ(worker_client_->callbacks.size(), 0); - ASSERT_TRUE(store_->Put(*data, obj1).ok()); + ASSERT_TRUE(store_->Put(*data, obj1)); ASSERT_EQ(worker_client_->callbacks.size(), 2); } diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 6c3502a82..0b19f2772 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -139,7 +139,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) { ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); auto data = GenerateRandomObject(); - ASSERT_TRUE(mem->Put(*data, obj1).ok()); + ASSERT_TRUE(mem->Put(*data, obj1)); // Test getting an already existing object. ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj1) != nullptr); @@ -148,7 +148,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) { // Testing getting an object that doesn't exist yet causes promotion. ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj2) == nullptr); ASSERT_TRUE(num_plasma_puts == 0); - ASSERT_TRUE(mem->Put(*data, obj2).ok()); + ASSERT_FALSE(mem->Put(*data, obj2)); ASSERT_TRUE(num_plasma_puts == 1); // The next time you get it, it's already there so no need to promote. @@ -191,7 +191,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) { auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto data = RayObject(nullptr, meta_buffer, std::vector()); - ASSERT_TRUE(store->Put(data, obj1).ok()); + ASSERT_TRUE(store->Put(data, obj1)); TaskSpecification task; task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType()); @@ -213,8 +213,8 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) { ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); auto data = GenerateRandomObject(); // Ensure the data is already present in the local store. - ASSERT_TRUE(store->Put(*data, obj1).ok()); - ASSERT_TRUE(store->Put(*data, obj2).ok()); + ASSERT_TRUE(store->Put(*data, obj1)); + ASSERT_TRUE(store->Put(*data, obj2)); TaskSpecification task; task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); task.GetMutableMessage().add_args()->add_object_ids(obj2.Binary()); @@ -244,8 +244,8 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) { resolver.ResolveDependencies(task, [&ok]() { ok = true; }); ASSERT_EQ(resolver.NumPendingTasks(), 1); ASSERT_TRUE(!ok); - ASSERT_TRUE(store->Put(*data, obj1).ok()); - ASSERT_TRUE(store->Put(*data, obj2).ok()); + ASSERT_TRUE(store->Put(*data, obj1)); + ASSERT_TRUE(store->Put(*data, obj2)); // Tests that the task proto was rewritten to have inline argument values after // resolution completes. ASSERT_TRUE(ok); @@ -273,8 +273,8 @@ TEST(LocalDependencyResolverTest, TestInlinedObjectIds) { resolver.ResolveDependencies(task, [&ok]() { ok = true; }); ASSERT_EQ(resolver.NumPendingTasks(), 1); ASSERT_TRUE(!ok); - ASSERT_TRUE(store->Put(*data, obj1).ok()); - ASSERT_TRUE(store->Put(*data, obj2).ok()); + ASSERT_TRUE(store->Put(*data, obj1)); + ASSERT_TRUE(store->Put(*data, obj2)); // Tests that the task proto was rewritten to have inline argument values after // resolution completes. ASSERT_TRUE(ok); @@ -698,16 +698,16 @@ TEST(DirectTaskTransportTest, TestSchedulingKeys) { ObjectID plasma2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); // Ensure the data is already present in the local store for direct call objects. auto data = GenerateRandomObject(); - ASSERT_TRUE(store->Put(*data, direct1).ok()); - ASSERT_TRUE(store->Put(*data, direct2).ok()); + ASSERT_TRUE(store->Put(*data, direct1)); + ASSERT_TRUE(store->Put(*data, direct2)); // Force plasma objects to be promoted. std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto plasma_data = RayObject(nullptr, meta_buffer, std::vector()); - ASSERT_TRUE(store->Put(plasma_data, plasma1).ok()); - ASSERT_TRUE(store->Put(plasma_data, plasma2).ok()); + ASSERT_TRUE(store->Put(plasma_data, plasma1)); + ASSERT_TRUE(store->Put(plasma_data, plasma2)); TaskSpecification same_deps_1 = BuildTaskSpec(resources1, descriptor1); same_deps_1.GetMutableMessage().add_args()->add_object_ids(direct1.Binary()); diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 3345d4304..e9efc9ac7 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -44,10 +44,11 @@ class MockActorManager : public ActorManagerInterface { class TaskManagerTest : public ::testing::Test { public: - TaskManagerTest() + TaskManagerTest(bool lineage_pinning_enabled = false) : store_(std::shared_ptr(new CoreWorkerMemoryStore())), - reference_counter_( - std::shared_ptr(new ReferenceCounter(rpc::Address()))), + reference_counter_(std::shared_ptr(new ReferenceCounter( + rpc::Address(), + /*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))), actor_manager_(std::shared_ptr(new MockActorManager())), manager_(store_, reference_counter_, actor_manager_, [this](const TaskSpecification &spec) { @@ -62,6 +63,11 @@ class TaskManagerTest : public ::testing::Test { int num_retries_ = 0; }; +class TaskManagerLineageTest : public TaskManagerTest { + public: + TaskManagerLineageTest() : TaskManagerTest(true) {} +}; + TEST_F(TaskManagerTest, TestTaskSuccess) { TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; @@ -153,6 +159,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) { auto error = rpc::ErrorType::WORKER_DIED; for (int i = 0; i < num_retries; i++) { + RAY_LOG(INFO) << "Retry " << i; manager_.PendingTaskFailed(spec.TaskId(), error); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -167,7 +174,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); std::vector> results; - RAY_CHECK_OK(store_->Get({return_id}, 1, -0, ctx, false, &results)); + RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results)); ASSERT_EQ(results.size(), 1); rpc::ErrorType stored_error; ASSERT_TRUE(results[0]->IsException(&stored_error)); @@ -180,6 +187,240 @@ TEST_F(TaskManagerTest, TestTaskRetry) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); } +// Test to make sure that the task spec and dependencies for an object are +// evicted when lineage pinning is disabled in the ReferenceCounter. +TEST_F(TaskManagerTest, TestLineageEvicted) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + ObjectID dep1 = ObjectID::FromRandom(); + ObjectID dep2 = ObjectID::FromRandom(); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); + auto spec = CreateTaskHelper(1, {dep1, dep2}); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + return_object->set_in_plasma(true); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + // The task is still pinned because its return ID is still in scope. + ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId())); + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + // The dependencies should not be pinned because lineage pinning is + // disabled. + ASSERT_FALSE(reference_counter_->HasReference(dep1)); + ASSERT_FALSE(reference_counter_->HasReference(dep2)); + ASSERT_TRUE(reference_counter_->HasReference(return_id)); + + // Once the return ID goes out of scope, the task spec and its dependencies + // are released. + reference_counter_->AddLocalReference(return_id, ""); + reference_counter_->RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId())); + ASSERT_FALSE(reference_counter_->HasReference(return_id)); +} + +// Test to make sure that the task spec and dependencies for an object are +// pinned when lineage pinning is enabled in the ReferenceCounter. +TEST_F(TaskManagerLineageTest, TestLineagePinned) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + // Submit a task with 2 arguments. + ObjectID dep1 = ObjectID::FromRandom(); + ObjectID dep2 = ObjectID::FromRandom(); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); + auto spec = CreateTaskHelper(1, {dep1, dep2}); + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); + + // The task completes. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(true); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + // The task should still be in the lineage because its return ID is in scope. + ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId())); + ASSERT_TRUE(reference_counter_->HasReference(dep1)); + ASSERT_TRUE(reference_counter_->HasReference(dep2)); + ASSERT_TRUE(reference_counter_->HasReference(return_id)); + + // All lineage should be erased. + reference_counter_->RemoveLocalReference(return_id, nullptr); + ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId())); + ASSERT_FALSE(reference_counter_->HasReference(dep1)); + ASSERT_FALSE(reference_counter_->HasReference(dep2)); + ASSERT_FALSE(reference_counter_->HasReference(return_id)); +} + +// Test to make sure that the task spec and dependencies for an object are +// evicted if the object is returned by value, instead of stored in plasma. +TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + // Submit a task with 2 arguments. + ObjectID dep1 = ObjectID::FromRandom(); + ObjectID dep2 = ObjectID::FromRandom(); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); + auto spec = CreateTaskHelper(1, {dep1, dep2}); + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); + + // The task completes. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(false); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + // All lineage should be erased because the return object was not stored in + // plasma. + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_FALSE(reference_counter_->HasReference(dep1)); + ASSERT_FALSE(reference_counter_->HasReference(dep2)); + ASSERT_TRUE(reference_counter_->HasReference(return_id)); +} + +// Test to make sure that the task spec and dependencies for an object are +// pinned if the object goes out of scope before the task finishes. This is +// needed in case the pending task fails and needs to be retried. +TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + // Submit a task with 2 arguments. + ObjectID dep1 = ObjectID::FromRandom(); + ObjectID dep2 = ObjectID::FromRandom(); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); + auto spec = CreateTaskHelper(1, {dep1, dep2}); + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); + + // The return ID goes out of scope. The lineage should still be pinned + // because the task has not completed yet. + reference_counter_->RemoveLocalReference(return_id, nullptr); + ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_TRUE(reference_counter_->HasReference(dep1)); + ASSERT_TRUE(reference_counter_->HasReference(dep2)); + ASSERT_FALSE(reference_counter_->HasReference(return_id)); + + // The task completes. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(true); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + // All lineage should be erased. + ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); + ASSERT_FALSE(reference_counter_->HasReference(dep1)); + ASSERT_FALSE(reference_counter_->HasReference(dep2)); + ASSERT_FALSE(reference_counter_->HasReference(return_id)); +} + +// Test for pinning the lineage of an object, where the lineage is a chain of +// tasks that each depend on the previous. All tasks should be pinned until the +// final object goes out of scope. +TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + + ObjectID dep = ObjectID::FromRandom(); + reference_counter_->AddLocalReference(dep, ""); + for (int i = 0; i < 3; i++) { + auto spec = CreateTaskHelper(1, {dep}); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + + // The task completes. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(true); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + + // All tasks should be pinned in the lineage. + ASSERT_EQ(manager_.NumSubmissibleTasks(), i + 1); + // All objects in the lineage of the newest return ID, plus the return ID + // itself, should be pinned. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), i + 2); + + reference_counter_->RemoveLocalReference(dep, nullptr); + dep = return_id; + } + + // The task's return ID goes out of scope before the task finishes. + reference_counter_->RemoveLocalReference(dep, nullptr); + ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); +} + +// Test for evicting the lineage of an object passed by value, where the +// lineage is a chain of tasks that each depend on the previous and each return +// a direct value. All tasks should be evicted as soon as they complete, even +// though the final object is still in scope. +TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { + TaskID caller_id = TaskID::Nil(); + rpc::Address caller_address; + + ObjectID dep = ObjectID::FromRandom(); + reference_counter_->AddLocalReference(dep, ""); + for (int i = 0; i < 3; i++) { + auto spec = CreateTaskHelper(1, {dep}); + int num_retries = 3; + manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); + reference_counter_->AddLocalReference(return_id, ""); + + // The task completes. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(return_id.Binary()); + auto data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + return_object->set_in_plasma(false); + manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); + + // No tasks should be pinned because they returned direct objects. + ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); + // Only the dependency and the newest return ID should be in scope because + // all objects in the lineage were direct. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2); + + reference_counter_->RemoveLocalReference(dep, nullptr); + dep = return_id; + } + + // The task's return ID goes out of scope before the task finishes. + reference_counter_->RemoveLocalReference(dep, nullptr); + ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); +} + } // namespace ray int main(int argc, char **argv) {