Allow --lru-evict to be passed into ray start (#8959)

This commit is contained in:
Ian Rodney
2020-07-13 14:09:39 -07:00
committed by GitHub
parent 4454d05bcf
commit 0085cf75d0
4 changed files with 32 additions and 14 deletions
+4
View File
@@ -98,6 +98,10 @@ class Node:
raise ValueError(
"Internal config parameters can only be set on the head node.")
if ray_params._lru_evict:
assert (connect_only or
head), "LRU Evict can only be passed into the head node."
self._raylet_ip_address = raylet_ip_address
ray_params.update_if_absent(
+16 -1
View File
@@ -89,6 +89,7 @@ class RayParams:
load_code_from_local: Whether load code from local file or from GCS.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
lru_evict (bool): Enable LRU eviction if space is needed.
"""
def __init__(self,
@@ -130,7 +131,8 @@ class RayParams:
include_java=False,
java_worker_options=None,
load_code_from_local=False,
_internal_config=None):
_internal_config=None,
lru_evict=False):
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
self.num_cpus = num_cpus
@@ -168,8 +170,21 @@ class RayParams:
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self._internal_config = _internal_config
self._lru_evict = lru_evict
self._check_usage()
# Set the internal config options for LRU eviction.
if lru_evict:
# Turn off object pinning.
if self._internal_config is None:
self._internal_config = dict()
if self._internal_config.get("object_pinning_enabled", False):
raise Exception(
"Object pinning cannot be enabled if using LRU eviction.")
self._internal_config["object_pinning_enabled"] = False
self._internal_config["object_store_full_max_retries"] = -1
self._internal_config["free_objects_period_milliseconds"] = 1000
def update(self, **kwargs):
"""Update the settings according to the keyword arguments.
+9 -2
View File
@@ -331,6 +331,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
is_flag=True,
default=False,
help="Specify whether load code from local file or GCS serialization.")
@click.option(
"--lru-evict",
is_flag=True,
default=False,
help="Specify whether LRU evict will be used for this cluster.")
def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
@@ -340,7 +345,8 @@ def start(node_ip_address, redis_address, address, redis_port, port,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config):
java_worker_options, load_code_from_local, internal_config,
lru_evict):
"""Start Ray processes manually on the local machine."""
if redis_address is not None:
raise DeprecationWarning("The --redis-address argument is "
@@ -412,7 +418,8 @@ def start(node_ip_address, redis_address, address, redis_port, port,
dashboard_port=dashboard_port,
java_worker_options=java_worker_options,
load_code_from_local=load_code_from_local,
_internal_config=internal_config)
_internal_config=internal_config,
lru_evict=lru_evict)
if head:
# Start Ray on the head node.
if redis_shard_ports is not None:
+3 -11
View File
@@ -673,15 +673,6 @@ def init(address=None,
_internal_config = (json.loads(_internal_config)
if _internal_config else {})
# Set the internal config options for LRU eviction.
if lru_evict:
# Turn off object pinning.
if _internal_config.get("object_pinning_enabled", False):
raise Exception(
"Object pinning cannot be enabled if using LRU eviction.")
_internal_config["object_pinning_enabled"] = False
_internal_config["object_store_full_max_retries"] = -1
_internal_config["free_objects_period_milliseconds"] = 1000
global _global_node
if redis_address is None:
@@ -716,7 +707,7 @@ def init(address=None,
load_code_from_local=load_code_from_local,
java_worker_options=java_worker_options,
_internal_config=_internal_config,
)
lru_evict=lru_evict)
# Start the Ray processes. We set shutdown_at_exit=False because we
# shutdown the node in the ray.shutdown call that happens in the atexit
# handler. We still spawn a reaper process in case the atexit handler
@@ -784,7 +775,8 @@ def init(address=None,
object_ref_seed=object_ref_seed,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local,
_internal_config=_internal_config)
_internal_config=_internal_config,
lru_evict=lru_evict)
_global_node = ray.node.Node(
ray_params,
head=False,