From ba86a02b375d1340ec77cf14a1bd099f58e8b0f4 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 21 Mar 2020 18:35:43 -0700 Subject: [PATCH] [core] Revert lineage pinning (#7499) (#7692) * Revert "fix (#7681)" This reverts commit 6a12a31b2ed2c424469627feca25a4ec4a7f1b5b. * Revert "[core] Pin lineage of plasma objects that are still in scope (#7499)" This reverts commit 014929e6585fa63e011e0761eb5e582e1a4c2017. --- python/ray/tests/test_failure.py | 32 +++ src/ray/common/ray_config_def.h | 2 - src/ray/core_worker/core_worker.cc | 16 +- src/ray/core_worker/future_resolver.cc | 4 +- src/ray/core_worker/reference_count.cc | 90 +------ src/ray/core_worker/reference_count.h | 81 +----- src/ray/core_worker/reference_count_test.cc | 128 +-------- .../memory_store/memory_store.cc | 8 +- .../memory_store/memory_store.h | 5 +- src/ray/core_worker/task_manager.cc | 162 +++--------- src/ray/core_worker/task_manager.h | 90 ++----- src/ray/core_worker/test/core_worker_test.cc | 6 +- .../test/direct_actor_transport_test.cc | 8 +- .../test/direct_task_transport_test.cc | 26 +- src/ray/core_worker/test/task_manager_test.cc | 249 +----------------- 15 files changed, 155 insertions(+), 752 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index ca91c50da..171a36c1d 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1014,6 +1014,38 @@ def test_eviction(ray_start_cluster): ray.get(dependent_task.remote(obj)) +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_nodes": 1, + "num_cpus": 2, + }, { + "num_nodes": 2, + "num_cpus": 1, + }], + indirect=True) +def test_serialized_id_eviction(ray_start_cluster): + @ray.remote + def large_object(): + return np.zeros(10 * 1024 * 1024) + + @ray.remote + def get(obj_ids): + obj_id = obj_ids[0] + assert (isinstance(ray.get(obj_id), np.ndarray)) + # Wait for the object to be evicted. + ray.internal.free(obj_id) + while ray.worker.global_worker.core_worker.object_exists(obj_id): + time.sleep(1) + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(obj_id) + print("get done", obj_ids) + + obj = large_object.remote() + result = get.remote([obj]) + ray.internal.free(obj) + ray.get(result) + + @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 2, diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index fd035e4ec..7b6ada54c 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -101,8 +101,6 @@ RAY_CONFIG(int64_t, free_objects_period_milliseconds, 1000) /// to -1. RAY_CONFIG(size_t, free_objects_batch_size, 100) -RAY_CONFIG(bool, lineage_pinning_enabled, false) - /// Whether to enable the new scheduler. The new scheduler is designed /// only to work with direct calls. Once direct calls afre becoming /// the default, this scheduler will also become the default. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a1b977da0..e59232e34 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -173,7 +173,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, reference_counter_ = std::make_shared( rpc_address_, RayConfig::instance().distributed_ref_counting_enabled(), - RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) { + [this](const rpc::Address &addr) { return std::shared_ptr( new rpc::CoreWorkerClient(addr, *client_call_manager_)); }); @@ -485,8 +485,7 @@ Status CoreWorker::Put(const RayObject &object, RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } } - RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); - return Status::OK(); + return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, @@ -530,8 +529,7 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, } else { RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } - RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); - return Status::OK(); + return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); } Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, @@ -1243,8 +1241,8 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, // We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get() // properly redirects to the plasma store. if (task.ArgId(i, 0).IsDirectCallType()) { - RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - task.ArgId(i, 0))); + RAY_CHECK_OK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + task.ArgId(i, 0))); } const auto &arg_id = task.ArgId(i, 0); by_ref_ids.insert(arg_id); @@ -1466,9 +1464,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & rpc::SendReplyCallback send_reply_callback) { absl::MutexLock lock(&mutex_); auto stats = reply->mutable_core_worker_stats(); - // TODO(swang): Differentiate between tasks that are currently pending - // execution and tasks that have finished but may be retried. - stats->set_num_pending_tasks(task_manager_->NumSubmissibleTasks()); + stats->set_num_pending_tasks(task_manager_->NumPendingTasks()); stats->set_task_queue_length(task_queue_length_); stats->set_num_executed_tasks(num_executed_tasks_); stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope()); diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index 22bb7ca0e..f26b34c69 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -40,8 +40,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID // Either the owner is gone or the owner replied that the object has // been created. In both cases, we can now try to fetch the object via // plasma. - RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - object_id)); + RAY_CHECK_OK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + object_id)); })); } diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index d6231ffdf..a47b542d6 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -23,8 +23,7 @@ << (it->second.contained_in_borrowed_id.has_value() \ ? *it->second.contained_in_borrowed_id \ : ObjectID::Nil()) \ - << " contains: " << it->second.contains.size() \ - << " lineage_ref_count: " << it->second.lineage_ref_count; + << " contains: " << it->second.contains.size(); namespace {} // namespace @@ -216,7 +215,6 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( const std::vector &argument_ids_to_remove, std::vector *deleted) { absl::MutexLock lock(&mutex_); for (const ObjectID &argument_id : argument_ids_to_add) { - RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id; auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { // This happens if a large argument is transparently passed by reference @@ -224,20 +222,13 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( it = object_id_refs_.emplace(argument_id, Reference()).first; } it->second.submitted_task_ref_count++; - // The lineage ref will get released once the task finishes and cannot be - // retried again. - it->second.lineage_ref_count++; } - // Release the submitted task ref and the lineage ref for any argument IDs - // whose values were inlined. - RemoveSubmittedTaskReferences(argument_ids_to_remove, /*release_lineage=*/true, - deleted); + RemoveSubmittedTaskReferences(argument_ids_to_remove, deleted); } void ReferenceCounter::UpdateFinishedTaskReferences( - const std::vector &argument_ids, bool release_lineage, - const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, - std::vector *deleted) { + const std::vector &argument_ids, const rpc::Address &worker_addr, + const ReferenceTableProto &borrowed_refs, std::vector *deleted) { absl::MutexLock lock(&mutex_); // Must merge the borrower refs before decrementing any ref counts. This is // to make sure that for serialized IDs, we increment the borrower count for @@ -251,63 +242,19 @@ void ReferenceCounter::UpdateFinishedTaskReferences( MergeRemoteBorrowers(argument_id, worker_addr, refs); } - RemoveSubmittedTaskReferences(argument_ids, release_lineage, deleted); -} - -void ReferenceCounter::ReleaseLineageReferences( - const std::vector &argument_ids) { - absl::MutexLock lock(&mutex_); - ReleaseLineageReferencesInternal(argument_ids); -} - -void ReferenceCounter::ReleaseLineageReferencesInternal( - const std::vector &argument_ids) { - for (const ObjectID &argument_id : argument_ids) { - auto it = object_id_refs_.find(argument_id); - if (it == object_id_refs_.end()) { - // References can get evicted early when lineage pinning is disabled. - RAY_CHECK(!lineage_pinning_enabled_); - continue; - } - - if (it->second.lineage_ref_count == 0) { - // References can get evicted early when lineage pinning is disabled. - RAY_CHECK(!lineage_pinning_enabled_); - continue; - } - - RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id; - it->second.lineage_ref_count--; - if (it->second.lineage_ref_count == 0) { - // Don't have to pass in a deleted vector here because the reference - // cannot have gone out of scope here since we are only modifying the - // lineage ref count. - DeleteReferenceInternal(it, nullptr); - } - } + RemoveSubmittedTaskReferences(argument_ids, deleted); } void ReferenceCounter::RemoveSubmittedTaskReferences( - const std::vector &argument_ids, bool release_lineage, - std::vector *deleted) { + const std::vector &argument_ids, std::vector *deleted) { for (const ObjectID &argument_id : argument_ids) { - RAY_LOG(DEBUG) << "Releasing ref for submitted task argument " << argument_id; auto it = object_id_refs_.find(argument_id); if (it == object_id_refs_.end()) { RAY_LOG(WARNING) << "Tried to decrease ref count for nonexistent object ID: " << argument_id; return; } - RAY_CHECK(it->second.submitted_task_ref_count > 0); it->second.submitted_task_ref_count--; - if (release_lineage) { - if (it->second.lineage_ref_count > 0) { - it->second.lineage_ref_count--; - } else { - // References can get evicted early when lineage pinning is disabled. - RAY_CHECK(!lineage_pinning_enabled_); - } - } if (it->second.RefCount() == 0) { DeleteReferenceInternal(it, deleted); } @@ -340,7 +287,7 @@ void ReferenceCounter::DeleteReferences(const std::vector &object_ids) } it->second.local_ref_count = 0; it->second.submitted_task_ref_count = 0; - if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) { + if (distributed_ref_counting_enabled_ && !it->second.CanDelete()) { RAY_LOG(ERROR) << "ray.internal.free does not currently work for objects that are still in " "scope when distributed reference " @@ -364,6 +311,8 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, // Whether it is safe to unpin the value. bool should_delete_value = false; + // Whether it is safe to delete the Reference. + bool should_delete_reference = false; // If distributed ref counting is not enabled, then delete the object as soon // as its local ref count goes to 0. @@ -373,10 +322,11 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, should_delete_value = true; } - if (it->second.OutOfScope()) { + if (it->second.CanDelete()) { // If distributed ref counting is enabled, then delete the object once its // ref count across all processes is 0. should_delete_value = true; + should_delete_reference = true; for (const auto &inner_id : it->second.contains) { auto inner_it = object_id_refs_.find(inner_id); if (inner_it != object_id_refs_.end()) { @@ -409,16 +359,8 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, deleted->push_back(id); } } - if (it->second.ShouldDelete(lineage_pinning_enabled_)) { + if (should_delete_reference) { RAY_LOG(DEBUG) << "Deleting Reference to object " << id; - // TODO(swang): Update lineage_ref_count for nested objects? - if (on_lineage_released_ && it->second.owned_by_us) { - RAY_LOG(DEBUG) << "Releasing lineage for object " << id; - std::vector ids_to_release; - on_lineage_released_(id, &ids_to_release); - ReleaseLineageReferencesInternal(ids_to_release); - } - object_id_refs_.erase(it); ShutdownIfNeeded(); } @@ -719,7 +661,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id, // the object was zero. Also, we should have stripped all distributed ref // count information and returned it to the owner. Therefore, it should be // okay to delete the object, if it wasn't already deleted. - RAY_CHECK(it->second.OutOfScope()); + RAY_CHECK(it->second.CanDelete()); } // Send the owner information about any new borrowers. ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs()); @@ -771,12 +713,6 @@ void ReferenceCounter::SetRefRemovedCallback( } } -void ReferenceCounter::SetReleaseLineageCallback( - const LineageReleasedCallback &callback) { - RAY_CHECK(on_lineage_released_ == nullptr); - on_lineage_released_ = callback; -} - ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 30a9fcb11..97957f909 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -36,16 +36,12 @@ class ReferenceCounter { using ReferenceTableProto = ::google::protobuf::RepeatedPtrField; using ReferenceRemovedCallback = std::function; - using LineageReleasedCallback = - std::function *)>; ReferenceCounter(const rpc::WorkerAddress &rpc_address, bool distributed_ref_counting_enabled = true, - bool lineage_pinning_enabled = false, rpc::ClientFactoryFn client_factory = nullptr) : rpc_address_(rpc_address), distributed_ref_counting_enabled_(distributed_ref_counting_enabled), - lineage_pinning_enabled_(lineage_pinning_enabled), client_factory_(client_factory) {} ~ReferenceCounter() {} @@ -71,16 +67,9 @@ class ReferenceCounter { LOCKS_EXCLUDED(mutex_); /// Add references for the provided object IDs that correspond to them being - /// dependencies to a submitted task. If lineage pinning is enabled, then - /// this will also pin the Reference entry for each new argument until the - /// argument's lineage ref is released. + /// dependencies to a submitted task. /// - /// \param[in] argument_ids_to_add The arguments of the task to add - /// references for. - /// \param[out] argument_ids_to_remove The arguments of the task to remove - /// references for. - /// \param[out] deleted Any objects that are newly out of scope after this - /// function call. + /// \param[in] object_ids The object IDs to add references for. void UpdateSubmittedTaskReferences( const std::vector &argument_ids_to_add, const std::vector &argument_ids_to_remove = std::vector(), @@ -92,8 +81,6 @@ class ReferenceCounter { /// when the task finishes for plasma dependencies. /// /// \param[in] object_ids The object IDs to remove references for. - /// \param[in] release_lineage Whether to decrement the arguments' lineage - /// ref count. /// \param[in] worker_addr The address of the worker that executed the task. /// \param[in] borrowed_refs The references that the worker borrowed during /// the task. This table includes all task arguments that were passed by @@ -102,22 +89,11 @@ class ReferenceCounter { /// worker and/or a task that the worker submitted. /// \param[out] deleted The object IDs whos reference counts reached zero. void UpdateFinishedTaskReferences(const std::vector &argument_ids, - bool release_lineage, const rpc::Address &worker_addr, + const rpc::Address &worker_addr, const ReferenceTableProto &borrowed_refs, std::vector *deleted) LOCKS_EXCLUDED(mutex_); - /// Release the lineage ref count for this list of object IDs. An object's - /// lineage ref count is the number of tasks that depend on the object that - /// may be retried in the future (pending execution or finished but - /// retryable). If the object is direct (not stored in plasma), then its - /// lineage ref count is 0. - /// - /// \param[in] argument_ids The list of objects whose lineage ref counts we - /// should decrement. - void ReleaseLineageReferences(const std::vector &argument_ids) - LOCKS_EXCLUDED(mutex_); - /// Add an object that we own. The object may depend on other objects. /// Dependencies for each ObjectID must be set at most once. The local /// reference count for the ObjectID is set to zero, which assumes that an @@ -195,15 +171,6 @@ class ReferenceCounter { const ReferenceRemovedCallback &ref_removed_callback) LOCKS_EXCLUDED(mutex_); - /// Set a callback to call whenever a Reference that we own is deleted. A - /// Reference can only be deleted if: - /// 1. The ObjectID's ref count is 0 on all workers. - /// 2. There are no tasks that depend on the object that may be retried in - /// the future. - /// - /// \param[in] callback The callback to call. - void SetReleaseLineageCallback(const LineageReleasedCallback &callback); - /// Respond to the object's owner once we are no longer borrowing it. The /// sender is the owner of the object ID. We will send the reply when our /// RefCount() for the object ID goes to 0. @@ -309,13 +276,13 @@ class ReferenceCounter { return local_ref_count + submitted_task_ref_count + contained_in_owned.size(); } - /// Whether this reference is no longer in scope. A reference is in scope - /// if any of the following are true: + /// Whether we can delete this reference. A reference can NOT be deleted if + /// any of the following are true: /// - The reference is still being used by this process. /// - The reference was contained in another ID that we were borrowing, and /// we haven't told the process that gave us that ID yet. /// - We gave the reference to at least one other process. - bool OutOfScope() const { + bool CanDelete() const { bool in_scope = RefCount() > 0; bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value(); bool has_borrowers = borrowers.size() > 0; @@ -324,19 +291,6 @@ class ReferenceCounter { was_stored_in_objects); } - /// Whether the Reference can be deleted. A Reference can only be deleted - /// if: - /// 1. The ObjectID's ref count is 0 on all workers. - /// 2. If lineage pinning is enabled, there are no tasks that depend on - /// the object that may be retried in the future. - bool ShouldDelete(bool lineage_pinning_enabled) const { - if (lineage_pinning_enabled) { - return OutOfScope() && (lineage_ref_count == 0); - } else { - return OutOfScope(); - } - } - /// Description of the call site where the reference was created. std::string call_site = ""; /// Object size if known, otherwise -1; @@ -405,11 +359,6 @@ class ReferenceCounter { /// task's caller is also a borrower. The key is the task's return ID, and /// the value is the task ID and address of the task's caller. absl::flat_hash_map stored_in_objects; - /// The number of tasks that depend on this object that may be retried in - /// the future (pending execution or finished but retryable). If the object - /// is inlined (not stored in plasma), then its lineage ref count is 0 - /// because any dependent task will already have the value of the object. - size_t lineage_ref_count = 0; /// Callback that will be called when this ObjectID no longer has /// references. @@ -437,7 +386,7 @@ class ReferenceCounter { /// inlined dependencies are inlined or when the task finishes for plasma /// dependencies. void RemoveSubmittedTaskReferences(const std::vector &argument_ids, - bool release_lineage, std::vector *deleted) + std::vector *deleted) EXCLUSIVE_LOCKS_REQUIRED(mutex_); /// Helper method to mark that this ObjectID contains another ObjectID(s). @@ -523,10 +472,6 @@ class ReferenceCounter { std::vector *deleted) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - /// Helper method to decrement the lineage ref count for a list of objects. - void ReleaseLineageReferencesInternal(const std::vector &argument_ids) - EXCLUSIVE_LOCKS_REQUIRED(mutex_); - /// Address of our RPC server. This is used to determine whether we own a /// given object or not, by comparing our WorkerID with the WorkerID of the /// object's owner. @@ -535,13 +480,7 @@ class ReferenceCounter { /// Feature flag for distributed ref counting. If this is false, then we will /// keep the distributed ref count, but only the local ref count will be used /// to decide when objects can be evicted. - const bool distributed_ref_counting_enabled_; - - /// Feature flag for lineage pinning. If this is false, then we will keep the - /// lineage ref count, but this will not be used to decide when the object's - /// Reference can be deleted. The object's lineage ref count is the number of - /// tasks that depend on that object that may be retried in the future. - const bool lineage_pinning_enabled_; + bool distributed_ref_counting_enabled_; /// Factory for producing new core worker clients. rpc::ClientFactoryFn client_factory_; @@ -558,10 +497,6 @@ class ReferenceCounter { /// Holds all reference counts and dependency information for tracked ObjectIDs. ReferenceTable object_id_refs_ GUARDED_BY(mutex_); - /// The callback to call once an object ID that we own is no longer in scope - /// and it has no tasks that depend on it that may be retried in the future. - /// The object's Reference will be erased after this callback. - LineageReleasedCallback on_lineage_released_; /// Optional shutdown hook to call when all references have gone /// out of scope. std::function shutdown_hook_ GUARDED_BY(mutex_) = nullptr; diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 0e275444e..9a4244647 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -36,20 +36,6 @@ class ReferenceCountTest : public ::testing::Test { virtual void TearDown() {} }; -class ReferenceCountLineageEnabledTest : public ::testing::Test { - protected: - std::unique_ptr rc; - virtual void SetUp() { - rpc::Address addr; - rc = std::unique_ptr( - new ReferenceCounter(addr, - /*distributed_ref_counting_enabled=*/true, - /*lineage_pinning_enabled=*/true)); - } - - virtual void TearDown() {} -}; - class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: // Helper function to generate a random address. @@ -64,9 +50,8 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { MockWorkerClient(const std::string &addr, rpc::ClientFactoryFn client_factory = nullptr) : task_id_(TaskID::ForFakeTask()), address_(CreateRandomAddress(addr)), - rc_(rpc::WorkerAddress(address_), - /*distributed_ref_counting_enabled=*/true, - /*lineage_pinning_enabled=*/false, client_factory) {} + rc_(rpc::WorkerAddress(address_), /*distributed_ref_counting_enabled=*/true, + client_factory) {} ray::Status WaitForRefRemoved( const rpc::WaitForRefRemovedRequest &request, @@ -171,8 +156,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { if (!arg_id.IsNil()) { arguments.push_back(arg_id); } - rc_.UpdateFinishedTaskReferences(arguments, false, borrower_address, borrower_refs, - nullptr); + rc_.UpdateFinishedTaskReferences(arguments, borrower_address, borrower_refs, nullptr); } // Global map from Worker ID -> MockWorkerClient. @@ -218,13 +202,13 @@ TEST_F(ReferenceCountTest, TestBasic) { rc->UpdateSubmittedTaskReferences({id1}); rc->UpdateSubmittedTaskReferences({id1, id2}); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); - rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); - rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(out.size(), 2); out.clear(); @@ -237,26 +221,16 @@ TEST_F(ReferenceCountTest, TestBasic) { rc->RemoveLocalReference(id1, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id2}, false, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id2}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 2); ASSERT_EQ(out.size(), 0); - rc->UpdateFinishedTaskReferences({id1}, false, empty_borrower, empty_refs, &out); + rc->UpdateFinishedTaskReferences({id1}, empty_borrower, empty_refs, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 1); ASSERT_EQ(out.size(), 1); rc->RemoveLocalReference(id2, &out); ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(out.size(), 2); out.clear(); - - // Submitted task with inlined references. - rc->UpdateSubmittedTaskReferences({id1}); - rc->UpdateSubmittedTaskReferences({id2}, {id1}, &out); - ASSERT_EQ(rc->NumObjectIDsInScope(), 1); - ASSERT_EQ(out.size(), 1); - rc->UpdateSubmittedTaskReferences({}, {id2}, &out); - ASSERT_EQ(rc->NumObjectIDsInScope(), 0); - ASSERT_EQ(out.size(), 2); - out.clear(); } // Tests call site tracking and ability to update object size. @@ -332,12 +306,12 @@ TEST(MemoryStoreIntegrationTest, TestSimple) { CoreWorkerMemoryStore store(nullptr, rc); // Tests putting an object with no references is ignored. - RAY_CHECK(store.Put(buffer, id2)); + RAY_CHECK_OK(store.Put(buffer, id2)); ASSERT_EQ(store.Size(), 0); // Tests ref counting overrides remove after get option. rc->AddLocalReference(id1, ""); - RAY_CHECK(store.Put(buffer, id1)); + RAY_CHECK_OK(store.Put(buffer, id1)); ASSERT_EQ(store.Size(), 1); std::vector> results; WorkerContext ctx(WorkerType::WORKER, JobID::Nil()); @@ -1733,88 +1707,6 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { // TODO: Test Pop and Merge individually. -// Test to make sure that we call the lineage released callback correctly. -TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { - std::vector out; - std::vector lineage_deleted; - - ObjectID id = ObjectID::FromRandom(); - - rc->SetReleaseLineageCallback( - [&](const ObjectID &object_id, std::vector *ids_to_release) { - lineage_deleted.push_back(object_id); - }); - - // We should not keep lineage for borrowed objects. - rc->AddLocalReference(id, ""); - ASSERT_TRUE(rc->HasReference(id)); - rc->RemoveLocalReference(id, nullptr); - ASSERT_TRUE(lineage_deleted.empty()); - - // We should keep lineage for owned objects. - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); - rc->AddLocalReference(id, ""); - ASSERT_TRUE(rc->HasReference(id)); - rc->RemoveLocalReference(id, nullptr); - ASSERT_EQ(lineage_deleted.size(), 1); -} - -// Test for pinning the lineage of an object, where the lineage is a chain of -// tasks that each depend on the previous. The previous objects should already -// have gone out of scope, but their Reference entry is pinned until the final -// object goes out of scope. -TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { - std::vector out; - std::vector lineage_deleted; - - std::vector ids; - for (int i = 0; i < 3; i++) { - ObjectID id = ObjectID::FromRandom(); - ids.push_back(id); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); - } - - rc->SetReleaseLineageCallback( - [&](const ObjectID &object_id, std::vector *ids_to_release) { - lineage_deleted.push_back(object_id); - // Simulate releasing objects in downstream_id's lineage. - size_t i = 0; - for (; i < ids.size(); i++) { - if (ids[i] == object_id) { - break; - } - } - RAY_CHECK(i < ids.size()); - if (i > 0) { - ids_to_release->push_back(ids[i - 1]); - } - }); - - for (size_t i = 0; i < ids.size() - 1; i++) { - auto id = ids[i]; - // Submit a dependent task on id. - rc->AddLocalReference(id, ""); - ASSERT_TRUE(rc->HasReference(id)); - rc->UpdateSubmittedTaskReferences({id}); - rc->RemoveLocalReference(id, nullptr); - - // The task finishes but is retryable. - rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); - ASSERT_EQ(out.size(), 1); - out.clear(); - ASSERT_TRUE(lineage_deleted.empty()); - ASSERT_TRUE(rc->HasReference(id)); - } - - // The task return ID goes out of scope. - rc->AddLocalReference(ids.back(), ""); - rc->RemoveLocalReference(ids.back(), nullptr); - // The removal of the last return ID should recursively delete all - // references. - ASSERT_EQ(lineage_deleted.size(), ids.size()); - ASSERT_EQ(rc->NumObjectIDsInScope(), 0); -} - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 9260c9199..9391f5eb6 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -157,11 +157,10 @@ std::shared_ptr CoreWorkerMemoryStore::GetOrPromoteToPlasma( return nullptr; } -bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { +Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { std::vector)>> async_callbacks; auto object_entry = std::make_shared(object.GetData(), object.GetMetadata(), object.GetNestedIds(), true); - bool stored_in_direct_memory = true; // TODO(edoakes): we should instead return a flag to the caller to put the object in // plasma. @@ -171,7 +170,7 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_ auto iter = objects_.find(object_id); if (iter != objects_.end()) { - return true; // Object already exists in the store, which is fine. + return Status::OK(); // Object already exists in the store, which is fine. } auto async_callback_it = object_async_get_requests_.find(object_id); @@ -218,7 +217,6 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_ // in-memory store (would cause deadlock). if (should_put_in_plasma) { store_in_plasma_(object, object_id); - stored_in_direct_memory = false; } // It's important for performance to run the callbacks outside the lock. @@ -226,7 +224,7 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_ cb(object_entry); } - return stored_in_direct_memory; + return Status::OK(); } Status CoreWorkerMemoryStore::Get(const std::vector &object_ids, diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 7967fa6d9..4db25d835 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -42,9 +42,8 @@ class CoreWorkerMemoryStore { /// /// \param[in] object The ray object. /// \param[in] object_id Object ID specified by user. - /// \return Whether the object was put into the memory store. If false, then - /// this is because the object was promoted to and stored in plasma instead. - bool Put(const RayObject &object, const ObjectID &object_id); + /// \return Status. + Status Put(const RayObject &object, const ObjectID &object_id); /// Get a list of objects from the object store. /// diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 07e1e432d..7b369162a 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -29,6 +29,9 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, const TaskSpecification &spec, const std::string &call_site, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId(); + absl::MutexLock lock(&mu_); + std::pair entry = {spec, max_retries}; + RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second); // Add references for the dependencies to the task. std::vector task_deps; @@ -68,23 +71,16 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, /*inner_ids=*/{}, caller_id, caller_address, call_site, -1); } - - { - absl::MutexLock lock(&mu_); - RAY_CHECK(submissible_tasks_ - .emplace(spec.TaskId(), TaskEntry(spec, max_retries, num_returns)) - .second); - } } void TaskManager::DrainAndShutdown(std::function shutdown) { bool has_pending_tasks = false; { absl::MutexLock lock(&mu_); - if (!submissible_tasks_.empty()) { + if (!pending_tasks_.empty()) { has_pending_tasks = true; RAY_LOG(WARNING) - << "This worker is still managing " << submissible_tasks_.size() + << "This worker is still managing " << pending_tasks_.size() << " in flight tasks, waiting for them to finish before shutting down."; shutdown_hook_ = shutdown; } @@ -96,31 +92,27 @@ void TaskManager::DrainAndShutdown(std::function shutdown) { } } -bool TaskManager::IsTaskSubmissible(const TaskID &task_id) const { - absl::MutexLock lock(&mu_); - return submissible_tasks_.count(task_id) > 0; -} - bool TaskManager::IsTaskPending(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - const auto it = submissible_tasks_.find(task_id); - if (it == submissible_tasks_.end()) { - return false; - } - return it->second.pending; -} - -int TaskManager::NumSubmissibleTasks() const { - absl::MutexLock lock(&mu_); - return submissible_tasks_.size(); + return pending_tasks_.count(task_id) > 0; } void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &worker_addr) { RAY_LOG(DEBUG) << "Completing task " << task_id; + TaskSpecification spec; + { + absl::MutexLock lock(&mu_); + auto it = pending_tasks_.find(task_id); + RAY_CHECK(it != pending_tasks_.end()) + << "Tried to complete task that was not pending " << task_id; + spec = it->second.first; + pending_tasks_.erase(it); + } + + RemoveFinishedTaskReferences(spec, worker_addr, reply.borrowed_refs()); - std::vector direct_return_ids; for (int i = 0; i < reply.return_objects_size(); i++) { const auto &return_object = reply.return_objects(i); ObjectID object_id = ObjectID::FromBinary(return_object.object_id()); @@ -128,7 +120,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (return_object.in_plasma()) { // Mark it as in plasma with a dummy object. - RAY_CHECK( + RAY_CHECK_OK( in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); } else { std::shared_ptr data_buffer; @@ -145,53 +137,13 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, reinterpret_cast(return_object.metadata().data())), return_object.metadata().size()); } - bool stored_in_direct_memory = in_memory_store_->Put( + RAY_CHECK_OK(in_memory_store_->Put( RayObject(data_buffer, metadata_buffer, IdVectorFromProtobuf(return_object.nested_inlined_ids())), - object_id); - if (stored_in_direct_memory) { - direct_return_ids.push_back(object_id); - } + object_id)); } } - TaskSpecification spec; - bool release_lineage = true; - { - absl::MutexLock lock(&mu_); - auto it = submissible_tasks_.find(task_id); - RAY_CHECK(it != submissible_tasks_.end()) - << "Tried to complete task that was not pending " << task_id; - spec = it->second.spec; - - // Release the lineage for any non-plasma return objects. - for (const auto &direct_return_id : direct_return_ids) { - RAY_LOG(DEBUG) << "Task " << it->first << " returned direct object " - << direct_return_id << ", now has " - << it->second.reconstructable_return_ids.size() - << " plasma returns in scope"; - it->second.reconstructable_return_ids.erase(direct_return_id); - } - RAY_LOG(DEBUG) << "Task " << it->first << " now has " - << it->second.reconstructable_return_ids.size() - << " plasma returns in scope"; - it->second.pending = false; - - // A finished task can be only be re-executed if it has some number of - // retries left and returned at least one object that is still in use and - // stored in plasma. - bool task_retryable = - it->second.num_retries_left > 0 && !it->second.reconstructable_return_ids.empty(); - if (task_retryable) { - // Pin the task spec if it may be retried again. - release_lineage = false; - } else { - submissible_tasks_.erase(it); - } - } - - RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs()); - ShutdownIfNeeded(); } @@ -203,20 +155,18 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ << rpc::ErrorType_Name(error_type); int num_retries_left = 0; TaskSpecification spec; - bool release_lineage = true; { absl::MutexLock lock(&mu_); - auto it = submissible_tasks_.find(task_id); - RAY_CHECK(it != submissible_tasks_.end()) + auto it = pending_tasks_.find(task_id); + RAY_CHECK(it != pending_tasks_.end()) << "Tried to complete task that was not pending " << task_id; - spec = it->second.spec; - num_retries_left = it->second.num_retries_left; + spec = it->second.first; + num_retries_left = it->second.second; if (num_retries_left == 0) { - submissible_tasks_.erase(it); + pending_tasks_.erase(it); } else { - RAY_CHECK(it->second.num_retries_left > 0); - it->second.num_retries_left--; - release_lineage = false; + RAY_CHECK(num_retries_left > 0); + it->second.second--; } } @@ -249,7 +199,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } // The worker failed to execute the task, so it cannot be borrowing any // objects. - RemoveFinishedTaskReferences(spec, release_lineage, rpc::Address(), + RemoveFinishedTaskReferences(spec, rpc::Address(), ReferenceCounter::ReferenceTableProto()); MarkPendingTaskFailed(task_id, spec, error_type); } @@ -261,7 +211,7 @@ void TaskManager::ShutdownIfNeeded() { std::function shutdown_hook = nullptr; { absl::MutexLock lock(&mu_); - if (shutdown_hook_ && submissible_tasks_.empty()) { + if (shutdown_hook_ && pending_tasks_.empty()) { RAY_LOG(WARNING) << "All in flight tasks finished, worker will shut down after " "draining references."; std::swap(shutdown_hook_, shutdown_hook); @@ -284,7 +234,7 @@ void TaskManager::OnTaskDependenciesInlined( } void TaskManager::RemoveFinishedTaskReferences( - TaskSpecification &spec, bool release_lineage, const rpc::Address &borrower_addr, + TaskSpecification &spec, const rpc::Address &borrower_addr, const ReferenceCounter::ReferenceTableProto &borrowed_refs) { std::vector plasma_dependencies; for (size_t i = 0; i < spec.NumArgs(); i++) { @@ -305,51 +255,11 @@ void TaskManager::RemoveFinishedTaskReferences( } std::vector deleted; - reference_counter_->UpdateFinishedTaskReferences( - plasma_dependencies, release_lineage, borrower_addr, borrowed_refs, &deleted); + reference_counter_->UpdateFinishedTaskReferences(plasma_dependencies, borrower_addr, + borrowed_refs, &deleted); in_memory_store_->Delete(deleted); } -void TaskManager::RemoveLineageReference(const ObjectID &object_id, - std::vector *released_objects) { - absl::MutexLock lock(&mu_); - const TaskID &task_id = object_id.TaskId(); - auto it = submissible_tasks_.find(task_id); - if (it == submissible_tasks_.end()) { - RAY_LOG(DEBUG) << "No lineage for object " << object_id; - return; - } - - RAY_LOG(DEBUG) << "Plasma object " << object_id << " out of scope"; - for (const auto &plasma_id : it->second.reconstructable_return_ids) { - RAY_LOG(DEBUG) << "Task " << task_id << " has " << plasma_id << " in scope"; - } - it->second.reconstructable_return_ids.erase(object_id); - RAY_LOG(DEBUG) << "Task " << task_id << " now has " - << it->second.reconstructable_return_ids.size() - << " plasma returns in scope"; - - if (it->second.reconstructable_return_ids.empty() && !it->second.pending) { - // If the task can no longer be retried, decrement the lineage ref count - // for each of the task's args. - for (size_t i = 0; i < it->second.spec.NumArgs(); i++) { - if (it->second.spec.ArgByRef(i)) { - for (size_t j = 0; j < it->second.spec.ArgIdCount(i); j++) { - released_objects->push_back(it->second.spec.ArgId(i, j)); - } - } else { - const auto &inlined_ids = it->second.spec.ArgInlinedIds(i); - released_objects->insert(released_objects->end(), inlined_ids.begin(), - inlined_ids.end()); - } - } - - // The task has finished and none of the return IDs are in scope anymore, - // so it is safe to remove the task spec. - submissible_tasks_.erase(it); - } -} - void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, rpc::ErrorType error_type) { @@ -360,7 +270,7 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, const auto object_id = ObjectID::ForTaskReturn( task_id, /*index=*/i + 1, /*transport_type=*/static_cast(TaskTransportType::DIRECT)); - RAY_UNUSED(in_memory_store_->Put(RayObject(error_type), object_id)); + RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id)); } if (spec.IsActorCreationTask()) { @@ -372,9 +282,9 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { absl::MutexLock lock(&mu_); - auto it = submissible_tasks_.find(task_id); - RAY_CHECK(it != submissible_tasks_.end()); - return it->second.spec; + auto it = pending_tasks_.find(task_id); + RAY_CHECK(it != pending_tasks_.end()); + return it->second.first; } } // namespace ray diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 4627d78fd..cea06bc98 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -53,12 +53,7 @@ class TaskManager : public TaskFinisherInterface { : in_memory_store_(in_memory_store), reference_counter_(reference_counter), actor_manager_(actor_manager), - retry_task_callback_(retry_task_callback) { - reference_counter_->SetReleaseLineageCallback( - [this](const ObjectID &object_id, std::vector *ids_to_release) { - RemoveLineageReference(object_id, ids_to_release); - }); - } + retry_task_callback_(retry_task_callback) {} /// Add a task that is pending execution. /// @@ -77,6 +72,12 @@ class TaskManager : public TaskFinisherInterface { /// \param shutdown The shutdown callback to call. void DrainAndShutdown(std::function shutdown); + /// Return whether the task is pending. + /// + /// \param[in] task_id ID of the task to query. + /// \return Whether the task is pending. + bool IsTaskPending(const TaskID &task_id) const; + /// Write return objects for a pending task to the memory store. /// /// \param[in] task_id ID of the pending task. @@ -110,68 +111,10 @@ class TaskManager : public TaskFinisherInterface { /// Return the spec for a pending task. TaskSpecification GetTaskSpec(const TaskID &task_id) const; - /// Return whether this task can be submitted for execution. - /// - /// \param[in] task_id ID of the task to query. - /// \return Whether the task can be submitted for execution. - bool IsTaskSubmissible(const TaskID &task_id) const; - - /// Return whether the task is pending. - /// - /// \param[in] task_id ID of the task to query. - /// \return Whether the task is pending. - bool IsTaskPending(const TaskID &task_id) const; - /// Return the number of pending tasks. - int NumSubmissibleTasks() const; + int NumPendingTasks() const { return pending_tasks_.size(); } private: - struct TaskEntry { - TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg, - size_t num_returns) - : spec(spec_arg), num_retries_left(num_retries_left_arg) { - for (size_t i = 0; i < num_returns; i++) { - reconstructable_return_ids.insert(spec.ReturnId(i, TaskTransportType::DIRECT)); - } - } - /// The task spec. This is pinned as long as the following are true: - /// - The task is still pending execution. This means that the task may - /// fail and so it may be retried in the future. - /// - The task finished execution, but it has num_retries_left > 0 and - /// reconstructable_return_ids is not empty. This means that the task may - /// be retried in the future to recreate its return objects. - /// TODO(swang): The TaskSpec protobuf must be copied into the - /// PushTaskRequest protobuf when sent to a worker so that we can retry it if - /// the worker fails. We could avoid this by either not caching the full - /// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by - /// storing a shared_ptr to a PushTaskRequest protobuf for all tasks. - const TaskSpecification spec; - // Number of times this task may be resubmitted. If this reaches 0, then - // the task entry may be erased. - int num_retries_left; - // Whether this task is currently pending execution. This is used to pin - // the task entry if the task is still pending but all of its return IDs - // are out of scope. - bool pending = true; - // Objects returned by this task that are reconstructable. This is set - // initially to the task's return objects, since if the task fails, these - // objects may be reconstructed by resubmitting the task. Once the task - // finishes its first execution, then the objects that the task returned by - // value are removed from this set because they can be inlined in any - // dependent tasks. Objects that the task returned through plasma are - // reconstructable, so they are only removed from this set once: - // 1) The language frontend no longer has a reference to the object ID. - // 2) There are no tasks that depend on the object. This includes both - // pending tasks and tasks that finished execution but that may be - // retried in the future. - absl::flat_hash_set reconstructable_return_ids; - }; - - /// Remove a lineage reference to this object ID. This should be called - /// whenever a task that depended on this object ID can no longer be retried. - void RemoveLineageReference(const ObjectID &object_id, - std::vector *ids_to_release) LOCKS_EXCLUDED(mu_); - /// Treat a pending task as failed. The lock should not be held when calling /// this method because it may trigger callbacks in this or other classes. void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec, @@ -182,7 +125,7 @@ class TaskManager : public TaskFinisherInterface { /// failed. The remaining dependencies are plasma objects and any ObjectIDs /// that were inlined in the task spec. void RemoveFinishedTaskReferences( - TaskSpecification &spec, bool release_lineage, const rpc::Address &worker_addr, + TaskSpecification &spec, const rpc::Address &worker_addr, const ReferenceCounter::ReferenceTableProto &borrowed_refs); /// Shutdown if all tasks are finished and shutdown is scheduled. @@ -211,11 +154,16 @@ class TaskManager : public TaskFinisherInterface { /// Protects below fields. mutable absl::Mutex mu_; - /// This map contains one entry per task that may be submitted for - /// execution. This includes both tasks that are currently pending execution - /// and tasks that finished execution but that may be retried again in the - /// future. - absl::flat_hash_map submissible_tasks_ GUARDED_BY(mu_); + /// Map from task ID to a pair of: + /// {task spec, number of allowed retries left} + /// This map contains one entry per pending task that we submitted. + /// TODO(swang): The TaskSpec protobuf must be copied into the + /// PushTaskRequest protobuf when sent to a worker so that we can retry it if + /// the worker fails. We could avoid this by either not caching the full + /// TaskSpec for tasks that cannot be retried (e.g., actor tasks), or by + /// storing a shared_ptr to a PushTaskRequest protobuf for all tasks. + absl::flat_hash_map> pending_tasks_ + GUARDED_BY(mu_); /// Optional shutdown hook to call when pending tasks all finish. std::function shutdown_hook_ GUARDED_BY(mu_) = nullptr; diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 1750dd5d8..ab769508d 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -786,7 +786,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { std::vector ids(buffers.size()); for (size_t i = 0; i < ids.size(); i++) { ids[i] = ObjectID::FromRandom().WithDirectTransportType(); - RAY_CHECK(provider.Put(buffers[i], ids[i])); + RAY_CHECK_OK(provider.Put(buffers[i], ids[i])); } absl::flat_hash_set wait_ids(ids.begin(), ids.end()); @@ -843,7 +843,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { std::vector unready_ids(buffers.size()); for (size_t i = 0; i < unready_ids.size(); i++) { ready_ids[i] = ObjectID::FromRandom().WithDirectTransportType(); - RAY_CHECK(provider.Put(buffers[i], ready_ids[i])); + RAY_CHECK_OK(provider.Put(buffers[i], ready_ids[i])); unready_ids[i] = ObjectID::FromRandom().WithDirectTransportType(); } @@ -851,7 +851,7 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { sleep(1); for (size_t i = 0; i < unready_ids.size(); i++) { - RAY_CHECK(provider.Put(buffers[i], unready_ids[i])); + RAY_CHECK_OK(provider.Put(buffers[i], unready_ids[i])); } }; diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index c68d84d08..0caa9004a 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -136,9 +136,9 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) { // Put the dependencies in the store in the same order as task submission. auto data = GenerateRandomObject(); - ASSERT_TRUE(store_->Put(*data, obj1)); + ASSERT_TRUE(store_->Put(*data, obj1).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 1); - ASSERT_TRUE(store_->Put(*data, obj2)); + ASSERT_TRUE(store_->Put(*data, obj2).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 2); } @@ -165,9 +165,9 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) { // Put the dependencies in the store in the opposite order of task // submission. auto data = GenerateRandomObject(); - ASSERT_TRUE(store_->Put(*data, obj2)); + ASSERT_TRUE(store_->Put(*data, obj2).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 0); - ASSERT_TRUE(store_->Put(*data, obj1)); + ASSERT_TRUE(store_->Put(*data, obj1).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 2); } diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 6d5cea0eb..259a2c20e 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -139,7 +139,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) { ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); auto data = GenerateRandomObject(); - ASSERT_TRUE(mem->Put(*data, obj1)); + ASSERT_TRUE(mem->Put(*data, obj1).ok()); // Test getting an already existing object. ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj1) != nullptr); @@ -148,7 +148,7 @@ TEST(TestMemoryStore, TestPromoteToPlasma) { // Testing getting an object that doesn't exist yet causes promotion. ASSERT_TRUE(mem->GetOrPromoteToPlasma(obj2) == nullptr); ASSERT_TRUE(num_plasma_puts == 0); - ASSERT_FALSE(mem->Put(*data, obj2)); + ASSERT_TRUE(mem->Put(*data, obj2).ok()); ASSERT_TRUE(num_plasma_puts == 1); // The next time you get it, it's already there so no need to promote. @@ -191,7 +191,7 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) { auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto data = RayObject(nullptr, meta_buffer, std::vector()); - ASSERT_TRUE(store->Put(data, obj1)); + ASSERT_TRUE(store->Put(data, obj1).ok()); TaskSpecification task; task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType()); @@ -213,8 +213,8 @@ TEST(LocalDependencyResolverTest, TestInlineLocalDependencies) { ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); auto data = GenerateRandomObject(); // Ensure the data is already present in the local store. - ASSERT_TRUE(store->Put(*data, obj1)); - ASSERT_TRUE(store->Put(*data, obj2)); + ASSERT_TRUE(store->Put(*data, obj1).ok()); + ASSERT_TRUE(store->Put(*data, obj2).ok()); TaskSpecification task; task.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); task.GetMutableMessage().add_args()->add_object_ids(obj2.Binary()); @@ -244,8 +244,8 @@ TEST(LocalDependencyResolverTest, TestInlinePendingDependencies) { resolver.ResolveDependencies(task, [&ok]() { ok = true; }); ASSERT_EQ(resolver.NumPendingTasks(), 1); ASSERT_TRUE(!ok); - ASSERT_TRUE(store->Put(*data, obj1)); - ASSERT_TRUE(store->Put(*data, obj2)); + ASSERT_TRUE(store->Put(*data, obj1).ok()); + ASSERT_TRUE(store->Put(*data, obj2).ok()); // Tests that the task proto was rewritten to have inline argument values after // resolution completes. ASSERT_TRUE(ok); @@ -273,8 +273,8 @@ TEST(LocalDependencyResolverTest, TestInlinedObjectIds) { resolver.ResolveDependencies(task, [&ok]() { ok = true; }); ASSERT_EQ(resolver.NumPendingTasks(), 1); ASSERT_TRUE(!ok); - ASSERT_TRUE(store->Put(*data, obj1)); - ASSERT_TRUE(store->Put(*data, obj2)); + ASSERT_TRUE(store->Put(*data, obj1).ok()); + ASSERT_TRUE(store->Put(*data, obj2).ok()); // Tests that the task proto was rewritten to have inline argument values after // resolution completes. ASSERT_TRUE(ok); @@ -698,16 +698,16 @@ TEST(DirectTaskTransportTest, TestSchedulingKeys) { ObjectID plasma2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); // Ensure the data is already present in the local store for direct call objects. auto data = GenerateRandomObject(); - ASSERT_TRUE(store->Put(*data, direct1)); - ASSERT_TRUE(store->Put(*data, direct2)); + ASSERT_TRUE(store->Put(*data, direct1).ok()); + ASSERT_TRUE(store->Put(*data, direct2).ok()); // Force plasma objects to be promoted. std::string meta = std::to_string(static_cast(rpc::ErrorType::OBJECT_IN_PLASMA)); auto metadata = const_cast(reinterpret_cast(meta.data())); auto meta_buffer = std::make_shared(metadata, meta.size()); auto plasma_data = RayObject(nullptr, meta_buffer, std::vector()); - ASSERT_TRUE(store->Put(plasma_data, plasma1)); - ASSERT_TRUE(store->Put(plasma_data, plasma2)); + ASSERT_TRUE(store->Put(plasma_data, plasma1).ok()); + ASSERT_TRUE(store->Put(plasma_data, plasma2).ok()); TaskSpecification same_deps_1 = BuildTaskSpec(resources1, descriptor1); same_deps_1.GetMutableMessage().add_args()->add_object_ids(direct1.Binary()); diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index e9efc9ac7..3345d4304 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -44,11 +44,10 @@ class MockActorManager : public ActorManagerInterface { class TaskManagerTest : public ::testing::Test { public: - TaskManagerTest(bool lineage_pinning_enabled = false) + TaskManagerTest() : store_(std::shared_ptr(new CoreWorkerMemoryStore())), - reference_counter_(std::shared_ptr(new ReferenceCounter( - rpc::Address(), - /*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))), + reference_counter_( + std::shared_ptr(new ReferenceCounter(rpc::Address()))), actor_manager_(std::shared_ptr(new MockActorManager())), manager_(store_, reference_counter_, actor_manager_, [this](const TaskSpecification &spec) { @@ -63,11 +62,6 @@ class TaskManagerTest : public ::testing::Test { int num_retries_ = 0; }; -class TaskManagerLineageTest : public TaskManagerTest { - public: - TaskManagerLineageTest() : TaskManagerTest(true) {} -}; - TEST_F(TaskManagerTest, TestTaskSuccess) { TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; @@ -159,7 +153,6 @@ TEST_F(TaskManagerTest, TestTaskRetry) { auto error = rpc::ErrorType::WORKER_DIED; for (int i = 0; i < num_retries; i++) { - RAY_LOG(INFO) << "Retry " << i; manager_.PendingTaskFailed(spec.TaskId(), error); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); @@ -174,7 +167,7 @@ TEST_F(TaskManagerTest, TestTaskRetry) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); std::vector> results; - RAY_CHECK_OK(store_->Get({return_id}, 1, 0, ctx, false, &results)); + RAY_CHECK_OK(store_->Get({return_id}, 1, -0, ctx, false, &results)); ASSERT_EQ(results.size(), 1); rpc::ErrorType stored_error; ASSERT_TRUE(results[0]->IsException(&stored_error)); @@ -187,240 +180,6 @@ TEST_F(TaskManagerTest, TestTaskRetry) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); } -// Test to make sure that the task spec and dependencies for an object are -// evicted when lineage pinning is disabled in the ReferenceCounter. -TEST_F(TaskManagerTest, TestLineageEvicted) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - ObjectID dep1 = ObjectID::FromRandom(); - ObjectID dep2 = ObjectID::FromRandom(); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); - auto spec = CreateTaskHelper(1, {dep1, dep2}); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - return_object->set_in_plasma(true); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - // The task is still pinned because its return ID is still in scope. - ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId())); - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - // The dependencies should not be pinned because lineage pinning is - // disabled. - ASSERT_FALSE(reference_counter_->HasReference(dep1)); - ASSERT_FALSE(reference_counter_->HasReference(dep2)); - ASSERT_TRUE(reference_counter_->HasReference(return_id)); - - // Once the return ID goes out of scope, the task spec and its dependencies - // are released. - reference_counter_->AddLocalReference(return_id, ""); - reference_counter_->RemoveLocalReference(return_id, nullptr); - ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId())); - ASSERT_FALSE(reference_counter_->HasReference(return_id)); -} - -// Test to make sure that the task spec and dependencies for an object are -// pinned when lineage pinning is enabled in the ReferenceCounter. -TEST_F(TaskManagerLineageTest, TestLineagePinned) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - // Submit a task with 2 arguments. - ObjectID dep1 = ObjectID::FromRandom(); - ObjectID dep2 = ObjectID::FromRandom(); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); - auto spec = CreateTaskHelper(1, {dep1, dep2}); - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - reference_counter_->AddLocalReference(return_id, ""); - ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); - - // The task completes. - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - auto data = GenerateRandomBuffer(); - return_object->set_data(data->Data(), data->Size()); - return_object->set_in_plasma(true); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - // The task should still be in the lineage because its return ID is in scope. - ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId())); - ASSERT_TRUE(reference_counter_->HasReference(dep1)); - ASSERT_TRUE(reference_counter_->HasReference(dep2)); - ASSERT_TRUE(reference_counter_->HasReference(return_id)); - - // All lineage should be erased. - reference_counter_->RemoveLocalReference(return_id, nullptr); - ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId())); - ASSERT_FALSE(reference_counter_->HasReference(dep1)); - ASSERT_FALSE(reference_counter_->HasReference(dep2)); - ASSERT_FALSE(reference_counter_->HasReference(return_id)); -} - -// Test to make sure that the task spec and dependencies for an object are -// evicted if the object is returned by value, instead of stored in plasma. -TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - // Submit a task with 2 arguments. - ObjectID dep1 = ObjectID::FromRandom(); - ObjectID dep2 = ObjectID::FromRandom(); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); - auto spec = CreateTaskHelper(1, {dep1, dep2}); - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - reference_counter_->AddLocalReference(return_id, ""); - ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); - - // The task completes. - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - auto data = GenerateRandomBuffer(); - return_object->set_data(data->Data(), data->Size()); - return_object->set_in_plasma(false); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - // All lineage should be erased because the return object was not stored in - // plasma. - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_FALSE(reference_counter_->HasReference(dep1)); - ASSERT_FALSE(reference_counter_->HasReference(dep2)); - ASSERT_TRUE(reference_counter_->HasReference(return_id)); -} - -// Test to make sure that the task spec and dependencies for an object are -// pinned if the object goes out of scope before the task finishes. This is -// needed in case the pending task fails and needs to be retried. -TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - // Submit a task with 2 arguments. - ObjectID dep1 = ObjectID::FromRandom(); - ObjectID dep2 = ObjectID::FromRandom(); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); - auto spec = CreateTaskHelper(1, {dep1, dep2}); - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - reference_counter_->AddLocalReference(return_id, ""); - ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); - - // The return ID goes out of scope. The lineage should still be pinned - // because the task has not completed yet. - reference_counter_->RemoveLocalReference(return_id, nullptr); - ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_TRUE(reference_counter_->HasReference(dep1)); - ASSERT_TRUE(reference_counter_->HasReference(dep2)); - ASSERT_FALSE(reference_counter_->HasReference(return_id)); - - // The task completes. - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - auto data = GenerateRandomBuffer(); - return_object->set_data(data->Data(), data->Size()); - return_object->set_in_plasma(true); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - // All lineage should be erased. - ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - ASSERT_FALSE(reference_counter_->HasReference(dep1)); - ASSERT_FALSE(reference_counter_->HasReference(dep2)); - ASSERT_FALSE(reference_counter_->HasReference(return_id)); -} - -// Test for pinning the lineage of an object, where the lineage is a chain of -// tasks that each depend on the previous. All tasks should be pinned until the -// final object goes out of scope. -TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - - ObjectID dep = ObjectID::FromRandom(); - reference_counter_->AddLocalReference(dep, ""); - for (int i = 0; i < 3; i++) { - auto spec = CreateTaskHelper(1, {dep}); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - reference_counter_->AddLocalReference(return_id, ""); - - // The task completes. - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - auto data = GenerateRandomBuffer(); - return_object->set_data(data->Data(), data->Size()); - return_object->set_in_plasma(true); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - - // All tasks should be pinned in the lineage. - ASSERT_EQ(manager_.NumSubmissibleTasks(), i + 1); - // All objects in the lineage of the newest return ID, plus the return ID - // itself, should be pinned. - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), i + 2); - - reference_counter_->RemoveLocalReference(dep, nullptr); - dep = return_id; - } - - // The task's return ID goes out of scope before the task finishes. - reference_counter_->RemoveLocalReference(dep, nullptr); - ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); -} - -// Test for evicting the lineage of an object passed by value, where the -// lineage is a chain of tasks that each depend on the previous and each return -// a direct value. All tasks should be evicted as soon as they complete, even -// though the final object is still in scope. -TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { - TaskID caller_id = TaskID::Nil(); - rpc::Address caller_address; - - ObjectID dep = ObjectID::FromRandom(); - reference_counter_->AddLocalReference(dep, ""); - for (int i = 0; i < 3; i++) { - auto spec = CreateTaskHelper(1, {dep}); - int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); - auto return_id = spec.ReturnId(0, TaskTransportType::DIRECT); - reference_counter_->AddLocalReference(return_id, ""); - - // The task completes. - rpc::PushTaskReply reply; - auto return_object = reply.add_return_objects(); - return_object->set_object_id(return_id.Binary()); - auto data = GenerateRandomBuffer(); - return_object->set_data(data->Data(), data->Size()); - return_object->set_in_plasma(false); - manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); - - // No tasks should be pinned because they returned direct objects. - ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); - // Only the dependency and the newest return ID should be in scope because - // all objects in the lineage were direct. - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2); - - reference_counter_->RemoveLocalReference(dep, nullptr); - dep = return_id; - } - - // The task's return ID goes out of scope before the task finishes. - reference_counter_->RemoveLocalReference(dep, nullptr); - ASSERT_EQ(manager_.NumSubmissibleTasks(), 0); - ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); -} - } // namespace ray int main(int argc, char **argv) {