diff --git a/python/ray/actor.py b/python/ray/actor.py index 8b4cdf312..6bd0da23e 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -768,12 +768,7 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, "methods in the `Checkpointable` interface.") if max_reconstructions is None: - if ray_constants.direct_call_enabled(): - # Allow the actor creation task to be resubmitted automatically - # by default. - max_reconstructions = 3 - else: - max_reconstructions = 0 + max_reconstructions = 0 if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <= ray_constants.INFINITE_RECONSTRUCTION): diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 198d15b50..caecae452 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -16,11 +16,8 @@ import time import ray import ray.test_utils import ray.cluster_utils -from ray import ray_constants from ray.test_utils import run_string_as_driver -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() - def test_actor_init_error_propagated(ray_start_regular): @ray.remote @@ -810,7 +807,6 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Hanging with new GCS API.") -@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet") def test_actor_init_fails(ray_start_cluster_head): cluster = ray_start_cluster_head remote_node = cluster.add_node() diff --git a/python/ray/tests/test_component_failures_3.py b/python/ray/tests/test_component_failures_3.py index db6622ce2..029fdcd2d 100644 --- a/python/ray/tests/test_component_failures_3.py +++ b/python/ray/tests/test_component_failures_3.py @@ -12,8 +12,6 @@ import pytest import ray import ray.ray_constants as ray_constants -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() - @pytest.mark.parametrize( "ray_start_cluster", [{ @@ -64,19 +62,16 @@ def test_actor_creation_node_failure(ray_start_cluster): except ray.exceptions.RayActorError: children[i] = Child.remote(death_probability) - if (RAY_FORCE_DIRECT): - children_out = [ - child.get_probability.remote() for child in children - ] - # Wait for new created actors to finish creation before - # removing a node. This is needed because right now we don't - # support reconstructing actors that died in the process of - # being created. - ready, _ = ray.wait( - children_out, - num_returns=len(children_out), - timeout=5 * 60.0) - assert len(ready) == len(children_out) + children_out = [ + child.get_probability.remote() for child in children + ] + # Wait for new created actors to finish creation before + # removing a node. This is needed because right now we don't + # support reconstructing actors that died in the process of + # being created. + ready, _ = ray.wait( + children_out, num_returns=len(children_out), timeout=5 * 60.0) + assert len(ready) == len(children_out) # Remove a node. Any actor creation tasks that were forwarded to this # node must be reconstructed. diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index b97e98356..b74b67f00 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -888,9 +888,6 @@ def test_fill_object_store_exception(ray_start_cluster_head): ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) -@pytest.mark.skipif( - not RAY_FORCE_DIRECT, - reason="raylet path attempts reconstruction for evicted objects") @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, @@ -925,9 +922,6 @@ def test_direct_call_eviction(ray_start_cluster): ray.get(dependent_task.remote(obj)) -@pytest.mark.skipif( - not RAY_FORCE_DIRECT, - reason="raylet path attempts reconstruction for evicted objects") @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index b9bf15e44..879aa9231 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -82,7 +82,6 @@ def test_object_reconstruction(ray_start_cluster): ray.get(xs) -@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor restart yet") @pytest.mark.parametrize( "ray_start_cluster", [{ "num_cpus": 4, @@ -105,7 +104,7 @@ def test_actor_creation_node_failure(ray_start_cluster): if exit_chance < self.death_probability: sys.exit(-1) - num_children = 50 + num_children = 25 # Children actors will die about half the time. death_probability = 0.5 diff --git a/python/ray/tests/test_signal.py b/python/ray/tests/test_signal.py index d9913ed21..078fe5a3f 100644 --- a/python/ray/tests/test_signal.py +++ b/python/ray/tests/test_signal.py @@ -1,7 +1,6 @@ import pytest import time -from ray import ray_constants import ray import ray.experimental.signal as signal @@ -276,9 +275,6 @@ def test_forget(ray_start_regular): assert len(result_list) == count -@pytest.mark.skipif( - ray_constants.direct_call_enabled(), - reason="TODO(ekl): this requires reconstruction") def test_signal_on_node_failure(two_node_cluster): """Test actor checkpointing on a remote node.""" @@ -395,6 +391,5 @@ def test_small_receive_timeout(ray_start_regular): if __name__ == "__main__": - import pytest import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 962b4fa98..8538fb44e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -49,6 +49,10 @@ RAY_CONFIG(bool, new_scheduler_enabled, false) // Objects larger than this size will be spilled/promoted to plasma. RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) +// The min number of retries for direct actor creation tasks. The actual number +// of creation retries will be MAX(actor_creation_min_retries, max_reconstructions). +RAY_CONFIG(uint64_t, actor_creation_min_retries, 3) + /// The initial period for a task execution lease. The lease will expire this /// many milliseconds after the first acquisition of the lease. Nodes that /// require an object will not try to reconstruct the task until at least diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f2deefdda..0a3783d67 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -650,8 +650,10 @@ Status CoreWorker::CreateActor(const RayFunction &function, *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); if (actor_creation_options.is_direct_call) { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - actor_creation_options.max_reconstructions); + task_manager_->AddPendingTask( + GetCallerId(), rpc_address_, task_spec, + std::max(RayConfig::instance().actor_creation_min_retries(), + actor_creation_options.max_reconstructions)); return direct_task_submitter_->SubmitTask(task_spec); } else { return local_raylet_client_->SubmitTask(task_spec);