diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 6331a00e8..78114a80d 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -8,7 +8,6 @@ import time import pytest import logging import uuid -import gc import ray import ray.cluster_utils @@ -292,9 +291,6 @@ def test_basic_serialized_reference(one_worker_100MiB): # Remove the local reference. array_oid_bytes = array_oid.binary() del array_oid - # Needed due to Python GC issue in cloudpickle. - # https://github.com/cloudpipe/cloudpickle/issues/343 - gc.collect() # Check that the remote reference pins the object. _fill_object_store_and_get(array_oid_bytes) @@ -310,8 +306,6 @@ def test_basic_serialized_reference(one_worker_100MiB): # Call a recursive chain of tasks that pass a serialized reference to the end # of the chain. The reference should still exist while the final task in the # chain is running and should be removed once it finishes. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_recursive_serialized_reference(one_worker_100MiB): @ray.remote def recursive(ref, dep, max_depth, depth=0): @@ -385,9 +379,6 @@ def test_actor_holding_serialized_reference(one_worker_100MiB): # Remove the local reference. array_oid_bytes = array_oid.binary() del array_oid - # Needed due to Python GC issue in cloudpickle. - # https://github.com/cloudpipe/cloudpickle/issues/343 - gc.collect() # Test that the remote references still pin the object. _fill_object_store_and_get(array_oid_bytes) @@ -404,8 +395,6 @@ def test_actor_holding_serialized_reference(one_worker_100MiB): # Test that a passed reference held by an actor after a task finishes # is kept until the reference is removed from the worker. Also tests giving # the worker a duplicate reference to the same object ID. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_worker_holding_serialized_reference(one_worker_100MiB): @ray.remote def child(dep1, dep2): @@ -448,9 +437,6 @@ def test_basic_nested_ids(one_worker_100MiB): # Remove the local reference to the inner object. inner_oid_bytes = inner_oid.binary() del inner_oid - # Needed due to Python GC issue in cloudpickle. - # https://github.com/cloudpipe/cloudpickle/issues/343 - gc.collect() # Check that the outer reference pins the inner object. _fill_object_store_and_get(inner_oid_bytes) @@ -462,8 +448,6 @@ def test_basic_nested_ids(one_worker_100MiB): # Test that an object containing object IDs within it pins the inner IDs # recursively and for submitted tasks. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_recursively_nest_ids(one_worker_100MiB): @ray.remote def recursive(ref, dep, max_depth, depth=0): @@ -506,8 +490,6 @@ def test_recursively_nest_ids(one_worker_100MiB): # Test that serialized objectIDs returned from remote tasks are pinned until # they go out of scope on the caller side. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_return_object_id(one_worker_100MiB): @ray.remote def put(): @@ -536,8 +518,6 @@ def test_return_object_id(one_worker_100MiB): # Test that serialized objectIDs returned from remote tasks are pinned if # passed into another remote task by the caller. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_pass_returned_object_id(one_worker_100MiB): @ray.remote def put(): @@ -573,8 +553,6 @@ def test_pass_returned_object_id(one_worker_100MiB): # returned by another task to the end of the chain. The reference should still # exist while the final task in the chain is running and should be removed once # it finishes. -@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle " - "(https://github.com/cloudpipe/cloudpickle/issues/343).") def test_recursively_pass_returned_object_id(one_worker_100MiB): @ray.remote def put(): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 029d4d955..10acc56ab 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -182,12 +182,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, store_socket, local_raylet_client_, check_signals_)); memory_store_.reset(new CoreWorkerMemoryStore( [this](const RayObject &obj, const ObjectID &obj_id) { - bool object_exists; - RAY_CHECK_OK(plasma_store_provider_->Put(obj, obj_id, &object_exists)); - if (!object_exists) { - RAY_LOG(DEBUG) << "Pinning object promoted to plasma " << obj_id; - RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {obj_id})); - } + RAY_LOG(DEBUG) << "Promoting object to plasma " << obj_id; + RAY_CHECK_OK(Put(obj, /*contained_object_ids=*/{}, obj_id, /*pin_object=*/true)); }, ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_, check_signals_)); @@ -353,13 +349,8 @@ void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, auto value = memory_store_->GetOrPromoteToPlasma(object_id); if (value) { RAY_LOG(DEBUG) << "Storing object promoted to plasma " << object_id; - bool object_exists; - RAY_CHECK_OK(plasma_store_provider_->Put(*value, object_id, &object_exists)); - if (!object_exists) { - RAY_LOG(DEBUG) << "PromoteToPlasma: Pinning object promoted to plasma " - << object_id; - RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id})); - } + RAY_CHECK_OK( + Put(*value, /*contained_object_ids=*/{}, object_id, /*pin_object=*/true)); } auto has_owner = reference_counter_->GetOwner(object_id, owner_id, owner_address); @@ -405,20 +396,33 @@ Status CoreWorker::Put(const RayObject &object, static_cast(TaskTransportType::RAYLET)); reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), rpc_address_); - RAY_RETURN_NOT_OK(Put(object, contained_object_ids, *object_id)); - // Tell the raylet to pin the object **after** it is created. - RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {*object_id})); - return Status::OK(); + return Put(object, contained_object_ids, *object_id, /*pin_object=*/true); } Status CoreWorker::Put(const RayObject &object, const std::vector &contained_object_ids, - const ObjectID &object_id) { - RAY_CHECK(object_id.GetTransportType() == - static_cast(TaskTransportType::RAYLET)) - << "Invalid transport type flag in object ID: " << object_id.GetTransportType(); - // TODO(edoakes,swang): add contained object IDs to the reference counter. - return plasma_store_provider_->Put(object, object_id, nullptr); + const ObjectID &object_id, bool pin_object) { + bool object_exists; + RAY_RETURN_NOT_OK(plasma_store_provider_->Put(object, object_id, &object_exists)); + if (!object_exists) { + if (pin_object) { + // Tell the raylet to pin the object **after** it is created. + RAY_LOG(DEBUG) << "Pinning put object " << object_id; + RAY_CHECK_OK(local_raylet_client_->PinObjectIDs( + rpc_address_, {object_id}, + [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) { + // Only release the object once the raylet has responded to avoid the race + // condition that the object could be evicted before the raylet pins it. + if (!plasma_store_provider_->Release(object_id).ok()) { + RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id + << "), might cause a leak in plasma."; + } + })); + } else { + RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); + } + } + return Status::OK(); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, @@ -442,12 +446,24 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t return plasma_store_provider_->Create(metadata, data_size, object_id, data); } -Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object) { +Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, + const absl::optional &owner_address) { RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id)); if (pin_object) { // Tell the raylet to pin the object **after** it is created. - RAY_LOG(DEBUG) << "Pinning created object " << object_id; - RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id})); + RAY_LOG(DEBUG) << "Pinning sealed object " << object_id; + RAY_CHECK_OK(local_raylet_client_->PinObjectIDs( + owner_address.has_value() ? *owner_address : rpc_address_, {object_id}, + [this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) { + // Only release the object once the raylet has responded to avoid the race + // condition that the object could be evicted before the raylet pins it. + if (!plasma_store_provider_->Release(object_id).ok()) { + RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id + << "), might cause a leak in plasma."; + } + })); + } else { + RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } return Status::OK(); } @@ -1024,6 +1040,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids, return_ids, return_objects, worker_context_.GetWorkerID()); + absl::optional caller_address( + worker_context_.GetCurrentTask()->CallerAddress()); + for (size_t i = 0; i < return_objects->size(); i++) { // The object is nullptr if it already existed in the object store. if (!return_objects->at(i)) { @@ -1031,7 +1050,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, } if (return_objects->at(i)->GetData() != nullptr && return_objects->at(i)->GetData()->IsPlasmaBuffer()) { - if (!Seal(return_ids[i], /*pin_object=*/false).ok()) { + if (!Seal(return_ids[i], /*pin_object=*/true, caller_address).ok()) { RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d1595fe9f..811c64b2a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -202,9 +202,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] object The ray object. /// \param[in] contained_object_ids The IDs serialized in this object. /// \param[in] object_id Object ID specified by the user. + /// \param[in] pin_object Whether or not to tell the raylet to pin this object. /// \return Status. Status Put(const RayObject &object, const std::vector &contained_object_ids, - const ObjectID &object_id); + const ObjectID &object_id, bool pin_object = false); /// Create and return a buffer in the object store that can be directly written /// into. After writing to the buffer, the caller must call `Seal()` to finalize @@ -239,8 +240,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] object_id Object ID corresponding to the object. /// \param[in] pin_object Whether or not to pin the object at the local raylet. + /// \param[in] owner_address Address of the owner of the object who will be contacted by + /// the raylet if the object is pinned. If not provided, defaults to this worker. /// \return Status. - Status Seal(const ObjectID &object_id, bool pin_object); + Status Seal(const ObjectID &object_id, bool pin_object, + const absl::optional &owner_address = absl::nullopt); /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 2b5624d2b..672cc3820 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -84,6 +84,14 @@ Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) { { std::lock_guard guard(store_client_mutex_); RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id)); + } + return Status::OK(); +} + +Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) { + auto plasma_id = object_id.ToPlasmaId(); + { + std::lock_guard guard(store_client_mutex_); RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id)); } return Status::OK(); diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 992b339bd..d84c430bb 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -29,19 +29,43 @@ class CoreWorkerPlasmaStoreProvider { /// Create and seal an object. /// + /// NOTE: The caller must subsequently call Release() to release the first reference to + /// the created object. Until then, the object is pinned and cannot be evicted. + /// /// \param[in] object The object to create. - /// \param[in] object_id The ID of the object. This can be used as an - /// argument to Get to retrieve the object data. + /// \param[in] object_id The ID of the object. /// \param[out] object_exists Optional. Returns whether an object with the /// same ID already exists. If this is true, then the Put does not write any /// object data. Status Put(const RayObject &object, const ObjectID &object_id, bool *object_exists); + /// Create an object in plasma and return a mutable buffer to it. The buffer should be + /// subsequently written to and then sealed using Seal(). + /// + /// \param[in] metadata The metadata of the object. + /// \param[in] data_size The size of the object. + /// \param[in] object_id The ID of the object. + /// \param[out] data The mutable object buffer in plasma that can be written to. Status Create(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, std::shared_ptr *data); + /// Seal an object buffer created with Create(). + /// + /// NOTE: The caller must subsequently call Release() to release the first reference to + /// the created object. Until then, the object is pinned and cannot be evicted. + /// + /// \param[in] object_id The ID of the object. This can be used as an + /// argument to Get to retrieve the object data. Status Seal(const ObjectID &object_id); + /// Release the first reference to the object created by Put() or Create(). This should + /// be called exactly once per object and until it is called, the object is pinned and + /// cannot be evicted. + /// + /// \param[in] object_id The ID of the object. This can be used as an + /// argument to Get to retrieve the object data. + Status Release(const ObjectID &object_id); + Status Get(const absl::flat_hash_set &object_ids, int64_t timeout_ms, const WorkerContext &ctx, absl::flat_hash_map> *results, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index c26ad9826..825771744 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -238,9 +238,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } } - const rpc::Address &caller_address = request.caller_address(); - auto accept_callback = [this, caller_address, reply, send_reply_callback, task_spec, - resource_ids]() { + auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() { // We have posted an exit task onto the main event loop, // so shouldn't bother executing any further work. if (exiting_) return; @@ -258,7 +256,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( bool objects_valid = return_objects.size() == num_returns; if (objects_valid) { - std::vector plasma_return_ids; for (size_t i = 0; i < return_objects.size(); i++) { auto return_object = reply->add_return_objects(); ObjectID id = ObjectID::ForTaskReturn( @@ -270,7 +267,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( const auto &result = return_objects[i]; if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) { return_object->set_in_plasma(true); - plasma_return_ids.push_back(id); } else { if (result->GetData() != nullptr) { return_object->set_data(result->GetData()->Data(), result->GetData()->Size()); @@ -284,15 +280,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } } } - // If we spilled any return objects to plasma, notify the raylet to pin them. - // The raylet will then coordinate with the caller to manage the objects' - // lifetimes. - // TODO(edoakes): the plasma objects could be evicted between creating them - // here and when raylet pins them. - if (!plasma_return_ids.empty()) { - RAY_CHECK_OK( - local_raylet_client_->PinObjectIDs(caller_address, plasma_return_ids)); - } if (task_spec.IsActorCreationTask()) { RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId() << ", actor_id: " << task_spec.ActorCreationId(); diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 2a26eb84a..022b1ea11 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -366,14 +366,15 @@ Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worke }); } -Status raylet::RayletClient::PinObjectIDs(const rpc::Address &caller_address, - const std::vector &object_ids) { +Status raylet::RayletClient::PinObjectIDs( + const rpc::Address &caller_address, const std::vector &object_ids, + const rpc::ClientCallback &callback) { rpc::PinObjectIDsRequest request; request.mutable_owner_address()->CopyFrom(caller_address); for (const ObjectID &object_id : object_ids) { request.add_object_ids(object_id.Binary()); } - return grpc_client_->PinObjectIDs(request, nullptr); + return grpc_client_->PinObjectIDs(request, callback); } } // namespace ray diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index 69b04e8de..8ef8764c4 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -253,8 +253,9 @@ class RayletClient : public WorkerLeaseInterface { ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id, bool disconnect_worker) override; - ray::Status PinObjectIDs(const rpc::Address &caller_address, - const std::vector &object_ids); + ray::Status PinObjectIDs( + const rpc::Address &caller_address, const std::vector &object_ids, + const ray::rpc::ClientCallback &callback); WorkerID GetWorkerID() const { return worker_id_; }