diff --git a/python/ray/node.py b/python/ray/node.py index 2668d9aa0..a63a0a8a8 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -167,6 +167,11 @@ class Node: self._init_temp(redis_client) + # If it is a head node, try validating if + # external storage is configurable. + if head: + self.validate_external_storage() + if connect_only: # Get socket names from the configuration. self._plasma_store_socket_name = ( @@ -1164,3 +1169,44 @@ class Node: storage = external_storage.setup_external_storage( object_spilling_config) storage.destroy_external_storage() + + def validate_external_storage(self): + """Make sure we can setup the object spilling external storage. + This will also fill up the default setting for object spilling + if not specified. + """ + object_spilling_config = self._config.get("object_spilling_config", {}) + automatic_spilling_enabled = self._config.get( + "automatic_object_spilling_enabled", True) + if not automatic_spilling_enabled: + return + + # If the config is not specified, we fill up the default. + if not object_spilling_config: + object_spilling_config = json.dumps({ + "type": "filesystem", + "params": { + "directory_path": self._session_dir + } + }) + + # Try setting up the storage. + # Configure the proper system config. + # We need to set both ray param's system config and self._config + # because they could've been diverged at this point. + deserialized_config = json.loads(object_spilling_config) + self._ray_params._system_config["object_spilling_config"] = ( + object_spilling_config) + self._config["object_spilling_config"] = object_spilling_config + + is_external_storage_type_fs = ( + deserialized_config["type"] == "filesystem") + self._ray_params._system_config["is_external_storage_type_fs"] = ( + is_external_storage_type_fs) + self._config["is_external_storage_type_fs"] = ( + is_external_storage_type_fs) + + # Validate external storage usage. + from ray import external_storage + external_storage.setup_external_storage(deserialized_config) + external_storage.reset_external_storage() diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 666b82905..af7bdf475 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -1,4 +1,3 @@ -import json import logging import os @@ -320,16 +319,3 @@ class RayParams: if numpy_major <= 1 and numpy_minor < 16: logger.warning("Using ray with numpy < 1.16.0 will result in slow " "serialization. Upgrade numpy if using with ray.") - - # Make sure object spilling configuration is applicable. - object_spilling_config = self._system_config.get( - "object_spilling_config", {}) - if object_spilling_config: - object_spilling_config = json.loads(object_spilling_config) - from ray import external_storage - # Validate external storage usage. - external_storage.setup_external_storage(object_spilling_config) - external_storage.reset_external_storage() - # Configure the proper system config. - self._system_config["is_external_storage_type_fs"] = ( - object_spilling_config["type"] == "filesystem") diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index e33af42de..4c80aea70 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -261,6 +261,9 @@ def test_ray_options(shutdown_only): "ray_start_cluster_head", [{ "num_cpus": 0, "object_store_memory": 75 * 1024 * 1024, + "_system_config": { + "automatic_object_spilling_enabled": False + } }], indirect=True) def test_fetch_local(ray_start_cluster_head): diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index f45aea9b4..abd82011d 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1039,7 +1039,10 @@ def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head): def test_fill_object_store_exception(shutdown_only): - ray.init(num_cpus=2, object_store_memory=10**8) + ray.init( + num_cpus=2, + object_store_memory=10**8, + _system_config={"automatic_object_spilling_enabled": False}) @ray.remote def expensive_task(): diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 242799dc9..159e0aaf7 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -69,6 +69,14 @@ def multi_node_object_spilling_config(request, tmp_path): yield create_object_spilling_config(request, tmp_path) +def run_basic_workload(): + """Run the workload that requires spilling.""" + arr = np.random.rand(5 * 1024 * 1024) # 40 MB + refs = [] + refs.append([ray.put(arr) for _ in range(2)]) + ray.get(ray.put(arr)) + + def is_dir_empty(temp_folder, append_path=ray.ray_constants.DEFAULT_OBJECT_PREFIX): # append_path is used because the file based spilling will append @@ -111,6 +119,68 @@ def test_url_generation_and_parse(): assert parsed_result.size == size +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_default_config(shutdown_only): + ray.init(num_cpus=0, object_store_memory=75 * 1024 * 1024) + # Make sure the object spilling configuration is properly set. + config = json.loads( + ray.worker._global_node._config["object_spilling_config"]) + assert config["type"] == "filesystem" + assert (config["params"]["directory_path"] == + ray.worker._global_node._session_dir) + # Make sure the basic workload can succeed. + run_basic_workload() + ray.shutdown() + + # Make sure config is not initalized if spilling is not enabled.. + ray.init( + num_cpus=0, + object_store_memory=75 * 1024 * 1024, + _system_config={ + "automatic_object_spilling_enabled": False, + "object_store_full_delay_ms": 100 + }) + assert "object_spilling_config" not in ray.worker._global_node._config + with pytest.raises(ray.exceptions.ObjectStoreFullError): + run_basic_workload() + ray.shutdown() + + # Make sure when we use a different config, it is reflected. + ray.init( + num_cpus=0, + _system_config={ + "object_spilling_config": ( + json.dumps(mock_distributed_fs_object_spilling_config)) + }) + config = json.loads( + ray.worker._global_node._config["object_spilling_config"]) + assert config["type"] == "mock_distributed_fs" + + +@pytest.mark.skipif( + platform.system() == "Windows", reason="Failing on Windows.") +def test_default_config_cluster(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + ray.init(cluster.address) + worker_nodes = [] + worker_nodes.append( + cluster.add_node(num_cpus=1, object_store_memory=75 * 1024 * 1024)) + cluster.wait_for_nodes() + + # Run the basic spilling workload on both + # worker nodes and make sure they are working. + @ray.remote + def task(): + arr = np.random.rand(5 * 1024 * 1024) # 40 MB + refs = [] + refs.append([ray.put(arr) for _ in range(2)]) + ray.get(ray.put(arr)) + + ray.get([task.remote() for _ in range(2)]) + + @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") def test_spilling_not_done_for_pinned_object(object_spilling_config, @@ -690,9 +760,7 @@ import json import os import signal import numpy as np - import ray - ray.init( object_store_memory=75 * 1024 * 1024, _system_config={{ @@ -709,7 +777,6 @@ ray.init( }}) arr = np.random.rand(1024 * 1024) # 8 MB data replay_buffer = [] - # Spill lots of objects for _ in range(30): ref = None diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index a47a9a828..02638ed3d 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -18,8 +18,10 @@ logger = logging.getLogger(__name__) @pytest.fixture def one_worker_100MiB(request): + # It has lots of tests that don't require object spilling. config = { "task_retry_delay_ms": 0, + "automatic_object_spilling_enabled": False } yield ray.init( num_cpus=1, diff --git a/python/ray/tests/test_reference_counting_2.py b/python/ray/tests/test_reference_counting_2.py index 8cc7576aa..416afcec0 100644 --- a/python/ray/tests/test_reference_counting_2.py +++ b/python/ray/tests/test_reference_counting_2.py @@ -22,6 +22,7 @@ def one_worker_100MiB(request): config = { "task_retry_delay_ms": 0, "object_timeout_milliseconds": 1000, + "automatic_object_spilling_enabled": False } yield ray.init( num_cpus=1,