diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3be251003..8ddb8612f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1497,13 +1497,6 @@ cdef class CoreWorker: check_status(CCoreWorkerProcess.GetCoreWorker() .SpillObjects(object_ids)) - def force_restore_spilled_objects(self, object_refs): - cdef c_vector[CObjectID] object_ids - object_ids = ObjectRefsToVector(object_refs) - with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker() - .ForceRestoreSpilledObjects(object_ids)) - cdef void async_set_result(shared_ptr[CRayObject] obj, CObjectID object_ref, void *future) with gil: diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index dab64ed61..c59ef2702 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -1,7 +1,6 @@ from .dynamic_resources import set_resource -from .object_spilling import force_spill_objects, force_restore_spilled_objects +from .object_spilling import force_spill_objects __all__ = [ "set_resource", "force_spill_objects", - "force_restore_spilled_objects", ] diff --git a/python/ray/experimental/object_spilling.py b/python/ray/experimental/object_spilling.py index 6a61ae22b..def5d2353 100644 --- a/python/ray/experimental/object_spilling.py +++ b/python/ray/experimental/object_spilling.py @@ -16,20 +16,3 @@ def force_spill_objects(object_refs): f"Attempting to call `force_spill_objects` on the " f"value {object_ref}, which is not an ray.ObjectRef.") return core_worker.force_spill_objects(object_refs) - - -def force_restore_spilled_objects(object_refs): - """Force restoring objects from external storage. - - Args: - object_refs: Object refs of the objects to be - restored. - """ - core_worker = ray.worker.global_worker.core_worker - # Make sure that the values are object refs. - for object_ref in object_refs: - if not isinstance(object_ref, ray.ObjectRef): - raise TypeError( - f"Attempting to call `force_restore_spilled_objects` on the " - f"value {object_ref}, which is not an ray.ObjectRef.") - return core_worker.force_restore_spilled_objects(object_refs) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 87899d142..cd5915733 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -200,8 +200,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const double capacity, const CNodeID &client_Id) CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids) - CRayStatus ForceRestoreSpilledObjects( - const c_vector[CObjectID] &object_ids) cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions": CWorkerType worker_type diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index cbca59185..d1cf613f8 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -1,5 +1,7 @@ import json import random +import platform +import sys import time import numpy as np @@ -8,6 +10,8 @@ import psutil import ray +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") def test_spill_objects_manually(shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( @@ -25,7 +29,6 @@ def test_spill_objects_manually(shutdown_only): arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] pinned_objects = set() - spilled_objects = set() # Create objects of more than 200 MiB. for _ in range(25): @@ -38,7 +41,6 @@ def test_spill_objects_manually(shutdown_only): except ray.exceptions.ObjectStoreFullError: ref_to_spill = pinned_objects.pop() ray.experimental.force_spill_objects([ref_to_spill]) - spilled_objects.add(ref_to_spill) def is_worker(cmdline): return cmdline and cmdline[0].startswith("ray::") @@ -54,17 +56,16 @@ def test_spill_objects_manually(shutdown_only): # restoring objects back. refs_to_spill = (pinned_objects.pop(), pinned_objects.pop()) ray.experimental.force_spill_objects(refs_to_spill) - spilled_objects.update(refs_to_spill) # randomly sample objects for _ in range(100): ref = random.choice(replay_buffer) - if ref in spilled_objects: - ray.experimental.force_restore_spilled_objects([ref]) sample = ray.get(ref) assert np.array_equal(sample, arr) +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") def test_spill_objects_manually_from_workers(shutdown_only): # Limit our object store to 100 MiB of memory. ray.init( @@ -82,15 +83,22 @@ def test_spill_objects_manually_from_workers(shutdown_only): @ray.remote def _worker(): - arr = np.random.rand(100 * 1024) + arr = np.random.rand(1024 * 1024) # 8 MB data ref = ray.put(arr) ray.experimental.force_spill_objects([ref]) - ray.experimental.force_restore_spilled_objects([ref]) - assert np.array_equal(ray.get(ref), arr) + return ref - ray.get([_worker.remote() for _ in range(50)]) + # Create objects of more than 200 MiB. + replay_buffer = [ray.get(_worker.remote()) for _ in range(25)] + values = {ref: np.copy(ray.get(ref)) for ref in replay_buffer} + # Randomly sample objects. + for _ in range(100): + ref = random.choice(replay_buffer) + sample = ray.get(ref) + assert np.array_equal(sample, values[ref]) +@pytest.mark.skip(reason="Not implemented yet.") def test_spill_objects_manually_with_workers(shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( @@ -118,27 +126,29 @@ def test_spill_objects_manually_with_workers(shutdown_only): assert np.array_equal(restored, arr) +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") @pytest.mark.parametrize( "ray_start_cluster_head", [{ "num_cpus": 0, "object_store_memory": 75 * 1024 * 1024, - "_object_spilling_config": { + "object_spilling_config": { "type": "filesystem", "params": { "directory_path": "/tmp" } }, - "_system_config": json.dumps({ + "_system_config": { "object_store_full_max_retries": 0, "max_io_workers": 4, - }), + }, }], indirect=True) def test_spill_remote_object(ray_start_cluster_head): cluster = ray_start_cluster_head cluster.add_node( object_store_memory=75 * 1024 * 1024, - _object_spilling_config={ + object_spilling_config={ "type": "filesystem", "params": { "directory_path": "/tmp" @@ -149,23 +159,33 @@ def test_spill_remote_object(ray_start_cluster_head): def put(): return np.random.rand(5 * 1024 * 1024) # 40 MB data - # Create 2 objects. Only 1 should fit. + @ray.remote + def depends(arg): + return + ref = put.remote() - ray.get(ref) + copy = np.copy(ray.get(ref)) + # Evict local copy. + ray.put(np.random.rand(5 * 1024 * 1024)) # 40 MB data + # Remote copy should not fit. with pytest.raises(ray.exceptions.RayTaskError): ray.get(put.remote()) - time.sleep(1) # Spill 1 object. The second should now fit. ray.experimental.force_spill_objects([ref]) ray.get(put.remote()) - # TODO(swang): Restoring from the object directory is not yet supported. - # ray.experimental.force_restore_spilled_objects([ref]) - # sample = ray.get(ref) - # assert np.array_equal(sample, copy) + sample = ray.get(ref) + assert np.array_equal(sample, copy) + # Evict the spilled object. + del sample + ray.get(put.remote()) + ray.put(np.random.rand(5 * 1024 * 1024)) # 40 MB data + + # Test passing the spilled object as an arg to another task. + ray.get(depends.remote(ref)) -@pytest.mark.skip(reason="have not been fully implemented") +@pytest.mark.skip(reason="Not implemented yet.") def test_spill_objects_automatically(shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( @@ -196,3 +216,7 @@ def test_spill_objects_automatically(shutdown_only): ref = random.choice(replay_buffer) sample = ray.get(ref, timeout=0) assert np.array_equal(sample, arr) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 549f60903..0b838e5a2 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -496,13 +496,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ auto object_lookup_fn = [this](const ObjectID &object_id, const ObjectLookupCallback &callback) { return gcs_client_->Objects().AsyncGetLocations( - object_id, - [this, object_id, callback](const Status &status, - const std::vector &results) { + object_id, [this, object_id, callback]( + const Status &status, + const boost::optional &result) { RAY_CHECK_OK(status); std::vector locations; - for (const auto &result : results) { - const auto &node_id = NodeID::FromBinary(result.manager()); + for (const auto &loc : result->locations()) { + const auto &node_id = NodeID::FromBinary(loc.manager()); auto node = gcs_client_->Nodes().Get(node_id); RAY_CHECK(node.has_value()); if (node->state() == rpc::GcsNodeInfo::ALIVE) { @@ -1170,7 +1170,13 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id, // Find the raylet that hosts the primary copy of the object. NodeID pinned_at; - RAY_CHECK(reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at)); + bool spilled; + RAY_CHECK( + reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled)); + if (spilled) { + // The object has already been spilled. + return; + } auto node = gcs_client_->Nodes().Get(pinned_at); if (pinned_at.IsNil() || !node) { RAY_LOG(ERROR) << "Primary raylet for object " << object_id << " unreachable"; @@ -1179,6 +1185,7 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id, } // Ask the raylet to spill the object. + RAY_LOG(DEBUG) << "Sending spill request to raylet for object " << object_id; auto raylet_client = std::make_shared(rpc::NodeManagerWorkerClient::make( node->node_manager_address(), node->node_manager_port(), @@ -1237,11 +1244,11 @@ Status CoreWorker::SpillObjects(const std::vector &object_ids) { } ready_promise->get_future().wait(); - return final_status; -} -Status CoreWorker::ForceRestoreSpilledObjects(const std::vector &object_ids) { - return local_raylet_client_->ForceRestoreSpilledObjects(object_ids); + for (const auto &object_id : object_ids) { + reference_counter_->HandleObjectSpilled(object_id); + } + return final_status; } std::unordered_map AddPlacementGroupConstraint( diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 688300604..64fd72d35 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -632,11 +632,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// to spill the object. Status SpillObjects(const std::vector &object_ids); - /// Restore objects from external storage. - /// \param[in] object_ids The objects to be restored. - /// \return Status - Status ForceRestoreSpilledObjects(const std::vector &object_ids); - /// Submit a normal task. /// /// \param[in] function The remote function to execute. diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index c5cdd5054..48346a8aa 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -21,7 +21,9 @@ namespace ray { Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { // Check the ReferenceCounter to see if there is a location for the object. NodeID pinned_at; - bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at); + bool spilled; + bool owned_by_us = + reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled); if (!owned_by_us) { return Status::Invalid( "Object reference no longer exists or is not owned by us. Either lineage pinning " @@ -29,7 +31,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } bool already_pending_recovery = true; - if (pinned_at.IsNil()) { + if (pinned_at.IsNil() && !spilled) { { absl::MutexLock lock(&mu_); // Mark that we are attempting recovery for this object to prevent diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 81f3040e6..a21350ef9 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -492,6 +492,9 @@ bool ReferenceCounter::SetDeleteCallback( // The object has been freed by the language frontend, so it // should be deleted immediately. return false; + } else if (it->second.spilled) { + // The object has been spilled, so it can be released immediately. + return false; } // NOTE: In two cases, `GcsActorManager` will send `WaitForActorOutOfScope` request more @@ -539,12 +542,14 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, } } -bool ReferenceCounter::IsPlasmaObjectPinned(const ObjectID &object_id, - NodeID *pinned_at) const { +bool ReferenceCounter::IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, + NodeID *pinned_at, + bool *spilled) const { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it != object_id_refs_.end()) { if (it->second.owned_by_us) { + *spilled = it->second.spilled; *pinned_at = it->second.pinned_at_raylet_id.value_or(NodeID::Nil()); return true; } @@ -920,6 +925,19 @@ std::unordered_set ReferenceCounter::GetObjectLocations( return locations; } +void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_refs_.find(object_id); + if (it == object_id_refs_.end()) { + RAY_LOG(WARNING) << "Spilled object " << object_id << " already out of scope"; + return; + } + + it->second.spilled = true; + // Release the primary plasma copy, if any. + ReleasePlasmaObject(it); +} + ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index b77a9a7cb..bd950dae6 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -322,16 +322,19 @@ class ReferenceCounter : public ReferenceCounterInterface { void UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const NodeID &raylet_id) LOCKS_EXCLUDED(mutex_); - /// Check whether the object is pinned at a remote plasma store node. + /// Check whether the object is pinned at a remote plasma store node or + /// spilled to external storage. In either case, a copy of the object is + /// available to fetch. /// /// \param[in] object_id The object to check. /// \param[out] pinned_at The node ID of the raylet at which this object is + /// \param[out] spilled Whether this object has been spilled. /// pinned. Set to nil if the object is not pinned. /// \return True if the object exists and is owned by us, false otherwise. We /// return false here because a borrower should not know the pinned location /// for an object. - bool IsPlasmaObjectPinned(const ObjectID &object_id, NodeID *pinned_at) const - LOCKS_EXCLUDED(mutex_); + bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, NodeID *pinned_at, + bool *spilled) const LOCKS_EXCLUDED(mutex_); /// Get and reset the objects that were pinned on the given node. This /// method should be called upon a node failure, to determine which plasma @@ -376,6 +379,12 @@ class ReferenceCounter : public ReferenceCounterInterface { std::unordered_set GetObjectLocations(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); + /// Handle an object has been spilled to external storage. + /// + /// This notifies the primary raylet that the object is safe to release and + /// records that the object has been spilled to suppress reconstruction. + void HandleObjectSpilled(const ObjectID &object_id); + private: struct Reference { /// Constructor for a reference whose origin is unknown. @@ -524,6 +533,8 @@ class ReferenceCounter : public ReferenceCounterInterface { /// is inlined (not stored in plasma), then its lineage ref count is 0 /// because any dependent task will already have the value of the object. size_t lineage_ref_count = 0; + /// Whether this object has been spilled to external storage. + bool spilled = false; /// Callback that will be called when this ObjectID no longer has /// references. diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index 6fd68699d..46291943e 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -1987,21 +1987,22 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID borrowed_id = ObjectID::FromRandom(); rc->AddLocalReference(borrowed_id, ""); NodeID pinned_at; - ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned_at)); + bool spilled; + ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &pinned_at, &spilled)); ObjectID id = ObjectID::FromRandom(); NodeID node_id = NodeID::FromRandom(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_TRUE(pinned_at.IsNil()); rc->UpdateObjectPinnedAtRaylet(id, node_id); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_FALSE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); - ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); @@ -2012,7 +2013,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { auto objects = rc->ResetObjectsOnRemovedNode(node_id); ASSERT_EQ(objects.size(), 1); ASSERT_EQ(objects[0], id); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); @@ -2035,7 +2036,8 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); NodeID pinned_at; - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + bool spilled; + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); rc->RemoveLocalReference(id, nullptr); @@ -2050,7 +2052,7 @@ TEST_F(ReferenceCountTest, TestFree) { rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_TRUE(deleted->count(id) > 0); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled)); ASSERT_TRUE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 27e360b48..a360ff078 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -352,7 +352,7 @@ class ObjectInfoAccessor { /// \return Status virtual Status AsyncGetLocations( const ObjectID &object_id, - const MultiItemCallback &callback) = 0; + const OptionalItemCallback &callback) = 0; /// Get all object's locations from GCS asynchronously. /// @@ -370,6 +370,16 @@ class ObjectInfoAccessor { virtual Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) = 0; + /// Add spilled location of object to GCS asynchronously. + /// + /// \param object_id The ID of object which location will be added to GCS. + /// \param spilled_url The URL where the object has been spilled. + /// \param callback Callback that will be called after object has been added to GCS. + /// \return Status + virtual Status AsyncAddSpilledUrl(const ObjectID &object_id, + const std::string &spilled_url, + const StatusCallback &callback) = 0; + /// Remove location of object from GCS asynchronously. /// /// \param object_id The ID of object which location will be removed from GCS. @@ -388,7 +398,8 @@ class ObjectInfoAccessor { /// \return Status virtual Status AsyncSubscribeToLocations( const ObjectID &object_id, - const SubscribeCallback &subscribe, + const SubscribeCallback> + &subscribe, const StatusCallback &done) = 0; /// Cancel subscription to any update of an object's location. diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 3d427d7b6..b556f7479 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -108,17 +108,12 @@ std::unique_ptr GlobalStateAccessor::GetObjectInfo( const ObjectID &object_id) { std::unique_ptr object_info; std::promise promise; - auto on_done = [object_id, &object_info, &promise]( + auto on_done = [&object_info, &promise]( const Status &status, - const std::vector &result) { + const boost::optional &result) { RAY_CHECK_OK(status); - if (!result.empty()) { - rpc::ObjectLocationInfo object_location_info; - object_location_info.set_object_id(object_id.Binary()); - for (auto &data : result) { - object_location_info.add_locations()->CopyFrom(data); - } - object_info.reset(new std::string(object_location_info.SerializeAsString())); + if (result) { + object_info.reset(new std::string(result->SerializeAsString())); } promise.set_value(true); }; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 2524c4c31..6677bf238 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1086,19 +1086,15 @@ ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor( : client_impl_(client_impl) {} Status ServiceBasedObjectInfoAccessor::AsyncGetLocations( - const ObjectID &object_id, const MultiItemCallback &callback) { + const ObjectID &object_id, + const OptionalItemCallback &callback) { RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id; rpc::GetObjectLocationsRequest request; request.set_object_id(object_id.Binary()); client_impl_->GetGcsRpcClient().GetObjectLocations( request, [object_id, callback](const Status &status, const rpc::GetObjectLocationsReply &reply) { - std::vector result; - result.reserve((reply.object_table_data_list_size())); - for (int index = 0; index < reply.object_table_data_list_size(); ++index) { - result.emplace_back(reply.object_table_data_list(index)); - } - callback(status, result); + callback(status, reply.location_info()); RAY_LOG(DEBUG) << "Finished getting object locations, status = " << status << ", object id = " << object_id; }); @@ -1151,6 +1147,31 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i return Status::OK(); } +Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( + const ObjectID &object_id, const std::string &spilled_url, + const StatusCallback &callback) { + RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id + << ", spilled_url = " << spilled_url; + rpc::AddObjectLocationRequest request; + request.set_object_id(object_id.Binary()); + request.set_spilled_url(spilled_url); + + auto operation = [this, request, callback](const SequencerDoneCallback &done_callback) { + client_impl_->GetGcsRpcClient().AddObjectLocation( + request, [callback, done_callback](const Status &status, + const rpc::AddObjectLocationReply &reply) { + if (callback) { + callback(status); + } + + done_callback(); + }); + }; + + sequencer_.Post(object_id, operation); + return Status::OK(); +} + Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id @@ -1179,7 +1200,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( const ObjectID &object_id, - const SubscribeCallback &subscribe, + const SubscribeCallback> &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr) << "Failed to subscribe object location, object id = " << object_id; @@ -1188,10 +1209,20 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( subscribe](const StatusCallback &fetch_done) { auto callback = [object_id, subscribe, fetch_done]( const Status &status, - const std::vector &result) { + const boost::optional &result) { if (status.ok()) { - gcs::ObjectChangeNotification notification(rpc::GcsChangeMode::APPEND_OR_ADD, - result); + std::vector notification; + for (const auto &loc : result->locations()) { + rpc::ObjectLocationChange update; + update.set_is_add(true); + update.set_node_id(loc.manager()); + notification.push_back(update); + } + if (!result->spilled_url().empty()) { + rpc::ObjectLocationChange update; + update.set_spilled_url(result->spilled_url()); + notification.push_back(update); + } subscribe(object_id, notification); } if (fetch_done) { @@ -1207,13 +1238,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( const std::string &data) { rpc::ObjectLocationChange object_location_change; object_location_change.ParseFromString(data); - std::vector object_data_vector; - object_data_vector.emplace_back(object_location_change.data()); - auto change_mode = object_location_change.is_add() - ? rpc::GcsChangeMode::APPEND_OR_ADD - : rpc::GcsChangeMode::REMOVE; - gcs::ObjectChangeNotification notification(change_mode, object_data_vector); - subscribe(object_id, notification); + subscribe(object_id, {object_location_change}); }; return client_impl_->GetGcsPubSub().Subscribe(OBJECT_CHANNEL, object_id.Hex(), on_subscribe, subscribe_done); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index d5f67ffbd..0d53da1ae 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -326,19 +326,23 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor { Status AsyncGetLocations( const ObjectID &object_id, - const MultiItemCallback &callback) override; + const OptionalItemCallback &callback) override; Status AsyncGetAll(const MultiItemCallback &callback) override; Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) override; + Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, + const StatusCallback &callback) override; + Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) override; Status AsyncSubscribeToLocations( const ObjectID &object_id, - const SubscribeCallback &subscribe, + const SubscribeCallback> + &subscribe, const StatusCallback &done) override; Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 36d69604e..5fa772f13 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -440,7 +440,8 @@ class ServiceBasedGcsClientTest : public ::testing::Test { bool SubscribeToLocations( const ObjectID &object_id, - const gcs::SubscribeCallback &subscribe) { + const gcs::SubscribeCallback> + &subscribe) { std::promise promise; RAY_CHECK_OK(gcs_client_->Objects().AsyncSubscribeToLocations( object_id, subscribe, @@ -479,9 +480,12 @@ class ServiceBasedGcsClientTest : public ::testing::Test { std::promise promise; std::vector locations; RAY_CHECK_OK(gcs_client_->Objects().AsyncGetLocations( - object_id, [&locations, &promise]( - Status status, const std::vector &result) { - locations = result; + object_id, + [&locations, &promise](Status status, + const boost::optional &result) { + for (const auto &loc : result->locations()) { + locations.push_back(loc); + } promise.set_value(status.ok()); })); EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_)); @@ -851,11 +855,11 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectInfo) { std::atomic object_remove_count(0); auto on_subscribe = [&object_add_count, &object_remove_count]( const ObjectID &object_id, - const gcs::ObjectChangeNotification &result) { - if (!result.GetData().empty()) { - if (result.IsAdded()) { + const std::vector &result) { + for (const auto &res : result) { + if (res.is_add()) { ++object_add_count; - } else if (result.IsRemoved()) { + } else { ++object_remove_count; } } @@ -1011,16 +1015,18 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { std::atomic object1_change_count(0); std::atomic object2_change_count(0); ASSERT_TRUE(SubscribeToLocations( - object1_id, [&object1_change_count](const ObjectID &object_id, - const gcs::ObjectChangeNotification &result) { - if (!result.GetData().empty()) { + object1_id, + [&object1_change_count](const ObjectID &object_id, + const std::vector &result) { + if (!result.empty()) { ++object1_change_count; } })); ASSERT_TRUE(SubscribeToLocations( - object2_id, [&object2_change_count](const ObjectID &object_id, - const gcs::ObjectChangeNotification &result) { - if (!result.GetData().empty()) { + object2_id, + [&object2_change_count](const ObjectID &object_id, + const std::vector &result) { + if (!result.empty()) { ++object2_change_count; } })); @@ -1232,8 +1238,8 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) { for (int index = 0; index < sub_and_unsub_loop_count; ++index) { auto object_id = ObjectID::FromRandom(); ASSERT_TRUE(SubscribeToLocations( - object_id, - [](const ObjectID &id, const gcs::ObjectChangeNotification &result) {})); + object_id, [](const ObjectID &id, + const std::vector &result) {})); gcs_client_->Objects().AsyncResubscribe(false); UnsubscribeToLocations(object_id); } diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 406180e4a..2528f6006 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -23,15 +23,15 @@ namespace gcs { void GcsObjectManager::HandleGetObjectLocations( const rpc::GetObjectLocationsRequest &request, rpc::GetObjectLocationsReply *reply, rpc::SendReplyCallback send_reply_callback) { + reply->mutable_location_info()->set_object_id(request.object_id()); + ObjectID object_id = ObjectID::FromBinary(request.object_id()); RAY_LOG(DEBUG) << "Getting object locations, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id; - auto object_locations = GetObjectLocations(object_id); - for (auto &node_id : object_locations) { - rpc::ObjectTableData object_table_data; - object_table_data.set_manager(node_id.Binary()); - reply->add_object_table_data_list()->CopyFrom(object_table_data); - } + + absl::MutexLock lock(&mutex_); + auto object_data = GenObjectLocationInfo(object_id); + reply->mutable_location_info()->Swap(&object_data); RAY_LOG(DEBUG) << "Finished getting object locations, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); @@ -45,7 +45,7 @@ void GcsObjectManager::HandleGetAllObjectLocations( for (auto &item : object_to_locations_) { rpc::ObjectLocationInfo object_location_info; object_location_info.set_object_id(item.first.Binary()); - for (auto &node_id : item.second) { + for (auto &node_id : item.second.locations) { rpc::ObjectTableData object_table_data; object_table_data.set_manager(node_id.Binary()); object_location_info.add_locations()->CopyFrom(object_table_data); @@ -60,17 +60,33 @@ void GcsObjectManager::HandleAddObjectLocation( const rpc::AddObjectLocationRequest &request, rpc::AddObjectLocationReply *reply, rpc::SendReplyCallback send_reply_callback) { ObjectID object_id = ObjectID::FromBinary(request.object_id()); - NodeID node_id = NodeID::FromBinary(request.node_id()); - RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId() - << ", object id = " << object_id << ", node id = " << node_id; - AddObjectLocationInCache(object_id, node_id); - auto on_done = [this, object_id, node_id, reply, + NodeID node_id; + std::string spilled_url; + if (!request.node_id().empty()) { + node_id = NodeID::FromBinary(request.node_id()); + RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId() + << ", object id = " << object_id << ", node id = " << node_id; + AddObjectLocationInCache(object_id, node_id); + } else { + absl::MutexLock lock(&mutex_); + object_to_locations_[object_id].spilled_url = request.spilled_url(); + RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id; + } + + auto on_done = [this, object_id, node_id, spilled_url, reply, send_reply_callback](const Status &status) { if (status.ok()) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - OBJECT_CHANNEL, object_id.Hex(), - gcs::CreateObjectLocationChange(node_id, true)->SerializeAsString(), nullptr)); + rpc::ObjectLocationChange notification; + notification.set_is_add(true); + if (!node_id.IsNil()) { + notification.set_node_id(node_id.Binary()); + } + if (!spilled_url.empty()) { + notification.set_spilled_url(spilled_url); + } + RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(), + notification.SerializeAsString(), nullptr)); RAY_LOG(DEBUG) << "Finished adding object location, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id << ", node id = " << node_id << ", task id = " << object_id.TaskId(); @@ -86,11 +102,8 @@ void GcsObjectManager::HandleAddObjectLocation( }; absl::MutexLock lock(&mutex_); - auto object_location_set = - GetObjectLocationSet(object_id, /* create_if_not_exist */ false); - auto object_table_data_list = GenObjectTableDataList(*object_location_set); - Status status = - gcs_table_storage_->ObjectTable().Put(object_id, *object_table_data_list, on_done); + const auto object_data = GenObjectLocationInfo(object_id); + Status status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done); if (!status.ok()) { on_done(status); } @@ -130,9 +143,8 @@ void GcsObjectManager::HandleRemoveObjectLocation( GetObjectLocationSet(object_id, /* create_if_not_exist */ false); Status status; if (object_location_set != nullptr) { - auto object_table_data_list = GenObjectTableDataList(*object_location_set); - status = gcs_table_storage_->ObjectTable().Put(object_id, *object_table_data_list, - on_done); + const auto object_data = GenObjectLocationInfo(object_id); + status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done); } else { status = gcs_table_storage_->ObjectTable().Delete(object_id, on_done); } @@ -154,7 +166,7 @@ void GcsObjectManager::AddObjectsLocation( for (const auto &object_id : object_ids) { auto *object_locations = GetObjectLocationSet(object_id, /* create_if_not_exist */ true); - object_locations->emplace(node_id); + object_locations->locations.emplace(node_id); } } @@ -167,7 +179,7 @@ void GcsObjectManager::AddObjectLocationInCache(const ObjectID &object_id, auto *object_locations = GetObjectLocationSet(object_id, /* create_if_not_exist */ true); - object_locations->emplace(node_id); + object_locations->locations.emplace(node_id); } absl::flat_hash_set GcsObjectManager::GetObjectLocations( @@ -176,7 +188,7 @@ absl::flat_hash_set GcsObjectManager::GetObjectLocations( auto *object_locations = GetObjectLocationSet(object_id); if (object_locations) { - return *object_locations; + return object_locations->locations; } return absl::flat_hash_set{}; } @@ -198,8 +210,8 @@ void GcsObjectManager::OnNodeRemoved(const NodeID &node_id) { for (const auto &object_id : objects_on_node) { auto *object_locations = GetObjectLocationSet(object_id); if (object_locations) { - object_locations->erase(node_id); - if (object_locations->empty()) { + object_locations->locations.erase(node_id); + if (object_locations->locations.empty() && object_locations->spilled_url.empty()) { object_to_locations_.erase(object_id); } } @@ -212,8 +224,8 @@ void GcsObjectManager::RemoveObjectLocationInCache(const ObjectID &object_id, auto *object_locations = GetObjectLocationSet(object_id); if (object_locations) { - object_locations->erase(node_id); - if (object_locations->empty()) { + object_locations->locations.erase(node_id); + if (object_locations->locations.empty() && object_locations->spilled_url.empty()) { object_to_locations_.erase(object_id); } } @@ -258,25 +270,28 @@ GcsObjectManager::ObjectSet *GcsObjectManager::GetObjectSetByNode( return objects_on_node; } -std::shared_ptr GcsObjectManager::GenObjectTableDataList( - const GcsObjectManager::LocationSet &location_set) const { - auto object_table_data_list = std::make_shared(); - for (auto &node_id : location_set) { - object_table_data_list->add_items()->set_manager(node_id.Binary()); +const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo( + const ObjectID &object_id) const { + ObjectLocationInfo object_data; + object_data.set_object_id(object_id.Binary()); + auto it = object_to_locations_.find(object_id); + if (it != object_to_locations_.end()) { + for (const auto &node_id : it->second.locations) { + object_data.add_locations()->set_manager(node_id.Binary()); + } + object_data.set_spilled_url(it->second.spilled_url); } - return object_table_data_list; + return object_data; } void GcsObjectManager::LoadInitialData(const EmptyCallback &done) { RAY_LOG(INFO) << "Loading initial data."; - auto callback = [this, done]( - const std::unordered_map &result) { + auto callback = [this, + done](const std::unordered_map &result) { absl::flat_hash_map node_to_objects; for (auto &item : result) { - auto object_list = item.second; - for (int index = 0; index < object_list.items_size(); ++index) { - node_to_objects[NodeID::FromBinary(object_list.items(index).manager())].insert( - item.first); + for (const auto &loc : item.second.locations()) { + node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first); } } diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index 6555da1ba..dc5249c1c 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -60,7 +60,10 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { void LoadInitialData(const EmptyCallback &done); protected: - typedef absl::flat_hash_set LocationSet; + struct LocationSet { + absl::flat_hash_set locations; + std::string spilled_url = ""; + }; /// Add a location of objects. /// If the GCS server restarts, this function is used to reload data from storage. @@ -82,7 +85,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { /// /// \param object_id The id of object to lookup. /// \return Object locations. - LocationSet GetObjectLocations(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_); + absl::flat_hash_set GetObjectLocations(const ObjectID &object_id) + LOCKS_EXCLUDED(mutex_); /// Handler if a node is removed. /// @@ -99,8 +103,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { private: typedef absl::flat_hash_set ObjectSet; - std::shared_ptr GenObjectTableDataList( - const GcsObjectManager::LocationSet &location_set) const; + const ObjectLocationInfo GenObjectLocationInfo(const ObjectID &object_id) const + EXCLUSIVE_LOCKS_REQUIRED(mutex_); /// Get object locations by object id from map. /// Will create it if not exist and the flag create_if_not_exist is set to true. diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 234a63855..93cb7cc21 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -140,14 +140,14 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; -template class GcsTable; +template class GcsTable; template class GcsTable; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; -template class GcsTableWithJobId; +template class GcsTableWithJobId; template class GcsTable; template class GcsTable; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index eff54cc11..db6315d81 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -31,8 +31,8 @@ using rpc::GcsNodeInfo; using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; +using rpc::ObjectLocationInfo; using rpc::ObjectTableData; -using rpc::ObjectTableDataList; using rpc::PlacementGroupTableData; using rpc::ProfileTableData; using rpc::ResourceMap; @@ -234,7 +234,7 @@ class GcsTaskReconstructionTable JobID GetJobIdFromKey(const TaskID &key) override { return key.ActorId().JobId(); } }; -class GcsObjectTable : public GcsTableWithJobId { +class GcsObjectTable : public GcsTableWithJobId { public: explicit GcsObjectTable(std::shared_ptr &store_client) : GcsTableWithJobId(store_client) { diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 4ca1472ea..d6d897e3f 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -295,8 +295,8 @@ class GcsServerTest : public ::testing::Test { request, [&object_locations, &promise]( const Status &status, const rpc::GetObjectLocationsReply &reply) { RAY_CHECK_OK(status); - for (int index = 0; index < reply.object_table_data_list_size(); ++index) { - object_locations.push_back(reply.object_table_data_list(index)); + for (const auto &loc : reply.location_info().locations()) { + object_locations.push_back(loc); } promise.set_value(true); }); diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 3b2923724..6855e4492 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -105,11 +105,9 @@ inline std::shared_ptr CreateWorkerFailureData( /// \return The object location change created by this method. inline std::shared_ptr CreateObjectLocationChange( const NodeID &node_id, bool is_add) { - ray::rpc::ObjectTableData object_table_data; - object_table_data.set_manager(node_id.Binary()); auto object_location_change = std::make_shared(); object_location_change->set_is_add(is_add); - object_location_change->mutable_data()->CopyFrom(object_table_data); + object_location_change->set_node_id(node_id.Binary()); return object_location_change; } diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index c2d945bb1..1df21cae8 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -414,11 +414,18 @@ RedisObjectInfoAccessor::RedisObjectInfoAccessor(RedisGcsClient *client_impl) : client_impl_(client_impl), object_sub_executor_(client_impl->object_table()) {} Status RedisObjectInfoAccessor::AsyncGetLocations( - const ObjectID &object_id, const MultiItemCallback &callback) { + const ObjectID &object_id, + const OptionalItemCallback &callback) { RAY_CHECK(callback != nullptr); auto on_done = [callback](RedisGcsClient *client, const ObjectID &object_id, const std::vector &data) { - callback(Status::OK(), data); + rpc::ObjectLocationInfo info; + info.set_object_id(object_id.Binary()); + for (const auto &item : data) { + auto item_ptr = info.add_locations(); + item_ptr->CopyFrom(item); + } + callback(Status::OK(), info); }; ObjectTable &object_table = client_impl_->object_table(); @@ -463,10 +470,22 @@ Status RedisObjectInfoAccessor::AsyncRemoveLocation(const ObjectID &object_id, Status RedisObjectInfoAccessor::AsyncSubscribeToLocations( const ObjectID &object_id, - const SubscribeCallback &subscribe, + const SubscribeCallback> &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); - return object_sub_executor_.AsyncSubscribe(subscribe_id_, object_id, subscribe, done); + return object_sub_executor_.AsyncSubscribe( + subscribe_id_, object_id, + [subscribe](const ObjectID &id, const ObjectChangeNotification ¬ification_data) { + std::vector updates; + for (const auto &item : notification_data.GetData()) { + rpc::ObjectLocationChange update; + update.set_is_add(notification_data.IsAdded()); + update.set_node_id(item.manager()); + updates.push_back(update); + } + subscribe(id, updates); + }, + done); } Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &object_id) { diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index f3a3eb4cb..491cf89ce 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -264,8 +264,9 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor { virtual ~RedisObjectInfoAccessor() {} - Status AsyncGetLocations(const ObjectID &object_id, - const MultiItemCallback &callback) override; + Status AsyncGetLocations( + const ObjectID &object_id, + const OptionalItemCallback &callback) override; Status AsyncGetAll( const MultiItemCallback &callback) override { @@ -275,12 +276,18 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor { Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) override; + Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url, + const StatusCallback &callback) override { + return Status::NotImplemented("AsyncAddSpilledUrl not implemented"); + } + Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) override; Status AsyncSubscribeToLocations( const ObjectID &object_id, - const SubscribeCallback &subscribe, + const SubscribeCallback> + &subscribe, const StatusCallback &done) override; Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override; diff --git a/src/ray/gcs/test/redis_object_info_accessor_test.cc b/src/ray/gcs/test/redis_object_info_accessor_test.cc index e69c9bc79..bbe310b97 100644 --- a/src/ray/gcs/test/redis_object_info_accessor_test.cc +++ b/src/ray/gcs/test/redis_object_info_accessor_test.cc @@ -70,9 +70,10 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) { size_t total_size = elem.second.size(); RAY_CHECK_OK(object_accessor.AsyncGetLocations( elem.first, - [this, total_size](Status status, const std::vector &result) { + [this, total_size](Status status, + const boost::optional &result) { RAY_CHECK_OK(status); - RAY_CHECK(total_size == result.size()); + ASSERT_EQ(total_size, result->locations().size()); --pending_count_; })); } @@ -83,17 +84,18 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) { // subscribe && delete // subscribe std::atomic sub_pending_count(0); - auto subscribe = [this, &sub_pending_count](const ObjectID &object_id, - const ObjectChangeNotification &result) { + auto subscribe = [this, &sub_pending_count]( + const ObjectID &object_id, + const std::vector &result) { const auto it = object_id_to_data_.find(object_id); ASSERT_TRUE(it != object_id_to_data_.end()); static size_t response_count = 1; size_t cur_count = response_count <= object_count_ ? copy_count_ : 1; - ASSERT_EQ(result.GetData().size(), cur_count); - rpc::GcsChangeMode change_mode = response_count <= object_count_ - ? rpc::GcsChangeMode::APPEND_OR_ADD - : rpc::GcsChangeMode::REMOVE; - ASSERT_EQ(change_mode, result.GetGcsChangeMode()); + ASSERT_EQ(result.size(), cur_count); + bool change_mode = response_count <= object_count_; + for (const auto &res : result) { + ASSERT_EQ(change_mode, res.is_add()); + } ++response_count; --sub_pending_count; }; @@ -128,9 +130,10 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) { size_t total_size = elem.second.size(); RAY_CHECK_OK(object_accessor.AsyncGetLocations( elem.first, - [this, total_size](Status status, const std::vector &result) { + [this, total_size](Status status, + const boost::optional &result) { RAY_CHECK_OK(status); - ASSERT_EQ(total_size - 1, result.size()); + ASSERT_EQ(total_size - 1, result->locations().size()); --pending_count_; })); } diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 403e05893..6ef217a1f 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -29,22 +29,31 @@ using ray::rpc::ObjectTableData; /// Process a notification of the object table entries and store the result in /// node_ids. This assumes that node_ids already contains the result of the /// object table entries up to but not including this notification. -bool UpdateObjectLocations(bool is_added, - const std::vector &location_updates, +bool UpdateObjectLocations(const std::vector &location_updates, std::shared_ptr gcs_client, - std::unordered_set *node_ids) { + std::unordered_set *node_ids, + std::string *spilled_url) { // location_updates contains the updates of locations of the object. // with GcsChangeMode, we can determine whether the update mode is // addition or deletion. bool isUpdated = false; - for (const auto &object_table_data : location_updates) { - NodeID node_id = NodeID::FromBinary(object_table_data.manager()); - if (is_added && 0 == node_ids->count(node_id)) { - node_ids->insert(node_id); - isUpdated = true; - } else if (!is_added && 1 == node_ids->count(node_id)) { - node_ids->erase(node_id); - isUpdated = true; + for (const auto &update : location_updates) { + if (!update.node_id().empty()) { + NodeID node_id = NodeID::FromBinary(update.node_id()); + if (update.is_add() && 0 == node_ids->count(node_id)) { + node_ids->insert(node_id); + isUpdated = true; + } else if (!update.is_add() && 1 == node_ids->count(node_id)) { + node_ids->erase(node_id); + isUpdated = true; + } + } else { + RAY_CHECK(!update.spilled_url().empty()); + RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url(); + if (update.spilled_url() != *spilled_url) { + *spilled_url = update.spilled_url(); + isUpdated = true; + } } } // Filter out the removed clients from the object locations. @@ -111,14 +120,15 @@ void ObjectDirectory::HandleClientRemoved(const NodeID &client_id) { if (listener.second.current_object_locations.count(client_id) > 0) { // If the subscribed object has the removed client as a location, update // its locations with an empty update so that the location will be removed. - UpdateObjectLocations(/*is_added*/ true, {}, gcs_client_, - &listener.second.current_object_locations); + UpdateObjectLocations({}, gcs_client_, &listener.second.current_object_locations, + &listener.second.spilled_url); // Re-call all the subscribed callbacks for the object, since its // locations have changed. for (const auto &callback_pair : listener.second.callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. - callback_pair.second(object_id, listener.second.current_object_locations); + callback_pair.second(object_id, listener.second.current_object_locations, + listener.second.spilled_url); } } } @@ -135,7 +145,7 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i auto object_notification_callback = [this](const ObjectID &object_id, - const gcs::ObjectChangeNotification &object_notification) { + const std::vector &object_notifications) { // Objects are added to this map in SubscribeObjectLocations. auto it = listeners_.find(object_id); // Do nothing for objects we are not listening for. @@ -147,9 +157,9 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i it->second.subscribed = true; // Update entries for this object. - if (!UpdateObjectLocations(object_notification.IsAdded(), - object_notification.GetData(), gcs_client_, - &it->second.current_object_locations)) { + if (!UpdateObjectLocations(object_notifications, gcs_client_, + &it->second.current_object_locations, + &it->second.spilled_url)) { return; } // Copy the callbacks so that the callbacks can unsubscribe without interrupting @@ -162,7 +172,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i for (const auto &callback_pair : callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. - callback_pair.second(object_id, it->second.current_object_locations); + callback_pair.second(object_id, it->second.current_object_locations, + it->second.spilled_url); } }; status = gcs_client_->Objects().AsyncSubscribeToLocations( @@ -179,8 +190,10 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // immediately notify the caller of the current known locations. if (listener_state.subscribed) { auto &locations = listener_state.current_object_locations; - io_service_.post( - [callback, locations, object_id]() { callback(object_id, locations); }); + auto &spilled_url = listener_state.spilled_url; + io_service_.post([callback, locations, spilled_url, object_id]() { + callback(object_id, locations, spilled_url); + }); } return status; } @@ -211,8 +224,10 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, // the object's creation, then call the callback immediately with the // cached locations. auto &locations = it->second.current_object_locations; - io_service_.post( - [callback, object_id, locations]() { callback(object_id, locations); }); + auto &spilled_url = it->second.spilled_url; + io_service_.post([callback, object_id, spilled_url, locations]() { + callback(object_id, locations, spilled_url); + }); } else { // We do not have any locations cached due to a concurrent // SubscribeObjectLocations call, so look up the object's locations @@ -220,16 +235,29 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, status = gcs_client_->Objects().AsyncGetLocations( object_id, [this, object_id, callback]( - Status status, const std::vector &location_updates) { + Status status, const boost::optional &update) { RAY_CHECK(status.ok()) << "Failed to get object location from GCS: " << status.message(); // Build the set of current locations based on the entries in the log. + std::vector notification; + for (const auto &loc : update->locations()) { + rpc::ObjectLocationChange change; + change.set_is_add(true); + change.set_node_id(loc.manager()); + notification.push_back(change); + } + if (!update->spilled_url().empty()) { + rpc::ObjectLocationChange change; + change.set_spilled_url(update->spilled_url()); + notification.push_back(change); + } + std::unordered_set node_ids; - UpdateObjectLocations(/*is_added*/ true, location_updates, gcs_client_, - &node_ids); + std::string spilled_url; + UpdateObjectLocations(notification, gcs_client_, &node_ids, &spilled_url); // It is safe to call the callback directly since this is already running // in the GCS client's lookup callback stack. - callback(object_id, node_ids); + callback(object_id, node_ids, spilled_url); }); } return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index b399b66b9..77590b987 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -59,8 +59,9 @@ class ObjectDirectoryInterface { virtual std::vector LookupAllRemoteConnections() const = 0; /// Callback for object location notifications. - using OnLocationsFound = std::function &)>; + using OnLocationsFound = + std::function &, const std::string &)>; /// Lookup object locations. Callback may be invoked with empty list of client ids. /// @@ -182,6 +183,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_map callbacks; /// The current set of known locations of this object. std::unordered_set current_object_locations; + /// The location where this object has been spilled, if any. + std::string spilled_url = ""; /// This flag will get set to true if received any notification of the object. /// It means current_object_locations is up-to-date with GCS. It /// should never go back to false once set to true. If this is true, and diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 186b824e8..c279806cc 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -50,7 +50,8 @@ ObjectStoreRunner::~ObjectStoreRunner() { ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_node_id, const ObjectManagerConfig &config, - std::shared_ptr object_directory) + std::shared_ptr object_directory, + RestoreSpilledObjectCallback restore_spilled_object) : self_node_id_(self_node_id), config_(config), object_directory_(std::move(object_directory)), @@ -61,7 +62,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_ object_manager_server_("ObjectManager", config_.object_manager_port, config_.rpc_service_threads_number), object_manager_service_(rpc_service_, *this), - client_call_manager_(main_service, config_.rpc_service_threads_number) { + client_call_manager_(main_service, config_.rpc_service_threads_number), + restore_spilled_object_(restore_spilled_object) { RAY_CHECK(config_.rpc_service_threads_number > 0); main_service_ = &main_service; @@ -184,7 +186,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id, // no ordering guarantee between notifications. return object_directory_->SubscribeObjectLocations( object_directory_pull_callback_id_, object_id, owner_address, - [this](const ObjectID &object_id, const std::unordered_set &client_ids) { + [this](const ObjectID &object_id, const std::unordered_set &client_ids, + const std::string &spilled_url) { // Exit if the Pull request has already been fulfilled or canceled. auto it = pull_requests_.find(object_id); if (it == pull_requests_.end()) { @@ -196,7 +199,16 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id, // before. it->second.client_locations = std::vector(client_ids.begin(), client_ids.end()); - if (it->second.client_locations.empty()) { + if (!spilled_url.empty()) { + // Try to restore the spilled object. + restore_spilled_object_(object_id, spilled_url, + [this, object_id](const ray::Status &status) { + // Fall back to fetching from another object manager. + if (!status.ok()) { + TryPull(object_id); + } + }); + } else if (it->second.client_locations.empty()) { // The object locations are now empty, so we should wait for the next // notification about a new object location. Cancel the timer until // the next Pull attempt since there are no more clients to try. @@ -605,12 +617,14 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) { RAY_RETURN_NOT_OK(object_directory_->LookupLocations( object_id, wait_state.owner_addresses[object_id], [this, wait_id](const ObjectID &lookup_object_id, - const std::unordered_set &client_ids) { + const std::unordered_set &client_ids, + const std::string &spilled_url) { auto &wait_state = active_wait_requests_.find(wait_id)->second; // Note that the object is guaranteed to be added to local_objects_ before // the notification is triggered. + bool remote_object_ready = !client_ids.empty() || !spilled_url.empty(); if (local_objects_.count(lookup_object_id) > 0 || - (!wait_state.wait_local && !client_ids.empty())) { + (!wait_state.wait_local && remote_object_ready)) { wait_state.remaining.erase(lookup_object_id); wait_state.found.insert(lookup_object_id); } @@ -646,7 +660,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( wait_id, object_id, wait_state.owner_addresses[object_id], [this, wait_id](const ObjectID &subscribe_object_id, - const std::unordered_set &client_ids) { + const std::unordered_set &client_ids, + const std::string &spilled_url) { auto object_id_wait_state = active_wait_requests_.find(wait_id); if (object_id_wait_state == active_wait_requests_.end()) { // Depending on the timing of calls to the object directory, we @@ -658,8 +673,9 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { auto &wait_state = object_id_wait_state->second; // Note that the object is guaranteed to be added to local_objects_ before // the notification is triggered. + bool remote_object_ready = !client_ids.empty() || !spilled_url.empty(); if (local_objects_.count(subscribe_object_id) > 0 || - (!wait_state.wait_local && !client_ids.empty())) { + (!wait_state.wait_local && remote_object_ready)) { RAY_LOG(DEBUG) << "Wait request " << wait_id << ": subscription notification received for object " << subscribe_object_id; diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index f5dacb11a..b972d2771 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -100,6 +100,9 @@ class ObjectManagerInterface { class ObjectManager : public ObjectManagerInterface, public rpc::ObjectManagerServiceHandler { public: + using RestoreSpilledObjectCallback = std::function)>; + /// Implementation of object manager service /// Handle push request from remote object manager @@ -186,7 +189,8 @@ class ObjectManager : public ObjectManagerInterface, /// \param object_directory An object implementing the object directory interface. explicit ObjectManager(boost::asio::io_service &main_service, const NodeID &self_node_id, const ObjectManagerConfig &config, - std::shared_ptr object_directory); + std::shared_ptr object_directory, + RestoreSpilledObjectCallback restore_spilled_object); ~ObjectManager(); @@ -466,6 +470,8 @@ class ObjectManager : public ObjectManagerInterface, std::unordered_map> remote_object_manager_clients_; + const RestoreSpilledObjectCallback restore_spilled_object_; + /// Running sum of the amount of memory used in the object store. int64_t used_memory_ = 0; }; diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc index c73bbe104..998513f06 100644 --- a/src/ray/object_manager/ownership_based_object_directory.cc +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -141,7 +141,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback( for (const auto &callback_pair : callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. - callback_pair.second(object_id, it->second.current_object_locations); + callback_pair.second(object_id, it->second.current_object_locations, ""); } } @@ -207,8 +207,9 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations( if (rpc_client == nullptr) { RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " << "LookupLocations returns an empty list of locations."; - io_service_.post( - [callback, object_id]() { callback(object_id, std::unordered_set()); }); + io_service_.post([callback, object_id]() { + callback(object_id, std::unordered_set(), ""); + }); return Status::OK(); } @@ -228,7 +229,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations( client_ids.emplace(NodeID::FromBinary(client_id)); } FilterRemovedClients(gcs_client_, &client_ids); - callback(object_id, client_ids); + callback(object_id, client_ids, ""); }); return Status::OK(); } diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 60ca1e967..14e02112b 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -54,7 +54,8 @@ class MockServer { config_(object_manager_config), gcs_client_(gcs_client), object_manager_(main_service, node_id_, object_manager_config, - std::make_shared(main_service, gcs_client_)) { + std::make_shared(main_service, gcs_client_), + nullptr) { RAY_CHECK_OK(RegisterGcs(main_service)); } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 80bdfd31e..6779f7936 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -50,7 +50,8 @@ class MockServer { config_(object_manager_config), gcs_client_(gcs_client), object_manager_(main_service, node_id_, object_manager_config, - std::make_shared(main_service, gcs_client_)) { + std::make_shared(main_service, gcs_client_), + nullptr) { RAY_CHECK_OK(RegisterGcs(main_service)); } @@ -262,9 +263,9 @@ class TestObjectManager : public TestObjectManagerBase { RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations( sub_id, object_1, rpc::Address(), - [this, sub_id, object_1, object_2]( - const ray::ObjectID &object_id, - const std::unordered_set &clients) { + [this, sub_id, object_1, object_2](const ray::ObjectID &object_id, + const std::unordered_set &clients, + const std::string &spilled_url) { if (!clients.empty()) { TestWaitWhileSubscribed(sub_id, object_1, object_2); } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 110376f13..a85f2be84 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -79,10 +79,8 @@ message GcsEntry { } message ObjectTableData { - // The size of the object. - uint64 object_size = 1; // The node manager ID that this object appeared on or was evicted by. - bytes manager = 2; + bytes manager = 1; } message TaskReconstructionData { @@ -410,19 +408,22 @@ message StoredConfig { map config = 1; } -message ObjectTableDataList { - repeated ObjectTableData items = 1; -} - message ObjectLocationInfo { bytes object_id = 1; repeated ObjectTableData locations = 2; + // For objects that have been spilled to external storage, the URL from which + // they can be retrieved. + string spilled_url = 3; } // A notification message about one object's locations being changed. message ObjectLocationChange { bool is_add = 1; - ObjectTableData data = 2; + // The node manager ID that this object appeared on or was evicted by. + bytes node_id = 2; + // The object has been spilled to this URL. This should be set xor the above + // fields are set. + string spilled_url = 3; } // A notification message about one node's resources being changed. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index ac048e4cf..e43d90440 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -303,8 +303,8 @@ message GetObjectLocationsRequest { message GetObjectLocationsReply { GcsStatus status = 1; - // Data of object. - repeated ObjectTableData object_table_data_list = 2; + // Object location information. + ObjectLocationInfo location_info = 2; } message GetAllObjectLocationsRequest { @@ -321,12 +321,19 @@ message AddObjectLocationRequest { bytes object_id = 1; // The location that will be added to GCS Service. bytes node_id = 2; + // The spilled URL that will be added to GCS Service. Either this or the node + // ID should be set. + string spilled_url = 3; } message AddObjectLocationReply { GcsStatus status = 1; } +message AddObjectSpilledUrlReply { + GcsStatus status = 1; +} + message RemoveObjectLocationRequest { // The ID of object which location will be removed from GCS Service. bytes object_id = 1; diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index d3faef5e9..21930c3dd 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -82,9 +82,6 @@ enum MessageType:int { SetResourceRequest, // Subscribe to Plasma updates SubscribePlasmaReady, - // Manually restore objects from external storage. - ForceRestoreSpilledObjectsRequest, - ForceRestoreSpilledObjectsReply, } table TaskExecutionSpecification { @@ -308,11 +305,3 @@ table ForceSpillObjectsRequest { table ForceSpillObjectsReply { } - -table ForceRestoreSpilledObjectsRequest { - // List of object IDs to be restored from external storage. - object_ids: [string]; -} - -table ForceRestoreSpilledObjectsReply { -} diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cf0fff2aa..b0bf1a488 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -531,14 +531,9 @@ void NodeManager::HandleRequestObjectSpillage( }); } -void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill, +void NodeManager::SpillObjects(const std::vector &objects_ids, std::function callback) { - std::vector objects_ids; - for (const auto &id : objects_ids_to_spill) { - // Do not spill already spilled objects. - if (spilled_objects_.count(id) == 0) { - objects_ids.push_back(id); - } + for (const auto &id : objects_ids) { // We should not spill an object that we are not the primary copy for. // TODO(swang): We should really return an error here but right now there // is a race condition where the raylet receives the owner's request to @@ -549,12 +544,6 @@ void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill "the primary copy."; } } - if (objects_ids.empty()) { - if (callback) { - callback(Status::OK()); - } - return; - } worker_pool_.PopIOWorker([this, objects_ids, callback](std::shared_ptr io_worker) { rpc::SpillObjectsRequest request; @@ -569,50 +558,46 @@ void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill if (!status.ok()) { RAY_LOG(ERROR) << "Failed to send object spilling request: " << status.ToString(); + if (callback) { + callback(status); + } } else { - RAY_CHECK(static_cast(r.spilled_objects_url_size()) == - objects_ids.size()); for (size_t i = 0; i < objects_ids.size(); ++i) { const ObjectID &object_id = objects_ids[i]; const std::string &object_url = r.spilled_objects_url(i); RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url; - // TODO(suquark): write to object directory. - spilled_objects_[object_id] = object_url; - auto search = pinned_objects_.find(object_id); - if (search != pinned_objects_.end()) { - pinned_objects_.erase(search); - } else { - RAY_LOG(ERROR) << "The spilled object " << object_id.Hex() - << " is not pinned."; - } + // Write to object directory. Wait for the write to finish before + // releasing the object to make sure that the spilled object can + // be retrieved by other raylets. + RAY_CHECK_OK(gcs_client_->Objects().AsyncAddSpilledUrl( + object_id, object_url, [this, object_id, callback](Status status) { + RAY_CHECK_OK(status); + // Unpin the object. + // NOTE(swang): Due to a race condition, the object may not be in + // the map yet. In that case, the owner will respond to the + // WaitForObjectEvictionRequest and we will unpin the object + // then. + pinned_objects_.erase(object_id); + if (callback) { + callback(status); + } + })); } } - if (callback) { - callback(status); - } }); }); } -void NodeManager::RestoreSpilledObjects( - const std::vector &object_ids, +void NodeManager::AsyncRestoreSpilledObject( + const ObjectID &object_id, const std::string &object_url, std::function callback) { - std::vector object_urls; - object_urls.reserve(object_ids.size()); - for (const auto &object_id : object_ids) { - if (spilled_objects_.count(object_id) == 0) { - callback(Status::Invalid("No object URL recorded")); - return; - } - object_urls.push_back(spilled_objects_[object_id]); - } - worker_pool_.PopIOWorker([this, object_urls, + RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL " + << object_url; + worker_pool_.PopIOWorker([this, object_url, callback](std::shared_ptr io_worker) { RAY_LOG(DEBUG) << "Sending restore spilled object request"; rpc::RestoreSpilledObjectsRequest request; - for (const auto &url : object_urls) { - request.add_spilled_objects_url(std::move(url)); - } + request.add_spilled_objects_url(std::move(object_url)); io_worker->rpc_client()->RestoreSpilledObjects( request, [this, callback, io_worker](const ray::Status &status, const rpc::RestoreSpilledObjectsReply &r) { @@ -1226,24 +1211,6 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & case protocol::MessageType::SubscribePlasmaReady: { ProcessSubscribePlasmaReady(client, message_data); } break; - case protocol::MessageType::ForceRestoreSpilledObjectsRequest: { - auto message = - flatbuffers::GetRoot(message_data); - std::vector object_ids = from_flatbuf(*message->object_ids()); - RestoreSpilledObjects(object_ids, [this, client](const ray::Status &status) { - flatbuffers::FlatBufferBuilder fbb; - flatbuffers::Offset reply = - protocol::CreateForceRestoreSpilledObjectsReply(fbb); - fbb.Finish(reply); - auto reply_status = client->WriteMessage( - static_cast(protocol::MessageType::ForceRestoreSpilledObjectsReply), - fbb.GetSize(), fbb.GetBufferPointer()); - if (!reply_status.ok()) { - // We failed to write to the client, so disconnect the client. - ProcessDisconnectClientMessage(client); - } - }); - } break; default: RAY_LOG(FATAL) << "Received unexpected message type " << message_type; } @@ -1515,23 +1482,15 @@ void NodeManager::ProcessFetchOrReconstructMessage( const auto refs = FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses()); if (message->fetch_only()) { - std::vector spilled_object_ids; for (const auto &ref : refs) { ObjectID object_id = ObjectID::FromBinary(ref.object_id()); // If only a fetch is required, then do not subscribe to the // dependencies to the task dependency manager. if (!task_dependency_manager_.CheckObjectLocal(object_id)) { - if (spilled_objects_.count(object_id) > 0) { - spilled_object_ids.push_back(object_id); - } else { - // Fetch the object if it's not already local. - RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address())); - } + // Fetch the object if it's not already local. + RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address())); } } - if (spilled_object_ids.size() > 0) { - RestoreSpilledObjects(spilled_object_ids); - } } else { // The values are needed. Add all requested objects to the list to // subscribe to in the task dependency manager. These objects will be @@ -2278,45 +2237,6 @@ void NodeManager::MarkObjectsAsFailed( } } -void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { - const TaskSpecification &spec = task.GetTaskSpecification(); - RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() - << " as failed if return values lost."; - // Loop over the return IDs (except the dummy ID) and check whether a - // location for the return ID exists. - int64_t num_returns = spec.NumReturns(); - if (spec.IsActorCreationTask()) { - // TODO(rkn): We subtract 1 to avoid the dummy ID. However, this leaks - // information about the TaskSpecification implementation. - num_returns -= 1; - } - // Use a shared flag to make sure that we only treat the task as failed at - // most once. This flag will get deallocated once all of the object table - // lookup callbacks are fired. - auto task_marked_as_failed = std::make_shared(false); - for (int64_t i = 0; i < num_returns; i++) { - const ObjectID object_id = spec.ReturnId(i); - // Lookup the return value's locations. - RAY_CHECK_OK(object_directory_->LookupLocations( - object_id, spec.CallerAddress(), - [this, task_marked_as_failed, task]( - const ray::ObjectID &object_id, - const std::unordered_set &clients) { - if (!*task_marked_as_failed) { - // Only process the object locations if we haven't already marked the - // task as failed. - if (clients.empty()) { - // The object does not exist on any nodes but has been created - // before, so the object has been lost. Mark the task as failed to - // prevent any tasks that depend on this object from hanging. - TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE); - *task_marked_as_failed = true; - } - } - })); - } -} - void NodeManager::SubmitTask(const Task &task) { stats::TaskCountReceived().Record(1); const TaskSpecification &spec = task.GetTaskSpecification(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d72a071ab..f7b82b556 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -168,6 +168,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Get the port of the node manager rpc server. int GetServerPort() const { return node_manager_server_.GetPort(); } + /// Restore a spilled object from external storage back into local memory. + /// \param object_id The ID of the object to restore. + /// \param object_url The URL in external storage from which the object can be restored. + /// \param callback A callback to call when the restoration is done. Status + /// will contain the error during restoration, if any. + void AsyncRestoreSpilledObject(const ObjectID &object_id, const std::string &object_url, + std::function callback); + private: /// Methods for handling clients. @@ -260,14 +268,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void MarkObjectsAsFailed(const ErrorType &error_type, const std::vector object_ids, const JobID &job_id); - /// This is similar to TreatTaskAsFailed, but it will only mark the task as - /// failed if at least one of the task's return values is lost. A return - /// value is lost if it has been created before, but no longer exists on any - /// nodes, due to either node failure or eviction. - /// - /// \param task The task to potentially fail. - /// \return Void. - void TreatTaskAsFailedIfLost(const Task &task); /// Handle specified task's submission to the local node manager. /// /// \param task The task being submitted. @@ -658,11 +658,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void SpillObjects(const std::vector &objects_ids_to_spill, std::function callback = nullptr); - /// Restore spilled objects from external storage. - /// \param object_ids Objects to be restored. - void RestoreSpilledObjects(const std::vector &object_ids, - std::function callback = nullptr); - /// Push an error to the driver if this node is full of actors and so we are /// unable to schedule new tasks or actors at all. void WarnResourceDeadlock(); @@ -754,9 +749,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// A mapping from actor ID to registration information about that actor /// (including which node manager owns it). std::unordered_map actor_registry_; - /// A mapping from ObjectIDs to external object URLs for spilled objects. - /// TODO(suquark): Move it into object directory. - absl::flat_hash_map spilled_objects_; /// This map stores actor ID to the ID of the checkpoint that will be used to /// restore the actor. std::unordered_map checkpoint_id_to_restore_; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index b63513b53..32dbab8aa 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -69,8 +69,12 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ gcs_client_)) : std::dynamic_pointer_cast( std::make_shared(main_service, gcs_client_))), - object_manager_(main_service, self_node_id_, object_manager_config, - object_directory_), + object_manager_( + main_service, self_node_id_, object_manager_config, object_directory_, + [this](const ObjectID &object_id, const std::string &spilled_url, + std::function callback) { + node_manager_.AsyncRestoreSpilledObject(object_id, spilled_url, callback); + }), node_manager_(main_service, self_node_id_, node_manager_config, object_manager_, gcs_client_, object_directory_), socket_name_(socket_name), diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index 98b49dca4..fb0fa594a 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -179,8 +179,9 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { created_object_id, it->second.owner_addresses[created_object_id], [this, task_id, reconstruction_attempt]( const ray::ObjectID &object_id, - const std::unordered_set &clients) { - if (clients.empty()) { + const std::unordered_set &clients, + const std::string &spilled_url) { + if (clients.empty() && spilled_url.empty()) { // The required object no longer exists on any live nodes. Attempt // reconstruction. AttemptReconstruction(task_id, object_id, reconstruction_attempt); diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index bb4168466..698dd3f2d 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -56,9 +56,9 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const ObjectID object_id = callback.first; auto it = locations_.find(object_id); if (it == locations_.end()) { - callback.second(object_id, std::unordered_set()); + callback.second(object_id, std::unordered_set(), ""); } else { - callback.second(object_id, it->second); + callback.second(object_id, it->second, ""); } } callbacks_.clear(); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 80f174810..8a7e57044 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -172,7 +172,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language, const JobID &job_id, std::vector dynamic_options) { rpc::JobConfig *job_config = nullptr; - if (RayConfig::instance().enable_multi_tenancy()) { + if (RayConfig::instance().enable_multi_tenancy() && + worker_type != rpc::WorkerType::IO_WORKER) { RAY_CHECK(!job_id.IsNil()); auto it = unfinished_jobs_.find(job_id); if (it == unfinished_jobs_.end()) { @@ -213,15 +214,15 @@ Process WorkerPool::StartWorkerProcess(const Language &language, } } - if (RayConfig::instance().enable_multi_tenancy() && - !job_config->jvm_options().empty()) { - // Note that we push the item to the front of the vector to make - // sure this is the freshest option than others. - dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(), - job_config->jvm_options().end()); - } // For non-multi-tenancy mode, job code search path is embedded in worker_command. if (RayConfig::instance().enable_multi_tenancy() && job_config) { + // Note that we push the item to the front of the vector to make + // sure this is the freshest option than others. + if (!job_config->jvm_options().empty()) { + dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(), + job_config->jvm_options().end()); + } + std::string code_search_path_str; for (int i = 0; i < job_config->code_search_path_size(); i++) { auto path = job_config->code_search_path(i); @@ -314,11 +315,11 @@ Process WorkerPool::StartWorkerProcess(const Language &language, } ProcessEnvironment env; - if (RayConfig::instance().enable_multi_tenancy()) { + if (RayConfig::instance().enable_multi_tenancy() && job_config) { env.insert(job_config->worker_env().begin(), job_config->worker_env().end()); } Process proc = StartProcess(worker_command_args, env); - if (RayConfig::instance().enable_multi_tenancy()) { + if (RayConfig::instance().enable_multi_tenancy() && job_config) { // If the pid is reused between processes, the old process must have exited. // So it's safe to bind the pid with another job ID. RAY_LOG(DEBUG) << "Worker process " << proc.GetId() << " is bound to job " << job_id; @@ -464,7 +465,8 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker state.registered_workers.insert(worker); - if (RayConfig::instance().enable_multi_tenancy()) { + if (RayConfig::instance().enable_multi_tenancy() && + worker->GetWorkerType() != rpc::WorkerType::IO_WORKER) { auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid); RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end()); auto job_id = dedicated_workers_it->second; diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 35b5f5dee..e706d74af 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -331,23 +331,6 @@ void raylet::RayletClient::RequestObjectSpillage( grpc_client_->RequestObjectSpillage(request, callback); } -/// Restore spilled objects from external storage. -/// \param object_ids The IDs of objects to be restored. -Status raylet::RayletClient::ForceRestoreSpilledObjects( - const std::vector &object_ids) { - flatbuffers::FlatBufferBuilder fbb; - auto message = - protocol::CreateForceRestoreSpilledObjectsRequest(fbb, to_flatbuf(fbb, object_ids)); - fbb.Finish(message); - std::vector reply; - RAY_RETURN_NOT_OK(conn_->AtomicRequestReply( - MessageType::ForceRestoreSpilledObjectsRequest, - MessageType::ForceRestoreSpilledObjectsReply, &reply, &fbb)); - RAY_UNUSED( - flatbuffers::GetRoot(reply.data())); - return Status::OK(); -} - Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id, bool disconnect_worker) { rpc::ReturnWorkerRequest request; diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 8176def5b..6cbdd2e53 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -341,11 +341,6 @@ class RayletClient : public PinObjectsInterface, const ObjectID &object_id, const rpc::ClientCallback &callback); - /// Restore spilled objects from external storage. - /// \param object_ids The IDs of objects to be restored. - /// \return ray::Status - ray::Status ForceRestoreSpilledObjects(const std::vector &object_ids); - /// Implements WorkerLeaseInterface. void RequestWorkerLease( const ray::TaskSpecification &resource_spec,