From 0dcfa9ed6c7d43f37115a45edd3ee70fdb3fc077 Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Thu, 1 Oct 2020 02:39:28 +0800 Subject: [PATCH] Add light heartbeat flag in python and use it in load metrics (#11032) --- .../ray/autoscaler/_private/load_metrics.py | 29 ++-- python/ray/includes/ray_config.pxd | 2 + python/ray/includes/ray_config.pxi | 4 + python/ray/monitor.py | 14 +- python/ray/tests/test_autoscaler.py | 125 ++++++++++++------ .../tests/test_resource_demand_scheduler.py | 14 +- 6 files changed, 131 insertions(+), 57 deletions(-) diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index 9d1799308..b43c602fc 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -30,30 +30,41 @@ class LoadMetrics: def update(self, ip, static_resources, + update_dynamic_resources, dynamic_resources, + update_resource_load, resource_load, waiting_bundles=None, infeasible_bundles=None): - self.resource_load_by_ip[ip] = resource_load - self.static_resources_by_ip[ip] = static_resources + # If light heartbeat enabled, only resources changed will be received. + # We should update the changed part and compare static_resources with + # dynamic_resources using those updated. + if ip not in self.static_resources_by_ip or len(static_resources) > 0: + self.static_resources_by_ip[ip] = static_resources + if ip not in self.dynamic_resources_by_ip or update_dynamic_resources: + self.dynamic_resources_by_ip[ip] = dynamic_resources + if ip not in self.resource_load_by_ip or update_resource_load: + self.resource_load_by_ip[ip] = resource_load + if not waiting_bundles: waiting_bundles = [] if not infeasible_bundles: infeasible_bundles = [] - # We are not guaranteed to have a corresponding dynamic resource for - # every static resource because dynamic resources are based on the - # available resources in the heartbeat, which does not exist if it is - # zero. Thus, we have to update dynamic resources here. - dynamic_resources_update = dynamic_resources.copy() - for resource_name, capacity in static_resources.items(): + # We are not guaranteed to have a corresponding dynamic resource + # for every static resource because dynamic resources are based on + # the available resources in the heartbeat, which does not exist + # if it is zero. Thus, we have to update dynamic resources here. + dynamic_resources_update = self.dynamic_resources_by_ip[ip].copy() + for resource_name, capacity in self.static_resources_by_ip[ip].items(): if resource_name not in dynamic_resources_update: dynamic_resources_update[resource_name] = 0.0 self.dynamic_resources_by_ip[ip] = dynamic_resources_update now = time.time() if ip not in self.last_used_time_by_ip or \ - static_resources != dynamic_resources: + self.static_resources_by_ip[ip] != \ + self.dynamic_resources_by_ip[ip]: self.last_used_time_by_ip[ip] = now self.last_heartbeat_time_by_ip[ip] = now self.waiting_bundles = waiting_bundles diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd index 8748449b9..5316e5c00 100644 --- a/python/ray/includes/ray_config.pxd +++ b/python/ray/includes/ray_config.pxd @@ -15,6 +15,8 @@ cdef extern from "ray/common/ray_config.h" nogil: int64_t raylet_heartbeat_timeout_milliseconds() const + c_bool light_heartbeat_enabled() const + int64_t debug_dump_period_milliseconds() const int64_t num_heartbeats_timeout() const diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi index 5aac8994a..0d910bccf 100644 --- a/python/ray/includes/ray_config.pxi +++ b/python/ray/includes/ray_config.pxi @@ -13,6 +13,10 @@ cdef class Config: def raylet_heartbeat_timeout_milliseconds(): return RayConfig.instance().raylet_heartbeat_timeout_milliseconds() + @staticmethod + def light_heartbeat_enabled(): + return RayConfig.instance().light_heartbeat_enabled() + @staticmethod def debug_dump_period_milliseconds(): return RayConfig.instance().debug_dump_period_milliseconds() diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 1445f9a5f..ed05ac913 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -47,6 +47,7 @@ class Monitor: # Keep a mapping from raylet client ID to IP address to use # for updating the load metrics. self.raylet_id_to_ip_map = {} + self.light_heartbeat_enabled = ray._config.light_heartbeat_enabled() self.load_metrics = LoadMetrics() if autoscaling_config: self.autoscaler = StandardAutoscaler(autoscaling_config, @@ -126,8 +127,6 @@ class Monitor: resource_load = dict(heartbeat_message.resource_load) total_resources = dict(heartbeat_message.resources_total) available_resources = dict(heartbeat_message.resources_available) - for resource in total_resources: - available_resources.setdefault(resource, 0.0) waiting_bundles, infeasible_bundles = \ self.parse_resource_demands(message.resource_load_by_shape) @@ -136,9 +135,14 @@ class Monitor: client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) if ip: - self.load_metrics.update(ip, total_resources, - available_resources, resource_load, - waiting_bundles, infeasible_bundles) + update_available_resources = not self.light_heartbeat_enabled \ + or heartbeat_message.resources_available_changed() + update_resource_load = not self.light_heartbeat_enabled \ + or heartbeat_message.resource_load_changed() + self.load_metrics.update( + ip, total_resources, update_available_resources, + available_resources, update_resource_load, resource_load, + waiting_bundles, infeasible_bundles) else: logger.warning( f"Monitor: could not find ip for client {client_id}") diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 86a873712..e5d4a0fd4 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -241,61 +241,101 @@ SMALL_CLUSTER = { class LoadMetricsTest(unittest.TestCase): def testUpdate(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {}) assert lm.approx_workers_used() == 0.5 - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {}) assert lm.approx_workers_used() == 1.0 - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 0}, True, {}) assert lm.approx_workers_used() == 2.0 def testLoadMessages(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {}) self.assertEqual(lm.approx_workers_used(), 0.5) - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {"CPU": 1}) self.assertEqual(lm.approx_workers_used(), 1.0) # Both nodes count as busy since there is a queue on one. - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 2}, {}) + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 2}, True, {}) self.assertEqual(lm.approx_workers_used(), 2.0) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 0}, True, {}) self.assertEqual(lm.approx_workers_used(), 2.0) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True, {}) self.assertEqual(lm.approx_workers_used(), 2.0) # No queue anymore, so we're back to exact accounting. - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {}) self.assertEqual(lm.approx_workers_used(), 1.5) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1}) + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True, {"GPU": 1}) self.assertEqual(lm.approx_workers_used(), 2.0) - lm.update("3.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("4.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("5.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("6.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("7.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) - lm.update("8.3.3.3", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("3.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("4.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("5.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("6.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("7.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("8.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) self.assertEqual(lm.approx_workers_used(), 8.0) - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) # no queue anymore + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True, + {}) # no queue anymore + self.assertEqual(lm.approx_workers_used(), 4.5) + + def testLoadMessagesWithLightHeartbeat(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {}) + self.assertEqual(lm.approx_workers_used(), 0.5) + lm.update("1.1.1.1", {}, False, {}, True, {"CPU": 1}) + self.assertEqual(lm.approx_workers_used(), 1.0) + + # Both nodes count as busy since there is a queue on one. + lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 2}, True, {}) + self.assertEqual(lm.approx_workers_used(), 2.0) + lm.update("2.2.2.2", {}, True, {"CPU": 0}, False, {}) + self.assertEqual(lm.approx_workers_used(), 2.0) + lm.update("2.2.2.2", {}, True, {"CPU": 1}, False, {}) + self.assertEqual(lm.approx_workers_used(), 2.0) + + # No queue anymore, so we're back to exact accounting. + lm.update("1.1.1.1", {}, True, {"CPU": 0}, True, {}) + self.assertEqual(lm.approx_workers_used(), 1.5) + lm.update("2.2.2.2", {}, False, {}, True, {"GPU": 1}) + self.assertEqual(lm.approx_workers_used(), 2.0) + + lm.update("3.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("4.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("5.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("6.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("7.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + lm.update("8.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {}) + self.assertEqual(lm.approx_workers_used(), 8.0) + + lm.update("2.2.2.2", {}, False, {"CPU": 1}, True, + {}) # no queue anymore self.assertEqual(lm.approx_workers_used(), 4.5) def testPruneByNodeIp(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {}) - lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {}) + lm.update("1.1.1.1", {"CPU": 1}, True, {"CPU": 0}, True, {}) + lm.update("2.2.2.2", {"CPU": 1}, True, {"CPU": 0}, True, {}) lm.prune_active_ips({"1.1.1.1", "4.4.4.4"}) assert lm.approx_workers_used() == 1.0 def testBottleneckResource(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {}) + lm.update("2.2.2.2", { + "CPU": 2, + "GPU": 16 + }, True, { + "CPU": 2, + "GPU": 2 + }, True, {}) assert lm.approx_workers_used() == 1.88 def testHeartbeat(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {}) lm.mark_active("2.2.2.2") assert "1.1.1.1" in lm.last_heartbeat_time_by_ip assert "2.2.2.2" in lm.last_heartbeat_time_by_ip @@ -303,15 +343,21 @@ class LoadMetricsTest(unittest.TestCase): def testDebugString(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) + lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {}) + lm.update("2.2.2.2", { + "CPU": 2, + "GPU": 16 + }, True, { + "CPU": 2, + "GPU": 2 + }, True, {}) lm.update("3.3.3.3", { "memory": 20, "object_store_memory": 40 - }, { + }, True, { "memory": 0, "object_store_memory": 20 - }, {}) + }, True, {}) debug = lm.info_string() assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, " "1.05 GiB/1.05 GiB memory, " @@ -599,8 +645,8 @@ class AutoscalingTest(unittest.TestCase): tag_filters={TAG_RAY_NODE_KIND: "worker"}, ) addrs += head_ip for addr in addrs: - lm.update(addr, {"CPU": 2}, {"CPU": 0}, {}) - lm.update(addr, {"CPU": 2}, {"CPU": 2}, {}) + lm.update(addr, {"CPU": 2}, True, {"CPU": 0}, True, {}) + lm.update(addr, {"CPU": 2}, True, {"CPU": 2}, True, {}) assert autoscaler.bringup autoscaler.update() @@ -609,7 +655,7 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(1) # All of the nodes are down. Simulate some load on the head node - lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {}) + lm.update(head_ip, {"CPU": 2}, True, {"CPU": 0}, True, {}) autoscaler.update() self.waitForNodes(6) # expected due to batch sizes and concurrency @@ -652,11 +698,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(2) # This node has num_cpus=0 - lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {}) + lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {}) autoscaler.update() self.waitForNodes(2) # 1 CPU task cannot be scheduled. - lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1}) + lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {"CPU": 1}) autoscaler.update() self.waitForNodes(3) @@ -946,17 +992,18 @@ class AutoscalingTest(unittest.TestCase): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1 + lm.update(local_ip, {"CPU": 2}, True, {"CPU": 0}, True, {}) # head + lm.update("172.0.0.0", {"CPU": 2}, True, {"CPU": 0}, True, + {}) # worker 1 autoscaler.update() self.waitForNodes(3) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("172.0.0.1", {"CPU": 2}, True, {"CPU": 0}, True, {}) autoscaler.update() self.waitForNodes(5) # Holds steady when load is removed - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {}) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) + lm.update("172.0.0.0", {"CPU": 2}, True, {"CPU": 2}, True, {}) + lm.update("172.0.0.1", {"CPU": 2}, True, {"CPU": 2}, True, {}) autoscaler.update() assert autoscaler.pending_launches.value == 0 assert len(self.provider.non_terminated_nodes({})) == 5 @@ -995,20 +1042,20 @@ class AutoscalingTest(unittest.TestCase): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head + lm.update(local_ip, {"CPU": 2}, True, {"CPU": 0}, True, {}) # head # 1.0 nodes used => target nodes = 2 => target workers = 1 autoscaler.update() self.waitForNodes(1) # Make new node idle, and never used. # Should hold steady as target is still 2. - lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {}) + lm.update("172.0.0.0", {"CPU": 0}, True, {"CPU": 0}, True, {}) lm.last_used_time_by_ip["172.0.0.0"] = 0 autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 1 # Reduce load on head => target nodes = 1 => target workers = 0 - lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {}) + lm.update(local_ip, {"CPU": 2}, True, {"CPU": 1}, True, {}) autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 0 diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py index 17569defd..1f310822e 100644 --- a/python/ray/tests/test_resource_demand_scheduler.py +++ b/python/ray/tests/test_resource_demand_scheduler.py @@ -324,7 +324,9 @@ class LoadMetricsTest(unittest.TestCase): def testResourceDemandVector(self): lm = LoadMetrics() lm.update( - "1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}, + "1.1.1.1", {"CPU": 2}, + True, {"CPU": 1}, + True, {}, waiting_bundles=[{ "GPU": 1 }], @@ -492,14 +494,16 @@ class AutoscalingTest(unittest.TestCase): update_interval_s=0) autoscaler.update() self.waitForNodes(1) - lm.update(head_ip, {"CPU": 4, "GPU": 1}, {}, {}) + lm.update(head_ip, {"CPU": 4, "GPU": 1}, True, {}, True, {}) self.waitForNodes(1) lm.update( head_ip, { "CPU": 4, "GPU": 1 - }, {"GPU": 0}, {}, + }, + True, {"GPU": 0}, + True, {}, waiting_bundles=[{ "GPU": 1 }]) @@ -633,7 +637,9 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(0) autoscaler.update() lm.update( - "1.2.3.4", {}, {}, {}, + "1.2.3.4", {}, + True, {}, + True, {}, waiting_bundles=[{ "GPU": 1 }],