Fix raylet pinning race condition (#7235)

This commit is contained in:
Edward Oakes
2020-02-20 10:41:36 -08:00
committed by GitHub
parent 6043ce710d
commit 16e37416cd
8 changed files with 94 additions and 72 deletions
@@ -8,7 +8,6 @@ import time
import pytest
import logging
import uuid
import gc
import ray
import ray.cluster_utils
@@ -292,9 +291,6 @@ def test_basic_serialized_reference(one_worker_100MiB):
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
@@ -310,8 +306,6 @@ def test_basic_serialized_reference(one_worker_100MiB):
# Call a recursive chain of tasks that pass a serialized reference to the end
# of the chain. The reference should still exist while the final task in the
# chain is running and should be removed once it finishes.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursive_serialized_reference(one_worker_100MiB):
@ray.remote
def recursive(ref, dep, max_depth, depth=0):
@@ -385,9 +379,6 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Test that the remote references still pin the object.
_fill_object_store_and_get(array_oid_bytes)
@@ -404,8 +395,6 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
# Test that a passed reference held by an actor after a task finishes
# is kept until the reference is removed from the worker. Also tests giving
# the worker a duplicate reference to the same object ID.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_worker_holding_serialized_reference(one_worker_100MiB):
@ray.remote
def child(dep1, dep2):
@@ -448,9 +437,6 @@ def test_basic_nested_ids(one_worker_100MiB):
# Remove the local reference to the inner object.
inner_oid_bytes = inner_oid.binary()
del inner_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Check that the outer reference pins the inner object.
_fill_object_store_and_get(inner_oid_bytes)
@@ -462,8 +448,6 @@ def test_basic_nested_ids(one_worker_100MiB):
# Test that an object containing object IDs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursively_nest_ids(one_worker_100MiB):
@ray.remote
def recursive(ref, dep, max_depth, depth=0):
@@ -506,8 +490,6 @@ def test_recursively_nest_ids(one_worker_100MiB):
# Test that serialized objectIDs returned from remote tasks are pinned until
# they go out of scope on the caller side.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_return_object_id(one_worker_100MiB):
@ray.remote
def put():
@@ -536,8 +518,6 @@ def test_return_object_id(one_worker_100MiB):
# Test that serialized objectIDs returned from remote tasks are pinned if
# passed into another remote task by the caller.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_pass_returned_object_id(one_worker_100MiB):
@ray.remote
def put():
@@ -573,8 +553,6 @@ def test_pass_returned_object_id(one_worker_100MiB):
# returned by another task to the end of the chain. The reference should still
# exist while the final task in the chain is running and should be removed once
# it finishes.
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursively_pass_returned_object_id(one_worker_100MiB):
@ray.remote
def put():
+46 -27
View File
@@ -182,12 +182,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
store_socket, local_raylet_client_, check_signals_));
memory_store_.reset(new CoreWorkerMemoryStore(
[this](const RayObject &obj, const ObjectID &obj_id) {
bool object_exists;
RAY_CHECK_OK(plasma_store_provider_->Put(obj, obj_id, &object_exists));
if (!object_exists) {
RAY_LOG(DEBUG) << "Pinning object promoted to plasma " << obj_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {obj_id}));
}
RAY_LOG(DEBUG) << "Promoting object to plasma " << obj_id;
RAY_CHECK_OK(Put(obj, /*contained_object_ids=*/{}, obj_id, /*pin_object=*/true));
},
ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_,
check_signals_));
@@ -353,13 +349,8 @@ void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id,
auto value = memory_store_->GetOrPromoteToPlasma(object_id);
if (value) {
RAY_LOG(DEBUG) << "Storing object promoted to plasma " << object_id;
bool object_exists;
RAY_CHECK_OK(plasma_store_provider_->Put(*value, object_id, &object_exists));
if (!object_exists) {
RAY_LOG(DEBUG) << "PromoteToPlasma: Pinning object promoted to plasma "
<< object_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id}));
}
RAY_CHECK_OK(
Put(*value, /*contained_object_ids=*/{}, object_id, /*pin_object=*/true));
}
auto has_owner = reference_counter_->GetOwner(object_id, owner_id, owner_address);
@@ -405,20 +396,33 @@ Status CoreWorker::Put(const RayObject &object,
static_cast<uint8_t>(TaskTransportType::RAYLET));
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_);
RAY_RETURN_NOT_OK(Put(object, contained_object_ids, *object_id));
// Tell the raylet to pin the object **after** it is created.
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {*object_id}));
return Status::OK();
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
}
Status CoreWorker::Put(const RayObject &object,
const std::vector<ObjectID> &contained_object_ids,
const ObjectID &object_id) {
RAY_CHECK(object_id.GetTransportType() ==
static_cast<uint8_t>(TaskTransportType::RAYLET))
<< "Invalid transport type flag in object ID: " << object_id.GetTransportType();
// TODO(edoakes,swang): add contained object IDs to the reference counter.
return plasma_store_provider_->Put(object, object_id, nullptr);
const ObjectID &object_id, bool pin_object) {
bool object_exists;
RAY_RETURN_NOT_OK(plasma_store_provider_->Put(object, object_id, &object_exists));
if (!object_exists) {
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning put object " << object_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(
rpc_address_, {object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma.";
}
}));
} else {
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
}
return Status::OK();
}
Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
@@ -442,12 +446,24 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t
return plasma_store_provider_->Create(metadata, data_size, object_id, data);
}
Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object) {
Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address) {
RAY_RETURN_NOT_OK(plasma_store_provider_->Seal(object_id));
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning created object " << object_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(rpc_address_, {object_id}));
RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
RAY_CHECK_OK(local_raylet_client_->PinObjectIDs(
owner_address.has_value() ? *owner_address : rpc_address_, {object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma.";
}
}));
} else {
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
return Status::OK();
}
@@ -1024,6 +1040,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
task_type, func, task_spec.GetRequiredResources().GetResourceMap(), args,
arg_reference_ids, return_ids, return_objects, worker_context_.GetWorkerID());
absl::optional<rpc::Address> caller_address(
worker_context_.GetCurrentTask()->CallerAddress());
for (size_t i = 0; i < return_objects->size(); i++) {
// The object is nullptr if it already existed in the object store.
if (!return_objects->at(i)) {
@@ -1031,7 +1050,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
}
if (return_objects->at(i)->GetData() != nullptr &&
return_objects->at(i)->GetData()->IsPlasmaBuffer()) {
if (!Seal(return_ids[i], /*pin_object=*/false).ok()) {
if (!Seal(return_ids[i], /*pin_object=*/true, caller_address).ok()) {
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object "
<< return_ids[i] << " in store: " << status.message();
}
+6 -2
View File
@@ -202,9 +202,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] object The ray object.
/// \param[in] contained_object_ids The IDs serialized in this object.
/// \param[in] object_id Object ID specified by the user.
/// \param[in] pin_object Whether or not to tell the raylet to pin this object.
/// \return Status.
Status Put(const RayObject &object, const std::vector<ObjectID> &contained_object_ids,
const ObjectID &object_id);
const ObjectID &object_id, bool pin_object = false);
/// Create and return a buffer in the object store that can be directly written
/// into. After writing to the buffer, the caller must call `Seal()` to finalize
@@ -239,8 +240,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
///
/// \param[in] object_id Object ID corresponding to the object.
/// \param[in] pin_object Whether or not to pin the object at the local raylet.
/// \param[in] owner_address Address of the owner of the object who will be contacted by
/// the raylet if the object is pinned. If not provided, defaults to this worker.
/// \return Status.
Status Seal(const ObjectID &object_id, bool pin_object);
Status Seal(const ObjectID &object_id, bool pin_object,
const absl::optional<rpc::Address> &owner_address = absl::nullopt);
/// Get a list of objects from the object store. Objects that failed to be retrieved
/// will be returned as nullptrs.
@@ -84,6 +84,14 @@ Status CoreWorkerPlasmaStoreProvider::Seal(const ObjectID &object_id) {
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Seal(plasma_id));
}
return Status::OK();
}
Status CoreWorkerPlasmaStoreProvider::Release(const ObjectID &object_id) {
auto plasma_id = object_id.ToPlasmaId();
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
RAY_ARROW_RETURN_NOT_OK(store_client_.Release(plasma_id));
}
return Status::OK();
@@ -29,19 +29,43 @@ class CoreWorkerPlasmaStoreProvider {
/// Create and seal an object.
///
/// NOTE: The caller must subsequently call Release() to release the first reference to
/// the created object. Until then, the object is pinned and cannot be evicted.
///
/// \param[in] object The object to create.
/// \param[in] object_id The ID of the object. This can be used as an
/// argument to Get to retrieve the object data.
/// \param[in] object_id The ID of the object.
/// \param[out] object_exists Optional. Returns whether an object with the
/// same ID already exists. If this is true, then the Put does not write any
/// object data.
Status Put(const RayObject &object, const ObjectID &object_id, bool *object_exists);
/// Create an object in plasma and return a mutable buffer to it. The buffer should be
/// subsequently written to and then sealed using Seal().
///
/// \param[in] metadata The metadata of the object.
/// \param[in] data_size The size of the object.
/// \param[in] object_id The ID of the object.
/// \param[out] data The mutable object buffer in plasma that can be written to.
Status Create(const std::shared_ptr<Buffer> &metadata, const size_t data_size,
const ObjectID &object_id, std::shared_ptr<Buffer> *data);
/// Seal an object buffer created with Create().
///
/// NOTE: The caller must subsequently call Release() to release the first reference to
/// the created object. Until then, the object is pinned and cannot be evicted.
///
/// \param[in] object_id The ID of the object. This can be used as an
/// argument to Get to retrieve the object data.
Status Seal(const ObjectID &object_id);
/// Release the first reference to the object created by Put() or Create(). This should
/// be called exactly once per object and until it is called, the object is pinned and
/// cannot be evicted.
///
/// \param[in] object_id The ID of the object. This can be used as an
/// argument to Get to retrieve the object data.
Status Release(const ObjectID &object_id);
Status Get(const absl::flat_hash_set<ObjectID> &object_ids, int64_t timeout_ms,
const WorkerContext &ctx,
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> *results,
@@ -238,9 +238,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
}
const rpc::Address &caller_address = request.caller_address();
auto accept_callback = [this, caller_address, reply, send_reply_callback, task_spec,
resource_ids]() {
auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() {
// We have posted an exit task onto the main event loop,
// so shouldn't bother executing any further work.
if (exiting_) return;
@@ -258,7 +256,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
bool objects_valid = return_objects.size() == num_returns;
if (objects_valid) {
std::vector<ObjectID> plasma_return_ids;
for (size_t i = 0; i < return_objects.size(); i++) {
auto return_object = reply->add_return_objects();
ObjectID id = ObjectID::ForTaskReturn(
@@ -270,7 +267,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
const auto &result = return_objects[i];
if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) {
return_object->set_in_plasma(true);
plasma_return_ids.push_back(id);
} else {
if (result->GetData() != nullptr) {
return_object->set_data(result->GetData()->Data(), result->GetData()->Size());
@@ -284,15 +280,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
}
}
// If we spilled any return objects to plasma, notify the raylet to pin them.
// The raylet will then coordinate with the caller to manage the objects'
// lifetimes.
// TODO(edoakes): the plasma objects could be evicted between creating them
// here and when raylet pins them.
if (!plasma_return_ids.empty()) {
RAY_CHECK_OK(
local_raylet_client_->PinObjectIDs(caller_address, plasma_return_ids));
}
if (task_spec.IsActorCreationTask()) {
RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId()
<< ", actor_id: " << task_spec.ActorCreationId();
+4 -3
View File
@@ -366,14 +366,15 @@ Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worke
});
}
Status raylet::RayletClient::PinObjectIDs(const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids) {
Status raylet::RayletClient::PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) {
rpc::PinObjectIDsRequest request;
request.mutable_owner_address()->CopyFrom(caller_address);
for (const ObjectID &object_id : object_ids) {
request.add_object_ids(object_id.Binary());
}
return grpc_client_->PinObjectIDs(request, nullptr);
return grpc_client_->PinObjectIDs(request, callback);
}
} // namespace ray
+3 -2
View File
@@ -253,8 +253,9 @@ class RayletClient : public WorkerLeaseInterface {
ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override;
ray::Status PinObjectIDs(const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids);
ray::Status PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback);
WorkerID GetWorkerID() const { return worker_id_; }