From 71c5089854e5e92c8e0f72585e372f5bb65d0f18 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 30 Oct 2020 14:47:07 -0700 Subject: [PATCH] [Object Spilling] Initial Iteration of S3 adapter. (#11379) * Finished the first iteration. * Removed unnecessary code. * Smartopen impl. * Make sure tests passed. * Addressed code review. * Addressed code review. * Fix issues. * Fix issues. --- python/ray/external_storage.py | 105 ++++++++++++++- python/ray/parameter.py | 8 ++ python/ray/tests/test_object_spilling.py | 159 +++++++++++++++++------ python/requirements.txt | 1 + 4 files changed, 229 insertions(+), 44 deletions(-) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 093ab2bd4..825906328 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -1,6 +1,7 @@ import abc import os from typing import List + import ray @@ -10,6 +11,14 @@ class ExternalStorage(metaclass=abc.ABCMeta): This class provides some useful functions for zero-copy object put/get from plasma store. Also it specifies the interface for object spilling. + + When inheriting this class, please make sure to implement validation + logic inside __init__ method. When a ray instance starts, it will + instantiate the external storage to validate the config. + + Raises: + ValueError: when given configuration for + the external storage is invalid. """ def _get_objects_from_store(self, object_refs): @@ -58,11 +67,19 @@ class NullStorage(ExternalStorage): class FileSystemStorage(ExternalStorage): - """The class for filesystem-like external storage.""" + """The class for filesystem-like external storage. + + Raises: + ValueError: Raised when the directory path to + spill objects doesn't exist. 
+ """ def __init__(self, directory_path): self.directory_path = directory_path self.prefix = "ray_spilled_object_" + if not os.path.exists(self.directory_path): + raise ValueError("The given directory path to store objects, " + f"{self.directory_path}, doesn't exist.") def spill_objects(self, object_refs): keys = [] @@ -91,6 +108,84 @@ class FileSystemStorage(ExternalStorage): self._put_object_to_store(metadata, buf_len, f, ref) +class ExternalStorageSmartOpenImpl(ExternalStorage): + """The external storage class implemented by smart_open. + (https://github.com/RaRe-Technologies/smart_open) + + Smart open supports multiple backend with the same APIs. + + Args: + uri(str): Storage URI used for smart open. + prefix(str): Prefix of objects that are stored. + override_transport_params(dict): Overriding the default value of + transport_params for smart-open library. + + Raises: + ModuleNotFoundError: If it fails to setup. + For example, if smart open library + is not downloaded, this will fail. + """ + + def __init__(self, + uri: str, + prefix: str = "ray_spilled_object_", + override_transport_params: dict = None): + try: + from smart_open import open # noqa + except ModuleNotFoundError as e: + raise ModuleNotFoundError( + "Smart open is chosen to be a object spilling " + "external storage, but smart_open " + f"is not downloaded. 
Original error: {e}") + + self.uri = uri.rstrip("/") + self.prefix = prefix + self.override_transport_params = override_transport_params or {} + self.transport_params = dict(self.override_transport_params) + + def spill_objects(self, object_refs): + keys = [] + ray_object_pairs = self._get_objects_from_store(object_refs) + for ref, (buf, metadata) in zip(object_refs, ray_object_pairs): + key = self.prefix + ref.hex() + self._spill_object(key, ref, buf, metadata) + keys.append(key.encode()) + return keys + + def restore_spilled_objects(self, keys): + for k in keys: + key = k.decode() + ref = ray.ObjectRef(bytes.fromhex(key[len(self.prefix):])) + self._restore_spilled_object(key, ref) + + def _spill_object(self, key, ref, buf, metadata): + from smart_open import open + with open( + self._build_uri(key), "wb", + transport_params=self.transport_params) as file_like: + metadata_len = len(metadata) + buf_len = len(buf) + file_like.write(metadata_len.to_bytes(8, byteorder="little")) + file_like.write(buf_len.to_bytes(8, byteorder="little")) + file_like.write(metadata) + file_like.write(memoryview(buf)) + + def _restore_spilled_object(self, key, ref): + from smart_open import open + with open( + self._build_uri(key), "rb", + transport_params=self.transport_params) as file_like: + metadata_len = int.from_bytes( + file_like.read(8), byteorder="little") + buf_len = int.from_bytes(file_like.read(8), byteorder="little") + metadata = file_like.read(metadata_len) + # read remaining data to our buffer + self._put_object_to_store(metadata, buf_len, file_like, ref) + + def _build_uri(self, key): + return f"{self.uri}/{key}" + + _external_storage = NullStorage() @@ -101,12 +196,20 @@ def setup_external_storage(config): storage_type = config["type"] if storage_type == "filesystem": _external_storage = FileSystemStorage(**config["params"]) + elif storage_type == "smart_open": + _external_storage = ExternalStorageSmartOpenImpl( + **config["params"]) else: raise ValueError(f"Unknown 
external storage type: {storage_type}") else: _external_storage = NullStorage() +def reset_external_storage(): + global _external_storage + _external_storage = NullStorage() + + def spill_objects(object_refs): """Spill objects to the external storage. Objects are specified by their object refs. diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 52da19f04..24193dcdb 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -320,3 +320,11 @@ class RayParams: if numpy_major <= 1 and numpy_minor < 16: logger.warning("Using ray with numpy < 1.16.0 will result in slow " "serialization. Upgrade numpy if using with ray.") + + # Make sure object spilling configuration is applicable. + object_spilling_config = self.object_spilling_config or {} + if object_spilling_config: + from ray import external_storage + # Validate external storage usage. + external_storage.setup_external_storage(object_spilling_config) + external_storage.reset_external_storage() diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index d1cf613f8..de8237395 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -1,3 +1,4 @@ +import copy import json import random import platform @@ -9,19 +10,108 @@ import pytest import psutil import ray +bucket_name = "object-spilling-test" +file_system_object_spilling_config = { + "type": "filesystem", + "params": { + "directory_path": "/tmp" + } +} +smart_open_object_spilling_config = { + "type": "smart_open", + "params": { + "uri": f"s3://{bucket_name}/" + } +} + + +@pytest.fixture( + scope="module", + params=[ + file_system_object_spilling_config, + # TODO(sang): Add a mock dependency to test S3. 
+ # smart_open_object_spilling_config, + ]) +def object_spilling_config(request): + yield request.param + + +@pytest.mark.skip("This test is for local benchmark.") +def test_sample_benchmark(object_spilling_config, shutdown_only): + # --Config values-- + max_io_workers = 10 + object_store_limit = 500 * 1024 * 1024 + eight_mb = 1024 * 1024 + object_size = 12 * eight_mb + spill_cnt = 50 + + # Limit our object store to 500 MiB of memory. + ray.init( + object_store_memory=object_store_limit, + _object_spilling_config=object_spilling_config, + _system_config={ + "object_store_full_max_retries": 0, + "max_io_workers": max_io_workers, + }) + arr = np.random.rand(object_size) + replay_buffer = [] + pinned_objects = set() + + # Create more objects than the object store can hold. + spill_start = time.perf_counter() + for _ in range(spill_cnt): + ref = None + while ref is None: + try: + ref = ray.put(arr) + replay_buffer.append(ref) + pinned_objects.add(ref) + except ray.exceptions.ObjectStoreFullError: + ref_to_spill = pinned_objects.pop() + ray.experimental.force_spill_objects([ref_to_spill]) + spill_end = time.perf_counter() + + # Make sure to remove unpinned objects. + del pinned_objects + restore_start = time.perf_counter() + while replay_buffer: + ref = replay_buffer.pop() + sample = ray.get(ref) # noqa + restore_end = time.perf_counter() + + print(f"Object spilling benchmark for the config {object_spilling_config}") + print(f"Spilling {spill_cnt} number of objects of size {object_size}B " + f"takes {spill_end - spill_start} seconds with {max_io_workers} " + "number of io workers.") + print(f"Getting all objects takes {restore_end - restore_start} seconds.") + + +def test_invalid_config_raises_exception(shutdown_only): + # Make sure ray.init raises an exception before + # it starts processes when invalid object spilling + # config is given. 
+ with pytest.raises(ValueError): + ray.init(_object_spilling_config={"type": "abc"}) + + with pytest.raises(Exception): + copied_config = copy.deepcopy(file_system_object_spilling_config) + # Add invalid params to the config. + copied_config["params"].update({"random_arg": "abc"}) + ray.init(_object_spilling_config=copied_config) + + with pytest.raises(ValueError): + copied_config = copy.deepcopy(file_system_object_spilling_config) + copied_config["params"].update({"directory_path": "not_exist_path"}) + ray.init(_object_spilling_config=copied_config) + @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_spill_objects_manually(shutdown_only): +def test_spill_objects_manually(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( object_store_memory=75 * 1024 * 1024, - _object_spilling_config={ - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }, + _object_spilling_config=object_spilling_config, _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, @@ -66,16 +156,12 @@ def test_spill_objects_manually(shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -def test_spill_objects_manually_from_workers(shutdown_only): +def test_spill_objects_manually_from_workers(object_spilling_config, + shutdown_only): # Limit our object store to 100 MiB of memory. 
ray.init( object_store_memory=100 * 1024 * 1024, - _object_spilling_config={ - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }, + _object_spilling_config=object_spilling_config, _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, @@ -99,16 +185,12 @@ def test_spill_objects_manually_from_workers(shutdown_only): @pytest.mark.skip(reason="Not implemented yet.") -def test_spill_objects_manually_with_workers(shutdown_only): +def test_spill_objects_manually_with_workers(object_spilling_config, + shutdown_only): # Limit our object store to 75 MiB of memory. ray.init( object_store_memory=100 * 1024 * 1024, - _object_spilling_config={ - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }, + _object_spilling_config=object_spilling_config, _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, @@ -128,32 +210,23 @@ def test_spill_objects_manually_with_workers(shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -@pytest.mark.parametrize( - "ray_start_cluster_head", [{ - "num_cpus": 0, - "object_store_memory": 75 * 1024 * 1024, - "object_spilling_config": { - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }, - "_system_config": { +def test_spill_remote_object(object_spilling_config, ray_start_cluster): + cluster = ray_start_cluster + # # Head node. + cluster.add_node( + num_cpus=0, + object_store_memory=75 * 1024 * 1024, + object_spilling_config=object_spilling_config, + _system_config={ "object_store_full_max_retries": 0, "max_io_workers": 4, - }, - }], - indirect=True) -def test_spill_remote_object(ray_start_cluster_head): - cluster = ray_start_cluster_head + }) + # Worker nodes. 
cluster.add_node( object_store_memory=75 * 1024 * 1024, - object_spilling_config={ - "type": "filesystem", - "params": { - "directory_path": "/tmp" - } - }) + object_spilling_config=object_spilling_config) + cluster.wait_for_nodes() + ray.init(address=cluster.address) @ray.remote def put(): diff --git a/python/requirements.txt b/python/requirements.txt index 1072e6e74..3db21fe97 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -70,3 +70,4 @@ testfixtures werkzeug xlrd starlette +smart_open[s3]