From 295b6e5ce46b08823b4b9fbaba1d813bf376882c Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Fri, 11 Dec 2020 21:19:57 +0800 Subject: [PATCH] Split heartbeat message (#12535) * first * xxx * Split heartbeat message * only report resource usage when changed * Fix GetAllResourceUsage * Fix report resource usage * Increase default heartbeat interval * regularize heartbeat interval in test case --- python/ray/gcs_utils.py | 12 +- python/ray/includes/global_state_accessor.pxd | 2 +- python/ray/includes/global_state_accessor.pxi | 6 +- python/ray/includes/ray_config.pxd | 2 +- python/ray/includes/ray_config.pxi | 4 +- python/ray/monitor.py | 22 +- python/ray/tests/test_actor_advanced.py | 29 +- python/ray/tests/test_actor_resources.py | 2 +- python/ray/tests/test_advanced_3.py | 7 +- python/ray/tests/test_component_failures_2.py | 2 +- python/ray/tests/test_gcs_fault_tolerance.py | 2 +- python/ray/tests/test_global_state.py | 46 +-- python/ray/tests/test_multi_node_2.py | 6 +- python/ray/tests/test_placement_group.py | 6 +- src/ray/common/ray_config_def.h | 14 +- src/ray/gcs/accessor.h | 36 ++- .../gcs/gcs_client/global_state_accessor.cc | 12 +- .../gcs/gcs_client/global_state_accessor.h | 10 +- .../gcs/gcs_client/service_based_accessor.cc | 141 +++++---- .../gcs/gcs_client/service_based_accessor.h | 27 +- .../gcs_client/service_based_gcs_client.cc | 4 +- .../test/global_state_accessor_test.cc | 76 ++--- .../test/service_based_gcs_client_test.cc | 171 +++++------ .../gcs/gcs_server/gcs_heartbeat_manager.cc | 111 +++++++ .../gcs/gcs_server/gcs_heartbeat_manager.h | 89 ++++++ src/ray/gcs/gcs_server/gcs_node_manager.cc | 280 ++++++------------ src/ray/gcs/gcs_server/gcs_node_manager.h | 121 ++------ src/ray/gcs/gcs_server/gcs_server.cc | 40 ++- src/ray/gcs/gcs_server/gcs_server.h | 15 +- src/ray/gcs/gcs_server/gcs_table_storage.cc | 2 +- src/ray/gcs/gcs_server/gcs_table_storage.h | 20 +- .../test/gcs_actor_scheduler_test.cc | 5 +- .../gcs_server/test/gcs_node_manager_test.cc 
| 8 +- .../test/gcs_object_manager_test.cc | 5 +- .../test/gcs_placement_group_manager_test.cc | 5 +- .../gcs_placement_group_scheduler_test.cc | 13 +- .../gcs_server/test/gcs_server_test_util.h | 4 +- src/ray/gcs/pubsub/gcs_pub_sub.h | 2 +- src/ray/gcs/redis_accessor.cc | 19 +- src/ray/gcs/redis_accessor.h | 19 +- src/ray/gcs/redis_gcs_client.cc | 6 +- src/ray/gcs/redis_gcs_client.h | 4 +- src/ray/gcs/subscription_executor.cc | 3 +- src/ray/gcs/tables.cc | 4 +- src/ray/gcs/tables.h | 14 +- src/ray/protobuf/gcs.proto | 34 +-- src/ray/protobuf/gcs_service.proto | 20 +- src/ray/raylet/main.cc | 2 + src/ray/raylet/node_manager.cc | 272 +++++++++-------- src/ray/raylet/node_manager.h | 29 +- .../scheduling/cluster_resource_scheduler.cc | 21 +- .../scheduling/cluster_resource_scheduler.h | 5 +- .../cluster_resource_scheduler_test.cc | 40 +-- .../raylet/scheduling/cluster_task_manager.cc | 5 +- .../raylet/scheduling/cluster_task_manager.h | 6 +- .../scheduling/cluster_task_manager_test.cc | 4 +- src/ray/rpc/gcs_server/gcs_rpc_client.h | 16 +- src/ray/rpc/gcs_server/gcs_rpc_server.h | 46 ++- 58 files changed, 1018 insertions(+), 910 deletions(-) create mode 100644 src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc create mode 100644 src/ray/gcs/gcs_server/gcs_heartbeat_manager.h diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index ef95b4c0a..2aed4526d 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -7,8 +7,8 @@ from ray.core.generated.gcs_pb2 import ( JobConfig, ErrorTableData, GcsEntry, - HeartbeatBatchTableData, - HeartbeatTableData, + ResourceUsageBatchData, + ResourcesData, ObjectTableData, ProfileTableData, TablePrefix, @@ -33,8 +33,8 @@ __all__ = [ "ErrorTableData", "ErrorType", "GcsEntry", - "HeartbeatBatchTableData", - "HeartbeatTableData", + "ResourceUsageBatchData", + "ResourcesData", "ObjectTableData", "ProfileTableData", "TablePrefix", @@ -55,8 +55,8 @@ FUNCTION_PREFIX = "RemoteFunction:" LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL" 
REPORTER_CHANNEL = "RAY_REPORTER" -# xray heartbeats -XRAY_HEARTBEAT_BATCH_PATTERN = "HEARTBEAT_BATCH:".encode("ascii") +# xray resource usages +XRAY_RESOURCES_BATCH_PATTERN = "RESOURCES_BATCH:".encode("ascii") # xray job updates XRAY_JOB_PATTERN = "JOB:*".encode("ascii") diff --git a/python/ray/includes/global_state_accessor.pxd b/python/ray/includes/global_state_accessor.pxd index 2d0c70c32..31418f10c 100644 --- a/python/ray/includes/global_state_accessor.pxd +++ b/python/ray/includes/global_state_accessor.pxd @@ -23,7 +23,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil: c_vector[c_string] GetAllProfileInfo() c_vector[c_string] GetAllObjectInfo() unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id) - unique_ptr[c_string] GetAllHeartbeat() + unique_ptr[c_string] GetAllResourceUsage() c_vector[c_string] GetAllActorInfo() unique_ptr[c_string] GetActorInfo(const CActorID &actor_id) c_string GetNodeResourceInfo(const CNodeID &node_id) diff --git a/python/ray/includes/global_state_accessor.pxi b/python/ray/includes/global_state_accessor.pxi index 25a88028b..cbb1bac0a 100644 --- a/python/ray/includes/global_state_accessor.pxi +++ b/python/ray/includes/global_state_accessor.pxi @@ -78,11 +78,11 @@ cdef class GlobalStateAccessor: return c_string(object_info.get().data(), object_info.get().size()) return None - def get_all_heartbeat(self): - """Get newest heartbeat of all nodes from GCS service.""" + def get_all_resource_usage(self): + """Get newest resource usage of all nodes from GCS service.""" cdef unique_ptr[c_string] result with nogil: - result = self.inner.get().GetAllHeartbeat() + result = self.inner.get().GetAllResourceUsage() if result: return c_string(result.get().data(), result.get().size()) return None diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 1efce3343..03979863d 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -15,7 +15,7 @@ cdef 
extern from "ray/common/ray_config.h" nogil: int64_t raylet_heartbeat_timeout_milliseconds() const - c_bool light_heartbeat_enabled() const + c_bool light_report_resource_usage_enabled() const int64_t debug_dump_period_milliseconds() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 766782fa1..b9e26cf08 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -14,8 +14,8 @@ cdef class Config: return RayConfig.instance().raylet_heartbeat_timeout_milliseconds() @staticmethod - def light_heartbeat_enabled(): - return RayConfig.instance().light_heartbeat_enabled() + def light_report_resource_usage_enabled(): + return RayConfig.instance().light_report_resource_usage_enabled() @staticmethod def debug_dump_period_milliseconds(): diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 5f93e535a..edebb14fa 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -139,24 +139,24 @@ class Monitor: self.primary_subscribe_client.subscribe(channel) def update_load_metrics(self): - """Fetches heartbeat data from GCS and updates load metrics.""" + """Fetches resource usage data from GCS and updates load metrics.""" - all_heartbeat = self.global_state_accessor.get_all_heartbeat() - heartbeat_batch_data = \ - ray.gcs_utils.HeartbeatBatchTableData.FromString(all_heartbeat) - for heartbeat_message in heartbeat_batch_data.batch: - resource_load = dict(heartbeat_message.resource_load) - total_resources = dict(heartbeat_message.resources_total) - available_resources = dict(heartbeat_message.resources_available) + all_resources = self.global_state_accessor.get_all_resource_usage() + resources_batch_data = \ + ray.gcs_utils.ResourceUsageBatchData.FromString(all_resources) + for resource_message in resources_batch_data.batch: + resource_load = dict(resource_message.resource_load) + total_resources = dict(resource_message.resources_total) + available_resources = 
dict(resource_message.resources_available) waiting_bundles, infeasible_bundles = parse_resource_demands( - heartbeat_batch_data.resource_load_by_shape) + resources_batch_data.resource_load_by_shape) pending_placement_groups = list( - heartbeat_batch_data.placement_group_load.placement_group_data) + resources_batch_data.placement_group_load.placement_group_data) # Update the load metrics for this raylet. - node_id = ray.utils.binary_to_hex(heartbeat_message.node_id) + node_id = ray.utils.binary_to_hex(resource_message.node_id) ip = self.raylet_id_to_ip_map.get(node_id) if ip: self.load_metrics.update(ip, total_resources, diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index af490eb77..8bc6801f8 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -1055,11 +1055,11 @@ def test_actor_resource_demand(shutdown_only): ray.get(a.foo.remote()) time.sleep(1) - message = global_state_accessor.get_all_heartbeat() - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message) + message = global_state_accessor.get_all_resource_usage() + resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message) # The actor is scheduled so there should be no more demands left. - assert len(heartbeat.resource_load_by_shape.resource_demands) == 0 + assert len(resource_usages.resource_load_by_shape.resource_demands) == 0 @ray.remote(num_cpus=80) class Actor2: @@ -1070,23 +1070,24 @@ def test_actor_resource_demand(shutdown_only): time.sleep(1) # This actor cannot be scheduled. 
- message = global_state_accessor.get_all_heartbeat() - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message) - assert len(heartbeat.resource_load_by_shape.resource_demands) == 1 - assert (heartbeat.resource_load_by_shape.resource_demands[0].shape == { - "CPU": 80.0 - }) - assert (heartbeat.resource_load_by_shape.resource_demands[0] + message = global_state_accessor.get_all_resource_usage() + resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message) + assert len(resource_usages.resource_load_by_shape.resource_demands) == 1 + assert ( + resource_usages.resource_load_by_shape.resource_demands[0].shape == { + "CPU": 80.0 + }) + assert (resource_usages.resource_load_by_shape.resource_demands[0] .num_infeasible_requests_queued == 1) actors.append(Actor2.remote()) time.sleep(1) # Two actors cannot be scheduled. - message = global_state_accessor.get_all_heartbeat() - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message) - assert len(heartbeat.resource_load_by_shape.resource_demands) == 1 - assert (heartbeat.resource_load_by_shape.resource_demands[0] + message = global_state_accessor.get_all_resource_usage() + resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(message) + assert len(resource_usages.resource_load_by_shape.resource_demands) == 1 + assert (resource_usages.resource_load_by_shape.resource_demands[0] .num_infeasible_requests_queued == 2) global_state_accessor.disconnect() diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index f0c254c5f..65357fb8c 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -237,7 +237,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): cluster.add_node( num_cpus=10 * num_gpus_per_raylet, num_gpus=num_gpus_per_raylet, - _system_config={"num_heartbeats_timeout": 1000} if i == 0 else {}) + _system_config={"num_heartbeats_timeout": 100} if i == 0 else {}) 
ray.init(address=cluster.address) @ray.remote diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 7f1e8e639..4fffeb5b0 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -610,9 +610,10 @@ def test_lease_request_leak(shutdown_only): del obj_ref ray.get(tasks) - time.sleep( - 1) # Sleep for an amount longer than the reconstruction timeout. - assert len(ray.objects()) == 0, ray.objects() + def _no_objects(): + return len(ray.objects()) == 0 + + wait_for_condition(_no_objects, timeout=10) @pytest.mark.parametrize( diff --git a/python/ray/tests/test_component_failures_2.py b/python/ray/tests/test_component_failures_2.py index 2235c5745..c45156ba6 100644 --- a/python/ray/tests/test_component_failures_2.py +++ b/python/ray/tests/test_component_failures_2.py @@ -143,7 +143,7 @@ def check_components_alive(cluster, component_type, check_component_alive): "num_cpus": 8, "num_nodes": 4, "_system_config": { - "num_heartbeats_timeout": 100 + "num_heartbeats_timeout": 10 }, }], indirect=True) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 32f20d42a..d13dbf75a 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -76,7 +76,7 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular): @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + num_heartbeats_timeout=2, ping_gcs_rpc_server_max_retries=60) ], indirect=True) def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head): diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 967d6d7ea..21808eb2b 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -173,13 +173,14 @@ def test_load_report(shutdown_only, max_shapes): 
self.report = None def check_load_report(self): - message = global_state_accessor.get_all_heartbeat() + message = global_state_accessor.get_all_resource_usage() if message is None: return False - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString( + resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString( message) - self.report = heartbeat.resource_load_by_shape.resource_demands + self.report = \ + resource_usage.resource_load_by_shape.resource_demands if max_shapes == 0: return True elif max_shapes == 2: @@ -227,40 +228,40 @@ def test_placement_group_load_report(ray_start_cluster): class PgLoadChecker: def nothing_is_ready(self): - heartbeat = self._read_heartbeat() - if not heartbeat: + resource_usage = self._read_resource_usage() + if not resource_usage: return False - if heartbeat.HasField("placement_group_load"): - pg_load = heartbeat.placement_group_load + if resource_usage.HasField("placement_group_load"): + pg_load = resource_usage.placement_group_load return len(pg_load.placement_group_data) == 2 return False def only_first_one_ready(self): - heartbeat = self._read_heartbeat() - if not heartbeat: + resource_usage = self._read_resource_usage() + if not resource_usage: return False - if heartbeat.HasField("placement_group_load"): - pg_load = heartbeat.placement_group_load + if resource_usage.HasField("placement_group_load"): + pg_load = resource_usage.placement_group_load return len(pg_load.placement_group_data) == 1 return False def two_infeasible_pg(self): - heartbeat = self._read_heartbeat() - if not heartbeat: + resource_usage = self._read_resource_usage() + if not resource_usage: return False - if heartbeat.HasField("placement_group_load"): - pg_load = heartbeat.placement_group_load + if resource_usage.HasField("placement_group_load"): + pg_load = resource_usage.placement_group_load return len(pg_load.placement_group_data) == 2 return False - def _read_heartbeat(self): - message = global_state_accessor.get_all_heartbeat() + def 
_read_resource_usage(self): + message = global_state_accessor.get_all_resource_usage() if message is None: return False - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString( + resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString( message) - return heartbeat + return resource_usage checker = PgLoadChecker() @@ -301,13 +302,14 @@ def test_backlog_report(shutdown_only): return None def backlog_size_set(): - message = global_state_accessor.get_all_heartbeat() + message = global_state_accessor.get_all_resource_usage() if message is None: return False - heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message) + resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString( + message) aggregate_resource_load = \ - heartbeat.resource_load_by_shape.resource_demands + resource_usage.resource_load_by_shape.resource_demands if len(aggregate_resource_load) == 1: backlog_size = aggregate_resource_load[0].backlog_size print(backlog_size) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 6578bdeb9..d53d243c0 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -34,7 +34,7 @@ def test_shutdown(): @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=20, object_timeout_milliseconds=12345) + num_heartbeats_timeout=2, object_timeout_milliseconds=12345) ], indirect=True) def test_system_config(ray_start_cluster_head): @@ -52,12 +52,12 @@ def test_system_config(ray_start_cluster_head): @ray.remote def f(): assert ray._config.object_timeout_milliseconds() == 12345 - assert ray._config.num_heartbeats_timeout() == 20 + assert ray._config.num_heartbeats_timeout() == 2 ray.get([f.remote() for _ in range(5)]) cluster.remove_node(worker, allow_graceful=False) - time.sleep(1) + time.sleep(0.9) assert ray.cluster_resources()["CPU"] == 2 time.sleep(2) diff --git a/python/ray/tests/test_placement_group.py 
b/python/ray/tests/test_placement_group.py index 0a87d9b8f..641dbc1e1 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1165,7 +1165,7 @@ ray.shutdown() @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60) ], indirect=True) def test_create_placement_group_after_gcs_server_restart( @@ -1203,7 +1203,7 @@ def test_create_placement_group_after_gcs_server_restart( @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60) ], indirect=True) def test_create_actor_with_placement_group_after_gcs_server_restart( @@ -1227,7 +1227,7 @@ def test_create_actor_with_placement_group_after_gcs_server_restart( @pytest.mark.parametrize( "ray_start_cluster_head", [ generate_system_config_map( - num_heartbeats_timeout=20, ping_gcs_rpc_server_max_retries=60) + num_heartbeats_timeout=3, ping_gcs_rpc_server_max_retries=60) ], indirect=True) def test_create_placement_group_during_gcs_server_restart( diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 75208e5c5..4d1110e20 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -35,20 +35,22 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000) RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100) /// The duration between heartbeats sent by the raylets. -RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 100) -/// Whether to send heartbeat lightly. When it is enalbed, only changed part, -/// like should_global_gc or changed resources, will be included in the heartbeat, -/// and gcs only broadcast the changed heartbeat. 
-RAY_CONFIG(bool, light_heartbeat_enabled, true) +RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 1000) /// If a component has not sent a heartbeat in the last num_heartbeats_timeout /// heartbeat intervals, the raylet monitor process will report /// it as dead to the db_client table. -RAY_CONFIG(int64_t, num_heartbeats_timeout, 300) +RAY_CONFIG(int64_t, num_heartbeats_timeout, 30) /// For a raylet, if the last heartbeat was sent more than this many /// heartbeat periods ago, then a warning will be logged that the heartbeat /// handler is drifting. RAY_CONFIG(uint64_t, num_heartbeats_warning, 5) +/// The duration between reporting resources sent by the raylets. +RAY_CONFIG(int64_t, raylet_report_resources_period_milliseconds, 100) +/// Whether to report resource usage lightly. When it is enabled, only changed part, +/// like should_global_gc or changed resources, will be included in the message. +RAY_CONFIG(bool, light_report_resource_usage_enabled, true) + /// The duration between dumping debug info to logs, or -1 to disable. RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 82442535c..ce932fd59 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -543,30 +543,38 @@ class NodeInfoAccessor { const std::shared_ptr &data_ptr, const StatusCallback &callback) = 0; - /// Resend heartbeat when GCS restarts from a failure. - virtual void AsyncReReportHeartbeat() = 0; + /// Report resource usage of a node to GCS asynchronously. + /// + /// \param data_ptr The data that will be reported to GCS. + /// \param callback Callback that will be called after report finishes. + /// \return Status + virtual Status AsyncReportResourceUsage( + const std::shared_ptr &data_ptr, + const StatusCallback &callback) = 0; + + /// Resend resource usage when GCS restarts from a failure. + virtual void AsyncReReportResourceUsage() = 0; /// Return resources in last report. Used by light heartbeat. 
- std::shared_ptr &GetLastHeartbeatResources() { - return last_heartbeat_resources_; + std::shared_ptr &GetLastResourceUsage() { + return last_resource_usage_; } - /// Get newest heartbeat of all nodes from GCS asynchronously. Only used when light - /// heartbeat enabled. + /// Get newest resource usage of all nodes from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. /// \return Status - virtual Status AsyncGetAllHeartbeat( - const ItemCallback &callback) = 0; + virtual Status AsyncGetAllResourceUsage( + const ItemCallback &callback) = 0; /// Subscribe batched state of all nodes from GCS. /// - /// \param subscribe Callback that will be called each time when batch heartbeat is + /// \param subscribe Callback that will be called each time when batch resource usage is /// updated. /// \param done Callback that will be called when subscription is complete. /// \return Status - virtual Status AsyncSubscribeBatchHeartbeat( - const ItemCallback &subscribe, + virtual Status AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, const StatusCallback &done) = 0; /// Reestablish subscription. @@ -598,9 +606,9 @@ class NodeInfoAccessor { NodeInfoAccessor() = default; private: - /// Cache which stores resources in last heartbeat used to check if they are changed. - /// Used by light heartbeat. - std::shared_ptr last_heartbeat_resources_ = + /// Cache which stores resource usage in last report used to check if they are changed. + /// Used by light resource usage report. 
+ std::shared_ptr last_resource_usage_ = std::make_shared(); }; diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index a1162ced0..8940ef6d0 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -174,14 +174,14 @@ std::string GlobalStateAccessor::GetInternalConfig() { return config_proto.SerializeAsString(); } -std::unique_ptr GlobalStateAccessor::GetAllHeartbeat() { - std::unique_ptr heartbeat_batch_data; +std::unique_ptr GlobalStateAccessor::GetAllResourceUsage() { + std::unique_ptr resource_batch_data; std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllHeartbeat( - TransformForItemCallback(heartbeat_batch_data, - promise))); + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAllResourceUsage( + TransformForItemCallback(resource_batch_data, + promise))); promise.get_future().get(); - return heartbeat_batch_data; + return resource_batch_data; } std::vector GlobalStateAccessor::GetAllActorInfo() { diff --git a/src/ray/gcs/gcs_client/global_state_accessor.h b/src/ray/gcs/gcs_client/global_state_accessor.h index 01c3ebb12..0c5695780 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.h +++ b/src/ray/gcs/gcs_client/global_state_accessor.h @@ -99,13 +99,13 @@ class GlobalStateAccessor { /// and serialized as a string to allow multi-language support. std::string GetInternalConfig(); - /// Get newest heartbeat of all nodes from GCS Service. Only used when light - /// heartbeat enabled. + /// Get newest resource usage of all nodes from GCS Service. Only used when light + /// resource usage report enabled. /// - /// \return node heartbeat info. To support multi-language, we serialize each - /// HeartbeatTableData and return the serialized string. Where used, it needs to be + /// \return resource usage info. To support multi-language, we serialize each + /// data and return the serialized string. 
Where used, it needs to be /// deserialized with protobuf function. - std::unique_ptr GetAllHeartbeat(); + std::unique_ptr GetAllResourceUsage(); /// Get information of all actors from GCS Service. /// diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 2fbbb5c34..66179249b 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -609,11 +609,10 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources( Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( const std::shared_ptr &data_ptr, const StatusCallback &callback) { - absl::MutexLock lock(&mutex_); - cached_heartbeat_.mutable_heartbeat()->CopyFrom(*data_ptr); + rpc::ReportHeartbeatRequest request; + request.mutable_heartbeat()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().ReportHeartbeat( - cached_heartbeat_, - [callback](const Status &status, const rpc::ReportHeartbeatReply &reply) { + request, [callback](const Status &status, const rpc::ReportHeartbeatReply &reply) { if (callback) { callback(status); } @@ -621,74 +620,90 @@ Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( return Status::OK(); } -void ServiceBasedNodeInfoAccessor::AsyncReReportHeartbeat() { +Status ServiceBasedNodeInfoAccessor::AsyncReportResourceUsage( + const std::shared_ptr &data_ptr, const StatusCallback &callback) { absl::MutexLock lock(&mutex_); - if (cached_heartbeat_.has_heartbeat()) { - RAY_LOG(INFO) << "Rereport heartbeat."; - FillHeartbeatRequest(cached_heartbeat_); - client_impl_->GetGcsRpcClient().ReportHeartbeat( - cached_heartbeat_, - [](const Status &status, const rpc::ReportHeartbeatReply &reply) {}); - } -} - -void ServiceBasedNodeInfoAccessor::FillHeartbeatRequest( - rpc::ReportHeartbeatRequest &heartbeat) { - if (RayConfig::instance().light_heartbeat_enabled()) { - SchedulingResources cached_resources = - SchedulingResources(*GetLastHeartbeatResources()); - - auto 
heartbeat_data = heartbeat.mutable_heartbeat(); - heartbeat_data->clear_resources_total(); - for (const auto &resource_pair : - cached_resources.GetTotalResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - - heartbeat_data->clear_resources_available(); - heartbeat_data->set_resources_available_changed(true); - for (const auto &resource_pair : - cached_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - - heartbeat_data->clear_resource_load(); - heartbeat_data->set_resource_load_changed(true); - for (const auto &resource_pair : - cached_resources.GetLoadResources().GetResourceMap()) { - (*heartbeat_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - } -} - -Status ServiceBasedNodeInfoAccessor::AsyncGetAllHeartbeat( - const ItemCallback &callback) { - rpc::GetAllHeartbeatRequest request; - client_impl_->GetGcsRpcClient().GetAllHeartbeat( - request, [callback](const Status &status, const rpc::GetAllHeartbeatReply &reply) { - callback(reply.heartbeat_data()); - RAY_LOG(DEBUG) << "Finished getting heartbeat of all nodes, status = " << status; + cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr); + client_impl_->GetGcsRpcClient().ReportResourceUsage( + cached_resource_usage_, + [callback](const Status &status, const rpc::ReportResourceUsageReply &reply) { + if (callback) { + callback(status); + } }); return Status::OK(); } -Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( - const ItemCallback &subscribe, +void ServiceBasedNodeInfoAccessor::AsyncReReportResourceUsage() { + absl::MutexLock lock(&mutex_); + if (cached_resource_usage_.has_resources()) { + RAY_LOG(INFO) << "Rereport resource usage."; + FillResourceUsageRequest(cached_resource_usage_); + client_impl_->GetGcsRpcClient().ReportResourceUsage( + cached_resource_usage_, + 
[](const Status &status, const rpc::ReportResourceUsageReply &reply) {}); + } +} + +void ServiceBasedNodeInfoAccessor::FillResourceUsageRequest( + rpc::ReportResourceUsageRequest &resources) { + if (RayConfig::instance().light_report_resource_usage_enabled()) { + SchedulingResources cached_resources = SchedulingResources(*GetLastResourceUsage()); + + auto resources_data = resources.mutable_resources(); + resources_data->clear_resources_total(); + for (const auto &resource_pair : + cached_resources.GetTotalResources().GetResourceMap()) { + (*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + + resources_data->clear_resources_available(); + resources_data->set_resources_available_changed(true); + for (const auto &resource_pair : + cached_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + + resources_data->clear_resource_load(); + resources_data->set_resource_load_changed(true); + for (const auto &resource_pair : + cached_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + } +} + +Status ServiceBasedNodeInfoAccessor::AsyncGetAllResourceUsage( + const ItemCallback &callback) { + rpc::GetAllResourceUsageRequest request; + client_impl_->GetGcsRpcClient().GetAllResourceUsage( + request, + [callback](const Status &status, const rpc::GetAllResourceUsageReply &reply) { + callback(reply.resource_usage_data()); + RAY_LOG(DEBUG) << "Finished getting resource usage of all nodes, status = " + << status; + }); + return Status::OK(); +} + +Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); - subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) { + subscribe_batch_resource_usage_operation_ = 
[this, + subscribe](const StatusCallback &done) { auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { - rpc::HeartbeatBatchTableData heartbeat_batch_table_data; - heartbeat_batch_table_data.ParseFromString(data); - subscribe(heartbeat_batch_table_data); + rpc::ResourceUsageBatchData resources_batch_data; + resources_batch_data.ParseFromString(data); + subscribe(resources_batch_data); }; - return client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "", + return client_impl_->GetGcsPubSub().Subscribe(RESOURCES_BATCH_CHANNEL, "", on_subscribe, done); }; - return subscribe_batch_heartbeat_operation_(done); + return subscribe_batch_resource_usage_operation_(done); } void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { @@ -752,8 +767,8 @@ void ServiceBasedNodeInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar if (subscribe_resource_operation_ != nullptr) { RAY_CHECK_OK(subscribe_resource_operation_(nullptr)); } - if (subscribe_batch_heartbeat_operation_ != nullptr) { - RAY_CHECK_OK(subscribe_batch_heartbeat_operation_(nullptr)); + if (subscribe_batch_resource_usage_operation_ != nullptr) { + RAY_CHECK_OK(subscribe_batch_resource_usage_operation_(nullptr)); } } else { if (fetch_node_data_operation_ != nullptr) { diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 1b9988af8..188850d06 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -182,16 +182,19 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; - void AsyncReReportHeartbeat() override; + Status AsyncReportResourceUsage(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; - /// Fill resource fields with cached resources. Used by light heartbeat. 
- void FillHeartbeatRequest(rpc::ReportHeartbeatRequest &heartbeat); + void AsyncReReportResourceUsage() override; - Status AsyncGetAllHeartbeat( - const ItemCallback &callback) override; + /// Fill resource fields with cached resources. Used by light resource usage report. + void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); - Status AsyncSubscribeBatchHeartbeat( - const ItemCallback &subscribe, + Status AsyncGetAllResourceUsage( + const ItemCallback &callback) override; + + Status AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, const StatusCallback &done) override; void AsyncResubscribe(bool is_pubsub_server_restarted) override; @@ -208,18 +211,18 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { /// server restarts from a failure. SubscribeOperation subscribe_node_operation_; SubscribeOperation subscribe_resource_operation_; - SubscribeOperation subscribe_batch_heartbeat_operation_; + SubscribeOperation subscribe_batch_resource_usage_operation_; /// Save the fetch data operation in this function, so we can call it again when GCS /// server restarts from a failure. FetchDataOperation fetch_node_data_operation_; - // Mutex to protect the cached_heartbeat_ field. + // Mutex to protect the cached_resource_usage_ field. absl::Mutex mutex_; - /// Save the heartbeat data, so we can resend it again when GCS server restarts from a - /// failure. - rpc::ReportHeartbeatRequest cached_heartbeat_ GUARDED_BY(mutex_); + /// Save the resource usage data, so we can resend it again when GCS server restarts + /// from a failure. 
+ rpc::ReportResourceUsageRequest cached_resource_usage_ GUARDED_BY(mutex_); void HandleNotification(const GcsNodeInfo &node_info); diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index 552058f61..9bbbbde25 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -175,8 +175,8 @@ void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType // because we use the same Redis server for both GCS storage and pub-sub. So the // following flag is alway false. resubscribe_func_(false); - // Resend heartbeat after reconnected, needed by resource view in GCS. - node_accessor_->AsyncReReportHeartbeat(); + // Resend resource usage after reconnected, needed by resource view in GCS. + node_accessor_->AsyncReReportResourceUsage(); break; default: RAY_LOG(FATAL) << "Unsupported failure type: " << type; diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 19533b087..7bfc07545 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -186,12 +186,11 @@ TEST_F(GlobalStateAccessorTest, TestInternalConfig) { } } -TEST_F(GlobalStateAccessorTest, TestGetAllHeartbeat) { - std::unique_ptr heartbeats = global_state_->GetAllHeartbeat(); - rpc::HeartbeatBatchTableData heartbeat_batch_data; - heartbeat_batch_data.ParseFromString(*heartbeats.get()); - - ASSERT_EQ(heartbeat_batch_data.batch_size(), 0); +TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) { + std::unique_ptr resources = global_state_->GetAllResourceUsage(); + rpc::ResourceUsageBatchData resource_usage_batch_data; + resource_usage_batch_data.ParseFromString(*resources.get()); + ASSERT_EQ(resource_usage_batch_data.batch_size(), 0); auto node_table_data = Mocker::GenNodeInfo(); std::promise promise; @@ -201,59 
+200,60 @@ TEST_F(GlobalStateAccessorTest, TestGetAllHeartbeat) { auto node_table = global_state_->GetAllNodeInfo(); ASSERT_EQ(node_table.size(), 1); - // Report heartbeat first time. + // Report resource usage first time. std::promise promise1; - auto heartbeat1 = std::make_shared(); - heartbeat1->set_node_id(node_table_data->node_id()); - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( - heartbeat1, [&promise1](Status status) { promise1.set_value(status.ok()); })); + auto resources1 = std::make_shared(); + resources1->set_node_id(node_table_data->node_id()); + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + resources1, [&promise1](Status status) { promise1.set_value(status.ok()); })); WaitReady(promise1.get_future(), timeout_ms_); - heartbeats = global_state_->GetAllHeartbeat(); - heartbeat_batch_data.ParseFromString(*heartbeats.get()); - ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); + resources = global_state_->GetAllResourceUsage(); + resource_usage_batch_data.ParseFromString(*resources.get()); + ASSERT_EQ(resource_usage_batch_data.batch_size(), 1); - // Report heartbeat with resources changed. + // Report changed resource usage. 
std::promise promise2; - auto heartbeat2 = std::make_shared(); + auto heartbeat2 = std::make_shared(); heartbeat2->set_node_id(node_table_data->node_id()); (*heartbeat2->mutable_resources_total())["CPU"] = 1; (*heartbeat2->mutable_resources_total())["GPU"] = 10; heartbeat2->set_resources_available_changed(true); (*heartbeat2->mutable_resources_available())["GPU"] = 5; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); })); WaitReady(promise2.get_future(), timeout_ms_); - heartbeats = global_state_->GetAllHeartbeat(); - heartbeat_batch_data.ParseFromString(*heartbeats.get()); - ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); - auto heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0); - ASSERT_EQ(heartbeat_data.resources_total_size(), 2); - ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0); - ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0); - ASSERT_EQ(heartbeat_data.resources_available_size(), 1); - ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0); + resources = global_state_->GetAllResourceUsage(); + resource_usage_batch_data.ParseFromString(*resources.get()); + ASSERT_EQ(resource_usage_batch_data.batch_size(), 1); + auto resources_data = resource_usage_batch_data.mutable_batch()->at(0); + ASSERT_EQ(resources_data.resources_total_size(), 2); + ASSERT_EQ((*resources_data.mutable_resources_total())["CPU"], 1.0); + ASSERT_EQ((*resources_data.mutable_resources_total())["GPU"], 10.0); + ASSERT_EQ(resources_data.resources_available_size(), 1); + ASSERT_EQ((*resources_data.mutable_resources_available())["GPU"], 5.0); - // Report heartbeat with resources unchanged. (Only works with light heartbeat enabled) + // Report unchanged resource usage. 
(Only works with light resource usage report + // enabled) std::promise promise3; - auto heartbeat3 = std::make_shared(); + auto heartbeat3 = std::make_shared(); heartbeat3->set_node_id(node_table_data->node_id()); (*heartbeat3->mutable_resources_available())["CPU"] = 1; (*heartbeat3->mutable_resources_available())["GPU"] = 6; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportHeartbeat( + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( heartbeat3, [&promise3](Status status) { promise3.set_value(status.ok()); })); WaitReady(promise3.get_future(), timeout_ms_); - heartbeats = global_state_->GetAllHeartbeat(); - heartbeat_batch_data.ParseFromString(*heartbeats.get()); - ASSERT_EQ(heartbeat_batch_data.batch_size(), 1); - heartbeat_data = heartbeat_batch_data.mutable_batch()->at(0); - ASSERT_EQ(heartbeat_data.resources_total_size(), 2); - ASSERT_EQ((*heartbeat_data.mutable_resources_total())["CPU"], 1.0); - ASSERT_EQ((*heartbeat_data.mutable_resources_total())["GPU"], 10.0); - ASSERT_EQ(heartbeat_data.resources_available_size(), 1); - ASSERT_EQ((*heartbeat_data.mutable_resources_available())["GPU"], 5.0); + resources = global_state_->GetAllResourceUsage(); + resource_usage_batch_data.ParseFromString(*resources.get()); + ASSERT_EQ(resource_usage_batch_data.batch_size(), 1); + resources_data = resource_usage_batch_data.mutable_batch()->at(0); + ASSERT_EQ(resources_data.resources_total_size(), 2); + ASSERT_EQ((*resources_data.mutable_resources_total())["CPU"], 1.0); + ASSERT_EQ((*resources_data.mutable_resources_total())["GPU"], 10.0); + ASSERT_EQ(resources_data.resources_available_size(), 1); + ASSERT_EQ((*resources_data.mutable_resources_available())["GPU"], 5.0); } TEST_F(GlobalStateAccessorTest, TestProfileTable) { diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 5b14f8f67..32294b242 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ 
b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -328,10 +328,10 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return WaitReady(promise.get_future(), timeout_ms_); } - bool SubscribeBatchHeartbeat( - const gcs::ItemCallback &subscribe) { + bool SubscribeBatchResourceUsage( + const gcs::ItemCallback &subscribe) { std::promise promise; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat( + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage( subscribe, [&promise](Status status) { promise.set_value(status.ok()); })); return WaitReady(promise.get_future(), timeout_ms_); } @@ -343,6 +343,13 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return WaitReady(promise.get_future(), timeout_ms_); } + bool ReportResourceUsage(const std::shared_ptr resources) { + std::promise promise; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportResourceUsage( + resources, [&promise](Status status) { promise.set_value(status.ok()); })); + return WaitReady(promise.get_future(), timeout_ms_); + } + std::vector GetAllAvailableResources() { std::promise promise; std::vector resources; @@ -715,80 +722,76 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) { ASSERT_TRUE(GetResources(node_id).empty()); } -TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeat) { +TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsage) { // Subscribe batched state of all nodes from GCS. - std::atomic heartbeat_batch_count(0); - auto on_subscribe = - [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { - ++heartbeat_batch_count; - }; - ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + std::atomic resource_batch_count(0); + auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { + ++resource_batch_count; + }; + ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe)); // Register node. 
auto node_info = Mocker::GenNodeInfo(); RAY_CHECK(RegisterNode(*node_info)); - // Report heartbeat of a node to GCS. + // Report resource usage of a node to GCS. NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto heartbeat = std::make_shared(); - heartbeat->set_node_id(node_id.Binary()); - // Set this flag because GCS won't publish unchanged heartbeat. - heartbeat->set_should_global_gc(true); - ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitForExpectedCount(heartbeat_batch_count, 1); + auto resource = std::make_shared(); + resource->set_node_id(node_id.Binary()); + resource->set_should_global_gc(true); + ASSERT_TRUE(ReportResourceUsage(resource)); + WaitForExpectedCount(resource_batch_count, 1); } -TEST_F(ServiceBasedGcsClientTest, TestNodeHeartbeatWithLightHeartbeat) { +TEST_F(ServiceBasedGcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) { // Subscribe batched state of all nodes from GCS. - std::atomic heartbeat_batch_count(0); - auto on_subscribe = - [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { - ++heartbeat_batch_count; - }; - ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + std::atomic resource_batch_count(0); + auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { + ++resource_batch_count; + }; + ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe)); // Register node. auto node_info = Mocker::GenNodeInfo(); RAY_CHECK(RegisterNode(*node_info)); - // Report unchanged heartbeat of a node to GCS. + // Report unchanged resource usage of a node to GCS. 
NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto heartbeat = std::make_shared(); - heartbeat->set_node_id(node_id.Binary()); - ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitForExpectedCount(heartbeat_batch_count, 0); + auto resource = std::make_shared(); + resource->set_node_id(node_id.Binary()); + ASSERT_TRUE(ReportResourceUsage(resource)); + WaitForExpectedCount(resource_batch_count, 0); - // Report changed heartbeat of a node to GCS. - auto heartbeat1 = std::make_shared(); - heartbeat1->set_node_id(node_id.Binary()); - heartbeat1->set_resources_available_changed(true); - ASSERT_TRUE(ReportHeartbeat(heartbeat1)); - WaitForExpectedCount(heartbeat_batch_count, 1); + // Report changed resource usage of a node to GCS. + auto resource1 = std::make_shared(); + resource1->set_node_id(node_id.Binary()); + resource1->set_resources_available_changed(true); + ASSERT_TRUE(ReportResourceUsage(resource1)); + WaitForExpectedCount(resource_batch_count, 1); } TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { // Subscribe batched state of all nodes from GCS. - std::atomic heartbeat_batch_count(0); - auto on_subscribe = - [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { - ++heartbeat_batch_count; - }; - ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + std::atomic resource_batch_count(0); + auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { + ++resource_batch_count; + }; + ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe)); // Register node. auto node_info = Mocker::GenNodeInfo(); RAY_CHECK(RegisterNode(*node_info)); - // Report heartbeat of a node to GCS. + // Report resource usage of a node to GCS. NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto heartbeat = std::make_shared(); - heartbeat->set_node_id(node_id.Binary()); + auto resource = std::make_shared(); + resource->set_node_id(node_id.Binary()); // Set this flag to indicate resources has changed. 
- heartbeat->set_resources_available_changed(true); - (*heartbeat->mutable_resources_available())["CPU"] = 1.0; - (*heartbeat->mutable_resources_available())["GPU"] = 10.0; - ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitForExpectedCount(heartbeat_batch_count, 1); + resource->set_resources_available_changed(true); + (*resource->mutable_resources_available())["CPU"] = 1.0; + (*resource->mutable_resources_available())["GPU"] = 10.0; + ASSERT_TRUE(ReportResourceUsage(resource)); + WaitForExpectedCount(resource_batch_count, 1); // Assert get all available resources right. std::vector resources = GetAllAvailableResources(); @@ -798,28 +801,28 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResources) { EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); } -TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResourcesWithLightHeartbeat) { +TEST_F(ServiceBasedGcsClientTest, + TestGetAllAvailableResourcesWithLightResourceUsageReport) { // Subscribe batched state of all nodes from GCS. - std::atomic heartbeat_batch_count(0); - auto on_subscribe = - [&heartbeat_batch_count](const gcs::HeartbeatBatchTableData &result) { - ++heartbeat_batch_count; - }; - ASSERT_TRUE(SubscribeBatchHeartbeat(on_subscribe)); + std::atomic resource_batch_count(0); + auto on_subscribe = [&resource_batch_count](const gcs::ResourceUsageBatchData &result) { + ++resource_batch_count; + }; + ASSERT_TRUE(SubscribeBatchResourceUsage(on_subscribe)); // Register node. auto node_info = Mocker::GenNodeInfo(); RAY_CHECK(RegisterNode(*node_info)); - // Report heartbeat of a node to GCS. + // Report resource usage of a node to GCS. 
NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto heartbeat = std::make_shared(); - heartbeat->set_node_id(node_id.Binary()); - heartbeat->set_resources_available_changed(true); - (*heartbeat->mutable_resources_available())["CPU"] = 1.0; - (*heartbeat->mutable_resources_available())["GPU"] = 10.0; - ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitForExpectedCount(heartbeat_batch_count, 1); + auto resource = std::make_shared(); + resource->set_node_id(node_id.Binary()); + resource->set_resources_available_changed(true); + (*resource->mutable_resources_available())["CPU"] = 1.0; + (*resource->mutable_resources_available())["GPU"] = 10.0; + ASSERT_TRUE(ReportResourceUsage(resource)); + WaitForExpectedCount(resource_batch_count, 1); // Assert get all available resources right. std::vector resources = GetAllAvailableResources(); @@ -828,12 +831,12 @@ TEST_F(ServiceBasedGcsClientTest, TestGetAllAvailableResourcesWithLightHeartbeat EXPECT_EQ((*resources[0].mutable_resources_available())["CPU"], 1.0); EXPECT_EQ((*resources[0].mutable_resources_available())["GPU"], 10.0); - // Report unchanged heartbeat of a node to GCS. - auto heartbeat1 = std::make_shared(); - heartbeat1->set_node_id(node_id.Binary()); - (*heartbeat1->mutable_resources_available())["GPU"] = 8.0; - ASSERT_TRUE(ReportHeartbeat(heartbeat1)); - WaitForExpectedCount(heartbeat_batch_count, 1); + // Report unchanged resource usage of a node to GCS. + auto resource1 = std::make_shared(); + resource1->set_node_id(node_id.Binary()); + (*resource1->mutable_resources_available())["GPU"] = 8.0; + ASSERT_TRUE(ReportResourceUsage(resource1)); + WaitForExpectedCount(resource_batch_count, 1); // The value would remain unchanged. std::vector resources1 = GetAllAvailableResources(); @@ -1145,24 +1148,24 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableResubscribe) { ASSERT_TRUE(SubscribeToResources(resource_subscribe)); // Subscribe batched state of all nodes from GCS. 
- std::atomic batch_heartbeat_count(0); - auto batch_heartbeat_subscribe = - [&batch_heartbeat_count](const rpc::HeartbeatBatchTableData &result) { - ++batch_heartbeat_count; + std::atomic batch_resource_usage_count(0); + auto batch_resource_usage_subscribe = + [&batch_resource_usage_count](const rpc::ResourceUsageBatchData &result) { + ++batch_resource_usage_count; }; - ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe)); + ASSERT_TRUE(SubscribeBatchResourceUsage(batch_resource_usage_subscribe)); auto node_info = Mocker::GenNodeInfo(1); ASSERT_TRUE(RegisterNode(*node_info)); NodeID node_id = NodeID::FromBinary(node_info->node_id()); std::string key = "CPU"; ASSERT_TRUE(UpdateResources(node_id, key)); - auto heartbeat = std::make_shared(); - heartbeat->set_node_id(node_info->node_id()); - // Set this flag because GCS won't publish unchanged heartbeat. - heartbeat->set_should_global_gc(true); - ASSERT_TRUE(ReportHeartbeat(heartbeat)); - WaitForExpectedCount(batch_heartbeat_count, 1); + auto resources = std::make_shared(); + resources->set_node_id(node_info->node_id()); + // Set this flag because GCS won't publish unchanged resources. 
+ resources->set_should_global_gc(true); + ASSERT_TRUE(ReportResourceUsage(resources)); + WaitForExpectedCount(batch_resource_usage_count, 1); RestartGcsServer(); @@ -1170,12 +1173,12 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableResubscribe) { ASSERT_TRUE(RegisterNode(*node_info)); node_id = NodeID::FromBinary(node_info->node_id()); ASSERT_TRUE(UpdateResources(node_id, key)); - heartbeat->set_node_id(node_info->node_id()); - ASSERT_TRUE(ReportHeartbeat(heartbeat)); + resources->set_node_id(node_info->node_id()); + ASSERT_TRUE(ReportResourceUsage(resources)); WaitForExpectedCount(node_change_count, 2); WaitForExpectedCount(resource_change_count, 2); - WaitForExpectedCount(batch_heartbeat_count, 2); + WaitForExpectedCount(batch_resource_usage_count, 2); } TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc new file mode 100644 index 000000000..592500e72 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc @@ -0,0 +1,111 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h" +#include "ray/common/ray_config.h" +#include "ray/gcs/pb_util.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { +namespace gcs { + +GcsHeartbeatManager::GcsHeartbeatManager( + boost::asio::io_service &io_service, + std::function on_node_death_callback) + : io_service_(io_service), + on_node_death_callback_(std::move(on_node_death_callback)), + num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), + detect_timer_(io_service) { + io_service_thread_.reset(new std::thread([this] { + /// The asio work to keep io_service_ alive. + boost::asio::io_service::work io_service_work_(io_service_); + io_service_.run(); + })); +} + +void GcsHeartbeatManager::Start() { + io_service_.post([this] { + if (!is_started_) { + Tick(); + is_started_ = true; + } + }); +} + +void GcsHeartbeatManager::Stop() { + io_service_.stop(); + if (io_service_thread_->joinable()) { + io_service_thread_->join(); + } +} + +void GcsHeartbeatManager::AddNode(const NodeID &node_id) { + io_service_.post( + [this, node_id] { heartbeats_.emplace(node_id, num_heartbeats_timeout_); }); +} + +void GcsHeartbeatManager::HandleReportHeartbeat( + const rpc::ReportHeartbeatRequest &request, rpc::ReportHeartbeatReply *reply, + rpc::SendReplyCallback send_reply_callback) { + NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id()); + auto iter = heartbeats_.find(node_id); + if (iter == heartbeats_.end()) { + // Ignore this heartbeat as the node is not registered. + // TODO(Shanly): Maybe we should reply the raylet with an error. So the raylet can + // crash itself as soon as possible. + return; + } + + iter->second = num_heartbeats_timeout_; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); +} + +/// A periodic timer that checks for timed out clients. 
+void GcsHeartbeatManager::Tick() { + DetectDeadNodes(); + ScheduleTick(); +} + +void GcsHeartbeatManager::DetectDeadNodes() { + for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { + auto current = it++; + current->second = current->second - 1; + if (current->second == 0) { + auto node_id = current->first; + RAY_LOG(WARNING) << "Node timed out: " << node_id; + heartbeats_.erase(current); + if (on_node_death_callback_) { + on_node_death_callback_(node_id); + } + } + } +} + +void GcsHeartbeatManager::ScheduleTick() { + auto heartbeat_period = boost::posix_time::milliseconds( + RayConfig::instance().raylet_heartbeat_timeout_milliseconds()); + detect_timer_.expires_from_now(heartbeat_period); + detect_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + // `operation_aborted` is set when `detect_timer_` is canceled or destroyed. + // The Monitor lifetime may be short than the object who use it. (e.g. gcs_server) + return; + } + RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message(); + Tick(); + }); +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h new file mode 100644 index 000000000..580daa6f3 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h @@ -0,0 +1,89 @@ + +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "absl/container/flat_hash_map.h" +#include "ray/common/id.h" +#include "ray/gcs/accessor.h" +#include "ray/gcs/gcs_server/gcs_init_data.h" +#include "ray/rpc/client_call.h" +#include "ray/rpc/gcs_server/gcs_rpc_server.h" +#include "src/ray/protobuf/gcs.pb.h" +namespace ray { +namespace gcs { + +/// GcsHeartbeatManager is responsible for monitoring nodes liveness as well as +/// handing heartbeat rpc requests. This class is not thread-safe. +class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler { + public: + /// Create a GcsHeartbeatManager. + /// + /// \param io_service The event loop to run the monitor on. + /// \param on_node_death_callback Callback that will be called when node death is + /// detected. + explicit GcsHeartbeatManager( + boost::asio::io_service &io_service, + std::function on_node_death_callback); + + /// Handle heartbeat rpc come from raylet. + void HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request, + rpc::ReportHeartbeatReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Start node failure detect loop. + void Start(); + + /// Stop node failure detect loop. + void Stop(); + + /// Register node to this detector. + /// Only if the node has registered, its heartbeat data will be accepted. + /// + /// \param node_id ID of the node to be registered. + void AddNode(const NodeID &node_id); + + protected: + /// A periodic timer that fires on every heartbeat period. Raylets that have + /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be + /// marked as dead in the client table. + void Tick(); + + /// Check that if any raylet is inactive due to no heartbeat for a period of time. + /// If found any, mark it as dead. + void DetectDeadNodes(); + + /// Schedule another tick after a short time. 
+ void ScheduleTick(); + + private: + /// The main event loop for node failure detector. + boost::asio::io_service &io_service_; + std::unique_ptr io_service_thread_; + /// The callback of node death. + std::function on_node_death_callback_; + /// The number of heartbeats that can be missed before a node is removed. + int64_t num_heartbeats_timeout_; + /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. + boost::asio::deadline_timer detect_timer_; + /// For each Raylet that we receive a heartbeat from, the number of ticks + /// that may pass before the Raylet will be declared dead. + absl::flat_hash_map heartbeats_; + /// Is the detect started. + bool is_started_ = false; +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 3860daee2..7fc9320df 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -22,113 +22,18 @@ namespace ray { namespace gcs { -GcsNodeManager::NodeFailureDetector::NodeFailureDetector( - boost::asio::io_service &io_service, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, - std::function on_node_death_callback) - : gcs_table_storage_(std::move(gcs_table_storage)), - on_node_death_callback_(std::move(on_node_death_callback)), - num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), - light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()), - detect_timer_(io_service), - gcs_pub_sub_(std::move(gcs_pub_sub)) {} - -void GcsNodeManager::NodeFailureDetector::Start() { - if (!is_started_) { - Tick(); - is_started_ = true; - } -} - -void GcsNodeManager::NodeFailureDetector::AddNode(const NodeID &node_id) { - heartbeats_.emplace(node_id, num_heartbeats_timeout_); -} - -void GcsNodeManager::NodeFailureDetector::HandleHeartbeat(const NodeID &node_id) { - auto iter = heartbeats_.find(node_id); - if (iter == heartbeats_.end()) { - // Ignore 
this heartbeat as the node is not registered. - // TODO(Shanly): Maybe we should reply the raylet with an error. So the raylet can - // crash itself as soon as possible. - return; - } - - iter->second = num_heartbeats_timeout_; -} - -/// A periodic timer that checks for timed out clients. -void GcsNodeManager::NodeFailureDetector::Tick() { - DetectDeadNodes(); - ScheduleTick(); -} - -void GcsNodeManager::NodeFailureDetector::DetectDeadNodes() { - for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { - auto current = it++; - current->second = current->second - 1; - if (current->second == 0) { - auto node_id = current->first; - RAY_LOG(WARNING) << "Node timed out: " << node_id; - heartbeats_.erase(current); - if (on_node_death_callback_) { - on_node_death_callback_(node_id); - } - } - } -} - -void GcsNodeManager::NodeFailureDetector::ScheduleTick() { - auto heartbeat_period = boost::posix_time::milliseconds( - RayConfig::instance().raylet_heartbeat_timeout_milliseconds()); - detect_timer_.expires_from_now(heartbeat_period); - detect_timer_.async_wait([this](const boost::system::error_code &error) { - if (error == boost::asio::error::operation_aborted) { - // `operation_aborted` is set when `detect_timer_` is canceled or destroyed. - // The Monitor lifetime may be short than the object who use it. (e.g. 
gcs_server) - return; - } - RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message(); - Tick(); - }); -} - ////////////////////////////////////////////////////////////////////////////////////////// GcsNodeManager::GcsNodeManager( - boost::asio::io_service &main_io_service, - boost::asio::io_service &node_failure_detector_io_service, - std::shared_ptr gcs_pub_sub, + boost::asio::io_service &main_io_service, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage, std::shared_ptr gcs_resource_manager) - : main_io_service_(main_io_service), - node_failure_detector_(new NodeFailureDetector( - node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub, - [this](const NodeID &node_id) { - // Post this to main event loop to avoid potential concurrency issues. - main_io_service_.post([this, node_id] { - if (auto node = RemoveNode(node_id, /* is_intended = */ false)) { - node->set_state(rpc::GcsNodeInfo::DEAD); - node->set_timestamp(current_sys_time_ms()); - AddDeadNodeToCache(node); - auto on_done = [this, node_id, node](const Status &status) { - auto on_done = [this, node_id, node](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - NODE_CHANNEL, node_id.Hex(), node->SerializeAsString(), nullptr)); - }; - RAY_CHECK_OK( - gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); - }; - RAY_CHECK_OK( - gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); - } - }); - })), - node_failure_detector_service_(node_failure_detector_io_service), - heartbeat_timer_(main_io_service), + : resource_timer_(main_io_service), + light_report_resource_usage_enabled_( + RayConfig::instance().light_report_resource_usage_enabled()), gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage), gcs_resource_manager_(gcs_resource_manager) { - SendBatchedHeartbeat(); + SendBatchedResourceUsage(); } void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, @@ -192,46 +97,27 @@ void 
GcsNodeManager::HandleGetAllNodeInfo(const rpc::GetAllNodeInfoRequest &requ ++counts_[CountType::GET_ALL_NODE_INFO_REQUEST]; } -void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request, - rpc::ReportHeartbeatReply *reply, - rpc::SendReplyCallback send_reply_callback) { - NodeID node_id = NodeID::FromBinary(request.heartbeat().node_id()); - auto heartbeat_data = std::make_shared(); - heartbeat_data->CopyFrom(request.heartbeat()); - - UpdateNodeHeartbeat(node_id, request); - - // Update node realtime resources. - UpdateNodeRealtimeResources(node_id, *heartbeat_data); - - if (!RayConfig::instance().light_heartbeat_enabled() || - heartbeat_data->should_global_gc() || heartbeat_data->resources_total_size() > 0 || - heartbeat_data->resources_available_changed() || - heartbeat_data->resource_load_changed()) { - heartbeat_buffer_[node_id] = *heartbeat_data; - } - - // Note: To avoid heartbeats being delayed by main thread, make sure heartbeat is always - // handled by its own IO service. - node_failure_detector_service_.post( - [this, node_id] { node_failure_detector_->HandleHeartbeat(node_id); }); - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - ++counts_[CountType::REPORT_HEARTBEAT_REQUEST]; -} - -// TODO(WangTao): Implenent this to handle resource usage report. Basically move resources -// related operations in `HandleReportHeartbeat`. void GcsNodeManager::HandleReportResourceUsage( const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply, rpc::SendReplyCallback send_reply_callback) { - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); -} + NodeID node_id = NodeID::FromBinary(request.resources().node_id()); + auto resources_data = std::make_shared(); + resources_data->CopyFrom(request.resources()); + + UpdateNodeResourceUsage(node_id, request); + + // Update node realtime resources. 
+ UpdateNodeRealtimeResources(node_id, *resources_data); + + if (!light_report_resource_usage_enabled_ || resources_data->should_global_gc() || + resources_data->resources_total_size() > 0 || + resources_data->resources_available_changed() || + resources_data->resource_load_changed()) { + resources_buffer_[node_id] = *resources_data; + } -// TODO(WangTao): Implement this. Basically copy from `HandleGetAllHeartbeat`. -void GcsNodeManager::HandleGetAllResourceUsage( - const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) { GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; } void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request, @@ -364,15 +250,15 @@ void GcsNodeManager::HandleGetAllAvailableResources( ++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST]; } -void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request, - rpc::GetAllHeartbeatReply *reply, - rpc::SendReplyCallback send_reply_callback) { - if (!node_heartbeats_.empty()) { - auto batch = std::make_shared(); +void GcsNodeManager::HandleGetAllResourceUsage( + const rpc::GetAllResourceUsageRequest &request, rpc::GetAllResourceUsageReply *reply, + rpc::SendReplyCallback send_reply_callback) { + if (!node_resource_usages_.empty()) { + auto batch = std::make_shared(); absl::flat_hash_map aggregate_load; - for (const auto &heartbeat : node_heartbeats_) { + for (const auto &usage : node_resource_usages_) { // Aggregate the load reported by each raylet. 
- auto load = heartbeat.second.resource_load_by_shape(); + auto load = usage.second.resource_load_by_shape(); for (const auto &demand : load.resource_demands()) { auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape())); auto &aggregate_demand = aggregate_load[scheduling_key]; @@ -388,7 +274,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re } } - batch->add_batch()->CopyFrom(heartbeat.second); + batch->add_batch()->CopyFrom(usage.second); } for (const auto &demand : aggregate_load) { @@ -406,34 +292,33 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re auto placement_group_load_proto = batch->mutable_placement_group_load(); placement_group_load_proto->CopyFrom(*placement_group_load.get()); } - reply->mutable_heartbeat_data()->CopyFrom(*batch); + reply->mutable_resource_usage_data()->CopyFrom(*batch); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - ++counts_[CountType::GET_ALL_HEARTBEAT_REQUEST]; + ++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; } -void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id, - const rpc::ReportHeartbeatRequest &request) { - auto iter = node_heartbeats_.find(node_id); - if (!RayConfig::instance().light_heartbeat_enabled() || - iter == node_heartbeats_.end()) { - auto heartbeat_data = std::make_shared(); - heartbeat_data->CopyFrom(request.heartbeat()); - node_heartbeats_[node_id] = *heartbeat_data; +void GcsNodeManager::UpdateNodeResourceUsage( + const NodeID node_id, const rpc::ReportResourceUsageRequest &request) { + auto iter = node_resource_usages_.find(node_id); + if (!light_report_resource_usage_enabled_ || iter == node_resource_usages_.end()) { + auto resources_data = std::make_shared(); + resources_data->CopyFrom(request.resources()); + node_resource_usages_[node_id] = *resources_data; } else { - if (request.heartbeat().resources_total_size() > 0) { - (*iter->second.mutable_resources_total()) = 
request.heartbeat().resources_total(); + if (request.resources().resources_total_size() > 0) { + (*iter->second.mutable_resources_total()) = request.resources().resources_total(); } - if (request.heartbeat().resources_available_changed()) { + if (request.resources().resources_available_changed()) { (*iter->second.mutable_resources_available()) = - request.heartbeat().resources_available(); + request.resources().resources_available(); } - if (request.heartbeat().resource_load_changed()) { - (*iter->second.mutable_resource_load()) = request.heartbeat().resource_load(); + if (request.resources().resource_load_changed()) { + (*iter->second.mutable_resource_load()) = request.resources().resource_load(); } (*iter->second.mutable_resource_load_by_shape()) = - request.heartbeat().resource_load_by_shape(); + request.resources().resource_load_by_shape(); } } @@ -454,11 +339,6 @@ void GcsNodeManager::AddNode(std::shared_ptr node) { alive_nodes_.emplace(node_id, node); // Add an empty resources for this node. RAY_CHECK(cluster_resources_.emplace(node_id, rpc::ResourceMap()).second); - // Register this node to the `node_failure_detector_` which will start monitoring it. - // Note: To avoid heartbeats being delayed by main thread, make sure node addition is - // always handled by its own IO service. - node_failure_detector_service_.post( - [this, node_id] { node_failure_detector_->AddNode(node_id); }); // Notify all listeners. for (auto &listener : node_added_listeners_) { @@ -480,7 +360,7 @@ std::shared_ptr GcsNodeManager::RemoveNode( alive_nodes_.erase(iter); // Remove from cluster resources. cluster_resources_.erase(node_id); - heartbeat_buffer_.erase(node_id); + resources_buffer_.erase(node_id); if (!is_intended) { // Broadcast a warning to all of the drivers indicating that the node // has been marked as dead. 
@@ -505,11 +385,25 @@ std::shared_ptr GcsNodeManager::RemoveNode( return removed_node; } +void GcsNodeManager::OnNodeFailure(const NodeID &node_id) { + if (auto node = RemoveNode(node_id, /* is_intended = */ false)) { + node->set_state(rpc::GcsNodeInfo::DEAD); + node->set_timestamp(current_sys_time_ms()); + AddDeadNodeToCache(node); + auto on_done = [this, node_id, node](const Status &status) { + auto on_done = [this, node_id, node](const Status &status) { + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + node->SerializeAsString(), nullptr)); + }; + RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); + }; + RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); + } +} + void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) { for (const auto &item : gcs_init_data.Nodes()) { if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { - // Call `AddNode` for this node to make sure it is tracked by the failure - // detector. AddNode(std::make_shared(item.second)); } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) { dead_nodes_.emplace(item.first, std::make_shared(item.second)); @@ -527,19 +421,13 @@ void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) { } } -void GcsNodeManager::StartNodeFailureDetector() { - // Note: To avoid heartbeats being delayed by main thread, make sure detector start is - // always handled by its own IO service. 
- node_failure_detector_service_.post([this] { node_failure_detector_->Start(); }); -} - void GcsNodeManager::UpdateNodeRealtimeResources( - const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) { - if (!RayConfig::instance().light_heartbeat_enabled() || + const NodeID &node_id, const rpc::ResourcesData &resource_data) { + if (!light_report_resource_usage_enabled_ || gcs_resource_manager_->GetClusterResources().count(node_id) == 0 || - heartbeat.resources_available_changed()) { + resource_data.resources_available_changed()) { gcs_resource_manager_->UpdateResources( - node_id, ResourceSet(MapFromProtobuf(heartbeat.resources_available()))); + node_id, ResourceSet(MapFromProtobuf(resource_data.resources_available()))); } } @@ -560,31 +448,31 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr node) sorted_dead_node_list_.emplace_back(node_id, node->timestamp()); } -void GcsNodeManager::SendBatchedHeartbeat() { - if (!heartbeat_buffer_.empty()) { - auto batch = std::make_shared(); - for (auto &heartbeat : heartbeat_buffer_) { - batch->add_batch()->Swap(&heartbeat.second); +void GcsNodeManager::SendBatchedResourceUsage() { + if (!resources_buffer_.empty()) { + auto batch = std::make_shared(); + for (auto &resources : resources_buffer_) { + batch->add_batch()->Swap(&resources.second); } stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0)); - RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "", + RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "", batch->SerializeAsString(), nullptr)); - heartbeat_buffer_.clear(); + resources_buffer_.clear(); } - auto heartbeat_period = boost::posix_time::milliseconds( - RayConfig::instance().raylet_heartbeat_timeout_milliseconds()); - heartbeat_timer_.expires_from_now(heartbeat_period); - heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { + auto resources_period = boost::posix_time::milliseconds( + 
RayConfig::instance().raylet_report_resources_period_milliseconds()); + resource_timer_.expires_from_now(resources_period); + resource_timer_.async_wait([this](const boost::system::error_code &error) { if (error == boost::asio::error::operation_aborted) { - // `operation_aborted` is set when `heartbeat_timer_` is canceled or destroyed. + // `operation_aborted` is set when `resource_timer_` is canceled or destroyed. // The Monitor lifetime may be short than the object who use it. (e.g. gcs_server) return; } - RAY_CHECK(!error) << "Sending batched heartbeat failed with error: " + RAY_CHECK(!error) << "Sending batched resource usage failed with error: " << error.message(); - SendBatchedHeartbeat(); + SendBatchedResourceUsage(); }); } @@ -596,11 +484,11 @@ std::string GcsNodeManager::DebugString() const { << counts_[CountType::UNREGISTER_NODE_REQUEST] << ", GetAllNodeInfo request count: " << counts_[CountType::GET_ALL_NODE_INFO_REQUEST] - << ", ReportHeartbeat request count: " - << counts_[CountType::REPORT_HEARTBEAT_REQUEST] + << ", ReportResourceUsage request count: " + << counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST] << ", GetHeartbeat request count: " << counts_[CountType::GET_HEARTBEAT_REQUEST] - << ", GetAllHeartbeat request count: " - << counts_[CountType::GET_ALL_HEARTBEAT_REQUEST] + << ", GetAllResourceUsage request count: " + << counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST] << ", GetResources request count: " << counts_[CountType::GET_RESOURCES_REQUEST] << ", UpdateResources request count: " << counts_[CountType::UPDATE_RESOURCES_REQUEST] diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 41b70b957..e0543ee00 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -37,12 +37,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Create a GcsNodeManager. /// /// \param main_io_service The main event loop. 
- /// \param node_failure_detector_io_service The event loop of node failure detector. /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. /// \param gcs_resource_manager GCS resource manager. explicit GcsNodeManager(boost::asio::io_service &main_io_service, - boost::asio::io_service &node_failure_detector_io_service, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage, std::shared_ptr gcs_resource_manager); @@ -62,11 +60,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::GetAllNodeInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle heartbeat rpc come from raylet. - void HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &request, - rpc::ReportHeartbeatReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Handle report resource usage rpc come from raylet. void HandleReportResourceUsage(const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply, @@ -108,17 +101,14 @@ class GcsNodeManager : public rpc::NodeInfoHandler { rpc::GetAllAvailableResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle get all heartbeat rpc request. Only used when light heartbeat enabled. - void HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &request, - rpc::GetAllHeartbeatReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - - /// Update heartbeat of given node. + /// Update resource usage of given node. /// /// \param node_id Node id. - /// \param request Request containing heartbeat. - void UpdateNodeHeartbeat(const NodeID node_id, - const rpc::ReportHeartbeatRequest &request); + /// \param request Request containing resource usage. + void UpdateNodeResourceUsage(const NodeID node_id, + const rpc::ReportResourceUsageRequest &request); + + void OnNodeFailure(const NodeID &node_id); /// Add an alive node. 
/// @@ -171,12 +161,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param gcs_init_data. void Initialize(const GcsInitData &gcs_init_data); - /// Start node failure detector. - void StartNodeFailureDetector(); - // Update node realtime resources. void UpdateNodeRealtimeResources(const NodeID &node_id, - const rpc::HeartbeatTableData &heartbeat); + const rpc::ResourcesData &heartbeat); /// Update the placement group load information so that it will be reported through /// heartbeat. @@ -187,72 +174,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::string DebugString() const; - protected: - class NodeFailureDetector { - public: - /// Create a NodeFailureDetector. - /// - /// \param io_service The event loop to run the monitor on. - /// \param gcs_table_storage GCS table external storage accessor. - /// \param gcs_pub_sub GCS message publisher. - /// \param on_node_death_callback Callback that will be called when node death is - /// detected. - explicit NodeFailureDetector( - boost::asio::io_service &io_service, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, - std::function on_node_death_callback); - - // Note: To avoid heartbeats being delayed by main thread, all public methods below - // should be posted to its own IO service. - - /// Start failure detector. - void Start(); - - /// Register node to this detector. - /// Only if the node has registered, its heartbeat data will be accepted. - /// - /// \param node_id ID of the node to be registered. - void AddNode(const NodeID &node_id); - - /// Handle a heartbeat from a Raylet. - /// - /// \param node_id The node ID of the Raylet that sent the heartbeat. - void HandleHeartbeat(const NodeID &node_id); - - protected: - /// A periodic timer that fires on every heartbeat period. Raylets that have - /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be - /// marked as dead in the client table. 
- void Tick(); - - /// Check that if any raylet is inactive due to no heartbeat for a period of time. - /// If found any, mark it as dead. - void DetectDeadNodes(); - - /// Schedule another tick after a short time. - void ScheduleTick(); - - protected: - /// Storage for GCS tables. - std::shared_ptr gcs_table_storage_; - /// The callback of node death. - std::function on_node_death_callback_; - /// The number of heartbeats that can be missed before a node is removed. - int64_t num_heartbeats_timeout_; - // Only the changed part will be included in heartbeat if this is true. - const bool light_heartbeat_enabled_; - /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. - boost::asio::deadline_timer detect_timer_; - /// For each Raylet that we receive a heartbeat from, the number of ticks - /// that may pass before the Raylet will be declared dead. - absl::flat_hash_map heartbeats_; - /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; - /// Is the detect started. - bool is_started_ = false; - }; - private: /// Add the dead node to the cache. If the cache is full, the earliest dead node is /// evicted. @@ -260,17 +181,13 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param node The node which is dead. void AddDeadNodeToCache(std::shared_ptr node); - /// Send any buffered heartbeats as a single publish. - void SendBatchedHeartbeat(); + /// Send any buffered resource usage as a single publish. + void SendBatchedResourceUsage(); - /// The main event loop for node failure detector. - boost::asio::io_service &main_io_service_; - /// Detector to detect the failure of node. - std::unique_ptr node_failure_detector_; - /// The event loop for node failure detector. - boost::asio::io_service &node_failure_detector_service_; - /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. - boost::asio::deadline_timer heartbeat_timer_; + /// A timer that ticks every raylet_report_resources_period_milliseconds. 
+ boost::asio::deadline_timer resource_timer_; + // Only the changed part will be reported if this is true. + const bool light_report_resource_usage_enabled_; /// Alive nodes. absl::flat_hash_map> alive_nodes_; /// Dead nodes. @@ -280,10 +197,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::list> sorted_dead_node_list_; /// Cluster resources. absl::flat_hash_map cluster_resources_; - /// Newest heartbeat of all nodes. - absl::flat_hash_map node_heartbeats_; - /// A buffer containing heartbeats received from node managers in the last tick. - absl::flat_hash_map heartbeat_buffer_; + /// Newest resource usage of all nodes. + absl::flat_hash_map node_resource_usages_; + /// A buffer containing resource usage received from node managers in the last tick. + absl::flat_hash_map resources_buffer_; /// Listeners which monitors the addition of nodes. std::vector)>> node_added_listeners_; @@ -304,9 +221,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { REGISTER_NODE_REQUEST = 0, UNREGISTER_NODE_REQUEST = 1, GET_ALL_NODE_INFO_REQUEST = 2, - REPORT_HEARTBEAT_REQUEST = 3, + REPORT_RESOURCE_USAGE_REQUEST = 3, GET_HEARTBEAT_REQUEST = 4, - GET_ALL_HEARTBEAT_REQUEST = 5, + GET_ALL_RESOURCE_USAGE_REQUEST = 5, GET_RESOURCES_REQUEST = 6, UPDATE_RESOURCES_REQUEST = 7, DELETE_RESOURCES_REQUEST = 8, diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 0c6ef079e..489484a94 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -73,6 +73,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init gcs node manager. InitGcsNodeManager(gcs_init_data); + // Init gcs heartbeat manager. + InitGcsHeartbeatManager(gcs_init_data); + // Init gcs job manager. InitGcsJobManager(); @@ -103,11 +106,11 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Store gcs rpc server address in redis. 
StoreGcsServerAddressInRedis(); - // Only after the rpc_server_ is running can the node failure - // detector be run. Otherwise the node failure detector will mistake + // Only after the rpc_server_ is running can the heartbeat manager + // be run. Otherwise the node failure detector will mistake // some living nodes as dead as the timer inside node failure // detector is already run. - gcs_node_manager_->StartNodeFailureDetector(); + gcs_heartbeat_manager_->Start(); // Print debug info periodically. PrintDebugInfo(); @@ -121,10 +124,7 @@ void GcsServer::Stop() { // Shutdown the rpc server rpc_server_.Shutdown(); - node_manager_io_service_.stop(); - if (node_manager_io_service_thread_->joinable()) { - node_manager_io_service_thread_->join(); - } + gcs_heartbeat_manager_->Stop(); is_stopped_ = true; RAY_LOG(INFO) << "GCS server stopped."; @@ -133,14 +133,8 @@ void GcsServer::Stop() { void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { RAY_CHECK(redis_gcs_client_ && gcs_table_storage_ && gcs_pub_sub_); - node_manager_io_service_thread_.reset(new std::thread([this] { - /// The asio work to keep node_manager_io_service_ alive. - boost::asio::io_service::work node_manager_io_service_work_(node_manager_io_service_); - node_manager_io_service_.run(); - })); gcs_node_manager_ = std::make_shared( - main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_, - gcs_resource_manager_); + main_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. 
@@ -149,6 +143,23 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { rpc_server_.RegisterService(*node_info_service_); } +void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) { + RAY_CHECK(gcs_node_manager_); + gcs_heartbeat_manager_ = std::make_shared( + heartbeat_manager_io_service_, /*on_node_death_callback=*/ + [this](const NodeID &node_id) { + main_service_.post( + [this, node_id] { return gcs_node_manager_->OnNodeFailure(node_id); }); + }); + for (const auto &node : gcs_init_data.Nodes()) { + gcs_heartbeat_manager_->AddNode(node.first); + } + // Register service. + heartbeat_info_service_.reset(new rpc::HeartbeatInfoGrpcService( + heartbeat_manager_io_service_, *gcs_heartbeat_manager_)); + rpc_server_.RegisterService(*heartbeat_info_service_); +} + void GcsServer::InitGcsResourceManager() { gcs_resource_manager_ = std::make_shared(); } @@ -276,6 +287,7 @@ void GcsServer::InstallEventListeners() { // placement groups and the pending actors. gcs_placement_group_manager_->SchedulePendingPlacementGroups(); gcs_actor_manager_->SchedulePendingActors(); + gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id())); }); gcs_node_manager_->AddNodeRemovedListener( [this](std::shared_ptr node) { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index a10507ea6..3dde5d06b 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -14,6 +14,7 @@ #pragma once +#include "ray/gcs/gcs_server/gcs_heartbeat_manager.h" #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" @@ -79,6 +80,9 @@ class GcsServer { /// Initialize gcs node manager. void InitGcsNodeManager(const GcsInitData &gcs_init_data); + /// Initialize gcs heartbeat manager. + void InitGcsHeartbeatManager(const GcsInitData &gcs_init_data); + /// Initialize gcs resource manager. 
void InitGcsResourceManager(); @@ -124,10 +128,9 @@ class GcsServer { GcsServerConfig config_; /// The main io service to drive event posted from grpc threads. boost::asio::io_context &main_service_; - /// The io service used by node manager in case of node failure detector being blocked - /// by main thread. - boost::asio::io_service node_manager_io_service_; - std::unique_ptr node_manager_io_service_thread_; + /// The io service used by heartbeat manager in case of node failure detector being + /// blocked by main thread. + boost::asio::io_service heartbeat_manager_io_service_; /// The grpc server rpc::GrpcServer rpc_server_; /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s. @@ -138,6 +141,8 @@ class GcsServer { std::shared_ptr gcs_resource_manager_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; + /// The heartbeat manager. + std::shared_ptr gcs_heartbeat_manager_; /// The gcs redis failure detector. std::shared_ptr gcs_redis_failure_detector_; /// The gcs actor manager @@ -151,6 +156,8 @@ class GcsServer { std::unique_ptr actor_info_service_; /// Node info handler and service std::unique_ptr node_info_service_; + /// Heartbeat info handler and service + std::unique_ptr heartbeat_info_service_; /// Object info handler and service std::unique_ptr gcs_object_manager_; std::unique_ptr object_info_service_; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 41a914272..5eea83484 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -130,7 +130,7 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; -template class GcsTable; +template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 6378c161b..4460d150a 100644 --- 
a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -26,7 +26,6 @@ namespace gcs { using rpc::ActorTableData; using rpc::ErrorTableData; using rpc::GcsNodeInfo; -using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; using rpc::ObjectLocationInfo; @@ -35,6 +34,7 @@ using rpc::PlacementGroupTableData; using rpc::ProfileTableData; using rpc::ResourceMap; using rpc::ResourceTableData; +using rpc::ResourceUsageBatchData; using rpc::ScheduleData; using rpc::StoredConfig; using rpc::TaskLeaseData; @@ -255,11 +255,11 @@ class GcsPlacementGroupScheduleTable : public GcsTable { +class GcsResourceUsageBatchTable : public GcsTable { public: - explicit GcsHeartbeatBatchTable(std::shared_ptr &store_client) + explicit GcsResourceUsageBatchTable(std::shared_ptr &store_client) : GcsTable(store_client) { - table_name_ = TablePrefix_Name(TablePrefix::HEARTBEAT_BATCH); + table_name_ = TablePrefix_Name(TablePrefix::RESOURCE_USAGE_BATCH); } }; @@ -348,9 +348,9 @@ class GcsTableStorage { return *heartbeat_table_; } - GcsHeartbeatBatchTable &HeartbeatBatchTable() { - RAY_CHECK(heartbeat_batch_table_ != nullptr); - return *heartbeat_batch_table_; + GcsResourceUsageBatchTable &HeartbeatBatchTable() { + RAY_CHECK(resource_usage_batch_table_ != nullptr); + return *resource_usage_batch_table_; } GcsProfileTable &ProfileTable() { @@ -381,7 +381,7 @@ class GcsTableStorage { std::unique_ptr node_resource_table_; std::unique_ptr placement_group_schedule_table_; std::unique_ptr heartbeat_table_; - std::unique_ptr heartbeat_batch_table_; + std::unique_ptr resource_usage_batch_table_; std::unique_ptr profile_table_; std::unique_ptr worker_table_; std::unique_ptr system_config_table_; @@ -408,7 +408,7 @@ class RedisGcsTableStorage : public GcsTableStorage { heartbeat_table_.reset(new GcsHeartbeatTable(store_client_)); placement_group_schedule_table_.reset( new GcsPlacementGroupScheduleTable(store_client_)); - 
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); + resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); system_config_table_.reset(new GcsInternalConfigTable(store_client_)); @@ -434,7 +434,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage { placement_group_schedule_table_.reset( new GcsPlacementGroupScheduleTable(store_client_)); heartbeat_table_.reset(new GcsHeartbeatTable(store_client_)); - heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_)); + resource_usage_batch_table_.reset(new GcsResourceUsageBatchTable(store_client_)); profile_table_.reset(new GcsProfileTable(store_client_)); worker_table_.reset(new GcsWorkerTable(store_client_)); system_config_table_.reset(new GcsInternalConfigTable(store_client_)); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index ed429ae11..67e857ab9 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -27,9 +27,8 @@ class GcsActorSchedulerTest : public ::testing::Test { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(redis_client_); gcs_resource_manager_ = std::make_shared(); - gcs_node_manager_ = - std::make_shared(io_service_, io_service_, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs_node_manager_ = std::make_shared( + io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index 17ea70e8d..480716b10 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ 
b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -34,8 +34,8 @@ class GcsNodeManagerTest : public ::testing::Test { TEST_F(GcsNodeManagerTest, TestManagement) { boost::asio::io_service io_service; - gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_, + gcs_resource_manager_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); @@ -49,8 +49,8 @@ TEST_F(GcsNodeManagerTest, TestManagement) { TEST_F(GcsNodeManagerTest, TestListener) { boost::asio::io_service io_service; - gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs::GcsNodeManager node_manager(io_service, gcs_pub_sub_, gcs_table_storage_, + gcs_resource_manager_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index 6d4484c7d..9ec639c84 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -55,9 +55,8 @@ class GcsObjectManagerTest : public ::testing::Test { void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); gcs_resource_manager_ = std::make_shared(); - gcs_node_manager_ = - std::make_shared(io_service_, io_service_, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs_node_manager_ = std::make_shared( + io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_); gcs_object_manager_ = std::make_shared( gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); GenTestData(); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 80b83ea81..dbb4860c5 
100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -69,9 +69,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(io_service_); gcs_resource_manager_ = std::make_shared(); - gcs_node_manager_ = - std::make_shared(io_service_, io_service_, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs_node_manager_ = std::make_shared( + io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_); gcs_placement_group_manager_.reset( new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_, gcs_table_storage_, *gcs_node_manager_)); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index c5742eca2..603f8d45c 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -40,9 +40,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { gcs_table_storage_ = std::make_shared(io_service_); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_resource_manager_ = std::make_shared(); - gcs_node_manager_ = - std::make_shared(io_service_, io_service_, gcs_pub_sub_, - gcs_table_storage_, gcs_resource_manager_); + gcs_node_manager_ = std::make_shared( + io_service_, gcs_pub_sub_, gcs_table_storage_, gcs_resource_manager_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); raylet_client_pool_ = std::make_shared( @@ -99,11 +98,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { void AddNode(const std::shared_ptr &node, int cpu_num = 10) { gcs_node_manager_->AddNode(node); - rpc::HeartbeatTableData heartbeat; - heartbeat.set_node_id(node->node_id()); - 
(*heartbeat.mutable_resources_available())["CPU"] = cpu_num; + rpc::ResourcesData resource; + resource.set_node_id(node->node_id()); + (*resource.mutable_resources_available())["CPU"] = cpu_num; gcs_node_manager_->UpdateNodeRealtimeResources(NodeID::FromBinary(node->node_id()), - heartbeat); + resource); } void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) { diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 117d9b63e..4511bc722 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -394,8 +394,8 @@ struct GcsServerMocker { return Status::NotImplemented(""); } - Status AsyncSubscribeBatchHeartbeat( - const gcs::ItemCallback &subscribe, + Status AsyncSubscribeBatchedResourceUsage( + const gcs::ItemCallback &subscribe, const gcs::StatusCallback &done) override { return Status::NotImplemented(""); } diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 19a0923ac..06748dffb 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -32,7 +32,7 @@ namespace gcs { #define OBJECT_CHANNEL "OBJECT" #define TASK_CHANNEL "TASK" #define TASK_LEASE_CHANNEL "TASK_LEASE" -#define HEARTBEAT_BATCH_CHANNEL "HEARTBEAT_BATCH" +#define RESOURCES_BATCH_CHANNEL "RESOURCES_BATCH" #define ERROR_INFO_CHANNEL "ERROR_INFO" /// \class GcsPubSub diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index f750c4b66..24ba0ac06 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -422,7 +422,7 @@ Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &obje RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl) : client_impl_(client_impl), resource_sub_executor_(client_impl_->resource_table()), - heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {} + 
resource_usage_batch_sub_executor_(client_impl->resource_usage_batch_table()) {} Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info, const StatusCallback &callback) { @@ -530,18 +530,23 @@ Status RedisNodeInfoAccessor::AsyncReportHeartbeat( return heartbeat_table.Add(JobID::Nil(), node_id, data_ptr, on_done); } -void RedisNodeInfoAccessor::AsyncReReportHeartbeat() {} +Status RedisNodeInfoAccessor::AsyncReportResourceUsage( + const std::shared_ptr &data_ptr, const StatusCallback &callback) { + return Status::Invalid("Not implemented"); +} -Status RedisNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( - const ItemCallback &subscribe, const StatusCallback &done) { +void RedisNodeInfoAccessor::AsyncReReportResourceUsage() {} + +Status RedisNodeInfoAccessor::AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr); auto on_subscribe = [subscribe](const NodeID &node_id, - const HeartbeatBatchTableData &data) { + const ResourceUsageBatchData &data) { subscribe(data); }; - return heartbeat_batch_sub_executor_.AsyncSubscribeAll(NodeID::Nil(), on_subscribe, - done); + return resource_usage_batch_sub_executor_.AsyncSubscribeAll(NodeID::Nil(), on_subscribe, + done); } Status RedisNodeInfoAccessor::AsyncGetResources( diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 542e6affb..682f06e5c 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -347,15 +347,18 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { Status AsyncReportHeartbeat(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; - void AsyncReReportHeartbeat() override; + Status AsyncReportResourceUsage(const std::shared_ptr &data_ptr, + const StatusCallback &callback) override; - Status AsyncGetAllHeartbeat( - const ItemCallback &callback) override { - return Status::NotImplemented("AsyncGetAllHeartbeat not implemented"); + void 
AsyncReReportResourceUsage() override; + + Status AsyncGetAllResourceUsage( + const ItemCallback &callback) override { + return Status::NotImplemented("AsyncGetAllResourceUsage not implemented"); } - Status AsyncSubscribeBatchHeartbeat( - const ItemCallback &subscribe, + Status AsyncSubscribeBatchedResourceUsage( + const ItemCallback &subscribe, const StatusCallback &done) override; void AsyncResubscribe(bool is_pubsub_server_restarted) override {} @@ -378,9 +381,9 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { DynamicResourceSubscriptionExecutor; DynamicResourceSubscriptionExecutor resource_sub_executor_; - typedef SubscriptionExecutor + typedef SubscriptionExecutor HeartbeatBatchSubscriptionExecutor; - HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_; + HeartbeatBatchSubscriptionExecutor resource_usage_batch_sub_executor_; }; /// \class RedisErrorInfoAccessor diff --git a/src/ray/gcs/redis_gcs_client.cc b/src/ray/gcs/redis_gcs_client.cc index 35fd0506e..20ef35b5e 100644 --- a/src/ray/gcs/redis_gcs_client.cc +++ b/src/ray/gcs/redis_gcs_client.cc @@ -55,7 +55,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) { node_table_.reset(new NodeTable({primary_context}, this)); job_table_.reset(new JobTable({primary_context}, this)); - heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context}, this)); + resource_usage_batch_table_.reset(new ResourceUsageBatchTable({primary_context}, this)); // Tables below would be sharded. 
object_table_.reset(new ObjectTable(shard_contexts, this)); raylet_task_table_.reset(new raylet::TaskTable(shard_contexts, this, command_type_)); @@ -128,8 +128,8 @@ NodeTable &RedisGcsClient::node_table() { return *node_table_; } HeartbeatTable &RedisGcsClient::heartbeat_table() { return *heartbeat_table_; } -HeartbeatBatchTable &RedisGcsClient::heartbeat_batch_table() { - return *heartbeat_batch_table_; +ResourceUsageBatchTable &RedisGcsClient::resource_usage_batch_table() { + return *resource_usage_batch_table_; } JobTable &RedisGcsClient::job_table() { return *job_table_; } diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 7d14826ba..748b1da72 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -93,7 +93,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { /// Implements the Nodes() interface. NodeTable &node_table(); HeartbeatTable &heartbeat_table(); - HeartbeatBatchTable &heartbeat_batch_table(); + ResourceUsageBatchTable &resource_usage_batch_table(); DynamicResourceTable &resource_table(); /// Implements the Tasks() interface. 
virtual raylet::TaskTable &raylet_task_table(); @@ -118,7 +118,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { std::unique_ptr task_reconstruction_log_; std::unique_ptr task_lease_table_; std::unique_ptr heartbeat_table_; - std::unique_ptr heartbeat_batch_table_; + std::unique_ptr resource_usage_batch_table_; std::unique_ptr profile_table_; std::unique_ptr node_table_; std::unique_ptr resource_table_; diff --git a/src/ray/gcs/subscription_executor.cc b/src/ray/gcs/subscription_executor.cc index 4d90d3aac..d9617985a 100644 --- a/src/ray/gcs/subscription_executor.cc +++ b/src/ray/gcs/subscription_executor.cc @@ -206,7 +206,8 @@ template class SubscriptionExecutor, TaskLeaseTable>; template class SubscriptionExecutor; -template class SubscriptionExecutor; +template class SubscriptionExecutor; template class SubscriptionExecutor; } // namespace gcs diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 17ad126fd..2017d05de 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -829,12 +829,12 @@ template class Log; template class Log; template class Table; template class Table; -template class Table; +template class Table; template class Log; template class Log; template class Log; template class Log; -template class Log; +template class Log; template class Log; template class Table; template class Table; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 11683e6f4..c7c647162 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -39,12 +39,12 @@ using rpc::ErrorTableData; using rpc::GcsChangeMode; using rpc::GcsEntry; using rpc::GcsNodeInfo; -using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; using rpc::ObjectTableData; using rpc::ProfileTableData; using rpc::ResourceTableData; +using rpc::ResourceUsageBatchData; using rpc::TablePrefix; using rpc::TablePubsub; using rpc::TaskLeaseData; @@ -688,15 +688,15 @@ class HeartbeatTable : public Table { virtual ~HeartbeatTable() {} }; 
-class HeartbeatBatchTable : public Table { +class ResourceUsageBatchTable : public Table { public: - HeartbeatBatchTable(const std::vector> &contexts, - RedisGcsClient *client) + ResourceUsageBatchTable(const std::vector> &contexts, + RedisGcsClient *client) : Table(contexts, client) { - pubsub_channel_ = TablePubsub::HEARTBEAT_BATCH_PUBSUB; - prefix_ = TablePrefix::HEARTBEAT_BATCH; + pubsub_channel_ = TablePubsub::RESOURCE_USAGE_BATCH_PUBSUB; + prefix_ = TablePrefix::RESOURCE_USAGE_BATCH; } - virtual ~HeartbeatBatchTable() {} + virtual ~ResourceUsageBatchTable() {} }; class JobTable : public Log { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index f24eaf8af..fe2511bd0 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -32,7 +32,7 @@ enum TablePrefix { FUNCTION = 7; TASK_RECONSTRUCTION = 8; HEARTBEAT = 9; - HEARTBEAT_BATCH = 10; + RESOURCE_USAGE_BATCH = 10; JOB = 11; PROFILE = 12; TASK_LEASE = 13; @@ -56,7 +56,7 @@ enum TablePubsub { OBJECT_PUBSUB = 5; ACTOR_PUBSUB = 6; HEARTBEAT_PUBSUB = 7; - HEARTBEAT_BATCH_PUBSUB = 8; + RESOURCE_USAGE_BATCH_PUBSUB = 8; TASK_LEASE_PUBSUB = 9; JOB_PUBSUB = 10; NODE_RESOURCE_PUBSUB = 11; @@ -302,6 +302,11 @@ message PlacementGroupLoad { message HeartbeatTableData { // Node id. bytes node_id = 1; +} + +message ResourcesData { + // Node id. + bytes node_id = 1; // Resource capacity currently available on this node manager. map resources_available = 2; // Indicates whether available resources is changed. Only used when light @@ -320,29 +325,8 @@ message HeartbeatTableData { bool should_global_gc = 8; } -message ResourcesData { - // Node id. - bytes node_id = 1; - // Resource capacity currently available on this node manager. - map resources_available = 2; - // Indicates whether avaialbe resources is changed. Only used when - // light heartbeat enabled. - bool resources_available_changed = 3; - // Total resource capacity configured for this node manager. 
- map resources_total = 4; - // Aggregate outstanding resource load on this node manager. - map resource_load = 5; - // Indicates whether resource load is changed. Only used when - // light heartbeat enabled. - bool resource_load_changed = 6; - // The resource load on this node, sorted by resource shape. - ResourceLoad resource_load_by_shape = 7; - // Whether this node manager is requesting global GC. - bool should_global_gc = 8; -} - -message HeartbeatBatchTableData { - repeated HeartbeatTableData batch = 1; +message ResourceUsageBatchData { + repeated ResourcesData batch = 1; // The total resource demand on all nodes included in the batch, sorted by // resource shape. ResourceLoad resource_load_by_shape = 2; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 8f226546b..11d847a61 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -164,14 +164,6 @@ message ReportHeartbeatReply { GcsStatus status = 1; } -message GetAllHeartbeatRequest { -} - -message GetAllHeartbeatReply { - GcsStatus status = 1; - HeartbeatBatchTableData heartbeat_data = 2; -} - message ReportResourceUsageRequest { ResourcesData resources = 1; } @@ -185,7 +177,7 @@ message GetAllResourceUsageRequest { message GetAllResourceUsageReply { GcsStatus status = 1; - repeated ResourcesData resources_list = 2; + ResourceUsageBatchData resource_usage_data = 2; } message GetResourcesRequest { @@ -247,11 +239,7 @@ service NodeInfoGcsService { rpc UnregisterNode(UnregisterNodeRequest) returns (UnregisterNodeReply); // Get information of all nodes from GCS Service. rpc GetAllNodeInfo(GetAllNodeInfoRequest) returns (GetAllNodeInfoReply); - // Report heartbeat of a node to GCS Service. - rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply); // Get newest heartbeat of all nodes from GCS Service. 
- rpc GetAllHeartbeat(GetAllHeartbeatRequest) returns (GetAllHeartbeatReply); - // Report resource usage of a node to GCS Service. rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); // Get resource usage of all nodes from GCS Service. rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); @@ -270,6 +258,12 @@ service NodeInfoGcsService { returns (GetAllAvailableResourcesReply); } +// Service for heartbeat info access. +service HeartbeatInfoGcsService { + // Report heartbeat of a node to GCS Service. + rpc ReportHeartbeat(ReportHeartbeatRequest) returns (ReportHeartbeatReply); +} + message GetObjectLocationsRequest { // The ID of object to lookup in GCS Service. bytes object_id = 1; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 60de24705..4f80d5ca3 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -214,6 +214,8 @@ int main(int argc, char *argv[]) { node_manager_config.heartbeat_period_ms = RayConfig::instance().raylet_heartbeat_timeout_milliseconds(); + node_manager_config.report_resources_period_ms = + RayConfig::instance().raylet_report_resources_period_milliseconds(); node_manager_config.debug_dump_period_ms = RayConfig::instance().debug_dump_period_milliseconds(); node_manager_config.record_metrics_period_ms = diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4c43d3595..84df06cc4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -124,12 +124,16 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self object_directory_(object_directory), heartbeat_timer_(io_service), heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), + report_resources_timer_(io_service), + report_resources_period_( + std::chrono::milliseconds(config.report_resources_period_ms)), debug_dump_period_(config.debug_dump_period_ms), 
fair_queueing_enabled_(config.fair_queueing_enabled), object_pinning_enabled_(config.object_pinning_enabled), temp_dir_(config.temp_dir), object_manager_profile_timer_(io_service), - light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()), + light_report_resource_usage_enabled_( + RayConfig::instance().light_report_resource_usage_enabled()), initial_config_(config), local_available_resources_(config.resource_config), worker_pool_( @@ -268,13 +272,13 @@ ray::Status NodeManager::RegisterGcs() { RAY_RETURN_NOT_OK( gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, on_done)); - // Subscribe to heartbeat batches from the monitor. - const auto &heartbeat_batch_added = - [this](const HeartbeatBatchTableData &heartbeat_batch) { - HeartbeatBatchAdded(heartbeat_batch); + // Subscribe to resource usage batches from the monitor. + const auto &resource_usage_batch_added = + [this](const ResourceUsageBatchData &resource_usage_batch) { + ResourceUsageBatchAdded(resource_usage_batch); }; - RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchHeartbeat( - heartbeat_batch_added, /*done*/ nullptr)); + RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeBatchedResourceUsage( + resource_usage_batch_added, /*done*/ nullptr)); // Subscribe to all unexpected failure notifications from the local and // remote raylets. Note that this does not include workers that failed due to @@ -303,6 +307,7 @@ ray::Status NodeManager::RegisterGcs() { last_heartbeat_at_ms_ = current_time_ms(); last_debug_dump_at_ms_ = current_time_ms(); Heartbeat(); + ReportResourceUsage(); // Start the timer that gets object manager profiling information and sends it // to the GCS. 
GetObjectManagerProfileInfo(); @@ -402,97 +407,7 @@ void NodeManager::Heartbeat() { stats::HeartbeatReportMs.Record(interval); auto heartbeat_data = std::make_shared(); - SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; heartbeat_data->set_node_id(self_node_id_.Binary()); - - if (new_scheduler_enabled_) { - new_resource_scheduler_->Heartbeat(light_heartbeat_enabled_, heartbeat_data); - cluster_task_manager_->Heartbeat(light_heartbeat_enabled_, heartbeat_data); - } else { - // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet - // directly. - // TODO(atumanov): implement a ResourceSet const_iterator. - // If light heartbeat enabled, we only set filed that represent resources changed. - if (light_heartbeat_enabled_) { - auto last_heartbeat_resources = gcs_client_->Nodes().GetLastHeartbeatResources(); - if (!last_heartbeat_resources->GetTotalResources().IsEqual( - local_resources.GetTotalResources())) { - for (const auto &resource_pair : - local_resources.GetTotalResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetTotalResources( - ResourceSet(local_resources.GetTotalResources())); - } - - if (!last_heartbeat_resources->GetAvailableResources().IsEqual( - local_resources.GetAvailableResources())) { - heartbeat_data->set_resources_available_changed(true); - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - } - - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - if (!last_heartbeat_resources->GetLoadResources().IsEqual( - local_resources.GetLoadResources())) { - heartbeat_data->set_resource_load_changed(true); - for (const auto 
&resource_pair : - local_resources.GetLoadResources().GetResourceMap()) { - (*heartbeat_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetLoadResources( - ResourceSet(local_resources.GetLoadResources())); - } - } else { - // If light heartbeat disabled, we send whole resources information every time. - for (const auto &resource_pair : - local_resources.GetTotalResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - for (const auto &resource_pair : - local_resources.GetLoadResources().GetResourceMap()) { - (*heartbeat_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - } - } - - if (!new_scheduler_enabled_) { - // Add resource load by shape. This will be used by the new autoscaler. - auto resource_load = local_queues_.GetResourceLoadByShape( - RayConfig::instance().max_resource_shapes_per_load_report()); - heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load); - } - - // Set the global gc bit on the outgoing heartbeat message. - if (should_global_gc_) { - heartbeat_data->set_should_global_gc(true); - should_global_gc_ = false; - } - - // Trigger local GC if needed. This throttles the frequency of local GC calls - // to at most once per heartbeat interval. 
- if (should_local_gc_) { - DoLocalGC(); - should_local_gc_ = false; - } - RAY_CHECK_OK( gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, /*done*/ nullptr)); @@ -521,6 +436,118 @@ void NodeManager::Heartbeat() { }); } +void NodeManager::ReportResourceUsage() { + auto resources_data = std::make_shared(); + SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; + resources_data->set_node_id(self_node_id_.Binary()); + + if (new_scheduler_enabled_) { + new_resource_scheduler_->FillResourceUsage(light_report_resource_usage_enabled_, + resources_data); + cluster_task_manager_->FillResourceUsage(light_report_resource_usage_enabled_, + resources_data); + } else { + // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet + // directly. + // TODO(atumanov): implement a ResourceSet const_iterator. + // If light resource usage report enabled, we only set filed that represent resources + // changed. + if (light_report_resource_usage_enabled_) { + auto last_heartbeat_resources = gcs_client_->Nodes().GetLastResourceUsage(); + if (!last_heartbeat_resources->GetTotalResources().IsEqual( + local_resources.GetTotalResources())) { + for (const auto &resource_pair : + local_resources.GetTotalResources().GetResourceMap()) { + (*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources->SetTotalResources( + ResourceSet(local_resources.GetTotalResources())); + } + + if (!last_heartbeat_resources->GetAvailableResources().IsEqual( + local_resources.GetAvailableResources())) { + resources_data->set_resources_available_changed(true); + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources->SetAvailableResources( + ResourceSet(local_resources.GetAvailableResources())); + } + + 
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + if (!last_heartbeat_resources->GetLoadResources().IsEqual( + local_resources.GetLoadResources())) { + resources_data->set_resource_load_changed(true); + for (const auto &resource_pair : + local_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources->SetLoadResources( + ResourceSet(local_resources.GetLoadResources())); + } + } else { + // If light resource usage report disabled, we send whole resources information + // every time. + for (const auto &resource_pair : + local_resources.GetTotalResources().GetResourceMap()) { + (*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + for (const auto &resource_pair : + local_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + } + } + + if (!new_scheduler_enabled_) { + // Add resource load by shape. This will be used by the new autoscaler. + auto resource_load = local_queues_.GetResourceLoadByShape( + RayConfig::instance().max_resource_shapes_per_load_report()); + resources_data->mutable_resource_load_by_shape()->Swap(&resource_load); + } + + // Set the global gc bit on the outgoing heartbeat message. + if (should_global_gc_) { + resources_data->set_should_global_gc(true); + should_global_gc_ = false; + } + + // Trigger local GC if needed. This throttles the frequency of local GC calls + // to at most once per heartbeat interval. 
+ if (should_local_gc_) { + DoLocalGC(); + should_local_gc_ = false; + } + + if (resources_data->resources_total_size() > 0 || + resources_data->resources_available_changed() || + resources_data->resource_load_changed() || resources_data->should_global_gc()) { + RAY_CHECK_OK( + gcs_client_->Nodes().AsyncReportResourceUsage(resources_data, /*done*/ nullptr)); + } + + // Reset the timer. + report_resources_timer_.expires_from_now(report_resources_period_); + report_resources_timer_.async_wait([this](const boost::system::error_code &error) { + RAY_CHECK(!error); + ReportResourceUsage(); + }); +} + void NodeManager::DoLocalGC() { auto all_workers = worker_pool_.GetAllRegisteredWorkers(); for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) { @@ -908,47 +935,47 @@ void NodeManager::TryLocalInfeasibleTaskScheduling() { } } -void NodeManager::HeartbeatAdded(const NodeID &node_id, - const HeartbeatTableData &heartbeat_data) { +void NodeManager::ResourceUsageAdded(const NodeID &node_id, + const rpc::ResourcesData &resource_data) { // Locate the node id in remote node table and update available resources based on - // the received heartbeat information. + // the received resource usage information. auto it = cluster_resource_map_.find(node_id); if (it == cluster_resource_map_.end()) { - // Haven't received the node registration for this node yet, skip this heartbeat. - RAY_LOG(INFO) << "[HeartbeatAdded]: received heartbeat from unknown node id " + // Haven't received the node registration for this node yet, skip this message. + RAY_LOG(INFO) << "[ResourceUsageAdded]: received resource usage from unknown node id " << node_id; return; } // Trigger local GC at the next heartbeat interval. 
- if (heartbeat_data.should_global_gc()) { + if (resource_data.should_global_gc()) { should_local_gc_ = true; } SchedulingResources &remote_resources = it->second; - // If light heartbeat enabled, we update remote resources only when related resources - // map in heartbeat is not empty. - if (light_heartbeat_enabled_) { - if (heartbeat_data.resources_total_size() > 0) { - ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); + // If light resource usage report enabled, we update remote resources only when related + // resources map in heartbeat is not empty. + if (light_report_resource_usage_enabled_) { + if (resource_data.resources_total_size() > 0) { + ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); } - if (heartbeat_data.resources_available_changed()) { - ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); + if (resource_data.resources_available_changed()) { + ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); } - if (heartbeat_data.resource_load_changed()) { - ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load())); + if (resource_data.resource_load_changed()) { + ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); } } else { - // If light heartbeat disabled, we update remote resources every time. - ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total())); + // If light resource usage report disabled, we update remote resources every time. 
+ ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); remote_resources.SetTotalResources(std::move(remote_total)); - ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available())); + ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); remote_resources.SetAvailableResources(std::move(remote_available)); - ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load())); + ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); } @@ -987,15 +1014,16 @@ void NodeManager::HeartbeatAdded(const NodeID &node_id, } } -void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_batch) { - // Update load information provided by each heartbeat. - for (const auto &heartbeat_data : heartbeat_batch.batch()) { - const NodeID &node_id = NodeID::FromBinary(heartbeat_data.node_id()); +void NodeManager::ResourceUsageBatchAdded( + const ResourceUsageBatchData &resource_usage_batch) { + // Update load information provided by each message. + for (const auto &resource_usage : resource_usage_batch.batch()) { + const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id()); if (node_id == self_node_id_) { - // Skip heartbeats from self. + // Skip messages from self. continue; } - HeartbeatAdded(node_id, heartbeat_data); + ResourceUsageAdded(node_id, resource_usage); } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 428b5549e..810927dfa 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -49,9 +49,9 @@ namespace raylet { using rpc::ActorTableData; using rpc::ErrorType; using rpc::GcsNodeInfo; -using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; +using rpc::ResourceUsageBatchData; struct NodeManagerConfig { /// The node's resource configuration. 
@@ -83,6 +83,8 @@ struct NodeManagerConfig { std::string agent_command; /// The time between heartbeats in milliseconds. uint64_t heartbeat_period_ms; + /// The time between reports resources in milliseconds. + uint64_t report_resources_period_ms; /// The time between debug dumps in milliseconds, or -1 to disable. uint64_t debug_dump_period_ms; /// Whether to enable fair queueing between task classes in raylet. @@ -218,6 +220,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Send heartbeats to the GCS. void Heartbeat(); + /// Report resource usage to the GCS. + void ReportResourceUsage(); + /// Write out debug state to a file. void DumpDebugState() const; @@ -230,16 +235,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void GetObjectManagerProfileInfo(); - /// Handler for a heartbeat notification from the GCS. + /// Handler for a resource usage notification from the GCS. /// - /// \param id The ID of the node manager that sent the heartbeat. - /// \param data The heartbeat data including load information. + /// \param id The ID of the node manager that sent the resources data. + /// \param data The resources data including load information. /// \return Void. - void HeartbeatAdded(const NodeID &id, const HeartbeatTableData &data); - /// Handler for a heartbeat batch notification from the GCS + void ResourceUsageAdded(const NodeID &id, const rpc::ResourcesData &data); + /// Handler for a resource usage batch notification from the GCS /// - /// \param heartbeat_batch The batch of heartbeat data. - void HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_batch); + /// \param resource_usage_batch The batch of resource usage data. + void ResourceUsageBatchAdded(const ResourceUsageBatchData &resource_usage_batch); /// Methods for task scheduling. 
@@ -680,6 +685,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler { boost::asio::steady_timer heartbeat_timer_; /// The period used for the heartbeat timer. std::chrono::milliseconds heartbeat_period_; + /// The timer used to report resources. + boost::asio::steady_timer report_resources_timer_; + /// The period used for the resources report timer. + std::chrono::milliseconds report_resources_period_; /// The period between debug state dumps. int64_t debug_dump_period_; /// Whether to enable fair queueing between task classes in raylet. @@ -698,8 +707,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The time that the last heartbeat was sent at. Used to make sure we are /// keeping up with heartbeats. uint64_t last_heartbeat_at_ms_; - /// Only the changed part will be included in heartbeat if this is true. - const bool light_heartbeat_enabled_; + /// Only the changed part will be included in resource usage if this is true. + const bool light_report_resource_usage_enabled_; /// The time that the last debug string was logged to the console. uint64_t last_debug_dump_at_ms_; /// The number of heartbeats that we should wait before sending the diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 97f00bdd7..70173c016 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -822,25 +822,26 @@ void ClusterResourceScheduler::FreeLocalTaskResources( UpdateLocalAvailableResourcesFromResourceInstances(); } -void ClusterResourceScheduler::Heartbeat( - bool light_heartbeat_enabled, std::shared_ptr heartbeat_data) { +void ClusterResourceScheduler::FillResourceUsage( + bool light_report_resource_usage_enabled, + std::shared_ptr resources_data) { NodeResources resources; RAY_CHECK(GetNodeResources(local_node_id_, &resources)) << "Error: Populating heartbeat failed. 
Please file a bug report: " "https://github.com/ray-project/ray/issues/new."; - if (!light_heartbeat_enabled || !last_report_resources_ || + if (!light_report_resource_usage_enabled || !last_report_resources_ || resources != *last_report_resources_.get()) { for (int i = 0; i < PredefinedResources_MAX; i++) { const auto &label = ResourceEnumToString((PredefinedResources)i); const auto &capacity = resources.predefined_resources[i]; if (capacity.available != 0) { - (*heartbeat_data->mutable_resources_available())[label] = + (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); } if (capacity.total != 0) { - (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double(); + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); } } for (auto it = resources.custom_resources.begin(); @@ -849,20 +850,20 @@ void ClusterResourceScheduler::Heartbeat( const auto &capacity = it->second; const auto &label = string_to_int_map_.Get(custom_id); if (capacity.available != 0) { - (*heartbeat_data->mutable_resources_available())[label] = + (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); } if (capacity.total != 0) { - (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double(); + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); } } - heartbeat_data->set_resources_available_changed(true); - if (light_heartbeat_enabled) { + resources_data->set_resources_available_changed(true); + if (light_report_resource_usage_enabled) { last_report_resources_.reset(new NodeResources(resources)); } } - if (light_heartbeat_enabled) { + if (light_report_resource_usage_enabled) { // Reset all local views for remote nodes. This is needed in case tasks that // we spilled back to a remote node were not actually scheduled on the // node. 
Then, the remote node's resource availability may not change and diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 5984d444f..05686c4ae 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -345,10 +345,11 @@ class ClusterResourceScheduler { /// sending raylet <-> gcs heartbeats. In particular, this should fill in /// resources_available and resources_total. /// - /// \param light_heartbeat_enabled Only send changed fields if true. + /// \param light_report_resource_usage_enabled Only send changed fields if true. /// \param Output parameter. `resources_available` and `resources_total` are the only /// fields used. - void Heartbeat(bool light_heartbeat_enabled, std::shared_ptr data); + void FillResourceUsage(bool light_report_resource_usage_enabled, + std::shared_ptr resources_data); /// Return human-readable string for this scheduler state. std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 5de3172fc..a43ed2e4c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -1042,8 +1042,8 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { cluster_resources.AddOrUpdateNode(12345, other_node_resources); { // Cluster is idle. 
- auto data = std::make_shared(); - cluster_resources.Heartbeat(false, data); + auto data = std::make_shared(); + cluster_resources.FillResourceUsage(false, data); auto available = data->resources_available(); auto total = data->resources_total(); @@ -1080,8 +1080,8 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { {"1", 0.1}, }); cluster_resources.AllocateLocalTaskResources(allocation_map, allocations); - auto data = std::make_shared(); - cluster_resources.Heartbeat(false, data); + auto data = std::make_shared(); + cluster_resources.FillResourceUsage(false, data); auto available = data->resources_available(); auto total = data->resources_total(); @@ -1102,32 +1102,32 @@ TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { } } -TEST_F(ClusterResourceSchedulerTest, TestLightHeartbeat) { +TEST_F(ClusterResourceSchedulerTest, TestLightResourceUsageReport) { std::unordered_map initial_resources({{"CPU", 1}}); ClusterResourceScheduler cluster_resources("local", initial_resources); - // Report heartbeat on initialization. - auto data = std::make_shared(); - cluster_resources.Heartbeat(true, data); + // Fill resource usage on initialization. + auto data = std::make_shared(); + cluster_resources.FillResourceUsage(true, data); ASSERT_RESOURCES_EQ(data, 1, 1); - // Don't report heartbeats if resource availability hasn't changed. + // Don't report resource usage if resource availability hasn't changed. for (int i = 0; i < 3; i++) { data->Clear(); - cluster_resources.Heartbeat(true, data); + cluster_resources.FillResourceUsage(true, data); ASSERT_RESOURCES_EMPTY(data); } - // Report heartbeat if resource availability has changed. + // Report resource usage if resource availability has changed. 
cluster_resources.AddOrUpdateNode("local", {{"CPU", 1.}}, {{"CPU", 0.}}); data->Clear(); - cluster_resources.Heartbeat(true, data); + cluster_resources.FillResourceUsage(true, data); ASSERT_RESOURCES_EQ(data, 0, 1); - // Don't report heartbeats if resource availability hasn't changed. + // Don't report resource usage if resource availability hasn't changed. for (int i = 0; i < 3; i++) { data->Clear(); - cluster_resources.Heartbeat(true, data); + cluster_resources.FillResourceUsage(true, data); ASSERT_RESOURCES_EMPTY(data); } } @@ -1145,20 +1145,20 @@ TEST_F(ClusterResourceSchedulerTest, TestDirtyLocalView) { ASSERT_TRUE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); task_allocation = std::make_shared(); ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); - // View of local resources is not affected by heartbeats. - auto data = std::make_shared(); - cluster_resources.Heartbeat(true, data); + // View of local resources is not affected by resource usage report. + auto data = std::make_shared(); + cluster_resources.FillResourceUsage(true, data); ASSERT_FALSE(cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); for (int num_slots_available = 0; num_slots_available <= 2; num_slots_available++) { // Remote node reports updated resource availability. cluster_resources.AddOrUpdateNode("remote", {{"CPU", 2.}}, {{"CPU", num_slots_available}}); - auto data = std::make_shared(); + auto data = std::make_shared(); int64_t t; for (int i = 0; i < 3; i++) { - // Heartbeat tick should reset the remote node's resources. - cluster_resources.Heartbeat(true, data); + // Resource usage report tick should reset the remote node's resources. 
+ cluster_resources.FillResourceUsage(true, data); for (int j = 0; j < num_slots_available; j++) { ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t), "remote"); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 64e40b457..1e936254e 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -272,8 +272,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { return false; } -void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled, - std::shared_ptr data) const { +void ClusterTaskManager::FillResourceUsage( + bool light_report_resource_usage_enabled, + std::shared_ptr data) const { // TODO (WangTao): Find a way to check if load changed and combine it with light // heartbeat. Now we just report it every time. data->set_resource_load_changed(true); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index c5dba73be..5aa2a53fb 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -107,11 +107,11 @@ class ClusterTaskManager { /// sending raylet <-> gcs heartbeats. In particular, this should fill in /// resource_load and resource_load_by_shape. /// - /// \param light_heartbeat_enabled Only send changed fields if true. + /// \param light_report_resource_usage_enabled Only send changed fields if true. /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only /// fields used. 
- void Heartbeat(bool light_heartbeat_enabled, - std::shared_ptr data) const; + void FillResourceUsage(bool light_report_resource_usage_enabled, + std::shared_ptr data) const; std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 9c046630b..2e9f54705 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -437,8 +437,8 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { } { - auto data = std::make_shared(); - task_manager_.Heartbeat(false, data); + auto data = std::make_shared(); + task_manager_.FillResourceUsage(false, data); auto load_by_shape = data->mutable_resource_load_by_shape()->mutable_resource_demands(); diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 4b3799ea3..67a2abb15 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -97,6 +97,8 @@ class GcsRpcClient { new GrpcClient(address, port, client_call_manager)); node_info_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); + heartbeat_info_grpc_client_ = std::unique_ptr>( + new GrpcClient(address, port, client_call_manager)); object_info_grpc_client_ = std::unique_ptr>( new GrpcClient(address, port, client_call_manager)); task_info_grpc_client_ = std::unique_ptr>( @@ -155,15 +157,6 @@ class GcsRpcClient { /// Get information of all nodes from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllNodeInfo, node_info_grpc_client_, ) - /// Report heartbeat of a node to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportHeartbeat, - node_info_grpc_client_, ) - - /// Get newest heartbeat of all nodes from GCS Service. Only used when light heartbeat - /// enabled. 
- VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllHeartbeat, - node_info_grpc_client_, ) - /// Report resource usage of a node to GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, ReportResourceUsage, node_info_grpc_client_, ) @@ -195,6 +188,10 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(NodeInfoGcsService, GetAllAvailableResources, node_info_grpc_client_, ) + /// Report heartbeat of a node to GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(HeartbeatInfoGcsService, ReportHeartbeat, + heartbeat_info_grpc_client_, ) + /// Get object's locations from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(ObjectInfoGcsService, GetObjectLocations, object_info_grpc_client_, ) @@ -278,6 +275,7 @@ class GcsRpcClient { std::unique_ptr> job_info_grpc_client_; std::unique_ptr> actor_info_grpc_client_; std::unique_ptr> node_info_grpc_client_; + std::unique_ptr> heartbeat_info_grpc_client_; std::unique_ptr> object_info_grpc_client_; std::unique_ptr> task_info_grpc_client_; std::unique_ptr> stats_grpc_client_; diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index efd48a023..c5c24c56f 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -30,6 +30,9 @@ namespace rpc { #define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \ RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER) +#define HEARTBEAT_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(HeartbeatInfoGcsService, HANDLER) + #define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \ RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER) @@ -177,14 +180,6 @@ class NodeInfoGcsServiceHandler { GetAllNodeInfoReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleReportHeartbeat(const ReportHeartbeatRequest &request, - ReportHeartbeatReply *reply, - SendReplyCallback send_reply_callback) = 0; - - virtual void HandleGetAllHeartbeat(const GetAllHeartbeatRequest &request, - GetAllHeartbeatReply *reply, - SendReplyCallback 
send_reply_callback) = 0; - virtual void HandleReportResourceUsage(const ReportResourceUsageRequest &request, ReportResourceUsageReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -238,8 +233,6 @@ class NodeInfoGrpcService : public GrpcService { NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode); NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode); NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo); - NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat); - NODE_INFO_SERVICE_RPC_HANDLER(GetAllHeartbeat); NODE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); NODE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); NODE_INFO_SERVICE_RPC_HANDLER(GetResources); @@ -257,6 +250,38 @@ class NodeInfoGrpcService : public GrpcService { NodeInfoGcsServiceHandler &service_handler_; }; +class HeartbeatInfoGcsServiceHandler { + public: + virtual ~HeartbeatInfoGcsServiceHandler() = default; + virtual void HandleReportHeartbeat(const ReportHeartbeatRequest &request, + ReportHeartbeatReply *reply, + SendReplyCallback send_reply_callback) = 0; +}; +/// The `GrpcService` for `HeartbeatInfoGcsService`. +class HeartbeatInfoGrpcService : public GrpcService { + public: + /// Constructor. + /// + /// \param[in] handler The service handler that actually handle the requests. + explicit HeartbeatInfoGrpcService(boost::asio::io_service &io_service, + HeartbeatInfoGcsServiceHandler &handler) + : GrpcService(io_service), service_handler_(handler){}; + + protected: + grpc::Service &GetGrpcService() override { return service_; } + void InitServerCallFactories( + const std::unique_ptr &cq, + std::vector> *server_call_factories) override { + HEARTBEAT_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat); + } + + private: + /// The grpc async service object. + HeartbeatInfoGcsService::AsyncService service_; + /// The service handler that actually handle the requests. 
+ HeartbeatInfoGcsServiceHandler &service_handler_; +}; + class ObjectInfoGcsServiceHandler { public: virtual ~ObjectInfoGcsServiceHandler() = default; @@ -514,6 +539,7 @@ class PlacementGroupInfoGrpcService : public GrpcService { using JobInfoHandler = JobInfoGcsServiceHandler; using ActorInfoHandler = ActorInfoGcsServiceHandler; using NodeInfoHandler = NodeInfoGcsServiceHandler; +using HeartbeatInfoHandler = HeartbeatInfoGcsServiceHandler; using ObjectInfoHandler = ObjectInfoGcsServiceHandler; using TaskInfoHandler = TaskInfoGcsServiceHandler; using StatsHandler = StatsGcsServiceHandler;