diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index ad2cde5c2..f38116396 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -536,12 +536,11 @@ class ServeController: backend_info = self.backends[backend_tag] if delta_num_replicas > 0: - can_schedule = try_schedule_resources_on_nodes( - requirements=[ - backend_info.replica_config.resource_dict - for _ in range(delta_num_replicas) - ], - ray_nodes=ray.nodes()) + can_schedule = try_schedule_resources_on_nodes(requirements=[ + backend_info.replica_config.resource_dict + for _ in range(delta_num_replicas) + ]) + if _RESOURCE_CHECK_ENABLED and not all(can_schedule): num_possible = sum(can_schedule) raise RayServeException( diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py index 6bf023e6b..e67ff2c26 100644 --- a/python/ray/serve/tests/test_util.py +++ b/python/ray/serve/tests/test_util.py @@ -69,20 +69,15 @@ async def test_future_chaining(): def test_mock_scheduler(): - ray_nodes = [{ - "NodeID": "AAA", - "Alive": True, - "Resources": { + ray_nodes = { + "AAA": { "CPU": 2.0, "GPU": 2.0 - } - }, { - "NodeID": "BBB", - "Alive": True, - "Resources": { + }, + "BBB": { "CPU": 4.0, } - }] + } assert try_schedule_resources_on_nodes( [ diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index 04351a6da..0f2eb38f6 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -6,7 +6,7 @@ import logging import random import string import time -from typing import List +from typing import List, Dict import io import os from ray.serve.exceptions import RayServeException @@ -237,27 +237,23 @@ def unpack_future(src: asyncio.Future, num_items: int) -> List[asyncio.Future]: def try_schedule_resources_on_nodes( requirements: List[dict], - ray_nodes: List = None, + ray_resource: Dict[str, Dict] = None, ) -> List[bool]: """Test given resource requirements can be scheduled on ray nodes. 
Args: requirements(List[dict]): The list of resource requirements. - ray_nodes(Optional[List]): The list of nodes. By default it reads from - ``ray.nodes()``. + ray_resource(Optional[Dict[str, Dict]]): The resource dictionary keyed + by node id. By default it reads from + ``ray.state.state._available_resources_per_node()``. Returns: successfully_scheduled(List[bool]): A list with the same length as requirements. Each element indicates whether or not the requirement can be satisied. """ - if ray_nodes is None: - ray_nodes = ray.nodes() - - node_to_resources = { - node["NodeID"]: node["Resources"] - for node in ray_nodes if node["Alive"] - } + if ray_resource is None: + ray_resource = ray.state.state._available_resources_per_node() successfully_scheduled = [] @@ -265,7 +261,7 @@ def try_schedule_resources_on_nodes( # Filter out zero value resource_dict = {k: v for k, v in resource_dict.items() if v > 0} - for node_id, node_resource in node_to_resources.items(): + for node_id, node_resource in ray_resource.items(): # Check if we can schedule on this node feasible = True for key, count in resource_dict.items(): @@ -274,7 +270,6 @@ def try_schedule_resources_on_nodes( # If we can, schedule it on this node if feasible: - node_resource = node_to_resources[node_id] for key, count in resource_dict.items(): node_resource[key] -= count