diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 102adf030..c62498021 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1112,24 +1112,20 @@ cdef class CoreWorker: def serialize_and_promote_object_id(self, ObjectID object_id): cdef: CObjectID c_object_id = object_id.native() - CTaskID c_owner_id = CTaskID.Nil() CAddress c_owner_address = CAddress() CCoreWorkerProcess.GetCoreWorker().PromoteToPlasmaAndGetOwnershipInfo( - c_object_id, &c_owner_id, &c_owner_address) + c_object_id, &c_owner_address) return (object_id, - TaskID(c_owner_id.Binary()), c_owner_address.SerializeAsString()) def deserialize_and_register_object_id( self, const c_string &object_id_binary, ObjectID outer_object_id, - const c_string &owner_id_binary, const c_string &serialized_owner_address): cdef: CObjectID c_object_id = CObjectID.FromBinary(object_id_binary) CObjectID c_outer_object_id = (outer_object_id.native() if outer_object_id else CObjectID.Nil()) - CTaskID c_owner_id = CTaskID.FromBinary(owner_id_binary) CAddress c_owner_address = CAddress() c_owner_address.ParseFromString(serialized_owner_address) @@ -1137,7 +1133,6 @@ cdef class CoreWorker: .RegisterOwnershipInfoAndResolveFuture( c_object_id, c_outer_object_id, - c_owner_id, c_owner_address)) cdef store_task_outputs( diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index a9a8c43d6..c9d5ab2f2 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -128,12 +128,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: void RemoveLocalReference(const CObjectID &object_id) void PromoteObjectToPlasma(const CObjectID &object_id) void PromoteToPlasmaAndGetOwnershipInfo(const CObjectID &object_id, - CTaskID *owner_id, CAddress *owner_address) void RegisterOwnershipInfoAndResolveFuture( const CObjectID &object_id, const CObjectID &outer_object_id, - const CTaskID &owner_id, const CAddress &owner_address) CRayStatus 
SetClientOptions(c_string client_name, int64_t limit) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 33a767293..65eafcc6d 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -125,14 +125,13 @@ class SerializationContext: self.add_contained_object_id(obj) worker = ray.worker.global_worker worker.check_connected() - obj, owner_id, owner_address = ( + obj, owner_address = ( worker.core_worker.serialize_and_promote_object_id(obj)) obj = id_serializer(obj) - owner_id = id_serializer(owner_id) if owner_id else owner_id - return obj, owner_id, owner_address + return obj, owner_address def object_id_deserializer(serialized_obj): - obj_id, owner_id, owner_address = serialized_obj + obj_id, owner_address = serialized_obj # NOTE(swang): Must deserialize the object first before asking # the core worker to resolve the value. This is to make sure # that the ref count for the ObjectID is greater than 0 by the @@ -142,7 +141,7 @@ class SerializationContext: # to 'self' here instead, but this function is itself pickled # somewhere, which causes an error. 
context = ray.worker.global_worker.get_serialization_context() - if owner_id: + if owner_address: worker = ray.worker.global_worker worker.check_connected() # UniqueIDs are serialized as @@ -153,7 +152,7 @@ class SerializationContext: if outer_id is None: outer_id = ray.ObjectID.nil() worker.core_worker.deserialize_and_register_object_id( - obj_id[1][0], outer_id, owner_id[1][0], owner_address) + obj_id[1][0], outer_id, owner_address) return deserialized_object_id for id_type in ray._raylet._ID_TYPES: diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 12551f012..87a1db5ed 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -23,6 +23,7 @@ def one_worker_100MiB(request): config = json.dumps({ "object_store_full_max_retries": 2, "task_retry_delay_ms": 0, + "initial_reconstruction_timeout_milliseconds": 1000, }) yield ray.init( num_cpus=1, @@ -282,6 +283,59 @@ def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put, _fill_object_store_and_get(final_oid_bytes, succeed=False) +@pytest.mark.parametrize("failure", [False, True]) +def test_borrowed_id_failure(one_worker_100MiB, failure): + @ray.remote + class Parent: + def __init__(self): + pass + + def pass_ref(self, ref, borrower): + self.ref = ref[0] + ray.get(borrower.receive_ref.remote(ref)) + if failure: + sys.exit(-1) + + def ping(self): + return + + @ray.remote + class Borrower: + def __init__(self): + self.ref = None + + def receive_ref(self, ref): + self.ref = ref[0] + + def resolve_ref(self): + assert self.ref is not None + if failure: + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(self.ref) + else: + ray.get(self.ref) + + def ping(self): + return + + parent = Parent.remote() + borrower = Borrower.remote() + ray.get(borrower.ping.remote()) + + obj = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8)) + if failure: + with 
pytest.raises(ray.exceptions.RayActorError): + ray.get(parent.pass_ref.remote([obj], borrower)) + else: + ray.get(parent.pass_ref.remote([obj], borrower)) + obj_bytes = obj.binary() + del obj + + _fill_object_store_and_get(obj_bytes, succeed=not failure) + # The borrower should not hang when trying to get the object's value. + ray.get(borrower.resolve_ref.remote()) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 535c02442..ba3990510 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -448,7 +448,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ memory_store_, task_manager_, local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds(), std::move(actor_create_callback), boost::asio::steady_timer(io_service_))); - future_resolver_.reset(new FutureResolver(memory_store_, client_factory)); + future_resolver_.reset(new FutureResolver(memory_store_, client_factory, rpc_address_)); // Unfortunately the raylet client has to be constructed after the receivers. 
if (direct_task_receiver_ != nullptr) { direct_task_receiver_->Init(client_factory, rpc_address_, local_raylet_client_); @@ -704,7 +704,6 @@ CoreWorker::GetAllReferenceCounts() const { } void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, - TaskID *owner_id, rpc::Address *owner_address) { auto value = memory_store_->GetOrPromoteToPlasma(object_id); if (value) { @@ -713,29 +712,26 @@ void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, Put(*value, /*contained_object_ids=*/{}, object_id, /*pin_object=*/true)); } - auto has_owner = reference_counter_->GetOwner(object_id, owner_id, owner_address); + auto has_owner = reference_counter_->GetOwner(object_id, owner_address); RAY_CHECK(has_owner) << "Object IDs generated randomly (ObjectID.from_random()) or out-of-band " "(ObjectID.from_binary(...)) cannot be serialized because Ray does not know " "which task will create them. " "If this was not how your object ID was generated, please file an issue " "at https://github.com/ray-project/ray/issues/"; - RAY_LOG(DEBUG) << "Promoted object to plasma " << object_id << " owned by " - << *owner_id; + RAY_LOG(DEBUG) << "Promoted object to plasma " << object_id; } void CoreWorker::RegisterOwnershipInfoAndResolveFuture( - const ObjectID &object_id, const ObjectID &outer_object_id, const TaskID &owner_id, + const ObjectID &object_id, const ObjectID &outer_object_id, const rpc::Address &owner_address) { // Add the object's owner to the local metadata in case it gets serialized // again. - reference_counter_->AddBorrowedObject(object_id, outer_object_id, owner_id, - owner_address); + reference_counter_->AddBorrowedObject(object_id, outer_object_id, owner_address); - RAY_CHECK(!owner_id.IsNil() || options_.is_local_mode); // We will ask the owner about the object until the object is // created or we can no longer reach the owner. 
- future_resolver_->ResolveFutureAsync(object_id, owner_id, owner_address); + future_resolver_->ResolveFutureAsync(object_id, owner_address); } Status CoreWorker::SetClientOptions(std::string name, int64_t limit_bytes) { @@ -748,10 +744,9 @@ Status CoreWorker::Put(const RayObject &object, ObjectID *object_id) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex()); - reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), - rpc_address_, CurrentCallSite(), object.GetSize(), - /*is_reconstructable=*/false, - ClientID::FromBinary(rpc_address_.raylet_id())); + reference_counter_->AddOwnedObject( + *object_id, contained_object_ids, rpc_address_, CurrentCallSite(), object.GetSize(), + /*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id())); return Put(object, contained_object_ids, *object_id, /*pin_object=*/true); } @@ -799,10 +794,10 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t } // Only add the object to the reference counter if it didn't already exist. 
if (data) { - reference_counter_->AddOwnedObject( - *object_id, contained_object_ids, GetCallerId(), rpc_address_, CurrentCallSite(), - data_size + metadata->Size(), - /*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id())); + reference_counter_->AddOwnedObject(*object_id, contained_object_ids, rpc_address_, + CurrentCallSite(), data_size + metadata->Size(), + /*is_reconstructable=*/false, + ClientID::FromBinary(rpc_address_.raylet_id())); } return Status::OK(); } @@ -1109,8 +1104,8 @@ void CoreWorker::SubmitTask(const RayFunction &function, const std::vectorAddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite(), max_retries); + task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, CurrentCallSite(), + max_retries); io_service_.post([this, task_spec]() { RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); }); @@ -1164,8 +1159,8 @@ Status CoreWorker::CreateActor(const RayFunction &function, max_retries = std::max((int64_t)RayConfig::instance().actor_creation_min_retries(), actor_creation_options.max_restarts); } - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite(), max_retries); + task_manager_->AddPendingTask(rpc_address_, task_spec, CurrentCallSite(), + max_retries); status = direct_task_submitter_->SubmitTask(task_spec); } return status; @@ -1203,8 +1198,8 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun if (options_.is_local_mode) { ExecuteTaskLocalMode(task_spec, actor_id); } else { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite(), actor_handle->MaxTaskRetries()); + task_manager_->AddPendingTask(rpc_address_, task_spec, CurrentCallSite(), + actor_handle->MaxTaskRetries()); io_service_.post([this, task_spec]() { RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec)); }); @@ -1218,7 +1213,7 @@ Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { return 
Status::Invalid("Actor task cancellation is not supported."); } rpc::Address obj_addr; - if (!reference_counter_->GetOwner(object_id, nullptr, &obj_addr)) { + if (!reference_counter_->GetOwner(object_id, &obj_addr)) { return Status::Invalid("No owner found for object."); } if (obj_addr.SerializeAsString() != rpc_address_.SerializeAsString()) { @@ -1248,14 +1243,12 @@ ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &seriali const ObjectID &outer_object_id) { std::unique_ptr actor_handle(new ActorHandle(serialized)); const auto actor_id = actor_handle->GetActorID(); - const auto owner_id = actor_handle->GetOwnerId(); const auto owner_address = actor_handle->GetOwnerAddress(); RAY_UNUSED(AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/false)); ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); - reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_id, - owner_address); + reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address); return actor_id; } @@ -1277,8 +1270,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); if (is_owner_handle) { reference_counter_->AddOwnedObject(actor_creation_return_id, - /*inner_ids=*/{}, GetCallerId(), rpc_address_, - CurrentCallSite(), -1, + /*inner_ids=*/{}, rpc_address_, CurrentCallSite(), + -1, /*is_reconstructable=*/true); } @@ -1617,7 +1610,7 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, if (!task_spec.IsActorCreationTask()) { for (size_t i = 0; i < task_spec.NumReturns(); i++) { reference_counter_->AddOwnedObject(task_spec.ReturnId(i), - /*inner_ids=*/{}, GetCallerId(), rpc_address_, + /*inner_ids=*/{}, rpc_address_, CurrentCallSite(), -1, /*is_reconstructable=*/false); } @@ -1757,13 +1750,15 @@ void CoreWorker::HandleDirectActorCallArgWaitComplete( void CoreWorker::HandleGetObjectStatus(const 
rpc::GetObjectStatusRequest &request, rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (HandleWrongRecipient(WorkerID::FromBinary(request.owner_worker_id()), + send_reply_callback)) { + RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by a previous worker " + "with the same address"; + return; + } + ObjectID object_id = ObjectID::FromBinary(request.object_id()); RAY_LOG(DEBUG) << "Received GetObjectStatus " << object_id; - TaskID owner_id = TaskID::FromBinary(request.owner_id()); - if (owner_id != GetCallerId()) { - RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by previous task " - << owner_id.Hex(); - } // We own the task. Reply back to the borrower once the object has been // created. // TODO(swang): We could probably just send the object value if it is small @@ -1852,15 +1847,14 @@ void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &re } const ObjectID &object_id = ObjectID::FromBinary(request.reference().object_id()); ObjectID contained_in_id = ObjectID::FromBinary(request.contained_in_id()); - const auto owner_id = TaskID::FromBinary(request.reference().owner_id()); const auto owner_address = request.reference().owner_address(); auto ref_removed_callback = boost::bind(&ReferenceCounter::HandleRefRemoved, reference_counter_, object_id, reply, send_reply_callback); // Set a callback to send the reply when the requested object ID's ref count // goes to 0. 
- reference_counter_->SetRefRemovedCallback(object_id, contained_in_id, owner_id, - owner_address, ref_removed_callback); + reference_counter_->SetRefRemovedCallback(object_id, contained_in_id, owner_address, + ref_removed_callback); } void CoreWorker::HandleRemoteCancelTask(const rpc::RemoteCancelTaskRequest &request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e5bf4db1e..020df00c3 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -351,11 +351,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Postcondition: Get(object_id) is valid. /// /// \param[in] object_id The object ID to serialize. - /// \param[out] owner_id The ID of the object's owner. This should be /// appended to the serialized object ID. /// \param[out] owner_address The address of the object's owner. This should /// be appended to the serialized object ID. - void PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, TaskID *owner_id, + void PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, rpc::Address *owner_address); /// Add a reference to an ObjectID that was deserialized by the language @@ -370,11 +369,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// any. This may be nil if the object ID was inlined directly in a task spec /// or if it was passed out-of-band by the application (deserialized from a /// byte string). - /// \param[out] owner_id The ID of the object's owner. /// \param[out] owner_address The address of the object's owner. 
void RegisterOwnershipInfoAndResolveFuture(const ObjectID &object_id, const ObjectID &outer_object_id, - const TaskID &owner_id, const rpc::Address &owner_address); /// diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index c1d64ac40..7eb6620e7 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -16,19 +16,25 @@ namespace ray { -void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID &owner_id, +void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const rpc::Address &owner_address) { absl::MutexLock lock(&mu_); - auto it = owner_clients_.find(owner_id); + const auto owner_worker_id = WorkerID::FromBinary(owner_address.worker_id()); + if (rpc_address_.worker_id() == owner_address.worker_id()) { + // We do not need to resolve objects that we own. This can happen if a task + // with a borrowed reference executes on the object's owning worker. + return; + } + auto it = owner_clients_.find(owner_worker_id); if (it == owner_clients_.end()) { auto client = std::shared_ptr(client_factory_(owner_address)); - it = owner_clients_.emplace(owner_id, std::move(client)).first; + it = owner_clients_.emplace(owner_worker_id, std::move(client)).first; } rpc::GetObjectStatusRequest request; request.set_object_id(object_id.Binary()); - request.set_owner_id(owner_id.Binary()); + request.set_owner_worker_id(owner_worker_id.Binary()); RAY_CHECK_OK(it->second->GetObjectStatus( request, [this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) { diff --git a/src/ray/core_worker/future_resolver.h b/src/ray/core_worker/future_resolver.h index dbe1965fc..8c8b38e04 100644 --- a/src/ray/core_worker/future_resolver.h +++ b/src/ray/core_worker/future_resolver.h @@ -28,8 +28,10 @@ namespace ray { class FutureResolver { public: FutureResolver(std::shared_ptr store, - rpc::ClientFactoryFn client_factory) - : in_memory_store_(store), 
client_factory_(client_factory) {} + rpc::ClientFactoryFn client_factory, const rpc::Address &rpc_address) + : in_memory_store_(store), + client_factory_(client_factory), + rpc_address_(rpc_address) {} /// Resolve the value for a future. This will periodically contact the given /// owner until the owner dies or the owner has finished creating the object. @@ -37,11 +39,9 @@ class FutureResolver { /// value. /// /// \param[in] object_id The ID of the future to resolve. - /// \param[in] owner_id The ID of the task or actor that owns the future. /// \param[in] owner_address The address of the task or actor that owns the /// future. - void ResolveFutureAsync(const ObjectID &object_id, const TaskID &owner_id, - const rpc::Address &owner_address); + void ResolveFutureAsync(const ObjectID &object_id, const rpc::Address &owner_address); private: /// Used to store values of resolved futures. @@ -50,11 +50,16 @@ class FutureResolver { /// Factory for producing new core worker clients. const rpc::ClientFactoryFn client_factory_; + /// Address of our RPC server. ResolveFutureAsync compares its worker ID with + /// the owner's worker ID to skip futures that this worker owns, since we do + /// not need to resolve objects that we own. + const rpc::Address rpc_address_; + /// Protects against concurrent access to internal state. absl::Mutex mu_; /// Cache of gRPC clients to the objects' owners.
- absl::flat_hash_map> + absl::flat_hash_map> owner_clients_ GUARDED_BY(mu_); }; diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index bf1e3e7ea..ddc2f5f1c 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -70,15 +70,14 @@ void ReferenceCounter::ReferenceTableToProto(const ReferenceTable &table, } bool ReferenceCounter::AddBorrowedObject(const ObjectID &object_id, - const ObjectID &outer_id, const TaskID &owner_id, + const ObjectID &outer_id, const rpc::Address &owner_address) { absl::MutexLock lock(&mutex_); - return AddBorrowedObjectInternal(object_id, outer_id, owner_id, owner_address); + return AddBorrowedObjectInternal(object_id, outer_id, owner_address); } bool ReferenceCounter::AddBorrowedObjectInternal(const ObjectID &object_id, const ObjectID &outer_id, - const TaskID &owner_id, const rpc::Address &owner_address) { auto it = object_id_refs_.find(object_id); RAY_CHECK(it != object_id_refs_.end()); @@ -87,12 +86,12 @@ bool ReferenceCounter::AddBorrowedObjectInternal(const ObjectID &object_id, // Skip adding this object as a borrower if we already have ownership info. // If we already have ownership info, then either we are the owner or someone // else already knows that we are a borrower. 
- if (it->second.owner.has_value()) { + if (it->second.owner_address) { RAY_LOG(DEBUG) << "Skipping add borrowed object " << object_id; return false; } - it->second.owner = {owner_id, owner_address}; + it->second.owner_address = owner_address; if (!outer_id.IsNil()) { auto outer_it = object_id_refs_.find(outer_id); @@ -147,8 +146,8 @@ void ReferenceCounter::AddObjectRefStats( void ReferenceCounter::AddOwnedObject( const ObjectID &object_id, const std::vector &inner_ids, - const TaskID &owner_id, const rpc::Address &owner_address, - const std::string &call_site, const int64_t object_size, bool is_reconstructable, + const rpc::Address &owner_address, const std::string &call_site, + const int64_t object_size, bool is_reconstructable, const absl::optional &pinned_at_raylet_id) { RAY_LOG(DEBUG) << "Adding owned object " << object_id; absl::MutexLock lock(&mutex_); @@ -157,9 +156,8 @@ void ReferenceCounter::AddOwnedObject( // If the entry doesn't exist, we initialize the direct reference count to zero // because this corresponds to a submitted task whose return ObjectID will be created // in the frontend language, incrementing the reference count. - object_id_refs_.emplace(object_id, - Reference(owner_id, owner_address, call_site, object_size, - is_reconstructable, pinned_at_raylet_id)); + object_id_refs_.emplace(object_id, Reference(owner_address, call_site, object_size, + is_reconstructable, pinned_at_raylet_id)); if (!inner_ids.empty()) { // Mark that this object ID contains other inner IDs. Then, we will not GC // the inner objects until the outer object ID goes out of scope. 
@@ -324,7 +322,7 @@ void ReferenceCounter::RemoveSubmittedTaskReferences( } } -bool ReferenceCounter::GetOwner(const ObjectID &object_id, TaskID *owner_id, +bool ReferenceCounter::GetOwner(const ObjectID &object_id, rpc::Address *owner_address) const { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); @@ -332,13 +330,8 @@ bool ReferenceCounter::GetOwner(const ObjectID &object_id, TaskID *owner_id, return false; } - if (it->second.owner.has_value()) { - if (owner_id) { - *owner_id = it->second.owner.value().first; - } - if (owner_address) { - *owner_address = it->second.owner.value().second; - } + if (it->second.owner_address) { + *owner_address = *it->second.owner_address; return true; } else { return false; @@ -621,14 +614,13 @@ void ReferenceCounter::MergeRemoteBorrowers(const ObjectID &object_id, if (it == object_id_refs_.end()) { it = object_id_refs_.emplace(object_id, Reference()).first; } - if (!it->second.owner.has_value() && - borrower_ref.contained_in_borrowed_id.has_value()) { + if (!it->second.owner_address && borrower_ref.contained_in_borrowed_id.has_value()) { // We don't have owner information about this object ID yet and the worker // received it because it was nested in another ID that the worker was // borrowing. Copy this information to our local table. - RAY_CHECK(borrower_ref.owner.has_value()); + RAY_CHECK(borrower_ref.owner_address); AddBorrowedObjectInternal(object_id, *borrower_it->second.contained_in_borrowed_id, - borrower_ref.owner->first, borrower_ref.owner->second); + *borrower_ref.owner_address); } std::vector new_borrowers; @@ -683,9 +675,8 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it, // Only the owner should send requests to borrowers. 
RAY_CHECK(ref_it->second.owned_by_us); request.mutable_reference()->set_object_id(object_id.Binary()); - request.mutable_reference()->set_owner_id(ref_it->second.owner->first.Binary()); request.mutable_reference()->mutable_owner_address()->CopyFrom( - ref_it->second.owner->second); + *ref_it->second.owner_address); request.set_contained_in_id(contained_in_id.Binary()); request.set_intended_worker_id(addr.worker_id.Binary()); @@ -801,7 +792,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id, } void ReferenceCounter::SetRefRemovedCallback( - const ObjectID &object_id, const ObjectID &contained_in_id, const TaskID &owner_id, + const ObjectID &object_id, const ObjectID &contained_in_id, const rpc::Address &owner_address, const ReferenceCounter::ReferenceRemovedCallback &ref_removed_callback) { absl::MutexLock lock(&mutex_); @@ -853,8 +844,7 @@ void ReferenceCounter::SetReleaseLineageCallback( ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; - ref.owner = {TaskID::FromBinary(ref_count.reference().owner_id()), - ref_count.reference().owner_address()}; + ref.owner_address = ref_count.reference().owner_address(); ref.local_ref_count = ref_count.has_local_ref() ? 
1 : 0; for (const auto &borrower : ref_count.borrowers()) { @@ -876,9 +866,8 @@ ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( } void ReferenceCounter::Reference::ToProto(rpc::ObjectReferenceCount *ref) const { - if (owner.has_value()) { - ref->mutable_reference()->set_owner_id(owner->first.Binary()); - ref->mutable_reference()->mutable_owner_address()->CopyFrom(owner->second); + if (owner_address) { + ref->mutable_reference()->mutable_owner_address()->CopyFrom(*owner_address); } bool has_local_ref = RefCount() > 0; ref->set_has_local_ref(has_local_ref); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 9f73c4e44..37cb2acbc 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -130,7 +130,7 @@ class ReferenceCounter { /// reference count for the ObjectID is set to zero, which assumes that an /// ObjectID for it will be created in the language frontend after this call. /// - /// TODO(swang): We could avoid copying the owner_id and owner_address since + /// TODO(swang): We could avoid copying the owner_address since /// we are the owner, but it is easier to store a copy for now, since the /// owner ID will change for workers executing normal tasks and it is /// possible to have leftover references after a task has finished. @@ -138,7 +138,6 @@ class ReferenceCounter { /// \param[in] object_id The ID of the object that we own. /// \param[in] contained_ids ObjectIDs that are contained in the object's value. /// As long as the object_id is in scope, the inner objects should not be GC'ed. - /// \param[in] owner_id The ID of the object's owner. /// \param[in] owner_address The address of the object's owner. /// \param[in] call_site Description of the call site where the reference was created. /// \param[in] object_size Object size if known, otherwise -1; @@ -146,8 +145,8 @@ class ReferenceCounter { /// through lineage re-execution. 
void AddOwnedObject( const ObjectID &object_id, const std::vector &contained_ids, - const TaskID &owner_id, const rpc::Address &owner_address, - const std::string &call_site, const int64_t object_size, bool is_reconstructable, + const rpc::Address &owner_address, const std::string &call_site, + const int64_t object_size, bool is_reconstructable, const absl::optional &pinned_at_raylet_id = absl::optional()) LOCKS_EXCLUDED(mutex_); @@ -165,20 +164,17 @@ class ReferenceCounter { /// if one exists. An outer_id may not exist if object_id was inlined /// directly in a task spec, or if it was passed in the application /// out-of-band. - /// \param[in] owner_id The ID of the owner of the object. This is either the /// task ID (for non-actors) or the actor ID of the owner. /// \param[in] owner_address The owner's address. bool AddBorrowedObject(const ObjectID &object_id, const ObjectID &outer_id, - const TaskID &owner_id, const rpc::Address &owner_address) - LOCKS_EXCLUDED(mutex_); + const rpc::Address &owner_address) LOCKS_EXCLUDED(mutex_); /// Get the owner ID and address of the given object. /// /// \param[in] object_id The ID of the object to look up. - /// \param[out] owner_id The TaskID of the object owner. /// \param[out] owner_address The address of the object owner. - bool GetOwner(const ObjectID &object_id, TaskID *owner_id = nullptr, - rpc::Address *owner_address = nullptr) const LOCKS_EXCLUDED(mutex_); + bool GetOwner(const ObjectID &object_id, rpc::Address *owner_address = nullptr) const + LOCKS_EXCLUDED(mutex_); /// Manually delete the objects from the reference counter. void DeleteReferences(const std::vector &object_ids) LOCKS_EXCLUDED(mutex_); @@ -200,13 +196,11 @@ class ReferenceCounter { /// This is used for cases when object_id was returned from a task that we /// submitted. Then, as long as we have contained_in_id in scope, we are /// borrowing object_id. - /// \param[in] owner_id The ID of the owner of object_id. 
This is either the - /// task ID (for non-actors) or the actor ID of the owner. /// \param[in] owner_address The owner of object_id's address. /// \param[in] ref_removed_callback The callback to call when we are no /// longer borrowing the object. void SetRefRemovedCallback(const ObjectID &object_id, const ObjectID &contained_in_id, - const TaskID &owner_id, const rpc::Address &owner_address, + const rpc::Address &owner_address, const ReferenceRemovedCallback &ref_removed_callback) LOCKS_EXCLUDED(mutex_); @@ -329,13 +323,13 @@ class ReferenceCounter { Reference(std::string call_site, const int64_t object_size) : call_site(call_site), object_size(object_size) {} /// Constructor for a reference that we created. - Reference(const TaskID &owner_id, const rpc::Address &owner_address, - std::string call_site, const int64_t object_size, bool is_reconstructable, + Reference(const rpc::Address &owner_address, std::string call_site, + const int64_t object_size, bool is_reconstructable, const absl::optional &pinned_at_raylet_id) : call_site(call_site), object_size(object_size), owned_by_us(true), - owner({owner_id, owner_address}), + owner_address(owner_address), is_reconstructable(is_reconstructable), pinned_at_raylet_id(pinned_at_raylet_id) {} @@ -397,10 +391,11 @@ class ReferenceCounter { /// responsible for tracking the state of the task that creates the object /// (see task_manager.h). bool owned_by_us = false; - /// The object's owner, if we know it. This has no value if the object is - /// if we do not know the object's owner (because distributed ref counting - /// is not yet implemented). - absl::optional> owner; + /// The object's owner's address, if we know it. If this process is the + /// owner, then this is added during creation of the Reference. If this + /// process is a borrower, the borrower must add the owner's address before + /// using the ObjectID. 
+ absl::optional owner_address; // If this object is owned by us and stored in plasma, and reference // counting is enabled, then some raylet must be pinning the object value. // This is the address of that raylet. @@ -571,7 +566,6 @@ class ReferenceCounter { /// deserializing IDs from a task's arguments, or when deserializing an ID /// during ray.get(). bool AddBorrowedObjectInternal(const ObjectID &object_id, const ObjectID &outer_id, - const TaskID &owner_id, const rpc::Address &owner_address) EXCLUSIVE_LOCKS_REQUIRED(mutex_); diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index e623eea4e..f62e6f65b 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -62,8 +62,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { } MockWorkerClient(const std::string &addr, rpc::ClientFactoryFn client_factory = nullptr) - : task_id_(TaskID::ForFakeTask()), - address_(CreateRandomAddress(addr)), + : address_(CreateRandomAddress(addr)), rc_(rpc::WorkerAddress(address_), /*distributed_ref_counting_enabled=*/true, /*lineage_pinning_enabled=*/false, client_factory) {} @@ -84,12 +83,11 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { auto borrower_callback = [=]() { const ObjectID &object_id = ObjectID::FromBinary(request.reference().object_id()); ObjectID contained_in_id = ObjectID::FromBinary(request.contained_in_id()); - const auto owner_id = TaskID::FromBinary(request.reference().owner_id()); const auto owner_address = request.reference().owner_address(); auto ref_removed_callback = boost::bind(&ReferenceCounter::HandleRefRemoved, &rc_, _1, requests_[r].first.get(), send_reply_callback); - rc_.SetRefRemovedCallback(object_id, contained_in_id, owner_id, owner_address, + rc_.SetRefRemovedCallback(object_id, contained_in_id, owner_address, ref_removed_callback); }; borrower_callbacks_[r] = borrower_callback; @@ -110,36 +108,42 @@ class 
MockWorkerClient : public rpc::CoreWorkerClientInterface { } } + void FailAllWaitForRefRemovedRequests() { + for (const auto &request : requests_) { + request.second.second(Status::IOError("disconnected"), *request.second.first); + } + } + // The below methods mirror a core worker's operations, e.g., `Put` simulates // a ray.put(). void Put(const ObjectID &object_id) { - rc_.AddOwnedObject(object_id, {}, task_id_, address_, "", 0, false); + rc_.AddOwnedObject(object_id, {}, address_, "", 0, false); rc_.AddLocalReference(object_id, ""); } void PutWrappedId(const ObjectID outer_id, const ObjectID &inner_id) { - rc_.AddOwnedObject(outer_id, {inner_id}, task_id_, address_, "", 0, false); + rc_.AddOwnedObject(outer_id, {inner_id}, address_, "", 0, false); rc_.AddLocalReference(outer_id, ""); } void GetSerializedObjectId(const ObjectID outer_id, const ObjectID &inner_id, - const TaskID &owner_id, const rpc::Address &owner_address) { + const rpc::Address &owner_address) { rc_.AddLocalReference(inner_id, ""); - rc_.AddBorrowedObject(inner_id, outer_id, owner_id, owner_address); + rc_.AddBorrowedObject(inner_id, outer_id, owner_address); } void ExecuteTaskWithArg(const ObjectID &arg_id, const ObjectID &inner_id, - const TaskID &owner_id, const rpc::Address &owner_address) { + const rpc::Address &owner_address) { // Add a sentinel reference to keep the argument ID in scope even though // the frontend won't have a reference. rc_.AddLocalReference(arg_id, ""); - GetSerializedObjectId(arg_id, inner_id, owner_id, owner_address); + GetSerializedObjectId(arg_id, inner_id, owner_address); } ObjectID SubmitTaskWithArg(const ObjectID &arg_id) { rc_.UpdateSubmittedTaskReferences({arg_id}); ObjectID return_id = ObjectID::FromRandom(); - rc_.AddOwnedObject(return_id, {}, task_id_, address_, "", 0, false); + rc_.AddOwnedObject(return_id, {}, address_, "", 0, false); // Add a sentinel reference to keep all nested object IDs in scope. 
rc_.AddLocalReference(return_id, ""); return return_id; @@ -179,7 +183,6 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { // Global map from Object ID -> owner worker ID, list of objects that it depends on, // worker address that it's scheduled on. Worker map of pending return IDs. - TaskID task_id_; rpc::Address address_; // The ReferenceCounter at the "client". ReferenceCounter rc_; @@ -261,7 +264,6 @@ TEST_F(ReferenceCountTest, TestBasic) { TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { ObjectID id = ObjectID::FromRandom(); - TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); @@ -271,7 +273,7 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // The object goes out of scope once it has no more refs. std::vector out; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); rc->AddLocalReference(id, ""); @@ -283,7 +285,7 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { // lineage ref count. 
*out_of_scope = false; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({id}); ASSERT_FALSE(*out_of_scope); @@ -295,7 +297,6 @@ TEST_F(ReferenceCountTest, TestUnreconstructableObjectOutOfScope) { TEST_F(ReferenceCountTest, TestReferenceStats) { ObjectID id1 = ObjectID::FromRandom(); ObjectID id2 = ObjectID::FromRandom(); - TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); @@ -311,7 +312,7 @@ TEST_F(ReferenceCountTest, TestReferenceStats) { ASSERT_EQ(stats.object_refs(0).call_site(), "file.py:42"); rc->RemoveLocalReference(id1, nullptr); - rc->AddOwnedObject(id2, {}, task_id, address, "file2.py:43", 100, false); + rc->AddOwnedObject(id2, {}, address, "file2.py:43", 100, false); rpc::CoreWorkerStats stats2; rc->AddObjectRefStats({}, &stats2); ASSERT_EQ(stats2.object_refs_size(), 1); @@ -326,29 +327,25 @@ TEST_F(ReferenceCountTest, TestReferenceStats) { // origin we do not know. 
TEST_F(ReferenceCountTest, TestOwnerAddress) { auto object_id = ObjectID::FromRandom(); - TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); - rc->AddOwnedObject(object_id, {}, task_id, address, "", 0, false); + rc->AddOwnedObject(object_id, {}, address, "", 0, false); TaskID added_id; rpc::Address added_address; - ASSERT_TRUE(rc->GetOwner(object_id, &added_id, &added_address)); - ASSERT_EQ(task_id, added_id); + ASSERT_TRUE(rc->GetOwner(object_id, &added_address)); ASSERT_EQ(address.ip_address(), added_address.ip_address()); auto object_id2 = ObjectID::FromRandom(); - task_id = TaskID::ForFakeTask(); address.set_ip_address("5678"); - rc->AddOwnedObject(object_id2, {}, task_id, address, "", 0, false); - ASSERT_TRUE(rc->GetOwner(object_id2, &added_id, &added_address)); - ASSERT_EQ(task_id, added_id); + rc->AddOwnedObject(object_id2, {}, address, "", 0, false); + ASSERT_TRUE(rc->GetOwner(object_id2, &added_address)); ASSERT_EQ(address.ip_address(), added_address.ip_address()); auto object_id3 = ObjectID::FromRandom(); - ASSERT_FALSE(rc->GetOwner(object_id3, &added_id, &added_address)); + ASSERT_FALSE(rc->GetOwner(object_id3, &added_address)); rc->AddLocalReference(object_id3, ""); - ASSERT_FALSE(rc->GetOwner(object_id3, &added_id, &added_address)); + ASSERT_FALSE(rc->GetOwner(object_id3, &added_address)); } // Tests that the ref counts are properly integrated into the local @@ -412,7 +409,7 @@ TEST(DistributedReferenceCountTest, TestNoBorrow) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. - borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); // The borrower submits a task that depends on the inner object. 
borrower->SubmitTaskWithArg(inner_id); borrower->rc_.RemoveLocalReference(inner_id, nullptr); @@ -467,7 +464,7 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrower) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. - borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); // The borrower submits a task that depends on the inner object. borrower->SubmitTaskWithArg(inner_id); borrower->rc_.RemoveLocalReference(inner_id, nullptr); @@ -500,6 +497,70 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrower) { ASSERT_FALSE(owner->rc_.HasReference(outer_id)); } +// A borrower is given a reference to an object ID, submits a task, does not +// wait for it to finish. The borrower then fails before the task finishes. +// +// @ray.remote +// def borrower(inner_ids): +// inner_id = inner_ids[0] +// foo.remote(inner_id) +// # Process exits before task finishes. +// +// inner_id = ray.put(1) +// outer_id = ray.put([inner_id]) +// res = borrower.remote(outer_id) +TEST(DistributedReferenceCountTest, TestSimpleBorrowerFailure) { + auto borrower = std::make_shared("1"); + auto owner = std::make_shared( + "2", [&](const rpc::Address &addr) { return borrower; }); + + // The owner creates an inner object and wraps it. + auto inner_id = ObjectID::FromRandom(); + auto outer_id = ObjectID::FromRandom(); + owner->Put(inner_id); + owner->PutWrappedId(outer_id, inner_id); + + // The owner submits a task that depends on the outer object. The task will + // be given a reference to inner_id. + owner->SubmitTaskWithArg(outer_id); + // The owner's references go out of scope. + owner->rc_.RemoveLocalReference(outer_id, nullptr); + owner->rc_.RemoveLocalReference(inner_id, nullptr); + // The owner's ref count > 0 for both objects. 
+ ASSERT_TRUE(owner->rc_.HasReference(outer_id)); + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + + // The borrower is given a reference to the inner object. + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); + // The borrower submits a task that depends on the inner object. + borrower->SubmitTaskWithArg(inner_id); + borrower->rc_.RemoveLocalReference(inner_id, nullptr); + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + + // The borrower task returns to the owner without waiting for its submitted + // task to finish. + auto borrower_refs = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); + // ASSERT_FALSE(borrower->rc_.HasReference(outer_id)); + // Check that the borrower's ref count for inner_id > 0 because of the + // pending task. + ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); + + // The owner receives the borrower's reply and merges the borrower's ref + // count into its own. + owner->HandleSubmittedTaskFinished(outer_id, {}, borrower->address_, borrower_refs); + borrower->FlushBorrowerCallbacks(); + // Check that owner now has borrower in inner's borrowers list. + ASSERT_TRUE(owner->rc_.HasReference(inner_id)); + // Check that owner's ref count for outer == 0 since the borrower task + // returned and there were no local references to outer_id. + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); + + // The borrower fails. The owner's ref count should go to 0. + borrower->FailAllWaitForRefRemovedRequests(); + ASSERT_FALSE(owner->rc_.HasReference(inner_id)); + ASSERT_FALSE(owner->rc_.HasReference(outer_id)); +} + // A borrower is given a reference to an object ID, keeps the reference past // the task's lifetime, then deletes the reference before it hears from the // owner. @@ -534,7 +595,7 @@ TEST(DistributedReferenceCountTest, TestSimpleBorrowerReferenceRemoved) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. 
- borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower task returns to the owner while still using inner_id. @@ -606,7 +667,7 @@ TEST(DistributedReferenceCountTest, TestBorrowerTree) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Borrower 1 is given a reference to the inner object. - borrower1->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower1->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); // The borrower submits a task that depends on the inner object. auto outer_id2 = ObjectID::FromRandom(); borrower1->PutWrappedId(outer_id2, inner_id); @@ -635,7 +696,7 @@ TEST(DistributedReferenceCountTest, TestBorrowerTree) { // Borrower 2 starts executing. It is given a reference to the inner object // when it gets outer_id2 as an argument. - borrower2->ExecuteTaskWithArg(outer_id2, inner_id, owner->task_id_, owner->address_); + borrower2->ExecuteTaskWithArg(outer_id2, inner_id, owner->address_); ASSERT_TRUE(borrower2->rc_.HasReference(inner_id)); // Borrower 2 finishes but it is still using inner_id. borrower_refs = borrower2->FinishExecutingTask(outer_id2, ObjectID::Nil()); @@ -696,12 +757,12 @@ TEST(DistributedReferenceCountTest, TestNestedObjectNoBorrow) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the middle object. - borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(mid_id)); ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); // The borrower unwraps the inner object with ray.get. 
- borrower->GetSerializedObjectId(mid_id, inner_id, owner->task_id_, owner->address_); + borrower->GetSerializedObjectId(mid_id, inner_id, owner->address_); borrower->rc_.RemoveLocalReference(mid_id, nullptr); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); // The borrower's reference to inner_id goes out of scope. @@ -761,12 +822,12 @@ TEST(DistributedReferenceCountTest, TestNestedObject) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the middle object. - borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, mid_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(mid_id)); ASSERT_FALSE(borrower->rc_.HasReference(inner_id)); // The borrower unwraps the inner object with ray.get. - borrower->GetSerializedObjectId(mid_id, inner_id, owner->task_id_, owner->address_); + borrower->GetSerializedObjectId(mid_id, inner_id, owner->address_); borrower->rc_.RemoveLocalReference(mid_id, nullptr); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); @@ -845,7 +906,7 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners) { owner->rc_.RemoveLocalReference(owner_id3, nullptr); // The borrower is given a reference to the middle object. - borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); + borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->address_); ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); @@ -858,11 +919,10 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners) { // will be given a reference to owner_id2. 
borrower1->SubmitTaskWithArg(borrower_id); borrower1->rc_.RemoveLocalReference(borrower_id, nullptr); - borrower2->ExecuteTaskWithArg(borrower_id, owner_id2, owner->task_id_, owner->address_); + borrower2->ExecuteTaskWithArg(borrower_id, owner_id2, owner->address_); // The nested task returns while still using owner_id1. - borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, - owner->address_); + borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->address_); borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); auto borrower_refs = borrower2->FinishExecutingTask(borrower_id, ObjectID::Nil()); ASSERT_TRUE(borrower2->rc_.HasReference(owner_id1)); @@ -944,7 +1004,7 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { owner->rc_.RemoveLocalReference(owner_id3, nullptr); // The borrower is given a reference to the middle object. - borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); + borrower1->ExecuteTaskWithArg(owner_id3, owner_id2, owner->address_); ASSERT_TRUE(borrower1->rc_.HasReference(owner_id2)); ASSERT_FALSE(borrower1->rc_.HasReference(owner_id1)); @@ -956,11 +1016,10 @@ TEST(DistributedReferenceCountTest, TestNestedObjectDifferentOwners2) { // Borrower 1 submits a task that depends on the wrapped object. The task // will be given a reference to owner_id2. borrower1->SubmitTaskWithArg(borrower_id); - borrower2->ExecuteTaskWithArg(borrower_id, owner_id2, owner->task_id_, owner->address_); + borrower2->ExecuteTaskWithArg(borrower_id, owner_id2, owner->address_); // The nested task returns while still using owner_id1. 
- borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, - owner->address_); + borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->address_); borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); auto borrower_refs = borrower2->FinishExecutingTask(borrower_id, ObjectID::Nil()); ASSERT_TRUE(borrower2->rc_.HasReference(owner_id1)); @@ -1038,7 +1097,7 @@ TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { owner->rc_.RemoveLocalReference(inner_id, nullptr); // Borrower 1 is given a reference to the inner object. - borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); // The borrower submits a task that depends on the inner object. auto outer_id2 = ObjectID::FromRandom(); borrower->PutWrappedId(outer_id2, inner_id); @@ -1067,7 +1126,7 @@ TEST(DistributedReferenceCountTest, TestBorrowerPingPong) { // Owner starts executing the submitted task. It is given a second reference // to the inner object when it gets outer_id2 as an argument. - owner->ExecuteTaskWithArg(outer_id2, inner_id, owner->task_id_, owner->address_); + owner->ExecuteTaskWithArg(outer_id2, inner_id, owner->address_); ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // Owner finishes but it is still using inner_id. borrower_refs = owner->FinishExecutingTask(outer_id2, ObjectID::Nil()); @@ -1119,7 +1178,7 @@ TEST(DistributedReferenceCountTest, TestDuplicateBorrower) { ASSERT_TRUE(owner->rc_.HasReference(inner_id)); // The borrower is given a reference to the inner object. - borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); // The borrower submits a task that depends on the inner object. 
borrower->SubmitTaskWithArg(inner_id); borrower->rc_.RemoveLocalReference(inner_id, nullptr); @@ -1135,7 +1194,7 @@ TEST(DistributedReferenceCountTest, TestDuplicateBorrower) { // The borrower is given a 2nd reference to the inner object. owner->SubmitTaskWithArg(outer_id); owner->rc_.RemoveLocalReference(outer_id, nullptr); - borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(outer_id, inner_id, owner->address_); auto borrower_refs2 = borrower->FinishExecutingTask(outer_id, ObjectID::Nil()); // The owner receives the borrower's replies and merges the borrower's ref @@ -1188,9 +1247,8 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { owner->rc_.RemoveLocalReference(owner_id2, nullptr); owner->rc_.RemoveLocalReference(owner_id3, nullptr); - borrower2->ExecuteTaskWithArg(owner_id3, owner_id2, owner->task_id_, owner->address_); - borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->task_id_, - owner->address_); + borrower2->ExecuteTaskWithArg(owner_id3, owner_id2, owner->address_); + borrower2->GetSerializedObjectId(owner_id2, owner_id1, owner->address_); borrower2->rc_.RemoveLocalReference(owner_id2, nullptr); // The nested task returns while still using owner_id1. auto borrower_refs = borrower2->FinishExecutingTask(owner_id3, ObjectID::Nil()); @@ -1198,7 +1256,7 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { ASSERT_TRUE(borrower2->FlushBorrowerCallbacks()); // The owner submits a task that is given a reference to owner_id1. - borrower1->ExecuteTaskWithArg(owner_id2, owner_id1, owner->task_id_, owner->address_); + borrower1->ExecuteTaskWithArg(owner_id2, owner_id1, owner->address_); // The borrower wraps the object ID again. auto borrower_id = ObjectID::FromRandom(); borrower1->PutWrappedId(borrower_id, owner_id1); @@ -1207,7 +1265,7 @@ TEST(DistributedReferenceCountTest, TestDuplicateNestedObject) { // will be given a reference to owner_id1. 
borrower1->SubmitTaskWithArg(borrower_id); borrower1->rc_.RemoveLocalReference(borrower_id, nullptr); - borrower2->ExecuteTaskWithArg(borrower_id, owner_id1, owner->task_id_, owner->address_); + borrower2->ExecuteTaskWithArg(borrower_id, owner_id1, owner->address_); // The nested task returns while still using owner_id1. // It should now have 2 local references to owner_id1, one from the owner and // one from the borrower. @@ -1362,7 +1420,7 @@ TEST(DistributedReferenceCountTest, TestReturnObjectIdBorrowChain) { // Borrower receives a reference to inner_id. It still has a reference when // the task returns. - borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(return_id, inner_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); auto borrower_refs = borrower->FinishExecutingTask(return_id, return_id); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); @@ -1432,7 +1490,7 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedId) { // Borrower receives a reference to inner_id. It returns the inner_id as its // return value. - borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(return_id, inner_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); auto borrower_refs = borrower->FinishExecutingTask(return_id, borrower_return_id, &inner_id, &addr); @@ -1515,7 +1573,7 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdDeserialize) { // Borrower receives a reference to inner_id. It returns the inner_id as its // return value. 
- borrower->ExecuteTaskWithArg(return_id, inner_id, owner->task_id_, owner->address_); + borrower->ExecuteTaskWithArg(return_id, inner_id, owner->address_); ASSERT_TRUE(borrower->rc_.HasReference(inner_id)); auto borrower_refs = borrower->FinishExecutingTask(return_id, borrower_return_id, &inner_id, &addr); @@ -1530,8 +1588,7 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdDeserialize) { ASSERT_TRUE(caller->rc_.HasReference(borrower_return_id)); ASSERT_TRUE(owner->rc_.HasReference(inner_id)); - caller->GetSerializedObjectId(borrower_return_id, inner_id, owner->task_id_, - owner->address_); + caller->GetSerializedObjectId(borrower_return_id, inner_id, owner->address_); caller->rc_.RemoveLocalReference(borrower_return_id, nullptr); ASSERT_TRUE(caller->FlushBorrowerCallbacks()); caller->rc_.RemoveLocalReference(inner_id, nullptr); @@ -1660,8 +1717,7 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChain) { worker->HandleSubmittedTaskFinished(ObjectID::Nil(), {{nested_return_id, {inner_id}}}); worker->FlushBorrowerCallbacks(); // Worker deserializes the inner_id and returns it. - worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->task_id_, - nested_worker->address_); + worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->address_); rpc::WorkerAddress addr(root->address_); auto refs = worker->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); @@ -1739,8 +1795,7 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { worker->HandleSubmittedTaskFinished(ObjectID::Nil(), {{nested_return_id, {inner_id}}}); worker->FlushBorrowerCallbacks(); // Worker deserializes the inner_id and returns it. 
- worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->task_id_, - nested_worker->address_); + worker->GetSerializedObjectId(nested_return_id, inner_id, nested_worker->address_); rpc::WorkerAddress addr(root->address_); auto refs = worker->FinishExecutingTask(ObjectID::Nil(), return_id, &inner_id, &addr); @@ -1767,7 +1822,6 @@ TEST(DistributedReferenceCountTest, TestReturnBorrowedIdChainOutOfOrder) { TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) { ObjectID id = ObjectID::FromRandom(); - TaskID task_id = TaskID::ForFakeTask(); rpc::Address address; address.set_ip_address("1234"); @@ -1777,7 +1831,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // The object goes out of scope once it has no more refs. std::vector out; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_FALSE(*out_of_scope); rc->AddLocalReference(id, ""); @@ -1789,7 +1843,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope) // count. *out_of_scope = false; ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); - rc->AddOwnedObject(id, {}, task_id, address, "", 0, false); + rc->AddOwnedObject(id, {}, address, "", 0, false); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateSubmittedTaskReferences({id}); ASSERT_FALSE(*out_of_scope); @@ -1822,7 +1876,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) { ASSERT_TRUE(lineage_deleted.empty()); // We should keep lineage for owned objects. 
- rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, false); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, false); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->HasReference(id)); rc->RemoveLocalReference(id, nullptr); @@ -1841,7 +1895,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) { for (int i = 0; i < 3; i++) { ObjectID id = ObjectID::FromRandom(); ids.push_back(id); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); } rc->SetReleaseLineageCallback( @@ -1895,7 +1949,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { std::vector lineage_deleted; ObjectID id = ObjectID::FromRandom(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->SetReleaseLineageCallback( [&](const ObjectID &object_id, std::vector *ids_to_release) { @@ -1938,7 +1992,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ObjectID id = ObjectID::FromRandom(); ClientID node_id = ClientID::FromRandom(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); @@ -1952,7 +2006,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) { ASSERT_TRUE(deleted->count(id) > 0); deleted->clear(); - rc->AddOwnedObject(id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 7180d7329..d6b2ead52 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc 
@@ -24,8 +24,7 @@ const int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. const int64_t kTaskFailureLoggingFrequencyMillis = 5000; -void TaskManager::AddPendingTask(const TaskID &caller_id, - const rpc::Address &caller_address, +void TaskManager::AddPendingTask(const rpc::Address &caller_address, const TaskSpecification &spec, const std::string &call_site, int max_retries) { RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries @@ -66,8 +65,8 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, // the inner IDs. Note that this RPC can be received *before* the // PushTaskReply. reference_counter_->AddOwnedObject(spec.ReturnId(i), - /*inner_ids=*/{}, caller_id, caller_address, - call_site, -1, /*is_reconstructable=*/true); + /*inner_ids=*/{}, caller_address, call_site, -1, + /*is_reconstructable=*/true); } } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index e2aea2eee..17b975a18 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -77,15 +77,13 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// Add a task that is pending execution. /// - /// \param[in] caller_id The TaskID of the calling task. /// \param[in] caller_address The rpc address of the calling task. /// \param[in] spec The spec of the pending task. /// \param[in] max_retries Number of times this task may be retried /// on failure. /// \return Void. - void AddPendingTask(const TaskID &caller_id, const rpc::Address &caller_address, - const TaskSpecification &spec, const std::string &call_site, - int max_retries = 0); + void AddPendingTask(const rpc::Address &caller_address, const TaskSpecification &spec, + const std::string &call_site, int max_retries = 0); /// Resubmit a task that has completed execution before. This is used to /// reconstruct objects stored in Plasma that were lost. 
diff --git a/src/ray/core_worker/test/object_recovery_manager_test.cc b/src/ray/core_worker/test/object_recovery_manager_test.cc index cfd822100..fe8d6a64f 100644 --- a/src/ray/core_worker/test/object_recovery_manager_test.cc +++ b/src/ray/core_worker/test/object_recovery_manager_test.cc @@ -151,7 +151,7 @@ class ObjectRecoveryManagerTest : public ::testing::Test { TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); ASSERT_TRUE(failed_reconstructions_.empty()); ASSERT_TRUE(object_directory_->Flush() == 1); @@ -161,7 +161,7 @@ TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) { TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); std::vector addresses({rpc::Address()}); object_directory_->SetLocations(object_id, addresses); @@ -174,7 +174,7 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) { TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); task_resubmitter_->AddTask(object_id.TaskId(), {}); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); @@ -186,7 +186,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) { TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, true); + ref_counter_->AddOwnedObject(object_id, {}, 
rpc::Address(), "", 0, true); ref_counter_->AddLocalReference(object_id, ""); ASSERT_TRUE(manager_.RecoverObject(object_id).ok()); @@ -215,8 +215,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) { std::vector dependencies; for (int i = 0; i < 3; i++) { ObjectID object_id = ObjectID::FromRandom(); - ref_counter_->AddOwnedObject(object_id, {}, TaskID::Nil(), rpc::Address(), "", 0, - true); + ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true); task_resubmitter_->AddTask(object_id.TaskId(), dependencies); dependencies = {object_id}; object_ids.push_back(object_id); diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index f02e3eb4f..cb060d6bf 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -75,13 +75,12 @@ class TaskManagerLineageTest : public TaskManagerTest { }; TEST_F(TaskManagerTest, TestTaskSuccess) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep1 = ObjectID::FromRandom(); ObjectID dep2 = ObjectID::FromRandom(); auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - manager_.AddPendingTask(caller_id, caller_address, spec, ""); + manager_.AddPendingTask(caller_address, spec, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0); @@ -114,14 +113,13 @@ TEST_F(TaskManagerTest, TestTaskSuccess) { } TEST_F(TaskManagerTest, TestTaskFailure) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep1 = ObjectID::FromRandom(); ObjectID dep2 = ObjectID::FromRandom(); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - manager_.AddPendingTask(caller_id, caller_address, spec, ""); + manager_.AddPendingTask(caller_address, 
spec, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0); @@ -149,11 +147,10 @@ TEST_F(TaskManagerTest, TestTaskFailure) { } TEST_F(TaskManagerTest, TestPlasmaConcurrentFailure) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; auto spec = CreateTaskHelper(1, {}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); - manager_.AddPendingTask(caller_id, caller_address, spec, ""); + manager_.AddPendingTask(caller_address, spec, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); auto return_id = spec.ReturnId(0); WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); @@ -176,7 +173,6 @@ TEST_F(TaskManagerTest, TestPlasmaConcurrentFailure) { } TEST_F(TaskManagerTest, TestTaskReconstruction) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep1 = ObjectID::FromRandom(); ObjectID dep2 = ObjectID::FromRandom(); @@ -184,7 +180,7 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) { auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); auto return_id = spec.ReturnId(0); @@ -221,13 +217,12 @@ TEST_F(TaskManagerTest, TestTaskReconstruction) { } TEST_F(TaskManagerTest, TestTaskKill) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); auto spec = CreateTaskHelper(1, {}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); 
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); auto return_id = spec.ReturnId(0); @@ -248,14 +243,13 @@ TEST_F(TaskManagerTest, TestTaskKill) { // Test to make sure that the task spec and dependencies for an object are // evicted when lineage pinning is disabled in the ReferenceCounter. TEST_F(TaskManagerTest, TestLineageEvicted) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep1 = ObjectID::FromRandom(); ObjectID dep2 = ObjectID::FromRandom(); ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0); auto spec = CreateTaskHelper(1, {dep1, dep2}); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); rpc::PushTaskReply reply; @@ -283,7 +277,6 @@ TEST_F(TaskManagerTest, TestLineageEvicted) { // Test to make sure that the task spec and dependencies for an object are // pinned when lineage pinning is enabled in the ReferenceCounter. TEST_F(TaskManagerLineageTest, TestLineagePinned) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; // Submit a task with 2 arguments. ObjectID dep1 = ObjectID::FromRandom(); @@ -292,7 +285,7 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) { auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); @@ -323,7 +316,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinned) { // Test to make sure that the task spec and dependencies for an object are // evicted if the object is returned by value, instead of stored in plasma. 
TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; // Submit a task with 2 arguments. ObjectID dep1 = ObjectID::FromRandom(); @@ -332,7 +324,7 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); @@ -358,7 +350,6 @@ TEST_F(TaskManagerLineageTest, TestDirectObjectNoLineage) { // pinned if the object goes out of scope before the task finishes. This is // needed in case the pending task fails and needs to be retried. TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; // Submit a task with 2 arguments. ObjectID dep1 = ObjectID::FromRandom(); @@ -367,7 +358,7 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { auto spec = CreateTaskHelper(1, {dep1, dep2}); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); reference_counter_->AddLocalReference(return_id, ""); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); @@ -400,7 +391,6 @@ TEST_F(TaskManagerLineageTest, TestLineagePinnedOutOfOrder) { // tasks that each depend on the previous. All tasks should be pinned until the // final object goes out of scope. 
TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep = ObjectID::FromRandom(); @@ -408,7 +398,7 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { for (int i = 0; i < 3; i++) { auto spec = CreateTaskHelper(1, {dep}); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); reference_counter_->AddLocalReference(return_id, ""); @@ -442,7 +432,6 @@ TEST_F(TaskManagerLineageTest, TestRecursiveLineagePinned) { // a direct value. All tasks should be evicted as soon as they complete, even // though the final object is still in scope. TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; ObjectID dep = ObjectID::FromRandom(); @@ -450,7 +439,7 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { for (int i = 0; i < 3; i++) { auto spec = CreateTaskHelper(1, {dep}); int num_retries = 3; - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); auto return_id = spec.ReturnId(0); reference_counter_->AddLocalReference(return_id, ""); @@ -482,7 +471,6 @@ TEST_F(TaskManagerLineageTest, TestRecursiveDirectObjectNoLineage) { // Test to make sure that the task manager only resubmits tasks whose specs are // pinned and that are not already pending execution. TEST_F(TaskManagerLineageTest, TestResubmitTask) { - TaskID caller_id = TaskID::Nil(); rpc::Address caller_address; // Submit a task with 2 arguments. 
ObjectID dep1 = ObjectID::FromRandom(); @@ -496,7 +484,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) { ASSERT_TRUE(resubmitted_task_deps.empty()); ASSERT_EQ(num_retries_, 0); - manager_.AddPendingTask(caller_id, caller_address, spec, "", num_retries); + manager_.AddPendingTask(caller_address, spec, "", num_retries); // A task that is already pending does not get resubmitted. ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_TRUE(resubmitted_task_deps.empty()); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 98c181983..d7816f2e7 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -143,16 +143,18 @@ message DirectActorCallArgWaitCompleteReply { } message GetObjectStatusRequest { - // The owner of the object. Note that we do not need to include - // intended_worker_id since the new worker can service this request too by - // inspecting the owner_id field. - bytes owner_id = 1; + // The ID of the worker that owns this object. This is also + // the ID of the worker that this message is intended for. + bytes owner_worker_id = 1; // Wait for this object's status. bytes object_id = 2; } message GetObjectStatusReply { - enum ObjectStatus { CREATED = 0; } + enum ObjectStatus { + CREATED = 0; + OUT_OF_SCOPE = 1; + } ObjectStatus status = 1; } @@ -226,8 +228,6 @@ message GetCoreWorkerStatsReply { message ObjectReference { // ObjectID that the worker has a reference to. bytes object_id = 1; - // The task or actor ID of the object's owner. - bytes owner_id = 2; // The address of the object's owner. Address owner_address = 3; }