From a82fa80f7b00863d1732d7e74ba6b63b383f7a90 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 21 Jan 2021 10:15:18 -0700 Subject: [PATCH] Inline small objects in GetObjectStatus response. (#13309) --- python/ray/_raylet.pyx | 7 +-- python/ray/includes/libcoreworker.pxd | 3 +- python/ray/tests/test_advanced.py | 37 ++++++++++++++ src/ray/core_worker/core_worker.cc | 48 +++++++++++++----- src/ray/core_worker/core_worker.h | 4 +- src/ray/core_worker/future_resolver.cc | 69 +++++++++++++++++--------- src/ray/core_worker/future_resolver.h | 1 + src/ray/protobuf/core_worker.proto | 12 +++++ 8 files changed, 140 insertions(+), 41 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8ba80852f..4b5f9deee 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -898,16 +898,17 @@ cdef class CoreWorker: return RayObjectsToDataMetadataPairs(results) - def object_exists(self, ObjectRef object_ref): + def object_exists(self, ObjectRef object_ref, memory_store_only=False): cdef: c_bool has_object + c_bool is_in_plasma CObjectID c_object_id = object_ref.native() with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().Contains( - c_object_id, &has_object)) + c_object_id, &has_object, &is_in_plasma)) - return has_object + return has_object and (not memory_store_only or not is_in_plasma) cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectRef object_ref, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index f1acad1fa..637dbd750 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -183,7 +183,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results, c_bool plasma_objects_only) - CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) + CRayStatus Contains(const CObjectID &object_id, c_bool *has_object, + c_bool *is_in_plasma) CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, int64_t timeout_ms, c_vector[c_bool] *results, c_bool fetch_local) diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index 6df746fdc..8f607009e 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -521,6 +521,43 @@ def test_wait_makes_object_local(ray_start_cluster): assert ray.worker.global_worker.core_worker.object_exists(x_id) +@pytest.mark.skipif(client_test_enabled(), reason="internal api") +def test_future_resolution_skip_plasma(ray_start_cluster): + cluster = ray_start_cluster + # Disable worker caching so worker leases are not reused; set object + # inlining size threshold and enable storing of small objects in in-memory + # object store so the borrowed ref is inlined. + cluster.add_node( + num_cpus=1, + resources={"pin_head": 1}, + _system_config={ + "worker_lease_timeout_milliseconds": 0, + "max_direct_call_object_size": 100 * 1024, + "put_small_object_in_memory_store": True, + }, + ) + cluster.add_node(num_cpus=1, resources={"pin_worker": 1}) + ray.init(address=cluster.address) + + @ray.remote(resources={"pin_head": 1}) + def f(x): + return x + 1 + + @ray.remote(resources={"pin_worker": 1}) + def g(x): + borrowed_ref = x[0] + f_ref = f.remote(borrowed_ref) + # borrowed_ref should be inlined on future resolution and shouldn't be + # in Plasma. + assert ray.worker.global_worker.core_worker.object_exists( + borrowed_ref, memory_store_only=True) + return ray.get(f_ref) * 2 + + one = ray.put(1) + g_ref = g.remote([one]) + assert ray.get(g_ref) == 4 + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 21fc462a7..dfbe8ef2c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1058,7 +1058,8 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m return Status::OK(); } -Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { +Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object, + bool *is_in_plasma) { bool found = false; bool in_plasma = false; found = memory_store_->Contains(object_id, &in_plasma); @@ -1066,6 +1067,9 @@ Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); } *has_object = found; + if (is_in_plasma != nullptr) { + *is_in_plasma = found && in_plasma; + } return Status::OK(); } @@ -2091,25 +2095,43 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques send_reply_callback(Status::OK(), nullptr, nullptr); } else { RAY_CHECK(owner_address.worker_id() == request.owner_worker_id()); + bool is_freed = reference_counter_->IsPlasmaObjectFreed(object_id); - if (reference_counter_->IsPlasmaObjectFreed(object_id)) { - reply->set_status(rpc::GetObjectStatusReply::FREED); - } else { - reply->set_status(rpc::GetObjectStatusReply::CREATED); - } // Send the reply once the value has become available. The value is // guaranteed to become available eventually because we own the object and // its ref count is > 0. - // TODO(swang): We could probably just send the object value if it is small - // enough and we have it local. - memory_store_->GetAsync(object_id, - [send_reply_callback](std::shared_ptr obj) { - send_reply_callback(Status::OK(), nullptr, nullptr); - }); + memory_store_->GetAsync(object_id, [reply, send_reply_callback, + is_freed](std::shared_ptr obj) { + if (is_freed) { + reply->set_status(rpc::GetObjectStatusReply::FREED); + } else { + // If obj is the concrete object value, it is small, so we + // send the object back to the caller in the GetObjectStatus + // reply, bypassing a Plasma put and object transfer. If obj + // is an indicator that the object is in Plasma, we set an + // in_plasma indicator on the message, and the caller will + // have to facilitate a Plasma object transfer to get the + // object value. + auto *object = reply->mutable_object(); + if (obj->HasData()) { + const auto &data = obj->GetData(); + object->set_data(data->Data(), data->Size()); + } + if (obj->HasMetadata()) { + const auto &metadata = obj->GetMetadata(); + object->set_metadata(metadata->Data(), metadata->Size()); + } + for (const auto &nested_id : obj->GetNestedIds()) { + object->add_nested_inlined_ids(nested_id.Binary()); + } + reply->set_status(rpc::GetObjectStatusReply::CREATED); + } + send_reply_callback(Status::OK(), nullptr, nullptr); + }); } RemoveLocalReference(object_id); -} +} // namespace ray void CoreWorker::HandleWaitForActorOutOfScope( const rpc::WaitForActorOutOfScopeRequest &request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 088ba346a..3002b9003 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -559,8 +559,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] object_id ID of the objects to check for. /// \param[out] has_object Whether or not the object is present. + /// \param[out] is_in_plasma Whether or not the object is in Plasma. /// \return Status. - Status Contains(const ObjectID &object_id, bool *has_object); + Status Contains(const ObjectID &object_id, bool *has_object, + bool *is_in_plasma = nullptr); /// Wait for a list of objects to appear in the object store. /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index 8a1cc3f07..c625507cd 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -28,30 +28,53 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, rpc::GetObjectStatusRequest request; request.set_object_id(object_id.Binary()); request.set_owner_worker_id(owner_address.worker_id()); - conn->GetObjectStatus( - request, - [this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) { - if (!status.ok()) { - RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id - << " that was deserialized: " << status.ToString(); - } + conn->GetObjectStatus(request, [this, object_id]( + const Status &status, + const rpc::GetObjectStatusReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id + << " that was deserialized: " << status.ToString(); + } - if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { - // The owner is gone or the owner replied that the object has gone - // out of scope (this is an edge case in the distributed ref counting - // protocol where a borrower dies before it can notify the owner of - // another borrower). Store an error so that an exception will be - // thrown immediately when the worker tries to get the value. - RAY_UNUSED(in_memory_store_->Put( - RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id)); - } else { - // We can now try to fetch the object via plasma. If the owner later - // fails or the object is released, the raylet will eventually store - // an error in plasma on our behalf. - RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - object_id)); - } - }); + if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { + // The owner is gone or the owner replied that the object has gone + // out of scope (this is an edge case in the distributed ref counting + // protocol where a borrower dies before it can notify the owner of + // another borrower). Store an error so that an exception will be + // thrown immediately when the worker tries to get the value. + RAY_UNUSED(in_memory_store_->Put( + RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id)); + } else if (reply.status() == rpc::GetObjectStatusReply::CREATED) { + // The object is either an indicator that the object is in Plasma, or + // the object has been returned directly in the reply. In either + // case, we put the corresponding RayObject into the in-memory store. + // If the owner later fails or the object is released, the raylet + // will eventually store an error in Plasma on our behalf. + const auto &data = reply.object().data(); + std::shared_ptr data_buffer; + if (data.size() > 0) { + RAY_LOG(DEBUG) << "Object returned directly in GetObjectStatus reply, putting " + << object_id << " in memory store"; + data_buffer = std::make_shared( + const_cast(reinterpret_cast(data.data())), + data.size()); + } else { + RAY_LOG(DEBUG) << "Object not returned directly in GetObjectStatus reply, " + << object_id << " will have to be fetched from Plasma"; + } + const auto &metadata = reply.object().metadata(); + std::shared_ptr metadata_buffer; + if (metadata.size() > 0) { + metadata_buffer = std::make_shared( + const_cast(reinterpret_cast(metadata.data())), + metadata.size()); + } + auto inlined_ids = + IdVectorFromProtobuf(reply.object().nested_inlined_ids()); + RAY_UNUSED(in_memory_store_->Put( + RayObject(data_buffer, metadata_buffer, inlined_ids), object_id)); + } + }); } } // namespace ray diff --git a/src/ray/core_worker/future_resolver.h b/src/ray/core_worker/future_resolver.h index be504a582..b774434b7 100644 --- a/src/ray/core_worker/future_resolver.h +++ b/src/ray/core_worker/future_resolver.h @@ -16,6 +16,7 @@ #include +#include "ray/common/grpc_util.h" #include "ray/common/id.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/rpc/worker/core_worker_client.h" diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 799530d27..43dfaa45b 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -132,6 +132,15 @@ message GetObjectStatusRequest { bytes object_id = 2; } +message RayObject { + // Data of the object. + bytes data = 1; + // Metadata of the object. + bytes metadata = 2; + // ObjectIDs that were nested in data. This is only set for inlined objects. + repeated bytes nested_inlined_ids = 3; +} + message GetObjectStatusReply { enum ObjectStatus { CREATED = 0; @@ -139,6 +148,9 @@ message GetObjectStatusReply { FREED = 2; } ObjectStatus status = 1; + // The Ray object: either a concrete value, an in-Plasma indicator, or an + // exception. + RayObject object = 2; } message WaitForActorOutOfScopeRequest {