diff --git a/ci/long_running_tests/workloads/serve.py b/ci/long_running_tests/workloads/serve.py index 7a963c929..ee75010d4 100644 --- a/ci/long_running_tests/workloads/serve.py +++ b/ci/long_running_tests/workloads/serve.py @@ -7,7 +7,6 @@ import requests import ray from ray import serve from ray.cluster_utils import Cluster -from ray.serve.kv_store_service import RayInternalKVStore num_redis_shards = 1 redis_max_memory = 10**8 @@ -33,7 +32,7 @@ subprocess.call([ ]) ray.init(address=cluster.address, include_webui=True, webui_host="0.0.0.0") -serve.init(blocking=True, kv_store_connector=lambda ns: RayInternalKVStore(ns)) +serve.init(blocking=True) @serve.accept_batch diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 99fbaae9e..eeba518cd 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -1,5 +1,4 @@ from functools import wraps -from tempfile import mkstemp from multiprocessing import cpu_count @@ -8,7 +7,6 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, SERVE_MASTER_NAME) from ray.serve.master import ServeMaster from ray.serve.handle import RayServeHandle -from ray.serve.kv_store_service import SQLiteKVStore from ray.serve.utils import block_until_http_ready, retry_actor_failures from ray.serve.exceptions import RayServeException from ray.serve.config import BackendConfig, ReplicaConfig @@ -60,8 +58,6 @@ def accept_batch(f): def init( - kv_store_connector=None, - kv_store_path=None, blocking=False, start_server=True, http_host=DEFAULT_HTTP_HOST, @@ -83,9 +79,6 @@ def init( requirement. Args: - kv_store_connector (callable): Function of (namespace) => TableObject. - We will use a SQLite connector that stores to /tmp by default. - kv_store_path (str, path): Path to the SQLite table. blocking (bool): If true, the function will wait for the HTTP server to be healthy, and other components to be ready before returns. start_server (bool): If true, `serve.init` starts http server. @@ -128,21 +121,12 @@ def init( RequestMetadata.ray_serialize, RequestMetadata.ray_deserialize) - if kv_store_path is None: - _, kv_store_path = mkstemp() - - # Serve has not been initialized, perform init sequence - # TODO move the db to session_dir. - # ray.worker._global_node.address_info["session_dir"] - def kv_store_connector(namespace): - return SQLiteKVStore(namespace, db_path=kv_store_path) - master_actor = ServeMaster.options( detached=True, name=SERVE_MASTER_NAME, max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION, - ).remote(kv_store_connector, queueing_policy.value, policy_kwargs, - start_server, http_host, http_port, gc_window_seconds) + ).remote(queueing_policy.value, policy_kwargs, start_server, http_host, + http_port, gc_window_seconds) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( diff --git a/python/ray/serve/kv_store.py b/python/ray/serve/kv_store.py new file mode 100644 index 000000000..e89377650 --- /dev/null +++ b/python/ray/serve/kv_store.py @@ -0,0 +1,47 @@ +import ray.experimental.internal_kv as ray_kv + + +class RayInternalKVStore: + """Wraps ray's internal_kv with a namespace to avoid collisions. + + Supports string keys and bytes values, caller must handle serialization. + """ + + def __init__(self, namespace=None): + assert ray_kv._internal_kv_initialized() + if namespace is not None and not isinstance(namespace, str): + raise TypeError("namespace must a string, got: {}.".format( + type(namespace))) + + self.namespace = namespace or "" + + def _format_key(self, key): + return "{ns}-{key}".format(ns=self.namespace, key=key) + + def put(self, key, val): + """Put the key-value pair into the store. + + Args: + key (str) + val (bytes) + """ + if not isinstance(key, str): + raise TypeError("key must be a string, got: {}.".format(type(key))) + if not isinstance(val, bytes): + raise TypeError("val must be bytes, got: {}.".format(type(val))) + + ray_kv._internal_kv_put(self._format_key(key), val, overwrite=True) + + def get(self, key): + """Get the value associated with the given key from the store. + + Args: + key (str) + + Returns: + The bytes value. If the key wasn't found, returns None. + """ + if not isinstance(key, str): + raise TypeError("key must be a string, got: {}.".format(type(key))) + + return ray_kv._internal_kv_get(self._format_key(key)) diff --git a/python/ray/serve/kv_store_service.py b/python/ray/serve/kv_store_service.py deleted file mode 100644 index 80e1d82a8..000000000 --- a/python/ray/serve/kv_store_service.py +++ /dev/null @@ -1,165 +0,0 @@ -import json -import sqlite3 -from abc import ABC - -import ray.experimental.internal_kv as ray_kv - - -class NamespacedKVStore(ABC): - """Abstract base class for a namespaced key-value store. - - The idea is that multiple key-value stores can be created while sharing - the same storage system. The keys of each instance are namespaced to avoid - object_id key collision. - - Example: - - >>> store_ns1 = NamespacedKVStore(namespace="ns1") - >>> store_ns2 = NamespacedKVStore(namespace="ns2") - # Two stores can share the same connection like Redis or SQL Table - >>> store_ns1.put("same-key", 1) - >>> store_ns1.get("same-key") - 1 - >>> store_ns2.put("same-key", 2) - >>> store_ns2.get("same-key", 2) - 2 - """ - - def __init__(self, namespace): - raise NotImplementedError() - - def get(self, key): - """Retrieve the value for the given key. - - Args: - key (str) - """ - raise NotImplementedError() - - def put(self, key, value): - """Serialize the value and store it under the given key. - - Args: - key (str) - value (object): any serializable object. The serialization method - is determined by the subclass implementation. - """ - raise NotImplementedError() - - def as_dict(self): - """Return the entire namespace as a dictionary. - - Returns: - data (dict): key value pairs in current namespace - """ - raise NotImplementedError() - - -class InMemoryKVStore(NamespacedKVStore): - """A reference implementation used for testing.""" - - def __init__(self, namespace): - self.data = dict() - - # Namespace is ignored, because each namespace is backed by - # an in-memory Python dictionary. - self.namespace = namespace - - def get(self, key): - return self.data[key] - - def put(self, key, value): - self.data[key] = value - - def as_dict(self): - return self.data.copy() - - -class RayInternalKVStore(NamespacedKVStore): - """A NamespacedKVStore implementation using ray's `internal_kv`.""" - - def __init__(self, namespace): - assert ray_kv._internal_kv_initialized() - self.index_key = "RAY_SERVE_INDEX" - self.namespace = namespace - self._put(self.index_key, []) - - def _format_key(self, key): - return "{ns}-{key}".format(ns=self.namespace, key=key) - - def _remove_format_key(self, formatted_key): - return formatted_key.replace(self.namespace + "-", "", 1) - - def _serialize(self, obj): - return json.dumps(obj) - - def _deserialize(self, buffer): - return json.loads(buffer) - - def _put(self, key, value): - ray_kv._internal_kv_put( - self._format_key(self._serialize(key)), - self._serialize(value), - overwrite=True, - ) - - def _get(self, key): - return self._deserialize( - ray_kv._internal_kv_get(self._format_key(self._serialize(key)))) - - def get(self, key): - return self._get(key) - - def put(self, key, value): - assert isinstance(key, str), "Key must be a string." - - self._put(key, value) - - all_keys = set(self._get(self.index_key)) - all_keys.add(key) - self._put(self.index_key, list(all_keys)) - - def as_dict(self): - data = {} - all_keys = self._get(self.index_key) - for key in all_keys: - data[self._remove_format_key(key)] = self._get(key) - return data - - -class SQLiteKVStore(NamespacedKVStore): - def __init__(self, namespace, db_path): - self.namespace = namespace - self.conn = sqlite3.connect(db_path) - - cursor = self.conn.cursor() - cursor.execute( - "CREATE TABLE IF NOT EXISTS {} (key TEXT UNIQUE, value TEXT)". - format(self.namespace)) - self.conn.commit() - - def put(self, key, value): - cursor = self.conn.cursor() - cursor.execute( - "INSERT OR REPLACE INTO {} (key, value) VALUES (?,?)".format( - self.namespace), (key, value)) - self.conn.commit() - - def get(self, key, default=None): - cursor = self.conn.cursor() - result = list( - cursor.execute( - "SELECT value FROM {} WHERE key = (?)".format(self.namespace), - (key, ))) - if len(result) == 0: - return default - else: - # Due to UNIQUE constraint, there can only be one value. - value, *_ = result[0] - return value - - def as_dict(self): - cursor = self.conn.cursor() - result = list( - cursor.execute("SELECT key, value FROM {}".format(self.namespace))) - return dict(result) diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 9a2114756..6cd061a1d 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -9,6 +9,7 @@ import ray.cloudpickle as pickle from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME, SERVE_PROXY_NAME, SERVE_METRIC_MONITOR_NAME) from ray.serve.http_proxy import HTTPProxyActor +from ray.serve.kv_store import RayInternalKVStore from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop) from ray.serve.backend_worker import create_backend_worker from ray.serve.utils import async_retryable, get_random_letters, logger @@ -18,6 +19,7 @@ import numpy as np # Used for testing purposes only. If this is set, the master actor will crash # after writing each checkpoint with the specified probability. _CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.0 +CHECKPOINT_KEY = "serve-master-checkpoint" @ray.remote @@ -46,12 +48,11 @@ class ServeMaster: requires all implementations here to be idempotent. """ - async def __init__(self, kv_store_connector, router_class, router_kwargs, - start_http_proxy, http_proxy_host, http_proxy_port, - metric_gc_window_s): + async def __init__(self, router_class, router_kwargs, start_http_proxy, + http_proxy_host, http_proxy_port, metric_gc_window_s): # Used to read/write checkpoints. # TODO(edoakes): namespace the master actor and its checkpoints. - self.kv_store_client = kv_store_connector("serve_checkpoints") + self.kv_store = RayInternalKVStore() # path -> (endpoint, methods). self.routes = {} # backend -> (backend_worker, backend_config, replica_config). @@ -102,7 +103,7 @@ class ServeMaster: # a checkpoint to the event loop. Other state-changing calls acquire # this lock and will be blocked until recovering from the checkpoint # finishes. - checkpoint = self.kv_store_client.get("checkpoint") + checkpoint = self.kv_store.get(CHECKPOINT_KEY) if checkpoint is None: logger.debug("No checkpoint found") else: @@ -186,7 +187,7 @@ class ServeMaster: self.replicas_to_start, self.replicas_to_stop, self.backends_to_remove, self.endpoints_to_remove)) - self.kv_store_client.put("checkpoint", checkpoint) + self.kv_store.put(CHECKPOINT_KEY, checkpoint) logger.debug("Wrote checkpoint in {:.2f}".format(time.time() - start)) if random.random() < _CRASH_AFTER_CHECKPOINT_PROBABILITY: diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 2068f8c66..ec581698e 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -1,5 +1,4 @@ import os -import tempfile import pytest @@ -12,13 +11,8 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): @pytest.fixture(scope="session") def serve_instance(): - _, new_db_path = tempfile.mkstemp(suffix=".test.db") - serve.init( - kv_store_path=new_db_path, - blocking=True, - ray_init_kwargs={"num_cpus": 36}) + serve.init(blocking=True, ray_init_kwargs={"num_cpus": 36}) yield - os.remove(new_db_path) @pytest.fixture(scope="session") diff --git a/python/ray/serve/tests/test_kv_store.py b/python/ray/serve/tests/test_kv_store.py new file mode 100644 index 000000000..82cd26b1c --- /dev/null +++ b/python/ray/serve/tests/test_kv_store.py @@ -0,0 +1,40 @@ +import pytest + +from ray.serve.kv_store import RayInternalKVStore + + +def test_ray_internal_kv(ray_instance): + with pytest.raises(TypeError): + RayInternalKVStore(namespace=1) + RayInternalKVStore(namespace=b"") + + kv = RayInternalKVStore() + + with pytest.raises(TypeError): + kv.put(1, b"1") + with pytest.raises(TypeError): + kv.put("1", 1) + with pytest.raises(TypeError): + kv.put("1", "1") + + kv.put("1", b"2") + assert kv.get("1") == b"2" + kv.put("2", b"4") + assert kv.get("2") == b"4" + kv.put("1", b"3") + assert kv.get("1") == b"3" + assert kv.get("2") == b"4" + + +def test_ray_internal_kv_collisions(ray_instance): + kv1 = RayInternalKVStore() + kv1.put("1", b"1") + assert kv1.get("1") == b"1" + + kv2 = RayInternalKVStore("namespace") + + assert kv2.get("1") is None + + kv2.put("1", b"-1") + assert kv2.get("1") == b"-1" + assert kv1.get("1") == b"1" diff --git a/python/ray/serve/tests/test_routing.py b/python/ray/serve/tests/test_routing.py deleted file mode 100644 index dc19ba2bd..000000000 --- a/python/ray/serve/tests/test_routing.py +++ /dev/null @@ -1,54 +0,0 @@ -import os -import tempfile - -from ray.serve.kv_store_service import (InMemoryKVStore, RayInternalKVStore, - SQLiteKVStore) - - -def test_default_in_memory_kv(): - kv = InMemoryKVStore("") - kv.put("1", 2) - assert kv.get("1") == 2 - kv.put("1", 3) - assert kv.get("1") == 3 - assert kv.as_dict() == {"1": 3} - - -def test_ray_interal_kv(ray_instance): - kv = RayInternalKVStore("") - kv.put("1", 2) - assert kv.get("1") == 2 - kv.put("1", 3) - assert kv.get("1") == 3 - assert kv.as_dict() == {"1": 3} - - kv = RayInternalKVStore("othernamespace") - kv.put("1", 2) - assert kv.get("1") == 2 - kv.put("1", 3) - assert kv.get("1") == 3 - assert kv.as_dict() == {"1": 3} - - -def test_sqlite_kv(): - _, path = tempfile.mkstemp() - - # Test get - kv = SQLiteKVStore("routing_table", db_path=path) - kv.put("/api", "api-endpoint") - assert kv.get("/api") == "api-endpoint" - assert kv.get("not-exist") is None - - # Test namespace - kv2 = SQLiteKVStore("other_table", db_path=path) - kv2.put("/api", "api-endpoint-two") - assert kv2.get("/api") == "api-endpoint-two" - - # Test as dict - assert kv.as_dict() == {"/api": "api-endpoint"} - - # Test override - kv.put("/api", "api-new") - assert kv.get("/api") == "api-new" - - os.remove(path)