mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 14:06:42 +08:00
[serve] Add min and max limits to autoscaling (#9955)
This commit is contained in:
@@ -46,6 +46,13 @@ class BasicAutoscalingPolicy(AutoscalingPolicy):
|
||||
def __init__(self, backend, config):
|
||||
self.backend = backend
|
||||
|
||||
# The minimum number of replicas to scale down to.
|
||||
self.min_replicas = config.get("min_replicas", 1)
|
||||
# The maximum number of replicas to scale up to. -1 means there is no
|
||||
# limit.
|
||||
self.max_replicas = config.get("max_replicas", -1)
|
||||
if self.max_replicas == -1:
|
||||
self.max_replicas = float("inf")
|
||||
# The minimum average queue length to trigger scaling up.
|
||||
self.scale_up_threshold = config.get("scale_up_threshold", 5)
|
||||
# The maximum average queue length to trigger scaling down.
|
||||
@@ -90,16 +97,18 @@ class BasicAutoscalingPolicy(AutoscalingPolicy):
|
||||
|
||||
# Only actually scale the replicas if we've made this decision for
|
||||
# 'scale_up_consecutive_periods' in a row.
|
||||
if self.decision_counter >= self.scale_up_consecutive_periods:
|
||||
if (self.decision_counter >= self.scale_up_consecutive_periods
|
||||
and curr_replicas < self.max_replicas):
|
||||
# TODO(edoakes): should we be resetting the counter here?
|
||||
self.decision_counter = 0
|
||||
new_replicas = curr_replicas + self.scale_up_num_replicas
|
||||
new_replicas = min(self.max_replicas,
|
||||
curr_replicas + self.scale_up_num_replicas)
|
||||
logger.info("Increasing number of replicas for backend '{}' "
|
||||
"from {} to {}".format(self.backend, curr_replicas,
|
||||
new_replicas))
|
||||
|
||||
# Scale down.
|
||||
elif avg_queue_len < self.scale_down_threshold and curr_replicas > 1:
|
||||
elif avg_queue_len < self.scale_down_threshold:
|
||||
# If the previous decision was to scale up (the counter was
|
||||
# positive), reset it to zero before decrementing.
|
||||
if self.decision_counter > 0:
|
||||
@@ -110,10 +119,13 @@ class BasicAutoscalingPolicy(AutoscalingPolicy):
|
||||
# Only actually scale the replicas if we've made this decision for
|
||||
# 'scale_down_consecutive_periods' in a row.
|
||||
if (self.decision_counter <=
|
||||
-self.scale_down_consecutive_periods + 1):
|
||||
-self.scale_down_consecutive_periods + 1
|
||||
and curr_replicas > self.min_replicas):
|
||||
# TODO(edoakes): should we be resetting the counter here?
|
||||
self.decision_counter = 0
|
||||
new_replicas = curr_replicas - self.scale_down_num_replicas
|
||||
new_replicas = max(
|
||||
self.min_replicas,
|
||||
curr_replicas - self.scale_down_num_replicas)
|
||||
logger.info("Decreasing number of replicas for backend '{}' "
|
||||
"from {} to {}".format(self.backend, curr_replicas,
|
||||
new_replicas))
|
||||
|
||||
@@ -137,7 +137,10 @@ class ReplicaConfig:
|
||||
raise ValueError("Specifying max_restarts in "
|
||||
"actor_init_args is not allowed.")
|
||||
else:
|
||||
num_cpus = self.ray_actor_options.get("num_cpus", 1)
|
||||
# Ray defaults to zero CPUs for placement, we default to one here.
|
||||
if "num_cpus" not in self.ray_actor_options:
|
||||
self.ray_actor_options["num_cpus"] = 1
|
||||
num_cpus = self.ray_actor_options["num_cpus"]
|
||||
if not isinstance(num_cpus, (int, float)):
|
||||
raise TypeError(
|
||||
"num_cpus in ray_actor_options must be an int or a float.")
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from random import random
|
||||
import requests
|
||||
import ray
|
||||
from ray import serve
|
||||
|
||||
ray.init(num_cpus=10)
|
||||
serve.init()
|
||||
|
||||
# Our pipeline will be structured as follows:
|
||||
|
||||
@@ -29,6 +29,7 @@ def batch_adder_v0(flask_requests: List):
|
||||
# __doc_define_servable_v0_end__
|
||||
|
||||
# __doc_deploy_begin__
|
||||
ray.init(num_cpus=10)
|
||||
serve.init()
|
||||
serve.create_backend("adder:v0", batch_adder_v0, config={"max_batch_size": 4})
|
||||
serve.create_endpoint(
|
||||
|
||||
@@ -7,6 +7,7 @@ import ray.serve as serve
|
||||
from ray.serve.metric import PrometheusExporter
|
||||
|
||||
# initialize ray serve system.
|
||||
ray.init(num_cpus=10)
|
||||
serve.init(metric_exporter=PrometheusExporter)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user