mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 02:01:24 +08:00
Add light heartbeat flag in python and use it in load metrics (#11032)
This commit is contained in:
@@ -30,30 +30,41 @@ class LoadMetrics:
|
||||
def update(self,
|
||||
ip,
|
||||
static_resources,
|
||||
update_dynamic_resources,
|
||||
dynamic_resources,
|
||||
update_resource_load,
|
||||
resource_load,
|
||||
waiting_bundles=None,
|
||||
infeasible_bundles=None):
|
||||
self.resource_load_by_ip[ip] = resource_load
|
||||
self.static_resources_by_ip[ip] = static_resources
|
||||
# If light heartbeat is enabled, only the resources that changed are
|
||||
# received. We therefore update only the changed parts and compare the
|
||||
# stored static_resources with the stored, updated dynamic_resources.
|
||||
if ip not in self.static_resources_by_ip or len(static_resources) > 0:
|
||||
self.static_resources_by_ip[ip] = static_resources
|
||||
if ip not in self.dynamic_resources_by_ip or update_dynamic_resources:
|
||||
self.dynamic_resources_by_ip[ip] = dynamic_resources
|
||||
if ip not in self.resource_load_by_ip or update_resource_load:
|
||||
self.resource_load_by_ip[ip] = resource_load
|
||||
|
||||
if not waiting_bundles:
|
||||
waiting_bundles = []
|
||||
if not infeasible_bundles:
|
||||
infeasible_bundles = []
|
||||
|
||||
# We are not guaranteed to have a corresponding dynamic resource for
|
||||
# every static resource because dynamic resources are based on the
|
||||
# available resources in the heartbeat, which does not exist if it is
|
||||
# zero. Thus, we have to update dynamic resources here.
|
||||
dynamic_resources_update = dynamic_resources.copy()
|
||||
for resource_name, capacity in static_resources.items():
|
||||
# We are not guaranteed to have a corresponding dynamic resource
|
||||
# for every static resource because dynamic resources are based on
|
||||
# the available resources in the heartbeat, which does not exist
|
||||
# if it is zero. Thus, we have to update dynamic resources here.
|
||||
dynamic_resources_update = self.dynamic_resources_by_ip[ip].copy()
|
||||
for resource_name, capacity in self.static_resources_by_ip[ip].items():
|
||||
if resource_name not in dynamic_resources_update:
|
||||
dynamic_resources_update[resource_name] = 0.0
|
||||
self.dynamic_resources_by_ip[ip] = dynamic_resources_update
|
||||
|
||||
now = time.time()
|
||||
if ip not in self.last_used_time_by_ip or \
|
||||
static_resources != dynamic_resources:
|
||||
self.static_resources_by_ip[ip] != \
|
||||
self.dynamic_resources_by_ip[ip]:
|
||||
self.last_used_time_by_ip[ip] = now
|
||||
self.last_heartbeat_time_by_ip[ip] = now
|
||||
self.waiting_bundles = waiting_bundles
|
||||
|
||||
@@ -15,6 +15,8 @@ cdef extern from "ray/common/ray_config.h" nogil:
|
||||
|
||||
int64_t raylet_heartbeat_timeout_milliseconds() const
|
||||
|
||||
c_bool light_heartbeat_enabled() const
|
||||
|
||||
int64_t debug_dump_period_milliseconds() const
|
||||
|
||||
int64_t num_heartbeats_timeout() const
|
||||
|
||||
@@ -13,6 +13,10 @@ cdef class Config:
|
||||
def raylet_heartbeat_timeout_milliseconds():
|
||||
return RayConfig.instance().raylet_heartbeat_timeout_milliseconds()
|
||||
|
||||
@staticmethod
|
||||
def light_heartbeat_enabled():
|
||||
return RayConfig.instance().light_heartbeat_enabled()
|
||||
|
||||
@staticmethod
|
||||
def debug_dump_period_milliseconds():
|
||||
return RayConfig.instance().debug_dump_period_milliseconds()
|
||||
|
||||
@@ -47,6 +47,7 @@ class Monitor:
|
||||
# Keep a mapping from raylet client ID to IP address to use
|
||||
# for updating the load metrics.
|
||||
self.raylet_id_to_ip_map = {}
|
||||
self.light_heartbeat_enabled = ray._config.light_heartbeat_enabled()
|
||||
self.load_metrics = LoadMetrics()
|
||||
if autoscaling_config:
|
||||
self.autoscaler = StandardAutoscaler(autoscaling_config,
|
||||
@@ -126,8 +127,6 @@ class Monitor:
|
||||
resource_load = dict(heartbeat_message.resource_load)
|
||||
total_resources = dict(heartbeat_message.resources_total)
|
||||
available_resources = dict(heartbeat_message.resources_available)
|
||||
for resource in total_resources:
|
||||
available_resources.setdefault(resource, 0.0)
|
||||
|
||||
waiting_bundles, infeasible_bundles = \
|
||||
self.parse_resource_demands(message.resource_load_by_shape)
|
||||
@@ -136,9 +135,14 @@ class Monitor:
|
||||
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
|
||||
ip = self.raylet_id_to_ip_map.get(client_id)
|
||||
if ip:
|
||||
self.load_metrics.update(ip, total_resources,
|
||||
available_resources, resource_load,
|
||||
waiting_bundles, infeasible_bundles)
|
||||
update_available_resources = not self.light_heartbeat_enabled \
|
||||
or heartbeat_message.resources_available_changed()
|
||||
update_resource_load = not self.light_heartbeat_enabled \
|
||||
or heartbeat_message.resource_load_changed()
|
||||
self.load_metrics.update(
|
||||
ip, total_resources, update_available_resources,
|
||||
available_resources, update_resource_load, resource_load,
|
||||
waiting_bundles, infeasible_bundles)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Monitor: could not find ip for client {client_id}")
|
||||
|
||||
@@ -241,61 +241,101 @@ SMALL_CLUSTER = {
|
||||
class LoadMetricsTest(unittest.TestCase):
|
||||
def testUpdate(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
assert lm.approx_workers_used() == 0.5
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
assert lm.approx_workers_used() == 1.0
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
assert lm.approx_workers_used() == 2.0
|
||||
|
||||
def testLoadMessages(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 0.5)
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {"CPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.0)
|
||||
|
||||
# Both nodes count as busy since there is a queue on one.
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 2}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 2}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
# No queue anymore, so we're back to exact accounting.
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.5)
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1})
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True, {"GPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
lm.update("3.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("4.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("5.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("6.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("7.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("8.3.3.3", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("3.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("4.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("5.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("6.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("7.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("8.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 8.0)
|
||||
|
||||
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) # no queue anymore
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 1}, True,
|
||||
{}) # no queue anymore
|
||||
self.assertEqual(lm.approx_workers_used(), 4.5)
|
||||
|
||||
def testLoadMessagesWithLightHeartbeat(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 0.5)
|
||||
lm.update("1.1.1.1", {}, False, {}, True, {"CPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.0)
|
||||
|
||||
# Both nodes count as busy since there is a queue on one.
|
||||
lm.update("2.2.2.2", {"CPU": 2}, True, {"CPU": 2}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {}, True, {"CPU": 0}, False, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
lm.update("2.2.2.2", {}, True, {"CPU": 1}, False, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
# No queue anymore, so we're back to exact accounting.
|
||||
lm.update("1.1.1.1", {}, True, {"CPU": 0}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 1.5)
|
||||
lm.update("2.2.2.2", {}, False, {}, True, {"GPU": 1})
|
||||
self.assertEqual(lm.approx_workers_used(), 2.0)
|
||||
|
||||
lm.update("3.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("4.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("5.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("6.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("7.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.update("8.3.3.3", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
self.assertEqual(lm.approx_workers_used(), 8.0)
|
||||
|
||||
lm.update("2.2.2.2", {}, False, {"CPU": 1}, True,
|
||||
{}) # no queue anymore
|
||||
self.assertEqual(lm.approx_workers_used(), 4.5)
|
||||
|
||||
def testPruneByNodeIp(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 1}, True, {"CPU": 0}, True, {})
|
||||
lm.update("2.2.2.2", {"CPU": 1}, True, {"CPU": 0}, True, {})
|
||||
lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
|
||||
assert lm.approx_workers_used() == 1.0
|
||||
|
||||
def testBottleneckResource(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
lm.update("2.2.2.2", {
|
||||
"CPU": 2,
|
||||
"GPU": 16
|
||||
}, True, {
|
||||
"CPU": 2,
|
||||
"GPU": 2
|
||||
}, True, {})
|
||||
assert lm.approx_workers_used() == 1.88
|
||||
|
||||
def testHeartbeat(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
lm.mark_active("2.2.2.2")
|
||||
assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
|
||||
assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
|
||||
@@ -303,15 +343,21 @@ class LoadMetricsTest(unittest.TestCase):
|
||||
|
||||
def testDebugString(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
|
||||
lm.update("1.1.1.1", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
lm.update("2.2.2.2", {
|
||||
"CPU": 2,
|
||||
"GPU": 16
|
||||
}, True, {
|
||||
"CPU": 2,
|
||||
"GPU": 2
|
||||
}, True, {})
|
||||
lm.update("3.3.3.3", {
|
||||
"memory": 20,
|
||||
"object_store_memory": 40
|
||||
}, {
|
||||
}, True, {
|
||||
"memory": 0,
|
||||
"object_store_memory": 20
|
||||
}, {})
|
||||
}, True, {})
|
||||
debug = lm.info_string()
|
||||
assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, "
|
||||
"1.05 GiB/1.05 GiB memory, "
|
||||
@@ -599,8 +645,8 @@ class AutoscalingTest(unittest.TestCase):
|
||||
tag_filters={TAG_RAY_NODE_KIND: "worker"}, )
|
||||
addrs += head_ip
|
||||
for addr in addrs:
|
||||
lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update(addr, {"CPU": 2}, {"CPU": 2}, {})
|
||||
lm.update(addr, {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
lm.update(addr, {"CPU": 2}, True, {"CPU": 2}, True, {})
|
||||
assert autoscaler.bringup
|
||||
autoscaler.update()
|
||||
|
||||
@@ -609,7 +655,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
self.waitForNodes(1)
|
||||
|
||||
# All of the nodes are down. Simulate some load on the head node
|
||||
lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update(head_ip, {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
|
||||
autoscaler.update()
|
||||
self.waitForNodes(6) # expected due to batch sizes and concurrency
|
||||
@@ -652,11 +698,11 @@ class AutoscalingTest(unittest.TestCase):
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
# This node has num_cpus=0
|
||||
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {})
|
||||
lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {})
|
||||
autoscaler.update()
|
||||
self.waitForNodes(2)
|
||||
# 1 CPU task cannot be scheduled.
|
||||
lm.update(unmanaged_ip, {"CPU": 0}, {"CPU": 0}, {"CPU": 1})
|
||||
lm.update(unmanaged_ip, {"CPU": 0}, True, {"CPU": 0}, True, {"CPU": 1})
|
||||
autoscaler.update()
|
||||
self.waitForNodes(3)
|
||||
|
||||
@@ -946,17 +992,18 @@ class AutoscalingTest(unittest.TestCase):
|
||||
|
||||
# Scales up as nodes are reported as used
|
||||
local_ip = services.get_node_ip_address()
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
|
||||
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1
|
||||
lm.update(local_ip, {"CPU": 2}, True, {"CPU": 0}, True, {}) # head
|
||||
lm.update("172.0.0.0", {"CPU": 2}, True, {"CPU": 0}, True,
|
||||
{}) # worker 1
|
||||
autoscaler.update()
|
||||
self.waitForNodes(3)
|
||||
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {})
|
||||
lm.update("172.0.0.1", {"CPU": 2}, True, {"CPU": 0}, True, {})
|
||||
autoscaler.update()
|
||||
self.waitForNodes(5)
|
||||
|
||||
# Holds steady when load is removed
|
||||
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
|
||||
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
|
||||
lm.update("172.0.0.0", {"CPU": 2}, True, {"CPU": 2}, True, {})
|
||||
lm.update("172.0.0.1", {"CPU": 2}, True, {"CPU": 2}, True, {})
|
||||
autoscaler.update()
|
||||
assert autoscaler.pending_launches.value == 0
|
||||
assert len(self.provider.non_terminated_nodes({})) == 5
|
||||
@@ -995,20 +1042,20 @@ class AutoscalingTest(unittest.TestCase):
|
||||
|
||||
# Scales up as nodes are reported as used
|
||||
local_ip = services.get_node_ip_address()
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
|
||||
lm.update(local_ip, {"CPU": 2}, True, {"CPU": 0}, True, {}) # head
|
||||
# 1.0 nodes used => target nodes = 2 => target workers = 1
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
|
||||
# Make new node idle, and never used.
|
||||
# Should hold steady as target is still 2.
|
||||
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
|
||||
lm.update("172.0.0.0", {"CPU": 0}, True, {"CPU": 0}, True, {})
|
||||
lm.last_used_time_by_ip["172.0.0.0"] = 0
|
||||
autoscaler.update()
|
||||
assert len(self.provider.non_terminated_nodes({})) == 1
|
||||
|
||||
# Reduce load on head => target nodes = 1 => target workers = 0
|
||||
lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {})
|
||||
lm.update(local_ip, {"CPU": 2}, True, {"CPU": 1}, True, {})
|
||||
autoscaler.update()
|
||||
assert len(self.provider.non_terminated_nodes({})) == 0
|
||||
|
||||
|
||||
@@ -324,7 +324,9 @@ class LoadMetricsTest(unittest.TestCase):
|
||||
def testResourceDemandVector(self):
|
||||
lm = LoadMetrics()
|
||||
lm.update(
|
||||
"1.1.1.1", {"CPU": 2}, {"CPU": 1}, {},
|
||||
"1.1.1.1", {"CPU": 2},
|
||||
True, {"CPU": 1},
|
||||
True, {},
|
||||
waiting_bundles=[{
|
||||
"GPU": 1
|
||||
}],
|
||||
@@ -492,14 +494,16 @@ class AutoscalingTest(unittest.TestCase):
|
||||
update_interval_s=0)
|
||||
autoscaler.update()
|
||||
self.waitForNodes(1)
|
||||
lm.update(head_ip, {"CPU": 4, "GPU": 1}, {}, {})
|
||||
lm.update(head_ip, {"CPU": 4, "GPU": 1}, True, {}, True, {})
|
||||
self.waitForNodes(1)
|
||||
|
||||
lm.update(
|
||||
head_ip, {
|
||||
"CPU": 4,
|
||||
"GPU": 1
|
||||
}, {"GPU": 0}, {},
|
||||
},
|
||||
True, {"GPU": 0},
|
||||
True, {},
|
||||
waiting_bundles=[{
|
||||
"GPU": 1
|
||||
}])
|
||||
@@ -633,7 +637,9 @@ class AutoscalingTest(unittest.TestCase):
|
||||
self.waitForNodes(0)
|
||||
autoscaler.update()
|
||||
lm.update(
|
||||
"1.2.3.4", {}, {}, {},
|
||||
"1.2.3.4", {},
|
||||
True, {},
|
||||
True, {},
|
||||
waiting_bundles=[{
|
||||
"GPU": 1
|
||||
}],
|
||||
|
||||
Reference in New Issue
Block a user