[serve] Rename to use replicas, not workers (#11822)

This commit is contained in:
Ian Rodney
2020-11-10 11:36:15 -08:00
committed by GitHub
parent 9b8218aabd
commit 1d158dda32
14 changed files with 172 additions and 170 deletions
+2 -2
View File
@@ -166,7 +166,7 @@ class Client:
config_options(dict, serve.BackendConfig): Backend config options
to update. Either a BackendConfig object or a dict mapping
strings to values for the following supported options:
- "num_replicas": number of worker processes to start up that
- "num_replicas": number of processes to start up that
will handle requests to this backend.
- "max_batch_size": the maximum number of requests that will
be processed in one batch by this backend.
@@ -221,7 +221,7 @@ class Client:
config (dict, serve.BackendConfig, optional): configuration options
for this backend. Either a BackendConfig, or a dictionary
mapping strings to values for the following supported options:
- "num_replicas": number of worker processes to start up that
- "num_replicas": number of processes to start up that
will handle requests to this backend.
- "max_batch_size": the maximum number of requests that will
be processed in one batch by this backend.
+12 -11
View File
@@ -87,8 +87,8 @@ class BatchQueue:
return batch
def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
"""Creates a worker class wrapping the provided function or class."""
def create_backend_replica(func_or_class: Union[Callable, Type[Callable]]):
"""Creates a replica class wrapping the provided function or class."""
if inspect.isfunction(func_or_class):
is_function = True
@@ -98,7 +98,7 @@ def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
assert False, "func_or_class must be function or class."
# TODO(architkulkarni): Add type hints after upgrading cloudpickle
class RayServeWrappedWorker(object):
class RayServeWrappedReplica(object):
def __init__(self, backend_tag, replica_tag, init_args,
backend_config: BackendConfig, controller_name: str):
# Set the controller name so that serve.connect() will connect to
@@ -109,8 +109,8 @@ def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
else:
_callable = func_or_class(*init_args)
self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
backend_config, is_function)
self.backend = RayServeReplica(backend_tag, replica_tag, _callable,
backend_config, is_function)
async def handle_request(self, request):
return await self.backend.handle_request(request)
@@ -121,8 +121,9 @@ def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
def ready(self):
pass
RayServeWrappedWorker.__name__ = "RayServeWorker_" + func_or_class.__name__
return RayServeWrappedWorker
RayServeWrappedReplica.__name__ = "RayServeReplica_{}".format(
func_or_class.__name__)
return RayServeWrappedReplica
def wrap_to_ray_error(exception: Exception) -> RayTaskError:
@@ -140,7 +141,7 @@ def ensure_async(func: Callable) -> Callable:
return sync_to_async(func)
class RayServeWorker:
class RayServeReplica:
"""Handles requests with the provided callable."""
def __init__(self, backend_tag: str, replica_tag: str, _callable: Callable,
@@ -172,8 +173,8 @@ class RayServeWorker:
self.error_counter.set_default_tags({"backend": self.backend_tag})
self.restart_counter = metrics.Count(
"backend_worker_starts",
description=("The number of time this replica workers "
"backend_replica_starts",
description=("The number of time this replica "
"has been restarted due to failure."),
tag_keys=("backend", "replica_tag"))
self.restart_counter.set_default_tags({
@@ -288,7 +289,7 @@ class RayServeWorker:
if not isinstance(result_list, Iterable) or isinstance(
result_list, (dict, set)):
error_message = ("RayServe expects an ordered iterable object "
"but the worker returned a {}".format(
"but the replica returned a {}".format(
type(result_list)))
raise RayServeException(error_message)
+4 -4
View File
@@ -30,7 +30,7 @@ class BackendMetadata:
class BackendConfig(BaseModel):
"""Configuration options for a backend, to be set by the user.
:param num_replicas: The number of worker processes to start up that will
:param num_replicas: The number of processes to start up that will
handle requests to this backend. Defaults to 0.
:type num_replicas: int, optional
:param max_batch_size: The maximum number of requests that will be
@@ -81,7 +81,7 @@ class BackendConfig(BaseModel):
# Dynamic default for max_concurrent_queries
@validator("max_concurrent_queries", always=True)
def set_max_queries_by_mode(cls, v, values):
def set_max_queries_by_mode(cls, v, values): # noqa 805
if v is None:
# Model serving mode: if the servable is blocking and the wait
# timeout is default zero seconds, then we keep the existing
@@ -95,8 +95,8 @@ class BackendConfig(BaseModel):
v = 8
# Pipeline/async mode: if the servable is not blocking,
# router should just keep pushing queries to the worker
# replicas until a high limit.
# router should just keep pushing queries to the replicas
# until a high limit.
if not values["internal_metadata"].is_blocking:
v = ASYNC_CONCURRENCY
+100 -101
View File
@@ -11,7 +11,7 @@ from pydantic import BaseModel
import ray
import ray.cloudpickle as pickle
from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve.backend_worker import create_backend_worker
from ray.serve.backend_worker import create_backend_replica
from ray.serve.constants import ASYNC_CONCURRENCY, SERVE_PROXY_NAME
from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
@@ -73,7 +73,7 @@ class TrafficPolicy:
class BackendInfo(BaseModel):
# TODO(architkulkarni): Add type hint for worker_class after upgrading
# cloudpickle and adding types to RayServeWrappedWorker
# cloudpickle and adding types to RayServeWrappedReplica
worker_class: Any
backend_config: BackendConfig
replica_config: ReplicaConfig
@@ -112,94 +112,94 @@ class ActorStateReconciler:
detached: bool = field(init=True)
routers_cache: Dict[NodeId, ActorHandle] = field(default_factory=dict)
replicas: Dict[BackendTag, List[ReplicaTag]] = field(
backend_replicas: Dict[BackendTag, Dict[ReplicaTag, ActorHandle]] = field(
default_factory=lambda: defaultdict(dict))
backend_replicas_to_start: Dict[BackendTag, List[ReplicaTag]] = field(
default_factory=lambda: defaultdict(list))
replicas_to_start: Dict[BackendTag, List[ReplicaTag]] = field(
default_factory=lambda: defaultdict(list))
replicas_to_stop: Dict[BackendTag, List[ReplicaTag]] = field(
backend_replicas_to_stop: Dict[BackendTag, List[ReplicaTag]] = field(
default_factory=lambda: defaultdict(list))
backends_to_remove: List[BackendTag] = field(default_factory=list)
endpoints_to_remove: List[EndpointTag] = field(default_factory=list)
# TODO(edoakes): consider removing this and just using the names.
workers: Dict[BackendTag, Dict[ReplicaConfig, ActorHandle]] = field(
default_factory=lambda: defaultdict(dict))
def router_handles(self) -> List[ActorHandle]:
return list(self.routers_cache.values())
def worker_handles(self) -> List[ActorHandle]:
def get_replica_handles(self) -> List[ActorHandle]:
return list(
chain.from_iterable([
replica_dict.values()
for replica_dict in self.workers.values()
for replica_dict in self.backend_replicas.values()
]))
def get_replica_actors(self, backend_tag: BackendTag) -> List[ActorHandle]:
return_list = []
for replica_tag in self.replicas.get(backend_tag, []):
try:
replica_name = format_actor_name(replica_tag,
self.controller_name)
return_list.append(ray.get_actor(replica_name))
except ValueError:
pass
return return_list
def get_replica_tags(self) -> List[ReplicaTag]:
return list(
chain.from_iterable([
replica_dict.keys()
for replica_dict in self.backend_replicas.values()
]))
async def _start_pending_replicas(
def get_replica_handles_for_backend(
self, backend_tag: BackendTag) -> List[ActorHandle]:
return list(self.backend_replicas.get(backend_tag, {}).values())
async def _start_pending_backend_replicas(
self, config_store: ConfigurationStore) -> None:
"""Starts the pending backend replicas in self.replicas_to_start.
"""Starts the pending backend replicas in self.backend_replicas_to_start.
Starts the worker, then pushes an update to the router to add it to
the proper backend. If the worker has already been started, only
Starts the replica, then pushes an update to the router to add it to
the proper backend. If the replica has already been started, only
updates the router.
Clears self.replicas_to_start.
Clears self.backend_replicas_to_start.
"""
replica_started_futures = []
for backend_tag, replicas_to_create in self.replicas_to_start.items():
for backend_tag, replicas_to_create in self.backend_replicas_to_start.\
items():
for replica_tag in replicas_to_create:
replica_started_futures.append(
self._start_replica(config_store, backend_tag,
replica_tag))
self._start_backend_replicas(config_store, backend_tag,
replica_tag))
# Wait on all creation task futures together.
await asyncio.gather(*replica_started_futures)
self.replicas_to_start.clear()
self.backend_replicas_to_start.clear()
async def _start_replica(self, config_store: ConfigurationStore,
backend_tag: BackendTag,
replica_tag: ReplicaTag) -> None:
async def _start_backend_replicas(self, config_store: ConfigurationStore,
backend_tag: BackendTag,
replica_tag: ReplicaTag) -> None:
# NOTE(edoakes): the replicas may already be created if we
# failed after creating them but before writing a
# checkpoint.
replica_name = format_actor_name(replica_tag, self.controller_name)
try:
worker_handle = ray.get_actor(replica_name)
replica_handle = ray.get_actor(replica_name)
except ValueError:
worker_handle = await self._start_backend_worker(
replica_handle = await self._start_single_replica(
config_store, backend_tag, replica_tag, replica_name)
self.replicas[backend_tag].append(replica_tag)
self.workers[backend_tag][replica_tag] = worker_handle
self.backend_replicas[backend_tag][replica_tag] = replica_handle
# Register the worker with the router.
# Register the replica with the router.
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker_handle)
router.add_new_replica.remote(backend_tag, replica_tag,
replica_handle)
for router in self.router_handles()
])
def _scale_replicas(self, backends: Dict[BackendTag, BackendInfo],
backend_tag: BackendTag, num_replicas: int) -> None:
def _scale_backend_replicas(self, backends: Dict[BackendTag, BackendInfo],
backend_tag: BackendTag,
num_replicas: int) -> None:
"""Scale the given backend to the number of replicas.
NOTE: this does not actually start or stop the replicas, but instead
adds the intention to start/stop them to self.workers_to_start and
self.workers_to_stop. The caller is responsible for then first writing
a checkpoint and then actually starting/stopping the intended replicas.
This avoids inconsistencies with starting/stopping a worker and then
crashing before writing a checkpoint.
adds the intention to start/stop them to self.backend_replicas_to_start
and self.backend_replicas_to_stop. The caller is responsible for then
first writing a checkpoint and then actually starting/stopping the
intended replicas. This avoids inconsistencies with starting/stopping a
replica and then crashing before writing a checkpoint.
"""
logger.debug("Scaling backend '{}' to {} replicas".format(
backend_tag, num_replicas))
@@ -208,7 +208,7 @@ class ActorStateReconciler:
assert num_replicas >= 0, ("Number of replicas must be"
" greater than or equal to 0.")
current_num_replicas = len(self.replicas[backend_tag])
current_num_replicas = len(self.backend_replicas[backend_tag])
delta_num_replicas = num_replicas - current_num_replicas
backend_info = backends[backend_tag]
@@ -233,30 +233,28 @@ class ActorStateReconciler:
delta_num_replicas, backend_tag))
for _ in range(delta_num_replicas):
replica_tag = "{}#{}".format(backend_tag, get_random_letters())
self.replicas_to_start[backend_tag].append(replica_tag)
self.backend_replicas_to_start[backend_tag].append(replica_tag)
elif delta_num_replicas < 0:
logger.debug("Removing {} replicas from backend '{}'".format(
-delta_num_replicas, backend_tag))
assert len(self.replicas[backend_tag]) >= delta_num_replicas
assert len(
self.backend_replicas[backend_tag]) >= delta_num_replicas
for _ in range(-delta_num_replicas):
replica_tag = self.replicas[backend_tag].pop()
if len(self.replicas[backend_tag]) == 0:
del self.replicas[backend_tag]
replica_tag, _ = self.backend_replicas[backend_tag].popitem()
if len(self.backend_replicas[backend_tag]) == 0:
del self.backend_replicas[backend_tag]
del self.workers[backend_tag][replica_tag]
if len(self.workers[backend_tag]) == 0:
del self.workers[backend_tag]
self.backend_replicas_to_stop[backend_tag].append(replica_tag)
self.replicas_to_stop[backend_tag].append(replica_tag)
async def _stop_pending_backend_replicas(self) -> None:
"""Stops the pending backend replicas in self.backend_replicas_to_stop.
async def _stop_pending_replicas(self) -> None:
"""Stops the pending backend replicas in self.replicas_to_stop.
Removes workers from the router, kills them, and clears
self.replicas_to_stop.
Removes backend replicas from the router, kills them, and clears
self.backend_replicas_to_stop.
"""
for backend_tag, replicas_list in self.replicas_to_stop.items():
for backend_tag, replicas_list in self.backend_replicas_to_stop.items(
):
for replica_tag in replicas_list:
# NOTE(edoakes): the replicas may already be stopped if we
# failed after stopping them but before writing a checkpoint.
@@ -269,7 +267,7 @@ class ActorStateReconciler:
# Remove the replica from router. This call is idempotent.
await asyncio.gather(*[
router.remove_worker.remote(backend_tag, replica_tag)
router.remove_replica.remote(backend_tag, replica_tag)
for router in self.router_handles()
])
@@ -280,7 +278,7 @@ class ActorStateReconciler:
# successfully killed the worker or not.
ray.kill(replica, no_restart=True)
self.replicas_to_stop.clear()
self.backend_replicas_to_stop.clear()
async def _remove_pending_backends(self) -> None:
"""Removes the pending backends in self.backends_to_remove.
@@ -294,19 +292,19 @@ class ActorStateReconciler:
])
self.backends_to_remove.clear()
async def _start_backend_worker(
async def _start_single_replica(
self, config_store: ConfigurationStore, backend_tag: BackendTag,
replica_tag: ReplicaTag, replica_name: str) -> ActorHandle:
"""Creates a backend worker and waits for it to start up.
"""Creates a backend replica and waits for it to start up.
Assumes that the backend configuration has already been registered
in the ConfigurationStore.
"""
logger.debug("Starting worker '{}' for backend '{}'.".format(
logger.debug("Starting replica '{}' for backend '{}'.".format(
replica_tag, backend_tag))
backend_info = config_store.get_backend(backend_tag)
worker_handle = ray.remote(backend_info.worker_class).options(
replica_handle = ray.remote(backend_info.worker_class).options(
name=replica_name,
lifetime="detached" if self.detached else None,
max_restarts=-1,
@@ -316,8 +314,8 @@ class ActorStateReconciler:
backend_info.replica_config.actor_init_args,
backend_info.backend_config, self.controller_name)
# TODO(edoakes): we should probably have a timeout here.
await worker_handle.ready.remote()
return worker_handle
await replica_handle.ready.remote()
return replica_handle
def _start_routers_if_needed(self, http_host: str, http_port: str,
http_middlewares: List[Any]) -> None:
@@ -394,15 +392,15 @@ class ActorStateReconciler:
self.routers_cache[node_id] = ray.get_actor(router_name)
# Fetch actor handles for all of the backend replicas in the system.
# All of these workers are guaranteed to already exist because they
# would not be written to a checkpoint in self.workers until they were
# created.
for backend_tag, replica_tags in self.replicas.items():
for replica_tag in replica_tags:
# All of these backend replicas are guaranteed to already exist because
# they would not be written to a checkpoint in self.backend_replicas
# until they were created.
for backend_tag, replica_dict in self.backend_replicas.items():
for replica_tag in replica_dict.keys():
replica_name = format_actor_name(replica_tag,
self.controller_name)
self.workers[backend_tag][replica_tag] = ray.get_actor(
replica_name)
self.backend_replicas[backend_tag][
replica_tag] = ray.get_actor(replica_name)
async def _recover_from_checkpoint(
self, config_store: ConfigurationStore,
@@ -418,11 +416,11 @@ class ActorStateReconciler:
for router in self.router_handles()
])
for backend_tag, replica_dict in self.workers.items():
for replica_tag, worker in replica_dict.items():
for backend_tag, replica_dict in self.backend_replicas.items():
for replica_tag, replica_handle in replica_dict.items():
await asyncio.gather(*[
router.add_new_worker.remote(backend_tag, replica_tag,
worker)
router.add_new_replica.remote(backend_tag, replica_tag,
replica_handle)
for router in self.router_handles()
])
@@ -444,8 +442,8 @@ class ActorStateReconciler:
])
# Start/stop any pending backend replicas.
await self._start_pending_replicas(config_store)
await self._stop_pending_replicas()
await self._start_pending_backend_replicas(config_store)
await self._stop_pending_backend_replicas()
# Remove any pending backends and endpoints.
await self._remove_pending_backends()
@@ -572,7 +570,7 @@ class ServeController:
1) Deserializes the internal state from the checkpoint.
2) Pushes the latest configuration to the routers
in case we crashed before updating them.
3) Starts/stops any worker replicas that are pending creation or
3) Starts/stops any replicas that are pending creation or
deletion.
NOTE: this requires that self.write_lock is already acquired and will
@@ -630,17 +628,17 @@ class ServeController:
"""Fetched by the router on startup."""
return self.configuration_store.traffic_policies
def _list_replicas(self, backend_tag: BackendTag) -> List[str]:
def _list_replicas(self, backend_tag: BackendTag) -> List[ReplicaTag]:
"""Used only for testing."""
return self.actor_reconciler.replicas[backend_tag]
return list(self.actor_reconciler.backend_replicas[backend_tag].keys())
def get_traffic_policy(self, endpoint: str) -> TrafficPolicy:
"""Fetched by serve handles."""
return self.configuration_store.traffic_policies[endpoint]
def get_all_worker_handles(self) -> Dict[str, Dict[str, ActorHandle]]:
def get_all_replica_handles(self) -> Dict[str, Dict[str, ActorHandle]]:
"""Fetched by the router on startup."""
return self.actor_reconciler.workers
return self.actor_reconciler.backend_replicas
def get_all_backends(self) -> Dict[str, BackendConfig]:
"""Returns a dictionary of backend tag to backend config."""
@@ -829,7 +827,7 @@ class ServeController:
and backend_info.replica_config == replica_config):
return
backend_worker = create_backend_worker(
backend_replica = create_backend_replica(
replica_config.func_or_class)
# Save creator that starts replicas, the arguments to be passed in,
@@ -837,7 +835,7 @@ class ServeController:
self.configuration_store.add_backend(
backend_tag,
BackendInfo(
worker_class=backend_worker,
worker_class=backend_replica,
backend_config=backend_config,
replica_config=replica_config))
metadata = backend_config.internal_metadata
@@ -847,7 +845,7 @@ class ServeController:
backend_tag, metadata.autoscaling_config)
try:
self.actor_reconciler._scale_replicas(
self.actor_reconciler._scale_backend_replicas(
self.configuration_store.backends, backend_tag,
backend_config.num_replicas)
except RayServeException as e:
@@ -858,7 +856,7 @@ class ServeController:
# or pushing the updated config to avoid inconsistent state if we
# crash while making the change.
self._checkpoint()
await self.actor_reconciler._start_pending_replicas(
await self.actor_reconciler._start_pending_backend_replicas(
self.configuration_store)
# Set the backend config inside the router
@@ -888,8 +886,8 @@ class ServeController:
# Scale its replicas down to 0. This will also remove the backend
# from self.configuration_store.backends and
# self.actor_reconciler.replicas.
self.actor_reconciler._scale_replicas(
# self.actor_reconciler.backend_replicas.
self.actor_reconciler._scale_backend_replicas(
self.configuration_store.backends, backend_tag, 0)
# Remove the backend's metadata.
@@ -904,7 +902,7 @@ class ServeController:
# backend from the router to avoid inconsistent state if we crash
# after pushing the update.
self._checkpoint()
await self.actor_reconciler._stop_pending_replicas()
await self.actor_reconciler._stop_pending_backend_replicas()
await self.actor_reconciler._remove_pending_backends()
async def update_backend_config(
@@ -930,7 +928,7 @@ class ServeController:
backend_tag).backend_config = backend_config
# Scale the replicas with the new configuration.
self.actor_reconciler._scale_replicas(
self.actor_reconciler._scale_backend_replicas(
self.configuration_store.backends, backend_tag,
backend_config.num_replicas)
@@ -946,9 +944,9 @@ class ServeController:
for router in self.actor_reconciler.router_handles()
])
await self.actor_reconciler._start_pending_replicas(
await self.actor_reconciler._start_pending_backend_replicas(
self.configuration_store)
await self.actor_reconciler._stop_pending_replicas()
await self.actor_reconciler._stop_pending_backend_replicas()
await self.broadcast_backend_config(backend_tag)
@@ -956,8 +954,9 @@ class ServeController:
backend_config = self.configuration_store.get_backend(
backend_tag).backend_config
broadcast_futures = [
replica.update_config.remote(backend_config).as_future() for
replica in self.actor_reconciler.get_replica_actors(backend_tag)
replica.update_config.remote(backend_config).as_future()
for replica in
self.actor_reconciler.get_replica_handles_for_backend(backend_tag)
]
await asyncio.gather(*broadcast_futures)
@@ -972,7 +971,7 @@ class ServeController:
async with self.write_lock:
for router in self.actor_reconciler.router_handles():
ray.kill(router, no_restart=True)
for replica in self.actor_reconciler.worker_handles():
for replica in self.actor_reconciler.get_replica_handles():
ray.kill(replica, no_restart=True)
self.kv_store.delete(CHECKPOINT_KEY)
+5 -5
View File
@@ -186,9 +186,9 @@ class HTTPProxyActor:
self.app.set_route_table(route_table)
# ------ Proxy router logic ------ #
async def add_new_worker(self, backend_tag, replica_tag, worker_handle):
return await self.app.router.add_new_worker(backend_tag, replica_tag,
worker_handle)
async def add_new_replica(self, backend_tag, replica_tag, worker_handle):
return await self.app.router.add_new_replica(backend_tag, replica_tag,
worker_handle)
async def set_traffic(self, endpoint, traffic_policy):
return await self.app.router.set_traffic(endpoint, traffic_policy)
@@ -202,8 +202,8 @@ class HTTPProxyActor:
async def remove_endpoint(self, endpoint):
return await self.app.router.remove_endpoint(endpoint)
async def remove_worker(self, backend_tag, replica_tag):
return await self.app.router.remove_worker(backend_tag, replica_tag)
async def remove_replica(self, backend_tag, replica_tag):
return await self.app.router.remove_replica(backend_tag, replica_tag)
async def enqueue_request(self, request_meta, *request_args,
**request_kwargs):
+14 -12
View File
@@ -53,10 +53,10 @@ class Query:
def ray_serialize(self):
# NOTE: this method is needed because Query needs to be serialized and
# sent to the replica worker. However, after we send the query to
# replica worker the async_future is still needed to retrieve the final
# result. Therefore we need a way to pass the information to replica
# worker without removing async_future.
# sent to the replica. However, after we send the query to the
# replica the async_future is still needed to retrieve the final
# result. Therefore we need a way to pass the information to replicas
# without removing async_future.
clone = copy.copy(self.__dict__)
clone.pop("async_future")
return pickle.dumps(clone)
@@ -68,7 +68,7 @@ class Query:
class Router:
"""A router that routes request to available workers."""
"""A router that routes requests to available replicas."""
async def setup(self, name, controller_name):
# Note: Several queues are used in the router
@@ -117,7 +117,7 @@ class Router:
self.flush_lock = asyncio.Lock()
# -- State Restoration -- #
# Fetch the worker handles, traffic policies, and backend configs from
# Fetch the replica handles, traffic policies, and backend configs from
# the controller. We use a "pull-based" approach instead of pushing
# them from the controller so that the router can transparently recover
# from failure.
@@ -128,10 +128,12 @@ class Router:
for endpoint, traffic_policy in traffic_policies.items():
await self.set_traffic(endpoint, traffic_policy)
backend_dict = ray.get(self.controller.get_all_worker_handles.remote())
backend_dict = ray.get(
self.controller.get_all_replica_handles.remote())
for backend_tag, replica_dict in backend_dict.items():
for replica_tag, worker in replica_dict.items():
await self.add_new_worker(backend_tag, replica_tag, worker)
for replica_tag, replica_handle in replica_dict.items():
await self.add_new_replica(backend_tag, replica_tag,
replica_handle)
backend_configs = ray.get(self.controller.get_backend_configs.remote())
for backend, backend_config in backend_configs.items():
@@ -193,11 +195,11 @@ class Router:
request_meta.request_id, request_time_ms))
return result
async def add_new_worker(self, backend_tag, replica_tag, worker_handle):
async def add_new_replica(self, backend_tag, replica_tag, replica_handle):
backend_replica_tag = backend_tag + ":" + replica_tag
if backend_replica_tag in self.replicas:
return
self.replicas[backend_replica_tag] = worker_handle
self.replicas[backend_replica_tag] = replica_handle
logger.debug("New worker added for backend '{}'".format(backend_tag))
await self.mark_worker_idle(backend_tag, backend_replica_tag)
@@ -214,7 +216,7 @@ class Router:
self.worker_queues[backend_tag].appendleft(backend_replica_tag)
self.flush_backend_queues([backend_tag])
async def remove_worker(self, backend_tag, replica_tag):
async def remove_replica(self, backend_tag, replica_tag):
backend_replica_tag = backend_tag + ":" + replica_tag
if backend_replica_tag not in self.replicas:
return
+1 -1
View File
@@ -338,7 +338,7 @@ def test_updating_config(serve_instance, use_legacy_config):
controller._list_replicas.remote("bsimple:v1"))
new_all_tag_list = []
for worker_dict in ray.get(
controller.get_all_worker_handles.remote()).values():
controller.get_all_replica_handles.remote()).values():
new_all_tag_list.extend(list(worker_dict.keys()))
# the old and new replica tag list should be identical
@@ -6,7 +6,7 @@ import numpy as np
import ray
from ray import serve
import ray.serve.context as context
from ray.serve.backend_worker import create_backend_worker, wrap_to_ray_error
from ray.serve.backend_worker import create_backend_replica, wrap_to_ray_error
from ray.serve.controller import TrafficPolicy
from ray.serve.router import Router, RequestMetadata
from ray.serve.config import BackendConfig, BackendMetadata
@@ -27,7 +27,7 @@ def setup_worker(name,
@ray.remote
class WorkerActor:
def __init__(self):
self.worker = create_backend_worker(func_or_class)(
self.worker = create_backend_replica(func_or_class)(
name, name + ":tag", init_args, backend_config,
controller_name)
@@ -47,7 +47,7 @@ def setup_worker(name,
async def add_servable_to_router(servable, router, **kwargs):
worker = setup_worker("backend", servable, **kwargs)
await router.add_new_worker.remote("backend", "replica", worker)
await router.add_new_replica.remote("backend", "replica", worker)
await router.set_traffic.remote("endpoint", TrafficPolicy({
"backend": 1.0
}))
+1 -1
View File
@@ -113,7 +113,7 @@ def test_http_proxy_failure(serve_instance):
def _get_worker_handles(client, backend):
controller = client._controller
backend_dict = ray.get(controller.get_all_worker_handles.remote())
backend_dict = ray.get(controller.get_all_replica_handles.remote())
return list(backend_dict[backend].values())
+12 -12
View File
@@ -51,8 +51,8 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor):
await q.setup.remote("", serve_instance._controller_name)
q.set_traffic.remote("svc", TrafficPolicy({"backend-single-prod": 1.0}))
q.add_new_worker.remote("backend-single-prod", "replica-1",
task_runner_mock_actor)
q.add_new_replica.remote("backend-single-prod", "replica-1",
task_runner_mock_actor)
# Make sure we get the request result back
result = await q.enqueue_request.remote(
@@ -70,16 +70,16 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor):
await q.setup.remote("", serve_instance._controller_name)
await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter": 1}))
await q.add_new_worker.remote("backend-alter", "replica-1",
task_runner_mock_actor)
await q.add_new_replica.remote("backend-alter", "replica-1",
task_runner_mock_actor)
await q.enqueue_request.remote(
RequestMetadata(get_random_letters(10), "svc", None), 1)
got_work = await task_runner_mock_actor.get_recent_call.remote()
assert got_work.args[0] == 1
await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter-2": 1}))
await q.add_new_worker.remote("backend-alter-2", "replica-1",
task_runner_mock_actor)
await q.add_new_replica.remote("backend-alter-2", "replica-1",
task_runner_mock_actor)
await q.enqueue_request.remote(
RequestMetadata(get_random_letters(10), "svc", None), 2)
got_work = await task_runner_mock_actor.get_recent_call.remote()
@@ -96,8 +96,8 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor):
"backend-split-2": 0.5
}))
runner_1, runner_2 = [mock_task_runner() for _ in range(2)]
await q.add_new_worker.remote("backend-split", "replica-1", runner_1)
await q.add_new_worker.remote("backend-split-2", "replica-1", runner_2)
await q.add_new_replica.remote("backend-split", "replica-1", runner_1)
await q.add_new_replica.remote("backend-split-2", "replica-1", runner_2)
# assume 50% split, the probability of all 20 requests going to a
# single queue is 0.5^20 ~ 1e-6
@@ -120,8 +120,8 @@ async def test_queue_remove_replicas(serve_instance):
temp_actor = mock_task_runner()
q = ray.remote(TestRouter).remote()
await q.setup.remote("", serve_instance._controller_name)
await q.add_new_worker.remote("backend-remove", "replica-1", temp_actor)
await q.remove_worker.remote("backend-remove", "replica-1")
await q.add_new_replica.remote("backend-remove", "replica-1", temp_actor)
await q.remove_replica.remote("backend-remove", "replica-1")
assert ray.get(q.worker_queue_size.remote("backend")) == 0
@@ -135,7 +135,7 @@ async def test_shard_key(serve_instance, task_runner_mock_actor):
for i, runner in enumerate(runners):
backend_name = "backend-split-" + str(i)
traffic_dict[backend_name] = 1.0 / num_backends
await q.add_new_worker.remote(backend_name, "replica-1", runner)
await q.add_new_replica.remote(backend_name, "replica-1", runner)
await q.set_traffic.remote("svc", TrafficPolicy(traffic_dict))
# Generate random shard keys and send one request for each.
@@ -190,7 +190,7 @@ async def test_router_use_max_concurrency(serve_instance):
backend_name = "max-concurrent-test"
config = BackendConfig(max_concurrent_queries=1)
await q.set_traffic.remote("svc", TrafficPolicy({backend_name: 1.0}))
await q.add_new_worker.remote(backend_name, "replica-tag", worker)
await q.add_new_replica.remote(backend_name, "replica-tag", worker)
await q.set_backend_config.remote(backend_name, config)
# We send over two queries