Limit default redis max memory to 10GB. (#3630)

* Limit Redis max memory to 10GB/shard by default.

* Update stress tests.

* Reorganize

* Update

* Add minimum cap size for object store and redis.

* Small test update.
This commit is contained in:
Robert Nishihara
2019-01-03 13:23:54 -08:00
committed by Philipp Moritz
parent 4b23a34c93
commit 586a5c9ffa
7 changed files with 121 additions and 119 deletions
-1
View File
@@ -130,7 +130,6 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin
fault-tolerance.rst
plasma-object-store.rst
resources.rst
redis-memory-management.rst
tempfile.rst
.. toctree::
-12
View File
@@ -1,12 +0,0 @@
Redis Memory Management (Experimental)
======================================
Ray stores metadata associated with tasks and objects in one or more Redis
servers, as described in `An Overview of the Internals
<internals-overview.html>`_. Applications that are long-running or have high
task/object generation rate could risk high memory pressure, potentially leading
to out-of-memory (OOM) errors.
In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object
metadata by setting ``redis_max_memory`` when starting Ray. This supercedes the
previously documented flushing functionality.
+11
View File
@@ -18,6 +18,17 @@ ID_SIZE = 20
NIL_JOB_ID = ObjectID(ID_SIZE * b"\xff")
NIL_FUNCTION_ID = NIL_JOB_ID
# The default maximum number of bytes to allocate to the object store unless
# overridden by the user.
DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES = 20 * 10**9
# The smallest cap on the memory used by the object store that we allow.
OBJECT_STORE_MINIMUM_MEMORY_BYTES = 10**7
# The default maximum number of bytes that the non-primary Redis shards are
# allowed to use unless overridden by the user.
DEFAULT_REDIS_MAX_MEMORY_BYTES = 10**10
# The smallest cap on the memory used by Redis that we allow.
REDIS_MINIMUM_MEMORY_BYTES = 10**7
# If a remote function or actor (or some other export) has serialized size
# greater than this quantity, print an warning.
PICKLE_OBJECT_WARNING_SIZE = 10**7
+6 -6
View File
@@ -116,16 +116,16 @@ def cli(logging_level, logging_format):
"--object-store-memory",
required=False,
type=int,
help="the maximum amount of memory (in bytes) to allow the "
"object store to use")
help="The amount of memory (in bytes) to start the object store with. "
"By default, this is capped at 20GB but can be set higher.")
@click.option(
"--redis-max-memory",
required=False,
type=int,
help=("The max amount of memory (in bytes) to allow redis to use, or None "
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
"redis tables (task and object tables)."))
help="The max amount of memory (in bytes) to allow redis to use. Once the "
"limit is exceeded, redis will start LRU eviction of entries. This only "
"applies to the sharded redis tables (task, object, and profile tables). "
"By default this is capped at 10GB but can be set higher.")
@click.option(
"--num-workers",
required=False,
+33 -14
View File
@@ -19,7 +19,7 @@ import redis
import pyarrow
# Ray modules
import ray.ray_constants
import ray.ray_constants as ray_constants
import ray.plasma
from ray.tempfile_services import (
@@ -36,9 +36,6 @@ PROCESS_TYPE_PLASMA_STORE = "plasma_store"
PROCESS_TYPE_REDIS_SERVER = "redis_server"
PROCESS_TYPE_WEB_UI = "web_ui"
# Max bytes to allocate to plasma unless overriden by the user
MAX_DEFAULT_MEM = 20 * 1000 * 1000 * 1000
# This is a dictionary tracking all of the processes of different types that
# have been started by this services module. Note that the order of the keys is
# important because it determines the order in which these processes will be
@@ -446,10 +443,11 @@ def start_redis(node_ip_address,
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
redis_max_memory: The max amount of memory (in bytes) to allow each
redis shard to use. Once the limit is exceeded, redis will start
LRU eviction of entries. This only applies to the sharded redis
tables (task, object, and profile tables). By default, this is
capped at 10GB but can be set higher.
Returns:
A tuple of the address for the primary Redis shard and a list of
@@ -481,6 +479,8 @@ def start_redis(node_ip_address,
stderr_file=redis_stderr_file,
cleanup=cleanup,
password=password,
# Below we use None to indicate no limit on the memory of the
# primary Redis shard.
redis_max_memory=None)
else:
assigned_port, _ = _start_redis_instance(
@@ -496,6 +496,8 @@ def start_redis(node_ip_address,
# supplies.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE],
password=password,
# Below we use None to indicate no limit on the memory of the
# primary Redis shard.
redis_max_memory=None)
if port is not None:
assert assigned_port == port
@@ -516,6 +518,15 @@ def start_redis(node_ip_address,
# Store version information in the primary Redis shard.
_put_version_info_in_redis(primary_redis_client)
# Cap the memory of the other redis shards if no limit is provided.
redis_max_memory = (redis_max_memory if redis_max_memory is not None else
ray_constants.DEFAULT_REDIS_MAX_MEMORY_BYTES)
if redis_max_memory < ray_constants.REDIS_MINIMUM_MEMORY_BYTES:
raise ValueError("Attempting to cap Redis memory usage at {} bytes, "
"but the minimum allowed is {} bytes.".format(
redis_max_memory,
ray_constants.REDIS_MINIMUM_MEMORY_BYTES))
# Start other Redis shards. Each Redis shard logs to a separate file,
# prefixed by "redis-<shard number>".
redis_shards = []
@@ -860,9 +871,9 @@ def check_and_update_resources(resources):
and not resource_quantity.is_integer()):
raise ValueError("Resource quantities must all be whole numbers.")
if resource_quantity > ray.ray_constants.MAX_RESOURCE_QUANTITY:
if resource_quantity > ray_constants.MAX_RESOURCE_QUANTITY:
raise ValueError("Resource quantities must be at most {}.".format(
ray.ray_constants.MAX_RESOURCE_QUANTITY))
ray_constants.MAX_RESOURCE_QUANTITY))
return resources
@@ -1033,13 +1044,15 @@ def determine_plasma_store_config(object_store_memory=None,
if object_store_memory is None:
object_store_memory = int(system_memory * 0.4)
# Cap memory to avoid memory waste and perf issues on large nodes
if object_store_memory > MAX_DEFAULT_MEM:
if (object_store_memory >
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES):
logger.warning(
"Warning: Capping object memory store to {}GB. ".format(
MAX_DEFAULT_MEM // 1e9) +
"To increase this further, specify `object_store_memory` "
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES // 1e9)
+ "To increase this further, specify `object_store_memory` "
"when calling ray.init() or ray start.")
object_store_memory = MAX_DEFAULT_MEM
object_store_memory = (
ray_constants.DEFAULT_OBJECT_STORE_MAX_MEMORY_BYTES)
# Determine which directory to use. By default, use /tmp on MacOS and
# /dev/shm on Linux, unless the shared-memory file system is too small,
@@ -1122,6 +1135,12 @@ def start_plasma_store(node_ip_address,
object_store_memory, plasma_directory = determine_plasma_store_config(
object_store_memory, plasma_directory, huge_pages)
if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
raise ValueError("Attempting to cap object store memory usage at {} "
"bytes, but the minimum allowed is {} bytes.".format(
object_store_memory,
ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES))
# Print the object store memory using two decimal places.
object_store_memory_str = (object_store_memory / 10**7) / 10**2
logger.info("Starting the Plasma object store with {} GB memory "
+7 -5
View File
@@ -1495,11 +1495,13 @@ def init(redis_address=None,
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
object store with. By default, this is capped at 20GB but can be
set higher.
redis_max_memory: The max amount of memory (in bytes) to allow each
redis shard to use. Once the limit is exceeded, redis will start
LRU eviction of entries. This only applies to the sharded redis
tables (task, object, and profile tables). By default, this is
capped at 10GB but can be set higher.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
+64 -81
View File
@@ -9,8 +9,8 @@ import pytest
import time
import ray
from ray.parameter import RayParams
import ray.tempfile_services
from ray.test.cluster_utils import Cluster
import ray.ray_constants as ray_constants
@@ -24,7 +24,8 @@ def ray_start_sharded(request):
# 1-node chain for that shard only.
# Start the Ray processes.
ray.init(num_cpus=10, num_redis_shards=num_redis_shards)
ray.init(
num_cpus=10, num_redis_shards=num_redis_shards, redis_max_memory=10**7)
yield None
@@ -34,17 +35,25 @@ def ray_start_sharded(request):
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_combination(request):
num_local_schedulers = request.param[0]
num_nodes = request.param[0]
num_workers_per_scheduler = request.param[1]
# Start the Ray processes.
ray_params = RayParams(
start_ray_local=True,
num_local_schedulers=num_local_schedulers,
num_cpus=10)
ray.worker._init(ray_params)
yield num_local_schedulers, num_workers_per_scheduler
cluster = Cluster(
initialize_head=True,
head_node_args={
"resources": {
"CPU": 10
},
"redis_max_memory": 10**7
})
for i in range(num_nodes - 1):
cluster.add_node(num_cpus=10)
ray.init(redis_address=cluster.redis_address)
yield num_nodes, num_workers_per_scheduler
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
def test_submitting_tasks(ray_start_combination):
@@ -156,8 +165,8 @@ def test_getting_many_objects(ray_start_sharded):
def test_wait(ray_start_combination):
num_local_schedulers, num_workers_per_scheduler = ray_start_combination
num_workers = num_local_schedulers * num_workers_per_scheduler
num_nodes, num_workers_per_scheduler = ray_start_combination
num_workers = num_nodes * num_workers_per_scheduler
@ray.remote
def f(x):
@@ -184,83 +193,45 @@ def test_wait(ray_start_combination):
@pytest.fixture(params=[1, 4])
def ray_start_reconstruction(request):
num_local_schedulers = request.param
num_nodes = request.param
# Start the Redis global state store.
node_ip_address = "127.0.0.1"
redis_address, redis_shards = ray.services.start_redis(node_ip_address)
redis_ip_address = ray.services.get_ip_address(redis_address)
redis_port = ray.services.get_port(redis_address)
time.sleep(0.1)
# Start the Plasma store instances with a total of 1GB memory.
plasma_store_memory = 10**9
plasma_addresses = []
object_store_memory = plasma_store_memory // num_local_schedulers
for i in range(num_local_schedulers):
store_stdout_file, store_stderr_file = (
ray.tempfile_services.new_plasma_store_log_file(i, True))
plasma_addresses.append(
ray.services.start_plasma_store(
node_ip_address,
redis_address,
object_store_memory=object_store_memory,
store_stdout_file=store_stdout_file,
store_stderr_file=store_stderr_file))
# Start the rest of the services in the Ray cluster.
address_info = {
"redis_address": redis_address,
"redis_shards": redis_shards,
"object_store_addresses": plasma_addresses
}
ray_params = RayParams(
address_info=address_info,
start_ray_local=True,
num_local_schedulers=num_local_schedulers,
num_cpus=[1] * num_local_schedulers,
redirect_output=True,
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200
}))
ray.worker._init(ray_params)
cluster = Cluster(
initialize_head=True,
head_node_args={
"resources": {
"CPU": 1
},
"object_store_memory": plasma_store_memory // num_nodes,
"redis_max_memory": 10**7,
"redirect_output": True,
"_internal_config": json.dumps({
"initial_reconstruction_timeout_milliseconds": 200
})
})
for i in range(num_nodes - 1):
cluster.add_node(
num_cpus=1,
object_store_memory=plasma_store_memory // num_nodes,
redirect_output=True,
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200
}))
ray.init(redis_address=cluster.redis_address)
yield (redis_ip_address, redis_port, plasma_store_memory,
num_local_schedulers)
# The code after the yield will run as teardown code.
assert ray.services.all_processes_alive()
# Determine the IDs of all local schedulers that had a task scheduled
# or submitted.
state = ray.experimental.state.GlobalState()
state._initialize_global_state(redis_ip_address, redis_port)
if os.environ.get("RAY_USE_NEW_GCS") == "on":
tasks = state.task_table()
local_scheduler_ids = {
task["LocalSchedulerID"]
for task in tasks.values()
}
# Make sure that all nodes in the cluster were used by checking that
# the set of local scheduler IDs that had a task scheduled or submitted
# is equal to the total number of local schedulers started. We add one
# to the total number of local schedulers to account for
# NIL_LOCAL_SCHEDULER_ID. This is the local scheduler ID associated
# with the driver task, since it is not scheduled by a particular local
# scheduler.
if os.environ.get("RAY_USE_NEW_GCS") == "on":
assert len(local_scheduler_ids) == num_local_schedulers + 1
yield plasma_store_memory, num_nodes, cluster
# Clean up the Ray cluster.
ray.shutdown()
cluster.shutdown()
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_simple(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
@@ -290,12 +261,15 @@ def test_simple(ray_start_reconstruction):
value = ray.get(args[i])
assert value[0] == i
# Get values sequentially, in chunks.
num_chunks = 4 * num_local_schedulers
num_chunks = 4 * num_nodes
chunk = num_objects // num_chunks
for i in range(num_chunks):
values = ray.get(args[i * chunk:(i + 1) * chunk])
del values
for node in cluster.list_all_nodes():
assert node.all_processes_alive()
def sorted_random_indexes(total, output_num):
random_indexes = [np.random.randint(total) for _ in range(output_num)]
@@ -307,7 +281,7 @@ def sorted_random_indexes(total, output_num):
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_recursive(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
@@ -352,18 +326,21 @@ def test_recursive(ray_start_reconstruction):
value = ray.get(args[i])
assert value[0] == i
# Get values sequentially, in chunks.
num_chunks = 4 * num_local_schedulers
num_chunks = 4 * num_nodes
chunk = num_objects // num_chunks
for i in range(num_chunks):
values = ray.get(args[i * chunk:(i + 1) * chunk])
del values
for node in cluster.list_all_nodes():
assert node.all_processes_alive()
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_multiple_recursive(ray_start_reconstruction):
_, _, plasma_store_memory, _ = ray_start_reconstruction
plasma_store_memory, _, cluster = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
@@ -412,6 +389,9 @@ def test_multiple_recursive(ray_start_reconstruction):
value = ray.get(args[i])
assert value[0] == i
for node in cluster.list_all_nodes():
assert node.all_processes_alive()
def wait_for_errors(error_check):
# Wait for errors from all the nondeterministic tasks.
@@ -434,7 +414,7 @@ def wait_for_errors(error_check):
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_nondeterministic_task(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
@@ -478,7 +458,7 @@ def test_nondeterministic_task(ray_start_reconstruction):
assert value[0] == i
def error_check(errors):
if num_local_schedulers == 1:
if num_nodes == 1:
# In a single-node setting, each object is evicted and
# reconstructed exactly once, so exactly half the objects will
# produce an error during reconstruction.
@@ -495,6 +475,9 @@ def test_nondeterministic_task(ray_start_reconstruction):
assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR
for error in errors)
for node in cluster.list_all_nodes():
assert node.all_processes_alive()
@pytest.fixture
def ray_start_driver_put_errors():