diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index e1e6f7c2d..61722e597 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -124,12 +124,16 @@ def init(blocking=False, RequestMetadata.ray_serialize, RequestMetadata.ray_deserialize) + # TODO(edoakes): for now, always start the HTTP proxy on the node that + # serve.init() was run on. We should consider making this configurable + # in the future. + http_node_id = ray.state.current_node_id() master_actor = ServeMaster.options( detached=True, name=SERVE_MASTER_NAME, max_restarts=-1, - ).remote(queueing_policy.value, policy_kwargs, start_server, http_host, - http_port, metric_exporter) + ).remote(queueing_policy.value, policy_kwargs, start_server, http_node_id, + http_host, http_port, metric_exporter) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index bf4615fb9..47eb17a8c 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -50,8 +50,8 @@ class ServeMaster: """ async def __init__(self, router_policy, router_policy_kwargs, - start_http_proxy, http_proxy_host, http_proxy_port, - metric_exporter_class): + start_http_proxy, http_node_id, http_proxy_host, + http_proxy_port, metric_exporter_class): # Used to read/write checkpoints. # TODO(edoakes): namespace the master actor and its checkpoints. self.kv_store = RayInternalKVStore() @@ -91,7 +91,8 @@ class ServeMaster: self._get_or_start_metric_exporter(metric_exporter_class) self._get_or_start_router(router_policy, router_policy_kwargs) if start_http_proxy: - self._get_or_start_http_proxy(http_proxy_host, http_proxy_port) + self._get_or_start_http_proxy(http_node_id, http_proxy_host, + http_proxy_port) # NOTE(edoakes): unfortunately, we can't completely recover from a # checkpoint in the constructor because we block while waiting for @@ -133,7 +134,7 @@ class ServeMaster: """Returns a handle to the router managed by this actor.""" return [self.router] - def _get_or_start_http_proxy(self, host, port): + def _get_or_start_http_proxy(self, node_id, host, port): """Get the HTTP proxy belonging to this serve cluster. If the HTTP proxy does not already exist, it will be started. @@ -142,12 +143,16 @@ class ServeMaster: self.http_proxy = ray.util.get_actor(SERVE_PROXY_NAME) except ValueError: logger.info( - "Starting HTTP proxy with name '{}'".format(SERVE_PROXY_NAME)) + "Starting HTTP proxy with name '{}' on node '{}'".format( + SERVE_PROXY_NAME, node_id)) self.http_proxy = async_retryable(HTTPProxyActor).options( detached=True, name=SERVE_PROXY_NAME, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, + resources={ + node_id: 0.01 + }, ).remote(host, port) def get_http_proxy(self):