Fix asyncio plasma integration in cluster mode (#11665)

This commit is contained in:
Simon Mo
2020-10-29 11:53:10 -07:00
committed by GitHub
parent 0b7a3d9e02
commit e82ff08b0c
11 changed files with 135 additions and 73 deletions
+1
View File
@@ -91,6 +91,7 @@ py_test_module_list(
files = [
"test_args.py",
"test_asyncio.py",
"test_asyncio_cluster.py",
"test_autoscaler.py",
"test_autoscaler_yaml.py",
"test_component_failures.py",
+34
View File
@@ -0,0 +1,34 @@
# coding: utf-8
import asyncio
import sys
import pytest
import numpy as np
import ray
from ray.cluster_utils import Cluster
@pytest.mark.asyncio
async def test_asyncio_cluster_wait():
cluster = Cluster()
head_node = cluster.add_node()
cluster.add_node(resources={"OTHER_NODE": 100})
ray.init(address=head_node.address)
@ray.remote(num_cpus=0, resources={"OTHER_NODE": 1})
def get_array():
return np.random.random((192, 1080, 3)).astype(np.uint8) # ~ 0.5MB
object_ref = get_array.remote()
await asyncio.wait_for(object_ref, timeout=10)
ray.shutdown()
cluster.shutdown()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+34 -44
View File
@@ -2278,48 +2278,6 @@ void CoreWorker::YieldCurrentFiber(FiberEvent &event) {
event.Wait();
}
void CoreWorker::PlasmaCallback(SetResultCallback success,
std::shared_ptr<RayObject> ray_object, ObjectID object_id,
void *py_future) {
std::vector<std::shared_ptr<RayObject>> vec;
// Check if object is available before subscribing to plasma.
if (Get(std::vector<ObjectID>{object_id}, 0, &vec).ok() && vec.size() > 0) {
return success(vec.front(), object_id, py_future);
}
{
absl::MutexLock lock(&plasma_mutex_);
auto it = async_plasma_callbacks_.find(object_id);
auto plasma_arrived_callback = [this, success, object_id, py_future]() {
GetAsync(object_id, success, py_future);
};
if (it == async_plasma_callbacks_.end()) {
async_plasma_callbacks_.emplace(
object_id, std::vector<std::function<void(void)>>{plasma_arrived_callback});
} else {
it->second.push_back({plasma_arrived_callback});
}
}
SubscribeToPlasmaAdd(object_id);
// Check in-memory store in case object became ready *before* SubscribeToPlasmaAdd.
if (Get(std::vector<ObjectID>{object_id}, 0, &vec).ok() && vec.size() > 0) {
std::vector<std::function<void(void)>> callbacks;
{
absl::MutexLock lock(&plasma_mutex_);
auto after_iter = async_plasma_callbacks_.extract(object_id);
callbacks = after_iter.mapped();
}
for (auto callback : callbacks) {
// This callback needs to be asynchronous because it runs on the io_service_, so no
// RPCs can be processed while it's running. This can easily lead to deadlock (for
// example if the callback calls ray.get() on an object that is dependent on an RPC
// to be ready).
callback();
}
}
}
void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_callback,
void *python_future) {
auto fallback_callback =
@@ -2336,8 +2294,40 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c
});
}
void CoreWorker::SubscribeToPlasmaAdd(const ObjectID &object_id) {
RAY_CHECK_OK(local_raylet_client_->SubscribeToPlasma(object_id));
void CoreWorker::PlasmaCallback(SetResultCallback success,
std::shared_ptr<RayObject> ray_object, ObjectID object_id,
void *py_future) {
RAY_CHECK(ray_object->IsInPlasmaError());
// First check if the object is available in local plasma store.
// Note that we are using Contains instead of Get so it won't trigger pull request
// to remote nodes.
bool object_is_local = false;
if (Contains(object_id, &object_is_local).ok() && object_is_local) {
std::vector<std::shared_ptr<RayObject>> vec;
RAY_CHECK_OK(Get(std::vector<ObjectID>{object_id}, 0, &vec));
RAY_CHECK(vec.size() > 0)
<< "Failed to get local object but Raylet notified object is local.";
return success(vec.front(), object_id, py_future);
}
// Object is not available locally. We now add the callback to listener queue.
{
absl::MutexLock lock(&plasma_mutex_);
auto plasma_arrived_callback = [this, success, object_id, py_future]() {
// This callback is invoked on the io_service_ event loop, so it cannot call
// blocking call like Get(). We used GetAsync here, which should immediate call
// PlasmaCallback again with object available locally.
GetAsync(object_id, success, py_future);
};
async_plasma_callbacks_[object_id].push_back(plasma_arrived_callback);
}
// Ask raylet to subscribe to object notification. Raylet will call this core worker
// when the object is local (and it will fire the callback immediately if the object
// exists). CoreWorker::HandlePlasmaObjectReady handles such request.
local_raylet_client_->SubscribeToPlasma(object_id, GetOwnerAddress(object_id));
}
void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request,
+1 -7
View File
@@ -903,7 +903,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
using SetResultCallback =
std::function<void(std::shared_ptr<RayObject>, ObjectID object_id, void *)>;
/// Perform async get from in-memory store.
/// Perform async get from the object store.
///
/// \param[in] object_id The id to call get on.
/// \param[in] success_callback The callback to use the result object.
@@ -912,12 +912,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void GetAsync(const ObjectID &object_id, SetResultCallback success_callback,
void *python_future);
/// Subscribe to receive notification of an object entering the plasma store.
///
/// \param[in] object_id The object to wait for.
/// \return void
void SubscribeToPlasmaAdd(const ObjectID &object_id);
private:
void SetCurrentTaskId(const TaskID &task_id);
+1
View File
@@ -176,6 +176,7 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id,
return ray::Status::OK();
}
if (pull_requests_.find(object_id) != pull_requests_.end()) {
RAY_LOG(DEBUG) << object_id << " has inflight pull_requests, skipping.";
return ray::Status::OK();
}
-2
View File
@@ -287,8 +287,6 @@ message LocalGCReply {
message PlasmaObjectReadyRequest {
bytes object_id = 1;
int64 data_size = 2;
int64 metadata_size = 3;
}
message PlasmaObjectReadyReply {
+3 -1
View File
@@ -80,7 +80,7 @@ enum MessageType:int {
ConnectClient,
// Set dynamic custom resource.
SetResourceRequest,
// Subscribe to Plasma updates
// Subscribe to Plasma updates.
SubscribePlasmaReady,
}
@@ -296,6 +296,8 @@ table SetResourceRequest {
table SubscribePlasmaReady {
// ObjectID to wait for
object_id: string;
// The owner address for the ObjectID
owner_address: Address;
}
table ForceSpillObjectsRequest {
+44 -10
View File
@@ -56,6 +56,17 @@ ActorStats GetActorStatisticalData(
return item;
}
inline ray::rpc::ObjectReference FlatbufferToSingleObjectReference(
const flatbuffers::String &object_id, const ray::protocol::Address &address) {
ray::rpc::ObjectReference ref;
ref.set_object_id(object_id.str());
ref.mutable_owner_address()->set_raylet_id(address.raylet_id()->str());
ref.mutable_owner_address()->set_ip_address(address.ip_address()->str());
ref.mutable_owner_address()->set_port(address.port());
ref.mutable_owner_address()->set_worker_id(address.worker_id()->str());
return ref;
}
std::vector<ray::rpc::ObjectReference> FlatbufferToObjectReference(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> &object_ids,
const flatbuffers::Vector<flatbuffers::Offset<ray::protocol::Address>>
@@ -2877,15 +2888,40 @@ void NodeManager::ProcessSubscribePlasmaReady(
auto message = flatbuffers::GetRoot<protocol::SubscribePlasmaReady>(message_data);
ObjectID id = from_flatbuf<ObjectID>(*message->object_id());
{
absl::MutexLock guard(&plasma_object_notification_lock_);
if (!async_plasma_objects_notification_.contains(id)) {
async_plasma_objects_notification_.emplace(
id, absl::flat_hash_set<std::shared_ptr<WorkerInterface>>());
}
// Only insert a worker once
if (!async_plasma_objects_notification_[id].contains(associated_worker)) {
if (task_dependency_manager_.CheckObjectLocal(id)) {
// Object is already local, so we directly fire the callback to tell the core worker
// that the plasma object is ready.
rpc::PlasmaObjectReadyRequest request;
request.set_object_id(id.Binary());
RAY_LOG(DEBUG) << "Object " << id << " is already local, firing callback directly.";
associated_worker->rpc_client()->PlasmaObjectReady(
request, [](Status status, const rpc::PlasmaObjectReadyReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Problem with telling worker that plasma object is ready"
<< status.ToString();
}
});
} else {
// The object is not local, so we are subscribing to pull and wait for the objects.
std::vector<rpc::ObjectReference> refs = {FlatbufferToSingleObjectReference(
*message->object_id(), *message->owner_address())};
// NOTE(simon): This call will issue a pull request to remote workers and make sure
// the object will be local.
// 1. We currently do not allow user to cancel this call. The object will be pulled
// even if the `await object_ref` is cancelled.
// 2. We currently do not handle edge cases with object eviction where the object
// is local at this time but when the core worker was notified, the object is
// is evicted. The core worker should be able to handle evicted object in this
// case.
task_dependency_manager_.SubscribeWaitDependencies(associated_worker->WorkerId(),
refs);
// Add this worker to the listeners for the object ID.
{
absl::MutexLock guard(&plasma_object_notification_lock_);
async_plasma_objects_notification_[id].insert(associated_worker);
}
}
@@ -2905,8 +2941,6 @@ ray::Status NodeManager::SetupPlasmaSubscription() {
}
rpc::PlasmaObjectReadyRequest request;
request.set_object_id(object_id.Binary());
request.set_metadata_size(object_info.metadata_size);
request.set_data_size(object_info.data_size);
for (auto worker : waiting_workers) {
worker->rpc_client()->PlasmaObjectReady(
+3 -1
View File
@@ -570,7 +570,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void FinishAssignTask(const std::shared_ptr<WorkerInterface> &worker,
const TaskID &task_id, bool success);
/// Process worker subscribing to plasma.
/// Process worker subscribing to a given plasma object become available. This handler
/// makes sure that the plasma object is local and calls core worker's PlasmaObjectReady
/// gRPC endpoint.
///
/// \param client The client that sent the message.
/// \param message_data A pointer to the message data.
+13 -7
View File
@@ -25,6 +25,12 @@
using MessageType = ray::protocol::MessageType;
namespace {
inline flatbuffers::Offset<ray::protocol::Address> to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb, const ray::rpc::Address &address) {
return ray::protocol::CreateAddress(
fbb, fbb.CreateString(address.raylet_id()), fbb.CreateString(address.ip_address()),
address.port(), fbb.CreateString(address.worker_id()));
}
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<ray::protocol::Address>>>
AddressesToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb,
@@ -32,10 +38,7 @@ AddressesToFlatbuffer(flatbuffers::FlatBufferBuilder &fbb,
std::vector<flatbuffers::Offset<ray::protocol::Address>> address_vec;
address_vec.reserve(addresses.size());
for (const auto &addr : addresses) {
auto fbb_addr = ray::protocol::CreateAddress(
fbb, fbb.CreateString(addr.raylet_id()), fbb.CreateString(addr.ip_address()),
addr.port(), fbb.CreateString(addr.worker_id()));
address_vec.push_back(fbb_addr);
address_vec.push_back(to_flatbuf(fbb, addr));
}
return fbb.CreateVector(address_vec);
}
@@ -416,11 +419,14 @@ void raylet::RayletClient::GlobalGC(
grpc_client_->GlobalGC(request, callback);
}
Status raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id) {
void raylet::RayletClient::SubscribeToPlasma(const ObjectID &object_id,
const rpc::Address &owner_address) {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id));
auto message = protocol::CreateSubscribePlasmaReady(fbb, to_flatbuf(fbb, object_id),
to_flatbuf(fbb, owner_address));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb);
RAY_CHECK_OK(conn_->WriteMessage(MessageType::SubscribePlasmaReady, &fbb));
}
} // namespace ray
+1 -1
View File
@@ -387,7 +387,7 @@ class RayletClient : public PinObjectsInterface,
void GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
// Subscribe to receive notification on plasma object
ray::Status SubscribeToPlasma(const ObjectID &object_id);
void SubscribeToPlasma(const ObjectID &object_id, const rpc::Address &owner_address);
WorkerID GetWorkerID() const { return worker_id_; }