mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 08:11:44 +08:00
[Serve] Add dependency management (#11743)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from ray.serve.api import (accept_batch, Client, connect, start) # noqa: F401
|
||||
from ray.serve.config import BackendConfig
|
||||
from ray.serve.env import CondaEnv
|
||||
|
||||
# Mute the warning because Serve sometimes intentionally calls
|
||||
# ray.get inside async actors.
|
||||
@@ -9,7 +10,8 @@ ray.worker.blocking_get_inside_async_warned = True
|
||||
__all__ = [
|
||||
"accept_batch",
|
||||
"BackendConfig",
|
||||
"connect"
|
||||
"CondaEnv",
|
||||
"connect",
|
||||
"Client",
|
||||
"start",
|
||||
]
|
||||
|
||||
+26
-3
@@ -1,6 +1,7 @@
|
||||
import atexit
|
||||
from functools import wraps
|
||||
import random
|
||||
import os
|
||||
|
||||
import ray
|
||||
from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
|
||||
@@ -8,9 +9,11 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
|
||||
from ray.serve.controller import ServeController
|
||||
from ray.serve.handle import RayServeHandle
|
||||
from ray.serve.utils import (block_until_http_ready, format_actor_name,
|
||||
get_random_letters, logger, get_node_id_for_actor)
|
||||
get_random_letters, logger, get_node_id_for_actor,
|
||||
get_conda_env_dir)
|
||||
from ray.serve.exceptions import RayServeException
|
||||
from ray.serve.config import BackendConfig, ReplicaConfig, BackendMetadata
|
||||
from ray.serve.env import CondaEnv
|
||||
from ray.actor import ActorHandle
|
||||
from typing import Any, Callable, Dict, List, Optional, Type, Union
|
||||
|
||||
@@ -198,8 +201,8 @@ class Client:
|
||||
func_or_class: Union[Callable, Type[Callable]],
|
||||
*actor_init_args: Any,
|
||||
ray_actor_options: Optional[Dict] = None,
|
||||
config: Optional[Union[BackendConfig, Dict[str, Any]]] = None
|
||||
) -> None:
|
||||
config: Optional[Union[BackendConfig, Dict[str, Any]]] = None,
|
||||
env: Optional[CondaEnv] = None) -> None:
|
||||
"""Create a backend with the provided tag.
|
||||
|
||||
The backend will serve requests with func_or_class.
|
||||
@@ -225,6 +228,12 @@ class Client:
|
||||
- "max_concurrent_queries": the maximum number of queries that
|
||||
will be sent to a replica of this backend without receiving a
|
||||
response.
|
||||
env (serve.CondaEnv, optional): conda environment to run this
|
||||
backend in. Requires the caller to be running in an activated
|
||||
conda environment (not necessarily ``env``), and requires
|
||||
``env`` to be an existing conda environment on all nodes. If
|
||||
``env`` is not provided but conda is activated, the backend
|
||||
will run in the conda environment of the caller.
|
||||
"""
|
||||
if backend_tag in self.list_backends().keys():
|
||||
raise ValueError(
|
||||
@@ -233,6 +242,20 @@ class Client:
|
||||
|
||||
if config is None:
|
||||
config = {}
|
||||
if ray_actor_options is None:
|
||||
ray_actor_options = {}
|
||||
if env is None:
|
||||
# If conda is activated, default to conda env of this process.
|
||||
if os.environ.get("CONDA_PREFIX"):
|
||||
if "override_environment_variables" not in ray_actor_options:
|
||||
ray_actor_options["override_environment_variables"] = {}
|
||||
ray_actor_options["override_environment_variables"].update({
|
||||
"PYTHONHOME": os.environ.get("CONDA_PREFIX")
|
||||
})
|
||||
else:
|
||||
conda_env_dir = get_conda_env_dir(env.name)
|
||||
ray_actor_options.update(
|
||||
override_environment_variables={"PYTHONHOME": conda_env_dir})
|
||||
replica_config = ReplicaConfig(
|
||||
func_or_class,
|
||||
*actor_init_args,
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from pydantic.dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class CondaEnv:
|
||||
name: str
|
||||
@@ -0,0 +1,21 @@
|
||||
import requests
|
||||
import ray
|
||||
from ray import serve
|
||||
from ray.serve import CondaEnv
|
||||
import tensorflow as tf
|
||||
|
||||
ray.init()
|
||||
client = serve.start()
|
||||
|
||||
|
||||
def tf_version(request):
|
||||
return ("Tensorflow " + tf.__version__)
|
||||
|
||||
|
||||
client.create_backend("tf1", tf_version, env=CondaEnv("ray-tf1"))
|
||||
client.create_endpoint("tf1", backend="tf1", route="/tf1")
|
||||
client.create_backend("tf2", tf_version, env=CondaEnv("ray-tf2"))
|
||||
client.create_endpoint("tf2", backend="tf2", route="/tf2")
|
||||
|
||||
print(requests.get("http://127.0.0.1:8000/tf1").text) # Tensorflow 1.15.0
|
||||
print(requests.get("http://127.0.0.1:8000/tf2").text) # Tensorflow 2.3.0
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import os
|
||||
import json
|
||||
from copy import deepcopy
|
||||
|
||||
@@ -6,7 +7,8 @@ import numpy as np
|
||||
import pytest
|
||||
|
||||
from ray.serve.utils import (ServeEncoder, chain_future, unpack_future,
|
||||
try_schedule_resources_on_nodes)
|
||||
try_schedule_resources_on_nodes,
|
||||
get_conda_env_dir)
|
||||
|
||||
|
||||
def test_bytes_encoder():
|
||||
@@ -109,6 +111,20 @@ def test_mock_scheduler():
|
||||
deepcopy(ray_nodes)) == [False]
|
||||
|
||||
|
||||
def test_get_conda_env_dir(tmp_path):
|
||||
d = tmp_path / "tf1"
|
||||
d.mkdir()
|
||||
os.environ["CONDA_PREFIX"] = str(d)
|
||||
with pytest.raises(ValueError):
|
||||
# env does not exist
|
||||
env_dir = get_conda_env_dir("tf2")
|
||||
tf2_dir = tmp_path / "tf2"
|
||||
tf2_dir.mkdir()
|
||||
env_dir = get_conda_env_dir("tf2")
|
||||
assert (env_dir == str(tmp_path / "tf2"))
|
||||
os.environ["CONDA_PREFIX"] = ""
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(pytest.main(["-v", "-s", __file__]))
|
||||
|
||||
@@ -179,6 +179,39 @@ def format_actor_name(actor_name, controller_name=None, *modifiers):
|
||||
return name
|
||||
|
||||
|
||||
def get_conda_env_dir(env_name):
|
||||
"""Given a environment name like `tf1`, find and validate the
|
||||
corresponding conda directory.
|
||||
"""
|
||||
conda_prefix = os.environ.get("CONDA_PREFIX")
|
||||
if conda_prefix is None:
|
||||
raise ValueError(
|
||||
"Serve cannot find environment variables installed by conda. " +
|
||||
"Are you sure you are in a conda env?")
|
||||
|
||||
# There are two cases:
|
||||
# 1. We are in conda base env: CONDA_DEFAULT_ENV=base and
|
||||
# CONDA_PREFIX=$HOME/anaconda3
|
||||
# 2. We are in user created conda env: CONDA_DEFAULT_ENV=$env_name and
|
||||
# CONDA_PREFIX=$HOME/anaconda3/envs/$env_name
|
||||
if os.environ.get("CONDA_DEFAULT_ENV") == "base":
|
||||
# Caller is running in base conda env.
|
||||
# Not recommended by conda, but we can still try to support it.
|
||||
env_dir = os.path.join(conda_prefix, "envs", env_name)
|
||||
else:
|
||||
# Now `conda_prefix` should be something like
|
||||
# $HOME/anaconda3/envs/$env_name
|
||||
# We want to strip the $env_name component.
|
||||
conda_envs_dir = os.path.split(conda_prefix)[0]
|
||||
env_dir = os.path.join(conda_envs_dir, env_name)
|
||||
if not os.path.isdir(env_dir):
|
||||
raise ValueError(
|
||||
"conda env " + env_name +
|
||||
" not found in conda envs directory. Run `conda env list` to " +
|
||||
"verify the name is correct.")
|
||||
return env_dir
|
||||
|
||||
|
||||
@singledispatch
|
||||
def chain_future(src, dst):
|
||||
"""Base method for chaining futures together.
|
||||
|
||||
Reference in New Issue
Block a user