diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 4172f027f..c9cf46ba9 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -209,15 +209,10 @@ class LoadMetrics(object): def num_workers_connected(self): return self._info()["NumNodesConnected"] - def info_string(self): - return ", ".join( - ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) - - def _info(self): + def get_resource_usage(self): nodes_used = 0.0 resources_used = {} resources_total = {} - now = time.time() for ip, max_resources in self.static_resources_by_ip.items(): avail_resources = self.dynamic_resources_by_ip[ip] max_frac = 0.0 @@ -234,6 +229,17 @@ class LoadMetrics(object): if frac > max_frac: max_frac = frac nodes_used += max_frac + + return nodes_used, resources_used, resources_total + + def info_string(self): + return ", ".join( + ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) + + def _info(self): + nodes_used, resources_used, resources_total = self.get_resource_usage() + + now = time.time() idle_times = [now - t for t in self.last_used_time_by_ip.values()] heartbeat_times = [ now - t for t in self.last_heartbeat_time_by_ip.values() diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 59571e3ed..3c9810089 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -38,7 +38,7 @@ class Monitor(object): def __init__(self, redis_address, autoscaling_config, redis_password=None): # Initialize the Redis clients. ray.state.state._initialize_global_state( - args.redis_address, redis_password=redis_password) + redis_address, redis_password=redis_password) self.redis = ray.services.create_redis_client( redis_address, password=redis_password) # Setup subscriptions to the primary Redis server and the Redis shards. 
@@ -106,25 +106,22 @@ class Monitor(object): message = ray.gcs_utils.HeartbeatBatchTableData.FromString( heartbeat_data) - for heartbeat_message in message.batch: - num_resources = len(heartbeat_message.resources_available_label) - static_resources = {} - dynamic_resources = {} - for i in range(num_resources): - dyn = heartbeat_message.resources_available_label[i] - static = heartbeat_message.resources_total_label[i] - dynamic_resources[dyn] = ( - heartbeat_message.resources_available_capacity[i]) - static_resources[static] = ( - heartbeat_message.resources_total_capacity[i]) + total_resources = dict( + zip(heartbeat_message.resources_total_label, + heartbeat_message.resources_total_capacity)) + available_resources = dict( + zip(heartbeat_message.resources_available_label, + heartbeat_message.resources_available_capacity)) + for resource in total_resources: + available_resources.setdefault(resource, 0.0) # Update the load metrics for this raylet. client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) if ip: - self.load_metrics.update(ip, static_resources, - dynamic_resources) + self.load_metrics.update(ip, total_resources, + available_resources) else: logger.warning( "Monitor: " @@ -250,7 +247,14 @@ class Monitor(object): # Call the handler. message_handler(channel, data) - def update_raylet_map(self): + def update_raylet_map(self, _append_port=False): + """Updates internal raylet map. + + Args: + _append_port (bool): Defaults to False. Appending the port is + useful in testing, as mock clusters have many nodes with + the same IP and cannot be uniquely identified. 
+ """ all_raylet_nodes = ray.nodes() self.raylet_id_to_ip_map = {} for raylet_info in all_raylet_nodes: @@ -258,6 +262,8 @@ class Monitor(object): or raylet_info["ClientID"]) ip_address = (raylet_info.get("AuxAddress") or raylet_info["NodeManagerAddress"]).split(":")[0] + if _append_port: + ip_address += ":" + str(raylet_info["NodeManagerPort"]) self.raylet_id_to_ip_map[client_id] = ip_address def _maybe_flush_gcs(self): diff --git a/python/ray/setup-dev.py b/python/ray/setup-dev.py index 2e6032790..2a992528c 100755 --- a/python/ray/setup-dev.py +++ b/python/ray/setup-dev.py @@ -48,6 +48,7 @@ if __name__ == "__main__": do_link("autoscaler", force=args.yes) do_link("scripts", force=args.yes) do_link("internal", force=args.yes) + do_link("tests", force=args.yes) do_link("experimental", force=args.yes) print("Created links.\n\nIf you run into issues initializing Ray, please " "ensure that your local repo and the installed Ray are in sync " diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 979f47283..6ae3b4068 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -8,6 +8,7 @@ import time import ray import ray.ray_constants as ray_constants +from ray.monitor import Monitor from ray.tests.cluster_utils import Cluster from ray.tests.conftest import generate_internal_config_map @@ -58,6 +59,122 @@ def test_internal_config(ray_start_cluster_head): assert ray.cluster_resources()["CPU"] == 1 +def setup_monitor(redis_address): + monitor = Monitor(redis_address, None) + monitor.subscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL) + monitor.subscribe(ray.gcs_utils.XRAY_JOB_CHANNEL) # TODO: Remove? 
+    monitor.update_raylet_map(_append_port=True)
+    monitor._maybe_flush_gcs()
+    return monitor
+
+
+def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
+    while True:
+        monitor.process_messages()
+        resource_usage = monitor.load_metrics.get_resource_usage()
+        # No expectation: accept once both resource dicts are non-empty.
+        if expected_resource_usage is None:
+            if all(x for x in resource_usage[1:]):
+                break
+        elif all(x == y
+                 for x, y in zip(resource_usage, expected_resource_usage)):
+            break
+        # No match yet (either mode): use one tick so `timeout` is enforced.
+        timeout -= 1
+        time.sleep(1)
+
+        if timeout <= 0:
+            raise ValueError("Timeout. {} != {}".format(
+                resource_usage, expected_resource_usage))
+
+    return resource_usage
+
+
+@pytest.mark.parametrize(
+    "ray_start_cluster_head", [{
+        "num_cpus": 1,
+    }, {
+        "num_cpus": 2,
+    }],
+    indirect=True)
+def test_heartbeats_single(ray_start_cluster_head):
+    """Check monitor load metrics on a single-node cluster.
+
+    Test proper metrics.
+    """
+    cluster = ray_start_cluster_head
+    timeout = 5
+    monitor = setup_monitor(cluster.redis_address)
+    total_cpus = ray.state.cluster_resources()["CPU"]
+    verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": total_cpus}))
+
+    @ray.remote
+    def work(timeout):
+        time.sleep(timeout)
+        return True
+
+    work_handle = work.remote(timeout * 2)
+    verify_load_metrics(monitor, (1.0 / total_cpus, {
+        "CPU": 1.0
+    }, {
+        "CPU": total_cpus
+    }))
+    ray.get(work_handle)
+
+    @ray.remote
+    class Actor(object):
+        def work(self, timeout):
+            time.sleep(timeout)
+            return True
+
+    test_actor = Actor.remote()
+    work_handle = test_actor.work.remote(timeout * 2)
+
+    verify_load_metrics(monitor, (1.0 / total_cpus, {
+        "CPU": 1.0
+    }, {
+        "CPU": total_cpus
+    }))
+
+    ray.get(work_handle)
+
+
+def test_heartbeats_cluster(ray_start_cluster_head):
+    """Check monitor load metrics on a multi-node cluster.
+
+    Test proper metrics.
+ """ + cluster = ray_start_cluster_head + timeout = 5 + num_workers_nodes = 4 + num_nodes_total = int(num_workers_nodes + 1) + [cluster.add_node() for i in range(num_workers_nodes)] + cluster.wait_for_nodes() + monitor = setup_monitor(cluster.redis_address) + + verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total})) + + @ray.remote + class Actor(object): + def work(self, timeout): + time.sleep(timeout) + return True + + test_actors = [Actor.remote() for i in range(num_nodes_total)] + + work_handles = [actor.work.remote(timeout * 2) for actor in test_actors] + + verify_load_metrics(monitor, (num_nodes_total, { + "CPU": num_nodes_total + }, { + "CPU": num_nodes_total + })) + + ray.get(work_handles) + verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total})) + ray.shutdown() + + def test_wait_for_nodes(ray_start_cluster_head): """Unit test for `Cluster.wait_for_nodes`. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b08f6e35d..b56029d28 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -547,8 +547,8 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, SchedulingResources &remote_resources = it->second; ResourceSet remote_available( - VectorFromProtobuf(heartbeat_data.resources_total_label()), - VectorFromProtobuf(heartbeat_data.resources_total_capacity())); + VectorFromProtobuf(heartbeat_data.resources_available_label()), + VectorFromProtobuf(heartbeat_data.resources_available_capacity())); ResourceSet remote_load(VectorFromProtobuf(heartbeat_data.resource_load_label()), VectorFromProtobuf(heartbeat_data.resource_load_capacity())); // TODO(atumanov): assert that the load is a non-empty ResourceSet.