diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 30a464498..358837edc 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -5,8 +5,7 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, SERVE_MASTER_NAME, HTTP_PROXY_TIMEOUT) from ray.serve.master import ServeMaster from ray.serve.handle import RayServeHandle -from ray.serve.utils import (block_until_http_ready, format_actor_name, - retry_actor_failures) +from ray.serve.utils import (block_until_http_ready, format_actor_name) from ray.serve.exceptions import RayServeException from ray.serve.config import BackendConfig, ReplicaConfig from ray.serve.router import Query @@ -111,6 +110,7 @@ def init(name=None, master_actor = ServeMaster.options( name=master_actor_name, max_restarts=-1, + max_task_retries=-1, ).remote(name, http_node_id, http_host, http_port, metric_exporter) block_until_http_ready( @@ -128,8 +128,9 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]): route (str): A string begin with "/". HTTP server will use the string to match the path. """ - retry_actor_failures(master_actor.create_endpoint, route, endpoint_name, - [m.upper() for m in methods]) + ray.get( + master_actor.create_endpoint.remote(route, endpoint_name, + [m.upper() for m in methods])) @_ensure_connected @@ -138,7 +139,7 @@ def delete_endpoint(endpoint): Does not delete any associated backends. """ - retry_actor_failures(master_actor.delete_endpoint, endpoint) + ray.get(master_actor.delete_endpoint.remote(endpoint)) @_ensure_connected @@ -148,7 +149,7 @@ def list_endpoints(): The dictionary keys are endpoint names and values are dictionaries of the form: {"methods": List[str], "traffic": Dict[str, float]}. """ - return retry_actor_failures(master_actor.get_all_endpoints) + return ray.get(master_actor.get_all_endpoints.remote()) @_ensure_connected @@ -163,8 +164,8 @@ def update_backend_config(backend_tag, config_options): """ if not isinstance(config_options, dict): raise ValueError("config_options must be a dictionary.") - retry_actor_failures(master_actor.update_backend_config, backend_tag, - config_options) + ray.get( + master_actor.update_backend_config.remote(backend_tag, config_options)) @_ensure_connected @@ -174,7 +175,7 @@ def get_backend_config(backend_tag): Args: backend_tag(str): A registered backend. """ - return retry_actor_failures(master_actor.get_backend_config, backend_tag) + return ray.get(master_actor.get_backend_config.remote(backend_tag)) @_ensure_connected @@ -206,8 +207,9 @@ def create_backend(backend_tag, func_or_class, *actor_init_args, ray_actor_options=ray_actor_options) backend_config = BackendConfig(config, replica_config.accepts_batches) - retry_actor_failures(master_actor.create_backend, backend_tag, - backend_config, replica_config) + ray.get( + master_actor.create_backend.remote(backend_tag, backend_config, + replica_config)) @_ensure_connected @@ -216,7 +218,7 @@ def list_backends(): Dictionary maps backend tags to backend configs. """ - return retry_actor_failures(master_actor.get_all_backends) + return ray.get(master_actor.get_all_backends.remote()) @_ensure_connected @@ -225,7 +227,7 @@ def delete_backend(backend_tag): The backend must not currently be used by any endpoints. """ - retry_actor_failures(master_actor.delete_backend, backend_tag) + ray.get(master_actor.delete_backend.remote(backend_tag)) @_ensure_connected @@ -244,8 +246,9 @@ def set_traffic(endpoint_name, traffic_policy_dictionary): traffic_policy_dictionary (dict): a dictionary maps backend names to their traffic weights. The weights must sum to 1. """ - retry_actor_failures(master_actor.set_traffic, endpoint_name, - traffic_policy_dictionary) + ray.get( + master_actor.set_traffic.remote(endpoint_name, + traffic_policy_dictionary)) @_ensure_connected @@ -268,11 +271,11 @@ def get_handle(endpoint_name, RayServeHandle """ if not missing_ok: - assert endpoint_name in retry_actor_failures( - master_actor.get_all_endpoints) + assert endpoint_name in ray.get( + master_actor.get_all_endpoints.remote()) return RayServeHandle( - retry_actor_failures(master_actor.get_router)[0], + ray.get(master_actor.get_router.remote())[0], endpoint_name, relative_slo_ms, absolute_slo_ms, diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index faac14192..78782fcfb 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -7,8 +7,7 @@ from ray import serve from ray.serve import context as serve_context from ray.serve.context import FakeFlaskRequest from collections import defaultdict -from ray.serve.utils import (parse_request_item, _get_logger, - retry_actor_failures) +from ray.serve.utils import (parse_request_item, _get_logger) from ray.serve.exceptions import RayServeException from ray.serve.metric import MetricClient from ray.async_compat import sync_to_async @@ -39,8 +38,7 @@ def create_backend_worker(func_or_class): _callable = func_or_class(*init_args) master = serve.api._get_master_actor() - [metric_exporter] = retry_actor_failures( - master.get_metric_exporter) + [metric_exporter] = ray.get(master.get_metric_exporter.remote()) metric_client = MetricClient( metric_exporter, default_labels={"backend": backend_tag}) self.backend = RayServeWorker(backend_tag, replica_tag, _callable, diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 24797c276..55821fb60 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -9,7 +9,7 @@ from ray.serve.context import TaskContext from ray.serve.metric import MetricClient from ray.serve.request_params import RequestMetadata from ray.serve.http_util import Response -from ray.serve.utils import logger, retry_actor_failures_async +from ray.serve.utils import logger from urllib.parse import parse_qs @@ -31,13 +31,11 @@ class HTTPProxy: assert ray.is_initialized() master = serve.api._get_master_actor() - self.route_table, [ - self.router_handle - ] = await retry_actor_failures_async(master.get_http_proxy_config) + self.route_table, [self.router_handle + ] = await master.get_http_proxy_config.remote() # The exporter is required to return results for /-/metrics endpoint. - [self.metric_exporter] = await retry_actor_failures_async( - master.get_metric_exporter) + [self.metric_exporter] = await master.get_metric_exporter.remote() self.metric_client = MetricClient(self.metric_exporter) self.request_counter = self.metric_client.new_counter( @@ -107,8 +105,7 @@ class HTTPProxy: if current_path == "/-/routes": await Response(self.route_table).send(scope, receive, send) elif current_path == "/-/metrics": - metric_info = await retry_actor_failures_async( - self.metric_exporter.inspect_metrics) + metric_info = await self.metric_exporter.inspect_metrics.remote() await Response(metric_info).send(scope, receive, send) else: await Response( diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 7b49bed66..63598be05 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -13,8 +13,7 @@ from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore from ray.serve.metric.exporter import MetricExporterActor from ray.serve.router import Router -from ray.serve.utils import (async_retryable, format_actor_name, - get_random_letters, logger) +from ray.serve.utils import (format_actor_name, get_random_letters, logger) import numpy as np @@ -128,10 +127,11 @@ class ServeMaster: self.router = ray.get_actor(router_name) except ValueError: logger.info("Starting router with name '{}'".format(router_name)) - self.router = async_retryable(ray.remote(Router)).options( + self.router = ray.remote(Router).options( name=router_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, + max_task_retries=-1, ).remote(instance_name=self.instance_name) def get_router(self): @@ -150,10 +150,11 @@ class ServeMaster: logger.info( "Starting HTTP proxy with name '{}' on node '{}'".format( proxy_name, node_id)) - self.http_proxy = async_retryable(HTTPProxyActor).options( + self.http_proxy = HTTPProxyActor.options( name=proxy_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, + max_task_retries=-1, resources={ node_id: 0.01 }, @@ -305,9 +306,10 @@ class ServeMaster: replica_config) = self.backends[backend_tag] replica_name = format_actor_name(replica_tag, self.instance_name) - worker_handle = async_retryable(ray.remote(backend_worker)).options( + worker_handle = ray.remote(backend_worker).options( name=replica_name, max_restarts=-1, + max_task_retries=-1, **replica_config.ray_actor_options).remote( backend_tag, replica_tag, diff --git a/python/ray/serve/metric/client.py b/python/ray/serve/metric/client.py index 442ad4f4c..c0b0b9c53 100644 --- a/python/ray/serve/metric/client.py +++ b/python/ray/serve/metric/client.py @@ -6,7 +6,7 @@ from ray.serve.metric.types import ( convert_event_type_to_class, MetricMetadata, ) -from ray.serve.utils import retry_actor_failures_async, _get_logger +from ray.serve.utils import _get_logger from ray.serve.constants import METRIC_PUSH_INTERVAL_S logger = _get_logger() @@ -129,8 +129,7 @@ class MetricClient: old_batch, self.metric_records = self.metric_records, [] logger.debug("Pushing metric batch {}".format(old_batch)) - await retry_actor_failures_async(self.exporter.ingest, - self.registered_metrics, old_batch) + await self.exporter.ingest.remote(self.registered_metrics, old_batch) async def push_to_exporter_forever(self, interval_s): while True: diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index 22f07976b..348543c0e 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -9,10 +9,11 @@ import blist import ray.cloudpickle as pickle from ray.exceptions import RayTaskError +import ray from ray import serve from ray.serve.metric import MetricClient from ray.serve.policy import RandomEndpointPolicy -from ray.serve.utils import logger, retry_actor_failures +from ray.serve.utils import logger class Query: @@ -136,25 +137,21 @@ class Router: serve.init(name=instance_name) master_actor = serve.api._get_master_actor() - traffic_policies = retry_actor_failures( - master_actor.get_traffic_policies) + traffic_policies = ray.get(master_actor.get_traffic_policies.remote()) for endpoint, traffic_policy in traffic_policies.items(): await self.set_traffic(endpoint, traffic_policy) - backend_dict = retry_actor_failures( - master_actor.get_all_worker_handles) + backend_dict = ray.get(master_actor.get_all_worker_handles.remote()) for backend_tag, replica_dict in backend_dict.items(): for replica_tag, worker in replica_dict.items(): await self.add_new_worker(backend_tag, replica_tag, worker) - backend_configs = retry_actor_failures( - master_actor.get_backend_configs) + backend_configs = ray.get(master_actor.get_backend_configs.remote()) for backend, backend_config in backend_configs.items(): await self.set_backend_config(backend, backend_config) # -- Metric Registration -- # - [metric_exporter] = retry_actor_failures( - master_actor.get_metric_exporter) + [metric_exporter] = ray.get(master_actor.get_metric_exporter.remote()) self.metric_client = MetricClient(metric_exporter) self.num_router_requests = self.metric_client.new_counter( "num_router_requests", diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index e9d8ac3d7..dcd2d0c66 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -4,7 +4,6 @@ import pytest import ray from ray import serve -from ray.serve.utils import retry_actor_failures if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): serve.master._CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.5 @@ -23,7 +22,7 @@ def serve_instance(_shared_serve_instance): yield master = serve.api._get_master_actor() # Clear all state between tests to avoid naming collisions. - for endpoint in retry_actor_failures(master.get_all_endpoints): + for endpoint in ray.get(master.get_all_endpoints.remote()): serve.delete_endpoint(endpoint) - for backend in retry_actor_failures(master.get_all_backends): + for backend in ray.get(master.get_all_backends.remote()): serve.delete_backend(backend) diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index 4e0407d98..a2492542f 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -1,6 +1,3 @@ -import asyncio -from functools import wraps -import inspect import json import logging import random @@ -9,7 +6,6 @@ import time import io import os -import ray import requests from pygments import formatters, highlight, lexers from ray.serve.constants import HTTP_PROXY_TIMEOUT @@ -113,72 +109,6 @@ def get_random_letters(length=6): return "".join(random.choices(string.ascii_letters, k=length)) -def async_retryable(cls): - """Make all actor method invocations on the class retryable. - - Note: This will retry actor_handle.method_name.remote(), but it must - be invoked in an async context. - - Usage: - @ray.remote(max_restarts=10000) - @async_retryable - class A: - pass - """ - for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - - def decorate_with_retry(f): - @wraps(f) - async def retry_method(*args, **kwargs): - start = time.time() - while time.time() - start < ACTOR_FAILURE_RETRY_TIMEOUT_S: - try: - return await f(*args, **kwargs) - except ray.exceptions.RayActorError: - logger.warning( - "Actor method '{}' failed, retrying after 100ms.". - format(name)) - await asyncio.sleep(0.1) - raise RuntimeError("Timed out after {}s waiting for actor " - "method '{}' to succeed.".format( - ACTOR_FAILURE_RETRY_TIMEOUT_S, name)) - - return retry_method - - method.__ray_invocation_decorator__ = decorate_with_retry - return cls - - -def retry_actor_failures(f, *args, **kwargs): - start = time.time() - while time.time() - start < ACTOR_FAILURE_RETRY_TIMEOUT_S: - try: - return ray.get(f.remote(*args, **kwargs)) - except ray.exceptions.RayActorError: - logger.warning( - "Actor method '{}' failed, retrying after 100ms".format( - f._method_name)) - time.sleep(0.1) - raise RuntimeError("Timed out after {}s waiting for actor " - "method '{}' to succeed.".format( - ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name)) - - -async def retry_actor_failures_async(f, *args, **kwargs): - start = time.time() - while time.time() - start < ACTOR_FAILURE_RETRY_TIMEOUT_S: - try: - return await f.remote(*args, **kwargs) - except ray.exceptions.RayActorError: - logger.warning( - "Actor method '{}' failed, retrying after 100ms".format( - f._method_name)) - await asyncio.sleep(0.1) - raise RuntimeError("Timed out after {}s waiting for actor " - "method '{}' to succeed.".format( - ACTOR_FAILURE_RETRY_TIMEOUT_S, f._method_name)) - - def format_actor_name(actor_name, instance_name=None): if instance_name is None: return actor_name