diff --git a/doc/source/fault-tolerance.rst b/doc/source/fault-tolerance.rst index 884aa8bc6..39e569b2e 100644 --- a/doc/source/fault-tolerance.rst +++ b/doc/source/fault-tolerance.rst @@ -10,7 +10,7 @@ When a worker is executing a task, if the worker dies unexpectedly, either because the process crashed or because the machine failed, Ray will rerun the task (after a delay of several seconds) until either the task succeeds or the maximum number of retries is exceeded. The default number of retries -is 4. +is 3. You can experiment with this behavior by running the following code. @@ -41,6 +41,19 @@ You can experiment with this behavior by running the following code. except ray.exceptions.RayWorkerError: print('FAILURE') +Task outputs over a configurable threshold (default 100KB) may be stored in +Ray's distributed object store. Thus, a node failure can cause the loss of a +task output. If this occurs, Ray will automatically attempt to recover the +value by looking for copies of the same object on other nodes. If there are no +other copies left, an ``UnreconstructableError`` will be raised. + +When there are no copies of an object left, Ray also provides an option to +automatically recover the value by re-executing the task that created the +value. Arguments to the task are recursively reconstructed with the same +method. This option can be enabled with +``ray.init(enable_object_reconstruction=True)`` in standalone mode or ``ray +start --enable-object-reconstruction`` in cluster mode. + Actors ------ diff --git a/python/ray/parameter.py b/python/ray/parameter.py index b95b429aa..7baa8562b 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -92,6 +92,8 @@ class RayParams: _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. lru_evict (bool): Enable LRU eviction if space is needed. + enable_object_reconstruction (bool): Enable plasma reconstruction on + failure. """ def __init__(self, @@ -135,6 +137,7 @@ class RayParams: java_worker_options=None, load_code_from_local=False, _internal_config=None, + enable_object_reconstruction=False, metrics_agent_port=None, lru_evict=False): self.object_ref_seed = object_ref_seed @@ -177,6 +180,7 @@ class RayParams: self.metrics_agent_port = metrics_agent_port self._internal_config = _internal_config self._lru_evict = lru_evict + self._enable_object_reconstruction = enable_object_reconstruction self._check_usage() # Set the internal config options for LRU eviction. @@ -191,6 +195,18 @@ class RayParams: self._internal_config["object_store_full_max_retries"] = -1 self._internal_config["free_objects_period_milliseconds"] = 1000 + # Set the internal config options for object reconstruction. + if enable_object_reconstruction: + # Turn off object pinning. + if self._internal_config is None: + self._internal_config = dict() + if lru_evict: + raise Exception( + "Object reconstruction cannot be enabled if using LRU " + "eviction.") + self._internal_config["lineage_pinning_enabled"] = True + self._internal_config["free_objects_period_milliseconds"] = -1 + def update(self, **kwargs): """Update the settings according to the keyword arguments. diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index ad491c29e..eb1589e2f 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -341,6 +341,12 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): is_flag=True, default=False, help="Specify whether LRU evict will be used for this cluster.") +@click.option( + "--enable-object-reconstruction", + is_flag=True, + default=False, + help="Specify whether object reconstruction will be used for this cluster." +) def start(node_ip_address, redis_address, address, redis_port, port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, @@ -351,7 +357,7 @@ def start(node_ip_address, redis_address, address, redis_port, port, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, java_worker_options, load_code_from_local, internal_config, - lru_evict): + lru_evict, enable_object_reconstruction): """Start Ray processes manually on the local machine.""" if gcs_server_port and not head: raise ValueError( @@ -429,7 +435,8 @@ def start(node_ip_address, redis_address, address, redis_port, port, java_worker_options=java_worker_options, load_code_from_local=load_code_from_local, _internal_config=internal_config, - lru_evict=lru_evict) + lru_evict=lru_evict, + enable_object_reconstruction=enable_object_reconstruction) if head: # Start Ray on the head node. if redis_shard_ports is not None: diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 42f76d390..0c19e81e7 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -52,16 +52,22 @@ def test_cached_object(ray_start_cluster): @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_reconstruction_cached_dependency(ray_start_cluster, reconstruction_enabled): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, - "free_objects_period_milliseconds": -1, "initial_reconstruction_timeout_milliseconds": 200, - }) + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( @@ -106,16 +112,22 @@ def test_reconstruction_cached_dependency(ray_start_cluster, @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, - "free_objects_period_milliseconds": -1, "initial_reconstruction_timeout_milliseconds": 200, - }) + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( @@ -150,16 +162,22 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, - "free_objects_period_milliseconds": -1, "initial_reconstruction_timeout_milliseconds": 200, - }) + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( @@ -191,22 +209,32 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): if reconstruction_enabled: ray.get(result) else: - with pytest.raises(ray.exceptions.UnreconstructableError): + # The copy that we fetched earlier may still be local or it may have + # been evicted. + try: ray.get(result) + except ray.exceptions.UnreconstructableError: + pass @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, - "free_objects_period_milliseconds": -1, "initial_reconstruction_timeout_milliseconds": 200, - }) + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( @@ -252,17 +280,23 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): - config = json.dumps({ + config = { "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, - "free_objects_period_milliseconds": -1, "initial_reconstruction_timeout_milliseconds": 200, - }) + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + cluster = ray_start_cluster # Head node with no resources. cluster.add_node( - num_cpus=0, _internal_config=config, object_store_memory=10**8) + num_cpus=0, + _internal_config=config, + object_store_memory=10**8, + enable_object_reconstruction=reconstruction_enabled) ray.init(address=cluster.address) node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8) cluster.wait_for_nodes() @@ -300,15 +334,14 @@ def test_reconstruction_stress(ray_start_cluster): config = json.dumps({ "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, - "lineage_pinning_enabled": 1, - "free_objects_period_milliseconds": -1, "max_direct_call_object_size": 100, "task_retry_delay_ms": 100, "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. - cluster.add_node(num_cpus=0, _internal_config=config) + cluster.add_node( + num_cpus=0, _internal_config=config, enable_object_reconstruction=True) ray.init(address=cluster.address) # Node to place the initial object. node_to_kill = cluster.add_node( diff --git a/python/ray/worker.py b/python/ray/worker.py index b29506379..6e62b3bd4 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -496,7 +496,8 @@ def init(address=None, java_worker_options=None, use_pickle=True, _internal_config=None, - lru_evict=False): + lru_evict=False, + enable_object_reconstruction=False): """ Connect to an existing Ray cluster or start one and connect to it. @@ -615,6 +616,12 @@ def init(address=None, reference counting will be used to decide which objects are safe to evict and when under memory pressure, ray.ObjectStoreFullError may be thrown. + enable_object_reconstruction (bool): If True, when an object stored in + the distributed plasma store is lost due to node failure, Ray will + attempt to reconstruct the object by re-executing the task that + created the object. Arguments to the task will be recursively + reconstructed. If False, then ray.UnreconstructableError will be + thrown. Returns: Address information about the started processes. @@ -707,7 +714,8 @@ def init(address=None, load_code_from_local=load_code_from_local, java_worker_options=java_worker_options, _internal_config=_internal_config, - lru_evict=lru_evict) + lru_evict=lru_evict, + enable_object_reconstruction=enable_object_reconstruction) # Start the Ray processes. We set shutdown_at_exit=False because we # shutdown the node in the ray.shutdown call that happens in the atexit # handler. We still spawn a reaper process in case the atexit handler @@ -765,6 +773,10 @@ def init(address=None, if lru_evict: raise ValueError("When connecting to an existing cluster, " "lru_evict must not be provided.") + if enable_object_reconstruction: + raise ValueError( + "When connecting to an existing cluster, " + "enable_object_reconstruction must not be provided.") # In this case, we only need to connect the node. ray_params = ray.parameter.RayParams( @@ -776,7 +788,8 @@ def init(address=None, temp_dir=temp_dir, load_code_from_local=load_code_from_local, _internal_config=_internal_config, - lru_evict=lru_evict) + lru_evict=lru_evict, + enable_object_reconstruction=enable_object_reconstruction) _global_node = ray.node.Node( ray_params, head=False, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d4991b540..ce2f9615b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -390,6 +390,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ absl::MutexLock lock(&mutex_); to_resubmit_.push_back(std::make_pair(current_time_ms() + delay, spec)); } else { + RAY_LOG(ERROR) << "Resubmitting task that produced lost plasma object: " + << spec.DebugString(); RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); } },