From d2963f4ee13c8c32f83fb2c6dcb91ff812d37990 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 26 Jan 2021 23:10:29 -0800 Subject: [PATCH] [Object Spilling] Clean up FS storage upon sigint for ray.init(). (#13649) * Initial iteration done. * Remove unnecessary messages. * Addressed code review. * Addressed code review. * fix issues. * addressed code review. * Addressed the last code review. --- python/ray/external_storage.py | 43 +++++++- python/ray/node.py | 12 +++ python/ray/tests/test_object_spilling.py | 129 ++++++++++++++++------- python/ray/worker.py | 2 + 4 files changed, 146 insertions(+), 40 deletions(-) diff --git a/python/ray/external_storage.py b/python/ray/external_storage.py index 6e1635148..f764e9c0f 100644 --- a/python/ray/external_storage.py +++ b/python/ray/external_storage.py @@ -1,5 +1,7 @@ import abc +import logging import os +import shutil import urllib from collections import namedtuple from typing import List, IO, Tuple @@ -9,6 +11,7 @@ from ray.ray_constants import DEFAULT_OBJECT_PREFIX from ray._raylet import ObjectRef ParsedURL = namedtuple("ParsedURL", "base_url, offset, size") +logger = logging.getLogger(__name__) def create_url_with_offset(*, url: str, offset: int, size: int) -> str: @@ -176,6 +179,14 @@ class ExternalStorage(metaclass=abc.ABCMeta): urls: URLs that store spilled object files. """ + @abc.abstractmethod + def destroy_external_storage(self): + """Destroy external storage when a head node is down. 
+ + NOTE: This currently works when the cluster is + started by ray.init. + """ + class NullStorage(ExternalStorage): """The class that represents an uninitialized external storage.""" @@ -189,6 +200,9 @@ class NullStorage(ExternalStorage): def delete_spilled_objects(self, urls: List[str]): raise NotImplementedError("External storage is not initialized") + def destroy_external_storage(self): + raise NotImplementedError("External storage is not initialized") + class FileSystemStorage(ExternalStorage): """The class for filesystem-like external storage. @@ -199,8 +213,8 @@ class FileSystemStorage(ExternalStorage): """ def __init__(self, directory_path): - self.directory_path = directory_path - self.prefix = DEFAULT_OBJECT_PREFIX + self.spill_dir_name = DEFAULT_OBJECT_PREFIX + self.directory_path = os.path.join(directory_path, self.spill_dir_name) os.makedirs(self.directory_path, exist_ok=True) if not os.path.exists(self.directory_path): raise ValueError("The given directory path to store objects, " @@ -211,7 +225,7 @@ class FileSystemStorage(ExternalStorage): return [] # Always use the first object ref as a key when fusioning objects. first_ref = object_refs[0] - filename = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}" + filename = f"{first_ref.hex()}-multi-{len(object_refs)}" url = f"{os.path.join(self.directory_path, filename)}" with open(url, "wb") as f: return self._write_multiple_objects(f, object_refs, url) @@ -243,6 +257,25 @@ class FileSystemStorage(ExternalStorage): filename = parse_url_with_offset(url.decode()).base_url os.remove(os.path.join(self.directory_path, filename)) + def destroy_external_storage(self): + # Q: Should we add stdout here to + # indicate we are deleting a directory? + + # There's a race condition where IO workers are still + # deleting individual objects while we try deleting the + # whole directory. So we should keep retrying until + # the directory is actually deleted. 
+ while os.path.isdir(self.directory_path): + try: + shutil.rmtree(self.directory_path) + except FileNotFoundError: + # This exception can occur when other IO workers are + # deleting files at the same time. + pass + except Exception: + logger.exception("Error cleaning up spill files") + break + class ExternalStorageSmartOpenImpl(ExternalStorage): """The external storage class implemented by smart_open. @@ -331,6 +364,9 @@ class ExternalStorageSmartOpenImpl(ExternalStorage): def delete_spilled_objects(self, urls: List[str]): pass + def destroy_external_storage(self): + pass + _external_storage = NullStorage() @@ -353,6 +389,7 @@ def setup_external_storage(config): raise ValueError(f"Unknown external storage type: {storage_type}") else: _external_storage = NullStorage() + return _external_storage def reset_external_storage(): diff --git a/python/ray/node.py b/python/ray/node.py index 9130b39fb..2668d9aa0 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -421,6 +421,9 @@ class Node: "metrics_export_port": self._metrics_export_port } + def is_head(self): + return self.head + def create_redis_client(self): """Create a redis client.""" return ray._private.services.create_redis_client( @@ -1152,3 +1155,12 @@ class Node: True if any process that wasn't explicitly killed is still alive. 
""" return not any(self.dead_processes()) + + def destroy_external_storage(self): + object_spilling_config = self._config.get("object_spilling_config", {}) + if object_spilling_config: + object_spilling_config = json.loads(object_spilling_config) + from ray import external_storage + storage = external_storage.setup_external_storage( + object_spilling_config) + storage.destroy_external_storage() diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index a80a91580..3f5b5f7ae 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -3,6 +3,7 @@ import json import os import random import platform +import subprocess import sys import numpy as np @@ -10,7 +11,7 @@ import pytest import ray from ray.external_storage import (create_url_with_offset, parse_url_with_offset) -from ray.test_utils import wait_for_condition +from ray.test_utils import wait_for_condition, run_string_as_driver from ray.internal.internal_api import memory_summary bucket_name = "object-spilling-test" @@ -68,6 +69,17 @@ def multi_node_object_spilling_config(request, tmp_path): yield create_object_spilling_config(request, tmp_path) +def is_dir_empty(temp_folder, + append_path=ray.ray_constants.DEFAULT_OBJECT_PREFIX): + # append_path is used because the file based spilling will append + # new directory path. 
+ num_files = 0 + temp_folder = temp_folder / append_path + for path in temp_folder.iterdir(): + num_files += 1 + return num_files == 0 + + def test_invalid_config_raises_exception(shutdown_only): # Make sure ray.init raises an exception before # it starts processes when invalid object spilling @@ -120,13 +132,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config, with pytest.raises(ray.exceptions.ObjectStoreFullError): ref2 = ray.put(arr) # noqa - def is_dir_empty(): - num_files = 0 - for path in temp_folder.iterdir(): - num_files += 1 - return num_files == 0 - - wait_for_condition(is_dir_empty) + wait_for_condition(lambda: is_dir_empty(temp_folder)) @pytest.mark.skipif( @@ -203,7 +209,7 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): ref = ray.put(arr) replay_buffer.append(ref) solution_buffer.append(arr) - + print("spill done.") # randomly sample objects for _ in range(1000): index = random.choice(list(range(buffer_length))) @@ -317,6 +323,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only): def test_delete_objects(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config + ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ @@ -337,15 +344,9 @@ def test_delete_objects(object_spilling_config, shutdown_only): print("-----------------------------------") - def is_dir_empty(): - num_files = 0 - for path in temp_folder.iterdir(): - num_files += 1 - return num_files == 0 - del replay_buffer del ref - wait_for_condition(is_dir_empty) + wait_for_condition(lambda: is_dir_empty(temp_folder)) @pytest.mark.skipif( @@ -354,6 +355,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. 
object_spilling_config, temp_folder = object_spilling_config + ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ @@ -381,16 +383,10 @@ def test_delete_objects_delete_while_creating(object_spilling_config, sample = ray.get(ref, timeout=0) assert np.array_equal(sample, arr) - def is_dir_empty(): - num_files = 0 - for path in temp_folder.iterdir(): - num_files += 1 - return num_files == 0 - # After all, make sure all objects are killed without race condition. del replay_buffer del ref - wait_for_condition(is_dir_empty, timeout=1000) + wait_for_condition(lambda: is_dir_empty(temp_folder)) @pytest.mark.skipif( @@ -399,6 +395,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config, shutdown_only): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = object_spilling_config + ray.init( object_store_memory=75 * 1024 * 1024, _system_config={ @@ -449,14 +446,8 @@ def test_delete_objects_on_worker_failure(object_spilling_config, wait_for_condition(wait_until_actor_dead) - def is_dir_empty(): - num_files = 0 - for path in temp_folder.iterdir(): - num_files += 1 - return num_files == 0 - # After all, make sure all objects are deleted upon worker failures. - wait_for_condition(is_dir_empty, timeout=1000) + wait_for_condition(lambda: is_dir_empty(temp_folder)) @pytest.mark.skipif( @@ -465,6 +456,7 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config, ray_start_cluster): # Limit our object store to 75 MiB of memory. object_spilling_config, temp_folder = multi_node_object_spilling_config + cluster = ray_start_cluster # Head node. cluster.add_node( @@ -518,18 +510,12 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config, return True return False - def is_dir_empty(): - num_files = 0 - for path in temp_folder.iterdir(): - num_files += 1 - return num_files == 0 - # Kill actors to remove all references. 
for actor in actors: ray.kill(actor) wait_for_condition(lambda: wait_until_actor_dead(actor)) # The multi node deletion should work. - wait_for_condition(is_dir_empty) + wait_for_condition(lambda: is_dir_empty(temp_folder)) @pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.") @@ -570,6 +556,9 @@ def test_fusion_objects(object_spilling_config, shutdown_only): assert np.array_equal(sample, solution) is_test_passing = False + # Since we'd like to see the temp directory that stores the files, + # we need to append this directory. + temp_folder = temp_folder / ray.ray_constants.DEFAULT_OBJECT_PREFIX for path in temp_folder.iterdir(): file_size = path.stat().st_size # Make sure there are at least one @@ -691,5 +680,71 @@ def test_spill_objects_on_object_transfer(object_spilling_config, ray.get(tasks) +@pytest.mark.skipif( + platform.system() in ["Windows"], reason="Failing on " + "Windows and Mac.") +def test_file_deleted_when_driver_exits(tmp_path, shutdown_only): + # Limit our object store to 75 MiB of memory. + temp_folder = tmp_path / "spill" + temp_folder.mkdir() + + driver = """ +import json +import os +import signal +import numpy as np + +import ray + +ray.init( + object_store_memory=75 * 1024 * 1024, + _system_config={{ + "max_io_workers": 2, + "min_spilling_size": 0, + "automatic_object_spilling_enabled": True, + "object_store_full_delay_ms": 100, + "object_spilling_config": json.dumps({{ + "type": "filesystem", + "params": {{ + "directory_path": "{temp_dir}" + }} + }}), + }}) +arr = np.random.rand(1024 * 1024) # 8 MB data +replay_buffer = [] + +# Spill lots of objects +for _ in range(30): + ref = None + while ref is None: + ref = ray.put(arr) + replay_buffer.append(ref) +# Send sigterm to itself. +signum = {signum} +sig = None +if signum == 2: + sig = signal.SIGINT +elif signum == 15: + sig = signal.SIGTERM +os.kill(os.getpid(), sig) +""" + + # Run a driver with sigint. 
+ print("Sending sigint...") + with pytest.raises(subprocess.CalledProcessError): + print( + run_string_as_driver( + driver.format(temp_dir=str(temp_folder), signum=2))) + wait_for_condition(lambda: is_dir_empty(temp_folder, append_path="")) + + # Q: Looks like Sigterm doesn't work with Ray? + # print("Sending sigterm...") + # # Run a driver with sigterm. + # with pytest.raises(subprocess.CalledProcessError): + # print(run_string_as_driver( + # driver.format(temp_dir=str(temp_folder), signum=15))) + # wait_for_condition(is_dir_empty, timeout=1000) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/worker.py b/python/ray/worker.py index 350bbc649..337b4ffc9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -818,6 +818,8 @@ def shutdown(_exiting_interpreter=False): # Shut down the Ray processes. global _global_node if _global_node is not None: + if _global_node.is_head(): + _global_node.destroy_external_storage() _global_node.kill_all_processes(check_alive=False, allow_graceful=True) _global_node = None