From fb76092d75dd3adfe02ed6ccfca220291da22304 Mon Sep 17 00:00:00 2001 From: ijrsvt Date: Tue, 3 Mar 2020 12:43:47 -0800 Subject: [PATCH] Re-route asyncio plasma code path through raylet instead of direct plasma connection (#7234) --- python/ray/_raylet.pyx | 7 ++- python/ray/experimental/async_api.py | 2 +- python/ray/experimental/async_plasma.py | 6 ++- python/ray/includes/libcoreworker.pxd | 4 +- src/ray/core_worker/core_worker.cc | 29 ++++++----- src/ray/core_worker/core_worker.h | 19 ++++++-- src/ray/protobuf/core_worker.proto | 11 +++++ src/ray/raylet/format/node_manager.fbs | 7 +++ src/ray/raylet/node_manager.cc | 65 +++++++++++++++++++++++++ src/ray/raylet/node_manager.h | 20 ++++++++ src/ray/raylet/raylet_client.cc | 7 +++ src/ray/raylet/raylet_client.h | 3 ++ src/ray/rpc/worker/core_worker_client.h | 8 +++ src/ray/rpc/worker/core_worker_server.h | 6 ++- 14 files changed, 169 insertions(+), 25 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 366e80df1..33e6c61e6 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -635,9 +635,12 @@ cdef class CoreWorker: def set_actor_title(self, title): self.core_worker.get().SetActorTitle(title) - def subscribe_to_plasma(self, plasma_event_handler): + def set_plasma_added_callback(self, plasma_event_handler): self.plasma_event_handler = plasma_event_handler - self.core_worker.get().SubscribeToAsyncPlasma(async_plasma_callback) + self.core_worker.get().SetPlasmaAddedCallback(async_plasma_callback) + + def subscribe_to_plasma_object(self, ObjectID object_id): + self.core_worker.get().SubscribeToPlasmaAdd(object_id.native()) def get_plasma_event_handler(self): return self.plasma_event_handler diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index c15c9e3cc..5ae3323fc 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -16,7 +16,7 @@ async def _async_init(): worker = ray.worker.global_worker loop = asyncio.get_event_loop() handler = PlasmaEventHandler(loop, worker) - worker.core_worker.subscribe_to_plasma(handler) + worker.core_worker.set_plasma_added_callback(handler) logger.debug("AsyncPlasma Connection Created!") diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 2418280be..f99be4522 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -71,6 +71,8 @@ class PlasmaEventHandler: future = PlasmaObjectFuture(loop=self._loop) self._waiting_dict[object_id].append(future) - self.check_immediately(object_id) - + if not self.check_immediately(object_id) and len( + self._waiting_dict[object_id]) == 1: + # Only subscribe once + self._worker.core_worker.subscribe_to_plasma_object(object_id) return future diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index d51c812a3..5cc3f0824 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -197,4 +197,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const double capacity, const CClientID &client_Id) - void SubscribeToAsyncPlasma(plasma_callback_function callback) + void SetPlasmaAddedCallback(plasma_callback_function callback) + + void SubscribeToPlasmaAdd(const CObjectID &object_id) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index aa856a8ce..2aebc9c9d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -250,15 +250,9 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, if (direct_task_receiver_ != nullptr) { direct_task_receiver_->Init(client_factory, rpc_address_); } - plasma_notifier_.reset(new ObjectStoreNotificationManager(io_service_, store_socket, - /*exit_on_error*/ false)); } CoreWorker::~CoreWorker() { - // ObjectStoreNotificationManager depends on io_service_ so we need to shut it down - // first. - plasma_notifier_->Shutdown(); - io_service_.stop(); io_thread_.join(); if (log_dir_ != "") { @@ -1401,13 +1395,22 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c }); } -void CoreWorker::SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback) { - plasma_notifier_->SubscribeObjAdded( - [subscribe_callback](const object_manager::protocol::ObjectInfoT &info) { - // This callback must be asynchronous to allow plasma to receive objects - subscribe_callback(ObjectID::FromPlasmaIdBinary(info.object_id), info.data_size, - info.metadata_size); - }); +void CoreWorker::SetPlasmaAddedCallback(PlasmaSubscriptionCallback subscribe_callback) { + plasma_done_callback_ = subscribe_callback; +} + +void CoreWorker::SubscribeToPlasmaAdd(const ObjectID &object_id) { + RAY_CHECK_OK(local_raylet_client_->SubscribeToPlasma(object_id)); +} + +void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request, + rpc::PlasmaObjectReadyReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(plasma_done_callback_ != nullptr) << "Plasma done callback not defined."; + // This callback must be asynchronous to allow plasma to receive objects + plasma_done_callback_(ObjectID::FromBinary(request.object_id()), request.data_size(), + request.metadata_size()); + send_reply_callback(Status::OK(), nullptr, nullptr); } void CoreWorker::SetActorId(const ActorID &actor_id) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e9249f2c9..8ca480153 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -481,6 +481,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Implements gRPC server handler. + void HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request, + rpc::PlasmaObjectReadyReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Get statistics from core worker. void HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request, rpc::GetCoreWorkerStatsReply *reply, @@ -515,11 +520,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Connect to plasma store for async futures using PlasmaSubscriptionCallback = std::function; - /// Subscribe to plasma store + /// Set callback when an item is added to the plasma store. /// /// \param[in] subscribe_callback The callback when an item is added to plasma. /// \return void - void SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback); + void SetPlasmaAddedCallback(PlasmaSubscriptionCallback subscribe_callback); + + /// Subscribe to receive notification of an object entering the plasma store. + /// + /// \param[in] object_id The object to wait for. + /// \return void + void SubscribeToPlasmaAdd(const ObjectID &object_id); private: /// Run the io_service_ event loop. This should be called in a background thread. @@ -785,8 +796,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Queue of tasks to resubmit when the specified time passes. std::deque> to_resubmit_ GUARDED_BY(mutex_); - // Plasma notification manager - std::unique_ptr plasma_notifier_; + // Plasma Callback + PlasmaSubscriptionCallback plasma_done_callback_; friend class CoreWorkerTest; }; diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 2978909d6..725c96af7 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -224,6 +224,15 @@ message LocalGCRequest { message LocalGCReply { } +message PlasmaObjectReadyRequest { + bytes object_id = 1; + int64 data_size = 2; + int64 metadata_size = 3; +} + +message PlasmaObjectReadyReply { +} + service CoreWorkerService { // Push a task to a worker from the raylet. rpc AssignTask(AssignTaskRequest) returns (AssignTaskReply); @@ -246,4 +255,6 @@ service CoreWorkerService { rpc WaitForRefRemoved(WaitForRefRemovedRequest) returns (WaitForRefRemovedReply); // Trigger local GC on the worker. rpc LocalGC(LocalGCRequest) returns (LocalGCReply); + // Notification from raylet that an object ID is available in local plasma. + rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply); } diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 0bf51445d..727adf6b3 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -80,6 +80,8 @@ enum MessageType:int { ConnectClient, // Set dynamic custom resource. SetResourceRequest, + // Subscribe to Plasma updates + SubscribePlasmaReady, } table TaskExecutionSpecification { @@ -263,3 +265,8 @@ table SetResourceRequest { // Client ID where this resource will be set. client_id: string; } + +table SubscribePlasmaReady { + // ObjectID to wait for + object_id: string; +} diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8c6716dbe..e45ce81b7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -138,6 +138,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, // Run the node manger rpc server. node_manager_server_.RegisterService(node_manager_service_); node_manager_server_.Run(); + + RAY_CHECK_OK(SetupPlasmaSubscription()); } ray::Status NodeManager::RegisterGcs() { @@ -1007,6 +1009,9 @@ void NodeManager::ProcessClientMessage( case protocol::MessageType::NotifyActorResumedFromCheckpoint: { ProcessNotifyActorResumedFromCheckpoint(message_data); } break; + case protocol::MessageType::SubscribePlasmaReady: { + ProcessSubscribePlasmaReady(client, message_data); + } break; default: RAY_LOG(FATAL) << "Received unexpected message type " << message_type; @@ -3054,6 +3059,61 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worker, } } +void NodeManager::ProcessSubscribePlasmaReady( + const std::shared_ptr &client, const uint8_t *message_data) { + std::shared_ptr associated_worker = worker_pool_.GetRegisteredWorker(client); + if (associated_worker == nullptr) { + associated_worker = worker_pool_.GetRegisteredDriver(client); + } + RAY_CHECK(associated_worker != nullptr) + << "No worker exists for CoreWorker with client: " << client->DebugString(); + + auto message = flatbuffers::GetRoot(message_data); + ObjectID id = from_flatbuf(*message->object_id()); + { + absl::MutexLock guard(&plasma_object_notification_lock_); + if (!async_plasma_objects_notification_.contains(id)) { + async_plasma_objects_notification_.emplace( + id, absl::flat_hash_set>()); + } + + // Only insert a worker once + if (!async_plasma_objects_notification_[id].contains(associated_worker)) { + async_plasma_objects_notification_[id].insert(associated_worker); + } + } +} + +ray::Status NodeManager::SetupPlasmaSubscription() { + return object_manager_.SubscribeObjAdded( + [this](const object_manager::protocol::ObjectInfoT &object_info) { + ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id); + auto waiting_workers = absl::flat_hash_set>(); + { + absl::MutexLock guard(&plasma_object_notification_lock_); + auto waiting = this->async_plasma_objects_notification_.extract(object_id); + if (!waiting.empty()) { + waiting_workers.swap(waiting.mapped()); + } + } + rpc::PlasmaObjectReadyRequest request; + request.set_object_id(object_id.Binary()); + request.set_metadata_size(object_info.metadata_size); + request.set_data_size(object_info.data_size); + + for (auto worker : waiting_workers) { + RAY_CHECK_OK(worker->rpc_client()->PlasmaObjectReady( + request, [](Status status, const rpc::PlasmaObjectReadyReply &reply) { + if (!status.ok()) { + RAY_LOG(INFO) + << "Problem with telling worker that plasma object is ready" + << status.ToString(); + } + })); + } + }); +} + void NodeManager::DumpDebugState() const { std::fstream fs; fs.open(initial_config_.session_dir + "/debug_state.txt", @@ -3080,6 +3140,11 @@ std::string NodeManager::DebugString() const { result << "\n" << reconstruction_policy_.DebugString(); result << "\n" << task_dependency_manager_.DebugString(); result << "\n" << lineage_cache_.DebugString(); + { + absl::MutexLock guard(&plasma_object_notification_lock_); + result << "\nnum async plasma notifications: " + << async_plasma_objects_notification_.size(); + } result << "\nActorRegistry:"; auto statistical_data = GetActorStatisticalData(actor_registry_); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 5788eee8d..7f5db4898 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -538,6 +538,19 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void FinishAssignTask(const std::shared_ptr &worker, const TaskID &task_id, bool success); + /// Process worker subscribing to plasma. + /// + /// \param client The client that sent the message. + /// \param message_data A pointer to the message data. + /// \return void. + void ProcessSubscribePlasmaReady(const std::shared_ptr &client, + const uint8_t *message_data); + + /// Setup callback with Object Manager. + /// + /// \return Status indicating whether setup was successful. + ray::Status SetupPlasmaSubscription(); + /// Handle a `WorkerLease` request. void HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request, rpc::RequestWorkerLeaseReply *reply, @@ -718,6 +731,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Cache for the ClientTable in the GCS. absl::flat_hash_set failed_nodes_cache_; + /// Concurrency for the following map + mutable absl::Mutex plasma_object_notification_lock_; + + /// Keeps track of workers waiting for objects + absl::flat_hash_map>> + async_plasma_objects_notification_ GUARDED_BY(plasma_object_notification_lock_); + /// Objects that are out of scope in the application and that should be freed /// from plasma. The cache is flushed when it reaches the config's /// free_objects_batch_size, or if objects have been in the cache for longer diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 9e45a3b54..f6c835a43 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -383,4 +383,11 @@ Status raylet::RayletClient::GlobalGC( return grpc_client_->GlobalGC(request, callback); } +Status raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id)); + fbb.Finish(message); + return conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb); +} + } // namespace ray diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index f1716bf52..cf92a4b4c 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -259,6 +259,9 @@ class RayletClient : public WorkerLeaseInterface { ray::Status GlobalGC(const rpc::ClientCallback &callback); + // Subscribe to receive notification on plasma object + ray::Status SubscribeToPlasma(const ObjectID &object_id); + WorkerID GetWorkerID() const { return worker_id_; } JobID GetJobID() const { return job_id_; } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 5ace160de..21e491f0c 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -160,6 +160,12 @@ class CoreWorkerClientInterface { return Status::NotImplemented(""); } + virtual ray::Status PlasmaObjectReady( + const PlasmaObjectReadyRequest &request, + const ClientCallback &callback) { + return Status::NotImplemented(""); + } + virtual ~CoreWorkerClientInterface(){}; }; @@ -198,6 +204,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, RPC_CLIENT_METHOD(CoreWorkerService, WaitForRefRemoved, grpc_client_, override) + RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) + ray::Status PushActorTask(std::unique_ptr request, const ClientCallback &callback) override { request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter()); diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 7d384d213..fa5b7d1ba 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -23,7 +23,8 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved, 9999) \ RPC_SERVICE_HANDLER(CoreWorkerService, KillActor, 9999) \ RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats, 100) \ - RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) + RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) \ + RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, 9999) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignTask) \ @@ -34,7 +35,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForRefRemoved) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetCoreWorkerStats) \ - DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. class CoreWorkerServiceHandler {