From e82ff08b0c1f8e2b7ea7a32fd183bc81e21befd1 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 29 Oct 2020 11:53:10 -0700 Subject: [PATCH] Fix asyncio plasma integration in cluster mode (#11665) --- python/ray/tests/BUILD | 1 + python/ray/tests/test_asyncio_cluster.py | 34 +++++++++++ src/ray/core_worker/core_worker.cc | 78 +++++++++++------------- src/ray/core_worker/core_worker.h | 8 +-- src/ray/object_manager/object_manager.cc | 1 + src/ray/protobuf/core_worker.proto | 2 - src/ray/raylet/format/node_manager.fbs | 4 +- src/ray/raylet/node_manager.cc | 54 +++++++++++++--- src/ray/raylet/node_manager.h | 4 +- src/ray/raylet_client/raylet_client.cc | 20 +++--- src/ray/raylet_client/raylet_client.h | 2 +- 11 files changed, 135 insertions(+), 73 deletions(-) create mode 100644 python/ray/tests/test_asyncio_cluster.py diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 26d6d4b7f..90faadfb0 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -91,6 +91,7 @@ py_test_module_list( files = [ "test_args.py", "test_asyncio.py", + "test_asyncio_cluster.py", "test_autoscaler.py", "test_autoscaler_yaml.py", "test_component_failures.py", diff --git a/python/ray/tests/test_asyncio_cluster.py b/python/ray/tests/test_asyncio_cluster.py new file mode 100644 index 000000000..da5689a20 --- /dev/null +++ b/python/ray/tests/test_asyncio_cluster.py @@ -0,0 +1,34 @@ +# coding: utf-8 +import asyncio +import sys + +import pytest +import numpy as np + +import ray +from ray.cluster_utils import Cluster + + +@pytest.mark.asyncio +async def test_asyncio_cluster_wait(): + cluster = Cluster() + head_node = cluster.add_node() + cluster.add_node(resources={"OTHER_NODE": 100}) + + ray.init(address=head_node.address) + + @ray.remote(num_cpus=0, resources={"OTHER_NODE": 1}) + def get_array(): + return np.random.random((192, 1080, 3)).astype(np.uint8) # ~ 0.5MB + + object_ref = get_array.remote() + + await asyncio.wait_for(object_ref, timeout=10) + + ray.shutdown() + cluster.shutdown() + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6196fc02b..8430172da 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2278,48 +2278,6 @@ void CoreWorker::YieldCurrentFiber(FiberEvent &event) { event.Wait(); } -void CoreWorker::PlasmaCallback(SetResultCallback success, - std::shared_ptr ray_object, ObjectID object_id, - void *py_future) { - std::vector> vec; - // Check if object is available before subscribing to plasma. - if (Get(std::vector{object_id}, 0, &vec).ok() && vec.size() > 0) { - return success(vec.front(), object_id, py_future); - } - { - absl::MutexLock lock(&plasma_mutex_); - auto it = async_plasma_callbacks_.find(object_id); - auto plasma_arrived_callback = [this, success, object_id, py_future]() { - GetAsync(object_id, success, py_future); - }; - - if (it == async_plasma_callbacks_.end()) { - async_plasma_callbacks_.emplace( - object_id, std::vector>{plasma_arrived_callback}); - } else { - it->second.push_back({plasma_arrived_callback}); - } - } - SubscribeToPlasmaAdd(object_id); - - // Check in-memory store in case object became ready *before* SubscribeToPlasmaAdd. - if (Get(std::vector{object_id}, 0, &vec).ok() && vec.size() > 0) { - std::vector> callbacks; - { - absl::MutexLock lock(&plasma_mutex_); - auto after_iter = async_plasma_callbacks_.extract(object_id); - callbacks = after_iter.mapped(); - } - for (auto callback : callbacks) { - // This callback needs to be asynchronous because it runs on the io_service_, so no - // RPCs can be processed while it's running. This can easily lead to deadlock (for - // example if the callback calls ray.get() on an object that is dependent on an RPC - // to be ready). - callback(); - } - } -} - void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_callback, void *python_future) { auto fallback_callback = @@ -2336,8 +2294,40 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c }); } -void CoreWorker::SubscribeToPlasmaAdd(const ObjectID &object_id) { - RAY_CHECK_OK(local_raylet_client_->SubscribeToPlasma(object_id)); +void CoreWorker::PlasmaCallback(SetResultCallback success, + std::shared_ptr ray_object, ObjectID object_id, + void *py_future) { + RAY_CHECK(ray_object->IsInPlasmaError()); + + // First check if the object is available in local plasma store. + // Note that we are using Contains instead of Get so it won't trigger pull request + // to remote nodes. + bool object_is_local = false; + if (Contains(object_id, &object_is_local).ok() && object_is_local) { + std::vector> vec; + RAY_CHECK_OK(Get(std::vector{object_id}, 0, &vec)); + RAY_CHECK(vec.size() > 0) + << "Failed to get local object but Raylet notified object is local."; + return success(vec.front(), object_id, py_future); + } + + // Object is not available locally. We now add the callback to listener queue. + { + absl::MutexLock lock(&plasma_mutex_); + auto plasma_arrived_callback = [this, success, object_id, py_future]() { + // This callback is invoked on the io_service_ event loop, so it cannot call + // blocking call like Get(). We used GetAsync here, which should immediate call + // PlasmaCallback again with object available locally. + GetAsync(object_id, success, py_future); + }; + + async_plasma_callbacks_[object_id].push_back(plasma_arrived_callback); + } + + // Ask raylet to subscribe to object notification. Raylet will call this core worker + // when the object is local (and it will fire the callback immediately if the object + // exists). CoreWorker::HandlePlasmaObjectReady handles such request. + local_raylet_client_->SubscribeToPlasma(object_id, GetOwnerAddress(object_id)); } void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c7f419732..7f624c3bb 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -903,7 +903,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { using SetResultCallback = std::function, ObjectID object_id, void *)>; - /// Perform async get from in-memory store. + /// Perform async get from the object store. /// /// \param[in] object_id The id to call get on. /// \param[in] success_callback The callback to use the result object. @@ -912,12 +912,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void GetAsync(const ObjectID &object_id, SetResultCallback success_callback, void *python_future); - /// Subscribe to receive notification of an object entering the plasma store. - /// - /// \param[in] object_id The object to wait for. - /// \return void - void SubscribeToPlasmaAdd(const ObjectID &object_id); - private: void SetCurrentTaskId(const TaskID &task_id); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index c279806cc..c3df404e5 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -176,6 +176,7 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id, return ray::Status::OK(); } if (pull_requests_.find(object_id) != pull_requests_.end()) { + RAY_LOG(DEBUG) << object_id << " has inflight pull_requests, skipping."; return ray::Status::OK(); } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index f62ad8070..5a5c751e3 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -287,8 +287,6 @@ message LocalGCReply { message PlasmaObjectReadyRequest { bytes object_id = 1; - int64 data_size = 2; - int64 metadata_size = 3; } message PlasmaObjectReadyReply { diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 21930c3dd..4234f59c8 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -80,7 +80,7 @@ enum MessageType:int { ConnectClient, // Set dynamic custom resource. SetResourceRequest, - // Subscribe to Plasma updates + // Subscribe to Plasma updates. SubscribePlasmaReady, } @@ -296,6 +296,8 @@ table SetResourceRequest { table SubscribePlasmaReady { // ObjectID to wait for object_id: string; + // The owner address for the ObjectID + owner_address: Address; } table ForceSpillObjectsRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 72041cacb..d7ef3674b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -56,6 +56,17 @@ ActorStats GetActorStatisticalData( return item; } +inline ray::rpc::ObjectReference FlatbufferToSingleObjectReference( + const flatbuffers::String &object_id, const ray::protocol::Address &address) { + ray::rpc::ObjectReference ref; + ref.set_object_id(object_id.str()); + ref.mutable_owner_address()->set_raylet_id(address.raylet_id()->str()); + ref.mutable_owner_address()->set_ip_address(address.ip_address()->str()); + ref.mutable_owner_address()->set_port(address.port()); + ref.mutable_owner_address()->set_worker_id(address.worker_id()->str()); + return ref; +} + std::vector FlatbufferToObjectReference( const flatbuffers::Vector> &object_ids, const flatbuffers::Vector> @@ -2877,15 +2888,40 @@ void NodeManager::ProcessSubscribePlasmaReady( auto message = flatbuffers::GetRoot(message_data); ObjectID id = from_flatbuf(*message->object_id()); - { - absl::MutexLock guard(&plasma_object_notification_lock_); - if (!async_plasma_objects_notification_.contains(id)) { - async_plasma_objects_notification_.emplace( - id, absl::flat_hash_set>()); - } - // Only insert a worker once - if (!async_plasma_objects_notification_[id].contains(associated_worker)) { + if (task_dependency_manager_.CheckObjectLocal(id)) { + // Object is already local, so we directly fire the callback to tell the core worker + // that the plasma object is ready. + rpc::PlasmaObjectReadyRequest request; + request.set_object_id(id.Binary()); + + RAY_LOG(DEBUG) << "Object " << id << " is already local, firing callback directly."; + associated_worker->rpc_client()->PlasmaObjectReady( + request, [](Status status, const rpc::PlasmaObjectReadyReply &reply) { + if (!status.ok()) { + RAY_LOG(INFO) << "Problem with telling worker that plasma object is ready" + << status.ToString(); + } + }); + } else { + // The object is not local, so we are subscribing to pull and wait for the objects. + std::vector refs = {FlatbufferToSingleObjectReference( + *message->object_id(), *message->owner_address())}; + + // NOTE(simon): This call will issue a pull request to remote workers and make sure + // the object will be local. + // 1. We currently do not allow user to cancel this call. The object will be pulled + // even if the `await object_ref` is cancelled. + // 2. We currently do not handle edge cases with object eviction where the object + // is local at this time but when the core worker was notified, the object is + // is evicted. The core worker should be able to handle evicted object in this + // case. + task_dependency_manager_.SubscribeWaitDependencies(associated_worker->WorkerId(), + refs); + + // Add this worker to the listeners for the object ID. + { + absl::MutexLock guard(&plasma_object_notification_lock_); async_plasma_objects_notification_[id].insert(associated_worker); } } @@ -2905,8 +2941,6 @@ ray::Status NodeManager::SetupPlasmaSubscription() { } rpc::PlasmaObjectReadyRequest request; request.set_object_id(object_id.Binary()); - request.set_metadata_size(object_info.metadata_size); - request.set_data_size(object_info.data_size); for (auto worker : waiting_workers) { worker->rpc_client()->PlasmaObjectReady( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 57b26a432..20734e14c 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -570,7 +570,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void FinishAssignTask(const std::shared_ptr &worker, const TaskID &task_id, bool success); - /// Process worker subscribing to plasma. + /// Process worker subscribing to a given plasma object become available. This handler + /// makes sure that the plasma object is local and calls core worker's PlasmaObjectReady + /// gRPC endpoint. /// /// \param client The client that sent the message. /// \param message_data A pointer to the message data. diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 22b1a88b4..77c4aff93 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -25,6 +25,12 @@ using MessageType = ray::protocol::MessageType; namespace { +inline flatbuffers::Offset to_flatbuf( + flatbuffers::FlatBufferBuilder &fbb, const ray::rpc::Address &address) { + return ray::protocol::CreateAddress( + fbb, fbb.CreateString(address.raylet_id()), fbb.CreateString(address.ip_address()), + address.port(), fbb.CreateString(address.worker_id())); +} flatbuffers::Offset>> AddressesToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb, @@ -32,10 +38,7 @@ AddressesToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb, std::vector> address_vec; address_vec.reserve(addresses.size()); for (const auto &addr : addresses) { - auto fbb_addr = ray::protocol::CreateAddress( - fbb, fbb.CreateString(addr.raylet_id()), fbb.CreateString(addr.ip_address()), - addr.port(), fbb.CreateString(addr.worker_id())); - address_vec.push_back(fbb_addr); + address_vec.push_back(to_flatbuf(fbb, addr)); } return fbb.CreateVector(address_vec); } @@ -416,11 +419,14 @@ void raylet::RayletClient::GlobalGC( grpc_client_->GlobalGC(request, callback); } -Status raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id) { +void raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id, + const rpc::Address &owner_address) { flatbuffers::FlatBufferBuilder fbb; - auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id)); + auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id), + to_flatbuf(fbb, owner_address)); fbb.Finish(message); - return conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb); + + RAY_CHECK_OK(conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb)); } } // namespace ray diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 4ee749b33..8feb437e5 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -387,7 +387,7 @@ class RayletClient : public PinObjectsInterface, void GlobalGC(const rpc::ClientCallback &callback); // Subscribe to receive notification on plasma object - ray::Status SubscribeToPlasma(const ObjectID &object_id); + void SubscribeToPlasma(const ObjectID &object_id, const rpc::Address &owner_address); WorkerID GetWorkerID() const { return worker_id_; }