diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b3c6dae49..dead48d95 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1460,7 +1460,7 @@ cdef class CoreWorker: object_ids = ObjectRefsToVector(object_refs) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker() - .ForceSpillObjects(object_ids)) + .SpillObjects(object_ids)) def force_restore_spilled_objects(self, object_refs): cdef c_vector[CObjectID] object_ids diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index a33de9ca8..c2cb1f575 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -195,7 +195,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SetResource(const c_string &resource_name, const double capacity, const CClientID &client_Id) - CRayStatus ForceSpillObjects(const c_vector[CObjectID] &object_ids) + CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids) CRayStatus ForceRestoreSpilledObjects( const c_vector[CObjectID] &object_ids) diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index cce0320ef..2d487bb7b 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -107,6 +107,53 @@ def test_spill_objects_manually_with_workers(shutdown_only): assert np.array_equal(restored, arr) +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "num_cpus": 0, + "object_store_memory": 75 * 1024 * 1024, + "object_spilling_config": { + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } + }, + "_internal_config": json.dumps({ + "object_store_full_max_retries": 0, + "max_io_workers": 4, + }), + }], + indirect=True) +def test_spill_remote_object(ray_start_cluster_head): + cluster = ray_start_cluster_head + cluster.add_node( + object_store_memory=75 * 1024 * 1024, + object_spilling_config={ + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } + }) + + @ray.remote + def put(): + return np.random.rand(5 * 1024 * 1024) # 40 MB data + + # Create 2 objects. Only 1 should fit. + ref = put.remote() + ray.get(ref) + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(put.remote()) + time.sleep(1) + # Spill 1 object. The second should now fit. + ray.experimental.force_spill_objects([ref]) + ray.get(put.remote()) + + # TODO(swang): Restoring from the object directory is not yet supported. + # ray.experimental.force_restore_spilled_objects([ref]) + # sample = ray.get(ref) + # assert np.array_equal(sample, copy) + + @pytest.mark.skip(reason="have not been fully implemented") def test_spill_objects_automatically(shutdown_only): # Limit our object store to 75 MiB of memory. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 48282e486..30d9cbf16 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1174,8 +1174,85 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca return local_raylet_client_->SetResource(resource_name, capacity, client_id); } -Status CoreWorker::ForceSpillObjects(const std::vector &object_ids) { - return local_raylet_client_->ForceSpillObjects(object_ids); +void CoreWorker::SpillOwnedObject(const ObjectID &object_id, + const std::shared_ptr &obj, + std::function callback) { + if (!obj->IsInPlasmaError()) { + RAY_LOG(ERROR) << "Cannot spill inlined object " << object_id; + callback(); + return; + } + + // Find the raylet that hosts the primary copy of the object. + ClientID pinned_at; + RAY_CHECK(reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at)); + auto node = gcs_client_->Nodes().Get(pinned_at); + if (pinned_at.IsNil() || !node) { + RAY_LOG(ERROR) << "Primary raylet for object " << object_id << " unreachable"; + callback(); + return; + } + + // Ask the raylet to spill the object. + auto raylet_client = + std::make_shared(rpc::NodeManagerWorkerClient::make( + node->node_manager_address(), node->node_manager_port(), + *client_call_manager_)); + raylet_client->RequestObjectSpillage( + object_id, [object_id, callback](const Status &status, + const rpc::RequestObjectSpillageReply &reply) { + if (!status.ok() || !reply.success()) { + RAY_LOG(ERROR) << "Failed to spill object " << object_id + << ", raylet unreachable or object could not be spilled."; + } + callback(); + }); +} + +Status CoreWorker::SpillObjects(const std::vector &object_ids) { + auto mutex = std::make_shared(); + auto num_remaining = std::make_shared(object_ids.size()); + auto ready_promise = std::make_shared>(std::promise()); + Status final_status; + + auto callback = [mutex, num_remaining, ready_promise]() { + absl::MutexLock lock(mutex.get()); + (*num_remaining)--; + if (*num_remaining == 0) { + ready_promise->set_value(); + } + }; + + for (const auto &object_id : object_ids) { + RAY_LOG(DEBUG) << "Requesting spill for object " << object_id; + // Acquire a temporary reference to make sure that the object is still in + // scope by the time we register the callback to spill the object. + // Otherwise, the callback may never get called. + AddLocalReference(object_id, ""); + + rpc::Address owner_address; + auto has_owner = reference_counter_->GetOwner(object_id, &owner_address); + if (!has_owner) { + final_status = + Status::Invalid("Cannot call spill on objects that have gone out of scope."); + callback(); + } else if (WorkerID::FromBinary(owner_address.worker_id()) != + worker_context_.GetWorkerID()) { + final_status = Status::Invalid("Cannot call spill on objects that we do not own."); + callback(); + } else { + memory_store_->GetAsync( + object_id, [this, object_id, callback](std::shared_ptr obj) { + SpillOwnedObject(object_id, obj, callback); + }); + } + + // Remove the temporary reference. + RemoveLocalReference(object_id); + } + + ready_promise->get_future().wait(); + return final_status; } Status CoreWorker::ForceRestoreSpilledObjects(const std::vector &object_ids) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 4fbe30068..387700933 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -586,10 +586,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status SetResource(const std::string &resource_name, const double capacity, const ClientID &client_id); - /// Force spilling objects to external storage. + /// Request an object to be spilled to external storage. /// \param[in] object_ids The objects to be spilled. - /// \return Status - Status ForceSpillObjects(const std::vector &object_ids); + /// \return Status. Returns Status::Invalid if any of the objects are not + /// eligible for spilling (they have gone out of scope or we do not own the + /// object). Otherwise, the return status is ok and we will use best effort + /// to spill the object. + Status SpillObjects(const std::vector &object_ids); /// Restore objects from external storage. /// \param[in] object_ids The objects to be restored. @@ -996,6 +999,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Handler if a raylet node is removed from the cluster. void OnNodeRemoved(const rpc::GcsNodeInfo &node_info); + /// Request the spillage of an object that we own from the primary that hosts + /// the primary copy to spill. + void SpillOwnedObject(const ObjectID &object_id, const std::shared_ptr &obj, + std::function callback); + const CoreWorkerOptions options_; /// Callback to get the current language (e.g., Python) call site. diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 92beaa21f..6e7004e4d 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -20,8 +20,8 @@ namespace ray { Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { // Check the ReferenceCounter to see if there is a location for the object. - bool pinned = false; - bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned); + ClientID pinned_at; + bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at); if (!owned_by_us) { return Status::Invalid( "Object reference no longer exists or is not owned by us. Either lineage pinning " @@ -29,7 +29,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) { } bool already_pending_recovery = true; - if (!pinned) { + if (pinned_at.IsNil()) { { absl::MutexLock lock(&mu_); // Mark that we are attempting recovery for this object to prevent diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index d1bbfe9dd..2376799ee 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -539,12 +539,12 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id, } bool ReferenceCounter::IsPlasmaObjectPinned(const ObjectID &object_id, - bool *pinned) const { + ClientID *pinned_at) const { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); if (it != object_id_refs_.end()) { if (it->second.owned_by_us) { - *pinned = it->second.pinned_at_raylet_id.has_value(); + *pinned_at = it->second.pinned_at_raylet_id.value_or(ClientID::Nil()); return true; } } diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 812e65ce0..1b40874c3 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -327,12 +327,12 @@ class ReferenceCounter : public ReferenceCounterInterface { /// Check whether the object is pinned at a remote plasma store node. /// /// \param[in] object_id The object to check. - /// \param[out] pinned Whether the object was pinned at a remote plasma store - /// node. + /// \param[out] pinned_at The node ID of the raylet at which this object is + /// pinned. Set to nil if the object is not pinned. /// \return True if the object exists and is owned by us, false otherwise. We /// return false here because a borrower should not know the pinned location /// for an object. - bool IsPlasmaObjectPinned(const ObjectID &object_id, bool *pinned) const + bool IsPlasmaObjectPinned(const ObjectID &object_id, ClientID *pinned_at) const LOCKS_EXCLUDED(mutex_); /// Get and reset the objects that were pinned on the given node. This diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index ea17419b5..605233439 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -1986,22 +1986,22 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID borrowed_id = ObjectID::FromRandom(); rc->AddLocalReference(borrowed_id, ""); - bool pinned = false; - ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned)); + ClientID pinned_at; + ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned_at)); ObjectID id = ObjectID::FromRandom(); ClientID node_id = ClientID::FromRandom(); rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); - ASSERT_FALSE(pinned); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(pinned_at.IsNil()); rc->UpdateObjectPinnedAtRaylet(id, node_id); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); - ASSERT_TRUE(pinned); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_FALSE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); - ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned)); + ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned_at)); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); @@ -2012,8 +2012,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { auto objects = rc->ResetObjectsOnRemovedNode(node_id); ASSERT_EQ(objects.size(), 1); ASSERT_EQ(objects[0], id); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); - ASSERT_FALSE(pinned); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); } @@ -2034,9 +2034,9 @@ TEST_F(ReferenceCountTest, TestFree) { ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); - bool pinned = true; - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); - ASSERT_FALSE(pinned); + ClientID pinned_at; + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(pinned_at.IsNil()); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); rc->RemoveLocalReference(id, nullptr); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); @@ -2050,8 +2050,8 @@ TEST_F(ReferenceCountTest, TestFree) { rc->FreePlasmaObjects({id}); ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_TRUE(deleted->count(id) > 0); - ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); - ASSERT_FALSE(pinned); + ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at)); + ASSERT_TRUE(pinned_at.IsNil()); rc->RemoveLocalReference(id, nullptr); ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); } diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 121d71d3d..2c288efe5 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -185,10 +185,10 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, if (return_object.in_plasma()) { const auto pinned_at_raylet_id = ClientID::FromBinary(worker_addr.raylet_id()); if (check_node_alive_(pinned_at_raylet_id)) { + reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); // Mark it as in plasma with a dummy object. RAY_CHECK(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); - reference_counter_->UpdateObjectPinnedAtRaylet(object_id, pinned_at_raylet_id); } else { RAY_LOG(INFO) << "Task " << task_id << " returned object " << object_id << " in plasma on a dead node, attempting to recover"; diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index e720c534c..c33f0abcb 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -139,6 +139,16 @@ message FormatGlobalMemoryInfoReply { string memory_summary = 1; } +message RequestObjectSpillageRequest { + // ObjectID to spill. + bytes object_id = 1; +} + +message RequestObjectSpillageReply { + // Whether the object spilling was successful or not. + bool success = 1; +} + // Service for inter-node-manager communication. service NodeManagerService { // Request a worker from the raylet. @@ -169,4 +179,7 @@ service NodeManagerService { // Get global object reference stats in formatted form. rpc FormatGlobalMemoryInfo(FormatGlobalMemoryInfoRequest) returns (FormatGlobalMemoryInfoReply); + // Ask the raylet to spill an object to external storage. + rpc RequestObjectSpillage(RequestObjectSpillageRequest) + returns (RequestObjectSpillageReply); } diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 34cda671c..2d401c0ab 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -82,9 +82,6 @@ enum MessageType:int { SetResourceRequest, // Subscribe to Plasma updates SubscribePlasmaReady, - // Manually spill objects to external storage. - ForceSpillObjectsRequest, - ForceSpillObjectsReply, // Manually restore objects from external storage. ForceRestoreSpilledObjectsRequest, ForceRestoreSpilledObjectsReply, diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 55917b457..e5b2b659b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -506,6 +506,18 @@ void NodeManager::DoLocalGC() { } } +void NodeManager::HandleRequestObjectSpillage( + const rpc::RequestObjectSpillageRequest &request, + rpc::RequestObjectSpillageReply *reply, rpc::SendReplyCallback send_reply_callback) { + SpillObjects({ObjectID::FromBinary(request.object_id())}, + [reply, send_reply_callback](const ray::Status &status) { + if (status.ok()) { + reply->set_success(true); + } + send_reply_callback(Status::OK(), nullptr, nullptr); + }); +} + void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill, std::function callback) { std::vector objects_ids; @@ -514,6 +526,15 @@ void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill if (spilled_objects_.count(id) == 0) { objects_ids.push_back(id); } + // We should not spill an object that we are not the primary copy for. + // TODO(swang): We should really return an error here but right now there + // is a race condition where the raylet receives the owner's request to + // spill an object before it receives the message to pin the objects from + // the local worker. + if (pinned_objects_.count(id) == 0) { + RAY_LOG(WARNING) << "Requested spill for object that has not yet been marked as " + "the primary copy"; + } } if (objects_ids.empty()) { if (callback) { @@ -521,42 +542,43 @@ void NodeManager::SpillObjects(const std::vector &objects_ids_to_spill } return; } - worker_pool_.PopIOWorker( - [this, objects_ids, callback](std::shared_ptr io_worker) { - RAY_LOG(DEBUG) << "Sending object spilling request"; - rpc::SpillObjectsRequest request; - for (const auto &object_id : objects_ids) { - request.add_object_ids_to_spill(object_id.Binary()); - } - io_worker->rpc_client()->SpillObjects( - request, [this, objects_ids, callback, io_worker]( - const ray::Status &status, const rpc::SpillObjectsReply &r) { - worker_pool_.PushIOWorker(io_worker); - if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to send object spilling request: " - << status.ToString(); + worker_pool_.PopIOWorker([this, objects_ids, + callback](std::shared_ptr io_worker) { + rpc::SpillObjectsRequest request; + for (const auto &object_id : objects_ids) { + RAY_LOG(DEBUG) << "Sending spill request for object " << object_id; + request.add_object_ids_to_spill(object_id.Binary()); + } + io_worker->rpc_client()->SpillObjects( + request, [this, objects_ids, callback, io_worker]( + const ray::Status &status, const rpc::SpillObjectsReply &r) { + worker_pool_.PushIOWorker(io_worker); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to send object spilling request: " + << status.ToString(); + } else { + RAY_CHECK(static_cast(r.spilled_objects_url_size()) == + objects_ids.size()); + for (size_t i = 0; i < objects_ids.size(); ++i) { + const ObjectID &object_id = objects_ids[i]; + const std::string &object_url = r.spilled_objects_url(i); + RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url; + // TODO(suquark): write to object directory. + spilled_objects_[object_id] = object_url; + auto search = pinned_objects_.find(object_id); + if (search != pinned_objects_.end()) { + pinned_objects_.erase(search); } else { - RAY_CHECK(static_cast(r.spilled_objects_url_size()) == - objects_ids.size()); - for (size_t i = 0; i < objects_ids.size(); ++i) { - const ObjectID &object_id = objects_ids[i]; - const std::string &object_url = r.spilled_objects_url(i); - // TODO(suquark): write to object directory. - spilled_objects_[object_id] = object_url; - auto search = pinned_objects_.find(object_id); - if (search != pinned_objects_.end()) { - pinned_objects_.erase(search); - } else { - RAY_LOG(ERROR) - << "The spilled object " << object_id.Hex() << " is not pinned."; - } - } + RAY_LOG(ERROR) << "The spilled object " << object_id.Hex() + << " is not pinned."; } - if (callback) { - callback(status); - } - }); - }); + } + } + if (callback) { + callback(status); + } + }); + }); } void NodeManager::RestoreSpilledObjects( @@ -565,6 +587,10 @@ void NodeManager::RestoreSpilledObjects( std::vector object_urls; object_urls.reserve(object_ids.size()); for (const auto &object_id : object_ids) { + if (spilled_objects_.count(object_id) == 0) { + callback(Status::Invalid("No object URL recorded")); + return; + } object_urls.push_back(spilled_objects_[object_id]); } worker_pool_.PopIOWorker([this, object_urls, @@ -1226,23 +1252,6 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & case protocol::MessageType::SubscribePlasmaReady: { ProcessSubscribePlasmaReady(client, message_data); } break; - case protocol::MessageType::ForceSpillObjectsRequest: { - auto message = flatbuffers::GetRoot(message_data); - std::vector object_ids = from_flatbuf(*message->object_ids()); - SpillObjects(object_ids, [this, client](const ray::Status &status) { - flatbuffers::FlatBufferBuilder fbb; - flatbuffers::Offset reply = - protocol::CreateForceSpillObjectsReply(fbb); - fbb.Finish(reply); - auto reply_status = client->WriteMessage( - static_cast(protocol::MessageType::ForceSpillObjectsReply), - fbb.GetSize(), fbb.GetBufferPointer()); - if (!reply_status.ok()) { - // We failed to write to the client, so disconnect the client. - ProcessDisconnectClientMessage(client); - } - }); - } break; case protocol::MessageType::ForceRestoreSpilledObjectsRequest: { auto message = flatbuffers::GetRoot(message_data); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 15192640e..612d7004b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -607,6 +607,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler { rpc::FormatGlobalMemoryInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `RequestObjectSpillage` request. + void HandleRequestObjectSpillage(const rpc::RequestObjectSpillageRequest &request, + rpc::RequestObjectSpillageReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Trigger global GC across the cluster to free up references to actors or /// object ids. void TriggerGlobalGC(); diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 545569280..f289bdb20 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -311,18 +311,12 @@ void raylet::RayletClient::RequestWorkerLease( } /// Spill objects to external storage. -/// \param object_ids The IDs of objects to be spilled. -Status raylet::RayletClient::ForceSpillObjects(const std::vector &object_ids) { - flatbuffers::FlatBufferBuilder fbb; - auto message = - protocol::CreateForceSpillObjectsRequest(fbb, to_flatbuf(fbb, object_ids)); - fbb.Finish(message); - std::vector reply; - RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::ForceSpillObjectsRequest, - MessageType::ForceSpillObjectsReply, &reply, - &fbb)); - RAY_UNUSED(flatbuffers::GetRoot(reply.data())); - return Status::OK(); +void raylet::RayletClient::RequestObjectSpillage( + const ObjectID &object_id, + const rpc::ClientCallback &callback) { + rpc::RequestObjectSpillageRequest request; + request.set_object_id(object_id.Binary()); + grpc_client_->RequestObjectSpillage(request, callback); } /// Restore spilled objects from external storage. diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index ddf80c6ea..d84a66c29 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -319,10 +319,13 @@ class RayletClient : public PinObjectsInterface, ray::Status SetResource(const std::string &resource_name, const double capacity, const ray::ClientID &client_Id); - /// Spill objects to external storage. - /// \param object_ids The IDs of objects to be spilled. - /// \return ray::Status - ray::Status ForceSpillObjects(const std::vector &object_ids); + /// Ask the raylet to spill an object to external storage. + /// \param object_id The ID of the object to be spilled. + /// \param callback Callback that will be called after raylet completes the + /// object spilling (or it fails). + void RequestObjectSpillage( + const ObjectID &object_id, + const rpc::ClientCallback &callback); /// Restore spilled objects from external storage. /// \param object_ids The IDs of objects to be restored. diff --git a/src/ray/rpc/node_manager/node_manager_client.h b/src/ray/rpc/node_manager/node_manager_client.h index e6fe99d50..3dc1cd4e6 100644 --- a/src/ray/rpc/node_manager/node_manager_client.h +++ b/src/ray/rpc/node_manager/node_manager_client.h @@ -94,6 +94,9 @@ class NodeManagerWorkerClient /// Trigger global GC across the cluster. VOID_RPC_CLIENT_METHOD(NodeManagerService, GlobalGC, grpc_client_, ) + /// Ask the raylet to spill an object to external storage. + VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestObjectSpillage, grpc_client_, ) + private: /// Constructor. /// diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 66783976e..f383dc9d8 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -33,7 +33,8 @@ namespace rpc { RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) \ RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo) \ RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReserve) \ - RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) + RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \ + RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -89,6 +90,10 @@ class NodeManagerServiceHandler { virtual void HandleFormatGlobalMemoryInfo(const FormatGlobalMemoryInfoRequest &request, FormatGlobalMemoryInfoReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleRequestObjectSpillage(const RequestObjectSpillageRequest &request, + RequestObjectSpillageReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`.