diff --git a/ci/long_running_tests/workloads/serve_failure.py b/ci/long_running_tests/workloads/serve_failure.py index 50c7267b2..29b7b1c3c 100644 --- a/ci/long_running_tests/workloads/serve_failure.py +++ b/ci/long_running_tests/workloads/serve_failure.py @@ -37,7 +37,7 @@ class RandomKiller: serve.init() def _get_all_serve_actors(self): - master = serve.api._get_master_actor() + master = serve.api._get_controller() [router] = ray.get(master.get_router.remote()) [http_proxy] = ray.get(master.get_http_proxy.remote()) all_handles = [master, router, http_proxy] diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index f9c4a38f8..de97c45be 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -7,7 +7,7 @@ py_library( serve_tests_srcs = glob(["tests/*.py"], exclude=["tests/test_nonblocking.py", - "tests/test_master_crashes.py", + "tests/test_controller_crashes.py", "tests/test_serve.py", ]) @@ -108,12 +108,12 @@ py_test( ) -# Runs test_api and test_failure with injected failures in the master actor. +# Runs test_api and test_failure with injected failures in the controller. # TODO(edoakes): reenable this once we're using GCS actor fault tolerance. # py_test( - # name = "test_master_crashes", + # name = "test_controller_crashes", # size = "medium", - # srcs = glob(["tests/test_master_crashes.py", + # srcs = glob(["tests/test_controller_crashes.py", # "tests/test_api.py", # "tests/test_failure.py"], # exclude=["tests/test_nonblocking.py", diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index a3574b095..44d7da155 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -2,32 +2,32 @@ from functools import wraps import ray from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_MASTER_NAME, HTTP_PROXY_TIMEOUT) -from ray.serve.master import ServeMaster + SERVE_CONTROLLER_NAME, HTTP_PROXY_TIMEOUT) +from ray.serve.controller import ServeController from ray.serve.handle import RayServeHandle from ray.serve.utils import (block_until_http_ready, format_actor_name) from ray.serve.exceptions import RayServeException from ray.serve.config import BackendConfig, ReplicaConfig from ray.serve.metric import InMemoryExporter -master_actor = None +controller = None -def _get_master_actor(): +def _get_controller(): """Used for internal purpose because using just import serve.global_state will always reference the original None object. """ - global master_actor - if master_actor is None: + global controller + if controller is None: raise RayServeException("Please run serve.init to initialize or " "connect to existing ray serve cluster.") - return master_actor + return controller def _ensure_connected(f): @wraps(f) def check(*args, **kwargs): - _get_master_actor() + _get_controller() return f(*args, **kwargs) return check @@ -85,11 +85,11 @@ def init(name=None, if not ray.is_initialized(): ray.init() - # Try to get serve master actor if it exists - global master_actor - master_actor_name = format_actor_name(SERVE_MASTER_NAME, name) + # Try to get serve controller if it exists + global controller + controller_name = format_actor_name(SERVE_CONTROLLER_NAME, name) try: - master_actor = ray.get_actor(master_actor_name) + controller = ray.get_actor(controller_name) return except ValueError: pass @@ -98,8 +98,8 @@ def init(name=None, # serve.init() was run on. We should consider making this configurable # in the future. http_node_id = ray.state.current_node_id() - master_actor = ServeMaster.options( - name=master_actor_name, + controller = ServeController.options( + name=controller_name, max_restarts=-1, max_task_retries=-1, ).remote(name, http_node_id, http_host, http_port, metric_exporter) @@ -116,10 +116,10 @@ def shutdown(): Shuts down all processes and deletes all state associated with the Serve instance that's currently connected to (via serve.init). """ - global master_actor - ray.get(master_actor.shutdown.remote()) - ray.kill(master_actor, no_restart=True) - master_actor = None + global controller + ray.get(controller.shutdown.remote()) + ray.kill(controller, no_restart=True) + controller = None def create_endpoint(endpoint_name, @@ -163,8 +163,8 @@ def create_endpoint(endpoint_name, upper_methods.append(method.upper()) ray.get( - master_actor.create_endpoint.remote(endpoint_name, {backend: 1.0}, - route, upper_methods)) + controller.create_endpoint.remote(endpoint_name, {backend: 1.0}, route, + upper_methods)) @_ensure_connected @@ -173,7 +173,7 @@ def delete_endpoint(endpoint): Does not delete any associated backends. """ - ray.get(master_actor.delete_endpoint.remote(endpoint)) + ray.get(controller.delete_endpoint.remote(endpoint)) @_ensure_connected @@ -183,7 +183,7 @@ def list_endpoints(): The dictionary keys are endpoint names and values are dictionaries of the form: {"methods": List[str], "traffic": Dict[str, float]}. """ - return ray.get(master_actor.get_all_endpoints.remote()) + return ray.get(controller.get_all_endpoints.remote()) @_ensure_connected @@ -210,7 +210,7 @@ def update_backend_config(backend_tag, config_options): if not isinstance(config_options, dict): raise ValueError("config_options must be a dictionary.") ray.get( - master_actor.update_backend_config.remote(backend_tag, config_options)) + controller.update_backend_config.remote(backend_tag, config_options)) @_ensure_connected @@ -220,7 +220,7 @@ def get_backend_config(backend_tag): Args: backend_tag(str): A registered backend. """ - return ray.get(master_actor.get_backend_config.remote(backend_tag)) + return ray.get(controller.get_backend_config.remote(backend_tag)) @_ensure_connected @@ -265,8 +265,8 @@ def create_backend(backend_tag, replica_config.is_blocking) ray.get( - master_actor.create_backend.remote(backend_tag, backend_config, - replica_config)) + controller.create_backend.remote(backend_tag, backend_config, + replica_config)) @_ensure_connected @@ -275,7 +275,7 @@ def list_backends(): Dictionary maps backend tags to backend configs. """ - return ray.get(master_actor.get_all_backends.remote()) + return ray.get(controller.get_all_backends.remote()) @_ensure_connected @@ -284,7 +284,7 @@ def delete_backend(backend_tag): The backend must not currently be used by any endpoints. """ - ray.get(master_actor.delete_backend.remote(backend_tag)) + ray.get(controller.delete_backend.remote(backend_tag)) @_ensure_connected @@ -304,8 +304,8 @@ def set_traffic(endpoint_name, traffic_policy_dictionary): to their traffic weights. The weights must sum to 1. """ ray.get( - master_actor.set_traffic.remote(endpoint_name, - traffic_policy_dictionary)) + controller.set_traffic.remote(endpoint_name, + traffic_policy_dictionary)) @_ensure_connected @@ -329,8 +329,8 @@ def shadow_traffic(endpoint_name, backend_tag, proportion): raise TypeError("proportion must be a float from 0 to 1.") ray.get( - master_actor.shadow_traffic.remote(endpoint_name, backend_tag, - proportion)) + controller.shadow_traffic.remote(endpoint_name, backend_tag, + proportion)) @_ensure_connected @@ -353,11 +353,10 @@ def get_handle(endpoint_name, RayServeHandle """ if not missing_ok: - assert endpoint_name in ray.get( - master_actor.get_all_endpoints.remote()) + assert endpoint_name in ray.get(controller.get_all_endpoints.remote()) return RayServeHandle( - ray.get(master_actor.get_http_proxy.remote())[0], + ray.get(controller.get_http_proxy.remote())[0], endpoint_name, relative_slo_ms, absolute_slo_ms, @@ -387,5 +386,5 @@ def stat(): For PrometheusExporter, it returns the metrics in prometheus format in plain text. """ - [metric_exporter] = ray.get(master_actor.get_metric_exporter.remote()) + [metric_exporter] = ray.get(controller.get_metric_exporter.remote()) return ray.get(metric_exporter.inspect_metrics.remote()) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 2c82d89f7..9741709c7 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -110,8 +110,9 @@ def create_backend_worker(func_or_class): else: _callable = func_or_class(*init_args) - master = serve.api._get_master_actor() - [metric_exporter] = ray.get(master.get_metric_exporter.remote()) + controller = serve.api._get_controller() + [metric_exporter] = ray.get( + controller.get_metric_exporter.remote()) metric_client = MetricClient( metric_exporter, default_labels={"backend": backend_tag}) self.backend = RayServeWorker(backend_tag, replica_tag, _callable, diff --git a/python/ray/serve/benchmarks/noop_latency.py b/python/ray/serve/benchmarks/noop_latency.py index 4d9146287..a2496dd37 100644 --- a/python/ray/serve/benchmarks/noop_latency.py +++ b/python/ray/serve/benchmarks/noop_latency.py @@ -8,9 +8,9 @@ import click from ray import serve from ray.serve.constants import DEFAULT_HTTP_ADDRESS -from ray.serve import master +from ray.serve import controller -master._TRACING_ENABLED = True +controller._TRACING_ENABLED = True def block_until_ready(url): diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index 8261496dc..d7668e251 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -1,5 +1,5 @@ -#: Actor name used to register master actor -SERVE_MASTER_NAME = "SERVE_MASTER_ACTOR" +#: Actor name used to register controller +SERVE_CONTROLLER_NAME = "SERVE_CONTROLLER_ACTOR" #: Actor name used to register HTTP proxy actor SERVE_PROXY_NAME = "SERVE_PROXY_ACTOR" diff --git a/python/ray/serve/master.py b/python/ray/serve/controller.py similarity index 98% rename from python/ray/serve/master.py rename to python/ray/serve/controller.py index 5111f6251..771605961 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/controller.py @@ -18,12 +18,12 @@ from ray.serve.utils import (format_actor_name, get_random_letters, logger, import numpy as np -# Used for testing purposes only. If this is set, the master actor will crash +# Used for testing purposes only. If this is set, the controller will crash # after writing each checkpoint with the specified probability. _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.0 -CHECKPOINT_KEY = "serve-master-checkpoint" +CHECKPOINT_KEY = "serve-controller-checkpoint" -# Feature flag for master actor resource checking. If true, master actor will +# Feature flag for controller resource checking. If true, controller will # error if the desired replicas exceed current resource availability. _RESOURCE_CHECK_ENABLED = True @@ -62,10 +62,10 @@ BackendInfo = namedtuple("BackendInfo", @ray.remote -class ServeMaster: +class ServeController: """Responsible for managing the state of the serving system. - The master actor implements fault tolerance by persisting its state in + The controller implements fault tolerance by persisting its state in a new checkpoint each time a state change is made. If the actor crashes, the latest checkpoint is loaded and the state is recovered. Checkpoints are written/read using a provided KV-store interface. @@ -75,13 +75,13 @@ class ServeMaster: those actors from this actor on startup and updates are pushed out from this actor. - All other actors started by the master actor are named, detached actors - so they will not fate share with the master if it crashes. + All other actors started by the controller are named, detached actors + so they will not fate share with the controller if it crashes. The following guarantees are provided for state-changing calls to the - master actor: + controller: - If the call succeeds, the change was made and will be reflected in - the system even if the master actor or other actors die unexpectedly. + the system even if the controller or other actors die unexpectedly. - If the call fails, the change may have been made but isn't guaranteed to have been. The client should retry in this case. Note that this requires all implementations here to be idempotent. diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index 3c53f722b..db3dc2cd4 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -111,9 +111,9 @@ class RayServeHandle: ) def get_traffic_policy(self): - master_actor = serve.api._get_master_actor() + controller = serve.api._get_controller() return ray.get( - master_actor.get_traffic_policy.remote(self.endpoint_name)) + controller.get_traffic_policy.remote(self.endpoint_name)) def __repr__(self): return """ diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 0b7d25573..17987bb8e 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -26,14 +26,14 @@ class HTTPProxy: # blocks forever """ - async def fetch_config_from_master(self, instance_name=None): + async def fetch_config_from_controller(self, instance_name=None): assert ray.is_initialized() - master = serve.api._get_master_actor() + controller = serve.api._get_controller() - self.route_table = await master.get_http_proxy_config.remote() + self.route_table = await controller.get_http_proxy_config.remote() # The exporter is required to return results for /-/metrics endpoint. - [self.metric_exporter] = await master.get_metric_exporter.remote() + [self.metric_exporter] = await controller.get_metric_exporter.remote() self.metric_client = MetricClient(self.metric_exporter) self.request_counter = self.metric_client.new_counter( @@ -172,7 +172,7 @@ class HTTPProxyActor: async def __init__(self, host, port, instance_name=None): serve.init(name=instance_name) self.app = HTTPProxy() - await self.app.fetch_config_from_master(instance_name) + await self.app.fetch_config_from_controller(instance_name) self.host = host self.port = port diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 00a076efd..683e9ba8e 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -133,27 +133,27 @@ class Router: # -- State Restoration -- # # Fetch the worker handles, traffic policies, and backend configs from - # the master actor. We use a "pull-based" approach instead of pushing - # them from the master so that the router can transparently recover + # the controller. We use a "pull-based" approach instead of pushing + # them from the controller so that the router can transparently recover # from failure. serve.init(name=instance_name) - master_actor = serve.api._get_master_actor() + controller = serve.api._get_controller() - traffic_policies = ray.get(master_actor.get_traffic_policies.remote()) + traffic_policies = ray.get(controller.get_traffic_policies.remote()) for endpoint, traffic_policy in traffic_policies.items(): await self.set_traffic(endpoint, traffic_policy) - backend_dict = ray.get(master_actor.get_all_worker_handles.remote()) + backend_dict = ray.get(controller.get_all_worker_handles.remote()) for backend_tag, replica_dict in backend_dict.items(): for replica_tag, worker in replica_dict.items(): await self.add_new_worker(backend_tag, replica_tag, worker) - backend_configs = ray.get(master_actor.get_backend_configs.remote()) + backend_configs = ray.get(controller.get_backend_configs.remote()) for backend, backend_config in backend_configs.items(): await self.set_backend_config(backend, backend_config) # -- Metric Registration -- # - [metric_exporter] = ray.get(master_actor.get_metric_exporter.remote()) + [metric_exporter] = ray.get(controller.get_metric_exporter.remote()) self.metric_client = MetricClient(metric_exporter) self.num_router_requests = self.metric_client.new_counter( "num_router_requests", diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 9fe0ede12..1a40f0652 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -6,7 +6,7 @@ import ray from ray import serve if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): - serve.master._CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.5 + serve.controller._CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.5 @pytest.fixture(scope="session") @@ -22,9 +22,9 @@ def serve_instance(_shared_serve_instance): yield # Re-init if necessary. serve.init() - master = serve.api._get_master_actor() + controller = serve.api._get_controller() # Clear all state between tests to avoid naming collisions. - for endpoint in ray.get(master.get_all_endpoints.remote()): + for endpoint in ray.get(controller.get_all_endpoints.remote()): serve.delete_endpoint(endpoint) - for backend in ray.get(master.get_all_backends.remote()): + for backend in ray.get(controller.get_all_backends.remote()): serve.delete_backend(backend) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index cbda31dbd..0b8ae1d9d 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -228,16 +228,16 @@ def test_updating_config(serve_instance): }) serve.create_endpoint("bsimple", backend="bsimple:v1", route="/bsimple") - master_actor = serve.api._get_master_actor() + controller = serve.api._get_controller() old_replica_tag_list = ray.get( - master_actor._list_replicas.remote("bsimple:v1")) + controller._list_replicas.remote("bsimple:v1")) serve.update_backend_config("bsimple:v1", {"max_batch_size": 5}) new_replica_tag_list = ray.get( - master_actor._list_replicas.remote("bsimple:v1")) + controller._list_replicas.remote("bsimple:v1")) new_all_tag_list = [] for worker_dict in ray.get( - master_actor.get_all_worker_handles.remote()).values(): + controller.get_all_worker_handles.remote()).values(): new_all_tag_list.extend(list(worker_dict.keys())) # the old and new replica tag list should be identical @@ -550,7 +550,7 @@ def test_create_infeasible_error(serve_instance): config={"num_replicas": current_cpus + 20}) # No replica should be created! - replicas = ray.get(serve.api.master_actor._list_replicas.remote("f1")) + replicas = ray.get(serve.api.controller._list_replicas.remote("f1")) assert len(replicas) == 0 @@ -569,7 +569,7 @@ def test_shutdown(serve_instance): def check_dead(): for actor_name in [ - constants.SERVE_MASTER_NAME, constants.SERVE_PROXY_NAME, + constants.SERVE_CONTROLLER_NAME, constants.SERVE_PROXY_NAME, constants.SERVE_METRIC_SINK_NAME ]: try: diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index e3a402f4d..c0c6648a6 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -7,7 +7,7 @@ import ray from ray import serve import ray.serve.context as context from ray.serve.backend_worker import create_backend_worker, wrap_to_ray_error -from ray.serve.master import TrafficPolicy +from ray.serve.controller import TrafficPolicy from ray.serve.request_params import RequestMetadata from ray.serve.router import Router from ray.serve.config import BackendConfig diff --git a/python/ray/serve/tests/test_master_crashes.py b/python/ray/serve/tests/test_controller_crashes.py similarity index 100% rename from python/ray/serve/tests/test_master_crashes.py rename to python/ray/serve/tests/test_controller_crashes.py diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 0e0f2c45d..737704a34 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -19,62 +19,64 @@ def request_with_retries(endpoint, timeout=30): time.sleep(0.1) -def test_master_failure(serve_instance): +def test_controller_failure(serve_instance): serve.init() def function(): return "hello1" - serve.create_backend("master_failure:v1", function) + serve.create_backend("controller_failure:v1", function) serve.create_endpoint( - "master_failure", backend="master_failure:v1", route="/master_failure") + "controller_failure", + backend="controller_failure:v1", + route="/controller_failure") - assert request_with_retries("/master_failure", timeout=1).text == "hello1" + assert request_with_retries( + "/controller_failure", timeout=1).text == "hello1" for _ in range(10): - response = request_with_retries("/master_failure", timeout=30) + response = request_with_retries("/controller_failure", timeout=30) assert response.text == "hello1" - ray.kill(serve.api._get_master_actor(), no_restart=False) + ray.kill(serve.api._get_controller(), no_restart=False) for _ in range(10): - response = request_with_retries("/master_failure", timeout=30) + response = request_with_retries("/controller_failure", timeout=30) assert response.text == "hello1" def function(): return "hello2" - ray.kill(serve.api._get_master_actor(), no_restart=False) + ray.kill(serve.api._get_controller(), no_restart=False) - serve.create_backend("master_failure:v2", function) - serve.set_traffic("master_failure", {"master_failure:v2": 1.0}) + serve.create_backend("controller_failure:v2", function) + serve.set_traffic("controller_failure", {"controller_failure:v2": 1.0}) for _ in range(10): - response = request_with_retries("/master_failure", timeout=30) + response = request_with_retries("/controller_failure", timeout=30) assert response.text == "hello2" def function(): return "hello3" - ray.kill(serve.api._get_master_actor(), no_restart=False) - serve.create_backend("master_failure_2", function) - ray.kill(serve.api._get_master_actor(), no_restart=False) + ray.kill(serve.api._get_controller(), no_restart=False) + serve.create_backend("controller_failure_2", function) + ray.kill(serve.api._get_controller(), no_restart=False) serve.create_endpoint( - "master_failure_2", - backend="master_failure_2", - route="/master_failure_2") - ray.kill(serve.api._get_master_actor(), no_restart=False) + "controller_failure_2", + backend="controller_failure_2", + route="/controller_failure_2") + ray.kill(serve.api._get_controller(), no_restart=False) for _ in range(10): - response = request_with_retries("/master_failure", timeout=30) + response = request_with_retries("/controller_failure", timeout=30) assert response.text == "hello2" - response = request_with_retries("/master_failure_2", timeout=30) + response = request_with_retries("/controller_failure_2", timeout=30) assert response.text == "hello3" def _kill_http_proxy(): - [http_proxy] = ray.get( - serve.api._get_master_actor().get_http_proxy.remote()) + [http_proxy] = ray.get(serve.api._get_controller().get_http_proxy.remote()) ray.kill(http_proxy, no_restart=False) @@ -108,8 +110,8 @@ def test_http_proxy_failure(serve_instance): def _get_worker_handles(backend): - master_actor = serve.api._get_master_actor() - backend_dict = ray.get(master_actor.get_all_worker_handles.remote()) + controller = serve.api._get_controller() + backend_dict = ray.get(controller.get_all_worker_handles.remote()) return list(backend_dict[backend].values()) diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index ddf5cd537..3200ddc95 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -4,7 +4,7 @@ from collections import defaultdict import pytest import ray -from ray.serve.master import TrafficPolicy +from ray.serve.controller import TrafficPolicy from ray.serve.router import Router, Query from ray.serve.request_params import RequestMetadata from ray.serve.utils import get_random_letters