[serve] Always use internal KV store (#8270)

This commit is contained in:
Edward Oakes
2020-05-01 14:18:18 -05:00
committed by GitHub
parent 07daff8794
commit 13f718846d
8 changed files with 98 additions and 252 deletions
+2 -18
View File
@@ -1,5 +1,4 @@
from functools import wraps
from tempfile import mkstemp
from multiprocessing import cpu_count
@@ -8,7 +7,6 @@ from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
SERVE_MASTER_NAME)
from ray.serve.master import ServeMaster
from ray.serve.handle import RayServeHandle
from ray.serve.kv_store_service import SQLiteKVStore
from ray.serve.utils import block_until_http_ready, retry_actor_failures
from ray.serve.exceptions import RayServeException
from ray.serve.config import BackendConfig, ReplicaConfig
@@ -60,8 +58,6 @@ def accept_batch(f):
def init(
kv_store_connector=None,
kv_store_path=None,
blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
@@ -83,9 +79,6 @@ def init(
requirement.
Args:
kv_store_connector (callable): Function of (namespace) => TableObject.
We will use a SQLite connector that stores to /tmp by default.
kv_store_path (str, path): Path to the SQLite table.
blocking (bool): If true, the function will wait for the HTTP server to
be healthy, and other components to be ready before returns.
start_server (bool): If true, `serve.init` starts http server.
@@ -128,21 +121,12 @@ def init(
RequestMetadata.ray_serialize,
RequestMetadata.ray_deserialize)
if kv_store_path is None:
_, kv_store_path = mkstemp()
# Serve has not been initialized, perform init sequence
# TODO move the db to session_dir.
# ray.worker._global_node.address_info["session_dir"]
def kv_store_connector(namespace):
return SQLiteKVStore(namespace, db_path=kv_store_path)
master_actor = ServeMaster.options(
detached=True,
name=SERVE_MASTER_NAME,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
).remote(kv_store_connector, queueing_policy.value, policy_kwargs,
start_server, http_host, http_port, gc_window_seconds)
).remote(queueing_policy.value, policy_kwargs, start_server, http_host,
http_port, gc_window_seconds)
if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
+47
View File
@@ -0,0 +1,47 @@
import ray.experimental.internal_kv as ray_kv
class RayInternalKVStore:
"""Wraps ray's internal_kv with a namespace to avoid collisions.
Supports string keys and bytes values, caller must handle serialization.
"""
def __init__(self, namespace=None):
assert ray_kv._internal_kv_initialized()
if namespace is not None and not isinstance(namespace, str):
raise TypeError("namespace must a string, got: {}.".format(
type(namespace)))
self.namespace = namespace or ""
def _format_key(self, key):
return "{ns}-{key}".format(ns=self.namespace, key=key)
def put(self, key, val):
"""Put the key-value pair into the store.
Args:
key (str)
val (bytes)
"""
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
if not isinstance(val, bytes):
raise TypeError("val must be bytes, got: {}.".format(type(val)))
ray_kv._internal_kv_put(self._format_key(key), val, overwrite=True)
def get(self, key):
"""Get the value associated with the given key from the store.
Args:
key (str)
Returns:
The bytes value. If the key wasn't found, returns None.
"""
if not isinstance(key, str):
raise TypeError("key must be a string, got: {}.".format(type(key)))
return ray_kv._internal_kv_get(self._format_key(key))
-165
View File
@@ -1,165 +0,0 @@
import json
import sqlite3
from abc import ABC
import ray.experimental.internal_kv as ray_kv
class NamespacedKVStore(ABC):
"""Abstract base class for a namespaced key-value store.
The idea is that multiple key-value stores can be created while sharing
the same storage system. The keys of each instance are namespaced to avoid
object_id key collision.
Example:
>>> store_ns1 = NamespacedKVStore(namespace="ns1")
>>> store_ns2 = NamespacedKVStore(namespace="ns2")
# Two stores can share the same connection like Redis or SQL Table
>>> store_ns1.put("same-key", 1)
>>> store_ns1.get("same-key")
1
>>> store_ns2.put("same-key", 2)
>>> store_ns2.get("same-key", 2)
2
"""
def __init__(self, namespace):
raise NotImplementedError()
def get(self, key):
"""Retrieve the value for the given key.
Args:
key (str)
"""
raise NotImplementedError()
def put(self, key, value):
"""Serialize the value and store it under the given key.
Args:
key (str)
value (object): any serializable object. The serialization method
is determined by the subclass implementation.
"""
raise NotImplementedError()
def as_dict(self):
"""Return the entire namespace as a dictionary.
Returns:
data (dict): key value pairs in current namespace
"""
raise NotImplementedError()
class InMemoryKVStore(NamespacedKVStore):
"""A reference implementation used for testing."""
def __init__(self, namespace):
self.data = dict()
# Namespace is ignored, because each namespace is backed by
# an in-memory Python dictionary.
self.namespace = namespace
def get(self, key):
return self.data[key]
def put(self, key, value):
self.data[key] = value
def as_dict(self):
return self.data.copy()
class RayInternalKVStore(NamespacedKVStore):
"""A NamespacedKVStore implementation using ray's `internal_kv`."""
def __init__(self, namespace):
assert ray_kv._internal_kv_initialized()
self.index_key = "RAY_SERVE_INDEX"
self.namespace = namespace
self._put(self.index_key, [])
def _format_key(self, key):
return "{ns}-{key}".format(ns=self.namespace, key=key)
def _remove_format_key(self, formatted_key):
return formatted_key.replace(self.namespace + "-", "", 1)
def _serialize(self, obj):
return json.dumps(obj)
def _deserialize(self, buffer):
return json.loads(buffer)
def _put(self, key, value):
ray_kv._internal_kv_put(
self._format_key(self._serialize(key)),
self._serialize(value),
overwrite=True,
)
def _get(self, key):
return self._deserialize(
ray_kv._internal_kv_get(self._format_key(self._serialize(key))))
def get(self, key):
return self._get(key)
def put(self, key, value):
assert isinstance(key, str), "Key must be a string."
self._put(key, value)
all_keys = set(self._get(self.index_key))
all_keys.add(key)
self._put(self.index_key, list(all_keys))
def as_dict(self):
data = {}
all_keys = self._get(self.index_key)
for key in all_keys:
data[self._remove_format_key(key)] = self._get(key)
return data
class SQLiteKVStore(NamespacedKVStore):
def __init__(self, namespace, db_path):
self.namespace = namespace
self.conn = sqlite3.connect(db_path)
cursor = self.conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS {} (key TEXT UNIQUE, value TEXT)".
format(self.namespace))
self.conn.commit()
def put(self, key, value):
cursor = self.conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO {} (key, value) VALUES (?,?)".format(
self.namespace), (key, value))
self.conn.commit()
def get(self, key, default=None):
cursor = self.conn.cursor()
result = list(
cursor.execute(
"SELECT value FROM {} WHERE key = (?)".format(self.namespace),
(key, )))
if len(result) == 0:
return default
else:
# Due to UNIQUE constraint, there can only be one value.
value, *_ = result[0]
return value
def as_dict(self):
cursor = self.conn.cursor()
result = list(
cursor.execute("SELECT key, value FROM {}".format(self.namespace)))
return dict(result)
+7 -6
View File
@@ -9,6 +9,7 @@ import ray.cloudpickle as pickle
from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME,
SERVE_PROXY_NAME, SERVE_METRIC_MONITOR_NAME)
from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop)
from ray.serve.backend_worker import create_backend_worker
from ray.serve.utils import async_retryable, get_random_letters, logger
@@ -18,6 +19,7 @@ import numpy as np
# Used for testing purposes only. If this is set, the master actor will crash
# after writing each checkpoint with the specified probability.
_CRASH_AFTER_CHECKPOINT_PROBABILITY = 0.0
CHECKPOINT_KEY = "serve-master-checkpoint"
@ray.remote
@@ -46,12 +48,11 @@ class ServeMaster:
requires all implementations here to be idempotent.
"""
async def __init__(self, kv_store_connector, router_class, router_kwargs,
start_http_proxy, http_proxy_host, http_proxy_port,
metric_gc_window_s):
async def __init__(self, router_class, router_kwargs, start_http_proxy,
http_proxy_host, http_proxy_port, metric_gc_window_s):
# Used to read/write checkpoints.
# TODO(edoakes): namespace the master actor and its checkpoints.
self.kv_store_client = kv_store_connector("serve_checkpoints")
self.kv_store = RayInternalKVStore()
# path -> (endpoint, methods).
self.routes = {}
# backend -> (backend_worker, backend_config, replica_config).
@@ -102,7 +103,7 @@ class ServeMaster:
# a checkpoint to the event loop. Other state-changing calls acquire
# this lock and will be blocked until recovering from the checkpoint
# finishes.
checkpoint = self.kv_store_client.get("checkpoint")
checkpoint = self.kv_store.get(CHECKPOINT_KEY)
if checkpoint is None:
logger.debug("No checkpoint found")
else:
@@ -186,7 +187,7 @@ class ServeMaster:
self.replicas_to_start, self.replicas_to_stop,
self.backends_to_remove, self.endpoints_to_remove))
self.kv_store_client.put("checkpoint", checkpoint)
self.kv_store.put(CHECKPOINT_KEY, checkpoint)
logger.debug("Wrote checkpoint in {:.2f}".format(time.time() - start))
if random.random() < _CRASH_AFTER_CHECKPOINT_PROBABILITY:
+1 -7
View File
@@ -1,5 +1,4 @@
import os
import tempfile
import pytest
@@ -12,13 +11,8 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False):
@pytest.fixture(scope="session")
def serve_instance():
_, new_db_path = tempfile.mkstemp(suffix=".test.db")
serve.init(
kv_store_path=new_db_path,
blocking=True,
ray_init_kwargs={"num_cpus": 36})
serve.init(blocking=True, ray_init_kwargs={"num_cpus": 36})
yield
os.remove(new_db_path)
@pytest.fixture(scope="session")
+40
View File
@@ -0,0 +1,40 @@
import pytest
from ray.serve.kv_store import RayInternalKVStore
def test_ray_internal_kv(ray_instance):
with pytest.raises(TypeError):
RayInternalKVStore(namespace=1)
RayInternalKVStore(namespace=b"")
kv = RayInternalKVStore()
with pytest.raises(TypeError):
kv.put(1, b"1")
with pytest.raises(TypeError):
kv.put("1", 1)
with pytest.raises(TypeError):
kv.put("1", "1")
kv.put("1", b"2")
assert kv.get("1") == b"2"
kv.put("2", b"4")
assert kv.get("2") == b"4"
kv.put("1", b"3")
assert kv.get("1") == b"3"
assert kv.get("2") == b"4"
def test_ray_internal_kv_collisions(ray_instance):
kv1 = RayInternalKVStore()
kv1.put("1", b"1")
assert kv1.get("1") == b"1"
kv2 = RayInternalKVStore("namespace")
assert kv2.get("1") is None
kv2.put("1", b"-1")
assert kv2.get("1") == b"-1"
assert kv1.get("1") == b"1"
-54
View File
@@ -1,54 +0,0 @@
import os
import tempfile
from ray.serve.kv_store_service import (InMemoryKVStore, RayInternalKVStore,
SQLiteKVStore)
def test_default_in_memory_kv():
kv = InMemoryKVStore("")
kv.put("1", 2)
assert kv.get("1") == 2
kv.put("1", 3)
assert kv.get("1") == 3
assert kv.as_dict() == {"1": 3}
def test_ray_interal_kv(ray_instance):
kv = RayInternalKVStore("")
kv.put("1", 2)
assert kv.get("1") == 2
kv.put("1", 3)
assert kv.get("1") == 3
assert kv.as_dict() == {"1": 3}
kv = RayInternalKVStore("othernamespace")
kv.put("1", 2)
assert kv.get("1") == 2
kv.put("1", 3)
assert kv.get("1") == 3
assert kv.as_dict() == {"1": 3}
def test_sqlite_kv():
_, path = tempfile.mkstemp()
# Test get
kv = SQLiteKVStore("routing_table", db_path=path)
kv.put("/api", "api-endpoint")
assert kv.get("/api") == "api-endpoint"
assert kv.get("not-exist") is None
# Test namespace
kv2 = SQLiteKVStore("other_table", db_path=path)
kv2.put("/api", "api-endpoint-two")
assert kv2.get("/api") == "api-endpoint-two"
# Test as dict
assert kv.as_dict() == {"/api": "api-endpoint"}
# Test override
kv.put("/api", "api-new")
assert kv.get("/api") == "api-new"
os.remove(path)