diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 4b5f9deee..8ba80852f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -898,17 +898,16 @@ cdef class CoreWorker: return RayObjectsToDataMetadataPairs(results) - def object_exists(self, ObjectRef object_ref, memory_store_only=False): + def object_exists(self, ObjectRef object_ref): cdef: c_bool has_object - c_bool is_in_plasma CObjectID c_object_id = object_ref.native() with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().Contains( - c_object_id, &has_object, &is_in_plasma)) + c_object_id, &has_object)) - return has_object and (not memory_store_only or not is_in_plasma) + return has_object cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectRef object_ref, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 637dbd750..f1acad1fa 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -183,8 +183,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, c_vector[shared_ptr[CRayObject]] *results, c_bool plasma_objects_only) - CRayStatus Contains(const CObjectID &object_id, c_bool *has_object, - c_bool *is_in_plasma) + CRayStatus Contains(const CObjectID &object_id, c_bool *has_object) CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects, int64_t timeout_ms, c_vector[c_bool] *results, c_bool fetch_local) diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index 8f607009e..6df746fdc 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -521,43 +521,6 @@ def test_wait_makes_object_local(ray_start_cluster): assert ray.worker.global_worker.core_worker.object_exists(x_id) -@pytest.mark.skipif(client_test_enabled(), reason="internal api") -def test_future_resolution_skip_plasma(ray_start_cluster): - cluster = ray_start_cluster - # Disable worker caching so worker leases are not reused; set object - # inlining size threshold and enable storing of small objects in in-memory - # object store so the borrowed ref is inlined. - cluster.add_node( - num_cpus=1, - resources={"pin_head": 1}, - _system_config={ - "worker_lease_timeout_milliseconds": 0, - "max_direct_call_object_size": 100 * 1024, - "put_small_object_in_memory_store": True, - }, - ) - cluster.add_node(num_cpus=1, resources={"pin_worker": 1}) - ray.init(address=cluster.address) - - @ray.remote(resources={"pin_head": 1}) - def f(x): - return x + 1 - - @ray.remote(resources={"pin_worker": 1}) - def g(x): - borrowed_ref = x[0] - f_ref = f.remote(borrowed_ref) - # borrowed_ref should be inlined on future resolution and shouldn't be - # in Plasma. - assert ray.worker.global_worker.core_worker.object_exists( - borrowed_ref, memory_store_only=True) - return ray.get(f_ref) * 2 - - one = ray.put(1) - g_ref = g.remote([one]) - assert ray.get(g_ref) == 4 - - if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index dfbe8ef2c..21fc462a7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1058,8 +1058,7 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m return Status::OK(); } -Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object, - bool *is_in_plasma) { +Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { bool found = false; bool in_plasma = false; found = memory_store_->Contains(object_id, &in_plasma); @@ -1067,9 +1066,6 @@ Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object, RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); } *has_object = found; - if (is_in_plasma != nullptr) { - *is_in_plasma = found && in_plasma; - } return Status::OK(); } @@ -2095,43 +2091,25 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques send_reply_callback(Status::OK(), nullptr, nullptr); } else { RAY_CHECK(owner_address.worker_id() == request.owner_worker_id()); - bool is_freed = reference_counter_->IsPlasmaObjectFreed(object_id); + if (reference_counter_->IsPlasmaObjectFreed(object_id)) { + reply->set_status(rpc::GetObjectStatusReply::FREED); + } else { + reply->set_status(rpc::GetObjectStatusReply::CREATED); + } // Send the reply once the value has become available. The value is // guaranteed to become available eventually because we own the object and // its ref count is > 0. - memory_store_->GetAsync(object_id, [reply, send_reply_callback, - is_freed](std::shared_ptr obj) { - if (is_freed) { - reply->set_status(rpc::GetObjectStatusReply::FREED); - } else { - // If obj is the concrete object value, it is small, so we - // send the object back to the caller in the GetObjectStatus - // reply, bypassing a Plasma put and object transfer. If obj - // is an indicator that the object is in Plasma, we set an - // in_plasma indicator on the message, and the caller will - // have to facilitate a Plasma object transfer to get the - // object value. - auto *object = reply->mutable_object(); - if (obj->HasData()) { - const auto &data = obj->GetData(); - object->set_data(data->Data(), data->Size()); - } - if (obj->HasMetadata()) { - const auto &metadata = obj->GetMetadata(); - object->set_metadata(metadata->Data(), metadata->Size()); - } - for (const auto &nested_id : obj->GetNestedIds()) { - object->add_nested_inlined_ids(nested_id.Binary()); - } - reply->set_status(rpc::GetObjectStatusReply::CREATED); - } - send_reply_callback(Status::OK(), nullptr, nullptr); - }); + // TODO(swang): We could probably just send the object value if it is small + // enough and we have it local. + memory_store_->GetAsync(object_id, + [send_reply_callback](std::shared_ptr obj) { + send_reply_callback(Status::OK(), nullptr, nullptr); + }); } RemoveLocalReference(object_id); -} // namespace ray +} void CoreWorker::HandleWaitForActorOutOfScope( const rpc::WaitForActorOutOfScopeRequest &request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 3002b9003..088ba346a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -559,10 +559,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] object_id ID of the objects to check for. /// \param[out] has_object Whether or not the object is present. - /// \param[out] is_in_plasma Whether or not the object is in Plasma. /// \return Status. - Status Contains(const ObjectID &object_id, bool *has_object, - bool *is_in_plasma = nullptr); + Status Contains(const ObjectID &object_id, bool *has_object); /// Wait for a list of objects to appear in the object store. /// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index c625507cd..8a1cc3f07 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -28,53 +28,30 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, rpc::GetObjectStatusRequest request; request.set_object_id(object_id.Binary()); request.set_owner_worker_id(owner_address.worker_id()); - conn->GetObjectStatus(request, [this, object_id]( - const Status &status, - const rpc::GetObjectStatusReply &reply) { - if (!status.ok()) { - RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id - << " that was deserialized: " << status.ToString(); - } + conn->GetObjectStatus( + request, + [this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id + << " that was deserialized: " << status.ToString(); + } - if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { - // The owner is gone or the owner replied that the object has gone - // out of scope (this is an edge case in the distributed ref counting - // protocol where a borrower dies before it can notify the owner of - // another borrower). Store an error so that an exception will be - // thrown immediately when the worker tries to get the value. - RAY_UNUSED(in_memory_store_->Put( - RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id)); - } else if (reply.status() == rpc::GetObjectStatusReply::CREATED) { - // The object is either an indicator that the object is in Plasma, or - // the object has been returned directly in the reply. In either - // case, we put the corresponding RayObject into the in-memory store. - // If the owner later fails or the object is released, the raylet - // will eventually store an error in Plasma on our behalf. - const auto &data = reply.object().data(); - std::shared_ptr data_buffer; - if (data.size() > 0) { - RAY_LOG(DEBUG) << "Object returned directly in GetObjectStatus reply, putting " - << object_id << " in memory store"; - data_buffer = std::make_shared( - const_cast(reinterpret_cast(data.data())), - data.size()); - } else { - RAY_LOG(DEBUG) << "Object not returned directly in GetObjectStatus reply, " - << object_id << " will have to be fetched from Plasma"; - } - const auto &metadata = reply.object().metadata(); - std::shared_ptr metadata_buffer; - if (metadata.size() > 0) { - metadata_buffer = std::make_shared( - const_cast(reinterpret_cast(metadata.data())), - metadata.size()); - } - auto inlined_ids = - IdVectorFromProtobuf(reply.object().nested_inlined_ids()); - RAY_UNUSED(in_memory_store_->Put( - RayObject(data_buffer, metadata_buffer, inlined_ids), object_id)); - } - }); + if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { + // The owner is gone or the owner replied that the object has gone + // out of scope (this is an edge case in the distributed ref counting + // protocol where a borrower dies before it can notify the owner of + // another borrower). Store an error so that an exception will be + // thrown immediately when the worker tries to get the value. + RAY_UNUSED(in_memory_store_->Put( + RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id)); + } else { + // We can now try to fetch the object via plasma. If the owner later + // fails or the object is released, the raylet will eventually store + // an error in plasma on our behalf. + RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + object_id)); + } + }); } } // namespace ray diff --git a/src/ray/core_worker/future_resolver.h b/src/ray/core_worker/future_resolver.h index b774434b7..be504a582 100644 --- a/src/ray/core_worker/future_resolver.h +++ b/src/ray/core_worker/future_resolver.h @@ -16,7 +16,6 @@ #include -#include "ray/common/grpc_util.h" #include "ray/common/id.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/rpc/worker/core_worker_client.h" diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 43dfaa45b..799530d27 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -132,15 +132,6 @@ message GetObjectStatusRequest { bytes object_id = 2; } -message RayObject { - // Data of the object. - bytes data = 1; - // Metadata of the object. - bytes metadata = 2; - // ObjectIDs that were nested in data. This is only set for inlined objects. - repeated bytes nested_inlined_ids = 3; -} - message GetObjectStatusReply { enum ObjectStatus { CREATED = 0; @@ -148,9 +139,6 @@ message GetObjectStatusReply { FREED = 2; } ObjectStatus status = 1; - // The Ray object: either a concrete value, an in-Plasma indicator, or an - // exception. - RayObject object = 2; } message WaitForActorOutOfScopeRequest {