Split heartbeat message (#12535)

* first

* xxx

* Split heartbeat message

* only report resource usage when changed

* Fix GetAllResourceUsage

* Fix report resource usage

* Increase default heartbeat interval

* regularize heartbeat interval in test case
This commit is contained in:
Tao Wang
2020-12-11 21:19:57 +08:00
committed by GitHub
parent 867d2a8aa3
commit 295b6e5ce4
58 changed files with 1018 additions and 910 deletions
+6 -6
View File
@@ -7,8 +7,8 @@ from ray.core.generated.gcs_pb2 import (
JobConfig,
ErrorTableData,
GcsEntry,
HeartbeatBatchTableData,
HeartbeatTableData,
ResourceUsageBatchData,
ResourcesData,
ObjectTableData,
ProfileTableData,
TablePrefix,
@@ -33,8 +33,8 @@ __all__ = [
"ErrorTableData",
"ErrorType",
"GcsEntry",
"HeartbeatBatchTableData",
"HeartbeatTableData",
"ResourceUsageBatchData",
"ResourcesData",
"ObjectTableData",
"ProfileTableData",
"TablePrefix",
@@ -55,8 +55,8 @@ FUNCTION_PREFIX = "RemoteFunction:"
LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL"
REPORTER_CHANNEL = "RAY_REPORTER"
# xray heartbeats
XRAY_HEARTBEAT_BATCH_PATTERN = "HEARTBEAT_BATCH:".encode("ascii")
# xray resource usages
XRAY_RESOURCES_BATCH_PATTERN = "RESOURCES_BATCH:".encode("ascii")
# xray job updates
XRAY_JOB_PATTERN = "JOB:*".encode("ascii")
@@ -23,7 +23,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
c_vector[c_string] GetAllProfileInfo()
c_vector[c_string] GetAllObjectInfo()
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
unique_ptr[c_string] GetAllHeartbeat()
unique_ptr[c_string] GetAllResourceUsage()
c_vector[c_string] GetAllActorInfo()
unique_ptr[c_string] GetActorInfo(const CActorID &actor_id)
c_string GetNodeResourceInfo(const CNodeID &node_id)
@@ -78,11 +78,11 @@ cdef class GlobalStateAccessor:
return c_string(object_info.get().data(), object_info.get().size())
return None
def get_all_heartbeat(self):
"""Get newest heartbeat of all nodes from GCS service."""
def get_all_resource_usage(self):
"""Get newest resource usage of all nodes from GCS service."""
cdef unique_ptr[c_string] result
with nogil:
result = self.inner.get().GetAllHeartbeat()
result = self.inner.get().GetAllResourceUsage()
if result:
return c_string(result.get().data(), result.get().size())
return None
+1 -1
View File
@@ -15,7 +15,7 @@ cdef extern from "ray/common/ray_config.h" nogil:
int64_t raylet_heartbeat_timeout_milliseconds() const
c_bool light_heartbeat_enabled() const
c_bool light_report_resource_usage_enabled() const
int64_t debug_dump_period_milliseconds() const
+2 -2
View File
@@ -14,8 +14,8 @@ cdef class Config:
return RayConfig.instance().raylet_heartbeat_timeout_milliseconds()
@staticmethod
def light_heartbeat_enabled():
return RayConfig.instance().light_heartbeat_enabled()
def light_report_resource_usage_enabled():
return RayConfig.instance().light_report_resource_usage_enabled()
@staticmethod
def debug_dump_period_milliseconds():
+11 -11
View File
@@ -139,24 +139,24 @@ class Monitor:
self.primary_subscribe_client.subscribe(channel)
def update_load_metrics(self):
"""Fetches heartbeat data from GCS and updates load metrics."""
"""Fetches resource usage data from GCS and updates load metrics."""
all_heartbeat = self.global_state_accessor.get_all_heartbeat()
heartbeat_batch_data = \
ray.gcs_utils.HeartbeatBatchTableData.FromString(all_heartbeat)
for heartbeat_message in heartbeat_batch_data.batch:
resource_load = dict(heartbeat_message.resource_load)
total_resources = dict(heartbeat_message.resources_total)
available_resources = dict(heartbeat_message.resources_available)
all_resources = self.global_state_accessor.get_all_resource_usage()
resources_batch_data = \
ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources)
for resource_message in resources_batch_data.batch:
resource_load = dict(resource_message.resource_load)
total_resources = dict(resource_message.resources_total)
available_resources = dict(resource_message.resources_available)
waiting_bundles, infeasible_bundles = parse_resource_demands(
heartbeat_batch_data.resource_load_by_shape)
resources_batch_data.resource_load_by_shape)
pending_placement_groups = list(
heartbeat_batch_data.placement_group_load.placement_group_data)
resources_batch_data.placement_group_load.placement_group_data)
# Update the load metrics for this raylet.
node_id = ray.utils.binary_to_hex(heartbeat_message.node_id)
node_id = ray.utils.binary_to_hex(resource_message.node_id)
ip = self.raylet_id_to_ip_map.get(node_id)
if ip:
self.load_metrics.update(ip, total_resources,
+15 -14
View File
@@ -1055,11 +1055,11 @@ def test_actor_resource_demand(shutdown_only):
ray.get(a.foo.remote())
time.sleep(1)
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message)
# The actor is scheduled so there should be no more demands left.
assert len(heartbeat.resource_load_by_shape.resource_demands) == 0
assert len(resource_usages.resource_load_by_shape.resource_demands) == 0
@ray.remote(num_cpus=80)
class Actor2:
@@ -1070,23 +1070,24 @@ def test_actor_resource_demand(shutdown_only):
time.sleep(1)
# This actor cannot be scheduled.
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
assert len(heartbeat.resource_load_by_shape.resource_demands) == 1
assert (heartbeat.resource_load_by_shape.resource_demands[0].shape == {
"CPU": 80.0
})
assert (heartbeat.resource_load_by_shape.resource_demands[0]
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message)
assert len(resource_usages.resource_load_by_shape.resource_demands) == 1
assert (
resource_usages.resource_load_by_shape.resource_demands[0].shape == {
"CPU": 80.0
})
assert (resource_usages.resource_load_by_shape.resource_demands[0]
.num_infeasible_requests_queued == 1)
actors.append(Actor2.remote())
time.sleep(1)
# Two actors cannot be scheduled.
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
assert len(heartbeat.resource_load_by_shape.resource_demands) == 1
assert (heartbeat.resource_load_by_shape.resource_demands[0]
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message)
assert len(resource_usages.resource_load_by_shape.resource_demands) == 1
assert (resource_usages.resource_load_by_shape.resource_demands[0]
.num_infeasible_requests_queued == 2)
global_state_accessor.disconnect()
+1 -1
View File
@@ -237,7 +237,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
cluster.add_node(
num_cpus=10 * num_gpus_per_raylet,
num_gpus=num_gpus_per_raylet,
_system_config={"num_heartbeats_timeout": 1000} if i == 0 else {})
_system_config={"num_heartbeats_timeout": 100} if i == 0 else {})
ray.init(address=cluster.address)
@ray.remote
+4 -3
View File
@@ -610,9 +610,10 @@ def test_lease_request_leak(shutdown_only):
del obj_ref
ray.get(tasks)
time.sleep(
1) # Sleep for an amount longer than the reconstruction timeout.
assert len(ray.objects()) == 0, ray.objects()
def _no_objects():
return len(ray.objects()) == 0
wait_for_condition(_no_objects, timeout=10)
@pytest.mark.parametrize(
@@ -143,7 +143,7 @@ def check_components_alive(cluster, component_type, check_component_alive):
"num_cpus": 8,
"num_nodes": 4,
"_system_config": {
"num_heartbeats_timeout": 100
"num_heartbeats_timeout": 10
},
}],
indirect=True)
+1 -1
View File
@@ -76,7 +76,7 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60)
num_heartbeats_timeout=2, ping_gcs_rpc_server_max_retries=60)
],
indirect=True)
def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head):
+24 -22
View File
@@ -173,13 +173,14 @@ def test_load_report(shutdown_only, max_shapes):
self.report = None
def check_load_report(self):
message = global_state_accessor.get_all_heartbeat()
message = global_state_accessor.get_all_resource_usage()
if message is None:
return False
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(
resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
self.report = heartbeat.resource_load_by_shape.resource_demands
self.report = \
resource_usage.resource_load_by_shape.resource_demands
if max_shapes == 0:
return True
elif max_shapes == 2:
@@ -227,40 +228,40 @@ def test_placement_group_load_report(ray_start_cluster):
class PgLoadChecker:
def nothing_is_ready(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
resource_usage = self._read_resource_usage()
if not resource_usage:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
if resource_usage.HasField("placement_group_load"):
pg_load = resource_usage.placement_group_load
return len(pg_load.placement_group_data) == 2
return False
def only_first_one_ready(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
resource_usage = self._read_resource_usage()
if not resource_usage:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
if resource_usage.HasField("placement_group_load"):
pg_load = resource_usage.placement_group_load
return len(pg_load.placement_group_data) == 1
return False
def two_infeasible_pg(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
resource_usage = self._read_resource_usage()
if not resource_usage:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
if resource_usage.HasField("placement_group_load"):
pg_load = resource_usage.placement_group_load
return len(pg_load.placement_group_data) == 2
return False
def _read_heartbeat(self):
message = global_state_accessor.get_all_heartbeat()
def _read_resource_usage(self):
message = global_state_accessor.get_all_resource_usage()
if message is None:
return False
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(
resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
return heartbeat
return resource_usage
checker = PgLoadChecker()
@@ -301,13 +302,14 @@ def test_backlog_report(shutdown_only):
return None
def backlog_size_set():
message = global_state_accessor.get_all_heartbeat()
message = global_state_accessor.get_all_resource_usage()
if message is None:
return False
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
aggregate_resource_load = \
heartbeat.resource_load_by_shape.resource_demands
resource_usage.resource_load_by_shape.resource_demands
if len(aggregate_resource_load) == 1:
backlog_size = aggregate_resource_load[0].backlog_size
print(backlog_size)
+3 -3
View File
@@ -34,7 +34,7 @@ def test_shutdown():
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, object_timeout_milliseconds=12345)
num_heartbeats_timeout=2, object_timeout_milliseconds=12345)
],
indirect=True)
def test_system_config(ray_start_cluster_head):
@@ -52,12 +52,12 @@ def test_system_config(ray_start_cluster_head):
@ray.remote
def f():
assert ray._config.object_timeout_milliseconds() == 12345
assert ray._config.num_heartbeats_timeout() == 20
assert ray._config.num_heartbeats_timeout() == 2
ray.get([f.remote() for _ in range(5)])
cluster.remove_node(worker, allow_graceful=False)
time.sleep(1)
time.sleep(0.9)
assert ray.cluster_resources()["CPU"] == 2
time.sleep(2)
+3 -3
View File
@@ -1165,7 +1165,7 @@ ray.shutdown()
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60)
num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60)
],
indirect=True)
def test_create_placement_group_after_gcs_server_restart(
@@ -1203,7 +1203,7 @@ def test_create_placement_group_after_gcs_server_restart(
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60)
num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60)
],
indirect=True)
def test_create_actor_with_placement_group_after_gcs_server_restart(
@@ -1227,7 +1227,7 @@ def test_create_actor_with_placement_group_after_gcs_server_restart(
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_system_config_map(
num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60)
num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60)
],
indirect=True)
def test_create_placement_group_during_gcs_server_restart(
+8 -6
View File
@@ -35,20 +35,22 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100)
/// The duration between heartbeats sent by the raylets.
RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 100)
/// Whether to send heartbeat lightly. When it is enabled, only changed part,
/// like should_global_gc or changed resources, will be included in the heartbeat,
/// and gcs only broadcast the changed heartbeat.
RAY_CONFIG(bool, light_heartbeat_enabled, true)
RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 1000)
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the raylet monitor process will report
/// it as dead to the db_client table.
RAY_CONFIG(int64_t, num_heartbeats_timeout, 300)
RAY_CONFIG(int64_t, num_heartbeats_timeout, 30)
/// For a raylet, if the last heartbeat was sent more than this many
/// heartbeat periods ago, then a warning will be logged that the heartbeat
/// handler is drifting.
RAY_CONFIG(uint64_t, num_heartbeats_warning, 5)
/// The duration between reporting resources sent by the raylets.
RAY_CONFIG(int64_t, raylet_report_resources_period_milliseconds, 100)
/// Whether to report resource usage lightly. When it is enabled, only changed part,
/// like should_global_gc or changed resources, will be included in the message.
RAY_CONFIG(bool, light_report_resource_usage_enabled, true)
/// The duration between dumping debug info to logs, or -1 to disable.
RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000)
+22 -14
View File
@@ -543,30 +543,38 @@ class NodeInfoAccessor {
const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Resend heartbeat when GCS restarts from a failure.
virtual void AsyncReReportHeartbeat() = 0;
/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) = 0;
/// Resend resource usage when GCS restarts from a failure.
virtual void AsyncReReportResourceUsage() = 0;
/// Return resources in last report. Used by light heartbeat.
std::shared_ptr<SchedulingResources> &GetLastHeartbeatResources() {
return last_heartbeat_resources_;
std::shared_ptr<SchedulingResources> &GetLastResourceUsage() {
return last_resource_usage_;
}
/// Get newest heartbeat of all nodes from GCS asynchronously. Only used when light
/// heartbeat enabled.
/// Get newest resource usage of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAllHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &callback) = 0;
virtual Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) = 0;
/// Subscribe batched state of all nodes from GCS.
///
/// \param subscribe Callback that will be called each time when batch heartbeat is
/// \param subscribe Callback that will be called each time when batch resource usage is
/// updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
virtual Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) = 0;
/// Reestablish subscription.
@@ -598,9 +606,9 @@ class NodeInfoAccessor {
NodeInfoAccessor() = default;
private:
/// Cache which stores resources in last heartbeat used to check if they are changed.
/// Used by light heartbeat.
std::shared_ptr<SchedulingResources> last_heartbeat_resources_ =
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<SchedulingResources> last_resource_usage_ =
std::make_shared<SchedulingResources>();
};
@@ -174,14 +174,14 @@ std::string GlobalStateAccessor::GetInternalConfig() {
return config_proto.SerializeAsString();
}
std::unique_ptr<std::string> GlobalStateAccessor::GetAllHeartbeat() {
std::unique_ptr<std::string> heartbeat_batch_data;
std::unique_ptr<std::string> GlobalStateAccessor::GetAllResourceUsage() {
std::unique_ptr<std::string> resource_batch_data;
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllHeartbeat(
TransformForItemCallback<rpc::HeartbeatBatchTableData>(heartbeat_batch_data,
promise)));
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllResourceUsage(
TransformForItemCallback<rpc::ResourceUsageBatchData>(resource_batch_data,
promise)));
promise.get_future().get();
return heartbeat_batch_data;
return resource_batch_data;
}
std::vector<std::string> GlobalStateAccessor::GetAllActorInfo() {
@@ -99,13 +99,13 @@ class GlobalStateAccessor {
/// and serialized as a string to allow multi-language support.
std::string GetInternalConfig();
/// Get newest heartbeat of all nodes from GCS Service. Only used when light
/// heartbeat enabled.
/// Get newest resource usage of all nodes from GCS Service. Only used when light
/// resource usage report enabled.
///
/// \return node heartbeat info. To support multi-language, we serialize each
/// HeartbeatTableData and return the serialized string. Where used, it needs to be
/// \return resource usage info. To support multi-language, we serialize each
/// data and return the serialized string. Where used, it needs to be
/// deserialized with protobuf function.
std::unique_ptr<std::string> GetAllHeartbeat();
std::unique_ptr<std::string> GetAllResourceUsage();
/// Get information of all actors from GCS Service.
///
@@ -609,11 +609,10 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources(
Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat(
const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
cached_heartbeat_.mutable_heartbeat()->CopyFrom(*data_ptr);
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportHeartbeat(
cached_heartbeat_,
[callback](const Status &status, const rpc::ReportHeartbeatReply &reply) {
request, [callback](const Status &status, const rpc::ReportHeartbeatReply &reply) {
if (callback) {
callback(status);
}
@@ -621,74 +620,90 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat(
return Status::OK();
}
void ServiceBasedNodeInfoAccessor::AsyncReReportHeartbeat() {
Status ServiceBasedNodeInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
if (cached_heartbeat_.has_heartbeat()) {
RAY_LOG(INFO) << "Rereport heartbeat.";
FillHeartbeatRequest(cached_heartbeat_);
client_impl_->GetGcsRpcClient().ReportHeartbeat(
cached_heartbeat_,
[](const Status &status, const rpc::ReportHeartbeatReply &reply) {});
}
}
void ServiceBasedNodeInfoAccessor::FillHeartbeatRequest(
rpc::ReportHeartbeatRequest &heartbeat) {
if (RayConfig::instance().light_heartbeat_enabled()) {
SchedulingResources cached_resources =
SchedulingResources(*GetLastHeartbeatResources());
auto heartbeat_data = heartbeat.mutable_heartbeat();
heartbeat_data->clear_resources_total();
for (const auto &resource_pair :
cached_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
heartbeat_data->clear_resources_available();
heartbeat_data->set_resources_available_changed(true);
for (const auto &resource_pair :
cached_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
heartbeat_data->clear_resource_load();
heartbeat_data->set_resource_load_changed(true);
for (const auto &resource_pair :
cached_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
}
Status ServiceBasedNodeInfoAccessor::AsyncGetAllHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &callback) {
rpc::GetAllHeartbeatRequest request;
client_impl_->GetGcsRpcClient().GetAllHeartbeat(
request, [callback](const Status &status, const rpc::GetAllHeartbeatReply &reply) {
callback(reply.heartbeat_data());
RAY_LOG(DEBUG) << "Finished getting heartbeat of all nodes, status = " << status;
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[callback](const Status &status, const rpc::ReportResourceUsageReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
}
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
void ServiceBasedNodeInfoAccessor::AsyncReReportResourceUsage() {
absl::MutexLock lock(&mutex_);
if (cached_resource_usage_.has_resources()) {
RAY_LOG(INFO) << "Rereport resource usage.";
FillResourceUsageRequest(cached_resource_usage_);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[](const Status &status, const rpc::ReportResourceUsageReply &reply) {});
}
}
void ServiceBasedNodeInfoAccessor::FillResourceUsageRequest(
rpc::ReportResourceUsageRequest &resources) {
if (RayConfig::instance().light_report_resource_usage_enabled()) {
SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage());
auto resources_data = resources.mutable_resources();
resources_data->clear_resources_total();
for (const auto &resource_pair :
cached_resources.GetTotalResources().GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
resources_data->clear_resources_available();
resources_data->set_resources_available_changed(true);
for (const auto &resource_pair :
cached_resources.GetAvailableResources().GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
resources_data->clear_resource_load();
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair :
cached_resources.GetLoadResources().GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
}
Status ServiceBasedNodeInfoAccessor::AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) {
rpc::GetAllResourceUsageRequest request;
client_impl_->GetGcsRpcClient().GetAllResourceUsage(
request,
[callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) {
callback(reply.resource_usage_data());
RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = "
<< status;
});
return Status::OK();
}
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) {
subscribe_batch_resource_usage_operation_ = [this,
subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
heartbeat_batch_table_data.ParseFromString(data);
subscribe(heartbeat_batch_table_data);
rpc::ResourceUsageBatchData resources_batch_data;
resources_batch_data.ParseFromString(data);
subscribe(resources_batch_data);
};
return client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "",
return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "",
on_subscribe, done);
};
return subscribe_batch_heartbeat_operation_(done);
return subscribe_batch_resource_usage_operation_(done);
}
void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
@@ -752,8 +767,8 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar
if (subscribe_resource_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_resource_operation_(nullptr));
}
if (subscribe_batch_heartbeat_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_batch_heartbeat_operation_(nullptr));
if (subscribe_batch_resource_usage_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr));
}
} else {
if (fetch_node_data_operation_ != nullptr) {
+15 -12
View File
@@ -182,16 +182,19 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
Status AsyncReportHeartbeat(const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;
void AsyncReReportHeartbeat() override;
Status AsyncReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) override;
/// Fill resource fields with cached resources. Used by light heartbeat.
void FillHeartbeatRequest(rpc::ReportHeartbeatRequest &heartbeat);
void AsyncReReportResourceUsage() override;
Status AsyncGetAllHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &callback) override;
/// Fill resource fields with cached resources. Used by light resource usage report.
void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage);
Status AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) override;
Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const StatusCallback &done) override;
void AsyncResubscribe(bool is_pubsub_server_restarted) override;
@@ -208,18 +211,18 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
/// server restarts from a failure.
SubscribeOperation subscribe_node_operation_;
SubscribeOperation subscribe_resource_operation_;
SubscribeOperation subscribe_batch_heartbeat_operation_;
SubscribeOperation subscribe_batch_resource_usage_operation_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
FetchDataOperation fetch_node_data_operation_;
// Mutex to protect the cached_heartbeat_ field.
// Mutex to protect the cached_resource_usage_ field.
absl::Mutex mutex_;
/// Save the heartbeat data, so we can resend it again when GCS server restarts from a
/// failure.
rpc::ReportHeartbeatRequest cached_heartbeat_ GUARDED_BY(mutex_);
/// Save the resource usage data, so we can resend it again when GCS server restarts
/// from a failure.
rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_);
void HandleNotification(const GcsNodeInfo &node_info);
@@ -175,8 +175,8 @@ void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType
// because we use the same Redis server for both GCS storage and pub-sub. So the
// following flag is always false.
resubscribe_func_(false);
// Resend heartbeat after reconnected, needed by resource view in GCS.
node_accessor_->AsyncReReportHeartbeat();
// Resend resource usage after reconnected, needed by resource view in GCS.
node_accessor_->AsyncReReportResourceUsage();
break;
default:
RAY_LOG(FATAL) << "Unsupported failure type: " << type;
@@ -186,12 +186,11 @@ TEST_F(GlobalStateAccessorTest, TestInternalConfig) {
}
}
TEST_F(GlobalStateAccessorTest, TestGetAllHeartbeat) {
std::unique_ptr<std::string> heartbeats = global_state_->GetAllHeartbeat();
rpc::HeartbeatBatchTableData heartbeat_batch_data;
heartbeat_batch_data.ParseFromString(*heartbeats.get());
ASSERT_EQ(heartbeat_batch_data.batch_size(), 0);
TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) {
std::unique_ptr<std::string> resources = global_state_->GetAllResourceUsage();
rpc::ResourceUsageBatchData resource_usage_batch_data;
resource_usage_batch_data.ParseFromString(*resources.get());
ASSERT_EQ(resource_usage_batch_data.batch_size(), 0);
auto node_table_data = Mocker::GenNodeInfo();
std::promise<bool> promise;
@@ -201,59 +200,60 @@ TEST_F(GlobalStateAccessorTest, TestGetAllHeartbeat) {
auto node_table = global_state_->GetAllNodeInfo();
ASSERT_EQ(node_table.size(), 1);
// Report heartbeat first time.
// Report resource usage first time.
std::promise<bool> promise1;
auto heartbeat1 = std::make_shared<rpc::HeartbeatTableData>();
heartbeat1->set_node_id(node_table_data->node_id());
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat(
heartbeat1, [&promise1](Status status) { promise1.set_value(status.ok()); }));
auto resources1 = std::make_shared<rpc::ResourcesData>();
resources1->set_node_id(node_table_data->node_id());
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage(
resources1, [&promise1](Status status) { promise1.set_value(status.ok()); }));
WaitReady(promise1.get_future(), timeout_ms_);
heartbeats = global_state_->GetAllHeartbeat();
heartbeat_batch_data.ParseFromString(*heartbeats.get());
ASSERT_EQ(heartbeat_batch_data.batch_size(), 1);
resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
ASSERT_EQ(resource_usage_batch_data.batch_size(), 1);
// Report heartbeat with resources changed.
// Report changed resource usage.
std::promise<bool> promise2;
auto heartbeat2 = std::make_shared<rpc::HeartbeatTableData>();
auto heartbeat2 = std::make_shared<rpc::ResourcesData>();
heartbeat2->set_node_id(node_table_data->node_id());
(*heartbeat2->mutable_resources_total())["CPU"] = 1;
(*heartbeat2->mutable_resources_total())["GPU"] = 10;
heartbeat2->set_resources_available_changed(true);
(*heartbeat2->mutable_resources_available())["GPU"] = 5;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat(
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage(
heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); }));
WaitReady(promise2.get_future(), timeout_ms_);
heartbeats = global_state_->GetAllHeartbeat();
heartbeat_batch_data.ParseFromString(*heartbeats.get());
ASSERT_EQ(heartbeat_batch_data.batch_size(), 1);
auto heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0);
ASSERT_EQ(heartbeat_data.resources_total_size(), 2);
ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0);
ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0);
ASSERT_EQ(heartbeat_data.resources_available_size(), 1);
ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0);
resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
ASSERT_EQ(resource_usage_batch_data.batch_size(), 1);
auto resources_data = resource_usage_batch_data.mutable_batch()->at(0);
ASSERT_EQ(resources_data.resources_total_size(), 2);
ASSERT_EQ((*resources_data.mutable_resources_total())["CPU"], 1.0);
ASSERT_EQ((*resources_data.mutable_resources_total())["GPU"], 10.0);
ASSERT_EQ(resources_data.resources_available_size(), 1);
ASSERT_EQ((*resources_data.mutable_resources_available())["GPU"], 5.0);
// Report heartbeat with resources unchanged. (Only works with light heartbeat enabled)
// Report unchanged resource usage. (Only works with light resource usage report
// enabled)
std::promise<bool> promise3;
auto heartbeat3 = std::make_shared<rpc::HeartbeatTableData>();
auto heartbeat3 = std::make_shared<rpc::ResourcesData>();
heartbeat3->set_node_id(node_table_data->node_id());
(*heartbeat3->mutable_resources_available())["CPU"] = 1;
(*heartbeat3->mutable_resources_available())["GPU"] = 6;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat(
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage(
heartbeat3, [&promise3](Status status) { promise3.set_value(status.ok()); }));
WaitReady(promise3.get_future(), timeout_ms_);
heartbeats = global_state_->GetAllHeartbeat();
heartbeat_batch_data.ParseFromString(*heartbeats.get());
ASSERT_EQ(heartbeat_batch_data.batch_size(), 1);
heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0);
ASSERT_EQ(heartbeat_data.resources_total_size(), 2);
ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0);
ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0);
ASSERT_EQ(heartbeat_data.resources_available_size(), 1);
ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0);
resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
ASSERT_EQ(resource_usage_batch_data.batch_size(), 1);
resources_data = resource_usage_batch_data.mutable_batch()->at(0);
ASSERT_EQ(resources_data.resources_total_size(), 2);
ASSERT_EQ((*resources_data.mutable_resources_total())["CPU"], 1.0);
ASSERT_EQ((*resources_data.mutable_resources_total())["GPU"], 10.0);
ASSERT_EQ(resources_data.resources_available_size(), 1);
ASSERT_EQ((*resources_data.mutable_resources_available())["GPU"], 5.0);
}
TEST_F(GlobalStateAccessorTest, TestProfileTable) {
@@ -328,10 +328,10 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool SubscribeBatchHeartbeat(
const gcs::ItemCallback<rpc::HeartbeatBatchTableData> &subscribe) {
bool SubscribeBatchResourceUsage(
const gcs::ItemCallback<rpc::ResourceUsageBatchData> &subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat(
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage(
subscribe, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
@@ -343,6 +343,13 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
return WaitReady(promise.get_future(), timeout_ms_);
}
/// Send a resource usage report through the GCS client and wait for its callback.
///
/// \param resources The resource usage payload handed to `AsyncReportResourceUsage`.
/// \return The result of `WaitReady` on the future that the report callback fulfils
/// with `status.ok()`.
bool ReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> resources) {
  std::promise<bool> report_done;
  // The callback only records whether the report succeeded.
  auto on_reported = [&report_done](Status status) {
    report_done.set_value(status.ok());
  };
  RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage(resources, on_reported));
  return WaitReady(report_done.get_future(), timeout_ms_);
}
std::vector<rpc::AvailableResources> GetAllAvailableResources() {
std::promise<bool> promise;
std::vector<rpc::AvailableResources> resources;
@@ -715,80 +722,76 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) {
ASSERT_TRUE(GetResources(node_id).empty());
}
TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeat) {
TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsage) {
// Subscribe batched state of all nodes from GCS.
std::atomic<int> heartbeat_batch_count(0);
auto on_subscribe =
[&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) {
++heartbeat_batch_count;
};
ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe));
std::atomic<int> resource_batch_count(0);
auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) {
++resource_batch_count;
};
ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe));
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));
// Report heartbeat of a node to GCS.
// Report resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_node_id(node_id.Binary());
// Set this flag because GCS won't publish unchanged heartbeat.
heartbeat->set_should_global_gc(true);
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitForExpectedCount(heartbeat_batch_count, 1);
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
resource->set_should_global_gc(true);
ASSERT_TRUE(ReportResourceUsage(resource));
WaitForExpectedCount(resource_batch_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeatWithLightHeartbeat) {
TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) {
// Subscribe batched state of all nodes from GCS.
std::atomic<int> heartbeat_batch_count(0);
auto on_subscribe =
[&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) {
++heartbeat_batch_count;
};
ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe));
std::atomic<int> resource_batch_count(0);
auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) {
++resource_batch_count;
};
ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe));
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));
// Report unchanged heartbeat of a node to GCS.
// Report unchanged resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitForExpectedCount(heartbeat_batch_count, 0);
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportResourceUsage(resource));
WaitForExpectedCount(resource_batch_count, 0);
// Report changed heartbeat of a node to GCS.
auto heartbeat1 = std::make_shared<rpc::HeartbeatTableData>();
heartbeat1->set_node_id(node_id.Binary());
heartbeat1->set_resources_available_changed(true);
ASSERT_TRUE(ReportHeartbeat(heartbeat1));
WaitForExpectedCount(heartbeat_batch_count, 1);
// Report changed resource usage of a node to GCS.
auto resource1 = std::make_shared<rpc::ResourcesData>();
resource1->set_node_id(node_id.Binary());
resource1->set_resources_available_changed(true);
ASSERT_TRUE(ReportResourceUsage(resource1));
WaitForExpectedCount(resource_batch_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) {
// Subscribe batched state of all nodes from GCS.
std::atomic<int> heartbeat_batch_count(0);
auto on_subscribe =
[&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) {
++heartbeat_batch_count;
};
ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe));
std::atomic<int> resource_batch_count(0);
auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) {
++resource_batch_count;
};
ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe));
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));
// Report heartbeat of a node to GCS.
// Report resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_node_id(node_id.Binary());
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
// Set this flag to indicate resources has changed.
heartbeat->set_resources_available_changed(true);
(*heartbeat->mutable_resources_available())["CPU"] = 1.0;
(*heartbeat->mutable_resources_available())["GPU"] = 10.0;
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitForExpectedCount(heartbeat_batch_count, 1);
resource->set_resources_available_changed(true);
(*resource->mutable_resources_available())["CPU"] = 1.0;
(*resource->mutable_resources_available())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
WaitForExpectedCount(resource_batch_count, 1);
// Assert get all available resources right.
std::vector<rpc::AvailableResources> resources = GetAllAvailableResources();
@@ -798,28 +801,28 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) {
EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0);
}
TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResourcesWithLightHeartbeat) {
TEST_F(ServiceBasedGcsClientTest,
TestGetAllAvailableResourcesWithLightResourceUsageReport) {
// Subscribe batched state of all nodes from GCS.
std::atomic<int> heartbeat_batch_count(0);
auto on_subscribe =
[&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) {
++heartbeat_batch_count;
};
ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe));
std::atomic<int> resource_batch_count(0);
auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) {
++resource_batch_count;
};
ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe));
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));
// Report heartbeat of a node to GCS.
// Report resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_node_id(node_id.Binary());
heartbeat->set_resources_available_changed(true);
(*heartbeat->mutable_resources_available())["CPU"] = 1.0;
(*heartbeat->mutable_resources_available())["GPU"] = 10.0;
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitForExpectedCount(heartbeat_batch_count, 1);
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
resource->set_resources_available_changed(true);
(*resource->mutable_resources_available())["CPU"] = 1.0;
(*resource->mutable_resources_available())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
WaitForExpectedCount(resource_batch_count, 1);
// Assert get all available resources right.
std::vector<rpc::AvailableResources> resources = GetAllAvailableResources();
@@ -828,12 +831,12 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResourcesWithLightHeartbeat
EXPECT_EQ((*resources[0].mutable_resources_available())["CPU"], 1.0);
EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0);
// Report unchanged heartbeat of a node to GCS.
auto heartbeat1 = std::make_shared<rpc::HeartbeatTableData>();
heartbeat1->set_node_id(node_id.Binary());
(*heartbeat1->mutable_resources_available())["GPU"] = 8.0;
ASSERT_TRUE(ReportHeartbeat(heartbeat1));
WaitForExpectedCount(heartbeat_batch_count, 1);
// Report unchanged resource usage of a node to GCS.
auto resource1 = std::make_shared<rpc::ResourcesData>();
resource1->set_node_id(node_id.Binary());
(*resource1->mutable_resources_available())["GPU"] = 8.0;
ASSERT_TRUE(ReportResourceUsage(resource1));
WaitForExpectedCount(resource_batch_count, 1);
// The value would remain unchanged.
std::vector<rpc::AvailableResources> resources1 = GetAllAvailableResources();
@@ -1145,24 +1148,24 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableResubscribe) {
ASSERT_TRUE(SubscribeToResources(resource_subscribe));
// Subscribe batched state of all nodes from GCS.
std::atomic<int> batch_heartbeat_count(0);
auto batch_heartbeat_subscribe =
[&batch_heartbeat_count](const rpc::HeartbeatBatchTableData &result) {
++batch_heartbeat_count;
std::atomic<int> batch_resource_usage_count(0);
auto batch_resource_usage_subscribe =
[&batch_resource_usage_count](const rpc::ResourceUsageBatchData &result) {
++batch_resource_usage_count;
};
ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe));
ASSERT_TRUE(SubscribeBatchResourceUsage(batch_resource_usage_subscribe));
auto node_info = Mocker::GenNodeInfo(1);
ASSERT_TRUE(RegisterNode(*node_info));
NodeID node_id = NodeID::FromBinary(node_info->node_id());
std::string key = "CPU";
ASSERT_TRUE(UpdateResources(node_id, key));
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
heartbeat->set_node_id(node_info->node_id());
// Set this flag because GCS won't publish unchanged heartbeat.
heartbeat->set_should_global_gc(true);
ASSERT_TRUE(ReportHeartbeat(heartbeat));
WaitForExpectedCount(batch_heartbeat_count, 1);
auto resources = std::make_shared<rpc::ResourcesData>();
resources->set_node_id(node_info->node_id());
// Set this flag because GCS won't publish unchanged resources.
resources->set_should_global_gc(true);
ASSERT_TRUE(ReportResourceUsage(resources));
WaitForExpectedCount(batch_resource_usage_count, 1);
RestartGcsServer();
@@ -1170,12 +1173,12 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableResubscribe) {
ASSERT_TRUE(RegisterNode(*node_info));
node_id = NodeID::FromBinary(node_info->node_id());
ASSERT_TRUE(UpdateResources(node_id, key));
heartbeat->set_node_id(node_info->node_id());
ASSERT_TRUE(ReportHeartbeat(heartbeat));
resources->set_node_id(node_info->node_id());
ASSERT_TRUE(ReportResourceUsage(resources));
WaitForExpectedCount(node_change_count, 2);
WaitForExpectedCount(resource_change_count, 2);
WaitForExpectedCount(batch_heartbeat_count, 2);
WaitForExpectedCount(batch_resource_usage_count, 2);
}
TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
@@ -0,0 +1,111 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// Build the manager and immediately spawn the thread that runs `io_service`.
///
/// \param io_service The event loop that the timer and posted work run on.
/// \param on_node_death_callback Invoked with the node id when a node times out.
GcsHeartbeatManager::GcsHeartbeatManager(
    boost::asio::io_service &io_service,
    std::function<void(const NodeID &)> on_node_death_callback)
    : io_service_(io_service),
      on_node_death_callback_(std::move(on_node_death_callback)),
      num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
      detect_timer_(io_service) {
  auto run_event_loop = [this] {
    // Keep an asio work object alive for the whole run() call so that the
    // io_service stays alive.
    boost::asio::io_service::work keep_alive(io_service_);
    io_service_.run();
  };
  io_service_thread_.reset(new std::thread(run_event_loop));
}
/// Post a task to `io_service_` that kicks off the detection loop.
/// Only the first executed task calls `Tick()`; later ones see `is_started_` and
/// do nothing.
void GcsHeartbeatManager::Start() {
  io_service_.post([this] {
    if (is_started_) {
      return;
    }
    Tick();
    is_started_ = true;
  });
}
/// Stop `io_service_` and, if the event-loop thread is joinable, wait for it to exit.
void GcsHeartbeatManager::Stop() {
  io_service_.stop();
  std::thread &loop_thread = *io_service_thread_;
  if (!loop_thread.joinable()) {
    return;
  }
  loop_thread.join();
}
/// Start tracking a node. The insertion is posted to `io_service_` rather than
/// performed on the calling thread.
///
/// \param node_id ID of the node to track; its entry is created with a budget of
/// `num_heartbeats_timeout_` ticks.
void GcsHeartbeatManager::AddNode(const NodeID &node_id) {
  auto register_node = [this, node_id] {
    heartbeats_.emplace(node_id, num_heartbeats_timeout_);
  };
  io_service_.post(register_node);
}
/// Handle a heartbeat RPC: refresh the sender's remaining-tick budget and reply.
///
/// \param request The heartbeat request; only `heartbeat().node_id()` is read here.
/// \param reply The reply message whose status is filled in by `GCS_RPC_SEND_REPLY`.
/// \param send_reply_callback Callback that sends the reply. It is invoked on every
/// path so that no RPC is left unanswered.
void GcsHeartbeatManager::HandleReportHeartbeat(
    const rpc::ReportHeartbeatRequest &request, rpc::ReportHeartbeatReply *reply,
    rpc::SendReplyCallback send_reply_callback) {
  const NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id());
  auto iter = heartbeats_.find(node_id);
  if (iter == heartbeats_.end()) {
    // The node is not registered (or has already been declared dead), so there is
    // nothing to refresh. The RPC must still be answered: returning without calling
    // `send_reply_callback` would leave the call pending forever. Report the
    // condition as an error so the sender can tell that the heartbeat was rejected.
    GCS_RPC_SEND_REPLY(send_reply_callback, reply,
                       Status::Invalid("Heartbeat received from an unregistered node."));
    return;
  }
  // A heartbeat arrived in time: reset the number of ticks the node may miss.
  iter->second = num_heartbeats_timeout_;
  GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
/// One iteration of the detection loop: look for timed-out nodes, then arm the
/// timer for the next iteration.
void GcsHeartbeatManager::Tick() {
  // Detection runs first; the next tick is scheduled only afterwards.
  DetectDeadNodes();
  ScheduleTick();
}
/// Decrement every tracked node's remaining-tick budget by one. A node whose budget
/// reaches exactly zero is logged, dropped from `heartbeats_` and reported through
/// `on_node_death_callback_` (when one is set).
void GcsHeartbeatManager::DetectDeadNodes() {
  auto it = heartbeats_.begin();
  while (it != heartbeats_.end()) {
    // Advance before touching the entry, because the entry may be erased below.
    auto entry = it;
    ++it;
    entry->second -= 1;
    if (entry->second != 0) {
      continue;
    }
    // Copy the id out: the map entry is gone by the time the callback runs.
    const auto dead_node_id = entry->first;
    RAY_LOG(WARNING) << "Node timed out: " << dead_node_id;
    heartbeats_.erase(entry);
    if (on_node_death_callback_) {
      on_node_death_callback_(dead_node_id);
    }
  }
}
void GcsHeartbeatManager::ScheduleTick() {
auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_heartbeat_timeout_milliseconds());
detect_timer_.expires_from_now(heartbeat_period);
detect_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
// The Monitor lifetime may be short than the object who use it. (e.g. gcs_server)
return;
}
RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message();
Tick();
});
}
} // namespace gcs
} // namespace ray
@@ -0,0 +1,89 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "absl/container/flat_hash_map.h"
#include "ray/common/id.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// GcsHeartbeatManager is responsible for monitoring node liveness as well as
/// handling heartbeat rpc requests. This class is not thread-safe.
class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
public:
/// Create a GcsHeartbeatManager. The constructor also starts a thread that runs
/// `io_service`.
///
/// \param io_service The event loop to run the monitor on.
/// \param on_node_death_callback Callback that will be called when node death is
/// detected.
explicit GcsHeartbeatManager(
boost::asio::io_service &io_service,
std::function<void(const NodeID &)> on_node_death_callback);
/// Handle a heartbeat rpc coming from a raylet. A heartbeat from a registered node
/// resets the number of ticks that node may miss before it is declared dead.
///
/// \param request The heartbeat request, carrying the sender's node id.
/// \param reply The reply to fill in.
/// \param send_reply_callback Callback used to send the reply.
void HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request,
rpc::ReportHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Start the node failure detection loop. The start is posted to the event loop;
/// only the first start has an effect.
void Start();
/// Stop the node failure detection loop: stops the event loop and joins the thread
/// that runs it.
void Stop();
/// Register a node with this detector.
/// Heartbeat data is only accepted from nodes that have been registered.
///
/// \param node_id ID of the node to be registered.
void AddNode(const NodeID &node_id);
protected:
/// One iteration of the detection loop, run on every heartbeat period. Raylets that
/// have not sent a heartbeat within the last num_heartbeats_timeout ticks are
/// reported through `on_node_death_callback_`.
void Tick();
/// Check whether any raylet has been inactive (no heartbeat) for too long.
/// Any such raylet is removed from `heartbeats_` and reported as dead.
void DetectDeadNodes();
/// Schedule another tick after one heartbeat period.
void ScheduleTick();
private:
/// The main event loop for node failure detector.
boost::asio::io_service &io_service_;
/// The thread that runs `io_service_`; created in the constructor, joined in `Stop()`.
std::unique_ptr<std::thread> io_service_thread_;
/// The callback of node death.
std::function<void(const NodeID &)> on_node_death_callback_;
/// The number of heartbeats that can be missed before a node is removed.
int64_t num_heartbeats_timeout_;
/// A timer that `ScheduleTick()` re-arms with the RayConfig value
/// `raylet_heartbeat_timeout_milliseconds`.
boost::asio::deadline_timer detect_timer_;
/// For each registered Raylet, the number of ticks that may pass before the
/// Raylet will be declared dead.
absl::flat_hash_map<NodeID, int64_t> heartbeats_;
/// Whether the detection loop has been started.
bool is_started_ = false;
};
} // namespace gcs
} // namespace ray
+84 -196
View File
@@ -22,113 +22,18 @@
namespace ray {
namespace gcs {
/// Construct a NodeFailureDetector. Nothing is scheduled here; the body is empty and
/// the detection loop only begins once `Start()` is called.
///
/// \param io_service The event loop that `detect_timer_` is bound to.
/// \param gcs_table_storage Stored in `gcs_table_storage_`.
/// \param gcs_pub_sub Stored in `gcs_pub_sub_`.
/// \param on_node_death_callback Invoked with the node id when a node times out.
GcsNodeManager::NodeFailureDetector::NodeFailureDetector(
boost::asio::io_service &io_service,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const NodeID &)> on_node_death_callback)
: gcs_table_storage_(std::move(gcs_table_storage)),
on_node_death_callback_(std::move(on_node_death_callback)),
// Both settings are read from RayConfig once, at construction time.
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()),
detect_timer_(io_service),
gcs_pub_sub_(std::move(gcs_pub_sub)) {}
/// Begin the detection loop. Only the first call runs `Tick()`; later calls return
/// immediately because `is_started_` is already set.
void GcsNodeManager::NodeFailureDetector::Start() {
  if (is_started_) {
    return;
  }
  Tick();
  is_started_ = true;
}
/// Start tracking a node with a fresh budget of `num_heartbeats_timeout_` ticks.
///
/// \param node_id ID of the node to track.
void GcsNodeManager::NodeFailureDetector::AddNode(const NodeID &node_id) {
  const auto initial_budget = num_heartbeats_timeout_;
  heartbeats_.emplace(node_id, initial_budget);
}
/// Record a heartbeat from a node by resetting its remaining-tick budget.
/// Heartbeats from nodes that are not being tracked are ignored.
///
/// \param node_id ID of the node the heartbeat came from.
void GcsNodeManager::NodeFailureDetector::HandleHeartbeat(const NodeID &node_id) {
  auto tracked = heartbeats_.find(node_id);
  if (tracked != heartbeats_.end()) {
    tracked->second = num_heartbeats_timeout_;
  }
  // Otherwise the node is not registered and the heartbeat is dropped.
  // TODO(Shanly): Maybe we should reply the raylet with an error. So the raylet can
  // crash itself as soon as possible.
}
/// One iteration of the detection loop: look for timed-out nodes, then arm the
/// timer for the next iteration.
void GcsNodeManager::NodeFailureDetector::Tick() {
  // Detection runs first; the next tick is scheduled only afterwards.
  DetectDeadNodes();
  ScheduleTick();
}
/// Decrement every tracked node's remaining-tick budget by one. A node whose budget
/// reaches exactly zero is logged, dropped from `heartbeats_` and reported through
/// `on_node_death_callback_` (when one is set).
void GcsNodeManager::NodeFailureDetector::DetectDeadNodes() {
  auto it = heartbeats_.begin();
  while (it != heartbeats_.end()) {
    // Advance before touching the entry, because the entry may be erased below.
    auto entry = it;
    ++it;
    entry->second -= 1;
    if (entry->second != 0) {
      continue;
    }
    // Copy the id out: the map entry is gone by the time the callback runs.
    const auto dead_node_id = entry->first;
    RAY_LOG(WARNING) << "Node timed out: " << dead_node_id;
    heartbeats_.erase(entry);
    if (on_node_death_callback_) {
      on_node_death_callback_(dead_node_id);
    }
  }
}
void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_heartbeat_timeout_milliseconds());
detect_timer_.expires_from_now(heartbeat_period);
detect_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
// The Monitor lifetime may be short than the object who use it. (e.g. gcs_server)
return;
}
RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message();
Tick();
});
}
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(
boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
boost::asio::io_service &main_io_service, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager)
: main_io_service_(main_io_service),
node_failure_detector_(new NodeFailureDetector(
node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub,
[this](const NodeID &node_id) {
// Post this to main event loop to avoid potential concurrency issues.
main_io_service_.post([this, node_id] {
if (auto node = RemoveNode(node_id, /* is_intended = */ false)) {
node->set_state(rpc::GcsNodeInfo::DEAD);
node->set_timestamp(current_sys_time_ms());
AddDeadNodeToCache(node);
auto on_done = [this, node_id, node](const Status &status) {
auto on_done = [this, node_id, node](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node->SerializeAsString(), nullptr));
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};
RAY_CHECK_OK(
gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
}
});
})),
node_failure_detector_service_(node_failure_detector_io_service),
heartbeat_timer_(main_io_service),
: resource_timer_(main_io_service),
light_report_resource_usage_enabled_(
RayConfig::instance().light_report_resource_usage_enabled()),
gcs_pub_sub_(gcs_pub_sub),
gcs_table_storage_(gcs_table_storage),
gcs_resource_manager_(gcs_resource_manager) {
SendBatchedHeartbeat();
SendBatchedResourceUsage();
}
void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
@@ -192,46 +97,27 @@ void GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
}
void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request,
rpc::ReportHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id());
auto heartbeat_data = std::make_shared<rpc::HeartbeatTableData>();
heartbeat_data->CopyFrom(request.heartbeat());
UpdateNodeHeartbeat(node_id, request);
// Update node realtime resources.
UpdateNodeRealtimeResources(node_id, *heartbeat_data);
if (!RayConfig::instance().light_heartbeat_enabled() ||
heartbeat_data->should_global_gc() || heartbeat_data->resources_total_size() > 0 ||
heartbeat_data->resources_available_changed() ||
heartbeat_data->resource_load_changed()) {
heartbeat_buffer_[node_id] = *heartbeat_data;
}
// Note: To avoid heartbeats being delayed by main thread, make sure heartbeat is always
// handled by its own IO service.
node_failure_detector_service_.post(
[this, node_id] { node_failure_detector_->HandleHeartbeat(node_id); });
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_HEARTBEAT_REQUEST];
}
// TODO(WangTao): Implement this to handle resource usage reports. Basically move the
// resource-related operations out of `HandleReportHeartbeat`.
void GcsNodeManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
NodeID node_id = NodeID::FromBinary(request.resources().node_id());
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
UpdateNodeResourceUsage(node_id, request);
// Update node realtime resources.
UpdateNodeRealtimeResources(node_id, *resources_data);
if (!light_report_resource_usage_enabled_ || resources_data->should_global_gc() ||
resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed()) {
resources_buffer_[node_id] = *resources_data;
}
// TODO(WangTao): Implement this. Basically copy from `HandleGetAllHeartbeat`.
void GcsNodeManager::HandleGetAllResourceUsage(
const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}
void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request,
@@ -364,15 +250,15 @@ void GcsNodeManager::HandleGetAllAvailableResources(
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}
void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request,
rpc::GetAllHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (!node_heartbeats_.empty()) {
auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
void GcsNodeManager::HandleGetAllResourceUsage(
const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (!node_resource_usages_.empty()) {
auto batch = std::make_shared<rpc::ResourceUsageBatchData>();
absl::flat_hash_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
for (const auto &heartbeat : node_heartbeats_) {
for (const auto &usage : node_resource_usages_) {
// Aggregate the load reported by each raylet.
auto load = heartbeat.second.resource_load_by_shape();
auto load = usage.second.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape()));
auto &aggregate_demand = aggregate_load[scheduling_key];
@@ -388,7 +274,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
}
}
batch->add_batch()->CopyFrom(heartbeat.second);
batch->add_batch()->CopyFrom(usage.second);
}
for (const auto &demand : aggregate_load) {
@@ -406,34 +292,33 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
auto placement_group_load_proto = batch->mutable_placement_group_load();
placement_group_load_proto->CopyFrom(*placement_group_load.get());
}
reply->mutable_heartbeat_data()->CopyFrom(*batch);
reply->mutable_resource_usage_data()->CopyFrom(*batch);
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_ALL_HEARTBEAT_REQUEST];
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}
void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id,
const rpc::ReportHeartbeatRequest &request) {
auto iter = node_heartbeats_.find(node_id);
if (!RayConfig::instance().light_heartbeat_enabled() ||
iter == node_heartbeats_.end()) {
auto heartbeat_data = std::make_shared<rpc::HeartbeatTableData>();
heartbeat_data->CopyFrom(request.heartbeat());
node_heartbeats_[node_id] = *heartbeat_data;
void GcsNodeManager::UpdateNodeResourceUsage(
const NodeID node_id, const rpc::ReportResourceUsageRequest &request) {
auto iter = node_resource_usages_.find(node_id);
if (!light_report_resource_usage_enabled_ || iter == node_resource_usages_.end()) {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
node_resource_usages_[node_id] = *resources_data;
} else {
if (request.heartbeat().resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = request.heartbeat().resources_total();
if (request.resources().resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = request.resources().resources_total();
}
if (request.heartbeat().resources_available_changed()) {
if (request.resources().resources_available_changed()) {
(*iter->second.mutable_resources_available()) =
request.heartbeat().resources_available();
request.resources().resources_available();
}
if (request.heartbeat().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.heartbeat().resource_load();
if (request.resources().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.resources().resource_load();
}
(*iter->second.mutable_resource_load_by_shape()) =
request.heartbeat().resource_load_by_shape();
request.resources().resource_load_by_shape();
}
}
@@ -454,11 +339,6 @@ void GcsNodeManager::AddNode(std::shared_ptr<rpc::GcsNodeInfo> node) {
alive_nodes_.emplace(node_id, node);
// Add an empty resources for this node.
RAY_CHECK(cluster_resources_.emplace(node_id, rpc::ResourceMap()).second);
// Register this node to the `node_failure_detector_` which will start monitoring it.
// Note: To avoid heartbeats being delayed by main thread, make sure node addition is
// always handled by its own IO service.
node_failure_detector_service_.post(
[this, node_id] { node_failure_detector_->AddNode(node_id); });
// Notify all listeners.
for (auto &listener : node_added_listeners_) {
@@ -480,7 +360,7 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
alive_nodes_.erase(iter);
// Remove from cluster resources.
cluster_resources_.erase(node_id);
heartbeat_buffer_.erase(node_id);
resources_buffer_.erase(node_id);
if (!is_intended) {
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
@@ -505,11 +385,25 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
return removed_node;
}
/// React to a node being reported as failed. If `RemoveNode` returns a node, it is
/// marked DEAD with the current timestamp, added to the dead-node cache and written
/// to the node table. When that write completes, the node's entry in the node
/// resource table is deleted, and when the deletion completes the node info is
/// published on `NODE_CHANNEL`.
///
/// \param node_id ID of the node that failed.
void GcsNodeManager::OnNodeFailure(const NodeID &node_id) {
  auto node = RemoveNode(node_id, /* is_intended = */ false);
  if (!node) {
    // `RemoveNode` returned nothing, so there is nothing to update.
    return;
  }
  node->set_state(rpc::GcsNodeInfo::DEAD);
  node->set_timestamp(current_sys_time_ms());
  AddDeadNodeToCache(node);
  // The steps are chained through callbacks: Put -> Delete -> Publish.
  // The status passed to each callback is not inspected.
  auto on_node_stored = [this, node_id, node](const Status & /*status*/) {
    auto on_resources_deleted = [this, node_id, node](const Status & /*status*/) {
      RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
                                         node->SerializeAsString(), nullptr));
    };
    RAY_CHECK_OK(
        gcs_table_storage_->NodeResourceTable().Delete(node_id, on_resources_deleted));
  };
  RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_node_stored));
}
void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
for (const auto &item : gcs_init_data.Nodes()) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
// Call `AddNode` for this node so that it is recorded as alive and gets an
// empty resource entry.
AddNode(std::make_shared<rpc::GcsNodeInfo>(item.second));
} else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
@@ -527,19 +421,13 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
}
}
void GcsNodeManager::StartNodeFailureDetector() {
// Note: To avoid heartbeats being delayed by main thread, make sure detector start is
// always handled by its own IO service.
node_failure_detector_service_.post([this] { node_failure_detector_->Start(); });
}
void GcsNodeManager::UpdateNodeRealtimeResources(
const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) {
if (!RayConfig::instance().light_heartbeat_enabled() ||
const NodeID &node_id, const rpc::ResourcesData &resource_data) {
if (!light_report_resource_usage_enabled_ ||
gcs_resource_manager_->GetClusterResources().count(node_id) == 0 ||
heartbeat.resources_available_changed()) {
resource_data.resources_available_changed()) {
gcs_resource_manager_->UpdateResources(
node_id, ResourceSet(MapFromProtobuf(heartbeat.resources_available())));
node_id, ResourceSet(MapFromProtobuf(resource_data.resources_available())));
}
}
@@ -560,31 +448,31 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node)
sorted_dead_node_list_.emplace_back(node_id, node->timestamp());
}
void GcsNodeManager::SendBatchedHeartbeat() {
if (!heartbeat_buffer_.empty()) {
auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
for (auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->Swap(&heartbeat.second);
void GcsNodeManager::SendBatchedResourceUsage() {
if (!resources_buffer_.empty()) {
auto batch = std::make_shared<rpc::ResourceUsageBatchData>();
for (auto &resources : resources_buffer_) {
batch->add_batch()->Swap(&resources.second);
}
stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0));
RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "",
RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "",
batch->SerializeAsString(), nullptr));
heartbeat_buffer_.clear();
resources_buffer_.clear();
}
auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_heartbeat_timeout_milliseconds());
heartbeat_timer_.expires_from_now(heartbeat_period);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
auto resources_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_report_resources_period_milliseconds());
resource_timer_.expires_from_now(resources_period);
resource_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `heartbeat_timer_` is canceled or destroyed.
// `operation_aborted` is set when `resource_timer_` is canceled or destroyed.
// The Monitor lifetime may be shorter than that of the object that uses it. (e.g. gcs_server)
return;
}
RAY_CHECK(!error) << "Sending batched heartbeat failed with error: "
RAY_CHECK(!error) << "Sending batched resource usage failed with error: "
<< error.message();
SendBatchedHeartbeat();
SendBatchedResourceUsage();
});
}
@@ -596,11 +484,11 @@ std::string GcsNodeManager::DebugString() const {
<< counts_[CountType::UNREGISTER_NODE_REQUEST]
<< ", GetAllNodeInfo request count: "
<< counts_[CountType::GET_ALL_NODE_INFO_REQUEST]
<< ", ReportHeartbeat request count: "
<< counts_[CountType::REPORT_HEARTBEAT_REQUEST]
<< ", ReportResourceUsage request count: "
<< counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]
<< ", GetHeartbeat request count: " << counts_[CountType::GET_HEARTBEAT_REQUEST]
<< ", GetAllHeartbeat request count: "
<< counts_[CountType::GET_ALL_HEARTBEAT_REQUEST]
<< ", GetAllResourceUsage request count: "
<< counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]
<< ", GetResources request count: " << counts_[CountType::GET_RESOURCES_REQUEST]
<< ", UpdateResources request count: "
<< counts_[CountType::UPDATE_RESOURCES_REQUEST]
+19 -102
View File
@@ -37,12 +37,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Create a GcsNodeManager.
///
/// \param main_io_service The main event loop.
/// \param node_failure_detector_io_service The event loop of node failure detector.
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
/// \param gcs_resource_manager GCS resource manager.
explicit GcsNodeManager(boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager);
@@ -62,11 +60,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
rpc::GetAllNodeInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle heartbeat rpc come from raylet.
void HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request,
rpc::ReportHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle report resource usage rpc come from raylet.
void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request,
rpc::ReportResourceUsageReply *reply,
@@ -108,17 +101,14 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle get all heartbeat rpc request. Only used when light heartbeat enabled.
void HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request,
rpc::GetAllHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Update heartbeat of given node.
/// Update resource usage of given node.
///
/// \param node_id Node id.
/// \param request Request containing heartbeat.
void UpdateNodeHeartbeat(const NodeID node_id,
const rpc::ReportHeartbeatRequest &request);
/// \param request Request containing resource usage.
void UpdateNodeResourceUsage(const NodeID node_id,
const rpc::ReportResourceUsageRequest &request);
void OnNodeFailure(const NodeID &node_id);
/// Add an alive node.
///
@@ -171,12 +161,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
/// Start node failure detector.
void StartNodeFailureDetector();
// Update node realtime resources.
void UpdateNodeRealtimeResources(const NodeID &node_id,
const rpc::HeartbeatTableData &heartbeat);
const rpc::ResourcesData &heartbeat);
/// Update the placement group load information so that it will be reported through
/// heartbeat.
@@ -187,72 +174,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::string DebugString() const;
protected:
class NodeFailureDetector {
public:
/// Create a NodeFailureDetector.
///
/// \param io_service The event loop to run the monitor on.
/// \param gcs_table_storage GCS table external storage accessor.
/// \param gcs_pub_sub GCS message publisher.
/// \param on_node_death_callback Callback that will be called when node death is
/// detected.
explicit NodeFailureDetector(
boost::asio::io_service &io_service,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const NodeID &)> on_node_death_callback);
// Note: To avoid heartbeats being delayed by main thread, all public methods below
// should be posted to its own IO service.
/// Start failure detector.
void Start();
/// Register node to this detector.
/// Only if the node has registered, its heartbeat data will be accepted.
///
/// \param node_id ID of the node to be registered.
void AddNode(const NodeID &node_id);
/// Handle a heartbeat from a Raylet.
///
/// \param node_id The node ID of the Raylet that sent the heartbeat.
void HandleHeartbeat(const NodeID &node_id);
protected:
/// A periodic timer that fires on every heartbeat period. Raylets that have
/// not sent a heartbeat within the last num_heartbeats_timeout ticks will be
/// marked as dead in the client table.
void Tick();
/// Check that if any raylet is inactive due to no heartbeat for a period of time.
/// If found any, mark it as dead.
void DetectDeadNodes();
/// Schedule another tick after a short time.
void ScheduleTick();
protected:
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// The callback of node death.
std::function<void(const NodeID &)> on_node_death_callback_;
/// The number of heartbeats that can be missed before a node is removed.
int64_t num_heartbeats_timeout_;
// Only the changed part will be included in heartbeat if this is true.
const bool light_heartbeat_enabled_;
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
boost::asio::deadline_timer detect_timer_;
/// For each Raylet that we receive a heartbeat from, the number of ticks
/// that may pass before the Raylet will be declared dead.
absl::flat_hash_map<NodeID, int64_t> heartbeats_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Is the detect started.
bool is_started_ = false;
};
private:
/// Add the dead node to the cache. If the cache is full, the earliest dead node is
/// evicted.
@@ -260,17 +181,13 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \param node The node which is dead.
void AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node);
/// Send any buffered heartbeats as a single publish.
void SendBatchedHeartbeat();
/// Send any buffered resource usage as a single publish.
void SendBatchedResourceUsage();
/// The main event loop.
boost::asio::io_service &main_io_service_;
/// Detector to detect the failure of node.
std::unique_ptr<NodeFailureDetector> node_failure_detector_;
/// The event loop for node failure detector.
boost::asio::io_service &node_failure_detector_service_;
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
boost::asio::deadline_timer heartbeat_timer_;
/// A timer that ticks every raylet_report_resources_period_milliseconds.
boost::asio::deadline_timer resource_timer_;
// Only the changed part will be reported if this is true.
const bool light_report_resource_usage_enabled_;
/// Alive nodes.
absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> alive_nodes_;
/// Dead nodes.
@@ -280,10 +197,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::list<std::pair<NodeID, int64_t>> sorted_dead_node_list_;
/// Cluster resources.
absl::flat_hash_map<NodeID, rpc::ResourceMap> cluster_resources_;
/// Newest heartbeat of all nodes.
absl::flat_hash_map<NodeID, rpc::HeartbeatTableData> node_heartbeats_;
/// A buffer containing heartbeats received from node managers in the last tick.
absl::flat_hash_map<NodeID, rpc::HeartbeatTableData> heartbeat_buffer_;
/// Newest resource usage of all nodes.
absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;
/// A buffer containing resource usage received from node managers in the last tick.
absl::flat_hash_map<NodeID, rpc::ResourcesData> resources_buffer_;
/// Listeners which monitors the addition of nodes.
std::vector<std::function<void(std::shared_ptr<rpc::GcsNodeInfo>)>>
node_added_listeners_;
@@ -304,9 +221,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
REGISTER_NODE_REQUEST = 0,
UNREGISTER_NODE_REQUEST = 1,
GET_ALL_NODE_INFO_REQUEST = 2,
REPORT_HEARTBEAT_REQUEST = 3,
REPORT_RESOURCE_USAGE_REQUEST = 3,
GET_HEARTBEAT_REQUEST = 4,
GET_ALL_HEARTBEAT_REQUEST = 5,
GET_ALL_RESOURCE_USAGE_REQUEST = 5,
GET_RESOURCES_REQUEST = 6,
UPDATE_RESOURCES_REQUEST = 7,
DELETE_RESOURCES_REQUEST = 8,
+26 -14
View File
@@ -73,6 +73,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs node manager.
InitGcsNodeManager(gcs_init_data);
// Init gcs heartbeat manager.
InitGcsHeartbeatManager(gcs_init_data);
// Init gcs job manager.
InitGcsJobManager();
@@ -103,11 +106,11 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
// Only after the rpc_server_ is running can the node failure
// detector be run. Otherwise the node failure detector will mistake
// Only after the rpc_server_ is running can the heartbeat manager
// be run. Otherwise the node failure detector will mistake
// some living nodes as dead as the timer inside node failure
// detector is already run.
gcs_node_manager_->StartNodeFailureDetector();
gcs_heartbeat_manager_->Start();
// Print debug info periodically.
PrintDebugInfo();
@@ -121,10 +124,7 @@ void GcsServer::Stop() {
// Shutdown the rpc server
rpc_server_.Shutdown();
node_manager_io_service_.stop();
if (node_manager_io_service_thread_->joinable()) {
node_manager_io_service_thread_->join();
}
gcs_heartbeat_manager_->Stop();
is_stopped_ = true;
RAY_LOG(INFO) << "GCS server stopped.";
@@ -133,14 +133,8 @@ void GcsServer::Stop() {
void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(redis_gcs_client_ && gcs_table_storage_ && gcs_pub_sub_);
node_manager_io_service_thread_.reset(new std::thread([this] {
/// The asio work to keep node_manager_io_service_ alive.
boost::asio::io_service::work node_manager_io_service_work_(node_manager_io_service_);
node_manager_io_service_.run();
}));
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_,
gcs_resource_manager_);
main_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_);
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
@@ -149,6 +143,23 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*node_info_service_);
}
void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_node_manager_);
gcs_heartbeat_manager_ = std::make_shared<GcsHeartbeatManager>(
heartbeat_manager_io_service_, /*on_node_death_callback=*/
[this](const NodeID &node_id) {
main_service_.post(
[this, node_id] { return gcs_node_manager_->OnNodeFailure(node_id); });
});
for (const auto &node : gcs_init_data.Nodes()) {
gcs_heartbeat_manager_->AddNode(node.first);
}
// Register service.
heartbeat_info_service_.reset(new rpc::HeartbeatInfoGrpcService(
heartbeat_manager_io_service_, *gcs_heartbeat_manager_));
rpc_server_.RegisterService(*heartbeat_info_service_);
}
/// Create a default-constructed GCS resource manager and store it in
/// `gcs_resource_manager_`.
void GcsServer::InitGcsResourceManager() {
  auto resource_manager = std::make_shared<GcsResourceManager>();
  gcs_resource_manager_.swap(resource_manager);
}
@@ -276,6 +287,7 @@ void GcsServer::InstallEventListeners() {
// placement groups and the pending actors.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
+11 -4
View File
@@ -14,6 +14,7 @@
#pragma once
#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
@@ -79,6 +80,9 @@ class GcsServer {
/// Initialize gcs node manager.
void InitGcsNodeManager(const GcsInitData &gcs_init_data);
/// Initialize gcs heartbeat manager.
void InitGcsHeartbeatManager(const GcsInitData &gcs_init_data);
/// Initialize gcs resource manager.
void InitGcsResourceManager();
@@ -124,10 +128,9 @@ class GcsServer {
GcsServerConfig config_;
/// The main io service to drive event posted from grpc threads.
boost::asio::io_context &main_service_;
/// The io service used by node manager in case of node failure detector being blocked
/// by main thread.
boost::asio::io_service node_manager_io_service_;
std::unique_ptr<std::thread> node_manager_io_service_thread_;
/// The io service used by heartbeat manager in case of node failure detector being
/// blocked by main thread.
boost::asio::io_service heartbeat_manager_io_service_;
/// The grpc server
rpc::GrpcServer rpc_server_;
/// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
@@ -138,6 +141,8 @@ class GcsServer {
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
/// The heartbeat manager.
std::shared_ptr<GcsHeartbeatManager> gcs_heartbeat_manager_;
/// The gcs redis failure detector.
std::shared_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// The gcs actor manager
@@ -151,6 +156,8 @@ class GcsServer {
std::unique_ptr<rpc::ActorInfoGrpcService> actor_info_service_;
/// Node info handler and service
std::unique_ptr<rpc::NodeInfoGrpcService> node_info_service_;
/// Heartbeat info handler and service
std::unique_ptr<rpc::HeartbeatInfoGrpcService> heartbeat_info_service_;
/// Object info handler and service
std::unique_ptr<gcs::GcsObjectManager> gcs_object_manager_;
std::unique_ptr<rpc::ObjectInfoGrpcService> object_info_service_;
+1 -1
View File
@@ -130,7 +130,7 @@ template class GcsTable<JobID, JobTableData>;
template class GcsTable<NodeID, GcsNodeInfo>;
template class GcsTable<NodeID, ResourceMap>;
template class GcsTable<NodeID, HeartbeatTableData>;
template class GcsTable<NodeID, HeartbeatBatchTableData>;
template class GcsTable<NodeID, ResourceUsageBatchData>;
template class GcsTable<JobID, ErrorTableData>;
template class GcsTable<UniqueID, ProfileTableData>;
template class GcsTable<WorkerID, WorkerTableData>;
+10 -10
View File
@@ -26,7 +26,6 @@ namespace gcs {
using rpc::ActorTableData;
using rpc::ErrorTableData;
using rpc::GcsNodeInfo;
using rpc::HeartbeatBatchTableData;
using rpc::HeartbeatTableData;
using rpc::JobTableData;
using rpc::ObjectLocationInfo;
@@ -35,6 +34,7 @@ using rpc::PlacementGroupTableData;
using rpc::ProfileTableData;
using rpc::ResourceMap;
using rpc::ResourceTableData;
using rpc::ResourceUsageBatchData;
using rpc::ScheduleData;
using rpc::StoredConfig;
using rpc::TaskLeaseData;
@@ -255,11 +255,11 @@ class GcsPlacementGroupScheduleTable : public GcsTable<PlacementGroupID, Schedul
}
};
class GcsHeartbeatBatchTable : public GcsTable<NodeID, HeartbeatBatchTableData> {
class GcsResourceUsageBatchTable : public GcsTable<NodeID, ResourceUsageBatchData> {
public:
explicit GcsHeartbeatBatchTable(std::shared_ptr<StoreClient> &store_client)
explicit GcsResourceUsageBatchTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
table_name_ = TablePrefix_Name(TablePrefix::HEARTBEAT_BATCH);
table_name_ = TablePrefix_Name(TablePrefix::RESOURCE_USAGE_BATCH);
}
};
@@ -348,9 +348,9 @@ class GcsTableStorage {
return *heartbeat_table_;
}
GcsHeartbeatBatchTable &HeartbeatBatchTable() {
RAY_CHECK(heartbeat_batch_table_ != nullptr);
return *heartbeat_batch_table_;
GcsResourceUsageBatchTable &HeartbeatBatchTable() {
RAY_CHECK(resource_usage_batch_table_ != nullptr);
return *resource_usage_batch_table_;
}
GcsProfileTable &ProfileTable() {
@@ -381,7 +381,7 @@ class GcsTableStorage {
std::unique_ptr<GcsNodeResourceTable> node_resource_table_;
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;
std::unique_ptr<GcsHeartbeatTable> heartbeat_table_;
std::unique_ptr<GcsHeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<GcsResourceUsageBatchTable> resource_usage_batch_table_;
std::unique_ptr<GcsProfileTable> profile_table_;
std::unique_ptr<GcsWorkerTable> worker_table_;
std::unique_ptr<GcsInternalConfigTable> system_config_table_;
@@ -408,7 +408,7 @@ class RedisGcsTableStorage : public GcsTableStorage {
heartbeat_table_.reset(new GcsHeartbeatTable(store_client_));
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
@@ -434,7 +434,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage {
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
heartbeat_table_.reset(new GcsHeartbeatTable(store_client_));
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
system_config_table_.reset(new GcsInternalConfigTable(store_client_));
@@ -27,9 +27,8 @@ class GcsActorSchedulerTest : public ::testing::Test {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
@@ -34,8 +34,8 @@ class GcsNodeManagerTest : public ::testing::Test {
TEST_F(GcsNodeManagerTest, TestManagement) {
boost::asio::io_service io_service;
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_,
gcs_resource_manager_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
@@ -49,8 +49,8 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
TEST_F(GcsNodeManagerTest, TestListener) {
boost::asio::io_service io_service;
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_,
gcs_resource_manager_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;
@@ -55,9 +55,8 @@ class GcsObjectManagerTest : public ::testing::Test {
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_);
GenTestData();
@@ -69,9 +69,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_);
gcs_placement_group_manager_.reset(
new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_,
gcs_table_storage_, *gcs_node_manager_));
@@ -40,9 +40,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
@@ -99,11 +98,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
void AddNode(const std::shared_ptr<rpc::GcsNodeInfo> &node, int cpu_num = 10) {
gcs_node_manager_->AddNode(node);
rpc::HeartbeatTableData heartbeat;
heartbeat.set_node_id(node->node_id());
(*heartbeat.mutable_resources_available())["CPU"] = cpu_num;
rpc::ResourcesData resource;
resource.set_node_id(node->node_id());
(*resource.mutable_resources_available())["CPU"] = cpu_num;
gcs_node_manager_->UpdateNodeRealtimeResources(NodeID::FromBinary(node->node_id()),
heartbeat);
resource);
}
void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) {
@@ -394,8 +394,8 @@ struct GcsServerMocker {
return Status::NotImplemented("");
}
Status AsyncSubscribeBatchHeartbeat(
const gcs::ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
Status AsyncSubscribeBatchedResourceUsage(
const gcs::ItemCallback<rpc::ResourceUsageBatchData> &subscribe,
const gcs::StatusCallback &done) override {
return Status::NotImplemented("");
}
+1 -1
View File
@@ -32,7 +32,7 @@ namespace gcs {
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
#define TASK_LEASE_CHANNEL "TASK_LEASE"
#define HEARTBEAT_BATCH_CHANNEL "HEARTBEAT_BATCH"
#define RESOURCES_BATCH_CHANNEL "RESOURCES_BATCH"
#define ERROR_INFO_CHANNEL "ERROR_INFO"
/// \class GcsPubSub
+12 -7
View File
@@ -422,7 +422,7 @@ Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &obje
RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl),
resource_sub_executor_(client_impl_->resource_table()),
heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {}
resource_usage_batch_sub_executor_(client_impl->resource_usage_batch_table()) {}
Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info,
const StatusCallback &callback) {
@@ -530,18 +530,23 @@ Status RedisNodeInfoAccessor::AsyncReportHeartbeat(
return heartbeat_table.Add(JobID::Nil(), node_id, data_ptr, on_done);
}
void RedisNodeInfoAccessor::AsyncReReportHeartbeat() {}
/// Report resource usage of a node. Not supported by the Redis-based accessor.
///
/// \param data_ptr The resource usage data to report (unused).
/// \param callback Callback for completion (unused, never invoked).
/// \return Always a `NotImplemented` status, matching the other unimplemented stubs
/// of this accessor.
Status RedisNodeInfoAccessor::AsyncReportResourceUsage(
    const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
  return Status::NotImplemented("AsyncReportResourceUsage not implemented");
}
Status RedisNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
const ItemCallback<HeartbeatBatchTableData> &subscribe, const StatusCallback &done) {
void RedisNodeInfoAccessor::AsyncReReportResourceUsage() {}
Status RedisNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage(
const ItemCallback<ResourceUsageBatchData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const NodeID &node_id,
const HeartbeatBatchTableData &data) {
const ResourceUsageBatchData &data) {
subscribe(data);
};
return heartbeat_batch_sub_executor_.AsyncSubscribeAll(NodeID::Nil(), on_subscribe,
done);
return resource_usage_batch_sub_executor_.AsyncSubscribeAll(NodeID::Nil(), on_subscribe,
done);
}
Status RedisNodeInfoAccessor::AsyncGetResources(
+11 -8
View File
@@ -347,15 +347,18 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
Status AsyncReportHeartbeat(const std::shared_ptr<HeartbeatTableData> &data_ptr,
const StatusCallback &callback) override;
void AsyncReReportHeartbeat() override;
Status AsyncReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncGetAllHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &callback) override {
return Status::NotImplemented("AsyncGetAllHeartbeat not implemented");
void AsyncReReportResourceUsage() override;
Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback) override {
return Status::NotImplemented("AsyncGetAllResourceUsage not implemented");
}
Status AsyncSubscribeBatchHeartbeat(
const ItemCallback<HeartbeatBatchTableData> &subscribe,
Status AsyncSubscribeBatchedResourceUsage(
const ItemCallback<ResourceUsageBatchData> &subscribe,
const StatusCallback &done) override;
void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
@@ -378,9 +381,9 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
DynamicResourceSubscriptionExecutor;
DynamicResourceSubscriptionExecutor resource_sub_executor_;
typedef SubscriptionExecutor<NodeID, HeartbeatBatchTableData, HeartbeatBatchTable>
typedef SubscriptionExecutor<NodeID, ResourceUsageBatchData, ResourceUsageBatchTable>
HeartbeatBatchSubscriptionExecutor;
HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_;
HeartbeatBatchSubscriptionExecutor resource_usage_batch_sub_executor_;
};
/// \class RedisErrorInfoAccessor
+3 -3
View File
@@ -55,7 +55,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
node_table_.reset(new NodeTable({primary_context}, this));
job_table_.reset(new JobTable({primary_context}, this));
heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context}, this));
resource_usage_batch_table_.reset(new ResourceUsageBatchTable({primary_context}, this));
// Tables below would be sharded.
object_table_.reset(new ObjectTable(shard_contexts, this));
raylet_task_table_.reset(new raylet::TaskTable(shard_contexts, this, command_type_));
@@ -128,8 +128,8 @@ NodeTable &RedisGcsClient::node_table() { return *node_table_; }
HeartbeatTable &RedisGcsClient::heartbeat_table() { return *heartbeat_table_; }
HeartbeatBatchTable &RedisGcsClient::heartbeat_batch_table() {
return *heartbeat_batch_table_;
ResourceUsageBatchTable &RedisGcsClient::resource_usage_batch_table() {
return *resource_usage_batch_table_;
}
JobTable &RedisGcsClient::job_table() { return *job_table_; }
+2 -2
View File
@@ -93,7 +93,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
/// Implements the Nodes() interface.
NodeTable &node_table();
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
ResourceUsageBatchTable &resource_usage_batch_table();
DynamicResourceTable &resource_table();
/// Implements the Tasks() interface.
virtual raylet::TaskTable &raylet_task_table();
@@ -118,7 +118,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<TaskLeaseTable> task_lease_table_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<HeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<ResourceUsageBatchTable> resource_usage_batch_table_;
std::unique_ptr<ProfileTable> profile_table_;
std::unique_ptr<NodeTable> node_table_;
std::unique_ptr<DynamicResourceTable> resource_table_;
+2 -1
View File
@@ -206,7 +206,8 @@ template class SubscriptionExecutor<TaskID, boost::optional<TaskLeaseData>,
TaskLeaseTable>;
template class SubscriptionExecutor<NodeID, ResourceChangeNotification,
DynamicResourceTable>;
template class SubscriptionExecutor<NodeID, HeartbeatBatchTableData, HeartbeatBatchTable>;
template class SubscriptionExecutor<NodeID, ResourceUsageBatchData,
ResourceUsageBatchTable>;
template class SubscriptionExecutor<WorkerID, WorkerTableData, WorkerTable>;
} // namespace gcs
+2 -2
View File
@@ -829,12 +829,12 @@ template class Log<ActorID, ActorTableData>;
template class Log<TaskID, TaskReconstructionData>;
template class Table<TaskID, TaskLeaseData>;
template class Table<NodeID, HeartbeatTableData>;
template class Table<NodeID, HeartbeatBatchTableData>;
template class Table<NodeID, ResourceUsageBatchData>;
template class Log<NodeID, GcsNodeInfo>;
template class Log<JobID, JobTableData>;
template class Log<UniqueID, ProfileTableData>;
template class Log<NodeID, HeartbeatTableData>;
template class Log<NodeID, HeartbeatBatchTableData>;
template class Log<NodeID, ResourceUsageBatchData>;
template class Log<WorkerID, WorkerTableData>;
template class Table<WorkerID, WorkerTableData>;
template class Table<ActorID, ActorTableData>;
+7 -7
View File
@@ -39,12 +39,12 @@ using rpc::ErrorTableData;
using rpc::GcsChangeMode;
using rpc::GcsEntry;
using rpc::GcsNodeInfo;
using rpc::HeartbeatBatchTableData;
using rpc::HeartbeatTableData;
using rpc::JobTableData;
using rpc::ObjectTableData;
using rpc::ProfileTableData;
using rpc::ResourceTableData;
using rpc::ResourceUsageBatchData;
using rpc::TablePrefix;
using rpc::TablePubsub;
using rpc::TaskLeaseData;
@@ -688,15 +688,15 @@ class HeartbeatTable : public Table<NodeID, HeartbeatTableData> {
virtual ~HeartbeatTable() {}
};
class HeartbeatBatchTable : public Table<NodeID, HeartbeatBatchTableData> {
class ResourceUsageBatchTable : public Table<NodeID, ResourceUsageBatchData> {
public:
HeartbeatBatchTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
RedisGcsClient *client)
ResourceUsageBatchTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
RedisGcsClient *client)
: Table(contexts, client) {
pubsub_channel_ = TablePubsub::HEARTBEAT_BATCH_PUBSUB;
prefix_ = TablePrefix::HEARTBEAT_BATCH;
pubsub_channel_ = TablePubsub::RESOURCE_USAGE_BATCH_PUBSUB;
prefix_ = TablePrefix::RESOURCE_USAGE_BATCH;
}
virtual ~HeartbeatBatchTable() {}
virtual ~ResourceUsageBatchTable() {}
};
class JobTable : public Log<JobID, JobTableData> {
+9 -25
View File
@@ -32,7 +32,7 @@ enum TablePrefix {
FUNCTION = 7;
TASK_RECONSTRUCTION = 8;
HEARTBEAT = 9;
HEARTBEAT_BATCH = 10;
RESOURCE_USAGE_BATCH = 10;
JOB = 11;
PROFILE = 12;
TASK_LEASE = 13;
@@ -56,7 +56,7 @@ enum TablePubsub {
OBJECT_PUBSUB = 5;
ACTOR_PUBSUB = 6;
HEARTBEAT_PUBSUB = 7;
HEARTBEAT_BATCH_PUBSUB = 8;
RESOURCE_USAGE_BATCH_PUBSUB = 8;
TASK_LEASE_PUBSUB = 9;
JOB_PUBSUB = 10;
NODE_RESOURCE_PUBSUB = 11;
@@ -302,6 +302,11 @@ message PlacementGroupLoad {
message HeartbeatTableData {
// Node id.
bytes node_id = 1;
}
message ResourcesData {
// Node id.
bytes node_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether available resources is changed. Only used when light
@@ -320,29 +325,8 @@ message HeartbeatTableData {
bool should_global_gc = 8;
}
message ResourcesData {
// Node id.
bytes node_id = 1;
// Resource capacity currently available on this node manager.
map<string, double> resources_available = 2;
// Indicates whether avaialbe resources is changed. Only used when
// light heartbeat enabled.
bool resources_available_changed = 3;
// Total resource capacity configured for this node manager.
map<string, double> resources_total = 4;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 5;
// Indicates whether resource load is changed. Only used when
// light heartbeat enabled.
bool resource_load_changed = 6;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 7;
// Whether this node manager is requesting global GC.
bool should_global_gc = 8;
}
message HeartbeatBatchTableData {
repeated HeartbeatTableData batch = 1;
message ResourceUsageBatchData {
repeated ResourcesData batch = 1;
// The total resource demand on all nodes included in the batch, sorted by
// resource shape.
ResourceLoad resource_load_by_shape = 2;
+7 -13
View File
@@ -164,14 +164,6 @@ message ReportHeartbeatReply {
GcsStatus status = 1;
}
message GetAllHeartbeatRequest {
}
message GetAllHeartbeatReply {
GcsStatus status = 1;
HeartbeatBatchTableData heartbeat_data = 2;
}
message ReportResourceUsageRequest {
ResourcesData resources = 1;
}
@@ -185,7 +177,7 @@ message GetAllResourceUsageRequest {
message GetAllResourceUsageReply {
GcsStatus status = 1;
repeated ResourcesData resources_list = 2;
ResourceUsageBatchData resource_usage_data = 2;
}
message GetResourcesRequest {
@@ -247,11 +239,7 @@ service NodeInfoGcsService {
rpc UnregisterNode(UnregisterNodeRequest) returns (UnregisterNodeReply);
// Get information of all nodes from GCS Service.
rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply);
// Report heartbeat of a node to GCS Service.
rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply);
// Get newest heartbeat of all nodes from GCS Service.
rpc GetAllHeartbeat(GetAllHeartbeatRequest) returns (GetAllHeartbeatReply);
// Report resource usage of a node to GCS Service.
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
@@ -270,6 +258,12 @@ service NodeInfoGcsService {
returns (GetAllAvailableResourcesReply);
}
// Service for heartbeat info access.
service HeartbeatInfoGcsService {
// Report heartbeat of a node to GCS Service.
rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply);
}
message GetObjectLocationsRequest {
// The ID of object to lookup in GCS Service.
bytes object_id = 1;
+2
View File
@@ -214,6 +214,8 @@ int main(int argc, char *argv[]) {
node_manager_config.heartbeat_period_ms =
RayConfig::instance().raylet_heartbeat_timeout_milliseconds();
node_manager_config.report_resources_period_ms =
RayConfig::instance().raylet_report_resources_period_milliseconds();
node_manager_config.debug_dump_period_ms =
RayConfig::instance().debug_dump_period_milliseconds();
node_manager_config.record_metrics_period_ms =
+150 -122
View File
@@ -124,12 +124,16 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
object_directory_(object_directory),
heartbeat_timer_(io_service),
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
report_resources_timer_(io_service),
report_resources_period_(
std::chrono::milliseconds(config.report_resources_period_ms)),
debug_dump_period_(config.debug_dump_period_ms),
fair_queueing_enabled_(config.fair_queueing_enabled),
object_pinning_enabled_(config.object_pinning_enabled),
temp_dir_(config.temp_dir),
object_manager_profile_timer_(io_service),
light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()),
light_report_resource_usage_enabled_(
RayConfig::instance().light_report_resource_usage_enabled()),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(
@@ -268,13 +272,13 @@ ray::Status NodeManager::RegisterGcs() {
RAY_RETURN_NOT_OK(
gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, on_done));
// Subscribe to heartbeat batches from the monitor.
const auto &heartbeat_batch_added =
[this](const HeartbeatBatchTableData &heartbeat_batch) {
HeartbeatBatchAdded(heartbeat_batch);
// Subscribe to resource usage batches from the monitor.
const auto &resource_usage_batch_added =
[this](const ResourceUsageBatchData &resource_usage_batch) {
ResourceUsageBatchAdded(resource_usage_batch);
};
RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat(
heartbeat_batch_added, /*done*/ nullptr));
RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage(
resource_usage_batch_added, /*done*/ nullptr));
// Subscribe to all unexpected failure notifications from the local and
// remote raylets. Note that this does not include workers that failed due to
@@ -303,6 +307,7 @@ ray::Status NodeManager::RegisterGcs() {
last_heartbeat_at_ms_ = current_time_ms();
last_debug_dump_at_ms_ = current_time_ms();
Heartbeat();
ReportResourceUsage();
// Start the timer that gets object manager profiling information and sends it
// to the GCS.
GetObjectManagerProfileInfo();
@@ -402,97 +407,7 @@ void NodeManager::Heartbeat() {
stats::HeartbeatReportMs.Record(interval);
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
heartbeat_data->set_node_id(self_node_id_.Binary());
if (new_scheduler_enabled_) {
new_resource_scheduler_->Heartbeat(light_heartbeat_enabled_, heartbeat_data);
cluster_task_manager_->Heartbeat(light_heartbeat_enabled_, heartbeat_data);
} else {
// TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet
// directly.
// TODO(atumanov): implement a ResourceSet const_iterator.
// If light heartbeat enabled, we only set filed that represent resources changed.
if (light_heartbeat_enabled_) {
auto last_heartbeat_resources = gcs_client_->Nodes().GetLastHeartbeatResources();
if (!last_heartbeat_resources->GetTotalResources().IsEqual(
local_resources.GetTotalResources())) {
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources->SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources->GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
heartbeat_data->set_resources_available_changed(true);
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources->SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
if (!last_heartbeat_resources->GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
heartbeat_data->set_resource_load_changed(true);
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources->SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
} else {
// If light heartbeat disabled, we send whole resources information every time.
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}
}
if (!new_scheduler_enabled_) {
// Add resource load by shape. This will be used by the new autoscaler.
auto resource_load = local_queues_.GetResourceLoadByShape(
RayConfig::instance().max_resource_shapes_per_load_report());
heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
}
// Set the global gc bit on the outgoing heartbeat message.
if (should_global_gc_) {
heartbeat_data->set_should_global_gc(true);
should_global_gc_ = false;
}
// Trigger local GC if needed. This throttles the frequency of local GC calls
// to at most once per heartbeat interval.
if (should_local_gc_) {
DoLocalGC();
should_local_gc_ = false;
}
RAY_CHECK_OK(
gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, /*done*/ nullptr));
@@ -521,6 +436,118 @@ void NodeManager::Heartbeat() {
});
}
// Collect this node's resource usage (total, available and load resources, the
// load by shape and the global GC flag), report it to the GCS when there is
// something to report, and re-arm `report_resources_timer_` so this function
// runs again after `report_resources_period_`.
void NodeManager::ReportResourceUsage() {
  auto resources_data = std::make_shared<rpc::ResourcesData>();
  SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
  resources_data->set_node_id(self_node_id_.Binary());

  if (new_scheduler_enabled_) {
    // The new scheduler components fill in the resource fields themselves.
    new_resource_scheduler_->FillResourceUsage(light_report_resource_usage_enabled_,
                                               resources_data);
    cluster_task_manager_->FillResourceUsage(light_report_resource_usage_enabled_,
                                             resources_data);
  } else {
    // TODO(atumanov): modify the resource usage protocol to use the ResourceSet
    // directly.
    // TODO(atumanov): implement a ResourceSet const_iterator.
    if (light_report_resource_usage_enabled_) {
      // Light resource usage report: only set the fields whose resources changed
      // since the last report, and remember the newly reported values.
      auto last_reported_resources = gcs_client_->Nodes().GetLastResourceUsage();

      if (!last_reported_resources->GetTotalResources().IsEqual(
              local_resources.GetTotalResources())) {
        for (const auto &resource_pair :
             local_resources.GetTotalResources().GetResourceMap()) {
          (*resources_data->mutable_resources_total())[resource_pair.first] =
              resource_pair.second;
        }
        last_reported_resources->SetTotalResources(
            ResourceSet(local_resources.GetTotalResources()));
      }

      if (!last_reported_resources->GetAvailableResources().IsEqual(
              local_resources.GetAvailableResources())) {
        resources_data->set_resources_available_changed(true);
        for (const auto &resource_pair :
             local_resources.GetAvailableResources().GetResourceMap()) {
          (*resources_data->mutable_resources_available())[resource_pair.first] =
              resource_pair.second;
        }
        last_reported_resources->SetAvailableResources(
            ResourceSet(local_resources.GetAvailableResources()));
      }

      // Refresh the local load from the task queues before comparing it.
      local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
      if (!last_reported_resources->GetLoadResources().IsEqual(
              local_resources.GetLoadResources())) {
        resources_data->set_resource_load_changed(true);
        for (const auto &resource_pair :
             local_resources.GetLoadResources().GetResourceMap()) {
          (*resources_data->mutable_resource_load())[resource_pair.first] =
              resource_pair.second;
        }
        last_reported_resources->SetLoadResources(
            ResourceSet(local_resources.GetLoadResources()));
      }
    } else {
      // If light resource usage report disabled, we send whole resources information
      // every time.
      for (const auto &resource_pair :
           local_resources.GetTotalResources().GetResourceMap()) {
        (*resources_data->mutable_resources_total())[resource_pair.first] =
            resource_pair.second;
      }
      for (const auto &resource_pair :
           local_resources.GetAvailableResources().GetResourceMap()) {
        (*resources_data->mutable_resources_available())[resource_pair.first] =
            resource_pair.second;
      }
      local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
      for (const auto &resource_pair :
           local_resources.GetLoadResources().GetResourceMap()) {
        (*resources_data->mutable_resource_load())[resource_pair.first] =
            resource_pair.second;
      }
    }
  }

  if (!new_scheduler_enabled_) {
    // Add resource load by shape. This will be used by the new autoscaler.
    auto resource_load = local_queues_.GetResourceLoadByShape(
        RayConfig::instance().max_resource_shapes_per_load_report());
    resources_data->mutable_resource_load_by_shape()->Swap(&resource_load);
  }

  // Set the global gc bit on the outgoing resource usage message.
  if (should_global_gc_) {
    resources_data->set_should_global_gc(true);
    should_global_gc_ = false;
  }

  // Trigger local GC if needed. This throttles the frequency of local GC calls
  // to at most once per resource usage report interval.
  if (should_local_gc_) {
    DoLocalGC();
    should_local_gc_ = false;
  }

  // With light reporting enabled, only send when something changed or a global
  // GC was requested. With light reporting disabled the full state must be sent
  // on every tick; the "changed" flags are not maintained on the legacy
  // scheduler path in that mode, so they cannot be used to gate the report
  // (otherwise a node with an empty total resource map would never report).
  if (!light_report_resource_usage_enabled_ ||
      resources_data->resources_total_size() > 0 ||
      resources_data->resources_available_changed() ||
      resources_data->resource_load_changed() || resources_data->should_global_gc()) {
    RAY_CHECK_OK(
        gcs_client_->Nodes().AsyncReportResourceUsage(resources_data, /*done*/ nullptr));
  }

  // Reset the timer.
  report_resources_timer_.expires_from_now(report_resources_period_);
  report_resources_timer_.async_wait([this](const boost::system::error_code &error) {
    RAY_CHECK(!error);
    ReportResourceUsage();
  });
}
void NodeManager::DoLocalGC() {
auto all_workers = worker_pool_.GetAllRegisteredWorkers();
for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
@@ -908,47 +935,47 @@ void NodeManager::TryLocalInfeasibleTaskScheduling() {
}
}
void NodeManager::HeartbeatAdded(const NodeID &node_id,
const HeartbeatTableData &heartbeat_data) {
void NodeManager::ResourceUsageAdded(const NodeID &node_id,
const rpc::ResourcesData &resource_data) {
// Locate the node id in remote node table and update available resources based on
// the received heartbeat information.
// the received resource usage information.
auto it = cluster_resource_map_.find(node_id);
if (it == cluster_resource_map_.end()) {
// Haven't received the node registration for this node yet, skip this heartbeat.
RAY_LOG(INFO) << "[HeartbeatAdded]: received heartbeat from unknown node id "
// Haven't received the node registration for this node yet, skip this message.
RAY_LOG(INFO) << "[ResourceUsageAdded]: received resource usage from unknown node id "
<< node_id;
return;
}
// Trigger local GC at the next resource usage report interval.
if (heartbeat_data.should_global_gc()) {
if (resource_data.should_global_gc()) {
should_local_gc_ = true;
}
SchedulingResources &remote_resources = it->second;
// If light heartbeat enabled, we update remote resources only when related resources
// map in heartbeat is not empty.
if (light_heartbeat_enabled_) {
if (heartbeat_data.resources_total_size() > 0) {
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
// If light resource usage report is enabled, we update remote resources only when the
// related resources map in the resource usage message is not empty.
if (light_report_resource_usage_enabled_) {
if (resource_data.resources_total_size() > 0) {
ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
}
if (heartbeat_data.resources_available_changed()) {
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
if (resource_data.resources_available_changed()) {
ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
}
if (heartbeat_data.resource_load_changed()) {
ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load()));
if (resource_data.resource_load_changed()) {
ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
}
} else {
// If light heartbeat disabled, we update remote resources every time.
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
// If light resource usage report disabled, we update remote resources every time.
ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load()));
ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
}
@@ -987,15 +1014,16 @@ void NodeManager::HeartbeatAdded(const NodeID &node_id,
}
}
void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_batch) {
// Update load information provided by each heartbeat.
for (const auto &heartbeat_data : heartbeat_batch.batch()) {
const NodeID &node_id = NodeID::FromBinary(heartbeat_data.node_id());
void NodeManager::ResourceUsageBatchAdded(
const ResourceUsageBatchData &resource_usage_batch) {
// Update load information provided by each message.
for (const auto &resource_usage : resource_usage_batch.batch()) {
const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
if (node_id == self_node_id_) {
// Skip heartbeats from self.
// Skip messages from self.
continue;
}
HeartbeatAdded(node_id, heartbeat_data);
ResourceUsageAdded(node_id, resource_usage);
}
}
+19 -10
View File
@@ -49,9 +49,9 @@ namespace raylet {
using rpc::ActorTableData;
using rpc::ErrorType;
using rpc::GcsNodeInfo;
using rpc::HeartbeatBatchTableData;
using rpc::HeartbeatTableData;
using rpc::JobTableData;
using rpc::ResourceUsageBatchData;
struct NodeManagerConfig {
/// The node's resource configuration.
@@ -83,6 +83,8 @@ struct NodeManagerConfig {
std::string agent_command;
/// The time between heartbeats in milliseconds.
uint64_t heartbeat_period_ms;
/// The time between resource usage reports in milliseconds.
uint64_t report_resources_period_ms;
/// The time between debug dumps in milliseconds, or -1 to disable.
uint64_t debug_dump_period_ms;
/// Whether to enable fair queueing between task classes in raylet.
@@ -218,6 +220,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Send heartbeats to the GCS.
void Heartbeat();
/// Report resource usage to the GCS.
void ReportResourceUsage();
/// Write out debug state to a file.
void DumpDebugState() const;
@@ -230,16 +235,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void GetObjectManagerProfileInfo();
/// Handler for a heartbeat notification from the GCS.
/// Handler for a resource usage notification from the GCS.
///
/// \param id The ID of the node manager that sent the heartbeat.
/// \param data The heartbeat data including load information.
/// \param id The ID of the node manager that sent the resources data.
/// \param data The resources data including load information.
/// \return Void.
void HeartbeatAdded(const NodeID &id, const HeartbeatTableData &data);
/// Handler for a heartbeat batch notification from the GCS
void ResourceUsageAdded(const NodeID &id, const rpc::ResourcesData &data);
/// Handler for a resource usage batch notification from the GCS
///
/// \param heartbeat_batch The batch of heartbeat data.
void HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_batch);
/// \param resource_usage_batch The batch of resource usage data.
void ResourceUsageBatchAdded(const ResourceUsageBatchData &resource_usage_batch);
/// Methods for task scheduling.
@@ -680,6 +685,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
boost::asio::steady_timer heartbeat_timer_;
/// The period used for the heartbeat timer.
std::chrono::milliseconds heartbeat_period_;
/// The timer used to report resources.
boost::asio::steady_timer report_resources_timer_;
/// The period used for the resources report timer.
std::chrono::milliseconds report_resources_period_;
/// The period between debug state dumps.
int64_t debug_dump_period_;
/// Whether to enable fair queueing between task classes in raylet.
@@ -698,8 +707,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_;
/// Only the changed part will be included in heartbeat if this is true.
const bool light_heartbeat_enabled_;
/// Only the changed part will be included in resource usage if this is true.
const bool light_report_resource_usage_enabled_;
/// The time that the last debug string was logged to the console.
uint64_t last_debug_dump_at_ms_;
/// The number of heartbeats that we should wait before sending the
@@ -822,25 +822,26 @@ void ClusterResourceScheduler::FreeLocalTaskResources(
UpdateLocalAvailableResourcesFromResourceInstances();
}
void ClusterResourceScheduler::Heartbeat(
bool light_heartbeat_enabled, std::shared_ptr<HeartbeatTableData> heartbeat_data) {
void ClusterResourceScheduler::FillResourceUsage(
bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> resources_data) {
NodeResources resources;
RAY_CHECK(GetNodeResources(local_node_id_, &resources))
<< "Error: Populating heartbeat failed. Please file a bug report: "
"https://github.com/ray-project/ray/issues/new.";
if (!light_heartbeat_enabled || !last_report_resources_ ||
if (!light_report_resource_usage_enabled || !last_report_resources_ ||
resources != *last_report_resources_.get()) {
for (int i = 0; i < PredefinedResources_MAX; i++) {
const auto &label = ResourceEnumToString((PredefinedResources)i);
const auto &capacity = resources.predefined_resources[i];
if (capacity.available != 0) {
(*heartbeat_data->mutable_resources_available())[label] =
(*resources_data->mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != 0) {
(*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
(*resources_data->mutable_resources_total())[label] = capacity.total.Double();
}
}
for (auto it = resources.custom_resources.begin();
@@ -849,20 +850,20 @@ void ClusterResourceScheduler::Heartbeat(
const auto &capacity = it->second;
const auto &label = string_to_int_map_.Get(custom_id);
if (capacity.available != 0) {
(*heartbeat_data->mutable_resources_available())[label] =
(*resources_data->mutable_resources_available())[label] =
capacity.available.Double();
}
if (capacity.total != 0) {
(*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
(*resources_data->mutable_resources_total())[label] = capacity.total.Double();
}
}
heartbeat_data->set_resources_available_changed(true);
if (light_heartbeat_enabled) {
resources_data->set_resources_available_changed(true);
if (light_report_resource_usage_enabled) {
last_report_resources_.reset(new NodeResources(resources));
}
}
if (light_heartbeat_enabled) {
if (light_report_resource_usage_enabled) {
// Reset all local views for remote nodes. This is needed in case tasks that
// we spilled back to a remote node were not actually scheduled on the
// node. Then, the remote node's resource availability may not change and
@@ -345,10 +345,11 @@ class ClusterResourceScheduler {
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resources_available and resources_total.
///
/// \param light_heartbeat_enabled Only send changed fields if true.
/// \param light_report_resource_usage_enabled Only send changed fields if true.
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
void Heartbeat(bool light_heartbeat_enabled, std::shared_ptr<HeartbeatTableData> data);
void FillResourceUsage(bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> resources_data);
/// Return human-readable string for this scheduler state.
std::string DebugString() const;
@@ -1042,8 +1042,8 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) {
cluster_resources.AddOrUpdateNode(12345, other_node_resources);
{ // Cluster is idle.
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(false, data);
auto data = std::make_shared<rpc::ResourcesData>();
cluster_resources.FillResourceUsage(false, data);
auto available = data->resources_available();
auto total = data->resources_total();
@@ -1080,8 +1080,8 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) {
{"1", 0.1},
});
cluster_resources.AllocateLocalTaskResources(allocation_map, allocations);
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(false, data);
auto data = std::make_shared<rpc::ResourcesData>();
cluster_resources.FillResourceUsage(false, data);
auto available = data->resources_available();
auto total = data->resources_total();
@@ -1102,32 +1102,32 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) {
}
}
TEST_F(ClusterResourceSchedulerTest, TestLightHeartbeat) {
TEST_F(ClusterResourceSchedulerTest, TestLightResourceUsageReport) {
std::unordered_map<std::string, double> initial_resources({{"CPU", 1}});
ClusterResourceScheduler cluster_resources("local", initial_resources);
// Report heartbeat on initialization.
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(true, data);
// Fill resource usage on initialization.
auto data = std::make_shared<rpc::ResourcesData>();
cluster_resources.FillResourceUsage(true, data);
ASSERT_RESOURCES_EQ(data, 1, 1);
// Don't report heartbeats if resource availability hasn't changed.
// Don't report resource usage if resource availability hasn't changed.
for (int i = 0; i < 3; i++) {
data->Clear();
cluster_resources.Heartbeat(true, data);
cluster_resources.FillResourceUsage(true, data);
ASSERT_RESOURCES_EMPTY(data);
}
// Report heartbeat if resource availability has changed.
// Report resource usage if resource availability has changed.
cluster_resources.AddOrUpdateNode("local", {{"CPU", 1.}}, {{"CPU", 0.}});
data->Clear();
cluster_resources.Heartbeat(true, data);
cluster_resources.FillResourceUsage(true, data);
ASSERT_RESOURCES_EQ(data, 0, 1);
// Don't report heartbeats if resource availability hasn't changed.
// Don't report resource usage if resource availability hasn't changed.
for (int i = 0; i < 3; i++) {
data->Clear();
cluster_resources.Heartbeat(true, data);
cluster_resources.FillResourceUsage(true, data);
ASSERT_RESOURCES_EMPTY(data);
}
}
@@ -1145,20 +1145,20 @@ TEST_F(ClusterResourceSchedulerTest, TestDirtyLocalView) {
ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation));
task_allocation = std::make_shared<TaskResourceInstances>();
ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation));
// View of local resources is not affected by heartbeats.
auto data = std::make_shared<rpc::HeartbeatTableData>();
cluster_resources.Heartbeat(true, data);
// View of local resources is not affected by resource usage report.
auto data = std::make_shared<rpc::ResourcesData>();
cluster_resources.FillResourceUsage(true, data);
ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation));
for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) {
// Remote node reports updated resource availability.
cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}},
{{"CPU", num_slots_available}});
auto data = std::make_shared<rpc::HeartbeatTableData>();
auto data = std::make_shared<rpc::ResourcesData>();
int64_t t;
for (int i = 0; i < 3; i++) {
// Heartbeat tick should reset the remote node's resources.
cluster_resources.Heartbeat(true, data);
// Resource usage report tick should reset the remote node's resources.
cluster_resources.FillResourceUsage(true, data);
for (int j = 0; j < num_slots_available; j++) {
ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t),
"remote");
@@ -272,8 +272,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
return false;
}
void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const {
void ClusterTaskManager::FillResourceUsage(
bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> data) const {
// TODO (WangTao): Find a way to check if load changed and combine it with light
// resource usage report. Now we just report it every time.
data->set_resource_load_changed(true);
@@ -107,11 +107,11 @@ class ClusterTaskManager {
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param light_heartbeat_enabled Only send changed fields if true.
/// \param light_report_resource_usage_enabled Only send changed fields if true.
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const;
void FillResourceUsage(bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> data) const;
std::string DebugString() const;
@@ -437,8 +437,8 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
}
{
auto data = std::make_shared<rpc::HeartbeatTableData>();
task_manager_.Heartbeat(false, data);
auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(false, data);
auto load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
+7 -9
View File
@@ -97,6 +97,8 @@ class GcsRpcClient {
new GrpcClient<ActorInfoGcsService>(address, port, client_call_manager));
node_info_grpc_client_ = std::unique_ptr<GrpcClient<NodeInfoGcsService>>(
new GrpcClient<NodeInfoGcsService>(address, port, client_call_manager));
heartbeat_info_grpc_client_ = std::unique_ptr<GrpcClient<HeartbeatInfoGcsService>>(
new GrpcClient<HeartbeatInfoGcsService>(address, port, client_call_manager));
object_info_grpc_client_ = std::unique_ptr<GrpcClient<ObjectInfoGcsService>>(
new GrpcClient<ObjectInfoGcsService>(address, port, client_call_manager));
task_info_grpc_client_ = std::unique_ptr<GrpcClient<TaskInfoGcsService>>(
@@ -155,15 +157,6 @@ class GcsRpcClient {
/// Get information of all nodes from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, )
/// Report heartbeat of a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat,
node_info_grpc_client_, )
/// Get newest heartbeat of all nodes from GCS Service. Only used when light heartbeat
/// enabled.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllHeartbeat,
node_info_grpc_client_, )
/// Report resource usage of a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportResourceUsage,
node_info_grpc_client_, )
@@ -195,6 +188,10 @@ class GcsRpcClient {
VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllAvailableResources,
node_info_grpc_client_, )
/// Report heartbeat of a node to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(HeartbeatInfoGcsService, ReportHeartbeat,
heartbeat_info_grpc_client_, )
/// Get object's locations from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations,
object_info_grpc_client_, )
@@ -278,6 +275,7 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
std::unique_ptr<GrpcClient<ActorInfoGcsService>> actor_info_grpc_client_;
std::unique_ptr<GrpcClient<NodeInfoGcsService>> node_info_grpc_client_;
std::unique_ptr<GrpcClient<HeartbeatInfoGcsService>> heartbeat_info_grpc_client_;
std::unique_ptr<GrpcClient<ObjectInfoGcsService>> object_info_grpc_client_;
std::unique_ptr<GrpcClient<TaskInfoGcsService>> task_info_grpc_client_;
std::unique_ptr<GrpcClient<StatsGcsService>> stats_grpc_client_;
+36 -10
View File
@@ -30,6 +30,9 @@ namespace rpc {
/// Shorthand for `RPC_SERVICE_HANDLER` with the service fixed to
/// `NodeInfoGcsService`; `HANDLER` is forwarded unchanged.
#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER)
/// Shorthand for `RPC_SERVICE_HANDLER` with the service fixed to
/// `HeartbeatInfoGcsService`; `HANDLER` is forwarded unchanged.
#define HEARTBEAT_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(HeartbeatInfoGcsService, HANDLER)
/// Shorthand for `RPC_SERVICE_HANDLER` with the service fixed to
/// `ObjectInfoGcsService`; `HANDLER` is forwarded unchanged.
#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER)
@@ -177,14 +180,6 @@ class NodeInfoGcsServiceHandler {
GetAllNodeInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleReportHeartbeat(const ReportHeartbeatRequest &request,
ReportHeartbeatReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetAllHeartbeat(const GetAllHeartbeatRequest &request,
GetAllHeartbeatReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleReportResourceUsage(const ReportResourceUsageRequest &request,
ReportResourceUsageReply *reply,
SendReplyCallback send_reply_callback) = 0;
@@ -238,8 +233,6 @@ class NodeInfoGrpcService : public GrpcService {
NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo);
NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage);
NODE_INFO_SERVICE_RPC_HANDLER(GetResources);
@@ -257,6 +250,38 @@ class NodeInfoGrpcService : public GrpcService {
NodeInfoGcsServiceHandler &service_handler_;
};
/// Abstract handler interface for `HeartbeatInfoGcsService` requests.
///
/// Declares a single pure-virtual handler, `HandleReportHeartbeat`; concrete
/// handlers derive from this class and implement it.
class HeartbeatInfoGcsServiceHandler {
public:
/// Virtual so that implementations can be destroyed through a base pointer.
virtual ~HeartbeatInfoGcsServiceHandler() = default;
/// Handle a `ReportHeartbeat` request.
///
/// \param request The incoming `ReportHeartbeatRequest`.
/// \param reply The `ReportHeartbeatReply` associated with this request.
/// \param send_reply_callback Callback taken alongside the reply; presumably
/// invoked by the implementation to send `reply` back -- confirm against
/// `SendReplyCallback`'s definition.
virtual void HandleReportHeartbeat(const ReportHeartbeatRequest &request,
ReportHeartbeatReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `HeartbeatInfoGcsService`.
///
/// Owns the async gRPC service object and holds a reference to the handler that
/// is named in the `ReportHeartbeat` handler registration below.
class HeartbeatInfoGrpcService : public GrpcService {
public:
/// Constructor.
///
/// \param[in] io_service The io_service forwarded to the `GrpcService` base.
/// \param[in] handler The service handler that actually handles the requests.
/// It is stored by reference, so it has to outlive this object.
explicit HeartbeatInfoGrpcService(boost::asio::io_service &io_service,
HeartbeatInfoGcsServiceHandler &handler)
: GrpcService(io_service), service_handler_(handler){};
protected:
/// Return the underlying async gRPC service object.
grpc::Service &GetGrpcService() override { return service_; }
/// Set up the server call factory for the only RPC of this service,
/// `ReportHeartbeat`.
///
/// \param[in] cq The completion queue passed in by the caller.
/// \param[out] server_call_factories The factory list passed in by the caller.
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
// Expands to RPC_SERVICE_HANDLER(HeartbeatInfoGcsService, ReportHeartbeat).
// NOTE(review): the expansion presumably uses `cq`, `server_call_factories`,
// `service_` and `service_handler_` by name -- confirm before renaming any.
HEARTBEAT_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat);
}
private:
/// The grpc async service object.
HeartbeatInfoGcsService::AsyncService service_;
/// The service handler that actually handles the requests.
HeartbeatInfoGcsServiceHandler &service_handler_;
};
class ObjectInfoGcsServiceHandler {
public:
virtual ~ObjectInfoGcsServiceHandler() = default;
@@ -514,6 +539,7 @@ class PlacementGroupInfoGrpcService : public GrpcService {
/// Short aliases for the GCS service handler interfaces.
using JobInfoHandler = JobInfoGcsServiceHandler;
using ActorInfoHandler = ActorInfoGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
using HeartbeatInfoHandler = HeartbeatInfoGcsServiceHandler;
using ObjectInfoHandler = ObjectInfoGcsServiceHandler;
using TaskInfoHandler = TaskInfoGcsServiceHandler;
using StatsHandler = StatsGcsServiceHandler;