diff --git a/python/ray/serve/autoscaling_policy.py b/python/ray/serve/autoscaling_policy.py index e77cd67f1..817da087e 100644 --- a/python/ray/serve/autoscaling_policy.py +++ b/python/ray/serve/autoscaling_policy.py @@ -46,6 +46,13 @@ class BasicAutoscalingPolicy(AutoscalingPolicy): def __init__(self, backend, config): self.backend = backend + # The minimum number of replicas to scale down to. + self.min_replicas = config.get("min_replicas", 1) + # The maximum number of replicas to scale up to. -1 means there is no + # limit. + self.max_replicas = config.get("max_replicas", -1) + if self.max_replicas == -1: + self.max_replicas = float("inf") # The minimum average queue length to trigger scaling up. self.scale_up_threshold = config.get("scale_up_threshold", 5) # The maximum average queue length to trigger scaling down. @@ -90,16 +97,18 @@ class BasicAutoscalingPolicy(AutoscalingPolicy): # Only actually scale the replicas if we've made this decision for # 'scale_up_consecutive_periods' in a row. - if self.decision_counter >= self.scale_up_consecutive_periods: + if (self.decision_counter >= self.scale_up_consecutive_periods + and curr_replicas < self.max_replicas): # TODO(edoakes): should we be resetting the counter here? self.decision_counter = 0 - new_replicas = curr_replicas + self.scale_up_num_replicas + new_replicas = min(self.max_replicas, + curr_replicas + self.scale_up_num_replicas) logger.info("Increasing number of replicas for backend '{}' " "from {} to {}".format(self.backend, curr_replicas, new_replicas)) # Scale down. - elif avg_queue_len < self.scale_down_threshold and curr_replicas > 1: + elif avg_queue_len < self.scale_down_threshold: # If the previous decision was to scale up (the counter was # positive), reset it to zero before decrementing. if self.decision_counter > 0: @@ -110,10 +119,13 @@ class BasicAutoscalingPolicy(AutoscalingPolicy): # Only actually scale the replicas if we've made this decision for # 'scale_down_consecutive_periods' in a row. if (self.decision_counter <= - -self.scale_down_consecutive_periods + 1): + -self.scale_down_consecutive_periods + 1 + and curr_replicas > self.min_replicas): # TODO(edoakes): should we be resetting the counter here? self.decision_counter = 0 - new_replicas = curr_replicas - self.scale_down_num_replicas + new_replicas = max( + self.min_replicas, + curr_replicas - self.scale_down_num_replicas) logger.info("Decreasing number of replicas for backend '{}' " "from {} to {}".format(self.backend, curr_replicas, new_replicas)) diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index 17d92ac31..e18328553 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -137,7 +137,10 @@ class ReplicaConfig: raise ValueError("Specifying max_restarts in " "actor_init_args is not allowed.") else: - num_cpus = self.ray_actor_options.get("num_cpus", 1) + # Ray defaults to zero CPUs for placement, we default to one here. + if "num_cpus" not in self.ray_actor_options: + self.ray_actor_options["num_cpus"] = 1 + num_cpus = self.ray_actor_options["num_cpus"] if not isinstance(num_cpus, (int, float)): raise TypeError( "num_cpus in ray_actor_options must be an int or a float.") diff --git a/python/ray/serve/examples/doc/snippet_model_composition.py b/python/ray/serve/examples/doc/snippet_model_composition.py index 1cf3f2c4c..b7704d642 100644 --- a/python/ray/serve/examples/doc/snippet_model_composition.py +++ b/python/ray/serve/examples/doc/snippet_model_composition.py @@ -1,7 +1,9 @@ from random import random import requests +import ray from ray import serve +ray.init(num_cpus=10) serve.init() # Our pipeline will be structured as follows: diff --git a/python/ray/serve/examples/doc/tutorial_batch.py b/python/ray/serve/examples/doc/tutorial_batch.py index c703c8011..f61132297 100644 --- a/python/ray/serve/examples/doc/tutorial_batch.py +++ b/python/ray/serve/examples/doc/tutorial_batch.py @@ -29,6 +29,7 @@ def batch_adder_v0(flask_requests: List): # __doc_define_servable_v0_end__ # __doc_deploy_begin__ +ray.init(num_cpus=10) serve.init() serve.create_backend("adder:v0", batch_adder_v0, config={"max_batch_size": 4}) serve.create_endpoint( diff --git a/python/ray/serve/examples/echo_full.py b/python/ray/serve/examples/echo_full.py index 4fa99b862..52c80c16a 100644 --- a/python/ray/serve/examples/echo_full.py +++ b/python/ray/serve/examples/echo_full.py @@ -7,6 +7,7 @@ import ray.serve as serve from ray.serve.metric import PrometheusExporter # initialize ray serve system. +ray.init(num_cpus=10) serve.init(metric_exporter=PrometheusExporter)