From e5b6566d28bd0215588f5b47d2cf80867d07b65d Mon Sep 17 00:00:00 2001 From: krfricke Date: Fri, 29 May 2020 22:25:35 +0200 Subject: [PATCH] Remove `blocking` flag from `serve.init()` (#8654) --- ci/long_running_tests/workloads/serve.py | 2 +- .../workloads/serve_failure.py | 2 +- python/ray/serve/api.py | 11 ++++------- python/ray/serve/constants.py | 3 +++ python/ray/serve/examples/benchmark.py | 2 +- python/ray/serve/examples/echo.py | 2 +- python/ray/serve/examples/echo_actor.py | 2 +- python/ray/serve/examples/echo_actor_batch.py | 2 +- python/ray/serve/examples/echo_batching.py | 2 +- python/ray/serve/examples/echo_error.py | 2 +- .../ray/serve/examples/echo_fixed_packing.py | 1 - python/ray/serve/examples/echo_full.py | 3 +-- python/ray/serve/examples/echo_pipeline.py | 3 +-- python/ray/serve/examples/echo_round_robin.py | 2 +- python/ray/serve/examples/echo_slo_reverse.py | 3 +-- python/ray/serve/examples/echo_split.py | 2 +- python/ray/serve/tests/conftest.py | 2 +- python/ray/serve/tests/test_api.py | 4 ++-- python/ray/serve/utils.py | 18 +++++++++--------- 19 files changed, 32 insertions(+), 36 deletions(-) diff --git a/ci/long_running_tests/workloads/serve.py b/ci/long_running_tests/workloads/serve.py index ee75010d4..cb05af6d2 100644 --- a/ci/long_running_tests/workloads/serve.py +++ b/ci/long_running_tests/workloads/serve.py @@ -32,7 +32,7 @@ subprocess.call([ ]) ray.init(address=cluster.address, include_webui=True, webui_host="0.0.0.0") -serve.init(blocking=True) +serve.init() @serve.accept_batch diff --git a/ci/long_running_tests/workloads/serve_failure.py b/ci/long_running_tests/workloads/serve_failure.py index 5c7047e4f..48c5e09fe 100644 --- a/ci/long_running_tests/workloads/serve_failure.py +++ b/ci/long_running_tests/workloads/serve_failure.py @@ -31,7 +31,7 @@ ray.init( include_webui=True, webui_host="0.0.0.0", log_to_driver=False) -serve.init(blocking=True) +serve.init() @ray.remote diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 52ede1749..75a3cca8b 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -4,7 +4,7 @@ from multiprocessing import cpu_count import ray from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_MASTER_NAME) + SERVE_MASTER_NAME, HTTP_PROXY_TIMEOUT) from ray.serve.master import ServeMaster from ray.serve.handle import RayServeHandle from ray.serve.utils import (block_until_http_ready, format_actor_name, @@ -61,7 +61,6 @@ def accept_batch(f): def init(cluster_name=None, - blocking=False, http_host=DEFAULT_HTTP_HOST, http_port=DEFAULT_HTTP_PORT, ray_init_kwargs={ @@ -81,8 +80,6 @@ def init(cluster_name=None, cluster_name (str): A unique name for this serve cluster. This allows multiple serve clusters to run on the same ray cluster. Must be specified in all subsequent serve.init() calls. - blocking (bool): If true, the function will wait for the HTTP server to - be healthy, and other components to be ready before returns. http_host (str): Host for HTTP server. Default to "0.0.0.0". http_port (int): Port for HTTP server. Default to 8000. ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray @@ -125,9 +122,9 @@ def init(cluster_name=None, max_restarts=-1, ).remote(cluster_name, http_node_id, http_host, http_port, metric_exporter) - if blocking: - block_until_http_ready("http://{}:{}/-/routes".format( - http_host, http_port)) + block_until_http_ready( + "http://{}:{}/-/routes".format(http_host, http_port), + timeout=HTTP_PROXY_TIMEOUT) @_ensure_connected diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index 5648f7051..e72f66657 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -27,3 +27,6 @@ DEFAULT_LATENCY_SLO_MS = 1e9 #: Interval for metric client to push metrics to exporters METRIC_PUSH_INTERVAL_S = 2 + +#: Time to wait for HTTP proxy in `serve.init()` +HTTP_PROXY_TIMEOUT = 60 diff --git a/python/ray/serve/examples/benchmark.py b/python/ray/serve/examples/benchmark.py index ea51337b2..cf34c4d9d 100644 --- a/python/ray/serve/examples/benchmark.py +++ b/python/ray/serve/examples/benchmark.py @@ -5,7 +5,7 @@ import time import pandas as pd from tqdm import tqdm -serve.init(blocking=True) +serve.init() def noop(_): diff --git a/python/ray/serve/examples/echo.py b/python/ray/serve/examples/echo.py index 5e5dbbaf7..73c326099 100644 --- a/python/ray/serve/examples/echo.py +++ b/python/ray/serve/examples/echo.py @@ -14,7 +14,7 @@ def echo(flask_request): return "hello " + flask_request.args.get("name", "serve!") -serve.init(blocking=True) +serve.init() serve.create_endpoint("my_endpoint", "/echo") serve.create_backend("echo:v1", echo) diff --git a/python/ray/serve/examples/echo_actor.py b/python/ray/serve/examples/echo_actor.py index 804396cad..dadf8628c 100644 --- a/python/ray/serve/examples/echo_actor.py +++ b/python/ray/serve/examples/echo_actor.py @@ -24,7 +24,7 @@ class MagicCounter: return base_number + self.increment -serve.init(blocking=True) +serve.init() serve.create_endpoint("magic_counter", "/counter") serve.create_backend("counter:v1", MagicCounter, 42) # increment=42 serve.set_traffic("magic_counter", {"counter:v1": 1.0}) diff --git a/python/ray/serve/examples/echo_actor_batch.py b/python/ray/serve/examples/echo_actor_batch.py index 7e5893fdc..cad989eb1 100644 --- a/python/ray/serve/examples/echo_actor_batch.py +++ b/python/ray/serve/examples/echo_actor_batch.py @@ -35,7 +35,7 @@ class MagicCounter: return result -serve.init(blocking=True) +serve.init() serve.create_endpoint("magic_counter", "/counter") serve.create_backend( "counter:v1", MagicCounter, 42, diff --git a/python/ray/serve/examples/echo_batching.py b/python/ray/serve/examples/echo_batching.py index fa769e81f..a42fb0fb5 100644 --- a/python/ray/serve/examples/echo_batching.py +++ b/python/ray/serve/examples/echo_batching.py @@ -26,7 +26,7 @@ class MagicCounter: return "" -serve.init(blocking=True) +serve.init() serve.create_endpoint("magic_counter", "/counter") # specify max_batch_size in BackendConfig backend_config = {"max_batch_size": 5} diff --git a/python/ray/serve/examples/echo_error.py b/python/ray/serve/examples/echo_error.py index a50a8354e..dd4671685 100644 --- a/python/ray/serve/examples/echo_error.py +++ b/python/ray/serve/examples/echo_error.py @@ -26,7 +26,7 @@ def echo(_): raise Exception("Something went wrong...") -serve.init(blocking=True) +serve.init() serve.create_endpoint("my_endpoint", "/echo") serve.create_backend("echo:v1", echo) diff --git a/python/ray/serve/examples/echo_fixed_packing.py b/python/ray/serve/examples/echo_fixed_packing.py index 81c7369fc..d2e96d54f 100644 --- a/python/ray/serve/examples/echo_fixed_packing.py +++ b/python/ray/serve/examples/echo_fixed_packing.py @@ -24,7 +24,6 @@ def echo_v2(_): # specify the router policy as FixedPacking with packing num as 5 serve.init( - blocking=True, queueing_policy=serve.RoutePolicy.FixedPacking, policy_kwargs={"packing_num": 5}) diff --git a/python/ray/serve/examples/echo_full.py b/python/ray/serve/examples/echo_full.py index cd753c3ff..36f5fb11c 100644 --- a/python/ray/serve/examples/echo_full.py +++ b/python/ray/serve/examples/echo_full.py @@ -11,8 +11,7 @@ import ray.serve as serve from ray.serve.utils import pformat_color_json # initialize ray serve system. -# blocking=True will wait for HTTP server to be ready to serve request. -serve.init(blocking=True) +serve.init() # an endpoint is associated with an http URL. serve.create_endpoint("my_endpoint", "/echo") diff --git a/python/ray/serve/examples/echo_pipeline.py b/python/ray/serve/examples/echo_pipeline.py index 827f91f11..40582ae9a 100644 --- a/python/ray/serve/examples/echo_pipeline.py +++ b/python/ray/serve/examples/echo_pipeline.py @@ -6,8 +6,7 @@ import ray.serve as serve import time # initialize ray serve system. -# blocking=True will wait for HTTP server to be ready to serve request. -serve.init(blocking=True) +serve.init() # a backend can be a function or class. diff --git a/python/ray/serve/examples/echo_round_robin.py b/python/ray/serve/examples/echo_round_robin.py index c162315d8..1dabeb4e1 100644 --- a/python/ray/serve/examples/echo_round_robin.py +++ b/python/ray/serve/examples/echo_round_robin.py @@ -19,7 +19,7 @@ def echo_v2(_): # specify the router policy as RoundRobin -serve.init(blocking=True, queueing_policy=serve.RoutePolicy.RoundRobin) +serve.init(queueing_policy=serve.RoutePolicy.RoundRobin) # create a service serve.create_endpoint("my_endpoint", "/echo") diff --git a/python/ray/serve/examples/echo_slo_reverse.py b/python/ray/serve/examples/echo_slo_reverse.py index eb79c1abe..f0bf33ab7 100644 --- a/python/ray/serve/examples/echo_slo_reverse.py +++ b/python/ray/serve/examples/echo_slo_reverse.py @@ -10,8 +10,7 @@ import ray import ray.serve as serve # initialize ray serve system. -# blocking=True will wait for HTTP server to be ready to serve request. -serve.init(blocking=True) +serve.init() # an endpoint is associated with an http URL. serve.create_endpoint("my_endpoint", "/echo") diff --git a/python/ray/serve/examples/echo_split.py b/python/ray/serve/examples/echo_split.py index 56c4e43fa..66beb8211 100644 --- a/python/ray/serve/examples/echo_split.py +++ b/python/ray/serve/examples/echo_split.py @@ -18,7 +18,7 @@ def echo_v2(_): return "v2" -serve.init(blocking=True) +serve.init() serve.create_endpoint("my_endpoint", "/echo") serve.create_backend("echo:v1", echo_v1) diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 37c03b2b0..0ccd2eafb 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -11,7 +11,7 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): @pytest.fixture(scope="session") def _shared_serve_instance(): - serve.init(blocking=True, ray_init_kwargs={"num_cpus": 36}) + serve.init(ray_init_kwargs={"num_cpus": 36}) yield diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index e53170591..7cc8de202 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -354,7 +354,7 @@ def test_cluster_name(): backend = "backend" endpoint = "endpoint" - serve.init(cluster_name="cluster1", blocking=True, http_port=8001) + serve.init(cluster_name="cluster1", http_port=8001) serve.create_endpoint(endpoint, route=route) def function(): @@ -367,7 +367,7 @@ def test_cluster_name(): # Create a second cluster on port 8002. Create an endpoint and backend with # the same names and check that they don't collide. - serve.init(cluster_name="cluster2", blocking=True, http_port=8002) + serve.init(cluster_name="cluster2", http_port=8002) serve.create_endpoint(endpoint, route=route) def function(): diff --git a/python/ray/serve/utils.py b/python/ray/serve/utils.py index 8e5aa697d..df04c9cfb 100644 --- a/python/ray/serve/utils.py +++ b/python/ray/serve/utils.py @@ -12,6 +12,7 @@ import os import ray import requests from pygments import formatters, highlight, lexers +from ray.serve.constants import HTTP_PROXY_TIMEOUT from ray.serve.context import FakeFlaskRequest, TaskContext from ray.serve.http_util import build_flask_request import numpy as np @@ -87,9 +88,11 @@ def pformat_color_json(d): return colorful_json -def block_until_http_ready(http_endpoint, num_retries=6, backoff_time_s=1): +def block_until_http_ready(http_endpoint, + backoff_time_s=1, + timeout=HTTP_PROXY_TIMEOUT): http_is_ready = False - retries = num_retries + start_time = time.time() while not http_is_ready: try: @@ -99,14 +102,11 @@ def block_until_http_ready(http_endpoint, num_retries=6, backoff_time_s=1): except Exception: pass - # Exponential backoff - time.sleep(backoff_time_s) - backoff_time_s *= 2 + if 0 < timeout < time.time() - start_time: + raise TimeoutError( + "HTTP proxy not ready after {} seconds.".format(timeout)) - retries -= 1 - if retries == 0: - raise Exception( - "HTTP proxy not ready after {} retries.".format(num_retries)) + time.sleep(backoff_time_s) def get_random_letters(length=6):