Add an automatic per-node "node:<ip>" custom resource.

Each node registers a resource named NODE_ID_PREFIX + its IP address with
capacity 1.0. ray.state.current_node_id() and ray.state.node_ids() expose the
resource names, the tests either exercise or strip the new resource, and the
Tune resource summaries skip it.

diff --git a/python/ray/resource_spec.py b/python/ray/resource_spec.py
index cc572b784..678911dd8 100644
--- a/python/ray/resource_spec.py
+++ b/python/ray/resource_spec.py
@@ -13,6 +13,10 @@ import ray.ray_constants as ray_constants
 
 logger = logging.getLogger(__name__)
 
+# Prefix for the node id resource that is automatically added to each node.
+# For example, a node may have id `node:172.23.42.1`.
+NODE_ID_PREFIX = "node:"
+
 
 class ResourceSpec(
         namedtuple("ResourceSpec", [
@@ -127,6 +131,10 @@ class ResourceSpec(
         assert "memory" not in resources, resources
         assert "object_store_memory" not in resources, resources
 
+        # Automatically create a node id resource on each node. This is
+        # queryable with ray.state.node_ids() and ray.state.current_node_id().
+        resources[NODE_ID_PREFIX + ray.services.get_node_ip_address()] = 1.0
+
         num_cpus = self.num_cpus
         if num_cpus is None:
             num_cpus = multiprocessing.cpu_count()
diff --git a/python/ray/state.py b/python/ray/state.py
index dae0acd7e..35113f8ce 100644
--- a/python/ray/state.py
+++ b/python/ray/state.py
@@ -1083,6 +1083,38 @@ def nodes():
     return state.client_table()
 
 
+def current_node_id():
+    """Return the node id of the current node.
+
+    For example, "node:172.10.5.34". This can be used as a custom resource,
+    e.g., {node_id: 1} to reserve the whole node, or {node_id: 0.001} to
+    just force placement on the node.
+
+    Returns:
+        Id of the current node.
+    """
+    return ray.resource_spec.NODE_ID_PREFIX + ray.services.get_node_ip_address(
+    )
+
+
+def node_ids():
+    """Get a list of the node ids in the cluster.
+
+    For example, ["node:172.10.5.34", "node:172.42.3.77"]. These can be used
+    as custom resources, e.g., {node_id: 1} to reserve the whole node, or
+    {node_id: 0.001} to just force placement on the node.
+
+    Returns:
+        List of the node resource ids.
+    """
+    node_ids = []
+    for node in nodes():
+        for k, v in node["Resources"].items():
+            if k.startswith(ray.resource_spec.NODE_ID_PREFIX):
+                node_ids.append(k)
+    return node_ids
+
+
 def tasks(task_id=None):
     """Fetch and parse the task table information for one or more task IDs.
 
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index 5a71bdd4e..f4aad579e 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -2290,6 +2290,26 @@ def test_custom_resources(ray_start_cluster):
     ray.get([h.remote() for _ in range(5)])
 
 
+def test_node_id_resource(ray_start_cluster):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=3)
+    cluster.add_node(num_cpus=3)
+    ray.init(address=cluster.address)
+
+    local_node = ray.state.current_node_id()
+
+    # Note that these will have the same IP in the test cluster
+    assert len(ray.state.node_ids()) == 2
+    assert local_node in ray.state.node_ids()
+
+    @ray.remote(resources={local_node: 1})
+    def f():
+        return ray.state.current_node_id()
+
+    # Check the node id resource is automatically usable for scheduling.
+    assert ray.get(f.remote()) == ray.state.current_node_id()
+
+
 def test_two_custom_resources(ray_start_cluster):
     cluster = ray_start_cluster
     cluster.add_node(
@@ -2393,6 +2413,9 @@ def test_zero_capacity_deletion_semantics(shutdown_only):
 
         del resources["memory"]
         del resources["object_store_memory"]
+        for key in list(resources.keys()):
+            if key.startswith("node:"):
+                del resources[key]
 
         while resources and retry_count < MAX_RETRY_ATTEMPTS:
             time.sleep(0.1)
@@ -2401,7 +2424,7 @@ def test_zero_capacity_deletion_semantics(shutdown_only):
 
         if retry_count >= MAX_RETRY_ATTEMPTS:
             raise RuntimeError(
-                "Resources were available even after five retries.")
+                "Resources were available even after five retries.", resources)
 
         return resources
 
diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py
index e753408f5..7d84b12b8 100644
--- a/python/ray/tests/test_multi_node_2.py
+++ b/python/ray/tests/test_multi_node_2.py
@@ -82,6 +82,12 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
             del resource_usage[2]["memory"]
         if "object_store_memory" in resource_usage[2]:
             del resource_usage[2]["object_store_memory"]
+        for key in list(resource_usage[1].keys()):
+            if key.startswith("node:"):
+                del resource_usage[1][key]
+        for key in list(resource_usage[2].keys()):
+            if key.startswith("node:"):
+                del resource_usage[2][key]
 
         if expected_resource_usage is None:
             if all(x for x in resource_usage[1:]):
diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py
index 7dcbeed5d..c557aaebb 100644
--- a/python/ray/tune/ray_trial_executor.py
+++ b/python/ray/tune/ray_trial_executor.py
@@ -496,6 +496,7 @@ class RayTrialExecutor(TrialExecutor):
                     self._committed_resources.get_res_total(name),
                     self._avail_resources.get_res_total(name), name)
                 for name in self._avail_resources.custom_resources
+                if not name.startswith(ray.resource_spec.NODE_ID_PREFIX)
             ])
             if customs:
                 status += " ({})".format(customs)
diff --git a/python/ray/tune/resources.py b/python/ray/tune/resources.py
index b340f133f..17ef70bff 100644
--- a/python/ray/tune/resources.py
+++ b/python/ray/tune/resources.py
@@ -9,6 +9,7 @@ from numbers import Number
 # For compatibility under py2 to consider unicode as str
 from six import string_types
 
+import ray
 from ray.tune import TuneError
 
 logger = logging.getLogger(__name__)
@@ -111,6 +112,7 @@ class Resources(
         custom_summary = ", ".join([
             "{} {}".format(self.get_res_total(res), res)
             for res in self.custom_resources
+            if not res.startswith(ray.resource_spec.NODE_ID_PREFIX)
         ])
         if custom_summary:
             summary += " ({})".format(custom_summary)