[Serve] Raise exception when _scale_replicas is infeasible (#9005)

This commit is contained in:
Simon Mo
2020-06-21 15:38:58 -07:00
committed by GitHub
parent e254dd3115
commit 2b5119218e
5 changed files with 167 additions and 4 deletions
+8 -2
View File
@@ -107,6 +107,7 @@ class ReplicaConfig:
else:
self.ray_actor_options = ray_actor_options
self.resource_dict = {}
self._validate()
def _validate(self):
@@ -138,6 +139,7 @@ class ReplicaConfig:
"num_cpus in ray_actor_options must be an int or a float.")
elif num_cpus < 0:
raise ValueError("num_cpus in ray_actor_options must be >= 0.")
self.resource_dict["CPU"] = num_cpus
num_gpus = self.ray_actor_options.get("num_gpus", 0)
if not isinstance(num_gpus, (int, float)):
@@ -145,6 +147,7 @@ class ReplicaConfig:
"num_gpus in ray_actor_options must be an int or a float.")
elif num_gpus < 0:
raise ValueError("num_gpus in ray_actor_options must be >= 0.")
self.resource_dict["GPU"] = num_gpus
memory = self.ray_actor_options.get("memory", 0)
if not isinstance(memory, (int, float)):
@@ -152,6 +155,7 @@ class ReplicaConfig:
"memory in ray_actor_options must be an int or a float.")
elif memory < 0:
raise ValueError("num_gpus in ray_actor_options must be >= 0.")
self.resource_dict["memory"] = memory
object_store_memory = self.ray_actor_options.get(
"object_store_memory", 0)
@@ -162,8 +166,10 @@ class ReplicaConfig:
elif object_store_memory < 0:
raise ValueError(
"object_store_memory in ray_actor_options must be >= 0.")
self.resource_dict["object_store_memory"] = object_store_memory
if not isinstance(
self.ray_actor_options.get("resources", {}), dict):
custom_resources = self.ray_actor_options.get("resources", {})
if not isinstance(custom_resources, dict):
raise TypeError(
"resources in ray_actor_options must be a dictionary.")
self.resource_dict.update(custom_resources)
+25 -1
View File
@@ -13,7 +13,9 @@ from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric.exporter import MetricExporterActor
from ray.serve.router import Router
from ray.serve.utils import (format_actor_name, get_random_letters, logger)
from ray.serve.exceptions import RayServeException
from ray.serve.utils import (format_actor_name, get_random_letters, logger,
try_schedule_resources_on_nodes)
import numpy as np
@@ -22,6 +24,10 @@ import numpy as np
_CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.0
CHECKPOINT_KEY = "serve-master-checkpoint"
# Feature flag for master actor resource checking. If true, master actor will
# error if the desired replicas exceed current resource availability.
_RESOURCE_CHECK_ENABLED = True
@ray.remote
class ServeMaster:
@@ -424,7 +430,25 @@ class ServeMaster:
current_num_replicas = len(self.replicas[backend_tag])
delta_num_replicas = num_replicas - current_num_replicas
_, _, replica_config = self.backends[backend_tag]
if delta_num_replicas > 0:
can_schedule = try_schedule_resources_on_nodes(
requirements=[
replica_config.resource_dict
for _ in range(delta_num_replicas)
],
ray_nodes=ray.nodes())
if _RESOURCE_CHECK_ENABLED and not all(can_schedule):
num_possible = sum(can_schedule)
raise RayServeException(
"Cannot scale backend {} to {} replicas. Ray Serve tried "
"to add {} replicas but the resources only allows {} "
"to be added. To fix this, consider scaling to replica to "
"{} or add more resources to the cluster. You can check "
"avaiable resources with ray.nodes().".format(
backend_tag, num_replicas, delta_num_replicas,
num_possible, current_num_replicas + num_possible))
logger.debug("Adding {} replicas to backend {}".format(
delta_num_replicas, backend_tag))
for _ in range(delta_num_replicas):
+32
View File
@@ -7,6 +7,7 @@ import requests
import ray
from ray import serve
from ray.serve.utils import get_random_letters
from ray.serve.exceptions import RayServeException
def test_e2e(serve_instance):
@@ -514,6 +515,37 @@ def test_endpoint_input_validation(serve_instance):
serve.create_endpoint("endpoint", backend="backend")
def test_create_infeasible_error(serve_instance):
serve.init()
def f():
pass
# Non existent resource should be infeasible.
with pytest.raises(RayServeException, match="Cannot scale backend"):
serve.create_backend(
"f:1",
f,
ray_actor_options={"resources": {
"MagicMLResource": 100
}})
# Even each replica might be feasible, the total might not be.
current_cpus = int(ray.nodes()[0]["Resources"]["CPU"])
with pytest.raises(RayServeException, match="Cannot scale backend"):
serve.create_backend(
"f:1",
f,
ray_actor_options={"resources": {
"CPU": 1,
}},
config={"num_replicas": current_cpus + 20})
# No replica should be created!
replicas = ray.get(serve.api.master_actor._list_replicas.remote("f1"))
assert len(replicas) == 0
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))
+49 -1
View File
@@ -1,10 +1,12 @@
import asyncio
import json
from copy import deepcopy
import numpy as np
import pytest
from ray.serve.utils import ServeEncoder, chain_future, unpack_future
from ray.serve.utils import (ServeEncoder, chain_future, unpack_future,
try_schedule_resources_on_nodes)
def test_bytes_encoder():
@@ -66,6 +68,52 @@ async def test_future_chaining():
await future
def test_mock_scheduler():
ray_nodes = [{
"NodeID": "AAA",
"Alive": True,
"Resources": {
"CPU": 2.0,
"GPU": 2.0
}
}, {
"NodeID": "BBB",
"Alive": True,
"Resources": {
"CPU": 4.0,
}
}]
assert try_schedule_resources_on_nodes(
[
{
"CPU": 2,
"GPU": 2
}, # node 1
{
"CPU": 4
} # node 2
],
deepcopy(ray_nodes)) == [True, True]
assert try_schedule_resources_on_nodes([
{
"CPU": 100
},
{
"GPU": 1
},
], deepcopy(ray_nodes)) == [False, True]
assert try_schedule_resources_on_nodes(
[
{
"CPU": 6
}, # Equals to the sum of cpus but shouldn't be scheduable.
],
deepcopy(ray_nodes)) == [False]
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))
+53
View File
@@ -11,6 +11,8 @@ import os
import requests
from pygments import formatters, highlight, lexers
import ray
from ray.serve.constants import HTTP_PROXY_TIMEOUT
from ray.serve.context import FakeFlaskRequest, TaskContext
from ray.serve.http_util import build_flask_request
@@ -173,3 +175,54 @@ def unpack_future(src: asyncio.Future, num_items: int) -> List[asyncio.Future]:
src.add_done_callback(unwrap_callback)
return dest_futures
def try_schedule_resources_on_nodes(
requirements: List[dict],
ray_nodes: List = None,
) -> List[bool]:
"""Test given resource requirements can be scheduled on ray nodes.
Args:
requirements(List[dict]): The list of resource requirements.
ray_nodes(Optional[List]): The list of nodes. By default it reads from
``ray.nodes()``.
Returns:
successfully_scheduled(List[bool]): A list with the same length as
requirements. Each element indicates whether or not the requirement
can be satisied.
"""
if ray_nodes is None:
ray_nodes = ray.nodes()
node_to_resources = {
node["NodeID"]: node["Resources"]
for node in ray_nodes if node["Alive"]
}
successfully_scheduled = []
for resource_dict in requirements:
# Filter out zero value
resource_dict = {k: v for k, v in resource_dict.items() if v > 0}
for node_id, node_resource in node_to_resources.items():
# Check if we can schedule on this node
feasible = True
for key, count in resource_dict.items():
if node_resource.get(key, 0) - count < 0:
feasible = False
# If we can, schedule it on this node
if feasible:
node_resource = node_to_resources[node_id]
for key, count in resource_dict.items():
node_resource[key] -= count
successfully_scheduled.append(True)
break
else:
successfully_scheduled.append(False)
return successfully_scheduled