diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 96ffaf944..4f8930a03 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -431,7 +431,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }; auto reconstruct_object_callback = [this](const ObjectID &object_id) { io_service_.post([this, object_id]() { - RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id)); + RAY_CHECK(object_recovery_manager_->RecoverObject(object_id)); }); }; task_manager_.reset(new TaskManager( @@ -656,8 +656,11 @@ void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) { memory_store_->Delete(lost_objects); for (const auto &object_id : lost_objects) { RAY_LOG(INFO) << "Object " << object_id << " lost due to node failure " << node_id; - // The lost object must have been owned by us. - RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id)); + // NOTE(swang): There is a race condition where this can return false if + // the reference went out of scope since the call to the ref counter to get + // the lost objects. It's okay to not mark the object as failed or recover + // the object since there are no reference holders. + static_cast(object_recovery_manager_->RecoverObject(object_id)); } } @@ -1189,8 +1192,10 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id, // Find the raylet that hosts the primary copy of the object. NodeID pinned_at; bool spilled; - RAY_CHECK( - reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled)); + bool owned_by_us; + RAY_CHECK(reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &owned_by_us, + &pinned_at, &spilled)); + RAY_CHECK(owned_by_us); if (spilled) { // The object has already been spilled. return; diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 48346a8aa..0489c8a17 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -18,16 +18,23 @@ namespace ray { -Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { +bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { // Check the ReferenceCounter to see if there is a location for the object. + bool owned_by_us; NodeID pinned_at; bool spilled; - bool owned_by_us = - reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled); + bool ref_exists = reference_counter_->IsPlasmaObjectPinnedOrSpilled( + object_id, &owned_by_us, &pinned_at, &spilled); + if (!ref_exists) { + // References that have gone out of scope cannot be recovered. + return false; + } + if (!owned_by_us) { - return Status::Invalid( - "Object reference no longer exists or is not owned by us. Either lineage pinning " - "is disabled or this object ID is borrowed."); + RAY_LOG(INFO) << "Reconstruction for borrowed objects (" << object_id + << ") is not supported"; + reconstruction_failure_callback_(object_id, /*pin_object=*/false); + return true; } bool already_pending_recovery = true; @@ -49,7 +56,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { RAY_LOG(INFO) << "Recovery complete for object " << object_id; }); // Lookup the object in the GCS to find another copy. - RAY_RETURN_NOT_OK(object_lookup_( + RAY_CHECK_OK(object_lookup_( object_id, [this](const ObjectID &object_id, const std::vector &locations) { PinOrReconstructObject(object_id, locations); @@ -57,7 +64,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } else { RAY_LOG(DEBUG) << "Recovery already started for object " << object_id; } - return Status::OK(); + return true; } void ObjectRecoveryManager::PinOrReconstructObject( @@ -130,8 +137,8 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { if (status.ok()) { // Try to recover the task's dependencies. for (const auto &dep : task_deps) { - auto status = RecoverObject(dep); - if (!status.ok()) { + auto recovered = RecoverObject(dep); + if (!recovered) { RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": " << status.message(); // We do not pin the dependency because we may not be the owner. diff --git a/src/ray/core_worker/object_recovery_manager.h b/src/ray/core_worker/object_recovery_manager.h index c43f2de03..059fb8472 100644 --- a/src/ray/core_worker/object_recovery_manager.h +++ b/src/ray/core_worker/object_recovery_manager.h @@ -79,11 +79,12 @@ class ObjectRecoveryManager { /// plasma arguments to the task. The recovery operation will succeed once /// the task completes and stores a new value for its return object. /// - /// \return OK if recovery for the object has successfully started, Invalid - /// if the object is not recoverable because we do not own it. Note that the - /// Status::OK value only indicates that the recovery operation has started, - /// but does not guarantee that the recovery operation is successful. - Status RecoverObject(const ObjectID &object_id); + /// \return True if recovery for the object has successfully started, false + /// if the object is not recoverable because we do not have any metadata + /// about the object. If this returns true, then eventually recovery will + /// either succeed (a value will be put into the memory store) or fail (the + /// reconstruction failure callback will be called for this object). + bool RecoverObject(const ObjectID &object_id); private: /// Pin a new copy for a lost object from the given locations or, if that diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index a21350ef9..ef0168af7 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -543,16 +543,17 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, } bool ReferenceCounter::IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, - NodeID *pinned_at, + bool *owned_by_us, NodeID *pinned_at, bool *spilled) const { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it != object_id_refs_.end()) { if (it->second.owned_by_us) { + *owned_by_us = true; *spilled = it->second.spilled; *pinned_at = it->second.pinned_at_raylet_id.value_or(NodeID::Nil()); - return true; } + return true; } return false; } diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index bd950dae6..03bffa5fe 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -327,14 +327,15 @@ class ReferenceCounter : public ReferenceCounterInterface { /// available to fetch. /// /// \param[in] object_id The object to check. + /// \param[out] owned_by_us Whether this object is owned by us. The pinned_at + /// and spilled out-parameters are set if this is true. /// \param[out] pinned_at The node ID of the raylet at which this object is /// \param[out] spilled Whether this object has been spilled. /// pinned. Set to nil if the object is not pinned. - /// \return True if the object exists and is owned by us, false otherwise. We - /// return false here because a borrower should not know the pinned location - /// for an object. - bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, NodeID *pinned_at, - bool *spilled) const LOCKS_EXCLUDED(mutex_); + /// \return True if the reference exists, false otherwise. + bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, bool *owned_by_us, + NodeID *pinned_at, bool *spilled) const + LOCKS_EXCLUDED(mutex_); /// Get and reset the objects that were pinned on the given node. This /// method should be called upon a node failure, to determine which plasma diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 46291943e..8362b2d21 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -1986,23 +1986,28 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID borrowed_id = ObjectID::FromRandom(); rc->AddLocalReference(borrowed_id, ""); + bool owned_by_us; NodeID pinned_at; bool spilled; - ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &pinned_at, &spilled)); + ASSERT_TRUE( + rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_FALSE(owned_by_us); ObjectID id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); - ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); rc->UpdateObjectPinnedAtRaylet(id, node_id); - ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_TRUE(owned_by_us); ASSERT_FALSE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); - ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); @@ -2013,7 +2018,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { auto objects = rc->ResetObjectsOnRemovedNode(node_id); ASSERT_EQ(objects.size(), 1); ASSERT_EQ(objects[0], id); - ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); @@ -2035,9 +2041,11 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); + bool owned_by_us; NodeID pinned_at; bool spilled; - ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); rc->RemoveLocalReference(id, nullptr); @@ -2052,7 +2060,8 @@ TEST_F(ReferenceCountTest, TestFree) { rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_TRUE(deleted->count(id) > 0); - ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled)); + ASSERT_TRUE(owned_by_us); ASSERT_TRUE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index da29ab908..a8d61599b 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -132,8 +132,6 @@ class ObjectRecoveryManagerTest : public ::testing::Test { std::make_shared(metadata, meta.size()); auto data = RayObject(nullptr, meta_buffer, std::vector()); RAY_CHECK(memory_store_->Put(data, object_id)); - - ref_counter_->UpdateObjectPinnedAtRaylet(object_id, local_raylet_id_); }, /*lineage_reconstruction_enabled=*/true) {} @@ -149,13 +147,27 @@ class ObjectRecoveryManagerTest : public ::testing::Test { }; TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { + // Lineage recording disabled. ObjectID object_id = ObjectID::FromRandom(); ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(object_directory_->Flush() == 1); ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); + + // Borrowed object. + object_id = ObjectID::FromRandom(); + ref_counter_->AddLocalReference(object_id, ""); + ASSERT_TRUE(manager_.RecoverObject(object_id)); + ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); + + // Ref went out of scope. + object_id = ObjectID::FromRandom(); + ASSERT_FALSE(manager_.RecoverObject(object_id)); + ASSERT_TRUE(failed_reconstructions_.count(object_id) == 0); + ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); } TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { @@ -164,7 +176,7 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { std::vector addresses({rpc::Address()}); object_directory_->SetLocations(object_id, addresses); - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(object_directory_->Flush() == 1); ASSERT_TRUE(raylet_client_->Flush() == 1); ASSERT_TRUE(failed_reconstructions_.empty()); @@ -176,7 +188,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); task_resubmitter_->AddTask(object_id.TaskId(), {}); - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(object_directory_->Flush() == 1); ASSERT_TRUE(failed_reconstructions_.empty()); @@ -188,24 +200,30 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) { ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); ref_counter_->AddLocalReference(object_id, ""); - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); // A second attempt to recover the object will not trigger any more // callbacks. - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); + // A new copy of the object is pinned. + NodeID remote_node_id = NodeID::FromRandom(); + rpc::Address address; + address.set_raylet_id(remote_node_id.Binary()); + object_directory_->SetLocations(object_id, {address}); ASSERT_TRUE(object_directory_->Flush() == 1); - failed_reconstructions_.clear(); + ASSERT_TRUE(raylet_client_->Flush() == 1); - // The object has been marked as failed. Another attempt to recover the - // object will not trigger any callbacks. - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + // The object has been marked as failed but it is still pinned on the new + // node. Another attempt to recover the object will not trigger any + // callbacks. + ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_EQ(object_directory_->Flush(), 0); // The object is removed and can be recovered again. - auto objects = ref_counter_->ResetObjectsOnRemovedNode(local_raylet_id_); + auto objects = ref_counter_->ResetObjectsOnRemovedNode(remote_node_id); ASSERT_EQ(objects.size(), 1); ASSERT_EQ(objects[0], object_id); memory_store_->Delete(objects); - ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_id)); ASSERT_TRUE(object_directory_->Flush() == 1); } @@ -220,7 +238,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { object_ids.push_back(object_id); } - ASSERT_TRUE(manager_.RecoverObject(object_ids.back()).ok()); + ASSERT_TRUE(manager_.RecoverObject(object_ids.back())); for (int i = 0; i < 3; i++) { RAY_LOG(INFO) << i; ASSERT_EQ(object_directory_->Flush(), 1);