Re-route asyncio plasma code path through raylet instead of direct plasma connection (#7234)

This commit is contained in:
ijrsvt
2020-03-03 12:43:47 -08:00
committed by GitHub
parent c2c6d96490
commit fb76092d75
14 changed files with 169 additions and 25 deletions
+5 -2
View File
@@ -635,9 +635,12 @@ cdef class CoreWorker:
def set_actor_title(self, title):
self.core_worker.get().SetActorTitle(title)
def subscribe_to_plasma(self, plasma_event_handler):
def set_plasma_added_callback(self, plasma_event_handler):
self.plasma_event_handler = plasma_event_handler
self.core_worker.get().SubscribeToAsyncPlasma(async_plasma_callback)
self.core_worker.get().SetPlasmaAddedCallback(async_plasma_callback)
def subscribe_to_plasma_object(self, ObjectID object_id):
self.core_worker.get().SubscribeToPlasmaAdd(object_id.native())
def get_plasma_event_handler(self):
return self.plasma_event_handler
+1 -1
View File
@@ -16,7 +16,7 @@ async def _async_init():
worker = ray.worker.global_worker
loop = asyncio.get_event_loop()
handler = PlasmaEventHandler(loop, worker)
worker.core_worker.subscribe_to_plasma(handler)
worker.core_worker.set_plasma_added_callback(handler)
logger.debug("AsyncPlasma Connection Created!")
+4 -2
View File
@@ -71,6 +71,8 @@ class PlasmaEventHandler:
future = PlasmaObjectFuture(loop=self._loop)
self._waiting_dict[object_id].append(future)
self.check_immediately(object_id)
if not self.check_immediately(object_id) and len(
self._waiting_dict[object_id]) == 1:
# Only subscribe once
self._worker.core_worker.subscribe_to_plasma_object(object_id)
return future
+3 -1
View File
@@ -197,4 +197,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const double capacity,
const CClientID &client_Id)
void SubscribeToAsyncPlasma(plasma_callback_function callback)
void SetPlasmaAddedCallback(plasma_callback_function callback)
void SubscribeToPlasmaAdd(const CObjectID &object_id)
+16 -13
View File
@@ -250,15 +250,9 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
if (direct_task_receiver_ != nullptr) {
direct_task_receiver_->Init(client_factory, rpc_address_);
}
plasma_notifier_.reset(new ObjectStoreNotificationManager(io_service_, store_socket,
/*exit_on_error*/ false));
}
CoreWorker::~CoreWorker() {
// ObjectStoreNotificationManager depends on io_service_ so we need to shut it down
// first.
plasma_notifier_->Shutdown();
io_service_.stop();
io_thread_.join();
if (log_dir_ != "") {
@@ -1401,13 +1395,22 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c
});
}
void CoreWorker::SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback) {
plasma_notifier_->SubscribeObjAdded(
[subscribe_callback](const object_manager::protocol::ObjectInfoT &info) {
// This callback must be asynchronous to allow plasma to receive objects
subscribe_callback(ObjectID::FromPlasmaIdBinary(info.object_id), info.data_size,
info.metadata_size);
});
void CoreWorker::SetPlasmaAddedCallback(PlasmaSubscriptionCallback subscribe_callback) {
plasma_done_callback_ = subscribe_callback;
}
void CoreWorker::SubscribeToPlasmaAdd(const ObjectID &object_id) {
RAY_CHECK_OK(local_raylet_client_->SubscribeToPlasma(object_id));
}
void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request,
rpc::PlasmaObjectReadyReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(plasma_done_callback_ != nullptr) << "Plasma done callback not defined.";
// This callback must be asynchronous to allow plasma to receive objects
plasma_done_callback_(ObjectID::FromBinary(request.object_id()), request.data_size(),
request.metadata_size());
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void CoreWorker::SetActorId(const ActorID &actor_id) {
+15 -4
View File
@@ -481,6 +481,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Implements gRPC server handler.
void HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request,
rpc::PlasmaObjectReadyReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Get statistics from core worker.
void HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request,
rpc::GetCoreWorkerStatsReply *reply,
@@ -515,11 +520,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Connect to plasma store for async futures
using PlasmaSubscriptionCallback = std::function<void(ray::ObjectID, int64_t, int64_t)>;
/// Subscribe to plasma store
/// Set callback when an item is added to the plasma store.
///
/// \param[in] subscribe_callback The callback when an item is added to plasma.
/// \return void
void SubscribeToAsyncPlasma(PlasmaSubscriptionCallback subscribe_callback);
void SetPlasmaAddedCallback(PlasmaSubscriptionCallback subscribe_callback);
/// Subscribe to receive notification of an object entering the plasma store.
///
/// \param[in] object_id The object to wait for.
/// \return void
void SubscribeToPlasmaAdd(const ObjectID &object_id);
private:
/// Run the io_service_ event loop. This should be called in a background thread.
@@ -785,8 +796,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
// Queue of tasks to resubmit when the specified time passes.
std::deque<std::pair<int64_t, TaskSpecification>> to_resubmit_ GUARDED_BY(mutex_);
// Plasma notification manager
std::unique_ptr<ObjectStoreNotificationManager> plasma_notifier_;
// Plasma Callback
PlasmaSubscriptionCallback plasma_done_callback_;
friend class CoreWorkerTest;
};
+11
View File
@@ -224,6 +224,15 @@ message LocalGCRequest {
message LocalGCReply {
}
message PlasmaObjectReadyRequest {
bytes object_id = 1;
int64 data_size = 2;
int64 metadata_size = 3;
}
message PlasmaObjectReadyReply {
}
service CoreWorkerService {
// Push a task to a worker from the raylet.
rpc AssignTask(AssignTaskRequest) returns (AssignTaskReply);
@@ -246,4 +255,6 @@ service CoreWorkerService {
rpc WaitForRefRemoved(WaitForRefRemovedRequest) returns (WaitForRefRemovedReply);
// Trigger local GC on the worker.
rpc LocalGC(LocalGCRequest) returns (LocalGCReply);
// Notification from raylet that an object ID is available in local plasma.
rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply);
}
+7
View File
@@ -80,6 +80,8 @@ enum MessageType:int {
ConnectClient,
// Set dynamic custom resource.
SetResourceRequest,
// Subscribe to Plasma updates
SubscribePlasmaReady,
}
table TaskExecutionSpecification {
@@ -263,3 +265,8 @@ table SetResourceRequest {
// Client ID where this resource will be set.
client_id: string;
}
table SubscribePlasmaReady {
// ObjectID to wait for
object_id: string;
}
+65
View File
@@ -138,6 +138,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
// Run the node manger rpc server.
node_manager_server_.RegisterService(node_manager_service_);
node_manager_server_.Run();
RAY_CHECK_OK(SetupPlasmaSubscription());
}
ray::Status NodeManager::RegisterGcs() {
@@ -1007,6 +1009,9 @@ void NodeManager::ProcessClientMessage(
case protocol::MessageType::NotifyActorResumedFromCheckpoint: {
ProcessNotifyActorResumedFromCheckpoint(message_data);
} break;
case protocol::MessageType::SubscribePlasmaReady: {
ProcessSubscribePlasmaReady(client, message_data);
} break;
default:
RAY_LOG(FATAL) << "Received unexpected message type " << message_type;
@@ -3054,6 +3059,61 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<Worker> &worker,
}
}
void NodeManager::ProcessSubscribePlasmaReady(
const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
std::shared_ptr<Worker> associated_worker = worker_pool_.GetRegisteredWorker(client);
if (associated_worker == nullptr) {
associated_worker = worker_pool_.GetRegisteredDriver(client);
}
RAY_CHECK(associated_worker != nullptr)
<< "No worker exists for CoreWorker with client: " << client->DebugString();
auto message = flatbuffers::GetRoot<protocol::SubscribePlasmaReady>(message_data);
ObjectID id = from_flatbuf<ObjectID>(*message->object_id());
{
absl::MutexLock guard(&plasma_object_notification_lock_);
if (!async_plasma_objects_notification_.contains(id)) {
async_plasma_objects_notification_.emplace(
id, absl::flat_hash_set<std::shared_ptr<Worker>>());
}
// Only insert a worker once
if (!async_plasma_objects_notification_[id].contains(associated_worker)) {
async_plasma_objects_notification_[id].insert(associated_worker);
}
}
}
ray::Status NodeManager::SetupPlasmaSubscription() {
return object_manager_.SubscribeObjAdded(
[this](const object_manager::protocol::ObjectInfoT &object_info) {
ObjectID object_id = ObjectID::FromPlasmaIdBinary(object_info.object_id);
auto waiting_workers = absl::flat_hash_set<std::shared_ptr<Worker>>();
{
absl::MutexLock guard(&plasma_object_notification_lock_);
auto waiting = this->async_plasma_objects_notification_.extract(object_id);
if (!waiting.empty()) {
waiting_workers.swap(waiting.mapped());
}
}
rpc::PlasmaObjectReadyRequest request;
request.set_object_id(object_id.Binary());
request.set_metadata_size(object_info.metadata_size);
request.set_data_size(object_info.data_size);
for (auto worker : waiting_workers) {
RAY_CHECK_OK(worker->rpc_client()->PlasmaObjectReady(
request, [](Status status, const rpc::PlasmaObjectReadyReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO)
<< "Problem with telling worker that plasma object is ready"
<< status.ToString();
}
}));
}
});
}
void NodeManager::DumpDebugState() const {
std::fstream fs;
fs.open(initial_config_.session_dir + "/debug_state.txt",
@@ -3080,6 +3140,11 @@ std::string NodeManager::DebugString() const {
result << "\n" << reconstruction_policy_.DebugString();
result << "\n" << task_dependency_manager_.DebugString();
result << "\n" << lineage_cache_.DebugString();
{
absl::MutexLock guard(&plasma_object_notification_lock_);
result << "\nnum async plasma notifications: "
<< async_plasma_objects_notification_.size();
}
result << "\nActorRegistry:";
auto statistical_data = GetActorStatisticalData(actor_registry_);
+20
View File
@@ -538,6 +538,19 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void FinishAssignTask(const std::shared_ptr<Worker> &worker, const TaskID &task_id,
bool success);
/// Process worker subscribing to plasma.
///
/// \param client The client that sent the message.
/// \param message_data A pointer to the message data.
/// \return void.
void ProcessSubscribePlasmaReady(const std::shared_ptr<LocalClientConnection> &client,
const uint8_t *message_data);
/// Setup callback with Object Manager.
///
/// \return Status indicating whether setup was successful.
ray::Status SetupPlasmaSubscription();
/// Handle a `WorkerLease` request.
void HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request,
rpc::RequestWorkerLeaseReply *reply,
@@ -718,6 +731,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Cache for the ClientTable in the GCS.
absl::flat_hash_set<ClientID> failed_nodes_cache_;
/// Concurrency for the following map
mutable absl::Mutex plasma_object_notification_lock_;
/// Keeps track of workers waiting for objects
absl::flat_hash_map<ObjectID, absl::flat_hash_set<std::shared_ptr<Worker>>>
async_plasma_objects_notification_ GUARDED_BY(plasma_object_notification_lock_);
/// Objects that are out of scope in the application and that should be freed
/// from plasma. The cache is flushed when it reaches the config's
/// free_objects_batch_size, or if objects have been in the cache for longer
+7
View File
@@ -383,4 +383,11 @@ Status raylet::RayletClient::GlobalGC(
return grpc_client_->GlobalGC(request, callback);
}
Status raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb);
}
} // namespace ray
+3
View File
@@ -259,6 +259,9 @@ class RayletClient : public WorkerLeaseInterface {
ray::Status GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
// Subscribe to receive notification on plasma object
ray::Status SubscribeToPlasma(const ObjectID &object_id);
WorkerID GetWorkerID() const { return worker_id_; }
JobID GetJobID() const { return job_id_; }
+8
View File
@@ -160,6 +160,12 @@ class CoreWorkerClientInterface {
return Status::NotImplemented("");
}
virtual ray::Status PlasmaObjectReady(
const PlasmaObjectReadyRequest &request,
const ClientCallback<PlasmaObjectReadyReply> &callback) {
return Status::NotImplemented("");
}
virtual ~CoreWorkerClientInterface(){};
};
@@ -198,6 +204,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
RPC_CLIENT_METHOD(CoreWorkerService, WaitForRefRemoved, grpc_client_, override)
RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override)
ray::Status PushActorTask(std::unique_ptr<PushTaskRequest> request,
const ClientCallback<PushTaskReply> &callback) override {
request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter());
+4 -2
View File
@@ -23,7 +23,8 @@ namespace rpc {
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100)
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, 9999)
#define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignTask) \
@@ -34,7 +35,8 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForRefRemoved) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetCoreWorkerStats) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC)
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(LocalGC) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PlasmaObjectReady)
/// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
class CoreWorkerServiceHandler {