From 2b5119218e0583f30f1d5ca84d6b360cd6451fee Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Sun, 21 Jun 2020 15:38:58 -0700 Subject: [PATCH] [Serve] Raise exception when _scale_replicas is infeasible (#9005) --- python/ray/serve/config.py | 10 ++++-- python/ray/serve/master.py | 26 +++++++++++++- python/ray/serve/tests/test_api.py | 32 +++++++++++++++++ python/ray/serve/tests/test_util.py | 50 ++++++++++++++++++++++++++- python/ray/serve/utils.py | 53 +++++++++++++++++++++++++++++ 5 files changed, 167 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index fbbb7fd3f..d05eed013 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -107,6 +107,7 @@ class ReplicaConfig: else: self.ray_actor_options = ray_actor_options + self.resource_dict = {} self._validate() def _validate(self): @@ -138,6 +139,7 @@ class ReplicaConfig: "num_cpus in ray_actor_options must be an int or a float.") elif num_cpus < 0: raise ValueError("num_cpus in ray_actor_options must be >= 0.") + self.resource_dict["CPU"] = num_cpus num_gpus = self.ray_actor_options.get("num_gpus", 0) if not isinstance(num_gpus, (int, float)): @@ -145,6 +147,7 @@ class ReplicaConfig: "num_gpus in ray_actor_options must be an int or a float.") elif num_gpus < 0: raise ValueError("num_gpus in ray_actor_options must be >= 0.") + self.resource_dict["GPU"] = num_gpus memory = self.ray_actor_options.get("memory", 0) if not isinstance(memory, (int, float)): @@ -152,6 +155,7 @@ class ReplicaConfig: "memory in ray_actor_options must be an int or a float.") elif memory < 0: raise ValueError("num_gpus in ray_actor_options must be >= 0.") + self.resource_dict["memory"] = memory object_store_memory = self.ray_actor_options.get( "object_store_memory", 0) @@ -162,8 +166,10 @@ class ReplicaConfig: elif object_store_memory < 0: raise ValueError( "object_store_memory in ray_actor_options must be >= 0.") + self.resource_dict["object_store_memory"] = 
object_store_memory - if not isinstance( - self.ray_actor_options.get("resources", {}), dict): + custom_resources = self.ray_actor_options.get("resources", {}) + if not isinstance(custom_resources, dict): raise TypeError( "resources in ray_actor_options must be a dictionary.") + self.resource_dict.update(custom_resources) diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index fa5de7a6a..d7834b9c9 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -13,7 +13,9 @@ from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore from ray.serve.metric.exporter import MetricExporterActor from ray.serve.router import Router -from ray.serve.utils import (format_actor_name, get_random_letters, logger) +from ray.serve.exceptions import RayServeException +from ray.serve.utils import (format_actor_name, get_random_letters, logger, + try_schedule_resources_on_nodes) import numpy as np @@ -22,6 +24,10 @@ import numpy as np _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.0 CHECKPOINT_KEY = "serve-master-checkpoint" +# Feature flag for master actor resource checking. If true, master actor will +# error if the desired replicas exceed current resource availability. +_RESOURCE_CHECK_ENABLED = True + @ray.remote class ServeMaster: @@ -424,7 +430,25 @@ class ServeMaster: current_num_replicas = len(self.replicas[backend_tag]) delta_num_replicas = num_replicas - current_num_replicas + _, _, replica_config = self.backends[backend_tag] if delta_num_replicas > 0: + can_schedule = try_schedule_resources_on_nodes( + requirements=[ + replica_config.resource_dict + for _ in range(delta_num_replicas) + ], + ray_nodes=ray.nodes()) + if _RESOURCE_CHECK_ENABLED and not all(can_schedule): + num_possible = sum(can_schedule) + raise RayServeException( + "Cannot scale backend {} to {} replicas. Ray Serve tried " + "to add {} replicas but the resources only allows {} " + "to be added. 
def test_create_infeasible_error(serve_instance):
    """Infeasible backends must raise and leave no replicas behind.

    Two cases are exercised: a replica that asks for a resource name the
    test never provisions, and a replica count whose total CPU demand
    exceeds what the first node reports.
    """
    serve.init()

    def f():
        pass

    # Non existent resource should be infeasible.
    with pytest.raises(RayServeException, match="Cannot scale backend"):
        serve.create_backend(
            "f:1",
            f,
            ray_actor_options={"resources": {
                "MagicMLResource": 100
            }})

    # Even if each replica is feasible on its own, the total might not be.
    current_cpus = int(ray.nodes()[0]["Resources"]["CPU"])
    with pytest.raises(RayServeException, match="Cannot scale backend"):
        serve.create_backend(
            "f:1",
            f,
            ray_actor_options={"resources": {
                "CPU": 1,
            }},
            config={"num_replicas": current_cpus + 20})

    # No replica should be created! Query the same tag that was used in the
    # create_backend calls above ("f:1"); asking for "f1" would inspect a
    # backend that never existed, so the check could not fail.
    replicas = ray.get(serve.api.master_actor._list_replicas.remote("f:1"))
    assert len(replicas) == 0
def test_mock_scheduler():
    """Check try_schedule_resources_on_nodes on a fake two-node cluster."""
    gpu_node = {
        "NodeID": "AAA",
        "Alive": True,
        "Resources": {
            "CPU": 2.0,
            "GPU": 2.0
        }
    }
    cpu_node = {
        "NodeID": "BBB",
        "Alive": True,
        "Resources": {
            "CPU": 4.0,
        }
    }
    cluster = [gpu_node, cpu_node]

    def schedule(requirements):
        # Every scenario is given its own deep copy of the cluster.
        return try_schedule_resources_on_nodes(requirements,
                                               deepcopy(cluster))

    # One requirement per node: the first fits on AAA, the second on BBB.
    one_per_node = [{"CPU": 2, "GPU": 2}, {"CPU": 4}]
    assert schedule(one_per_node) == [True, True]

    # No node offers 100 CPUs, but the GPU request still fits on AAA.
    assert schedule([{"CPU": 100}, {"GPU": 1}]) == [False, True]

    # Equal to the sum of CPUs across nodes, but no single node has 6, so
    # it shouldn't be schedulable.
    assert schedule([{"CPU": 6}]) == [False]
def try_schedule_resources_on_nodes(
        requirements: List[dict],
        ray_nodes: List = None,
) -> List[bool]:
    """Test given resource requirements can be scheduled on ray nodes.

    Requirements are placed one at a time, in order, on the first alive node
    that still has enough of every requested resource. Each successful
    placement is deducted from that node before the next requirement is
    considered. A requirement is never split across nodes.

    The node descriptions passed in are not modified; the bookkeeping is
    done on private copies of each node's resource dictionary.

    Args:
        requirements(List[dict]): The list of resource requirements. Entries
            whose value is not greater than zero are ignored.
        ray_nodes(Optional[List]): The list of nodes. By default it reads
            from ``ray.nodes()``. Each node is a dict with ``"NodeID"``,
            ``"Alive"`` and ``"Resources"`` keys.
    Returns:
        successfully_scheduled(List[bool]): A list with the same length as
            requirements. Each element indicates whether or not the
            requirement can be satisfied.
    """
    if ray_nodes is None:
        ray_nodes = ray.nodes()

    # Copy each node's resources so that deducting placements below never
    # leaks into the caller's data structures.
    available = {
        node["NodeID"]: dict(node["Resources"])
        for node in ray_nodes if node["Alive"]
    }

    successfully_scheduled = []

    for requirement in requirements:
        # Filter out zero value
        demand = {k: v for k, v in requirement.items() if v > 0}

        for node_resources in available.values():
            fits = all(
                node_resources.get(key, 0) >= count
                for key, count in demand.items())
            if not fits:
                continue

            # Reserve the resources on this node for later requirements.
            for key, count in demand.items():
                node_resources[key] -= count
            successfully_scheduled.append(True)
            break
        else:
            # No alive node could host this requirement.
            successfully_scheduled.append(False)

    return successfully_scheduled