diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 04530ff2d..3a4c3acc2 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -177,9 +177,11 @@ class SerializationContext(object): def object_id_deserializer(serialized_obj): obj_id, owner_id, owner_address = pickle.loads(serialized_obj) - # Must deserialize the object in the core worker before we - # create the ObjectID to ensure that the reference is added - # before we increment its count to 1. + # NOTE(swang): Must deserialize the object first before asking + # the core worker to resolve the value. This is to make sure + # that the ref count for the ObjectID is greater than 0 by the + # time the core worker resolves the value of the object. + deserialized_object_id = obj_id[0](obj_id[1][0]) if owner_id: worker = ray.worker.get_global_worker() worker.check_connected() @@ -187,8 +189,7 @@ class SerializationContext(object): # (class name, (unique bytes,)). worker.core_worker.deserialize_and_register_object_id( obj_id[1][0], owner_id[1][0], owner_address) - obj_id = obj_id[0](obj_id[1][0]) - return obj_id + return deserialized_object_id for id_type in ray._raylet._ID_TYPES: if id_type == ray._raylet.ObjectID: @@ -241,9 +242,11 @@ class SerializationContext(object): def object_id_deserializer(serialized_obj): obj_id, owner_id, owner_address = serialized_obj - # Must deserialize the object in the core worker before we - # create the ObjectID to ensure that the reference is added - # before we increment its count to 1. + # NOTE(swang): Must deserialize the object first before asking + # the core worker to resolve the value. This is to make sure + # that the ref count for the ObjectID is greater than 0 by the + # time the core worker resolves the value of the object. + deserialized_object_id = id_deserializer(obj_id) if owner_id: worker = ray.worker.get_global_worker() worker.check_connected() @@ -251,8 +254,7 @@ class SerializationContext(object): # (class name, (unique bytes,)). worker.core_worker.deserialize_and_register_object_id( obj_id[1][0], owner_id[1][0], owner_address) - obj_id = id_deserializer(obj_id) - return obj_id + return deserialized_object_id for id_type in ray._raylet._ID_TYPES: if id_type == ray._raylet.ObjectID: diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 3506d42df..b81678fa2 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -224,8 +224,7 @@ bool TaskSpecification::IsAsyncioActor() const { } bool TaskSpecification::IsDetachedActor() const { - RAY_CHECK(IsActorCreationTask()); - return message_->actor_creation_task_spec().is_detached(); + return IsActorCreationTask() && message_->actor_creation_task_spec().is_detached(); } std::string TaskSpecification::DebugString() const { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index c17d569a2..7b43ee81c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -972,26 +972,33 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques ObjectID object_id = ObjectID::FromBinary(request.object_id()); TaskID owner_id = TaskID::FromBinary(request.owner_id()); if (owner_id != GetCallerId()) { - // We may have owned this object in the past, but we are now executing some - // other task or actor. - reply->set_status(rpc::GetObjectStatusReply::WRONG_OWNER); - send_reply_callback(Status::OK(), nullptr, nullptr); - } else { - // We own the task. Reply back to the borrower once the object has been - // created. - // TODO: We could probably just send the object value if it is small - // enough and we have it local. - reply->set_status(rpc::GetObjectStatusReply::CREATED); + RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by previous task " + << owner_id.Hex(); + } + // We own the task. Reply back to the borrower once the object has been + // created. + // TODO(swang): We could probably just send the object value if it is small + // enough and we have it local. + reply->set_status(rpc::GetObjectStatusReply::CREATED); + if (task_manager_->IsTaskPending(object_id.TaskId())) { + // Acquire a reference and retry. This prevents the object from being + // evicted out from under us before we can start the get. + AddObjectIDReference(object_id); if (task_manager_->IsTaskPending(object_id.TaskId())) { // The task is pending. Send the reply once the task finishes. memory_store_->GetAsync(object_id, [send_reply_callback](std::shared_ptr obj) { send_reply_callback(Status::OK(), nullptr, nullptr); }); + RemoveObjectIDReference(object_id); } else { - // The task is done. Send the reply immediately. + // We lost the race, the task is done. + RemoveObjectIDReference(object_id); send_reply_callback(Status::OK(), nullptr, nullptr); } + } else { + // The task is done. Send the reply immediately. + send_reply_callback(Status::OK(), nullptr, nullptr); } } diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index b360341e1..ce9ee8574 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -19,11 +19,9 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID RAY_CHECK_OK(it->second->GetObjectStatus( request, [this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) { - if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::WRONG_OWNER) { - RAY_LOG(ERROR) - << "Error retrieving the value of object ID " << object_id - << " that was deserialized. Probably, the task or actor that created the " - "object ID initially (via ray.put or task submission) has exited."; + if (!status.ok()) { + RAY_LOG(ERROR) << "Error retrieving the value of object ID " << object_id + << " that was deserialized: " << status.ToString(); } // Either the owner is gone or the owner replied that the object has // been created. In both cases, we can now try to fetch the object via diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index bae2f3d4d..50c29400a 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -6,8 +6,12 @@ void ReferenceCounter::AddBorrowedObject(const ObjectID &object_id, const TaskID &owner_id, const rpc::Address &owner_address) { absl::MutexLock lock(&mutex_); - RAY_CHECK( - object_id_refs_.emplace(object_id, Reference(owner_id, owner_address)).second); + auto it = object_id_refs_.find(object_id); + RAY_CHECK(it != object_id_refs_.end()); + + if (!it->second.owner.has_value()) { + it->second.owner = {owner_id, owner_address}; + } } void ReferenceCounter::AddOwnedObject( diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 13f82560e..d4d0b13a7 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -107,7 +107,7 @@ class ReferenceCounter { /// The object's owner, if we know it. This has no value if the object is /// if we do not know the object's owner (because distributed ref counting /// is not yet implemented). - const absl::optional> owner; + absl::optional> owner; }; /// Helper function with the same semantics as AddReference to allow adding a reference diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 67ac09a52..9481b902c 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -9,6 +9,11 @@ void TaskManager::AddPendingTask(const TaskSpecification &spec, int max_retries) RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second); } +bool TaskManager::IsTaskPending(const TaskID &task_id) const { + absl::MutexLock lock(&mu_); + return pending_tasks_.count(task_id) > 0; +} + void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply) { RAY_LOG(DEBUG) << "Completing task " << task_id; @@ -50,17 +55,10 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, } void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) { - if (error_type == rpc::ErrorType::ACTOR_DIED) { - // Note that this might be the __ray_terminate__ task, so we don't log - // loudly with ERROR here. - RAY_LOG(INFO) << "Task " << task_id << " failed with error " - << rpc::ErrorType_Name(error_type); - } else { - RAY_LOG(ERROR) << "Task " << task_id << " failed with error " - << rpc::ErrorType_Name(error_type); - } - - RAY_LOG(DEBUG) << "Failing task " << task_id; + // Note that this might be the __ray_terminate__ task, so we don't log + // loudly with ERROR here. + RAY_LOG(DEBUG) << "Task " << task_id << " failed with error " + << rpc::ErrorType_Name(error_type); int num_retries_left = 0; TaskSpecification spec; { diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 52b7d3baf..2f15fcf82 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -43,9 +43,7 @@ class TaskManager : public TaskFinisherInterface { /// /// \param[in] task_id ID of the task to query. /// \return Whether the task is pending. - bool IsTaskPending(const TaskID &task_id) const { - return pending_tasks_.count(task_id) > 0; - } + bool IsTaskPending(const TaskID &task_id) const; /// Write return objects for a pending task to the memory store. /// @@ -75,7 +73,7 @@ class TaskManager : public TaskFinisherInterface { const RetryTaskCallback retry_task_callback_; /// Protects below fields. - absl::Mutex mu_; + mutable absl::Mutex mu_; /// Map from task ID to a pair of: /// {task spec, number of allowed retries left} diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 506ec609e..7250d9dd8 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -99,7 +99,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( auto lease_client = GetOrConnectLeaseClient(raylet_address); TaskSpecification &resource_spec = it->second.front(); TaskID task_id = resource_spec.TaskId(); - RAY_CHECK_OK(lease_client->RequestWorkerLease( + auto status = lease_client->RequestWorkerLease( resource_spec, [this, lease_client, task_id, scheduling_key]( const Status &status, const rpc::WorkerLeaseReply &reply) mutable { @@ -120,32 +120,40 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address()); } } else { - RAY_LOG(DEBUG) << "Retrying lease request " << task_id; - if (lease_client != local_lease_client_) { - // A lease request to a remote raylet failed. Retry locally if the lease is - // still needed. - // TODO(swang): Fail after some number of retries? - RAY_LOG(ERROR) << "Retrying attempt to schedule task at remote node. Error: " - << status.ToString(); - RequestNewWorkerIfNeeded(scheduling_key); - } else { - // A local request failed. This shouldn't happen if the raylet is still alive - // and we don't currently handle raylet failures, so treat it as a fatal - // error. - RAY_LOG(FATAL) << "Lost connection with local raylet. Error: " - << status.ToString(); - } + RetryLeaseRequest(status, lease_client, scheduling_key); } - })); + }); + if (!status.ok()) { + RetryLeaseRequest(status, lease_client, scheduling_key); + } pending_lease_requests_.insert(scheduling_key); } +void CoreWorkerDirectTaskSubmitter::RetryLeaseRequest( + Status status, std::shared_ptr lease_client, + const SchedulingKey &scheduling_key) { + if (lease_client != local_lease_client_) { + // A lease request to a remote raylet failed. Retry locally if the lease is + // still needed. + // TODO(swang): Fail after some number of retries? + RAY_LOG(ERROR) << "Retrying attempt to schedule task at remote node. Error: " + << status.ToString(); + RequestNewWorkerIfNeeded(scheduling_key); + } else { + // A local request failed. This shouldn't happen if the raylet is still alive + // and we don't currently handle raylet failures, so treat it as a fatal + // error. + RAY_LOG(FATAL) << "Lost connection with local raylet. Error: " << status.ToString(); + } +} + void CoreWorkerDirectTaskSubmitter::PushNormalTask( const rpc::WorkerAddress &addr, rpc::CoreWorkerClientInterface &client, const SchedulingKey &scheduling_key, const TaskSpecification &task_spec, const google::protobuf::RepeatedPtrField &assigned_resources) { auto task_id = task_spec.TaskId(); auto request = std::unique_ptr(new rpc::PushTaskRequest); + bool is_actor = task_spec.IsActorTask(); RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId(); // NOTE(swang): CopyFrom is needed because if we use Swap here and the task // fails, then the task data will be gone when the TaskManager attempts to @@ -153,8 +161,9 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); request->mutable_resource_mapping()->CopyFrom(assigned_resources); RAY_CHECK_OK(client.PushNormalTask( - std::move(request), [this, task_id, scheduling_key, addr, assigned_resources]( - Status status, const rpc::PushTaskReply &reply) { + std::move(request), + [this, task_id, is_actor, scheduling_key, addr, assigned_resources]( + Status status, const rpc::PushTaskReply &reply) { { absl::MutexLock lock(&mu_); OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources); @@ -164,7 +173,9 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // failure (e.g., by contacting the raylet). If it was a process // failure, it may have been an application-level error and it may // not make sense to retry the task. - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::WORKER_DIED); + task_finisher_->PendingTaskFailed(task_id, is_actor + ? rpc::ErrorType::ACTOR_DIED + : rpc::ErrorType::WORKER_DIED); } else { task_finisher_->CompletePendingTask(task_id, reply); } diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h index 1623d2358..e0283d61f 100644 --- a/src/ray/core_worker/transport/direct_task_transport.h +++ b/src/ray/core_worker/transport/direct_task_transport.h @@ -64,6 +64,12 @@ class CoreWorkerDirectTaskSubmitter { const google::protobuf::RepeatedPtrField &assigned_resources) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Retry a failed lease request. + void RetryLeaseRequest(Status status, + std::shared_ptr lease_client, + const SchedulingKey &scheduling_key) + EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Get an existing lease client or connect a new one. If a raylet_address is /// provided, this connects to a remote raylet. Else, this connects to the /// local raylet. diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index f2964d6e9..0e68e0882 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -103,7 +103,6 @@ message GetObjectStatusRequest { message GetObjectStatusReply { enum ObjectStatus { CREATED = 0; - WRONG_OWNER = 1; } ObjectStatus status = 1; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e74438014..4dddd876d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1472,7 +1472,10 @@ void NodeManager::ProcessReportActiveObjectIDs( std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); if (!worker) { worker = worker_pool_.GetRegisteredDriver(client); - RAY_CHECK(worker); + if (!worker) { + RAY_LOG(ERROR) << "Ignoring object ids report from failed / unknown worker."; + return; + } } auto message = flatbuffers::GetRoot(message_data); @@ -2273,8 +2276,12 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & auto task_id = spec.TaskId(); if (task.OnDispatch() != nullptr) { + if (task.GetTaskSpecification().IsDetachedActor()) { + worker->MarkDetachedActor(); + } task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(), - worker->GetTaskResourceIds()); + spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds() + : worker->GetTaskResourceIds()); post_assign_callbacks->push_back([this, worker, task_id]() { FinishAssignTask(worker, task_id, /*success=*/true); }); diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 9b917c0cc..80d8888ef 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -419,7 +419,7 @@ ray::Status RayletClient::ReturnWorker(int worker_port, bool disconnect_worker) return grpc_client_->ReturnWorker( request, [](const ray::Status &status, const ray::rpc::ReturnWorkerReply &reply) { if (!status.ok()) { - RAY_LOG(ERROR) << "Error returning worker: " << status; + RAY_LOG(INFO) << "Error returning worker: " << status; } }); }