diff --git a/doc/source/index.rst b/doc/source/index.rst index af9cb8440..27d4667ce 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -130,7 +130,6 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin fault-tolerance.rst plasma-object-store.rst resources.rst - redis-memory-management.rst tempfile.rst .. toctree:: diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst deleted file mode 100644 index 196bc81b9..000000000 --- a/doc/source/redis-memory-management.rst +++ /dev/null @@ -1,12 +0,0 @@ -Redis Memory Management (Experimental) -====================================== - -Ray stores metadata associated with tasks and objects in one or more Redis -servers, as described in `An Overview of the Internals -`_. Applications that are long-running or have high -task/object generation rate could risk high memory pressure, potentially leading -to out-of-memory (OOM) errors. - -In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object -metadata by setting ``redis_max_memory`` when starting Ray. This supercedes the -previously documented flushing functionality. diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index fc89d48ed..aaa767a3d 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -18,6 +18,17 @@ ID_SIZE = 20 NIL_JOB_ID = ObjectID(ID_SIZE * b"\xff") NIL_FUNCTION_ID = NIL_JOB_ID +# The default maximum number of bytes to allocate to the object store unless +# overridden by the user. +DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9 +# The smallest cap on the memory used by the object store that we allow. +OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7 +# The default maximum number of bytes that the non-primary Redis shards are +# allowed to use unless overridden by the user. +DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10 +# The smallest cap on the memory used by Redis that we allow. 
+REDIS_MINIMUM_MEMORY_BYTES = 10**7 + # If a remote function or actor (or some other export) has serialized size # greater than this quantity, print an warning. PICKLE_OBJECT_WARNING_SIZE = 10**7 diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index d5b6747d4..8a6df1f1e 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -116,16 +116,16 @@ def cli(logging_level, logging_format): "--object-store-memory", required=False, type=int, - help="the maximum amount of memory (in bytes) to allow the " - "object store to use") + help="The amount of memory (in bytes) to start the object store with. " + "By default, this is capped at 20GB but can be set higher.") @click.option( "--redis-max-memory", required=False, type=int, - help=("The max amount of memory (in bytes) to allow redis to use, or None " - "for no limit. Once the limit is exceeded, redis will start LRU " - "eviction of entries. This only applies to the sharded " - "redis tables (task and object tables).")) + help="The max amount of memory (in bytes) to allow redis to use. Once the " + "limit is exceeded, redis will start LRU eviction of entries. This only " + "applies to the sharded redis tables (task, object, and profile tables). 
" + "By default this is capped at 10GB but can be set higher.") @click.option( "--num-workers", required=False, diff --git a/python/ray/services.py b/python/ray/services.py index 0e061e449..9fd6bc9f0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -19,7 +19,7 @@ import redis import pyarrow # Ray modules -import ray.ray_constants +import ray.ray_constants as ray_constants import ray.plasma from ray.tempfile_services import ( @@ -36,9 +36,6 @@ PROCESS_TYPE_PLASMA_STORE = "plasma_store" PROCESS_TYPE_REDIS_SERVER = "redis_server" PROCESS_TYPE_WEB_UI = "web_ui" -# Max bytes to allocate to plasma unless overriden by the user -MAX_DEFAULT_MEM = 20 * 1000 * 1000 * 1000 - # This is a dictionary tracking all of the processes of different types that # have been started by this services module. Note that the order of the keys is # important because it determines the order in which these processes will be @@ -446,10 +443,11 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the - sharded redis tables (task and object tables). + redis_max_memory: The max amount of memory (in bytes) to allow each + redis shard to use. Once the limit is exceeded, redis will start + LRU eviction of entries. This only applies to the sharded redis + tables (task, object, and profile tables). By default, this is + capped at 10GB but can be set higher. 
Returns: A tuple of the address for the primary Redis shard and a list of @@ -481,6 +479,8 @@ def start_redis(node_ip_address, stderr_file=redis_stderr_file, cleanup=cleanup, password=password, + # Below we use None to indicate no limit on the memory of the + # primary Redis shard. redis_max_memory=None) else: assigned_port, _ = _start_redis_instance( @@ -496,6 +496,8 @@ def start_redis(node_ip_address, # supplies. modules=[CREDIS_MASTER_MODULE, REDIS_MODULE], password=password, + # Below we use None to indicate no limit on the memory of the + # primary Redis shard. redis_max_memory=None) if port is not None: assert assigned_port == port @@ -516,6 +518,15 @@ def start_redis(node_ip_address, # Store version information in the primary Redis shard. _put_version_info_in_redis(primary_redis_client) + # Cap the memory of the other redis shards if no limit is provided. + redis_max_memory = (redis_max_memory if redis_max_memory is not None else + ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES) + if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES: + raise ValueError("Attempting to cap Redis memory usage at {} bytes, " + "but the minimum allowed is {} bytes.".format( + redis_max_memory, + ray_constants.REDIS_MINIMUM_MEMORY_BYTES)) + # Start other Redis shards. Each Redis shard logs to a separate file, # prefixed by "redis-". 
redis_shards = [] @@ -860,9 +871,9 @@ def check_and_update_resources(resources): and not resource_quantity.is_integer()): raise ValueError("Resource quantities must all be whole numbers.") - if resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY: + if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY: raise ValueError("Resource quantities must be at most {}.".format( - ray.ray_constants.MAX_RESOURCE_QUANTITY)) + ray_constants.MAX_RESOURCE_QUANTITY)) return resources @@ -1033,13 +1044,15 @@ def determine_plasma_store_config(object_store_memory=None, if object_store_memory is None: object_store_memory = int(system_memory * 0.4) # Cap memory to avoid memory waste and perf issues on large nodes - if object_store_memory > MAX_DEFAULT_MEM: + if (object_store_memory > + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES): logger.warning( "Warning: Capping object memory store to {}GB. ".format( - MAX_DEFAULT_MEM // 1e9) + - "To increase this further, specify `object_store_memory` " + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES // 10**9) + + "To increase this further, specify `object_store_memory` " "when calling ray.init() or ray start.") - object_store_memory = MAX_DEFAULT_MEM + object_store_memory = ( + ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES) # Determine which directory to use. By default, use /tmp on MacOS and # /dev/shm on Linux, unless the shared-memory file system is too small, @@ -1122,6 +1135,12 @@ def start_plasma_store(node_ip_address, object_store_memory, plasma_directory = determine_plasma_store_config( object_store_memory, plasma_directory, huge_pages) + if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES: + raise ValueError("Attempting to cap object store memory usage at {} " + "bytes, but the minimum allowed is {} bytes.".format( + object_store_memory, + ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES)) + # Print the object store memory using two decimal places. 
object_store_memory_str = (object_store_memory / 10**7) / 10**2 logger.info("Starting the Plasma object store with {} GB memory " diff --git a/python/ray/worker.py b/python/ray/worker.py index bd0da2ed7..72cfeae35 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1495,11 +1495,13 @@ def init(redis_address=None, resources: A dictionary mapping the name of a resource to the quantity of that resource available. object_store_memory: The amount of memory (in bytes) to start the - object store with. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the - sharded redis tables (task and object tables). + object store with. By default, this is capped at 20GB but can be + set higher. + redis_max_memory: The max amount of memory (in bytes) to allow each + redis shard to use. Once the limit is exceeded, redis will start + LRU eviction of entries. This only applies to the sharded redis + tables (task, object, and profile tables). By default, this is + capped at 10GB but can be set higher. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the diff --git a/test/stress_tests.py b/test/stress_tests.py index 22edf4a6b..8ebfd32f3 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -9,8 +9,8 @@ import pytest import time import ray -from ray.parameter import RayParams import ray.tempfile_services +from ray.test.cluster_utils import Cluster import ray.ray_constants as ray_constants @@ -24,7 +24,8 @@ def ray_start_sharded(request): # 1-node chain for that shard only. # Start the Ray processes. 
- ray.init(num_cpus=10, num_redis_shards=num_redis_shards) + ray.init( + num_cpus=10, num_redis_shards=num_redis_shards, redis_max_memory=10**7) yield None @@ -34,17 +35,25 @@ def ray_start_sharded(request): @pytest.fixture(params=[(1, 4), (4, 4)]) def ray_start_combination(request): - num_local_schedulers = request.param[0] + num_nodes = request.param[0] num_workers_per_scheduler = request.param[1] # Start the Ray processes. - ray_params = RayParams( - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=10) - ray.worker._init(ray_params) - yield num_local_schedulers, num_workers_per_scheduler + cluster = Cluster( + initialize_head=True, + head_node_args={ + "resources": { + "CPU": 10 + }, + "redis_max_memory": 10**7 + }) + for i in range(num_nodes - 1): + cluster.add_node(num_cpus=10) + ray.init(redis_address=cluster.redis_address) + + yield num_nodes, num_workers_per_scheduler # The code after the yield will run as teardown code. ray.shutdown() + cluster.shutdown() def test_submitting_tasks(ray_start_combination): @@ -156,8 +165,8 @@ def test_getting_many_objects(ray_start_sharded): def test_wait(ray_start_combination): - num_local_schedulers, num_workers_per_scheduler = ray_start_combination - num_workers = num_local_schedulers * num_workers_per_scheduler + num_nodes, num_workers_per_scheduler = ray_start_combination + num_workers = num_nodes * num_workers_per_scheduler @ray.remote def f(x): @@ -184,83 +193,45 @@ def test_wait(ray_start_combination): @pytest.fixture(params=[1, 4]) def ray_start_reconstruction(request): - num_local_schedulers = request.param + num_nodes = request.param - # Start the Redis global state store. - node_ip_address = "127.0.0.1" - redis_address, redis_shards = ray.services.start_redis(node_ip_address) - redis_ip_address = ray.services.get_ip_address(redis_address) - redis_port = ray.services.get_port(redis_address) - time.sleep(0.1) - - # Start the Plasma store instances with a total of 1GB memory. 
plasma_store_memory = 10**9 - plasma_addresses = [] - object_store_memory = plasma_store_memory // num_local_schedulers - for i in range(num_local_schedulers): - store_stdout_file, store_stderr_file = ( - ray.tempfile_services.new_plasma_store_log_file(i, True)) - plasma_addresses.append( - ray.services.start_plasma_store( - node_ip_address, - redis_address, - object_store_memory=object_store_memory, - store_stdout_file=store_stdout_file, - store_stderr_file=store_stderr_file)) - # Start the rest of the services in the Ray cluster. - address_info = { - "redis_address": redis_address, - "redis_shards": redis_shards, - "object_store_addresses": plasma_addresses - } - ray_params = RayParams( - address_info=address_info, - start_ray_local=True, - num_local_schedulers=num_local_schedulers, - num_cpus=[1] * num_local_schedulers, - redirect_output=True, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200 - })) - ray.worker._init(ray_params) + cluster = Cluster( + initialize_head=True, + head_node_args={ + "resources": { + "CPU": 1 + }, + "object_store_memory": plasma_store_memory // num_nodes, + "redis_max_memory": 10**7, + "redirect_output": True, + "_internal_config": json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200 + }) + }) + for i in range(num_nodes - 1): + cluster.add_node( + num_cpus=1, + object_store_memory=plasma_store_memory // num_nodes, + redirect_output=True, + _internal_config=json.dumps({ + "initial_reconstruction_timeout_milliseconds": 200 + })) + ray.init(redis_address=cluster.redis_address) - yield (redis_ip_address, redis_port, plasma_store_memory, - num_local_schedulers) - - # The code after the yield will run as teardown code. - assert ray.services.all_processes_alive() - - # Determine the IDs of all local schedulers that had a task scheduled - # or submitted. 
- state = ray.experimental.state.GlobalState() - state._initialize_global_state(redis_ip_address, redis_port) - if os.environ.get("RAY_USE_NEW_GCS") == "on": - tasks = state.task_table() - local_scheduler_ids = { - task["LocalSchedulerID"] - for task in tasks.values() - } - - # Make sure that all nodes in the cluster were used by checking that - # the set of local scheduler IDs that had a task scheduled or submitted - # is equal to the total number of local schedulers started. We add one - # to the total number of local schedulers to account for - # NIL_LOCAL_SCHEDULER_ID. This is the local scheduler ID associated - # with the driver task, since it is not scheduled by a particular local - # scheduler. - if os.environ.get("RAY_USE_NEW_GCS") == "on": - assert len(local_scheduler_ids) == num_local_schedulers + 1 + yield plasma_store_memory, num_nodes, cluster # Clean up the Ray cluster. ray.shutdown() + cluster.shutdown() @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") def test_simple(ray_start_reconstruction): - _, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction + plasma_store_memory, num_nodes, cluster = ray_start_reconstruction # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory. @@ -290,12 +261,15 @@ def test_simple(ray_start_reconstruction): value = ray.get(args[i]) assert value[0] == i # Get values sequentially, in chunks. 
- num_chunks = 4 * num_local_schedulers + num_chunks = 4 * num_nodes chunk = num_objects // num_chunks for i in range(num_chunks): values = ray.get(args[i * chunk:(i + 1) * chunk]) del values + for node in cluster.list_all_nodes(): + assert node.all_processes_alive() + def sorted_random_indexes(total, output_num): random_indexes = [np.random.randint(total) for _ in range(output_num)] @@ -307,7 +281,7 @@ def sorted_random_indexes(total, output_num): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") def test_recursive(ray_start_reconstruction): - _, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction + plasma_store_memory, num_nodes, cluster = ray_start_reconstruction # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory. @@ -352,18 +326,21 @@ def test_recursive(ray_start_reconstruction): value = ray.get(args[i]) assert value[0] == i # Get values sequentially, in chunks. - num_chunks = 4 * num_local_schedulers + num_chunks = 4 * num_nodes chunk = num_objects // num_chunks for i in range(num_chunks): values = ray.get(args[i * chunk:(i + 1) * chunk]) del values + for node in cluster.list_all_nodes(): + assert node.all_processes_alive() + @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") def test_multiple_recursive(ray_start_reconstruction): - _, _, plasma_store_memory, _ = ray_start_reconstruction + plasma_store_memory, _, cluster = ray_start_reconstruction # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory. 
@@ -412,6 +389,9 @@ def test_multiple_recursive(ray_start_reconstruction): value = ray.get(args[i]) assert value[0] == i + for node in cluster.list_all_nodes(): + assert node.all_processes_alive() + def wait_for_errors(error_check): # Wait for errors from all the nondeterministic tasks. @@ -434,7 +414,7 @@ def wait_for_errors(error_check): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") def test_nondeterministic_task(ray_start_reconstruction): - _, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction + plasma_store_memory, num_nodes, cluster = ray_start_reconstruction # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory. @@ -478,7 +458,7 @@ def test_nondeterministic_task(ray_start_reconstruction): assert value[0] == i def error_check(errors): - if num_local_schedulers == 1: + if num_nodes == 1: # In a single-node setting, each object is evicted and # reconstructed exactly once, so exactly half the objects will # produce an error during reconstruction. @@ -495,6 +475,9 @@ def test_nondeterministic_task(ray_start_reconstruction): assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR for error in errors) + for node in cluster.list_all_nodes(): + assert node.all_processes_alive() + @pytest.fixture def ray_start_driver_put_errors():