mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 10:11:52 +08:00
[Object Spilling] Initial Iteration of S3 adapter. (#11379)
* Finished the first iteration. * Removed unnecessary code. * Smartopen impl. * Make sure tests passed. * Addressed code review. * Addressed code review. * Fix issues. * Fix issues.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import abc
|
||||
import os
|
||||
from typing import List
|
||||
|
||||
import ray
|
||||
|
||||
|
||||
@@ -10,6 +11,14 @@ class ExternalStorage(metaclass=abc.ABCMeta):
|
||||
This class provides some useful functions for zero-copy object
|
||||
put/get from plasma store. Also it specifies the interface for
|
||||
object spilling.
|
||||
|
||||
When inheriting this class, please make sure to implement validation
|
||||
logic inside __init__ method. When ray instance starts, it will
|
||||
instantiating external storage to validate the config.
|
||||
|
||||
Raises:
|
||||
ValueError: when given configuration for
|
||||
the external storage is invalid.
|
||||
"""
|
||||
|
||||
def _get_objects_from_store(self, object_refs):
|
||||
@@ -58,11 +67,19 @@ class NullStorage(ExternalStorage):
|
||||
|
||||
|
||||
class FileSystemStorage(ExternalStorage):
|
||||
"""The class for filesystem-like external storage."""
|
||||
"""The class for filesystem-like external storage.
|
||||
|
||||
Raises:
|
||||
ValueError: Raises directory path to
|
||||
spill objects doesn't exist.
|
||||
"""
|
||||
|
||||
def __init__(self, directory_path):
|
||||
self.directory_path = directory_path
|
||||
self.prefix = "ray_spilled_object_"
|
||||
if not os.path.exists(self.directory_path):
|
||||
raise ValueError("The given directory path to store objects, "
|
||||
f"{self.directory_path}, doesn't exist.")
|
||||
|
||||
def spill_objects(self, object_refs):
|
||||
keys = []
|
||||
@@ -91,6 +108,84 @@ class FileSystemStorage(ExternalStorage):
|
||||
self._put_object_to_store(metadata, buf_len, f, ref)
|
||||
|
||||
|
||||
class ExternalStorageSmartOpenImpl(ExternalStorage):
|
||||
"""The external storage class implemented by smart_open.
|
||||
(https://github.com/RaRe-Technologies/smart_open)
|
||||
|
||||
Smart open supports multiple backend with the same APIs.
|
||||
|
||||
Args:
|
||||
uri(str): Storage URI used for smart open.
|
||||
prefix(str): Prefix of objects that are stored.
|
||||
override_transport_params(dict): Overriding the default value of
|
||||
transport_params for smart-open library.
|
||||
|
||||
Raises:
|
||||
ModuleNotFoundError: If it fails to setup.
|
||||
For example, if smart open library
|
||||
is not downloaded, this will fail.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
uri: str,
|
||||
prefix: str = "ray_spilled_object_",
|
||||
override_transport_params: dict = None):
|
||||
try:
|
||||
from smart_open import open # noqa
|
||||
except ModuleNotFoundError as e:
|
||||
raise ModuleNotFoundError(
|
||||
"Smart open is chosen to be a object spilling "
|
||||
"external storage, but smart_open "
|
||||
f"is not downloaded. Original error: {e}")
|
||||
|
||||
self.uri = uri.strip("/")
|
||||
self.prefix = prefix
|
||||
self.override_transport_params = override_transport_params or {}
|
||||
self.transport_params = {}.update(self.override_transport_params)
|
||||
|
||||
def spill_objects(self, object_refs):
|
||||
keys = []
|
||||
ray_object_pairs = self._get_objects_from_store(object_refs)
|
||||
for ref, (buf, metadata) in zip(object_refs, ray_object_pairs):
|
||||
key = self.prefix + ref.hex()
|
||||
self._spill_object(key, ref, buf, metadata)
|
||||
keys.append(key.encode())
|
||||
return keys
|
||||
|
||||
def restore_spilled_objects(self, keys):
|
||||
for k in keys:
|
||||
key = k.decode()
|
||||
ref = ray.ObjectRef(bytes.fromhex(key[len(self.prefix):]))
|
||||
self._restore_spilled_object(key, ref)
|
||||
|
||||
def _spill_object(self, key, ref, buf, metadata):
|
||||
from smart_open import open
|
||||
with open(
|
||||
self._build_uri(key), "wb",
|
||||
transport_params=self.transport_params) as file_like:
|
||||
metadata_len = len(metadata)
|
||||
buf_len = len(buf)
|
||||
file_like.write(metadata_len.to_bytes(8, byteorder="little"))
|
||||
file_like.write(buf_len.to_bytes(8, byteorder="little"))
|
||||
file_like.write(metadata)
|
||||
file_like.write(memoryview(buf))
|
||||
|
||||
def _restore_spilled_object(self, key, ref):
|
||||
from smart_open import open
|
||||
with open(
|
||||
self._build_uri(key), "rb",
|
||||
transport_params=self.transport_params) as file_like:
|
||||
metadata_len = int.from_bytes(
|
||||
file_like.read(8), byteorder="little")
|
||||
buf_len = int.from_bytes(file_like.read(8), byteorder="little")
|
||||
metadata = file_like.read(metadata_len)
|
||||
# read remaining data to our buffer
|
||||
self._put_object_to_store(metadata, buf_len, file_like, ref)
|
||||
|
||||
def _build_uri(self, key):
|
||||
return f"{self.uri}/{key}"
|
||||
|
||||
|
||||
_external_storage = NullStorage()
|
||||
|
||||
|
||||
@@ -101,12 +196,20 @@ def setup_external_storage(config):
|
||||
storage_type = config["type"]
|
||||
if storage_type == "filesystem":
|
||||
_external_storage = FileSystemStorage(**config["params"])
|
||||
elif storage_type == "smart_open":
|
||||
_external_storage = ExternalStorageSmartOpenImpl(
|
||||
**config["params"])
|
||||
else:
|
||||
raise ValueError(f"Unknown external storage type: {storage_type}")
|
||||
else:
|
||||
_external_storage = NullStorage()
|
||||
|
||||
|
||||
def reset_external_storage():
|
||||
global _external_storage
|
||||
_external_storage = NullStorage()
|
||||
|
||||
|
||||
def spill_objects(object_refs):
|
||||
"""Spill objects to the external storage. Objects are specified
|
||||
by their object refs.
|
||||
|
||||
@@ -320,3 +320,11 @@ class RayParams:
|
||||
if numpy_major <= 1 and numpy_minor < 16:
|
||||
logger.warning("Using ray with numpy < 1.16.0 will result in slow "
|
||||
"serialization. Upgrade numpy if using with ray.")
|
||||
|
||||
# Make sure object spilling configuration is applicable.
|
||||
object_spilling_config = self.object_spilling_config or {}
|
||||
if object_spilling_config:
|
||||
from ray import external_storage
|
||||
# Validate external storage usage.
|
||||
external_storage.setup_external_storage(object_spilling_config)
|
||||
external_storage.reset_external_storage()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import copy
|
||||
import json
|
||||
import random
|
||||
import platform
|
||||
@@ -9,19 +10,108 @@ import pytest
|
||||
import psutil
|
||||
import ray
|
||||
|
||||
bucket_name = "object-spilling-test"
|
||||
file_system_object_spilling_config = {
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
}
|
||||
smart_open_object_spilling_config = {
|
||||
"type": "smart_open",
|
||||
"params": {
|
||||
"uri": f"s3://{bucket_name}/"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="module",
|
||||
params=[
|
||||
file_system_object_spilling_config,
|
||||
# TODO(sang): Add a mock dependency to test S3.
|
||||
# smart_open_object_spilling_config,
|
||||
])
|
||||
def object_spilling_config(request):
|
||||
yield request.param
|
||||
|
||||
|
||||
@pytest.mark.skip("This test is for local benchmark.")
|
||||
def test_sample_benchmark(object_spilling_config, shutdown_only):
|
||||
# --Config values--
|
||||
max_io_workers = 10
|
||||
object_store_limit = 500 * 1024 * 1024
|
||||
eight_mb = 1024 * 1024
|
||||
object_size = 12 * eight_mb
|
||||
spill_cnt = 50
|
||||
|
||||
# Limit our object store to 200 MiB of memory.
|
||||
ray.init(
|
||||
object_store_memory=object_store_limit,
|
||||
_object_spilling_config=object_spilling_config,
|
||||
_system_config={
|
||||
"object_store_full_max_retries": 0,
|
||||
"max_io_workers": max_io_workers,
|
||||
})
|
||||
arr = np.random.rand(object_size)
|
||||
replay_buffer = []
|
||||
pinned_objects = set()
|
||||
|
||||
# Create objects of more than 200 MiB.
|
||||
spill_start = time.perf_counter()
|
||||
for _ in range(spill_cnt):
|
||||
ref = None
|
||||
while ref is None:
|
||||
try:
|
||||
ref = ray.put(arr)
|
||||
replay_buffer.append(ref)
|
||||
pinned_objects.add(ref)
|
||||
except ray.exceptions.ObjectStoreFullError:
|
||||
ref_to_spill = pinned_objects.pop()
|
||||
ray.experimental.force_spill_objects([ref_to_spill])
|
||||
spill_end = time.perf_counter()
|
||||
|
||||
# Make sure to remove unpinned objects.
|
||||
del pinned_objects
|
||||
restore_start = time.perf_counter()
|
||||
while replay_buffer:
|
||||
ref = replay_buffer.pop()
|
||||
sample = ray.get(ref) # noqa
|
||||
restore_end = time.perf_counter()
|
||||
|
||||
print(f"Object spilling benchmark for the config {object_spilling_config}")
|
||||
print(f"Spilling {spill_cnt} number of objects of size {object_size}B "
|
||||
f"takes {spill_end - spill_start} seconds with {max_io_workers} "
|
||||
"number of io workers.")
|
||||
print(f"Getting all objects takes {restore_end - restore_start} seconds.")
|
||||
|
||||
|
||||
def test_invalid_config_raises_exception(shutdown_only):
|
||||
# Make sure ray.init raises an exception before
|
||||
# it starts processes when invalid object spilling
|
||||
# config is given.
|
||||
with pytest.raises(ValueError):
|
||||
ray.init(_object_spilling_config={"type": "abc"})
|
||||
|
||||
with pytest.raises(Exception):
|
||||
copied_config = copy.deepcopy(file_system_object_spilling_config)
|
||||
# Add invalid params to the config.
|
||||
copied_config["params"].update({"random_arg": "abc"})
|
||||
ray.init(_object_spilling_config=copied_config)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
copied_config = copy.deepcopy(file_system_object_spilling_config)
|
||||
copied_config["params"].update({"directory_path": "not_exist_path"})
|
||||
ray.init(_object_spilling_config=copied_config)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.system() == "Windows", reason="Failing on Windows.")
|
||||
def test_spill_objects_manually(shutdown_only):
|
||||
def test_spill_objects_manually(object_spilling_config, shutdown_only):
|
||||
# Limit our object store to 75 MiB of memory.
|
||||
ray.init(
|
||||
object_store_memory=75 * 1024 * 1024,
|
||||
_object_spilling_config={
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
},
|
||||
_object_spilling_config=object_spilling_config,
|
||||
_system_config={
|
||||
"object_store_full_max_retries": 0,
|
||||
"max_io_workers": 4,
|
||||
@@ -66,16 +156,12 @@ def test_spill_objects_manually(shutdown_only):
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.system() == "Windows", reason="Failing on Windows.")
|
||||
def test_spill_objects_manually_from_workers(shutdown_only):
|
||||
def test_spill_objects_manually_from_workers(object_spilling_config,
|
||||
shutdown_only):
|
||||
# Limit our object store to 100 MiB of memory.
|
||||
ray.init(
|
||||
object_store_memory=100 * 1024 * 1024,
|
||||
_object_spilling_config={
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
},
|
||||
_object_spilling_config=object_spilling_config,
|
||||
_system_config={
|
||||
"object_store_full_max_retries": 0,
|
||||
"max_io_workers": 4,
|
||||
@@ -99,16 +185,12 @@ def test_spill_objects_manually_from_workers(shutdown_only):
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Not implemented yet.")
|
||||
def test_spill_objects_manually_with_workers(shutdown_only):
|
||||
def test_spill_objects_manually_with_workers(object_spilling_config,
|
||||
shutdown_only):
|
||||
# Limit our object store to 75 MiB of memory.
|
||||
ray.init(
|
||||
object_store_memory=100 * 1024 * 1024,
|
||||
_object_spilling_config={
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
},
|
||||
_object_spilling_config=object_spilling_config,
|
||||
_system_config={
|
||||
"object_store_full_max_retries": 0,
|
||||
"max_io_workers": 4,
|
||||
@@ -128,32 +210,23 @@ def test_spill_objects_manually_with_workers(shutdown_only):
|
||||
|
||||
@pytest.mark.skipif(
|
||||
platform.system() == "Windows", reason="Failing on Windows.")
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster_head", [{
|
||||
"num_cpus": 0,
|
||||
"object_store_memory": 75 * 1024 * 1024,
|
||||
"object_spilling_config": {
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
},
|
||||
"_system_config": {
|
||||
def test_spill_remote_object(object_spilling_config, ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
# # Head node.
|
||||
cluster.add_node(
|
||||
num_cpus=0,
|
||||
object_store_memory=75 * 1024 * 1024,
|
||||
object_spilling_config=object_spilling_config,
|
||||
_system_config={
|
||||
"object_store_full_max_retries": 0,
|
||||
"max_io_workers": 4,
|
||||
},
|
||||
}],
|
||||
indirect=True)
|
||||
def test_spill_remote_object(ray_start_cluster_head):
|
||||
cluster = ray_start_cluster_head
|
||||
})
|
||||
# Worker nodes.
|
||||
cluster.add_node(
|
||||
object_store_memory=75 * 1024 * 1024,
|
||||
object_spilling_config={
|
||||
"type": "filesystem",
|
||||
"params": {
|
||||
"directory_path": "/tmp"
|
||||
}
|
||||
})
|
||||
object_spilling_config=object_spilling_config)
|
||||
cluster.wait_for_nodes()
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
@ray.remote
|
||||
def put():
|
||||
|
||||
@@ -70,3 +70,4 @@ testfixtures
|
||||
werkzeug
|
||||
xlrd
|
||||
starlette
|
||||
smart_open[s3]
|
||||
|
||||
Reference in New Issue
Block a user