GCS adapts to node resource table pub sub (#8305)

This commit is contained in:
fangfengbin
2020-05-09 10:31:35 +08:00
committed by GitHub
parent 304e31b7e5
commit 7fec602f2e
12 changed files with 89 additions and 53 deletions
+1 -1
View File
@@ -463,7 +463,7 @@ class NodeInfoAccessor {
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeToResources(
const SubscribeCallback<ClientID, ResourceChangeNotification> &subscribe,
const ItemCallback<rpc::NodeResourceChange> &subscribe,
const StatusCallback &done) = 0;
/// Report heartbeat of a node to GCS asynchronously.
@@ -282,8 +282,7 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID(
ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
resource_sub_executor_(client_impl->GetRedisGcsClient().resource_table()) {}
: client_impl_(client_impl) {}
Status ServiceBasedNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) {
auto node_id = ClientID::FromBinary(local_node_info.node_id());
@@ -522,12 +521,18 @@ Status ServiceBasedNodeInfoAccessor::AsyncDeleteResources(
}
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources(
const SubscribeCallback<ClientID, ResourceChangeNotification> &subscribe,
const StatusCallback &done) {
const ItemCallback<rpc::NodeResourceChange> &subscribe, const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing node resources change.";
RAY_CHECK(subscribe != nullptr);
auto status =
resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::NodeResourceChange node_resource_change;
node_resource_change.ParseFromString(data);
subscribe(node_resource_change);
};
auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL,
on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing node resources change.";
return status;
}
@@ -150,9 +150,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
const std::vector<std::string> &resource_names,
const StatusCallback &callback) override;
Status AsyncSubscribeToResources(
const SubscribeCallback<ClientID, ResourceChangeNotification> &subscribe,
const StatusCallback &done) override;
Status AsyncSubscribeToResources(const ItemCallback<rpc::NodeResourceChange> &subscribe,
const StatusCallback &done) override;
Status AsyncReportHeartbeat(const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;
@@ -177,10 +176,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
using NodeChangeCallback =
std::function<void(const ClientID &id, const GcsNodeInfo &node_info)>;
typedef SubscriptionExecutor<ClientID, ResourceChangeNotification, DynamicResourceTable>
DynamicResourceSubscriptionExecutor;
DynamicResourceSubscriptionExecutor resource_sub_executor_;
GcsNodeInfo local_node_info_;
ClientID local_node_id_;
@@ -225,9 +225,7 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool SubscribeToResources(
const gcs::SubscribeCallback<ClientID, gcs::ResourceChangeNotification>
&subscribe) {
bool SubscribeToResources(const gcs::ItemCallback<rpc::NodeResourceChange> &subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources(
subscribe, [&promise](Status status) { promise.set_value(status.ok()); }));
@@ -613,12 +611,11 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) {
// Subscribe to node resource changes.
std::atomic<int> add_count(0);
std::atomic<int> remove_count(0);
auto on_subscribe = [&add_count, &remove_count](
const ClientID &id,
const gcs::ResourceChangeNotification &notification) {
if (notification.IsAdded()) {
auto on_subscribe = [&add_count,
&remove_count](const rpc::NodeResourceChange &notification) {
if (0 == notification.deleted_resources_size()) {
++add_count;
} else if (notification.IsRemoved()) {
} else {
++remove_count;
}
};
+23 -3
View File
@@ -228,9 +228,19 @@ void GcsNodeManager::HandleUpdateResources(const rpc::UpdateResourcesRequest &re
for (auto &entry : *to_be_updated_resources) {
iter->second[entry.first] = entry.second;
}
auto on_done = [node_id, to_be_updated_resources, reply,
send_reply_callback](Status status) {
auto on_done = [this, node_id, to_be_updated_resources, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(status);
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
for (auto &it : *to_be_updated_resources) {
(*node_resource_change.mutable_updated_resources())[it.first] =
it.second->resource_capacity();
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(),
node_resource_change.SerializeAsString(),
nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(DEBUG) << "Finished updating resources, node id = " << node_id;
};
@@ -255,8 +265,18 @@ void GcsNodeManager::HandleDeleteResources(const rpc::DeleteResourcesRequest &re
for (auto &resource_name : resource_names) {
iter->second.erase(resource_name);
}
auto on_done = [reply, send_reply_callback](Status status) {
auto on_done = [this, node_id, resource_names, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(status);
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
for (const auto &resource_name : resource_names) {
node_resource_change.add_deleted_resources(resource_name);
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(),
node_resource_change.SerializeAsString(),
nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
RAY_CHECK_OK(
@@ -308,8 +308,7 @@ struct GcsServerMocker {
}
Status AsyncSubscribeToResources(
const gcs::SubscribeCallback<ClientID, gcs::ResourceChangeNotification>
&subscribe,
const gcs::ItemCallback<rpc::NodeResourceChange> &subscribe,
const gcs::StatusCallback &done) override {
return Status::NotImplemented("");
}
+1
View File
@@ -27,6 +27,7 @@ namespace gcs {
#define JOB_CHANNEL "JOB"
#define NODE_CHANNEL "NODE"
#define NODE_RESOURCE_CHANNEL "NODE_RESOURCE"
#define WORKER_FAILURE_CHANNEL "WORKER_FAILURE"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
+18 -3
View File
@@ -711,10 +711,25 @@ Status RedisNodeInfoAccessor::AsyncDeleteResources(
}
Status RedisNodeInfoAccessor::AsyncSubscribeToResources(
const SubscribeCallback<ClientID, ResourceChangeNotification> &subscribe,
const StatusCallback &done) {
const ItemCallback<rpc::NodeResourceChange> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
auto on_subscribe = [subscribe](const ClientID &id,
const ResourceChangeNotification &result) {
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(id.Binary());
if (result.IsAdded()) {
for (auto &it : result.GetData()) {
(*node_resource_change.mutable_updated_resources())[it.first] =
it.second->resource_capacity();
}
} else {
for (auto &it : result.GetData()) {
node_resource_change.add_deleted_resources(it.first);
}
}
subscribe(node_resource_change);
};
return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl)
+2 -3
View File
@@ -319,9 +319,8 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
const std::vector<std::string> &resource_names,
const StatusCallback &callback) override;
Status AsyncSubscribeToResources(
const SubscribeCallback<ClientID, ResourceChangeNotification> &subscribe,
const StatusCallback &done) override;
Status AsyncSubscribeToResources(const ItemCallback<rpc::NodeResourceChange> &subscribe,
const StatusCallback &done) override;
Status AsyncReportHeartbeat(const std::shared_ptr<HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;
@@ -126,15 +126,15 @@ TEST_F(NodeDynamicResourceTest, Subscribe) {
}
WaitPendingDone(wait_pending_timeout_);
auto subscribe = [this](const ClientID &id,
const ResourceChangeNotification &notification) {
auto subscribe = [this](const rpc::NodeResourceChange &notification) {
auto id = ClientID::FromBinary(notification.node_id());
RAY_LOG(INFO) << "receive client id=" << id;
auto it = id_to_resource_map_.find(id);
ASSERT_TRUE(it != id_to_resource_map_.end());
if (notification.IsAdded()) {
ASSERT_EQ(notification.GetData().size(), it->second.size());
if (0 == notification.deleted_resources_size()) {
ASSERT_EQ(notification.updated_resources_size(), it->second.size());
} else {
ASSERT_EQ(notification.GetData().size(), resource_to_delete_.size());
ASSERT_EQ(notification.deleted_resources_size(), resource_to_delete_.size());
}
--sub_pending_count_;
};
+10
View File
@@ -317,6 +317,16 @@ message ObjectLocationChange {
ObjectTableData data = 2;
}
// A notification message about one node's resources being changed.
message NodeResourceChange {
// ID of the node whose resources have changed.
bytes node_id = 1;
// Labels of the updated resources and their latest capacities.
map<string, double> updated_resources = 2;
// Labels of the resources that were deleted.
repeated string deleted_resources = 3;
}
message PubSubMessage {
bytes id = 1;
bytes data = 2;
+10 -15
View File
@@ -218,22 +218,17 @@ ray::Status NodeManager::RegisterGcs() {
RAY_CHECK_OK(status);
// Subscribe to resource changes.
const auto &resources_changed =
[this](const ClientID &id,
const gcs::ResourceChangeNotification &resource_notification) {
if (resource_notification.IsAdded()) {
ResourceSet resource_set;
for (auto &entry : resource_notification.GetData()) {
resource_set.AddOrUpdateResource(entry.first,
entry.second->resource_capacity());
}
[this](const rpc::NodeResourceChange &resource_notification) {
auto id = ClientID::FromBinary(resource_notification.node_id());
if (resource_notification.updated_resources_size() != 0) {
ResourceSet resource_set(
MapFromProtobuf(resource_notification.updated_resources()));
ResourceCreateUpdated(id, resource_set);
} else {
RAY_CHECK(resource_notification.IsRemoved());
std::vector<std::string> resource_names;
for (auto &entry : resource_notification.GetData()) {
resource_names.push_back(entry.first);
}
ResourceDeleted(id, resource_names);
}
if (resource_notification.deleted_resources_size() != 0) {
ResourceDeleted(
id, VectorFromProtobuf(resource_notification.deleted_resources()));
}
};
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources(