diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index d0d7cf4f5..074a6ad3c 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -164,6 +164,61 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): raise e.as_instanceof_cause() +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, + "free_objects_period_milliseconds": -1, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the initial object. + node_to_kill = cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.add_node( + num_cpus=1, + resources={"node2": 1}, + object_store_memory=10**8, + _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address, _internal_config=config) + + @ray.remote(max_retries=1 if reconstruction_enabled else 0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + @ray.remote + def dependent_task(x): + return x + + obj = ray.put(np.zeros(10**7, dtype=np.uint8)) + result = dependent_task.options(resources={"node1": 1}).remote(obj) + ray.get(result) + del obj + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, + resources={"node1": 1}, + object_store_memory=10**8, + _internal_config=config) + + for _ in range(20): + ray.put(np.zeros(10**7, dtype=np.uint8)) + + if reconstruction_enabled: + ray.get(result) + else: + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(result) + + @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): config = json.dumps({ diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2adaa9d25..bb7c7ea51 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -732,7 +732,9 @@ Status CoreWorker::Put(const RayObject &object, worker_context_.GetNextPutIndex(), static_cast(TaskTransportType::DIRECT)); reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), - rpc_address_, CurrentCallSite(), object.GetSize()); + rpc_address_, CurrentCallSite(), object.GetSize(), + /*is_reconstructable=*/false, + ClientID::FromBinary(rpc_address_.raylet_id())); return Put(object, contained_object_ids, *object_id, /*pin_object=*/true); } @@ -783,9 +785,10 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t // Only add the object to the reference counter if it didn't already exist. if (data) { - reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), - rpc_address_, CurrentCallSite(), - data_size + metadata->Size()); + reference_counter_->AddOwnedObject( + *object_id, contained_object_ids, GetCallerId(), rpc_address_, CurrentCallSite(), + data_size + metadata->Size(), + /*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id())); } return Status::OK(); } @@ -1512,7 +1515,8 @@ Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, for (size_t i = 0; i < task_spec.NumReturns(); i++) { reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT), /*inner_ids=*/{}, GetCallerId(), rpc_address_, - CurrentCallSite(), -1); + CurrentCallSite(), -1, + /*is_reconstructable=*/false); } auto old_id = GetActorId(); SetActorId(actor_id); diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index a41b618f2..bf1e3e7ea 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -54,8 +54,8 @@ ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto( const ReferenceTableProto &proto) { ReferenceTable refs; for (const auto &ref : proto) { - refs[ray::ObjectID::FromBinary(ref.reference().object_id())] = - Reference::FromProto(ref); + refs.emplace(ray::ObjectID::FromBinary(ref.reference().object_id()), + Reference::FromProto(ref)); } return refs; } @@ -145,12 +145,11 @@ void ReferenceCounter::AddObjectRefStats( } } -void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, - const std::vector &inner_ids, - const TaskID &owner_id, - const rpc::Address &owner_address, - const std::string &call_site, - const int64_t object_size) { +void ReferenceCounter::AddOwnedObject( + const ObjectID &object_id, const std::vector &inner_ids, + const TaskID &owner_id, const rpc::Address &owner_address, + const std::string &call_site, const int64_t object_size, bool is_reconstructable, + const absl::optional &pinned_at_raylet_id) { RAY_LOG(DEBUG) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); RAY_CHECK(object_id_refs_.count(object_id) == 0) @@ -159,7 +158,8 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id, // because this corresponds to a submitted task whose return ObjectID will be created // in the frontend language, incrementing the reference count. object_id_refs_.emplace(object_id, - Reference(owner_id, owner_address, call_site, object_size)); + Reference(owner_id, owner_address, call_site, object_size, + is_reconstructable, pinned_at_raylet_id)); if (!inner_ids.empty()) { // Mark that this object ID contains other inner IDs. Then, we will not GC // the inner objects until the outer object ID goes out of scope. @@ -354,7 +354,8 @@ void ReferenceCounter::DeleteReferences(const std::vector &object_ids) } it->second.local_ref_count = 0; it->second.submitted_task_ref_count = 0; - if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) { + if (distributed_ref_counting_enabled_ && + !it->second.OutOfScope(lineage_pinning_enabled_)) { RAY_LOG(ERROR) << "ray.internal.free does not currently work for objects that are still in " "scope when distributed reference " @@ -387,7 +388,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it, should_delete_value = true; } - if (it->second.OutOfScope()) { + if (it->second.OutOfScope(lineage_pinning_enabled_)) { // If distributed ref counting is enabled, then delete the object once its // ref count across all processes is 0. should_delete_value = true; @@ -445,7 +446,7 @@ bool ReferenceCounter::SetDeleteCallback( auto it = object_id_refs_.find(object_id); if (it == object_id_refs_.end()) { return false; - } else if (it->second.OutOfScope() && + } else if (it->second.OutOfScope(lineage_pinning_enabled_) && !it->second.ShouldDelete(lineage_pinning_enabled_)) { // The object has already gone out of scope but cannot be deleted yet. Do // not set the deletion callback because it may never get called. @@ -486,7 +487,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, RAY_CHECK(!it->second.pinned_at_raylet_id.has_value()); // Only the owner tracks the location. RAY_CHECK(it->second.owned_by_us); - if (!it->second.OutOfScope()) { + if (!it->second.OutOfScope(lineage_pinning_enabled_)) { it->second.pinned_at_raylet_id = raylet_id; } } @@ -789,7 +790,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id, // the object was zero. Also, we should have stripped all distributed ref // count information and returned it to the owner. Therefore, it should be // okay to delete the object, if it wasn't already deleted. - RAY_CHECK(it->second.OutOfScope()); + RAY_CHECK(it->second.OutOfScope(lineage_pinning_enabled_)); } // Send the owner information about any new borrowers. ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs()); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 122e864ea..238735f30 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -137,15 +137,20 @@ class ReferenceCounter { /// possible to have leftover references after a task has finished. /// /// \param[in] object_id The ID of the object that we own. - /// \param[in] inner_ids ObjectIDs that are contained in the object's value. + /// \param[in] contained_ids ObjectIDs that are contained in the object's value. /// As long as the object_id is in scope, the inner objects should not be GC'ed. /// \param[in] owner_id The ID of the object's owner. /// \param[in] owner_address The address of the object's owner. - /// \param[in] dependencies The objects that the object depends on. - void AddOwnedObject(const ObjectID &object_id, - const std::vector &contained_ids, const TaskID &owner_id, - const rpc::Address &owner_address, const std::string &call_site, - const int64_t object_size) LOCKS_EXCLUDED(mutex_); + /// \param[in] call_site Description of the call site where the reference was created. + /// \param[in] object_size Object size if known, otherwise -1; + /// \param[in] is_reconstructable Whether the object can be reconstructed + /// through lineage re-execution. + void AddOwnedObject( + const ObjectID &object_id, const std::vector &contained_ids, + const TaskID &owner_id, const rpc::Address &owner_address, + const std::string &call_site, const int64_t object_size, bool is_reconstructable, + const absl::optional &pinned_at_raylet_id = absl::optional()) + LOCKS_EXCLUDED(mutex_); /// Update the size of the object. /// @@ -326,11 +331,14 @@ class ReferenceCounter { : call_site(call_site), object_size(object_size) {} /// Constructor for a reference that we created. Reference(const TaskID &owner_id, const rpc::Address &owner_address, - std::string call_site, const int64_t object_size) + std::string call_site, const int64_t object_size, bool is_reconstructable, + const absl::optional &pinned_at_raylet_id) : call_site(call_site), object_size(object_size), owned_by_us(true), - owner({owner_id, owner_address}) {} + owner({owner_id, owner_address}), + is_reconstructable(is_reconstructable), + pinned_at_raylet_id(pinned_at_raylet_id) {} /// Constructor from a protobuf. This is assumed to be a message from /// another process, so the object defaults to not being owned by us. @@ -353,13 +361,19 @@ class ReferenceCounter { /// - The reference was contained in another ID that we were borrowing, and /// we haven't told the process that gave us that ID yet. /// - We gave the reference to at least one other process. - bool OutOfScope() const { + bool OutOfScope(bool lineage_pinning_enabled) const { bool in_scope = RefCount() > 0; bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value(); bool has_borrowers = borrowers.size() > 0; bool was_stored_in_objects = stored_in_objects.size() > 0; + + bool has_lineage_references = false; + if (lineage_pinning_enabled && owned_by_us && !is_reconstructable) { + has_lineage_references = lineage_ref_count > 0; + } + return !(in_scope || was_contained_in_borrowed_id || has_borrowers || - was_stored_in_objects); + was_stored_in_objects || has_lineage_references); } /// Whether the Reference can be deleted. A Reference can only be deleted @@ -369,9 +383,9 @@ class ReferenceCounter { /// the object that may be retried in the future. bool ShouldDelete(bool lineage_pinning_enabled) const { if (lineage_pinning_enabled) { - return OutOfScope() && (lineage_ref_count == 0); + return OutOfScope(lineage_pinning_enabled) && (lineage_ref_count == 0); } else { - return OutOfScope(); + return OutOfScope(lineage_pinning_enabled); } } @@ -392,6 +406,10 @@ class ReferenceCounter { // counting is enabled, then some raylet must be pinning the object value. // This is the address of that raylet. absl::optional pinned_at_raylet_id; + // Whether this object can be reconstructed via lineage. If false, then the + // object's value will be pinned as long as it is referenced by any other + // object's lineage. + const bool is_reconstructable = false; /// The local ref count for the ObjectID in the language frontend. size_t local_ref_count = 0; diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 4c2757828..2d9dec563 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -113,12 +113,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // The below methods mirror a core worker's operations, e.g., `Put` simulates // a ray.put(). void Put(const ObjectID &object_id) { - rc_.AddOwnedObject(object_id, {}, task_id_, address_, "", 0); + rc_.AddOwnedObject(object_id, {}, task_id_, address_, "", 0, false); rc_.AddLocalReference(object_id, ""); } void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) { - rc_.AddOwnedObject(outer_id, {inner_id}, task_id_, address_, "", 0); + rc_.AddOwnedObject(outer_id, {inner_id}, task_id_, address_, "", 0, false); rc_.AddLocalReference(outer_id, ""); } @@ -139,7 +139,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { ObjectID SubmitTaskWithArg(const ObjectID &arg_id) { rc_.UpdateSubmittedTaskReferences({arg_id}); ObjectID return_id = ObjectID::FromRandom(); - rc_.AddOwnedObject(return_id, {}, task_id_, address_, "", 0); + rc_.AddOwnedObject(return_id, {}, task_id_, address_, "", 0, false); // Add a sentinel reference to keep all nested object IDs in scope. rc_.AddLocalReference(return_id, ""); return return_id; @@ -259,6 +259,38 @@ TEST_F(ReferenceCountTest, TestBasic) { out.clear(); } +TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { + ObjectID id = ObjectID::FromRandom(); + TaskID task_id = TaskID::ForFakeTask(); + rpc::Address address; + address.set_ip_address("1234"); + + auto out_of_scope = std::make_shared(false); + auto callback = [&](const ObjectID &object_id) { *out_of_scope = true; }; + + // The object goes out of scope once it has no more refs. + std::vector out; + ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); + rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + ASSERT_FALSE(*out_of_scope); + rc->AddLocalReference(id, ""); + ASSERT_FALSE(*out_of_scope); + rc->RemoveLocalReference(id, &out); + ASSERT_TRUE(*out_of_scope); + + // Unreconstructable objects go out of scope even if they have a nonzero + // lineage ref count. + *out_of_scope = false; + ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); + rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + rc->UpdateSubmittedTaskReferences({id}); + ASSERT_FALSE(*out_of_scope); + rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); + ASSERT_TRUE(*out_of_scope); +} + // Tests call site tracking and ability to update object size. TEST_F(ReferenceCountTest, TestReferenceStats) { ObjectID id1 = ObjectID::FromRandom(); @@ -279,7 +311,7 @@ TEST_F(ReferenceCountTest, TestReferenceStats) { ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42"); rc->RemoveLocalReference(id1, nullptr); - rc->AddOwnedObject(id2, {}, task_id, address, "file2.py:43", 100); + rc->AddOwnedObject(id2, {}, task_id, address, "file2.py:43", 100, false); rpc::CoreWorkerStats stats2; rc->AddObjectRefStats({}, &stats2); ASSERT_EQ(stats2.object_refs_size(), 1); @@ -297,7 +329,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); - rc->AddOwnedObject(object_id, {}, task_id, address, "", 0); + rc->AddOwnedObject(object_id, {}, task_id, address, "", 0, false); TaskID added_id; rpc::Address added_address; @@ -308,7 +340,7 @@ TEST_F(ReferenceCountTest, TestOwnerAddress) { auto object_id2 = ObjectID::FromRandom(); task_id = TaskID::ForFakeTask(); address.set_ip_address("5678"); - rc->AddOwnedObject(object_id2, {}, task_id, address, "", 0); + rc->AddOwnedObject(object_id2, {}, task_id, address, "", 0, false); ASSERT_TRUE(rc->GetOwner(object_id2, &added_id, &added_address)); ASSERT_EQ(task_id, added_id); ASSERT_EQ(address.ip_address(), added_address.ip_address()); @@ -1733,6 +1765,44 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { // TODO: Test Pop and Merge individually. +TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) { + ObjectID id = ObjectID::FromRandom(); + TaskID task_id = TaskID::ForFakeTask(); + rpc::Address address; + address.set_ip_address("1234"); + + auto out_of_scope = std::make_shared(false); + auto callback = [&](const ObjectID &object_id) { *out_of_scope = true; }; + + // The object goes out of scope once it has no more refs. + std::vector out; + ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); + rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + ASSERT_FALSE(*out_of_scope); + rc->AddLocalReference(id, ""); + ASSERT_FALSE(*out_of_scope); + rc->RemoveLocalReference(id, &out); + ASSERT_TRUE(*out_of_scope); + + // Unreconstructable objects stay in scope if they have a nonzero lineage ref + // count. + *out_of_scope = false; + ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); + rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); + rc->UpdateSubmittedTaskReferences({id}); + ASSERT_FALSE(*out_of_scope); + rc->UpdateFinishedTaskReferences({id}, false, empty_borrower, empty_refs, &out); + ASSERT_FALSE(*out_of_scope); + + // Unreconstructable objects go out of scope once their lineage ref count + // reaches 0. + rc->UpdateResubmittedTaskReferences({id}); + rc->UpdateFinishedTaskReferences({id}, true, empty_borrower, empty_refs, &out); + ASSERT_TRUE(*out_of_scope); +} + // Test to make sure that we call the lineage released callback correctly. TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { std::vector out; @@ -1752,7 +1822,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { ASSERT_TRUE(lineage_deleted.empty()); // We should keep lineage for owned objects. - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, false); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->HasReference(id)); rc->RemoveLocalReference(id, nullptr); @@ -1771,7 +1841,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { for (int i = 0; i < 3; i++) { ObjectID id = ObjectID::FromRandom(); ids.push_back(id); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); } rc->SetReleaseLineageCallback( @@ -1825,7 +1895,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { std::vector lineage_deleted; ObjectID id = ObjectID::FromRandom(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); rc->SetReleaseLineageCallback( [&](const ObjectID &object_id, std::vector *ids_to_release) { @@ -1868,7 +1938,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID id = ObjectID::FromRandom(); ClientID node_id = ClientID::FromRandom(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); @@ -1882,7 +1952,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0); + rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index a314f7749..9361d7917 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -66,7 +66,7 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, // PushTaskReply. reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT), /*inner_ids=*/{}, caller_id, caller_address, - call_site, -1); + call_site, -1, /*is_reconstructable=*/true); } { diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index 9f69ba93f..9ad5b8a41 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -151,7 +151,7 @@ class ObjectRecoveryManagerTest : public ::testing::Test { TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(object_directory_->Flush() == 1); @@ -161,7 +161,7 @@ TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); std::vector addresses({rpc::Address()}); object_directory_->SetLocations(object_id, addresses); @@ -174,7 +174,7 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); task_resubmitter_->AddTask(object_id.TaskId(), {}); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); @@ -186,7 +186,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); ref_counter_->AddLocalReference(object_id, ""); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); @@ -215,7 +215,8 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { std::vector dependencies; for (int i = 0; i < 3; i++) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0); + ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, + true); task_resubmitter_->AddTask(object_id.TaskId(), dependencies); dependencies = {object_id}; object_ids.push_back(object_id);