From a6fed4820ea26f755af02f267a26d14a9ac9edfa Mon Sep 17 00:00:00 2001 From: Zhuohan Li Date: Tue, 11 Aug 2020 15:04:13 -0700 Subject: [PATCH] [Core] Preliminary implementation of ownership-based object directory (#9735) --- BUILD.bazel | 4 + python/ray/_raylet.pyx | 5 +- python/ray/includes/libcoreworker.pxd | 1 + src/ray/common/ray_config_def.h | 3 + src/ray/core_worker/core_worker.cc | 63 ++++- src/ray/core_worker/core_worker.h | 20 +- ...io_ray_runtime_object_NativeObjectStore.cc | 3 +- src/ray/core_worker/reference_count.cc | 30 +++ src/ray/core_worker/reference_count.h | 29 +++ .../store_provider/plasma_store_provider.cc | 8 +- .../store_provider/plasma_store_provider.h | 8 +- .../object_manager/format/object_manager.fbs | 8 + src/ray/object_manager/object_buffer_pool.cc | 7 +- src/ray/object_manager/object_buffer_pool.h | 5 +- src/ray/object_manager/object_directory.cc | 2 + src/ray/object_manager/object_directory.h | 6 +- src/ray/object_manager/object_manager.cc | 63 +++-- src/ray/object_manager/object_manager.h | 36 ++- .../ownership_based_object_directory.cc | 243 ++++++++++++++++++ .../ownership_based_object_directory.h | 88 +++++++ src/ray/object_manager/plasma/client.cc | 15 +- src/ray/object_manager/plasma/client.h | 7 +- src/ray/object_manager/plasma/common.h | 11 + src/ray/object_manager/plasma/plasma.fbs | 8 + src/ray/object_manager/plasma/protocol.cc | 15 +- src/ray/object_manager/plasma/protocol.h | 8 +- src/ray/object_manager/plasma/store.cc | 61 ++++- src/ray/object_manager/plasma/store.h | 8 +- .../test/object_manager_stress_test.cc | 16 +- .../test/object_manager_test.cc | 12 +- src/ray/protobuf/BUILD | 1 + src/ray/protobuf/core_worker.proto | 36 +++ src/ray/protobuf/object_manager.proto | 12 +- src/ray/raylet/node_manager.cc | 62 +++-- src/ray/raylet/node_manager.h | 3 +- src/ray/raylet/raylet.cc | 8 +- src/ray/raylet/reconstruction_policy.cc | 11 +- src/ray/raylet/reconstruction_policy.h | 8 +- src/ray/raylet/reconstruction_policy_test.cc | 25 +- src/ray/raylet/task_dependency_manager.cc | 15 +- src/ray/raylet/task_dependency_manager.h | 2 +- .../raylet/task_dependency_manager_test.cc | 54 ++-- src/ray/rpc/worker/core_worker_client.h | 21 ++ src/ray/rpc/worker/core_worker_server.h | 6 + 44 files changed, 882 insertions(+), 175 deletions(-) create mode 100644 src/ray/object_manager/ownership_based_object_directory.cc create mode 100644 src/ray/object_manager/ownership_based_object_directory.h diff --git a/BUILD.bazel b/BUILD.bazel index d720049c4..4994aca8c 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -246,6 +246,7 @@ cc_library( ":plasma_fbs", ":ray_common", ":ray_util", + "//src/ray/protobuf:common_cc_proto", "@arrow", "@com_github_google_glog//:glog", "@msgpack", @@ -1214,6 +1215,7 @@ cc_library( copts = COPTS, strip_include_prefix = "src", deps = [ + ":core_worker_lib", ":gcs", ":object_manager_fbs", ":object_manager_rpc", @@ -1231,6 +1233,7 @@ cc_binary( copts = COPTS, deps = [ ":object_manager", + "//src/ray/protobuf:common_cc_proto", "@com_google_googletest//:gtest_main", ], ) @@ -1242,6 +1245,7 @@ cc_binary( copts = COPTS, deps = [ ":object_manager", + "//src/ray/protobuf:common_cc_proto", "@com_google_googletest//:gtest_main", ], ) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 36e0655b2..5a52a22c1 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -761,8 +761,9 @@ cdef class CoreWorker: c_object_id[0] = object_ref.native() with nogil: check_status(CCoreWorkerProcess.GetCoreWorker().Create( - metadata, data_size, - c_object_id[0], data)) + metadata, data_size, c_object_id[0], + CCoreWorkerProcess.GetCoreWorker().GetRpcAddress(), + data)) # If data is nullptr, that means the ObjectRef already existed, # which we ignore. diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index a52f3f044..120b8a40f 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -161,6 +161,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus Create(const shared_ptr[CBuffer] &metadata, const size_t data_size, const CObjectID &object_id, + const CAddress &owner_address, shared_ptr[CBuffer] *data) CRayStatus Seal(const CObjectID &object_id, c_bool pin_object) CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms, diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index b73eb626e..c78a66be0 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -337,3 +337,6 @@ RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5) /// Whether to enable multi tenancy features. RAY_CONFIG(bool, enable_multi_tenancy, false) + +/// Whether start the Plasma Store as a Raylet thread. +RAY_CONFIG(bool, ownership_based_object_directory_enabled, false) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ef3dab8eb..edd98cda3 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -735,7 +735,10 @@ CoreWorker::GetAllReferenceCounts() const { void CoreWorker::PutObjectIntoPlasma(const RayObject &object, const ObjectID &object_id) { bool object_exists; - RAY_CHECK_OK(plasma_store_provider_->Put(object, object_id, &object_exists)); + // This call will only be used by PromoteObjectToPlasma, which means that the + // object will always owned by us. + RAY_CHECK_OK(plasma_store_provider_->Put( + object, object_id, /* owner_address = */ rpc_address_, &object_exists)); if (!object_exists) { // Tell the raylet to pin the object **after** it is created. RAY_LOG(DEBUG) << "Pinning put object " << object_id; @@ -826,7 +829,8 @@ Status CoreWorker::Put(const RayObject &object, RAY_CHECK(memory_store_->Put(object, object_id)); return Status::OK(); } - RAY_RETURN_NOT_OK(plasma_store_provider_->Put(object, object_id, &object_exists)); + RAY_RETURN_NOT_OK(plasma_store_provider_->Put( + object, object_id, /* owner_address = */ rpc_address_, &object_exists)); if (!object_exists) { if (pin_object) { // Tell the raylet to pin the object **after** it is created. @@ -860,8 +864,8 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t RayConfig::instance().max_direct_call_object_size())) { *data = std::make_shared(data_size); } else { - RAY_RETURN_NOT_OK( - plasma_store_provider_->Create(metadata, data_size, *object_id, data)); + RAY_RETURN_NOT_OK(plasma_store_provider_->Create( + metadata, data_size, *object_id, /* owner_address = */ rpc_address_, data)); } // Only add the object to the reference counter if it didn't already exist. if (data) { @@ -874,12 +878,14 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data) { + const ObjectID &object_id, const rpc::Address &owner_address, + std::shared_ptr *data) { if (options_.is_local_mode) { return Status::NotImplemented( "Creating an object with a pre-existing ObjectID is not supported in local mode"); } else { - return plasma_store_provider_->Create(metadata, data_size, object_id, data); + return plasma_store_provider_->Create(metadata, data_size, object_id, owner_address, + data); } } @@ -1526,8 +1532,8 @@ Status CoreWorker::AllocateReturnObjects( RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); } else { - RAY_RETURN_NOT_OK( - Create(metadatas[i], data_sizes[i], object_ids[i], &data_buffer)); + RAY_RETURN_NOT_OK(Create(metadatas[i], data_sizes[i], object_ids[i], + owner_address, &data_buffer)); object_already_exists = !data_buffer; } } @@ -1903,6 +1909,47 @@ void CoreWorker::HandleWaitForObjectEviction( } } +void CoreWorker::HandleAddObjectLocationOwner( + const rpc::AddObjectLocationOwnerRequest &request, + rpc::AddObjectLocationOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), + send_reply_callback)) { + return; + } + reference_counter_->AddObjectLocation(ObjectID::FromBinary(request.object_id()), + ClientID::FromBinary(request.client_id())); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + +void CoreWorker::HandleRemoveObjectLocationOwner( + const rpc::RemoveObjectLocationOwnerRequest &request, + rpc::RemoveObjectLocationOwnerReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), + send_reply_callback)) { + return; + } + reference_counter_->RemoveObjectLocation(ObjectID::FromBinary(request.object_id()), + ClientID::FromBinary(request.client_id())); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + +void CoreWorker::HandleGetObjectLocationsOwner( + const rpc::GetObjectLocationsOwnerRequest &request, + rpc::GetObjectLocationsOwnerReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), + send_reply_callback)) { + return; + } + std::unordered_set client_ids = + reference_counter_->GetObjectLocations(ObjectID::FromBinary(request.object_id())); + for (const auto &client_id : client_ids) { + reply->add_client_ids(client_id.Binary()); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &request, rpc::WaitForRefRemovedReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 0549063ba..c1eb621bb 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -466,10 +466,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] metadata Metadata of the object to be written. /// \param[in] data_size Size of the object to be written. /// \param[in] object_id Object ID specified by the user. + /// \param[in] owner_address The address of the object's owner. /// \param[out] data Buffer for the user to write the object into. /// \return Status. Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data); + const ObjectID &object_id, const rpc::Address &owner_address, + std::shared_ptr *data); /// Finalize placing an object into the object store. This should be called after /// a corresponding `Create()` call and then writing into the returned buffer. @@ -763,6 +765,22 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::WaitForRefRemovedReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Implements gRPC server handler. + void HandleAddObjectLocationOwner(const rpc::AddObjectLocationOwnerRequest &request, + rpc::AddObjectLocationOwnerReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Implements gRPC server handler. + void HandleRemoveObjectLocationOwner( + const rpc::RemoveObjectLocationOwnerRequest &request, + rpc::RemoveObjectLocationOwnerReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Implements gRPC server handler. + void HandleGetObjectLocationsOwner(const rpc::GetObjectLocationsOwnerRequest &request, + rpc::GetObjectLocationsOwnerReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Implements gRPC server handler. void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply, rpc::SendReplyCallback send_reply_callback) override; diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index 81bbaeea9..89a802b20 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -38,7 +38,8 @@ ray::Status PutSerializedObject(JNIEnv *env, jobject obj, ray::ObjectID object_i out_object_id, &data); } else { status = ray::CoreWorkerProcess::GetCoreWorker().Create( - native_ray_object->GetMetadata(), data_size, object_id, &data); + native_ray_object->GetMetadata(), data_size, object_id, + ray::CoreWorkerProcess::GetCoreWorker().GetRpcAddress(), &data); *out_object_id = object_id; } if (!status.ok()) { diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 2bbe3f296..d1bbfe9dd 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -889,6 +889,36 @@ void ReferenceCounter::SetReleaseLineageCallback( on_lineage_released_ = callback; } +void ReferenceCounter::AddObjectLocation(const ObjectID &object_id, + const ClientID &node_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_locations_.find(object_id); + if (it == object_id_locations_.end()) { + it = object_id_locations_.emplace(object_id, absl::flat_hash_set()).first; + } + it->second.insert(node_id); +} + +void ReferenceCounter::RemoveObjectLocation(const ObjectID &object_id, + const ClientID &node_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_locations_.find(object_id); + RAY_CHECK(it != object_id_locations_.end()); + it->second.erase(node_id); +} + +std::unordered_set ReferenceCounter::GetObjectLocations( + const ObjectID &object_id) { + absl::MutexLock lock(&mutex_); + auto it = object_id_locations_.find(object_id); + RAY_CHECK(it != object_id_locations_.end()); + std::unordered_set locations; + for (const auto &location : it->second) { + locations.insert(location); + } + return locations; +} + ReferenceCounter::Reference ReferenceCounter::Reference::FromProto( const rpc::ObjectReferenceCount &ref_count) { Reference ref; diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index f9b30cf75..812e65ce0 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -357,6 +357,27 @@ class ReferenceCounter : public ReferenceCounterInterface { const absl::flat_hash_map> pinned_objects, rpc::CoreWorkerStats *stats) const LOCKS_EXCLUDED(mutex_); + /// Add location to the location table of the given object. + /// + /// \param[in] object_id The object to update. + /// \param[in] node_id The node to be added to the location table. + void AddObjectLocation(const ObjectID &object_id, const ClientID &node_id) + LOCKS_EXCLUDED(mutex_); + + /// Remove location from the location table of the given object. + /// + /// \param[in] object_id The object to update. + /// \param[in] node_id The node to be removed from the location table. + void RemoveObjectLocation(const ObjectID &object_id, const ClientID &node_id) + LOCKS_EXCLUDED(mutex_); + + /// Get the locations from the location table of the given object. + /// + /// \param[in] object_id The object to get locations for. + /// \return The nodes that have the object. + std::unordered_set GetObjectLocations(const ObjectID &object_id) + LOCKS_EXCLUDED(mutex_); + private: struct Reference { /// Constructor for a reference whose origin is unknown. @@ -659,6 +680,14 @@ class ReferenceCounter : public ReferenceCounterInterface { /// Holds all reference counts and dependency information for tracked ObjectIDs. ReferenceTable object_id_refs_ GUARDED_BY(mutex_); + using LocationTable = absl::flat_hash_map>; + + /// Holds the client information for the owned objects. This table is seperate from + /// the reference table because we add object reference after putting object into the + /// plasma store and add the location to the object directory. Therefore we will receive + /// object location information before the reference is created. + LocationTable object_id_locations_ GUARDED_BY(mutex_); + /// Objects whose values have been freed by the language frontend. /// The values in plasma will not be pinned. An object ID is /// removed from this set once its Reference has been deleted diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index ae54cda74..f26647aa3 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -56,12 +56,13 @@ Status CoreWorkerPlasmaStoreProvider::SetClientOptions(std::string name, Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, const ObjectID &object_id, + const rpc::Address &owner_address, bool *object_exists) { RAY_CHECK(!object.IsInPlasmaError()) << object_id; std::shared_ptr data; RAY_RETURN_NOT_OK(Create(object.GetMetadata(), object.HasData() ? object.GetData()->Size() : 0, object_id, - &data)); + owner_address, &data)); // data could be a nullptr if the ObjectID already existed, but this does // not throw an error. if (data != nullptr) { @@ -81,6 +82,7 @@ Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, + const rpc::Address &owner_address, std::shared_ptr *data) { int32_t retries = 0; int32_t max_retries = RayConfig::instance().object_store_full_max_retries(); @@ -95,7 +97,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta std::shared_ptr arrow_buffer; { std::lock_guard guard(store_client_mutex_); - plasma_status = store_client_.Create(object_id, data_size, + plasma_status = store_client_.Create(object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr, metadata ? metadata->Size() : 0, &arrow_buffer, /*device_num=*/0, evict_if_full); @@ -423,7 +425,7 @@ void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( Status CoreWorkerPlasmaStoreProvider::WarmupStore() { ObjectID object_id = ObjectID::FromRandom(); std::shared_ptr data; - RAY_RETURN_NOT_OK(Create(nullptr, 8, object_id, &data)); + RAY_RETURN_NOT_OK(Create(nullptr, 8, object_id, rpc::Address(), &data)); RAY_RETURN_NOT_OK(Seal(object_id)); RAY_RETURN_NOT_OK(Release(object_id)); RAY_RETURN_NOT_OK(Delete({object_id}, false, false)); diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index b8a0fd143..1a6867b49 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -52,10 +52,12 @@ class CoreWorkerPlasmaStoreProvider { /// /// \param[in] object The object to create. /// \param[in] object_id The ID of the object. + /// \param[in] owner_address The address of the object's owner. /// \param[out] object_exists Optional. Returns whether an object with the /// same ID already exists. If this is true, then the Put does not write any /// object data. - Status Put(const RayObject &object, const ObjectID &object_id, bool *object_exists); + Status Put(const RayObject &object, const ObjectID &object_id, + const rpc::Address &owner_address, bool *object_exists); /// Create an object in plasma and return a mutable buffer to it. The buffer should be /// subsequently written to and then sealed using Seal(). @@ -63,9 +65,11 @@ class CoreWorkerPlasmaStoreProvider { /// \param[in] metadata The metadata of the object. /// \param[in] data_size The size of the object. /// \param[in] object_id The ID of the object. + /// \param[in] owner_address The address of the object's owner. /// \param[out] data The mutable object buffer in plasma that can be written to. Status Create(const std::shared_ptr &metadata, const size_t data_size, - const ObjectID &object_id, std::shared_ptr *data); + const ObjectID &object_id, const rpc::Address &owner_address, + std::shared_ptr *data); /// Seal an object buffer created with Create(). /// diff --git a/src/ray/object_manager/format/object_manager.fbs b/src/ray/object_manager/format/object_manager.fbs index 887c87bbe..23a77abe8 100644 --- a/src/ray/object_manager/format/object_manager.fbs +++ b/src/ray/object_manager/format/object_manager.fbs @@ -19,6 +19,14 @@ namespace ray.object_manager.protocol; table ObjectInfo { // Object ID of this object. object_id: string; + // Owner raylet ID of this object. + owner_raylet_id: string; + // Owner IP address of this object. + owner_ip_address: string; + // Owner port address of this object. + owner_port: int; + // Unique id for the owner worker. + owner_worker_id: string; // Number of bytes the content of this object occupies in memory. data_size: long; // Number of bytes the metadata of this object occupies in memory. diff --git a/src/ray/object_manager/object_buffer_pool.cc b/src/ray/object_manager/object_buffer_pool.cc index 22805bbf8..a81126289 100644 --- a/src/ray/object_manager/object_buffer_pool.cc +++ b/src/ray/object_manager/object_buffer_pool.cc @@ -97,14 +97,15 @@ void ObjectBufferPool::AbortGet(const ObjectID &object_id) { } std::pair ObjectBufferPool::CreateChunk( - const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index) { + const ObjectID &object_id, const rpc::Address &owner_address, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index) { std::lock_guard lock(pool_mutex_); if (create_buffer_state_.count(object_id) == 0) { int64_t object_size = data_size - metadata_size; // Try to create shared buffer. std::shared_ptr data; - Status s = store_client_.Create(object_id, object_size, NULL, metadata_size, &data); + Status s = store_client_.Create(object_id, owner_address, object_size, NULL, + metadata_size, &data); std::vector buffer; if (!s.ok()) { // Create failed. The object may already exist locally. If something else went diff --git a/src/ray/object_manager/object_buffer_pool.h b/src/ray/object_manager/object_buffer_pool.h index 8b7017675..ee013563f 100644 --- a/src/ray/object_manager/object_buffer_pool.h +++ b/src/ray/object_manager/object_buffer_pool.h @@ -100,6 +100,7 @@ class ObjectBufferPool { /// SealChunk has already been invoked. /// /// \param object_id The ObjectID. + /// \param owner_address The address of the object's owner. /// \param data_size The sum of the object size and metadata size. /// \param metadata_size The size of the metadata. /// \param chunk_index The index of the chunk. @@ -108,8 +109,8 @@ class ObjectBufferPool { /// or if create is invoked consecutively on the same chunk /// (with no intermediate AbortCreateChunk). std::pair CreateChunk( - const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index); + const ObjectID &object_id, const rpc::Address &owner_address, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index); /// Abort the create operation associated with a chunk at chunk_index. /// This method will fail if it's invoked on a chunk_index on which diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 25023f831..973d457b4 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -126,6 +126,7 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) { ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) { ray::Status status = ray::Status::OK(); auto it = listeners_.find(object_id); @@ -200,6 +201,7 @@ ray::Status ObjectDirectory::UnsubscribeObjectLocations(const UniqueID &callback } ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) { ray::Status status; auto it = listeners_.find(object_id); diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 426992f57..a186e766f 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -68,6 +68,7 @@ class ObjectDirectoryInterface { /// \param callback Invoked with (possibly empty) list of client ids and object_id. /// \return Status of whether async call to backend succeeded. virtual ray::Status LookupLocations(const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) = 0; /// Handle the removal of an object manager client. This updates the @@ -92,6 +93,7 @@ class ObjectDirectoryInterface { /// \return Status of whether subscription succeeded. virtual ray::Status SubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) = 0; /// Unsubscribe to object location notifications. @@ -149,12 +151,14 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::vector LookupAllRemoteConnections() const override; ray::Status LookupLocations(const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) override; void HandleClientRemoved(const ClientID &client_id) override; ray::Status SubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) override; ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id) override; @@ -171,7 +175,7 @@ class ObjectDirectory : public ObjectDirectoryInterface { /// ObjectDirectory should not be copied. RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory); - private: + protected: /// Callbacks associated with a call to GetLocations. struct LocationListenerState { /// The callback to invoke when object locations are found. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 3c20987c6..ca32165ab 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -162,7 +162,8 @@ ray::Status ObjectManager::SubscribeObjDeleted( return ray::Status::OK(); } -ray::Status ObjectManager::Pull(const ObjectID &object_id) { +ray::Status ObjectManager::Pull(const ObjectID &object_id, + const rpc::Address &owner_address) { RAY_LOG(DEBUG) << "Pull on " << self_node_id_ << " of object " << object_id; // Check if object is already local. if (local_objects_.count(object_id) != 0) { @@ -179,7 +180,7 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) { // be received if the list of locations is empty. The set of client IDs has // no ordering guarantee between notifications. return object_directory_->SubscribeObjectLocations( - object_directory_pull_callback_id_, object_id, + object_directory_pull_callback_id_, object_id, owner_address, [this](const ObjectID &object_id, const std::unordered_set &client_ids) { // Exit if the Pull request has already been fulfilled or canceled. auto it = pull_requests_.find(object_id); @@ -445,16 +446,22 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { uint64_t metadata_size = static_cast(object_info.metadata_size); uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + rpc::Address owner_address; + owner_address.set_raylet_id(object_info.owner_raylet_id); + owner_address.set_ip_address(object_info.owner_ip_address); + owner_address.set_port(object_info.owner_port); + owner_address.set_worker_id(object_info.owner_worker_id); + RAY_LOG(DEBUG) << "Sending object chunks of " << object_id << " to client " << client_id << ", number of chunks: " << num_chunks << ", total data size: " << data_size; UniqueID push_id = UniqueID::FromRandom(); for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - rpc_service_.post([this, push_id, object_id, client_id, data_size, metadata_size, - chunk_index, rpc_client]() { - auto st = SendObjectChunk(push_id, object_id, client_id, data_size, metadata_size, - chunk_index, rpc_client); + rpc_service_.post([this, push_id, object_id, owner_address, client_id, data_size, + metadata_size, chunk_index, rpc_client]() { + auto st = SendObjectChunk(push_id, object_id, owner_address, client_id, data_size, + metadata_size, chunk_index, rpc_client); if (!st.ok()) { RAY_LOG(WARNING) << "Send object " << object_id << " chunk failed due to " << st.message() << ", chunk index " << chunk_index; @@ -469,14 +476,15 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { } ray::Status ObjectManager::SendObjectChunk( - const UniqueID &push_id, const ObjectID &object_id, const ClientID &client_id, - uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, - std::shared_ptr rpc_client) { + const UniqueID &push_id, const ObjectID &object_id, const rpc::Address &owner_address, + const ClientID &client_id, uint64_t data_size, uint64_t metadata_size, + uint64_t chunk_index, std::shared_ptr rpc_client) { double start_time = absl::GetCurrentTimeNanos() / 1e9; rpc::PushRequest push_request; // Set request header push_request.set_push_id(push_id.Binary()); push_request.set_object_id(object_id.Binary()); + push_request.mutable_owner_address()->CopyFrom(owner_address); push_request.set_client_id(self_node_id_.Binary()); push_request.set_data_size(data_size); push_request.set_metadata_size(metadata_size); @@ -530,24 +538,24 @@ void ObjectManager::CancelPull(const ObjectID &object_id) { pull_requests_.erase(it); } -ray::Status ObjectManager::Wait(const std::vector &object_ids, - int64_t timeout_ms, uint64_t num_required_objects, - bool wait_local, const WaitCallback &callback) { +ray::Status ObjectManager::Wait( + const std::vector &object_ids, + const std::unordered_map &owner_addresses, int64_t timeout_ms, + uint64_t num_required_objects, bool wait_local, const WaitCallback &callback) { UniqueID wait_id = UniqueID::FromRandom(); RAY_LOG(DEBUG) << "Wait request " << wait_id << " on " << self_node_id_; - RAY_RETURN_NOT_OK(AddWaitRequest(wait_id, object_ids, timeout_ms, num_required_objects, - wait_local, callback)); + RAY_RETURN_NOT_OK(AddWaitRequest(wait_id, object_ids, owner_addresses, timeout_ms, + num_required_objects, wait_local, callback)); RAY_RETURN_NOT_OK(LookupRemainingWaitObjects(wait_id)); // LookupRemainingWaitObjects invokes SubscribeRemainingWaitObjects once lookup has // been performed on all remaining objects. return ray::Status::OK(); } -ray::Status ObjectManager::AddWaitRequest(const UniqueID &wait_id, - const std::vector &object_ids, - int64_t timeout_ms, - uint64_t num_required_objects, bool wait_local, - const WaitCallback &callback) { +ray::Status ObjectManager::AddWaitRequest( + const UniqueID &wait_id, const std::vector &object_ids, + const std::unordered_map &owner_addresses, int64_t timeout_ms, + uint64_t num_required_objects, bool wait_local, const WaitCallback &callback) { RAY_CHECK(timeout_ms >= 0 || timeout_ms == -1); RAY_CHECK(num_required_objects != 0); RAY_CHECK(num_required_objects <= object_ids.size()) @@ -560,6 +568,7 @@ ray::Status ObjectManager::AddWaitRequest(const UniqueID &wait_id, active_wait_requests_.emplace(wait_id, WaitState(*main_service_, timeout_ms, callback)); auto &wait_state = active_wait_requests_.find(wait_id)->second; wait_state.object_id_order = object_ids; + wait_state.owner_addresses = owner_addresses; wait_state.timeout_ms = timeout_ms; wait_state.num_required_objects = num_required_objects; wait_state.wait_local = wait_local; @@ -589,8 +598,9 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) { // Lookup remaining objects. wait_state.requested_objects.insert(object_id); RAY_RETURN_NOT_OK(object_directory_->LookupLocations( - object_id, [this, wait_id](const ObjectID &lookup_object_id, - const std::unordered_set &client_ids) { + object_id, wait_state.owner_addresses[object_id], + [this, wait_id](const ObjectID &lookup_object_id, + const std::unordered_set &client_ids) { auto &wait_state = active_wait_requests_.find(wait_id)->second; // Note that the object is guaranteed to be added to local_objects_ before // the notification is triggered. @@ -629,7 +639,7 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { wait_state.requested_objects.insert(object_id); // Subscribe to object notifications. RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( - wait_id, object_id, + wait_id, object_id, wait_state.owner_addresses[object_id], [this, wait_id](const ObjectID &subscribe_object_id, const std::unordered_set &client_ids) { auto object_id_wait_state = active_wait_requests_.find(wait_id); @@ -728,11 +738,12 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply * uint64_t chunk_index = request.chunk_index(); uint64_t metadata_size = request.metadata_size(); uint64_t data_size = request.data_size(); + const rpc::Address &owner_address = request.owner_address(); const std::string &data = request.data(); double start_time = absl::GetCurrentTimeNanos() / 1e9; - auto status = ReceiveObjectChunk(client_id, object_id, data_size, metadata_size, - chunk_index, data); + auto status = ReceiveObjectChunk(client_id, object_id, owner_address, data_size, + metadata_size, chunk_index, data); double end_time = absl::GetCurrentTimeNanos() / 1e9; HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time, status); @@ -741,6 +752,7 @@ void ObjectManager::HandlePush(const rpc::PushRequest &request, rpc::PushReply * ray::Status ObjectManager::ReceiveObjectChunk(const ClientID &client_id, const ObjectID &object_id, + const rpc::Address &owner_address, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, const std::string &data) { @@ -750,7 +762,8 @@ ray::Status ObjectManager::ReceiveObjectChunk(const ClientID &client_id, << ", object size: " << data_size; std::pair chunk_status = - buffer_pool_.CreateChunk(object_id, data_size, metadata_size, chunk_index); + buffer_pool_.CreateChunk(object_id, owner_address, data_size, metadata_size, + chunk_index); ray::Status status; ObjectBufferPool::ChunkInfo chunk_info = chunk_status.first; if (chunk_status.second.ok()) { diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 92d7b4f91..2b7142bdd 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -26,6 +26,12 @@ #include #include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" #include "absl/time/clock.h" #include "ray/common/id.h" #include "ray/common/ray_config.h" @@ -34,6 +40,7 @@ #include "ray/object_manager/notification/object_store_notification_manager_ipc.h" #include "ray/object_manager/object_buffer_pool.h" #include "ray/object_manager/object_directory.h" +#include "ray/object_manager/ownership_based_object_directory.h" #include "ray/object_manager/plasma/store_runner.h" #include "ray/rpc/object_manager/object_manager_client.h" #include "ray/rpc/object_manager/object_manager_server.h" @@ -87,7 +94,8 @@ class ObjectStoreRunner { class ObjectManagerInterface { public: - virtual ray::Status Pull(const ObjectID &object_id) = 0; + virtual ray::Status Pull(const ObjectID &object_id, + const rpc::Address &owner_address) = 0; virtual void CancelPull(const ObjectID &object_id) = 0; virtual ~ObjectManagerInterface(){}; }; @@ -132,11 +140,13 @@ class ObjectManager : public ObjectManagerInterface, /// contains only one chunk /// \param push_id Unique push id to indicate this push request /// \param object_id Object id + /// \param owner_address The address of the object's owner /// \param data_size Data size /// \param metadata_size Metadata size /// \param chunk_index Chunk index of this object chunk, start with 0 /// \param rpc_client Rpc client used to send message to remote object manager ray::Status SendObjectChunk(const UniqueID &push_id, const ObjectID &object_id, + const rpc::Address &owner_address, const ClientID &client_id, uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, std::shared_ptr rpc_client); @@ -145,13 +155,15 @@ class ObjectManager : public ObjectManagerInterface, /// /// \param client_id Client id of remote object manager which sends this chunk /// \param object_id Object id + /// \param owner_address The address of the object's owner /// \param data_size Data size /// \param metadata_size Metadata size /// \param chunk_index Chunk index /// \param data Chunk data ray::Status ReceiveObjectChunk(const ClientID &client_id, const ObjectID &object_id, - uint64_t data_size, uint64_t metadata_size, - uint64_t chunk_index, const std::string &data); + const rpc::Address &owner_address, uint64_t data_size, + uint64_t metadata_size, uint64_t chunk_index, + const std::string &data); /// Send pull request /// @@ -215,7 +227,7 @@ class ObjectManager : public ObjectManagerInterface, /// /// \param object_id The object's object id. /// \return Status of whether the pull request successfully initiated. - ray::Status Pull(const ObjectID &object_id) override; + ray::Status Pull(const ObjectID &object_id, const rpc::Address &owner_address) override; /// Try to Pull an object from one of its expected client locations. If there /// are more client locations to try after this attempt, then this method @@ -249,8 +261,9 @@ class ObjectManager : public ObjectManagerInterface, /// \param callback Invoked when either timeout_ms is satisfied OR num_ready_objects /// is satisfied. /// \return Status of whether the wait successfully initiated. - ray::Status Wait(const std::vector &object_ids, int64_t timeout_ms, - uint64_t num_required_objects, bool wait_local, + ray::Status Wait(const std::vector &object_ids, + const std::unordered_map &owner_addresses, + int64_t timeout_ms, uint64_t num_required_objects, bool wait_local, const WaitCallback &callback); /// Free a list of objects from object store. @@ -302,6 +315,8 @@ class ObjectManager : public ObjectManagerInterface, WaitCallback callback; /// Ordered input object_ids. std::vector object_id_order; + /// Objects' owners. + std::unordered_map owner_addresses; /// The objects that have not yet been found. std::unordered_set remaining; /// The objects that have been found. Note that if wait_local is true, then @@ -314,10 +329,11 @@ class ObjectManager : public ObjectManagerInterface, }; /// Creates a wait request and adds it to active_wait_requests_. - ray::Status AddWaitRequest(const UniqueID &wait_id, - const std::vector &object_ids, int64_t timeout_ms, - uint64_t num_required_objects, bool wait_local, - const WaitCallback &callback); + ray::Status AddWaitRequest( + const UniqueID &wait_id, const std::vector &object_ids, + const std::unordered_map &owner_addresses, + int64_t timeout_ms, uint64_t num_required_objects, bool wait_local, + const WaitCallback &callback); /// Lookup any remaining objects that are not local. This is invoked after /// the wait request is created and local objects are identified. diff --git a/src/ray/object_manager/ownership_based_object_directory.cc b/src/ray/object_manager/ownership_based_object_directory.cc new file mode 100644 index 000000000..0a8d8c22d --- /dev/null +++ b/src/ray/object_manager/ownership_based_object_directory.cc @@ -0,0 +1,243 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/object_manager/ownership_based_object_directory.h" + +namespace ray { + +OwnershipBasedObjectDirectory::OwnershipBasedObjectDirectory( + boost::asio::io_service &io_service, std::shared_ptr &gcs_client) + : ObjectDirectory(io_service, gcs_client), client_call_manager_(io_service) {} + +namespace { + +/// Filter out the removed clients from the object locations. +void FilterRemovedClients(std::shared_ptr gcs_client, + std::unordered_set *node_ids) { + for (auto it = node_ids->begin(); it != node_ids->end();) { + if (gcs_client->Nodes().IsRemoved(*it)) { + it = node_ids->erase(it); + } else { + it++; + } + } +} + +rpc::Address GetOwnerAddressFromObjectInfo( + const object_manager::protocol::ObjectInfoT &object_info) { + rpc::Address owner_address; + owner_address.set_raylet_id(object_info.owner_raylet_id); + owner_address.set_ip_address(object_info.owner_ip_address); + owner_address.set_port(object_info.owner_port); + owner_address.set_worker_id(object_info.owner_worker_id); + return owner_address; +} + +} // namespace + +std::shared_ptr OwnershipBasedObjectDirectory::GetClient( + const rpc::Address &owner_address) { + WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); + if (worker_id.IsNil()) { + // If an object does not have owner, return nullptr. + return nullptr; + } + auto it = worker_rpc_clients_.find(worker_id); + if (it == worker_rpc_clients_.end()) { + it = worker_rpc_clients_ + .emplace(worker_id, std::make_shared( + owner_address, client_call_manager_)) + .first; + } + return it->second; +} + +ray::Status OwnershipBasedObjectDirectory::ReportObjectAdded( + const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info) { + WorkerID worker_id = WorkerID::FromBinary(object_info.owner_worker_id); + rpc::Address owner_address = GetOwnerAddressFromObjectInfo(object_info); + std::shared_ptr rpc_client = GetClient(owner_address); + if (rpc_client == nullptr) { + RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " + << "ReportObjectAdded becomes a no-op."; + return Status::OK(); + } + rpc::AddObjectLocationOwnerRequest request; + request.set_intended_worker_id(object_info.owner_worker_id); + request.set_object_id(object_id.Binary()); + request.set_client_id(client_id.Binary()); + + rpc_client->AddObjectLocationOwner( + request, [worker_id, object_id](Status status, + const rpc::AddObjectLocationOwnerReply &reply) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Worker " << worker_id << " failed to add the location for " + << object_id; + } + }); + return Status::OK(); +} + +ray::Status OwnershipBasedObjectDirectory::ReportObjectRemoved( + const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info) { + WorkerID worker_id = WorkerID::FromBinary(object_info.owner_worker_id); + rpc::Address owner_address = GetOwnerAddressFromObjectInfo(object_info); + std::shared_ptr rpc_client = GetClient(owner_address); + if (rpc_client == nullptr) { + RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " + << "ReportObjectRemoved becomes a no-op."; + return Status::OK(); + } + + rpc::RemoveObjectLocationOwnerRequest request; + request.set_intended_worker_id(object_info.owner_worker_id); + request.set_object_id(object_id.Binary()); + request.set_client_id(client_id.Binary()); + + rpc_client->RemoveObjectLocationOwner( + request, [worker_id, object_id](Status status, + const rpc::RemoveObjectLocationOwnerReply &reply) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Worker " << worker_id + << " failed to remove the location for " << object_id; + } + }); + return Status::OK(); +}; + +void OwnershipBasedObjectDirectory::SubscriptionCallback( + ObjectID object_id, WorkerID worker_id, Status status, + const rpc::GetObjectLocationsOwnerReply &reply) { + auto it = listeners_.find(object_id); + if (it == listeners_.end()) { + return; + } + + std::unordered_set client_ids; + for (auto const &client_id : reply.client_ids()) { + client_ids.emplace(ClientID::FromBinary(client_id)); + } + FilterRemovedClients(gcs_client_, &client_ids); + if (client_ids != it->second.current_object_locations) { + it->second.current_object_locations = std::move(client_ids); + auto callbacks = it->second.callbacks; + // Call all callbacks associated with the object id locations we have + // received. This notifies the client even if the list of locations is + // empty, since this may indicate that the objects have been evicted from + // all nodes. + for (const auto &callback_pair : callbacks) { + // It is safe to call the callback directly since this is already running + // in the subscription callback stack. + callback_pair.second(object_id, it->second.current_object_locations); + } + } + + auto worker_it = worker_rpc_clients_.find(worker_id); + rpc::GetObjectLocationsOwnerRequest request; + request.set_intended_worker_id(worker_id.Binary()); + request.set_object_id(object_id.Binary()); + // TODO(zhuohan): Fix this infinite loop. + worker_it->second->GetObjectLocationsOwner( + request, + std::bind(&OwnershipBasedObjectDirectory::SubscriptionCallback, this, object_id, + worker_id, std::placeholders::_1, std::placeholders::_2)); +} + +ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations( + const UniqueID &callback_id, const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) { + auto it = listeners_.find(object_id); + if (it == listeners_.end()) { + WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); + std::shared_ptr rpc_client = GetClient(owner_address); + if (rpc_client == nullptr) { + RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " + << "SubscribeObjectLocations becomes a no-op."; + return Status::OK(); + } + rpc::GetObjectLocationsOwnerRequest request; + request.set_intended_worker_id(owner_address.worker_id()); + request.set_object_id(object_id.Binary()); + rpc_client->GetObjectLocationsOwner( + request, + std::bind(&OwnershipBasedObjectDirectory::SubscriptionCallback, this, object_id, + worker_id, std::placeholders::_1, std::placeholders::_2)); + it = listeners_.emplace(object_id, LocationListenerState()).first; + } + auto &listener_state = it->second; + + if (listener_state.callbacks.count(callback_id) > 0) { + return Status::OK(); + } + listener_state.callbacks.emplace(callback_id, callback); + return Status::OK(); +} + +ray::Status OwnershipBasedObjectDirectory::UnsubscribeObjectLocations( + const UniqueID &callback_id, const ObjectID &object_id) { + auto entry = listeners_.find(object_id); + if (entry == listeners_.end()) { + return Status::OK(); + } + entry->second.callbacks.erase(callback_id); + if (entry->second.callbacks.empty()) { + listeners_.erase(entry); + } + return Status::OK(); +} + +ray::Status OwnershipBasedObjectDirectory::LookupLocations( + const ObjectID &object_id, const rpc::Address &owner_address, + const OnLocationsFound &callback) { + WorkerID worker_id = WorkerID::FromBinary(owner_address.worker_id()); + std::shared_ptr rpc_client = GetClient(owner_address); + if (rpc_client == nullptr) { + RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. " + << "LookupLocations returns an empty list of locations."; + io_service_.post( + [callback, object_id]() { callback(object_id, std::unordered_set()); }); + return Status::OK(); + } + + rpc::GetObjectLocationsOwnerRequest request; + request.set_intended_worker_id(owner_address.worker_id()); + request.set_object_id(object_id.Binary()); + + rpc_client->GetObjectLocationsOwner( + request, [this, worker_id, object_id, callback]( + Status status, const rpc::GetObjectLocationsOwnerReply &reply) { + if (!status.ok()) { + RAY_LOG(ERROR) << "Worker " << worker_id << " failed to get the location for " + << object_id; + } + std::unordered_set client_ids; + for (auto const &client_id : reply.client_ids()) { + client_ids.emplace(ClientID::FromBinary(client_id)); + } + FilterRemovedClients(gcs_client_, &client_ids); + callback(object_id, client_ids); + }); + return Status::OK(); +} + +std::string OwnershipBasedObjectDirectory::DebugString() const { + std::stringstream result; + result << "OwnershipBasedObjectDirectory:"; + result << "\n- num listeners: " << listeners_.size(); + return result.str(); +} + +} // namespace ray diff --git a/src/ray/object_manager/ownership_based_object_directory.h b/src/ray/object_manager/ownership_based_object_directory.h new file mode 100644 index 000000000..12b3f8de2 --- /dev/null +++ b/src/ray/object_manager/ownership_based_object_directory.h @@ -0,0 +1,88 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "ray/common/id.h" +#include "ray/common/status.h" +#include "ray/gcs/redis_gcs_client.h" +#include "ray/object_manager/format/object_manager_generated.h" +#include "ray/object_manager/object_directory.h" +#include "ray/rpc/worker/core_worker_client.h" + +namespace ray { + +/// Ray OwnershipBasedObjectDirectory declaration. +class OwnershipBasedObjectDirectory : public ObjectDirectory { + public: + /// Create an ownership based object directory. + /// + /// \param io_service The event loop to dispatch callbacks to. This should + /// usually be the same event loop that the given gcs_client runs on. + /// \param gcs_client A Ray GCS client to request object and client + /// information from. + OwnershipBasedObjectDirectory(boost::asio::io_service &io_service, + std::shared_ptr &gcs_client); + + virtual ~OwnershipBasedObjectDirectory() {} + + ray::Status LookupLocations(const ObjectID &object_id, + const rpc::Address &owner_address, + const OnLocationsFound &callback) override; + + ray::Status SubscribeObjectLocations(const UniqueID &callback_id, + const ObjectID &object_id, + const rpc::Address &owner_address, + const OnLocationsFound &callback) override; + ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id, + const ObjectID &object_id) override; + + ray::Status ReportObjectAdded( + const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info) override; + ray::Status ReportObjectRemoved( + const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info) override; + + std::string DebugString() const override; + + /// OwnershipBasedObjectDirectory should not be copied. + RAY_DISALLOW_COPY_AND_ASSIGN(OwnershipBasedObjectDirectory); + + private: + /// The client call manager used to create the RPC clients. + rpc::ClientCallManager client_call_manager_; + /// Cache of gRPC clients to workers (not necessarily running on this node). + /// Also includes the number of inflight requests to each worker - when this + /// reaches zero, the client will be deleted and a new one will need to be created + /// for any subsequent requests. + absl::flat_hash_map> + worker_rpc_clients_; + + /// Get or create the rpc client in the worker_rpc_clients. + std::shared_ptr GetClient(const rpc::Address &owner_address); + + /// Internal callback function used by SubscribeObjectLocations. + void SubscriptionCallback(ObjectID object_id, WorkerID worker_id, Status status, + const rpc::GetObjectLocationsOwnerReply &reply); +}; + +} // namespace ray diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 5fd5c7315..8e6f2eb2f 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -153,8 +153,9 @@ class PlasmaClient::Impl : public std::enable_shared_from_this* data, int device_num = 0, + Status Create(const ObjectID& object_id, const ray::rpc::Address& owner_address, + int64_t data_size, const uint8_t* metadata, int64_t metadata_size, + std::shared_ptr* data, int device_num = 0, bool evict_if_full = true); Status Get(const std::vector& object_ids, int64_t timeout_ms, @@ -305,15 +306,15 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, object_entry->count += 1; } -Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, - const uint8_t* metadata, int64_t metadata_size, +Status PlasmaClient::Impl::Create(const ObjectID& object_id, const ray::rpc::Address& owner_address, + int64_t data_size, const uint8_t* metadata, int64_t metadata_size, std::shared_ptr* data, int device_num, bool evict_if_full) { std::lock_guard guard(client_mutex_); RAY_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; - RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size, + RAY_RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, owner_address, evict_if_full, data_size, metadata_size, device_num)); std::vector buffer; RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer)); @@ -789,11 +790,11 @@ Status PlasmaClient::SetClientOptions(const std::string& client_name, return impl_->SetClientOptions(client_name, output_memory_quota); } -Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, +Status PlasmaClient::Create(const ObjectID& object_id, const ray::rpc::Address& owner_address, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, std::shared_ptr* data, int device_num, bool evict_if_full) { - return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num, + return impl_->Create(object_id, owner_address, data_size, metadata, metadata_size, data, device_num, evict_if_full); } diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index f67b52d2f..1ecfaded7 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -27,6 +27,7 @@ #include "ray/common/status.h" #include "ray/object_manager/plasma/common.h" #include "ray/util/visibility.h" +#include "src/ray/protobuf/common.pb.h" using arrow::Buffer; @@ -77,6 +78,7 @@ class RAY_EXPORT PlasmaClient { /// be passed in when the object is created. /// /// \param object_id The ID to use for the newly created object. + /// \param owner_address The address of the object's owner. /// \param data_size The size in bytes of the space to be allocated for this /// object's /// data (this does not include space used for metadata). @@ -97,8 +99,9 @@ class RAY_EXPORT PlasmaClient { /// /// The returned object must be released once it is done with. It must also /// be either sealed or aborted. - Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, - int64_t metadata_size, std::shared_ptr* data, int device_num = 0, + Status Create(const ObjectID& object_id, const ray::rpc::Address& owner_address, + int64_t data_size, const uint8_t* metadata, int64_t metadata_size, + std::shared_ptr* data, int device_num = 0, bool evict_if_full = true); /// Get some objects from the Plasma Store. This function will block until the diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index fe081c0cf..75af867c8 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -24,6 +24,7 @@ #include #include "ray/common/id.h" +#include "ray/object_manager/format/object_manager_generated.h" #include "ray/object_manager/plasma/compat.h" #ifdef PLASMA_CUDA @@ -33,6 +34,8 @@ namespace plasma { using ray::ObjectID; +using ray::ClientID; +using ray::WorkerID; enum class ObjectLocation : int32_t { Local, Remote, Nonexistent }; @@ -77,6 +80,14 @@ struct ObjectTableEntry { int64_t metadata_size; /// Number of clients currently using this object. int ref_count; + /// Owner's raylet ID. + ClientID owner_raylet_id; + /// Owner's IP address. + std::string owner_ip_address; + /// Owner's port. + int owner_port; + /// Owner's worker ID. + WorkerID owner_worker_id; /// Unix epoch of when this object was created. int64_t create_time; /// How long creation of this object took. diff --git a/src/ray/object_manager/plasma/plasma.fbs b/src/ray/object_manager/plasma/plasma.fbs index 856b6d5b7..e7e4fdcd8 100644 --- a/src/ray/object_manager/plasma/plasma.fbs +++ b/src/ray/object_manager/plasma/plasma.fbs @@ -128,6 +128,14 @@ table PlasmaGetDebugStringReply { table PlasmaCreateRequest { // ID of the object to be created. object_id: string; + // Owner raylet ID of this object. + owner_raylet_id: string; + // Owner IP address of this object. + owner_ip_address: string; + // Owner port address of this object. + owner_port: int; + // Unique id for the owner worker. + owner_worker_id: string; // Whether to evict other objects to make room for this one. evict_if_full: bool; // The size of the object's data in bytes. diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 9ab1d7b32..85c61af78 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -191,17 +191,24 @@ Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_st // Create messages. -Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, bool evict_if_full, +Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, + const ray::rpc::Address& owner_address, bool evict_if_full, int64_t data_size, int64_t metadata_size, int device_num) { flatbuffers::FlatBufferBuilder fbb; auto message = fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.Binary()), + fbb.CreateString(owner_address.raylet_id()), + fbb.CreateString(owner_address.ip_address()), + owner_address.port(), + fbb.CreateString(owner_address.worker_id()), evict_if_full, data_size, metadata_size, device_num); return PlasmaSend(store_conn, MessageType::PlasmaCreateRequest, &fbb, message); } Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, - bool* evict_if_full, int64_t* data_size, int64_t* metadata_size, + ClientID* owner_raylet_id, std::string* owner_ip_address, + int* owner_port, WorkerID* owner_worker_id, bool* evict_if_full, + int64_t* data_size, int64_t* metadata_size, int* device_num) { RAY_DCHECK(data); auto message = flatbuffers::GetRoot(data); @@ -210,6 +217,10 @@ Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, *data_size = message->data_size(); *metadata_size = message->metadata_size(); *object_id = ObjectID::FromBinary(message->object_id()->str()); + *owner_raylet_id = ClientID::FromBinary(message->owner_raylet_id()->str()); + *owner_ip_address = message->owner_ip_address()->str(); + *owner_port = message->owner_port(); + *owner_worker_id = WorkerID::FromBinary(message->owner_worker_id()->str()); *device_num = message->device_num(); return Status::OK(); } diff --git a/src/ray/object_manager/plasma/protocol.h b/src/ray/object_manager/plasma/protocol.h index 44f726149..a321b0340 100644 --- a/src/ray/object_manager/plasma/protocol.h +++ b/src/ray/object_manager/plasma/protocol.h @@ -25,6 +25,7 @@ #include "ray/common/status.h" #include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/plasma_generated.h" +#include "src/ray/protobuf/common.pb.h" namespace plasma { @@ -79,11 +80,14 @@ Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_st /* Plasma Create message functions. */ -Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, bool evict_if_full, +Status SendCreateRequest(const std::shared_ptr &store_conn, ObjectID object_id, + const ray::rpc::Address &owner_address, bool evict_if_full, int64_t data_size, int64_t metadata_size, int device_num); Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id, - bool* evict_if_full, int64_t* data_size, int64_t* metadata_size, + ClientID* owner_raylet_id, std::string* owner_ip_address, + int* owner_port, WorkerID* owner_worker_id, bool* evict_if_full, + int64_t* data_size, int64_t* metadata_size, int* device_num); Status SendCreateReply(const std::shared_ptr &client, ObjectID object_id, PlasmaObject* object, diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index e3175f29b..4982048be 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -226,9 +226,13 @@ Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointe #endif // Create a new object buffer in the hash table. -PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_full, - int64_t data_size, int64_t metadata_size, - int device_num, const std::shared_ptr &client, +PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, + const ClientID& owner_raylet_id, + const std::string& owner_ip_address, + int owner_port, const WorkerID& owner_worker_id, + bool evict_if_full, int64_t data_size, + int64_t metadata_size, int device_num, + const std::shared_ptr &client, PlasmaObject* result) { RAY_LOG(DEBUG) << "creating object " << object_id.Hex(); @@ -282,6 +286,10 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_f entry->offset = offset; entry->state = ObjectState::PLASMA_CREATED; entry->device_num = device_num; + entry->owner_raylet_id = owner_raylet_id; + entry->owner_ip_address = owner_ip_address; + entry->owner_port = owner_port; + entry->owner_worker_id = owner_worker_id; entry->create_time = std::time(nullptr); entry->construct_duration = -1; @@ -607,6 +615,10 @@ void PlasmaStore::SealObjects(const std::vector& object_ids) { object_info.object_id = object_ids[i].Binary(); object_info.data_size = entry->data_size; + object_info.owner_raylet_id = entry->owner_raylet_id.Binary(); + object_info.owner_ip_address = entry->owner_ip_address; + object_info.owner_port = entry->owner_port; + object_info.owner_worker_id = entry->owner_worker_id.Binary(); object_info.metadata_size = entry->metadata_size; infos.push_back(object_info); } @@ -660,14 +672,19 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { return PlasmaError::ObjectInUse; } + // Prepare the notification before deleting the object. + ObjectInfoT notification; + notification.object_id = object_id.Binary(); + notification.owner_raylet_id = entry->owner_raylet_id.Binary(); + notification.owner_ip_address = entry->owner_ip_address; + notification.owner_port = entry->owner_port; + notification.owner_worker_id = entry->owner_worker_id.Binary(); + notification.is_deletion = true; + eviction_policy_.RemoveObject(object_id); EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. - ObjectInfoT notification; - notification.object_id = object_id.Binary(); - notification.is_deletion = true; PushNotification(¬ification); - return PlasmaError::OK; } @@ -698,13 +715,18 @@ void PlasmaStore::EvictObjects(const std::vector& object_ids) { entry->pointer, entry->data_size + entry->metadata_size)); evicted_entries.push_back(entry); } else { + // Prepare the notification before deleting the object. + ObjectInfoT notification; + notification.object_id = object_id.Binary(); + notification.owner_raylet_id = entry->owner_raylet_id.Binary(); + notification.owner_ip_address = entry->owner_ip_address; + notification.owner_port = entry->owner_port; + notification.owner_worker_id = entry->owner_worker_id.Binary(); + notification.is_deletion = true; // If there is no backing external store, just erase the object entry // and send a deletion notification. EraseFromObjectTable(object_id); // Inform all subscribers that the object has been deleted. - ObjectInfoT notification; - notification.object_id = object_id.Binary(); - notification.is_deletion = true; PushNotification(¬ification); } } @@ -840,6 +862,10 @@ void PlasmaStore::SubscribeToUpdates(const std::shared_ptr &client) { info.object_id = entry.first.Binary(); info.data_size = entry.second->data_size; info.metadata_size = entry.second->metadata_size; + info.owner_raylet_id = entry.second->owner_raylet_id.Binary(); + info.owner_ip_address = entry.second->owner_ip_address; + info.owner_port = entry.second->owner_port; + info.owner_worker_id = entry.second->owner_worker_id.Binary(); infos.push_back(info); } } @@ -858,14 +884,21 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, // Process the different types of requests. switch (type) { case fb::MessageType::PlasmaCreateRequest: { + ClientID owner_raylet_id; + std::string owner_ip_address; + int owner_port; + WorkerID owner_worker_id; bool evict_if_full; int64_t data_size; int64_t metadata_size; int device_num; - RAY_RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &evict_if_full, - &data_size, &metadata_size, &device_num)); - PlasmaError error_code = CreateObject(object_id, evict_if_full, data_size, - metadata_size, device_num, client, &object); + RAY_RETURN_NOT_OK(ReadCreateRequest( + input, input_size, &object_id, &owner_raylet_id, &owner_ip_address, &owner_port, + &owner_worker_id, &evict_if_full, &data_size, &metadata_size, &device_num)); + PlasmaError error_code = CreateObject(object_id, owner_raylet_id, owner_ip_address, + owner_port, owner_worker_id, evict_if_full, + data_size, metadata_size, device_num, client, + &object); int64_t mmap_size = 0; if (error_code == PlasmaError::OK && device_num == 0) { mmap_size = GetMmapSize(object.store_fd); diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 968e08b99..29fce0e5e 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -69,6 +69,10 @@ class PlasmaStore { /// the store when it is done with the object. /// /// \param object_id Object ID of the object to be created. + /// \param owner_raylet_id Raylet ID of the object's owner. + /// \param owner_ip_address IP address of the object's owner. + /// \param owner_port Port of the object's owner. + /// \param owner_worker_id Worker ID of the object's owner. /// \param evict_if_full If this is true, then when the object store is full, /// try to evict objects that are not currently referenced before /// creating the object. Else, do not evict any objects and @@ -90,7 +94,9 @@ class PlasmaStore { /// - PlasmaError::OutOfMemory, if the store is out of memory and /// cannot create the object. In this case, the client should not call /// plasma_release. - PlasmaError CreateObject(const ObjectID& object_id, bool evict_if_full, + PlasmaError CreateObject(const ObjectID& object_id, const ClientID& owner_raylet_id, + const std::string& owner_ip_address, int owner_port, + const WorkerID& owner_worker_id, bool evict_if_full, int64_t data_size, int64_t metadata_size, int device_num, const std::shared_ptr &client, PlasmaObject* result); diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 586d3782a..60c7ab4b6 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -22,6 +22,7 @@ #include "ray/common/test_util.h" #include "ray/object_manager/object_manager.h" #include "ray/util/filesystem.h" +#include "src/ray/protobuf/common.pb.h" extern "C" { #include "hiredis/hiredis.h" @@ -145,7 +146,8 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - RAY_CHECK_OK(client.Create(object_id, data_size, metadata, metadata_size, &data)); + RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata, + metadata_size, &data)); RAY_CHECK_OK(client.Seal(object_id)); return object_id; } @@ -347,21 +349,21 @@ class StressTestObjectManager : public TestObjectManagerBase { case TransferPattern::PULL_A_B: { for (int i = -1; ++i < num_trials;) { ObjectID oid1 = WriteDataToClient(client1, data_size); - status = server2->object_manager_.Pull(oid1); + status = server2->object_manager_.Pull(oid1, rpc::Address()); } } break; case TransferPattern::PULL_B_A: { for (int i = -1; ++i < num_trials;) { ObjectID oid2 = WriteDataToClient(client2, data_size); - status = server1->object_manager_.Pull(oid2); + status = server1->object_manager_.Pull(oid2, rpc::Address()); } } break; case TransferPattern::BIDIRECTIONAL_PULL: { for (int i = -1; ++i < num_trials;) { ObjectID oid1 = WriteDataToClient(client1, data_size); - status = server2->object_manager_.Pull(oid1); + status = server2->object_manager_.Pull(oid1, rpc::Address()); ObjectID oid2 = WriteDataToClient(client2, data_size); - status = server1->object_manager_.Pull(oid2); + status = server1->object_manager_.Pull(oid2, rpc::Address()); } } break; case TransferPattern::BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE: { @@ -370,9 +372,9 @@ class StressTestObjectManager : public TestObjectManagerBase { std::uniform_int_distribution<> dis(1, 50); for (int i = -1; ++i < num_trials;) { ObjectID oid1 = WriteDataToClient(client1, data_size + dis(gen)); - status = server2->object_manager_.Pull(oid1); + status = server2->object_manager_.Pull(oid1, rpc::Address()); ObjectID oid2 = WriteDataToClient(client2, data_size + dis(gen)); - status = server1->object_manager_.Pull(oid2); + status = server1->object_manager_.Pull(oid2, rpc::Address()); } } break; default: { diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 8a91822dd..77324af04 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -21,6 +21,7 @@ #include "ray/common/status.h" #include "ray/common/test_util.h" #include "ray/util/filesystem.h" +#include "src/ray/protobuf/common.pb.h" extern "C" { #include "hiredis/hiredis.h" @@ -144,7 +145,8 @@ class TestObjectManagerBase : public ::testing::Test { uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); std::shared_ptr data; - RAY_CHECK_OK(client.Create(object_id, data_size, metadata, metadata_size, &data)); + RAY_CHECK_OK(client.Create(object_id, ray::rpc::Address(), data_size, metadata, + metadata_size, &data)); RAY_CHECK_OK(client.Seal(object_id)); return object_id; } @@ -259,7 +261,7 @@ class TestObjectManager : public TestObjectManagerBase { UniqueID sub_id = ray::UniqueID::FromRandom(); RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations( - sub_id, object_1, + sub_id, object_1, rpc::Address(), [this, sub_id, object_1, object_2]( const ray::ObjectID &object_id, const std::unordered_set &clients) { @@ -279,7 +281,8 @@ class TestObjectManager : public TestObjectManagerBase { UniqueID wait_id = UniqueID::FromRandom(); RAY_CHECK_OK(server1->object_manager_.AddWaitRequest( - wait_id, object_ids, timeout_ms, required_objects, false, + wait_id, object_ids, std::unordered_map(), timeout_ms, + required_objects, false, [this, sub_id, object_1, object_ids, start_time]( const std::vector &found, const std::vector &remaining) { @@ -352,7 +355,8 @@ class TestObjectManager : public TestObjectManagerBase { boost::posix_time::ptime start_time = boost::posix_time::second_clock::local_time(); RAY_CHECK_OK(server1->object_manager_.Wait( - object_ids, timeout_ms, required_objects, false, + object_ids, std::unordered_map(), timeout_ms, + required_objects, false, [this, object_ids, num_objects, timeout_ms, required_objects, start_time]( const std::vector &found, const std::vector &remaining) { diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index 1e7c3c982..a08b76fee 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -86,6 +86,7 @@ cc_proto_library( proto_library( name = "object_manager_proto", srcs = ["object_manager.proto"], + deps = [":common_proto"], ) cc_proto_library( diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 51f0a9ecf..80f7ce30e 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -161,6 +161,33 @@ message WaitForObjectEvictionRequest { message WaitForObjectEvictionReply { } +message AddObjectLocationOwnerRequest { + bytes intended_worker_id = 1; + bytes object_id = 2; + bytes client_id = 3; +} + +message AddObjectLocationOwnerReply { +} + +message RemoveObjectLocationOwnerRequest { + bytes intended_worker_id = 1; + bytes object_id = 2; + bytes client_id = 3; +} + +message RemoveObjectLocationOwnerReply { +} + +message GetObjectLocationsOwnerRequest { + bytes intended_worker_id = 1; + bytes object_id = 2; +} + +message GetObjectLocationsOwnerReply { + repeated bytes client_ids = 1; +} + message KillActorRequest { // ID of the actor that is intended to be killed. bytes intended_actor_id = 1; @@ -284,6 +311,15 @@ service CoreWorkerService { // to this message indicates that the raylet should unpin the object. rpc WaitForObjectEviction(WaitForObjectEvictionRequest) returns (WaitForObjectEvictionReply); + // Add object location to the ownership-based object directory. + rpc AddObjectLocationOwner(AddObjectLocationOwnerRequest) + returns (AddObjectLocationOwnerReply); + // Remove object location from the ownership-based object directory. + rpc RemoveObjectLocationOwner(RemoveObjectLocationOwnerRequest) + returns (RemoveObjectLocationOwnerReply); + // Get object locations from the ownership-based object directory. + rpc GetObjectLocationsOwner(GetObjectLocationsOwnerRequest) + returns (GetObjectLocationsOwnerReply); // Request that the worker shut down without completing outstanding work. rpc KillActor(KillActorRequest) returns (KillActorReply); // Request that a worker cancels a task. diff --git a/src/ray/protobuf/object_manager.proto b/src/ray/protobuf/object_manager.proto index ea95c8f4c..f03456c9d 100644 --- a/src/ray/protobuf/object_manager.proto +++ b/src/ray/protobuf/object_manager.proto @@ -16,6 +16,8 @@ syntax = "proto3"; package ray.rpc; +import "src/ray/protobuf/common.proto"; + message PushRequest { // The push ID to allow the receiver to differentiate different push attempts // from the same sender. @@ -24,14 +26,16 @@ message PushRequest { bytes object_id = 2; // The client ID of client sending this object bytes client_id = 3; + // The owner address + Address owner_address = 4; // The index of the chunk being transferred. - uint32 chunk_index = 4; + uint32 chunk_index = 5; // The data_size include object_size and metadata_size - uint64 data_size = 5; + uint64 data_size = 6; // The metadata size. - uint64 metadata_size = 6; + uint64 metadata_size = 7; // The chunk data - bytes data = 7; + bytes data = 8; } message PullRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7914b04b1..f5602dffa 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1406,7 +1406,7 @@ void NodeManager::ProcessFetchOrReconstructMessage( // dependencies to the task dependency manager. if (!task_dependency_manager_.CheckObjectLocal(object_id)) { // Fetch the object if it's not already local. - RAY_CHECK_OK(object_manager_.Pull(object_id)); + RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address())); } } } else { @@ -1428,6 +1428,12 @@ void NodeManager::ProcessWaitRequestMessage( int64_t wait_ms = message->timeout(); uint64_t num_required_objects = static_cast(message->num_ready_objects()); bool wait_local = message->wait_local(); + const auto refs = + FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses()); + std::unordered_map owner_addresses; + for (const auto &ref : refs) { + owner_addresses.emplace(ObjectID::FromBinary(ref.object_id()), ref.owner_address()); + } bool resolve_objects = false; for (auto const &object_id : object_ids) { @@ -1444,14 +1450,12 @@ void NodeManager::ProcessWaitRequestMessage( // already local. Missing objects will be pulled from remote node managers. // If an object's owner dies, an error will be stored as the object's // value. - const auto refs = - FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses()); AsyncResolveObjects(client, refs, current_task_id, /*ray_get=*/false, /*mark_worker_blocked*/ was_blocked); } ray::Status status = object_manager_.Wait( - object_ids, wait_ms, num_required_objects, wait_local, + object_ids, owner_addresses, wait_ms, num_required_objects, wait_local, [this, resolve_objects, was_blocked, client, current_task_id]( std::vector found, std::vector remaining) { // Write the data. @@ -1487,6 +1491,10 @@ void NodeManager::ProcessWaitForDirectActorCallArgsRequestMessage( // managers or store an error if the objects have failed. const auto refs = FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses()); + std::unordered_map owner_addresses; + for (const auto &ref : refs) { + owner_addresses.emplace(ObjectID::FromBinary(ref.object_id()), ref.owner_address()); + } AsyncResolveObjects(client, refs, TaskID::Nil(), /*ray_get=*/false, /*mark_worker_blocked*/ false); // Reply to the client once a location has been found for all arguments. @@ -1494,7 +1502,7 @@ void NodeManager::ProcessWaitForDirectActorCallArgsRequestMessage( // has been found, so the object may still be on a remote node when the // client receives the reply. ray::Status status = object_manager_.Wait( - object_ids, -1, object_ids.size(), false, + object_ids, owner_addresses, -1, object_ids.size(), false, [this, client, tag](std::vector found, std::vector remaining) { RAY_CHECK(remaining.empty()); std::shared_ptr worker = @@ -1992,9 +2000,12 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ num_returns -= 1; } // Determine which IDs should be marked as failed. - std::vector objects_to_fail; + std::vector objects_to_fail; for (int64_t i = 0; i < num_returns; i++) { - objects_to_fail.push_back(spec.ReturnId(i)); + rpc::ObjectReference ref; + ref.set_object_id(spec.ReturnId(i).Binary()); + ref.mutable_owner_address()->CopyFrom(spec.CallerAddress()); + objects_to_fail.push_back(ref); } const JobID job_id = task.GetTaskSpecification().JobId(); MarkObjectsAsFailed(error_type, objects_to_fail, job_id); @@ -2007,14 +2018,15 @@ void NodeManager::TreatTaskAsFailed(const Task &task, const ErrorType &error_typ task_dependency_manager_.UnsubscribeGetDependencies(spec.TaskId()); } -void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, - const std::vector objects_to_fail, - const JobID &job_id) { +void NodeManager::MarkObjectsAsFailed( + const ErrorType &error_type, const std::vector objects_to_fail, + const JobID &job_id) { const std::string meta = std::to_string(static_cast(error_type)); - for (const auto &object_id : objects_to_fail) { + for (const auto &ref : objects_to_fail) { + ObjectID object_id = ObjectID::FromBinary(ref.object_id()); std::shared_ptr data; Status status; - status = store_client_.Create(object_id, 0, + status = store_client_.Create(object_id, ref.owner_address(), 0, reinterpret_cast(meta.c_str()), meta.length(), &data); if (status.ok()) { @@ -2057,9 +2069,10 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { const ObjectID object_id = spec.ReturnId(i); // Lookup the return value's locations. RAY_CHECK_OK(object_directory_->LookupLocations( - object_id, [this, task_marked_as_failed, task]( - const ray::ObjectID &object_id, - const std::unordered_set &clients) { + object_id, spec.CallerAddress(), + [this, task_marked_as_failed, task]( + const ray::ObjectID &object_id, + const std::unordered_set &clients) { if (!*task_marked_as_failed) { // Only process the object locations if we haven't already marked the // task as failed. @@ -2527,8 +2540,10 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, // LRU eviction is enabled. The object may still be in scope, but we // weren't able to fetch the value within the timeout, so the value has // most likely been evicted. Mark the object as unreachable. - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, - JobID::Nil()); + rpc::ObjectReference ref; + ref.set_object_id(required_object_id.Binary()); + ref.mutable_owner_address()->CopyFrom(owner_addr); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); } else { RAY_LOG(DEBUG) << "Required object " << required_object_id << " fetch timed out, asking owner " @@ -2544,7 +2559,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, rpc::GetObjectStatusRequest request; request.set_object_id(required_object_id.Binary()); request.set_owner_worker_id(owner_addr.worker_id()); - client->GetObjectStatus(request, [this, required_object_id]( + client->GetObjectStatus(request, [this, required_object_id, owner_addr]( Status status, const rpc::GetObjectStatusReply &reply) { if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE || @@ -2556,8 +2571,10 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, // freed. Store an error in the local plasma store so that an // exception will be thrown when the worker tries to get the // value. - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, - JobID::Nil()); + rpc::ObjectReference ref; + ref.set_object_id(required_object_id.Binary()); + ref.mutable_owner_address()->CopyFrom(owner_addr); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); } // Do nothing if the owner replied that the object is available. The // object manager will continue trying to fetch the object, and this @@ -2574,8 +2591,9 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id, "If this was not how your object ID was generated, please file an " "issue " "at https://github.com/ray-project/ray/issues/"; - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, - JobID::Nil()); + rpc::ObjectReference ref; + ref.set_object_id(required_object_id.Binary()); + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {ref}, JobID::Nil()); } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fe746e0b9..f8b8a6797 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -233,7 +233,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param object_ids The object ids to store error messages into. /// \param job_id The optional job to push errors to if the writes fail. void MarkObjectsAsFailed(const ErrorType &error_type, - const std::vector object_ids, const JobID &job_id); + const std::vector object_ids, + const JobID &job_id); /// This is similar to TreatTaskAsFailed, but it will only mark the task as /// failed if at least one of the task's return values is lost. A return /// value is lost if it has been created before, but no longer exists on any diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index b0977c4f7..29c6d6891 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -62,7 +62,13 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ std::shared_ptr gcs_client, int metrics_export_port) : self_node_id_(ClientID::FromRandom()), gcs_client_(gcs_client), - object_directory_(std::make_shared(main_service, gcs_client_)), + object_directory_( + RayConfig::instance().ownership_based_object_directory_enabled() + ? std::dynamic_pointer_cast( + std::make_shared(main_service, + gcs_client_)) + : std::dynamic_pointer_cast( + std::make_shared(main_service, gcs_client_))), object_manager_(main_service, self_node_id_, object_manager_config, object_directory_), node_manager_(main_service, self_node_id_, node_manager_config, object_manager_, diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index b8a214939..aa1649b47 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -176,9 +176,10 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { // attempted asynchronously. for (const auto &created_object_id : it->second.created_objects) { RAY_CHECK_OK(object_directory_->LookupLocations( - created_object_id, [this, task_id, reconstruction_attempt]( - const ray::ObjectID &object_id, - const std::unordered_set &clients) { + created_object_id, it->second.owner_addresses[created_object_id], + [this, task_id, reconstruction_attempt]( + const ray::ObjectID &object_id, + const std::unordered_set &clients) { if (clients.empty()) { // The required object no longer exists on any live nodes. Attempt // reconstruction. @@ -207,7 +208,8 @@ void ReconstructionPolicy::HandleTaskLeaseNotification(const TaskID &task_id, } } -void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) { +void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id, + const rpc::Address &owner_address) { RAY_LOG(DEBUG) << "Listening and maybe reconstructing object " << object_id; TaskID task_id = object_id.TaskId(); auto it = listening_tasks_.find(task_id); @@ -220,6 +222,7 @@ void ReconstructionPolicy::ListenAndMaybeReconstruct(const ObjectID &object_id) SetTaskTimeout(it, initial_reconstruction_timeout_ms_); } it->second.created_objects.insert(object_id); + it->second.owner_addresses.emplace(object_id, owner_address); } void ReconstructionPolicy::Cancel(const ObjectID &object_id) { diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index 5d70268f7..43d22d2d3 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -31,7 +31,8 @@ using rpc::TaskReconstructionData; class ReconstructionPolicyInterface { public: - virtual void ListenAndMaybeReconstruct(const ObjectID &object_id) = 0; + virtual void ListenAndMaybeReconstruct(const ObjectID &object_id, + const rpc::Address &owner_address) = 0; virtual void Cancel(const ObjectID &object_id) = 0; virtual ~ReconstructionPolicyInterface(){}; }; @@ -63,7 +64,8 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// for the task that created the object. /// /// \param object_id The object to check for reconstruction. - void ListenAndMaybeReconstruct(const ObjectID &object_id); + void ListenAndMaybeReconstruct(const ObjectID &object_id, + const rpc::Address &owner_address); /// Cancel listening for an object. Notifications for the object will be /// ignored. This does not cancel a reconstruction attempt that is already in @@ -100,6 +102,8 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { // The objects created by this task that we are listening for notifications for. std::unordered_set created_objects; + // Owner addresses of created objects. + std::unordered_map owner_addresses; // The time at which the timer for this task expires, according to this // node's steady clock. int64_t expires_at; diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index ef7008fa2..279dca5a7 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -45,6 +45,7 @@ class MockObjectDirectory : public ObjectDirectoryInterface { MockObjectDirectory() {} ray::Status LookupLocations(const ObjectID &object_id, + const rpc::Address &owner_address, const OnLocationsFound &callback) override { callbacks_.push_back({object_id, callback}); return ray::Status::OK(); @@ -79,9 +80,9 @@ class MockObjectDirectory : public ObjectDirectoryInterface { MOCK_METHOD0(GetLocalClientID, ray::ClientID()); MOCK_CONST_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); MOCK_CONST_METHOD0(LookupAllRemoteConnections, std::vector()); - MOCK_METHOD3(SubscribeObjectLocations, + MOCK_METHOD4(SubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &, - const OnLocationsFound &)); + const rpc::Address &owner_address, const OnLocationsFound &)); MOCK_METHOD2(UnsubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &)); MOCK_METHOD3(ReportObjectAdded, @@ -281,7 +282,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSimple) { ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); // Listen for an object. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test for longer than the reconstruction timeout. Run(reconstruction_timeout_ms_ * 1.1); // Check that reconstruction was triggered for the task that created the @@ -300,7 +301,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionEvicted) { mock_object_directory_->SetObjectLocations(object_id, {ClientID::FromRandom()}); // Listen for both objects. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test for longer than the reconstruction timeout. Run(reconstruction_timeout_ms_ * 1.1); // Check that reconstruction was not triggered, since the objects still @@ -324,7 +325,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionObjectLost) { mock_object_directory_->SetObjectLocations(object_id, {client_id}); // Listen for both objects. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test for longer than the reconstruction timeout. Run(reconstruction_timeout_ms_ * 1.1); // Check that reconstruction was not triggered, since the objects still @@ -347,8 +348,8 @@ TEST_F(ReconstructionPolicyTest, TestDuplicateReconstruction) { ObjectID object_id2 = ObjectID::ForTaskReturn(task_id, /*index=*/2); // Listen for both objects. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id1); - reconstruction_policy_->ListenAndMaybeReconstruct(object_id2); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id1, rpc::Address()); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id2, rpc::Address()); // Run the test for longer than the reconstruction timeout. Run(reconstruction_timeout_ms_ * 1.1); // Check that reconstruction is only triggered once for the task that created @@ -376,7 +377,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionSuppressed) { RAY_CHECK_OK(mock_gcs_->Tasks().AsyncAddTaskLease(task_lease_data, nullptr)); // Listen for an object. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test. Run(test_period); // Check that reconstruction is suppressed by the active task lease. @@ -393,7 +394,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionContinuallySuppressed) { ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); // Listen for an object. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Send the reconstruction manager heartbeats about the object. SetPeriodicTimer(reconstruction_timeout_ms_ / 2, [this, task_id]() { auto task_lease_data = std::make_shared(); @@ -421,7 +422,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) { ObjectID object_id = ObjectID::ForTaskReturn(task_id, /*index=*/1); // Listen for an object. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Halfway through the reconstruction timeout, cancel the object // reconstruction. auto timer_period = boost::posix_time::milliseconds(reconstruction_timeout_ms_); @@ -435,7 +436,7 @@ TEST_F(ReconstructionPolicyTest, TestReconstructionCanceled) { ASSERT_TRUE(reconstructed_tasks_.empty()); // Listen for the object again. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test again. Run(reconstruction_timeout_ms_ * 1.1); // Check that this time, reconstruction is triggered. @@ -459,7 +460,7 @@ TEST_F(ReconstructionPolicyTest, TestSimultaneousReconstructionSuppressed) { [](Status status) { ASSERT_TRUE(status.ok()); })); // Listen for an object. - reconstruction_policy_->ListenAndMaybeReconstruct(object_id); + reconstruction_policy_->ListenAndMaybeReconstruct(object_id, rpc::Address()); // Run the test for longer than the reconstruction timeout. Run(reconstruction_timeout_ms_ * 1.1); // Check that reconstruction is suppressed by the reconstruction attempt diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 8cf51631c..173e7bba4 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -37,7 +37,8 @@ bool TaskDependencyManager::CheckObjectLocal(const ObjectID &object_id) const { return local_objects_.count(object_id) == 1; } -bool TaskDependencyManager::CheckObjectRequired(const ObjectID &object_id) const { +bool TaskDependencyManager::CheckObjectRequired(const ObjectID &object_id, + rpc::Address *owner_address) const { const TaskID task_id = object_id.TaskId(); auto task_entry = required_tasks_.find(task_id); // If there are no subscribed tasks that are dependent on the object, then do @@ -58,25 +59,29 @@ bool TaskDependencyManager::CheckObjectRequired(const ObjectID &object_id) const if (pending_tasks_.count(task_id) == 1) { return false; } + if (owner_address != nullptr) { + *owner_address = task_entry->second.at(object_id).owner_address; + } return true; } void TaskDependencyManager::HandleRemoteDependencyRequired(const ObjectID &object_id) { - bool required = CheckObjectRequired(object_id); + rpc::Address owner_address; + bool required = CheckObjectRequired(object_id, &owner_address); // If the object is required, then try to make the object available locally. if (required) { auto inserted = required_objects_.insert(object_id); if (inserted.second) { // If we haven't already, request the object manager to pull it from a // remote node. - RAY_CHECK_OK(object_manager_.Pull(object_id)); - reconstruction_policy_.ListenAndMaybeReconstruct(object_id); + RAY_CHECK_OK(object_manager_.Pull(object_id, owner_address)); + reconstruction_policy_.ListenAndMaybeReconstruct(object_id, owner_address); } } } void TaskDependencyManager::HandleRemoteDependencyCanceled(const ObjectID &object_id) { - bool required = CheckObjectRequired(object_id); + bool required = CheckObjectRequired(object_id, nullptr); // If the object is no longer required, then cancel the object. if (!required) { auto it = required_objects_.find(object_id); diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 33999700f..ca2739877 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -227,7 +227,7 @@ class TaskDependencyManager { /// transfer or reconstruction. These are objects for which: (1) there is a /// subscribed task dependent on it, (2) the object is not local, and (3) the /// task that creates the object is not pending execution locally. - bool CheckObjectRequired(const ObjectID &object_id) const; + bool CheckObjectRequired(const ObjectID &object_id, rpc::Address *owner_address) const; /// If the given object is required, then request that the object be made /// available through object transfer or reconstruction. void HandleRemoteDependencyRequired(const ObjectID &object_id); diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 7220108f1..a4e9fa1fa 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -36,13 +36,15 @@ const static TaskID kDefaultDriverTaskId = TaskID::ForDriverTask(kDefaultJobId); class MockObjectManager : public ObjectManagerInterface { public: - MOCK_METHOD1(Pull, ray::Status(const ObjectID &object_id)); + MOCK_METHOD2(Pull, + ray::Status(const ObjectID &object_id, const rpc::Address &owner_address)); MOCK_METHOD1(CancelPull, void(const ObjectID &object_id)); }; class MockReconstructionPolicy : public ReconstructionPolicyInterface { public: - MOCK_METHOD1(ListenAndMaybeReconstruct, void(const ObjectID &object_id)); + MOCK_METHOD2(ListenAndMaybeReconstruct, + void(const ObjectID &object_id, const rpc::Address &owner_address)); MOCK_METHOD1(Cancel, void(const ObjectID &object_id)); }; @@ -147,8 +149,8 @@ TEST_F(TaskDependencyManagerTest, TestSimpleTask) { // No objects have been registered in the task dependency manager, so all // arguments should be remote. for (const auto &argument_id : arguments) { - EXPECT_CALL(object_manager_mock_, Pull(argument_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); + EXPECT_CALL(object_manager_mock_, Pull(argument_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id, _)); } // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeGetDependencies( @@ -186,8 +188,8 @@ TEST_F(TaskDependencyManagerTest, TestDuplicateSubscribeGetDependencies) { // Subscribe to the task's dependencies. All arguments except the last are // duplicates of previous subscription calls. Each argument should only be // requested from the node manager once. - EXPECT_CALL(object_manager_mock_, Pull(argument_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); + EXPECT_CALL(object_manager_mock_, Pull(argument_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id, _)); bool ready = task_dependency_manager_.SubscribeGetDependencies( task_id, ObjectIdsToRefs(arguments)); ASSERT_FALSE(ready); @@ -219,8 +221,8 @@ TEST_F(TaskDependencyManagerTest, TestMultipleTasks) { int num_dependent_tasks = 3; // The object should only be requested from the object manager once for all // three tasks. - EXPECT_CALL(object_manager_mock_, Pull(argument_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); + EXPECT_CALL(object_manager_mock_, Pull(argument_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id, _)); for (int i = 0; i < num_dependent_tasks; i++) { TaskID task_id = RandomTaskId(); dependent_tasks.push_back(task_id); @@ -251,8 +253,8 @@ TEST_F(TaskDependencyManagerTest, TestTaskChain) { int i = 0; // No objects should be remote or canceled since each task depends on a // locally queued task. - EXPECT_CALL(object_manager_mock_, Pull(_)).Times(0); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)).Times(0); + EXPECT_CALL(object_manager_mock_, Pull(_, _)).Times(0); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_, _)).Times(0); EXPECT_CALL(object_manager_mock_, CancelPull(_)).Times(0); EXPECT_CALL(reconstruction_policy_mock_, Cancel(_)).Times(0); for (const auto &task : tasks) { @@ -310,8 +312,8 @@ TEST_F(TaskDependencyManagerTest, TestDependentPut) { // No objects have been registered in the task dependency manager, so the put // object should be remote. - EXPECT_CALL(object_manager_mock_, Pull(put_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(put_id)); + EXPECT_CALL(object_manager_mock_, Pull(put_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(put_id, _)); // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeGetDependencies( task2.GetTaskSpecification().TaskId(), ObjectIdsToRefs({put_id})); @@ -346,8 +348,8 @@ TEST_F(TaskDependencyManagerTest, TestTaskForwarding) { task_dependency_manager_.UnsubscribeGetDependencies(task_id); // The object returned by the first task should be considered remote once we // cancel the forwarded task, since the second task depends on it. - EXPECT_CALL(object_manager_mock_, Pull(return_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(return_id)); + EXPECT_CALL(object_manager_mock_, Pull(return_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(return_id, _)); task_dependency_manager_.TaskCanceled(task_id); // Simulate the task executing on a remote node and its return value @@ -371,8 +373,8 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // No objects have been registered in the task dependency manager, so all // arguments should be remote. for (const auto &argument_id : arguments) { - EXPECT_CALL(object_manager_mock_, Pull(argument_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); + EXPECT_CALL(object_manager_mock_, Pull(argument_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id, _)); } // Subscribe to the task's dependencies. bool ready = task_dependency_manager_.SubscribeGetDependencies( @@ -399,8 +401,8 @@ TEST_F(TaskDependencyManagerTest, TestEviction) { // Simulate each of the arguments getting evicted. Each object should now be // considered remote. for (const auto &argument_id : arguments) { - EXPECT_CALL(object_manager_mock_, Pull(argument_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id)); + EXPECT_CALL(object_manager_mock_, Pull(argument_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(argument_id, _)); } for (size_t i = 0; i < arguments.size(); i++) { std::vector waiting_tasks; @@ -467,8 +469,8 @@ TEST_F(TaskDependencyManagerTest, TestRemoveTasksAndRelatedObjects) { auto tasks = MakeTaskChain(num_tasks, {}, 1); // No objects should be remote or canceled since each task depends on a // locally queued task. - EXPECT_CALL(object_manager_mock_, Pull(_)).Times(0); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)).Times(0); + EXPECT_CALL(object_manager_mock_, Pull(_, _)).Times(0); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_, _)).Times(0); EXPECT_CALL(object_manager_mock_, CancelPull(_)).Times(0); EXPECT_CALL(reconstruction_policy_mock_, Cancel(_)).Times(0); for (const auto &task : tasks) { @@ -528,8 +530,8 @@ TEST_F(TaskDependencyManagerTest, TestWaitDependencies) { wait_object_ids.push_back(ObjectID::FromRandom()); } // Simulate a worker calling `ray.wait` on some objects. - EXPECT_CALL(object_manager_mock_, Pull(_)).Times(num_objects); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)) + EXPECT_CALL(object_manager_mock_, Pull(_, _)).Times(num_objects); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_, _)) .Times(num_objects); task_dependency_manager_.SubscribeWaitDependencies(worker_id, ObjectIdsToRefs(wait_object_ids)); @@ -564,8 +566,8 @@ TEST_F(TaskDependencyManagerTest, TestWaitDependenciesObjectLocal) { // requests for the objects that are not local. for (const auto &object_id : wait_object_ids) { if (object_id != local_object_id) { - EXPECT_CALL(object_manager_mock_, Pull(object_id)); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(object_id)); + EXPECT_CALL(object_manager_mock_, Pull(object_id, _)); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(object_id, _)); } } task_dependency_manager_.SubscribeWaitDependencies(worker_id, @@ -596,8 +598,8 @@ TEST_F(TaskDependencyManagerTest, TestWaitDependenciesHandleObjectLocal) { wait_object_ids.push_back(ObjectID::FromRandom()); } // Simulate a worker calling `ray.wait` on some objects. - EXPECT_CALL(object_manager_mock_, Pull(_)).Times(num_objects); - EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_)) + EXPECT_CALL(object_manager_mock_, Pull(_, _)).Times(num_objects); + EXPECT_CALL(reconstruction_policy_mock_, ListenAndMaybeReconstruct(_, _)) .Times(num_objects); task_dependency_manager_.SubscribeWaitDependencies(worker_id, ObjectIdsToRefs(wait_object_ids)); diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 8af9329e5..f30046fa5 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -142,6 +142,18 @@ class CoreWorkerClientInterface { const WaitForObjectEvictionRequest &request, const ClientCallback &callback) {} + virtual void AddObjectLocationOwner( + const AddObjectLocationOwnerRequest &request, + const ClientCallback &callback) {} + + virtual void RemoveObjectLocationOwner( + const RemoveObjectLocationOwnerRequest &request, + const ClientCallback &callback) {} + + virtual void GetObjectLocationsOwner( + const GetObjectLocationsOwnerRequest &request, + const ClientCallback &callback) {} + /// Tell this actor to exit immediately. virtual void KillActor(const KillActorRequest &request, const ClientCallback &callback) {} @@ -204,6 +216,15 @@ class CoreWorkerClient : public std::enable_shared_from_this, VOID_RPC_CLIENT_METHOD(CoreWorkerService, WaitForObjectEviction, grpc_client_, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, AddObjectLocationOwner, grpc_client_, + override) + + VOID_RPC_CLIENT_METHOD(CoreWorkerService, RemoveObjectLocationOwner, grpc_client_, + override) + + VOID_RPC_CLIENT_METHOD(CoreWorkerService, GetObjectLocationsOwner, grpc_client_, + override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, GetCoreWorkerStats, grpc_client_, override) VOID_RPC_CLIENT_METHOD(CoreWorkerService, LocalGC, grpc_client_, override) diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index ff95a32c8..cf0c0e519 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -33,6 +33,9 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, WaitForActorOutOfScope) \ RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \ RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \ + RPC_SERVICE_HANDLER(CoreWorkerService, AddObjectLocationOwner) \ + RPC_SERVICE_HANDLER(CoreWorkerService, RemoveObjectLocationOwner) \ + RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectLocationsOwner) \ RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \ RPC_SERVICE_HANDLER(CoreWorkerService, CancelTask) \ RPC_SERVICE_HANDLER(CoreWorkerService, RemoteCancelTask) \ @@ -47,6 +50,9 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForActorOutOfScope) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForObjectEviction) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForRefRemoved) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AddObjectLocationOwner) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RemoveObjectLocationOwner) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetObjectLocationsOwner) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(CancelTask) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RemoteCancelTask) \