diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index a6b9c381b..227fb48d2 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -1,3 +1,4 @@ +import asyncio import collections import numpy as np import os @@ -211,6 +212,66 @@ def test_actor_restart_with_retry(ray_init_with_task_retry_delay): ray.get(actor.increase.remote()) +def test_named_actor_max_task_retries(ray_init_with_task_retry_delay): + @ray.remote(num_cpus=0) + class Counter: + def __init__(self): + self.count = 0 + self.event = asyncio.Event() + + def increment(self): + self.count += 1 + self.event.set() + + async def wait_for_count(self, count): + while True: + if self.count >= count: + return + await self.event.wait() + self.event.clear() + + @ray.remote + class ActorToKill: + def __init__(self, counter): + counter.increment.remote() + + def run(self, counter, signal): + counter.increment.remote() + ray.get(signal.wait.remote()) + + @ray.remote + class CallingActor: + def __init__(self): + self.actor = ray.get_actor("a") + + def call_other(self, counter, signal): + return ray.get(self.actor.run.remote(counter, signal)) + + init_counter = Counter.remote() + run_counter = Counter.remote() + signal = SignalActor.remote() + + # Start the two actors, wait for ActorToKill's constructor to run. + a = ActorToKill.options( + name="a", max_restarts=-1, max_task_retries=-1).remote(init_counter) + c = CallingActor.remote() + ray.get(init_counter.wait_for_count.remote(1), timeout=30) + + # Signal the CallingActor to call ActorToKill, wait for it to be running, + # then kill ActorToKill. + # Verify that this causes ActorToKill's constructor to run a second time + # and the run method to begin a second time. + ref = c.call_other.remote(run_counter, signal) + ray.get(run_counter.wait_for_count.remote(1), timeout=30) + ray.kill(a, no_restart=False) + ray.get(init_counter.wait_for_count.remote(2), timeout=30) + ray.get(run_counter.wait_for_count.remote(2), timeout=30) + + # Signal the run method to finish, verify that the CallingActor returns. + signal.send.remote() + ray.get(ref, timeout=30) + + def test_actor_restart_on_node_failure(ray_start_cluster): config = { "num_heartbeats_timeout": 10, diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 32ef5e3c8..a315762e1 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -148,7 +148,7 @@ class TaskSpecBuilder { /// /// \return Reference to the builder object itself. TaskSpecBuilder &SetActorCreationTaskSpec( - const ActorID &actor_id, int64_t max_restarts = 0, + const ActorID &actor_id, int64_t max_restarts = 0, int64_t max_task_retries = 0, const std::vector &dynamic_worker_options = {}, int max_concurrency = 1, bool is_detached = false, std::string name = "", bool is_asyncio = false, const std::string &extension_data = "") { @@ -156,6 +156,7 @@ class TaskSpecBuilder { auto actor_creation_spec = message_->mutable_actor_creation_task_spec(); actor_creation_spec->set_actor_id(actor_id.Binary()); actor_creation_spec->set_max_actor_restarts(max_restarts); + actor_creation_spec->set_max_task_retries(max_task_retries); for (const auto &option : dynamic_worker_options) { actor_creation_spec->add_dynamic_worker_options(option); } diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index c59516080..73e56df54 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -58,6 +58,8 @@ ray::rpc::ActorHandle CreateInnerActorHandleFromActorTableData( inner.set_actor_cursor(task_spec.ReturnId(0).Binary()); inner.set_extension_data( actor_table_data.task_spec().actor_creation_task_spec().extension_data()); + inner.set_max_task_retries( + actor_table_data.task_spec().actor_creation_task_spec().max_task_retries()); return inner; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index af901f8d6..41b0341e0 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1377,6 +1377,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, "", /* debugger_breakpoint */ override_environment_variables); builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts, + actor_creation_options.max_task_retries, actor_creation_options.dynamic_worker_options, actor_creation_options.max_concurrency, actor_creation_options.is_detached, actor_name, diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 5d152c1f2..68988bf19 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -42,7 +42,8 @@ struct Mocker { Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource, resource, std::make_pair(PlacementGroupID::Nil(), -1), true, ""); - builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name); + builder.SetActorCreationTaskSpec(actor_id, max_restarts, /*max_task_retries=*/0, {}, + 1, detached, name); return builder.Build(); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index ec64b6674..d9337708d 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -258,24 +258,29 @@ message TaskArg { message ActorCreationTaskSpec { // ID of the actor that will be created by this task. bytes actor_id = 2; - // The max number of times this actor should be recontructed. + // The max number of times this actor should be restarted. // If this number is 0 the actor won't be restarted. // If this number is -1 the actor will be restarted indefinitely. int64 max_actor_restarts = 3; + // The max number of times tasks submitted on this actor should be retried + // if the actor fails and is restarted. + // If this number is 0 the tasks won't be resubmitted. + // If this number is -1 the tasks will be resubmitted indefinitely. + int64 max_task_retries = 4; // The dynamic options used in the worker command when starting a worker process for // an actor creation task. If the list isn't empty, the options will be used to replace // the placeholder string `RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER` in the worker command. - repeated string dynamic_worker_options = 4; + repeated string dynamic_worker_options = 5; // The max number of concurrent calls for direct call actors. - int32 max_concurrency = 5; + int32 max_concurrency = 6; // Whether the actor is persistent. - bool is_detached = 6; + bool is_detached = 7; // Globally-unique name of the actor. Should only be populated when is_detached is true. - string name = 7; + string name = 8; // Whether the actor use async actor calls. - bool is_asyncio = 8; + bool is_asyncio = 9; // Field used for storing application-level extensions to the actor definition. - string extension_data = 9; + string extension_data = 10; } // Task spec of an actor task. diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index b8c1c9b31..99f6d5622 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -70,7 +70,7 @@ static inline Task ExampleTask(const std::vector &arguments, JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, num_returns, {}, {}, std::make_pair(PlacementGroupID::Nil(), -1), true, ""); - builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, "", false); + builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, 1, {}, 1, false, "", false); for (const auto &arg : arguments) { builder.AddArg(TaskArgByReference(arg, rpc::Address())); }