Add option to evict keys LRU from the sharded redis tables (#3499)

* wip

* wip

* format

* wip

* note

* lint

* fix

* flag

* typo

* raise timeout

* fix

* optional get

* fix flag

* increase timeout in test

* update docs

* format
This commit is contained in:
Eric Liang
2018-12-09 05:48:52 -08:00
committed by Philipp Moritz
parent 0136af5aac
commit cffe8f9806
11 changed files with 174 additions and 120 deletions
+2 -1
View File
@@ -37,7 +37,8 @@ class RayOutOfMemoryError(Exception):
round(psutil.virtual_memory().shared / 1e9, 2)) +
"currently being used by the Ray object store. You can set "
"the object store size with the `object_store_memory` "
"parameter when starting Ray.")
"parameter when starting Ray, and the max Redis size with "
"`redis_max_memory`.")
class MemoryMonitor(object):
+13
View File
@@ -128,6 +128,19 @@ class Profiler(object):
self.events.append(event)
class NoopProfiler(object):
    """Stand-in profiler that silently ignores every request.

    Used in place of a real profiler when collect_profiling_data=False, so
    callers can keep invoking the profiler methods without checking whether
    profiling is enabled.
    """

    def start_flush_thread(self):
        """Do nothing; there is no background flush thread to start."""

    def flush_profile_data(self):
        """Do nothing; no profile data is ever buffered."""

    def add_event(self, event):
        """Drop the given event without recording it."""
class RayLogSpanRaylet(object):
"""An object used to enable logging a span of events with a with statement.
+13 -8
View File
@@ -38,30 +38,33 @@ def create_parser(parser_creator=None):
"--redis-address",
default=None,
type=str,
help="The Redis address of the cluster.")
help="Connect to an existing Ray cluster at this address instead "
"of starting a new one.")
parser.add_argument(
"--ray-num-cpus",
default=None,
type=int,
help="--num-cpus to pass to Ray."
" This only has an affect in local mode.")
help="--num-cpus to use if starting a new cluster.")
parser.add_argument(
"--ray-num-gpus",
default=None,
type=int,
help="--num-gpus to pass to Ray."
" This only has an affect in local mode.")
help="--num-gpus to use if starting a new cluster.")
parser.add_argument(
"--ray-num-local-schedulers",
default=None,
type=int,
help="Emulate multiple cluster nodes for debugging.")
parser.add_argument(
"--ray-redis-max-memory",
default=None,
type=int,
help="--redis-max-memory to use if starting a new cluster.")
parser.add_argument(
"--ray-object-store-memory",
default=None,
type=int,
help="--object-store-memory to pass to Ray."
" This only has an affect in local mode.")
help="--object-store-memory to use if starting a new cluster.")
parser.add_argument(
"--experiment-name",
default="default",
@@ -122,12 +125,14 @@ def run(args, parser):
"num_cpus": args.ray_num_cpus or 1,
"num_gpus": args.ray_num_gpus or 0,
},
object_store_memory=args.ray_object_store_memory)
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory)
ray.init(redis_address=cluster.redis_address)
else:
ray.init(
redis_address=args.redis_address,
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory,
num_cpus=args.ray_num_cpus,
num_gpus=args.ray_num_gpus)
run_experiments(
+23 -5
View File
@@ -116,6 +116,22 @@ def cli(logging_level, logging_format):
type=int,
help="the maximum amount of memory (in bytes) to allow the "
"object store to use")
@click.option(
"--redis-max-memory",
required=False,
type=int,
help=("The max amount of memory (in bytes) to allow redis to use, or None "
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
"redis tables (task and object tables)."))
@click.option(
"--collect-profiling-data",
default=True,
type=bool,
help=("Whether to collect profiling data. Note that "
"profiling data cannot be LRU evicted, so if you set "
"redis_max_memory then profiling will also be disabled to prevent "
"it from consuming all available redis memory."))
@click.option(
"--num-workers",
required=False,
@@ -202,11 +218,11 @@ def cli(logging_level, logging_format):
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
num_workers, num_cpus, num_gpus, resources, head, no_ui, block,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
redis_max_memory, collect_profiling_data, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, internal_config):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -262,6 +278,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
num_workers=num_workers,
cleanup=False,
redirect_worker_output=not no_redirect_worker_output,
+54 -8
View File
@@ -416,7 +416,8 @@ def start_redis(node_ip_address,
redirect_worker_output=False,
cleanup=True,
password=None,
use_credis=None):
use_credis=None,
redis_max_memory=None):
"""Start the Redis global state store.
Args:
@@ -445,6 +446,10 @@ def start_redis(node_ip_address,
use_credis: If True, additionally load the chain-replicated libraries
into the redis servers. Defaults to None, which means its value is
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
Returns:
A tuple of the address for the primary Redis shard and a list of
@@ -475,7 +480,8 @@ def start_redis(node_ip_address,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
password=password)
password=password,
redis_max_memory=None)
else:
assigned_port, _ = _start_redis_instance(
node_ip_address=node_ip_address,
@@ -489,7 +495,8 @@ def start_redis(node_ip_address,
# as the latter contains an extern declaration that the former
# supplies.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE],
password=password)
password=password,
redis_max_memory=None)
if port is not None:
assert assigned_port == port
port = assigned_port
@@ -523,7 +530,8 @@ def start_redis(node_ip_address,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
cleanup=cleanup,
password=password)
password=password,
redis_max_memory=redis_max_memory)
else:
assert num_redis_shards == 1, \
"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\
@@ -540,7 +548,8 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE],
redis_max_memory=redis_max_memory)
if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
@@ -570,7 +579,8 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
cleanup=True,
password=None,
executable=REDIS_EXECUTABLE,
modules=None):
modules=None,
redis_max_memory=None):
"""Start a single Redis server.
Args:
@@ -594,6 +604,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
modules (list of str): A list of pathnames, pointing to the redis
module(s) that will be loaded in this redis server. If None, load
the default Ray redis module.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries.
Returns:
A tuple of the port used by Redis and a handle to the process that was
@@ -657,6 +670,14 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
# hosts can connect to it. TODO(rkn): Do this in a more secure way.
redis_client.config_set("protected-mode", "no")
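# If a memory cap is given, have Redis evict the least recently used keys
# once the cap is reached, discarding old task and object metadata.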
if redis_max_memory is not None:
redis_client.config_set("maxmemory", str(redis_max_memory))
redis_client.config_set("maxmemory-policy", "allkeys-lru")
redis_client.config_set("maxmemory-samples", "10")
logger.info("Starting Redis shard with {} GB max memory.".format(
round(redis_max_memory / 1e9, 2)))
# If redis_max_clients is provided, attempt to raise the maximum number
# of Redis clients.
if redis_max_clients is not None:
@@ -861,7 +882,8 @@ def start_raylet(redis_address,
stderr_file=None,
cleanup=True,
config=None,
redis_password=None):
redis_password=None,
collect_profiling_data=True):
"""Start a raylet, which is a combined local scheduler and object manager.
Args:
@@ -894,6 +916,7 @@ def start_raylet(redis_address,
config (dict|None): Optional Raylet configuration that will
override defaults in RayConfig.
redis_password (str): The password of the redis server.
collect_profiling_data: Whether to collect profiling data from workers.
Returns:
The raylet socket name.
@@ -923,9 +946,11 @@ def start_raylet(redis_address,
"--object-store-name={} "
"--raylet-name={} "
"--redis-address={} "
"--collect-profiling-data={} "
"--temp-dir={}".format(
sys.executable, worker_path, node_ip_address,
plasma_store_name, raylet_name, redis_address,
"1" if collect_profiling_data else "0",
get_temp_root()))
if redis_password:
start_worker_command += " --redis-password {}".format(redis_password)
@@ -1260,6 +1285,8 @@ def start_ray_processes(address_info=None,
num_workers=None,
num_local_schedulers=1,
object_store_memory=None,
redis_max_memory=None,
collect_profiling_data=True,
num_redis_shards=1,
redis_max_clients=None,
redis_password=None,
@@ -1306,6 +1333,14 @@ def start_ray_processes(address_info=None,
address_info.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
collect_profiling_data: Whether to collect profiling data. Note that
profiling data cannot be LRU evicted, so if you set
redis_max_memory then profiling will also be disabled to prevent
it from consuming all available redis memory.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@@ -1397,7 +1432,8 @@ def start_ray_processes(address_info=None,
redirect_output=True,
redirect_worker_output=redirect_worker_output,
cleanup=cleanup,
password=redis_password)
password=redis_password,
redis_max_memory=redis_max_memory)
address_info["redis_address"] = redis_address
time.sleep(0.1)
@@ -1497,6 +1533,7 @@ def start_ray_processes(address_info=None,
stderr_file=raylet_stderr_file,
cleanup=cleanup,
redis_password=redis_password,
collect_profiling_data=collect_profiling_data,
config=config))
# Try to start the web UI.
@@ -1617,6 +1654,8 @@ def start_ray_head(address_info=None,
num_workers=None,
num_local_schedulers=1,
object_store_memory=None,
redis_max_memory=None,
collect_profiling_data=True,
worker_path=None,
cleanup=True,
redirect_worker_output=False,
@@ -1662,6 +1701,11 @@ def start_ray_head(address_info=None,
address_info.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
collect_profiling_data: Whether to collect profiling data from workers.
worker_path (str): The path of the source code that will be run by the
worker.
cleanup (bool): If cleanup is true, then the processes started here
@@ -1712,6 +1756,8 @@ def start_ray_head(address_info=None,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
worker_path=worker_path,
cleanup=cleanup,
redirect_worker_output=redirect_worker_output,
+38 -4
View File
@@ -217,7 +217,7 @@ class Worker(object):
# When the worker is constructed. Record the original value of the
# CUDA_VISIBLE_DEVICES environment variable.
self.original_gpu_ids = ray.utils.get_cuda_visible_devices()
self.profiler = profiling.Profiler(self)
self.profiler = None
self.memory_monitor = memory_monitor.MemoryMonitor()
self.state_lock = threading.Lock()
# A dictionary that maps from driver id to SerializationContext
@@ -1342,6 +1342,8 @@ def _init(address_info=None,
num_workers=None,
num_local_schedulers=None,
object_store_memory=None,
redis_max_memory=None,
collect_profiling_data=True,
local_mode=False,
driver_mode=None,
redirect_worker_output=False,
@@ -1386,6 +1388,11 @@ def _init(address_info=None,
This is only provided if start_ray_local is True.
object_store_memory: The maximum amount of memory (in bytes) to
allow the object store to use.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
collect_profiling_data: Whether to collect profiling data from workers.
local_mode (bool): True if the code should be executed serially
without Ray. This is useful for debugging.
redirect_worker_output: True if the stdout and stderr of worker
@@ -1440,6 +1447,11 @@ def _init(address_info=None,
else:
driver_mode = SCRIPT_MODE
if redis_max_memory and collect_profiling_data:
logger.warn("Profiling data cannot be LRU evicted, so it is disabled "
"when redis_max_memory is set.")
collect_profiling_data = False
# Get addresses of existing services.
if address_info is None:
address_info = {}
@@ -1479,6 +1491,8 @@ def _init(address_info=None,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
redirect_worker_output=redirect_worker_output,
redirect_output=redirect_output,
start_workers_from_local_scheduler=(
@@ -1519,6 +1533,9 @@ def _init(address_info=None,
if object_store_memory is not None:
raise Exception("When connecting to an existing cluster, "
"object_store_memory must not be provided.")
if redis_max_memory is not None:
raise Exception("When connecting to an existing cluster, "
"redis_max_memory must not be provided.")
if plasma_directory is not None:
raise Exception("When connecting to an existing cluster, "
"plasma_directory must not be provided.")
@@ -1556,7 +1573,7 @@ def _init(address_info=None,
"node_ip_address": node_ip_address,
"redis_address": address_info["redis_address"],
"store_socket_name": address_info["object_store_addresses"][0],
"webui_url": address_info["webui_url"]
"webui_url": address_info["webui_url"],
}
driver_address_info["raylet_socket_name"] = (
address_info["raylet_socket_names"][0])
@@ -1569,7 +1586,8 @@ def _init(address_info=None,
mode=driver_mode,
worker=global_worker,
driver_id=driver_id,
redis_password=redis_password)
redis_password=redis_password,
collect_profiling_data=collect_profiling_data)
return address_info
@@ -1578,6 +1596,8 @@ def init(redis_address=None,
num_gpus=None,
resources=None,
object_store_memory=None,
redis_max_memory=None,
collect_profiling_data=True,
node_ip_address=None,
object_id_seed=None,
num_workers=None,
@@ -1634,6 +1654,11 @@ def init(redis_address=None,
of that resource available.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
collect_profiling_data: Whether to collect profiling data from workers.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
@@ -1734,6 +1759,8 @@ def init(redis_address=None,
huge_pages=huge_pages,
include_webui=include_webui,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
driver_id=driver_id,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
@@ -1913,7 +1940,8 @@ def connect(info,
mode=WORKER_MODE,
worker=global_worker,
driver_id=None,
redis_password=None):
redis_password=None,
collect_profiling_data=True):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
Args:
@@ -1926,6 +1954,7 @@ def connect(info,
driver_id: The ID of driver. If it's None, then we will generate one.
redis_password (str): Prevents external clients without the password
from connecting to Redis if provided.
collect_profiling_data: Whether to collect profiling data from workers.
"""
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
@@ -1935,6 +1964,11 @@ def connect(info,
# Enable nice stack traces on SIGSEGV etc.
faulthandler.enable(all_threads=False)
if collect_profiling_data:
worker.profiler = profiling.Profiler(worker)
else:
worker.profiler = profiling.NoopProfiler()
# Initialize some fields.
if mode is WORKER_MODE:
worker.worker_id = random_string()
+10 -2
View File
@@ -55,6 +55,11 @@ parser.add_argument(
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP)
# Flag controlling whether this worker collects profiling data.
parser.add_argument(
    "--collect-profiling-data",
    # Declared as an int because argparse can't handle bool values: with
    # type=bool any non-empty string (including "0" or "False") would be
    # converted to True. Callers pass 1 to enable and 0 to disable.
    type=int,
    default=1,
    help="Whether to collect profiling data from workers.")
parser.add_argument(
"--temp-dir",
required=False,
@@ -71,7 +76,7 @@ if __name__ == "__main__":
"redis_password": args.redis_password,
"store_socket_name": args.object_store_name,
"manager_socket_name": args.object_store_manager_name,
"raylet_socket_name": args.raylet_name
"raylet_socket_name": args.raylet_name,
}
logging.basicConfig(
@@ -82,7 +87,10 @@ if __name__ == "__main__":
tempfile_services.set_temp_root(args.temp_dir)
ray.worker.connect(
info, mode=ray.WORKER_MODE, redis_password=args.redis_password)
info,
mode=ray.WORKER_MODE,
redis_password=args.redis_password,
collect_profiling_data=args.collect_profiling_data)
error_explanation = """
This error is unexpected and should not have happened. Somehow a worker