[Object Spilling] Turn on by default. (#13745)

* Done.

* in progress.

* in progress.

* fixed tests.

* Fix.
This commit is contained in:
SangBin Cho
2021-01-31 23:28:37 -08:00
committed by GitHub
parent 2ba77ae3a2
commit d1ec787d9d
7 changed files with 126 additions and 18 deletions
+46
View File
@@ -167,6 +167,11 @@ class Node:
self._init_temp(redis_client)
# If it is a head node, try validating if
# external storage is configurable.
if head:
self.validate_external_storage()
if connect_only:
# Get socket names from the configuration.
self._plasma_store_socket_name = (
@@ -1164,3 +1169,44 @@ class Node:
storage = external_storage.setup_external_storage(
object_spilling_config)
storage.destroy_external_storage()
def validate_external_storage(self):
    """Check that the object spilling external storage can be set up.

    Does nothing when ``automatic_object_spilling_enabled`` is false in
    ``self._config``. Otherwise, if no ``object_spilling_config`` was
    given, a filesystem config pointing at the session directory is
    filled in as the default. The resulting settings are written to both
    the ray params' system config and ``self._config``.
    """
    if not self._config.get("automatic_object_spilling_enabled", True):
        # Automatic spilling is off: leave every config untouched.
        return

    serialized = self._config.get("object_spilling_config", {})
    if not serialized:
        # Nothing was specified, so fall back to the default.
        default_config = {
            "type": "filesystem",
            "params": {
                "directory_path": self._session_dir
            }
        }
        serialized = json.dumps(default_config)
    parsed = json.loads(serialized)

    # The ray params' system config and self._config may have diverged
    # by this point, so each setting is stored in both of them.
    targets = (self._ray_params._system_config, self._config)
    for target in targets:
        target["object_spilling_config"] = serialized
    uses_filesystem = parsed["type"] == "filesystem"
    for target in targets:
        target["is_external_storage_type_fs"] = uses_filesystem

    # Validate external storage usage: set it up once, then reset it.
    from ray import external_storage
    external_storage.setup_external_storage(parsed)
    external_storage.reset_external_storage()
-14
View File
@@ -1,4 +1,3 @@
import json
import logging
import os
@@ -320,16 +319,3 @@ class RayParams:
if numpy_major <= 1 and numpy_minor < 16:
logger.warning("Using ray with numpy < 1.16.0 will result in slow "
"serialization. Upgrade numpy if using with ray.")
# Make sure object spilling configuration is applicable.
object_spilling_config = self._system_config.get(
"object_spilling_config", {})
if object_spilling_config:
object_spilling_config = json.loads(object_spilling_config)
from ray import external_storage
# Validate external storage usage.
external_storage.setup_external_storage(object_spilling_config)
external_storage.reset_external_storage()
# Configure the proper system config.
self._system_config["is_external_storage_type_fs"] = (
object_spilling_config["type"] == "filesystem")
+3
View File
@@ -261,6 +261,9 @@ def test_ray_options(shutdown_only):
"ray_start_cluster_head", [{
"num_cpus": 0,
"object_store_memory": 75 * 1024 * 1024,
"_system_config": {
"automatic_object_spilling_enabled": False
}
}],
indirect=True)
def test_fetch_local(ray_start_cluster_head):
+4 -1
View File
@@ -1039,7 +1039,10 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head):
def test_fill_object_store_exception(shutdown_only):
ray.init(num_cpus=2, object_store_memory=10**8)
ray.init(
num_cpus=2,
object_store_memory=10**8,
_system_config={"automatic_object_spilling_enabled": False})
@ray.remote
def expensive_task():
+70 -3
View File
@@ -69,6 +69,14 @@ def multi_node_object_spilling_config(request, tmp_path):
yield create_object_spilling_config(request, tmp_path)
def run_basic_workload():
    """Run the workload that requires spilling.

    Two copies of a 40 MB array are put and kept referenced, then a third
    copy is put and fetched back.
    """
    payload = np.random.rand(5 * 1024 * 1024)  # 40 MB
    # Hold on to the first two references for the rest of the function.
    held_refs = [[ray.put(payload) for _ in range(2)]]
    ray.get(ray.put(payload))
def is_dir_empty(temp_folder,
append_path=ray.ray_constants.DEFAULT_OBJECT_PREFIX):
# append_path is used because the file based spilling will append
@@ -111,6 +119,68 @@ def test_url_generation_and_parse():
assert parsed_result.size == size
@pytest.mark.skipif(
    platform.system() == "Windows", reason="Failing on Windows.")
def test_default_config(shutdown_only):
    """Check the spilling config with defaults, disabled, and overridden."""
    store_bytes = 75 * 1024 * 1024

    def current_spilling_config():
        # Re-read the global node on every call, since ray is shut down
        # and re-initialized between the phases below.
        raw = ray.worker._global_node._config["object_spilling_config"]
        return json.loads(raw)

    # Phase 1: with no explicit config, the filesystem default pointing
    # at the session directory must be set.
    ray.init(num_cpus=0, object_store_memory=store_bytes)
    default_config = current_spilling_config()
    assert default_config["type"] == "filesystem"
    assert (default_config["params"]["directory_path"] ==
            ray.worker._global_node._session_dir)
    # Make sure the basic workload can succeed.
    run_basic_workload()
    ray.shutdown()

    # Phase 2: make sure the config is not initialized if spilling is
    # not enabled, and that the workload then fills the object store.
    disabled_system_config = {
        "automatic_object_spilling_enabled": False,
        "object_store_full_delay_ms": 100
    }
    ray.init(
        num_cpus=0,
        object_store_memory=store_bytes,
        _system_config=disabled_system_config)
    assert "object_spilling_config" not in ray.worker._global_node._config
    with pytest.raises(ray.exceptions.ObjectStoreFullError):
        run_basic_workload()
    ray.shutdown()

    # Phase 3: make sure a user-provided config is reflected.
    custom_config = json.dumps(mock_distributed_fs_object_spilling_config)
    ray.init(
        num_cpus=0,
        _system_config={"object_spilling_config": custom_config})
    assert current_spilling_config()["type"] == "mock_distributed_fs"
@pytest.mark.skipif(
    platform.system() == "Windows", reason="Failing on Windows.")
def test_default_config_cluster(ray_start_cluster):
    """Run the spilling workload inside tasks on a cluster with defaults."""
    cluster = ray_start_cluster
    # First node has no CPUs; the driver connects to the cluster through
    # its address.
    cluster.add_node(num_cpus=0)
    ray.init(cluster.address)
    worker_nodes = [
        cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024)
    ]
    cluster.wait_for_nodes()

    # Run the basic spilling workload in two remote tasks and make sure
    # both of them finish.
    @ray.remote
    def task():
        payload = np.random.rand(5 * 1024 * 1024)  # 40 MB
        # Hold on to the first two references while the third is fetched.
        held_refs = [[ray.put(payload) for _ in range(2)]]
        ray.get(ray.put(payload))

    pending = [task.remote() for _ in range(2)]
    ray.get(pending)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spilling_not_done_for_pinned_object(object_spilling_config,
@@ -690,9 +760,7 @@ import json
import os
import signal
import numpy as np
import ray
ray.init(
object_store_memory=75 * 1024 * 1024,
_system_config={{
@@ -709,7 +777,6 @@ ray.init(
}})
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
# Spill lots of objects
for _ in range(30):
ref = None
@@ -18,8 +18,10 @@ logger = logging.getLogger(__name__)
@pytest.fixture
def one_worker_100MiB(request):
# It has lots of tests that don't require object spilling.
config = {
"task_retry_delay_ms": 0,
"automatic_object_spilling_enabled": False
}
yield ray.init(
num_cpus=1,
@@ -22,6 +22,7 @@ def one_worker_100MiB(request):
config = {
"task_retry_delay_ms": 0,
"object_timeout_milliseconds": 1000,
"automatic_object_spilling_enabled": False
}
yield ray.init(
num_cpus=1,