[Object Spilling] Clean up FS storage upon sigint for ray.init(). (#13649)

* Initial iteration done.

* Remove unnecessary messages.

* Addressed code review.

* Addressed code review.

* fix issues.

* addressed code review.

* Addressed the last code review.
This commit is contained in:
SangBin Cho
2021-01-26 23:10:29 -08:00
committed by GitHub
parent 8baafacb1e
commit d2963f4ee1
4 changed files with 146 additions and 40 deletions
+40 -3
View File
@@ -1,5 +1,7 @@
import abc
import logging
import os
import shutil
import urllib
from collections import namedtuple
from typing import List, IO, Tuple
@@ -9,6 +11,7 @@ from ray.ray_constants import DEFAULT_OBJECT_PREFIX
from ray._raylet import ObjectRef
ParsedURL = namedtuple("ParsedURL", "base_url, offset, size")
logger = logging.getLogger(__name__)
def create_url_with_offset(*, url: str, offset: int, size: int) -> str:
@@ -176,6 +179,14 @@ class ExternalStorage(metaclass=abc.ABCMeta):
urls: URLs that store spilled object files.
"""
@abc.abstractmethod
def destroy_external_storage(self):
"""Destroy external storage when a head node is down.
NOTE: This is currently working when the cluster is
started by ray.init
"""
class NullStorage(ExternalStorage):
"""The class that represents an uninitialized external storage."""
@@ -189,6 +200,9 @@ class NullStorage(ExternalStorage):
def delete_spilled_objects(self, urls: List[str]):
raise NotImplementedError("External storage is not initialized")
def destroy_external_storage(self):
raise NotImplementedError("External storage is not initialized")
class FileSystemStorage(ExternalStorage):
"""The class for filesystem-like external storage.
@@ -199,8 +213,8 @@ class FileSystemStorage(ExternalStorage):
"""
def __init__(self, directory_path):
self.directory_path = directory_path
self.prefix = DEFAULT_OBJECT_PREFIX
self.spill_dir_name = DEFAULT_OBJECT_PREFIX
self.directory_path = os.path.join(directory_path, self.spill_dir_name)
os.makedirs(self.directory_path, exist_ok=True)
if not os.path.exists(self.directory_path):
raise ValueError("The given directory path to store objects, "
@@ -211,7 +225,7 @@ class FileSystemStorage(ExternalStorage):
return []
# Always use the first object ref as a key when fusioning objects.
first_ref = object_refs[0]
filename = f"{self.prefix}-{first_ref.hex()}-multi-{len(object_refs)}"
filename = f"{first_ref.hex()}-multi-{len(object_refs)}"
url = f"{os.path.join(self.directory_path, filename)}"
with open(url, "wb") as f:
return self._write_multiple_objects(f, object_refs, url)
@@ -243,6 +257,25 @@ class FileSystemStorage(ExternalStorage):
filename = parse_url_with_offset(url.decode()).base_url
os.remove(os.path.join(self.directory_path, filename))
def destroy_external_storage(self):
# Q: Should we add stdout here to
# indicate we are deleting a directory?
# There's a race condition where IO workers are still
# deleting each objects while we try deleting the
# whole directory. So we should keep trying it until
# The directory is actually deleted.
while os.path.isdir(self.directory_path):
try:
shutil.rmtree(self.directory_path)
except FileNotFoundError:
# If excpetion occurs when other IO workers are
# deleting the file at the same time.
pass
except Exception:
logger.exception("Error cleaning up spill files")
break
class ExternalStorageSmartOpenImpl(ExternalStorage):
"""The external storage class implemented by smart_open.
@@ -331,6 +364,9 @@ class ExternalStorageSmartOpenImpl(ExternalStorage):
def delete_spilled_objects(self, urls: List[str]):
pass
def destroy_external_storage(self):
pass
_external_storage = NullStorage()
@@ -353,6 +389,7 @@ def setup_external_storage(config):
raise ValueError(f"Unknown external storage type: {storage_type}")
else:
_external_storage = NullStorage()
return _external_storage
def reset_external_storage():
+12
View File
@@ -421,6 +421,9 @@ class Node:
"metrics_export_port": self._metrics_export_port
}
def is_head(self):
return self.head
def create_redis_client(self):
"""Create a redis client."""
return ray._private.services.create_redis_client(
@@ -1152,3 +1155,12 @@ class Node:
True if any process that wasn't explicitly killed is still alive.
"""
return not any(self.dead_processes())
def destroy_external_storage(self):
object_spilling_config = self._config.get("object_spilling_config", {})
if object_spilling_config:
object_spilling_config = json.loads(object_spilling_config)
from ray import external_storage
storage = external_storage.setup_external_storage(
object_spilling_config)
storage.destroy_external_storage()
+92 -37
View File
@@ -3,6 +3,7 @@ import json
import os
import random
import platform
import subprocess
import sys
import numpy as np
@@ -10,7 +11,7 @@ import pytest
import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray.test_utils import wait_for_condition
from ray.test_utils import wait_for_condition, run_string_as_driver
from ray.internal.internal_api import memory_summary
bucket_name = "object-spilling-test"
@@ -68,6 +69,17 @@ def multi_node_object_spilling_config(request, tmp_path):
yield create_object_spilling_config(request, tmp_path)
def is_dir_empty(temp_folder,
append_path=ray.ray_constants.DEFAULT_OBJECT_PREFIX):
# append_path is used because the file based spilling will append
# new directory path.
num_files = 0
temp_folder = temp_folder / append_path
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
def test_invalid_config_raises_exception(shutdown_only):
# Make sure ray.init raises an exception before
# it starts processes when invalid object spilling
@@ -120,13 +132,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
with pytest.raises(ray.exceptions.ObjectStoreFullError):
ref2 = ray.put(arr) # noqa
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
wait_for_condition(is_dir_empty)
wait_for_condition(lambda: is_dir_empty(temp_folder))
@pytest.mark.skipif(
@@ -203,7 +209,7 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
ref = ray.put(arr)
replay_buffer.append(ref)
solution_buffer.append(arr)
print("spill done.")
# randomly sample objects
for _ in range(1000):
index = random.choice(list(range(buffer_length)))
@@ -317,6 +323,7 @@ def test_spill_deadlock(object_spilling_config, shutdown_only):
def test_delete_objects(object_spilling_config, shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -337,15 +344,9 @@ def test_delete_objects(object_spilling_config, shutdown_only):
print("-----------------------------------")
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
del replay_buffer
del ref
wait_for_condition(is_dir_empty)
wait_for_condition(lambda: is_dir_empty(temp_folder))
@pytest.mark.skipif(
@@ -354,6 +355,7 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -381,16 +383,10 @@ def test_delete_objects_delete_while_creating(object_spilling_config,
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# After all, make sure all objects are killed without race condition.
del replay_buffer
del ref
wait_for_condition(is_dir_empty, timeout=1000)
wait_for_condition(lambda: is_dir_empty(temp_folder))
@pytest.mark.skipif(
@@ -399,6 +395,7 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
shutdown_only):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = object_spilling_config
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={
@@ -449,14 +446,8 @@ def test_delete_objects_on_worker_failure(object_spilling_config,
wait_for_condition(wait_until_actor_dead)
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# After all, make sure all objects are deleted upon worker failures.
wait_for_condition(is_dir_empty, timeout=1000)
wait_for_condition(lambda: is_dir_empty(temp_folder))
@pytest.mark.skipif(
@@ -465,6 +456,7 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
ray_start_cluster):
# Limit our object store to 75 MiB of memory.
object_spilling_config, temp_folder = multi_node_object_spilling_config
cluster = ray_start_cluster
# Head node.
cluster.add_node(
@@ -518,18 +510,12 @@ def test_delete_objects_multi_node(multi_node_object_spilling_config,
return True
return False
def is_dir_empty():
num_files = 0
for path in temp_folder.iterdir():
num_files += 1
return num_files == 0
# Kill actors to remove all references.
for actor in actors:
ray.kill(actor)
wait_for_condition(lambda: wait_until_actor_dead(actor))
# The multi node deletion should work.
wait_for_condition(is_dir_empty)
wait_for_condition(lambda: is_dir_empty(temp_folder))
@pytest.mark.skipif(platform.system() == "Windows", reason="Flaky on Windows.")
@@ -570,6 +556,9 @@ def test_fusion_objects(object_spilling_config, shutdown_only):
assert np.array_equal(sample, solution)
is_test_passing = False
# Since we'd like to see the temp directory that stores the files,
# we need to append this directory.
temp_folder = temp_folder / ray.ray_constants.DEFAULT_OBJECT_PREFIX
for path in temp_folder.iterdir():
file_size = path.stat().st_size
# Make sure there are at least one
@@ -691,5 +680,71 @@ def test_spill_objects_on_object_transfer(object_spilling_config,
ray.get(tasks)
@pytest.mark.skipif(
platform.system() in ["Windows"], reason="Failing on "
"Windows and Mac.")
def test_file_deleted_when_driver_exits(tmp_path, shutdown_only):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
temp_folder.mkdir()
driver = """
import json
import os
import signal
import numpy as np
import ray
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={{
"max_io_workers": 2,
"min_spilling_size": 0,
"automatic_object_spilling_enabled": True,
"object_store_full_delay_ms": 100,
"object_spilling_config": json.dumps({{
"type": "filesystem",
"params": {{
"directory_path": "{temp_dir}"
}}
}}),
}})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Spill lots of objects
for _ in range(30):
ref = None
while ref is None:
ref = ray.put(arr)
replay_buffer.append(ref)
# Send sigterm to itself.
signum = {signum}
sig = None
if signum == 2:
sig = signal.SIGINT
elif signum == 15:
sig = signal.SIGTERM
os.kill(os.getpid(), sig)
"""
# Run a driver with sigint.
print("Sending sigint...")
with pytest.raises(subprocess.CalledProcessError):
print(
run_string_as_driver(
driver.format(temp_dir=str(temp_folder), signum=2)))
wait_for_condition(lambda: is_dir_empty(temp_folder, append_path=""))
# Q: Looks like Sigterm doesn't work with Ray?
# print("Sending sigterm...")
# # Run a driver with sigterm.
# with pytest.raises(subprocess.CalledProcessError):
# print(run_string_as_driver(
# driver.format(temp_dir=str(temp_folder), signum=15)))
# wait_for_condition(is_dir_empty, timeout=1000)
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
+2
View File
@@ -818,6 +818,8 @@ def shutdown(_exiting_interpreter=False):
# Shut down the Ray processes.
global _global_node
if _global_node is not None:
if _global_node.is_head():
_global_node.destroy_external_storage()
_global_node.kill_all_processes(check_alive=False, allow_graceful=True)
_global_node = None