mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 13:06:49 +08:00
Add heartbeat test + Fix monitor.py (#5191)
This commit is contained in:
@@ -209,15 +209,10 @@ class LoadMetrics(object):
|
||||
def num_workers_connected(self):
    """Return the "NumNodesConnected" entry of the `_info()` summary."""
    summary = self._info()
    return summary["NumNodesConnected"]
|
||||
|
||||
def info_string(self):
|
||||
return ", ".join(
|
||||
["{}={}".format(k, v) for k, v in sorted(self._info().items())])
|
||||
|
||||
def _info(self):
|
||||
def get_resource_usage(self):
|
||||
nodes_used = 0.0
|
||||
resources_used = {}
|
||||
resources_total = {}
|
||||
now = time.time()
|
||||
for ip, max_resources in self.static_resources_by_ip.items():
|
||||
avail_resources = self.dynamic_resources_by_ip[ip]
|
||||
max_frac = 0.0
|
||||
@@ -234,6 +229,17 @@ class LoadMetrics(object):
|
||||
if frac > max_frac:
|
||||
max_frac = frac
|
||||
nodes_used += max_frac
|
||||
|
||||
return nodes_used, resources_used, resources_total
|
||||
|
||||
def info_string(self):
    """Format the `_info()` summary as one human-readable line.

    Returns:
        str: "key=value" pairs joined by ", ", ordered by sorting the
            summary's items (i.e. by key, since dict keys are unique).
    """
    ordered_items = sorted(self._info().items())
    return ", ".join(
        "{}={}".format(name, value) for name, value in ordered_items)
|
||||
|
||||
def _info(self):
|
||||
nodes_used, resources_used, resources_total = self.get_resource_usage()
|
||||
|
||||
now = time.time()
|
||||
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
|
||||
heartbeat_times = [
|
||||
now - t for t in self.last_heartbeat_time_by_ip.values()
|
||||
|
||||
+21
-15
@@ -38,7 +38,7 @@ class Monitor(object):
|
||||
def __init__(self, redis_address, autoscaling_config, redis_password=None):
|
||||
# Initialize the Redis clients.
|
||||
ray.state.state._initialize_global_state(
|
||||
args.redis_address, redis_password=redis_password)
|
||||
redis_address, redis_password=redis_password)
|
||||
self.redis = ray.services.create_redis_client(
|
||||
redis_address, password=redis_password)
|
||||
# Setup subscriptions to the primary Redis server and the Redis shards.
|
||||
@@ -106,25 +106,22 @@ class Monitor(object):
|
||||
|
||||
message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
|
||||
heartbeat_data)
|
||||
|
||||
for heartbeat_message in message.batch:
|
||||
num_resources = len(heartbeat_message.resources_available_label)
|
||||
static_resources = {}
|
||||
dynamic_resources = {}
|
||||
for i in range(num_resources):
|
||||
dyn = heartbeat_message.resources_available_label[i]
|
||||
static = heartbeat_message.resources_total_label[i]
|
||||
dynamic_resources[dyn] = (
|
||||
heartbeat_message.resources_available_capacity[i])
|
||||
static_resources[static] = (
|
||||
heartbeat_message.resources_total_capacity[i])
|
||||
total_resources = dict(
|
||||
zip(heartbeat_message.resources_total_label,
|
||||
heartbeat_message.resources_total_capacity))
|
||||
available_resources = dict(
|
||||
zip(heartbeat_message.resources_available_label,
|
||||
heartbeat_message.resources_available_capacity))
|
||||
for resource in total_resources:
|
||||
available_resources.setdefault(resource, 0.0)
|
||||
|
||||
# Update the load metrics for this raylet.
|
||||
client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
|
||||
ip = self.raylet_id_to_ip_map.get(client_id)
|
||||
if ip:
|
||||
self.load_metrics.update(ip, static_resources,
|
||||
dynamic_resources)
|
||||
self.load_metrics.update(ip, total_resources,
|
||||
available_resources)
|
||||
else:
|
||||
logger.warning(
|
||||
"Monitor: "
|
||||
@@ -250,7 +247,14 @@ class Monitor(object):
|
||||
# Call the handler.
|
||||
message_handler(channel, data)
|
||||
|
||||
def update_raylet_map(self):
|
||||
def update_raylet_map(self, _append_port=False):
|
||||
"""Updates internal raylet map.
|
||||
|
||||
Args:
|
||||
_append_port (bool): Defaults to False. Appending the port is
|
||||
useful in testing, as mock clusters have many nodes with
|
||||
the same IP and cannot be uniquely identified.
|
||||
"""
|
||||
all_raylet_nodes = ray.nodes()
|
||||
self.raylet_id_to_ip_map = {}
|
||||
for raylet_info in all_raylet_nodes:
|
||||
@@ -258,6 +262,8 @@ class Monitor(object):
|
||||
or raylet_info["ClientID"])
|
||||
ip_address = (raylet_info.get("AuxAddress")
|
||||
or raylet_info["NodeManagerAddress"]).split(":")[0]
|
||||
if _append_port:
|
||||
ip_address += ":" + str(raylet_info["NodeManagerPort"])
|
||||
self.raylet_id_to_ip_map[client_id] = ip_address
|
||||
|
||||
def _maybe_flush_gcs(self):
|
||||
|
||||
@@ -48,6 +48,7 @@ if __name__ == "__main__":
|
||||
do_link("autoscaler", force=args.yes)
|
||||
do_link("scripts", force=args.yes)
|
||||
do_link("internal", force=args.yes)
|
||||
do_link("tests", force=args.yes)
|
||||
do_link("experimental", force=args.yes)
|
||||
print("Created links.\n\nIf you run into issues initializing Ray, please "
|
||||
"ensure that your local repo and the installed Ray are in sync "
|
||||
|
||||
@@ -8,6 +8,7 @@ import time
|
||||
|
||||
import ray
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray.monitor import Monitor
|
||||
from ray.tests.cluster_utils import Cluster
|
||||
from ray.tests.conftest import generate_internal_config_map
|
||||
|
||||
@@ -58,6 +59,122 @@ def test_internal_config(ray_start_cluster_head):
|
||||
assert ray.cluster_resources()["CPU"] == 1
|
||||
|
||||
|
||||
def setup_monitor(redis_address):
    """Build a `Monitor` for the tests and prepare it to receive messages.

    The monitor is created without an autoscaling config (None), is
    subscribed to the heartbeat-batch and job channels, has its raylet map
    refreshed with ports appended to the addresses, and then has
    `_maybe_flush_gcs()` invoked once.

    Args:
        redis_address: Redis address handed to the `Monitor` constructor.

    Returns:
        The configured `Monitor` instance.
    """
    test_monitor = Monitor(redis_address, None)
    channels = (
        ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL,
        ray.gcs_utils.XRAY_JOB_CHANNEL,  # TODO: Remove?
    )
    for channel in channels:
        test_monitor.subscribe(channel)
    # Ports are appended so that nodes sharing one IP stay distinguishable.
    test_monitor.update_raylet_map(_append_port=True)
    test_monitor._maybe_flush_gcs()
    return test_monitor
|
||||
|
||||
|
||||
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
    """Poll the monitor until its load metrics reach the expected state.

    Each attempt calls `monitor.process_messages()` and then reads
    `monitor.load_metrics.get_resource_usage()`.

    Args:
        monitor: Object providing `process_messages()` and a `load_metrics`
            attribute with `get_resource_usage()`.
        expected_resource_usage: Sequence compared element-wise (paired
            with `zip`) against the reported usage. If None, the wait ends
            as soon as every element after the first one of the reported
            usage is truthy.
        timeout (int): Maximum number of polling attempts. There is a
            one-second pause between consecutive attempts.

    Returns:
        The resource usage that satisfied the condition.

    Raises:
        ValueError: If the condition is not met within `timeout` attempts.
    """
    while True:
        monitor.process_messages()
        resource_usage = monitor.load_metrics.get_resource_usage()

        if expected_resource_usage is None:
            satisfied = all(resource_usage[1:])
        else:
            satisfied = all(
                actual == expected for actual, expected in zip(
                    resource_usage, expected_resource_usage))
        if satisfied:
            return resource_usage

        # Every failed attempt counts against the budget, including the
        # "no expectation given" mode, which previously spun forever
        # without sleeping or timing out.
        timeout -= 1
        if timeout <= 0:
            raise ValueError("Timeout. {} != {}".format(
                resource_usage, expected_resource_usage))
        time.sleep(1)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "ray_start_cluster_head", [{
        "num_cpus": 1,
    }, {
        "num_cpus": 2,
    }],
    indirect=True)
def test_heartbeats_single(ray_start_cluster_head):
    """Check the monitor's load metrics on a head-node-only cluster.

    Verifies the usage reported through heartbeats while the cluster is
    idle, while a remote task is running, and while an actor method is
    running.
    """
    cluster = ray_start_cluster_head
    timeout = 5
    monitor = setup_monitor(cluster.redis_address)
    total_cpus = ray.state.cluster_resources()["CPU"]
    verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus}))

    # Expected usage while exactly one CPU is busy; shared by the task and
    # the actor phases below.
    one_cpu_busy = (1.0 / total_cpus, {"CPU": 1.0}, {"CPU": total_cpus})

    @ray.remote
    def work(timeout):
        time.sleep(timeout)
        return True

    # The work outlasts `timeout` so the busy state stays observable while
    # the metrics are being polled.
    work_handle = work.remote(timeout * 2)
    verify_load_metrics(monitor, one_cpu_busy)
    ray.get(work_handle)

    @ray.remote
    class Actor(object):
        def work(self, timeout):
            time.sleep(timeout)
            return True

    test_actor = Actor.remote()
    work_handle = test_actor.work.remote(timeout * 2)

    verify_load_metrics(monitor, one_cpu_busy)

    ray.get(work_handle)
|
||||
|
||||
|
||||
def test_heartbeats_cluster(ray_start_cluster_head):
    """Check the monitor's load metrics on a multi-node cluster.

    Adds four worker nodes to the head node, then verifies the usage
    reported through heartbeats while idle, while as many actors as there
    are nodes are busy, and again after all the work has finished.
    """
    cluster = ray_start_cluster_head
    timeout = 5
    num_workers_nodes = 4
    num_nodes_total = num_workers_nodes + 1
    for _ in range(num_workers_nodes):
        cluster.add_node()
    cluster.wait_for_nodes()
    monitor = setup_monitor(cluster.redis_address)

    # NOTE(review): the expectations equate CPU totals with the node count,
    # i.e. they presume one CPU per node -- confirm against the fixture.
    idle_usage = (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total})
    verify_load_metrics(monitor, idle_usage)

    @ray.remote
    class Actor(object):
        def work(self, timeout):
            time.sleep(timeout)
            return True

    test_actors = [Actor.remote() for _ in range(num_nodes_total)]

    # The work outlasts `timeout` so the busy state stays observable while
    # the metrics are being polled.
    work_handles = [actor.work.remote(timeout * 2) for actor in test_actors]

    verify_load_metrics(monitor, (num_nodes_total, {
        "CPU": num_nodes_total
    }, {
        "CPU": num_nodes_total
    }))

    ray.get(work_handles)
    verify_load_metrics(monitor, idle_usage)
    ray.shutdown()
|
||||
|
||||
|
||||
def test_wait_for_nodes(ray_start_cluster_head):
|
||||
"""Unit test for `Cluster.wait_for_nodes`.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user