[core] Add flag to enable object reconstruction during ray start (#9488)

* Add flag

* doc

* Fix tests
This commit is contained in:
Stephanie Wang
2020-07-17 10:13:14 -07:00
committed by GitHub
parent f080aa6ce3
commit b351d13940
6 changed files with 119 additions and 35 deletions
+16
View File
@@ -92,6 +92,8 @@ class RayParams:
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
lru_evict (bool): Enable LRU eviction if space is needed.
enable_object_reconstruction (bool): Enable reconstruction of objects
in the plasma store that are lost due to node failure.
"""
def __init__(self,
@@ -135,6 +137,7 @@ class RayParams:
java_worker_options=None,
load_code_from_local=False,
_internal_config=None,
enable_object_reconstruction=False,
metrics_agent_port=None,
lru_evict=False):
self.object_ref_seed = object_ref_seed
@@ -177,6 +180,7 @@ class RayParams:
self.metrics_agent_port = metrics_agent_port
self._internal_config = _internal_config
self._lru_evict = lru_evict
self._enable_object_reconstruction = enable_object_reconstruction
self._check_usage()
# Set the internal config options for LRU eviction.
@@ -191,6 +195,18 @@ class RayParams:
self._internal_config["object_store_full_max_retries"] = -1
self._internal_config["free_objects_period_milliseconds"] = 1000
# Set the internal config options for object reconstruction.
if enable_object_reconstruction:
# Turn on lineage pinning, creating the internal config if needed.
if self._internal_config is None:
self._internal_config = dict()
if lru_evict:
raise Exception(
"Object reconstruction cannot be enabled if using LRU "
"eviction.")
self._internal_config["lineage_pinning_enabled"] = True
self._internal_config["free_objects_period_milliseconds"] = -1
def update(self, **kwargs):
"""Update the settings according to the keyword arguments.
+9 -2
View File
@@ -341,6 +341,12 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
is_flag=True,
default=False,
help="Specify whether LRU evict will be used for this cluster.")
@click.option(
"--enable-object-reconstruction",
is_flag=True,
default=False,
help="Specify whether object reconstruction will be used for this cluster."
)
def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
@@ -351,7 +357,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config,
lru_evict):
lru_evict, enable_object_reconstruction):
"""Start Ray processes manually on the local machine."""
if gcs_server_port and not head:
raise ValueError(
@@ -429,7 +435,8 @@ def start(node_ip_address, redis_address, address, redis_port, port,
java_worker_options=java_worker_options,
load_code_from_local=load_code_from_local,
_internal_config=internal_config,
lru_evict=lru_evict)
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
if head:
# Start Ray on the head node.
if redis_shard_ports is not None:
+62 -29
View File
@@ -52,16 +52,22 @@ def test_cached_object(ray_start_cluster):
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_reconstruction_cached_dependency(ray_start_cluster,
reconstruction_enabled):
config = json.dumps({
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
"initial_reconstruction_timeout_milliseconds": 200,
})
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
@@ -106,16 +112,22 @@ def test_reconstruction_cached_dependency(ray_start_cluster,
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
"initial_reconstruction_timeout_milliseconds": 200,
})
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
@@ -150,16 +162,22 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
"initial_reconstruction_timeout_milliseconds": 200,
})
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
@@ -191,22 +209,32 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
if reconstruction_enabled:
ray.get(result)
else:
with pytest.raises(ray.exceptions.UnreconstructableError):
# The copy that we fetched earlier may still be local or it may have
# been evicted.
try:
ray.get(result)
except ray.exceptions.UnreconstructableError:
pass
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
"initial_reconstruction_timeout_milliseconds": 200,
})
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
cluster.add_node(
num_cpus=0,
_internal_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
@@ -252,17 +280,23 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
"free_objects_period_milliseconds": -1,
"initial_reconstruction_timeout_milliseconds": 200,
})
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = 0
config = json.dumps(config)
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0, _internal_config=config, object_store_memory=10**8)
num_cpus=0,
_internal_config=config,
object_store_memory=10**8,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
cluster.wait_for_nodes()
@@ -300,15 +334,14 @@ def test_reconstruction_stress(ray_start_cluster):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"lineage_pinning_enabled": 1,
"free_objects_period_milliseconds": -1,
"max_direct_call_object_size": 100,
"task_retry_delay_ms": 100,
"initial_reconstruction_timeout_milliseconds": 200,
})
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
cluster.add_node(
num_cpus=0, _internal_config=config, enable_object_reconstruction=True)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(
+16 -3
View File
@@ -496,7 +496,8 @@ def init(address=None,
java_worker_options=None,
use_pickle=True,
_internal_config=None,
lru_evict=False):
lru_evict=False,
enable_object_reconstruction=False):
"""
Connect to an existing Ray cluster or start one and connect to it.
@@ -615,6 +616,12 @@ def init(address=None,
reference counting will be used to decide which objects are safe
to evict and when under memory pressure, ray.ObjectStoreFullError
may be thrown.
enable_object_reconstruction (bool): If True, when an object stored in
the distributed plasma store is lost due to node failure, Ray will
attempt to reconstruct the object by re-executing the task that
created the object. Arguments to the task will be recursively
reconstructed. If False, then ray.exceptions.UnreconstructableError
will be raised instead.
Returns:
Address information about the started processes.
@@ -707,7 +714,8 @@ def init(address=None,
load_code_from_local=load_code_from_local,
java_worker_options=java_worker_options,
_internal_config=_internal_config,
lru_evict=lru_evict)
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
# Start the Ray processes. We set shutdown_at_exit=False because we
# shutdown the node in the ray.shutdown call that happens in the atexit
# handler. We still spawn a reaper process in case the atexit handler
@@ -765,6 +773,10 @@ def init(address=None,
if lru_evict:
raise ValueError("When connecting to an existing cluster, "
"lru_evict must not be provided.")
if enable_object_reconstruction:
raise ValueError(
"When connecting to an existing cluster, "
"enable_object_reconstruction must not be provided.")
# In this case, we only need to connect the node.
ray_params = ray.parameter.RayParams(
@@ -776,7 +788,8 @@ def init(address=None,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local,
_internal_config=_internal_config,
lru_evict=lru_evict)
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
_global_node = ray.node.Node(
ray_params,
head=False,