Rename max_reconstructions to max_restarts and use -1 for infinite (#8274)

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This commit is contained in:
Max Fitton
2020-05-14 08:30:29 -07:00
committed by GitHub
parent 5f4c196fed
commit 00325eb2b2
71 changed files with 403 additions and 393 deletions
+10 -11
View File
@@ -45,15 +45,14 @@ You can experiment with this behavior by running the following code.
Actors
------
If an actor process crashes unexpectedly, Ray will attempt to reconstruct the
actor process up to a maximum number of times. This value can be specified with
the ``max_reconstructions`` keyword, which by default is ``0``. If the maximum
number of reconstructions has been used up, then subsequent actor methods will
raise exceptions.
When an actor is reconstructed, its state will be recreated by rerunning its
Ray will automatically restart actors that crash unexpectedly.
This behavior is controlled using ``max_restarts``,
which sets the maximum number of times that an actor will be restarted.
If 0, the actor won't be restarted. If -1, it will be restarted indefinitely.
When an actor is restarted, its state will be recreated by rerunning its
constructor.
After the specified number of restarts, subsequent actor methods will
raise a ``RayActorError``.
You can experiment with this behavior by running the following code.
.. code-block:: python
@@ -64,7 +63,7 @@ You can experiment with this behavior by running the following code.
ray.init(ignore_reinit_error=True)
@ray.remote(max_reconstructions=5)
@ray.remote(max_restarts=5)
class Actor:
def __init__(self):
self.counter = 0
@@ -78,8 +77,8 @@ You can experiment with this behavior by running the following code.
actor = Actor.remote()
# The actor will be reconstructed up to 5 times. After that, methods will
# raise exceptions. The actor is reconstructed by rerunning its
# The actor will be restarted up to 5 times. After that, methods will
# raise exceptions. The actor is restarted by rerunning its
# constructor. Methods that were executing when the actor died will also
# raise exceptions.
for _ in range(100):
+1 -1
View File
@@ -229,7 +229,7 @@ Logical View (Experimental)
**State**: State of an actor.
- 0: Alive
- 1: Reconstructing
- 1: Restarting
- 2: Dead
**Pending**: A number of pending tasks for this actor.
@@ -18,9 +18,9 @@ public interface BaseActor {
* Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to
* fail and the actor to exit in the same way as if it crashed.
*
* @param noReconstruction If set to true, the killed actor will not be reconstructed anymore.
* @param noRestart If set to true, the killed actor will not be restarted anymore.
*/
default void kill(boolean noReconstruction) {
Ray.internal().killActor(this, noReconstruction);
default void kill(boolean noRestart) {
Ray.internal().killActor(this, noRestart);
}
}
@@ -73,7 +73,7 @@ public interface Checkpointable {
/**
* Load actor's previous checkpoint, and restore actor's state.
*
* This method will be called when an actor is reconstructed, after actor's constructor. If the
* This method will be called when an actor is restarted, after the actor's constructor. If the
* actor needs to restore from previous checkpoint, this function should restore actor's state and
* return the checkpoint ID. Otherwise, it should do nothing and return null.
*
@@ -4,7 +4,7 @@ import io.ray.api.id.ObjectId;
/**
* Indicates that an object is lost (either evicted or explicitly deleted) and cannot be
* reconstructed.
* restarted.
*
* Note, this exception only happens for actor objects. If actor's current state is after object's
* creating task, the actor cannot re-run the task to reconstruct the object.
@@ -7,20 +7,16 @@ import java.util.Map;
* The options for creating actor.
*/
public class ActorCreationOptions extends BaseTaskOptions {
public static final int NO_RECONSTRUCTION = 0;
public static final int INFINITE_RECONSTRUCTION = (int) Math.pow(2, 30);
public final int maxReconstructions;
public final int maxRestarts;
public final String jvmOptions;
public final int maxConcurrency;
private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions,
private ActorCreationOptions(Map<String, Double> resources, int maxRestarts,
String jvmOptions, int maxConcurrency) {
super(resources);
this.maxReconstructions = maxReconstructions;
this.maxRestarts = maxRestarts;
this.jvmOptions = jvmOptions;
this.maxConcurrency = maxConcurrency;
}
@@ -31,7 +27,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public static class Builder {
private Map<String, Double> resources = new HashMap<>();
private int maxReconstructions = NO_RECONSTRUCTION;
private int maxRestarts = 0;
private String jvmOptions = null;
private int maxConcurrency = 1;
@@ -40,8 +36,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
return this;
}
public Builder setMaxReconstructions(int maxReconstructions) {
this.maxReconstructions = maxReconstructions;
public Builder setMaxRestarts(int maxRestarts) {
this.maxRestarts = maxRestarts;
return this;
}
@@ -65,7 +61,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public ActorCreationOptions createActorCreationOptions() {
return new ActorCreationOptions(
resources, maxReconstructions, jvmOptions, maxConcurrency);
resources, maxRestarts, jvmOptions, maxConcurrency);
}
}
@@ -86,9 +86,9 @@ public interface RayRuntime {
* Kill the actor immediately.
*
* @param actor The actor to be killed.
* @param noReconstruction If set to true, the killed actor will not be reconstructed anymore.
* @param noRestart If set to true, the killed actor will not be restarted anymore.
*/
void killActor(BaseActor actor, boolean noReconstruction);
void killActor(BaseActor actor, boolean noRestart);
/**
* Invoke a remote function.
@@ -22,11 +22,11 @@ public interface RuntimeContext {
ActorId getCurrentActorId();
/**
* Returns true if the current actor was reconstructed, false if it's created for the first time.
* Returns true if the current actor was restarted, false if it's created for the first time.
*
* Note, this method should only be called from an actor creation task.
*/
boolean wasCurrentActorReconstructed();
boolean wasCurrentActorRestarted();
/**
* Get the raylet socket name.
@@ -62,7 +62,7 @@ public class RayDevRuntime extends AbstractRayRuntime {
}
@Override
public void killActor(BaseActor actor, boolean noReconstruction) {
public void killActor(BaseActor actor, boolean noRestart) {
throw new UnsupportedOperationException();
}
@@ -126,8 +126,8 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
}
@Override
public void killActor(BaseActor actor, boolean noReconstruction) {
nativeKillActor(actor.getId().getBytes(), noReconstruction);
public void killActor(BaseActor actor, boolean noRestart) {
nativeKillActor(actor.getId().getBytes(), noRestart);
}
@Override
@@ -160,7 +160,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
private static native void nativeSetResource(String resourceName, double capacity, byte[] nodeId);
private static native void nativeKillActor(byte[] actorId, boolean noReconstruction);
private static native void nativeKillActor(byte[] actorId, boolean noRestart);
private static native void nativeSetCoreWorker(byte[] workerId);
@@ -32,7 +32,7 @@ public class RuntimeContextImpl implements RuntimeContext {
}
@Override
public boolean wasCurrentActorReconstructed() {
public boolean wasCurrentActorRestarted() {
TaskType currentTaskType = runtime.getWorkerContext().getCurrentTaskType();
Preconditions.checkState(currentTaskType == TaskType.ACTOR_CREATION_TASK,
"This method can only be called from an actor creation task.");
@@ -40,7 +40,7 @@ public class RuntimeContextImpl implements RuntimeContext {
return false;
}
return runtime.getGcsClient().wasCurrentActorReconstructed(getCurrentActorId());
return runtime.getGcsClient().wasCurrentActorRestarted(getCurrentActorId());
}
@Override
@@ -125,7 +125,7 @@ public class GcsClient {
return primary.exists(key);
}
public boolean wasCurrentActorReconstructed(ActorId actorId) {
public boolean wasCurrentActorRestarted(ActorId actorId) {
byte[] key = ArrayUtils.addAll(TablePrefix.ACTOR.toString().getBytes(), actorId.getBytes());
if (!RayConfig.getInstance().gcsServiceEnabled) {
return primary.exists(key);
@@ -142,10 +142,7 @@ public class GcsClient {
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}
long maxReconstructions = actorTableData.getMaxReconstructions();
long remainingReconstructions = actorTableData.getRemainingReconstructions();
return maxReconstructions - remainingReconstructions != 0;
return actorTableData.getNumRestarts() != 0;
}
/**
@@ -16,20 +16,20 @@ import org.testng.Assert;
import org.testng.annotations.Test;
@Test
public class ActorReconstructionTest extends BaseTest {
public class ActorRestartTest extends BaseTest {
public static class Counter {
protected int value = 0;
private boolean wasCurrentActorReconstructed = false;
private boolean wasCurrentActorRestarted = false;
public Counter() {
wasCurrentActorReconstructed = Ray.getRuntimeContext().wasCurrentActorReconstructed();
wasCurrentActorRestarted = Ray.getRuntimeContext().wasCurrentActorRestarted();
}
public boolean wasCurrentActorReconstructed() {
return wasCurrentActorReconstructed;
public boolean wasCurrentActorRestarted() {
return wasCurrentActorRestarted;
}
public int increase() {
@@ -42,17 +42,17 @@ public class ActorReconstructionTest extends BaseTest {
}
}
public void testActorReconstruction() throws InterruptedException, IOException {
public void testActorRestart() throws InterruptedException, IOException {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options =
new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
new ActorCreationOptions.Builder().setMaxRestarts(1).createActorCreationOptions();
RayActor<Counter> actor = Ray.createActor(Counter::new, options);
// Call increase 3 times.
for (int i = 0; i < 3; i++) {
actor.call(Counter::increase).get();
}
Assert.assertFalse(actor.call(Counter::wasCurrentActorReconstructed).get());
Assert.assertFalse(actor.call(Counter::wasCurrentActorRestarted).get());
// Kill the actor process.
int pid = actor.call(Counter::getPid).get();
@@ -63,7 +63,7 @@ public class ActorReconstructionTest extends BaseTest {
int value = actor.call(Counter::increase).get();
Assert.assertEquals(value, 1);
Assert.assertTrue(actor.call(Counter::wasCurrentActorReconstructed).get());
Assert.assertTrue(actor.call(Counter::wasCurrentActorRestarted).get());
// Kill the actor process again.
pid = actor.call(Counter::getPid).get();
@@ -124,7 +124,7 @@ public class ActorReconstructionTest extends BaseTest {
public void testActorCheckpointing() throws IOException, InterruptedException {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options =
new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
new ActorCreationOptions.Builder().setMaxRestarts(1).createActorCreationOptions();
RayActor<CheckpointableCounter> actor = Ray.createActor(CheckpointableCounter::new, options);
// Call increase 3 times.
for (int i = 0; i < 3; i++) {
@@ -41,45 +41,45 @@ public class KillActorTest extends BaseTest {
public static class KillerActor {
public void kill(RayActor<?> actor, boolean noReconstruction) {
actor.kill(noReconstruction);
public void kill(RayActor<?> actor, boolean noRestart) {
actor.kill(noRestart);
}
}
private static void localKill(RayActor<?> actor, boolean noReconstruction) {
actor.kill(noReconstruction);
private static void localKill(RayActor<?> actor, boolean noRestart) {
actor.kill(noRestart);
}
private static void remoteKill(RayActor<?> actor, boolean noReconstruction) {
private static void remoteKill(RayActor<?> actor, boolean noRestart) {
RayActor<KillerActor> killer = Ray.createActor(KillerActor::new);
killer.call(KillerActor::kill, actor, noReconstruction);
killer.call(KillerActor::kill, actor, noRestart);
}
private void testKillActor(BiConsumer<RayActor<?>, Boolean> kill, boolean noReconstruction) {
private void testKillActor(BiConsumer<RayActor<?>, Boolean> kill, boolean noRestart) {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options =
new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
new ActorCreationOptions.Builder().setMaxRestarts(1).createActorCreationOptions();
RayActor<HangActor> actor = Ray.createActor(HangActor::new, options);
RayObject<Boolean> result = actor.call(HangActor::hang);
// The actor will hang in this task.
Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size());
// Kill the actor
kill.accept(actor, noReconstruction);
kill.accept(actor, noRestart);
// The get operation will fail with RayActorException
Assert.expectThrows(RayActorException.class, result::get);
try {
// Sleep 1s here to make sure the driver has received the actor notification
// (of state RECONSTRUCTING or DEAD).
// (of state RESTARTING or DEAD).
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (noReconstruction) {
// The actor should not be reconstructed.
if (noRestart) {
// The actor should not be restarted.
Assert.expectThrows(RayActorException.class, () -> actor.call(HangActor::hang).get());
} else {
Assert.assertEquals(actor.call(HangActor::ping).get(), "pong");
+4 -4
View File
@@ -902,7 +902,7 @@ cdef class CoreWorker:
Language language,
FunctionDescriptor function_descriptor,
args,
uint64_t max_reconstructions,
int64_t max_restarts,
resources,
placement_resources,
int32_t max_concurrency,
@@ -929,7 +929,7 @@ cdef class CoreWorker:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, max_concurrency,
max_restarts, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, name, is_asyncio),
extension_data,
@@ -970,13 +970,13 @@ cdef class CoreWorker:
return VectorToObjectIDs(return_ids)
def kill_actor(self, ActorID actor_id, c_bool no_reconstruction):
def kill_actor(self, ActorID actor_id, c_bool no_restart):
cdef:
CActorID c_actor_id = actor_id.native()
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().KillActor(
c_actor_id, True, no_reconstruction))
c_actor_id, True, no_restart))
def cancel_task(self, ObjectID object_id, c_bool force_kill):
cdef:
+29 -27
View File
@@ -243,9 +243,8 @@ class ActorClassMetadata:
"""
def __init__(self, language, modified_class,
actor_creation_function_descriptor, class_id,
max_reconstructions, num_cpus, num_gpus, memory,
object_store_memory, resources):
actor_creation_function_descriptor, class_id, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources):
self.language = language
self.modified_class = modified_class
self.actor_creation_function_descriptor = \
@@ -253,7 +252,7 @@ class ActorClassMetadata:
self.class_name = actor_creation_function_descriptor.class_name
self.is_cross_language = language != Language.PYTHON
self.class_id = class_id
self.max_reconstructions = max_reconstructions
self.max_restarts = max_restarts
self.num_cpus = num_cpus
self.num_gpus = num_gpus
self.memory = memory
@@ -314,9 +313,9 @@ class ActorClass:
self.__ray_metadata__.class_name))
@classmethod
def _ray_from_modified_class(cls, modified_class, class_id,
max_reconstructions, num_cpus, num_gpus,
memory, object_store_memory, resources):
def _ray_from_modified_class(cls, modified_class, class_id, max_restarts,
num_cpus, num_gpus, memory,
object_store_memory, resources):
for attribute in [
"remote", "_remote", "_ray_from_modified_class",
"_ray_from_function_descriptor"
@@ -344,22 +343,21 @@ class ActorClass:
self.__ray_metadata__ = ActorClassMetadata(
Language.PYTHON, modified_class,
actor_creation_function_descriptor, class_id, max_reconstructions,
actor_creation_function_descriptor, class_id, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources)
return self
@classmethod
def _ray_from_function_descriptor(cls, language,
actor_creation_function_descriptor,
max_reconstructions, num_cpus, num_gpus,
memory, object_store_memory, resources):
def _ray_from_function_descriptor(
cls, language, actor_creation_function_descriptor, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources):
self = ActorClass.__new__(ActorClass)
self.__ray_metadata__ = ActorClassMetadata(
language, None, actor_creation_function_descriptor, None,
max_reconstructions, num_cpus, num_gpus, memory,
object_store_memory, resources)
max_restarts, num_cpus, num_gpus, memory, object_store_memory,
resources)
return self
@@ -407,7 +405,7 @@ class ActorClass:
resources=None,
is_direct_call=None,
max_concurrency=None,
max_reconstructions=None,
max_restarts=None,
name=None,
detached=False):
"""Create an actor.
@@ -558,7 +556,7 @@ class ActorClass:
meta.language,
meta.actor_creation_function_descriptor,
creation_args,
max_reconstructions or meta.max_reconstructions,
max_restarts or meta.max_restarts,
resources,
actor_placement_resources,
max_concurrency,
@@ -893,21 +891,25 @@ def modify_class(cls):
def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
max_reconstructions):
max_restarts):
Class = modify_class(cls)
if max_reconstructions is None:
max_reconstructions = 0
if max_restarts is None:
max_restarts = 0
if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <=
ray_constants.INFINITE_RECONSTRUCTION):
raise ValueError("max_reconstructions must be in range [%d, %d]." %
(ray_constants.NO_RECONSTRUCTION,
ray_constants.INFINITE_RECONSTRUCTION))
infinite_restart = max_restarts == -1
if not infinite_restart:
if max_restarts < 0:
raise ValueError("max_restarts must be an integer >= -1 "
"-1 indicates infinite restarts")
else:
# Make sure we don't pass too big of an int to C++, causing
# an overflow.
max_restarts = min(max_restarts, ray_constants.MAX_INT64_VALUE)
return ActorClass._ray_from_modified_class(
Class, ActorClassID.from_random(), max_reconstructions, num_cpus,
num_gpus, memory, object_store_memory, resources)
Class, ActorClassID.from_random(), max_restarts, num_cpus, num_gpus,
memory, object_store_memory, resources)
def exit_actor():
@@ -1005,7 +1007,7 @@ class Checkpointable(metaclass=ABCMeta):
def load_checkpoint(self, actor_id, available_checkpoints):
"""Load actor's previous checkpoint, and restore actor's state.
This method will be called when an actor is reconstructed, after
This method will be called when an actor is restarted, after
actor's constructor.
If the actor needs to restore from previous checkpoint, this function
should restore actor's state and return the checkpoint ID. Otherwise,
+1 -1
View File
@@ -76,7 +76,7 @@ def java_actor_class(class_name):
return ActorClass._ray_from_function_descriptor(
Language.JAVA,
JavaFunctionDescriptor(class_name, "<init>", ""),
0, # max_reconstructions,
0, # max_restarts,
None, # num_cpus,
None, # num_gpus,
None, # memory,
+1 -1
View File
@@ -230,7 +230,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
cdef cppclass CActorCreationOptions "ray::ActorCreationOptions":
CActorCreationOptions()
CActorCreationOptions(
uint64_t max_reconstructions,
int64_t max_restarts,
int32_t max_concurrency,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
+1 -1
View File
@@ -96,7 +96,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_vector[CObjectID] *return_ids)
CRayStatus KillActor(
const CActorID &actor_id, c_bool force_kill,
c_bool no_reconstruction)
c_bool no_restart)
CRayStatus CancelTask(const CObjectID &object_id, c_bool force_kill)
unique_ptr[CProfileEvent] CreateProfileEvent(
+4 -5
View File
@@ -159,11 +159,6 @@ LOGGER_LEVEL_CHOICES = ["debug", "info", "warning", "error", "critical"]
LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info',"
" 'warning', 'error', 'critical'], default='info'")
# A constant indicating that an actor doesn't need reconstructions.
NO_RECONSTRUCTION = 0
# A constant indicating that an actor should be reconstructed infinite times.
INFINITE_RECONSTRUCTION = 2**30
# Constants used to define the different process types.
PROCESS_TYPE_REAPER = "reaper"
PROCESS_TYPE_MONITOR = "monitor"
@@ -203,3 +198,7 @@ MACH_PAGE_SIZE_BYTES = 4096
# TODO(ffbin): Once we entirely migrate to service-based GCS, we should
# remove it.
GCS_SERVICE_ENABLED = env_bool("RAY_GCS_SERVICE_ENABLED", True)
# Max 64 bit integer value, which is needed to ensure against overflow
# in C++ when passing integer values cross-language.
MAX_INT64_VALUE = 9223372036854775807
+1 -1
View File
@@ -127,7 +127,7 @@ def init(blocking=False,
master_actor = ServeMaster.options(
detached=True,
name=SERVE_MASTER_NAME,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
max_restarts=-1,
).remote(queueing_policy.value, policy_kwargs, start_server, http_host,
http_port, metric_exporter)
+2 -2
View File
@@ -91,8 +91,8 @@ class ReplicaConfig:
elif "name" in self.ray_actor_options:
raise ValueError(
"Specifying name in actor_init_args is not allowed.")
elif "max_reconstructions" in self.ray_actor_options:
raise ValueError("Specifying max_reconstructions in "
elif "max_restarts" in self.ray_actor_options:
raise ValueError("Specifying max_restarts in "
"actor_init_args is not allowed.")
else:
num_cpus = self.ray_actor_options.get("num_cpus", 0)
+3 -4
View File
@@ -127,8 +127,7 @@ class ServeMaster:
detached=True,
name=SERVE_ROUTER_NAME,
max_concurrency=ASYNC_CONCURRENCY,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
).remote(policy, policy_kwargs)
max_restarts=-1).remote(policy, policy_kwargs)
def get_router(self):
"""Returns a handle to the router managed by this actor."""
@@ -148,7 +147,7 @@ class ServeMaster:
detached=True,
name=SERVE_PROXY_NAME,
max_concurrency=ASYNC_CONCURRENCY,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
max_restarts=-1,
).remote(host, port)
def get_http_proxy(self):
@@ -295,7 +294,7 @@ class ServeMaster:
worker_handle = async_retryable(ray.remote(backend_worker)).options(
detached=True,
name=replica_tag,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
max_restarts=-1,
**replica_config.ray_actor_options).remote(
backend_tag, replica_tag, replica_config.actor_init_args)
# TODO(edoakes): we should probably have a timeout here.
+1 -1
View File
@@ -131,4 +131,4 @@ def test_replica_config_validation():
with pytest.raises(ValueError):
ReplicaConfig(Class, ray_actor_options={"detached": None})
with pytest.raises(ValueError):
ReplicaConfig(Class, ray_actor_options={"max_reconstructions": None})
ReplicaConfig(Class, ray_actor_options={"max_restarts": None})
+1 -1
View File
@@ -120,7 +120,7 @@ def async_retryable(cls):
be invoked in an async context.
Usage:
@ray.remote(max_reconstructions=10000)
@ray.remote(max_restarts=10000)
@async_retryable
class A:
pass
+10 -10
View File
@@ -119,7 +119,7 @@ def test_actor_lifetime_load_balancing(ray_start_cluster):
}],
indirect=True)
def test_deleted_actor_no_restart(ray_start_regular):
@ray.remote(resources={"actor": 1}, max_reconstructions=3)
@ray.remote(resources={"actor": 1}, max_restarts=3)
class Actor:
def method(self):
return 1
@@ -155,7 +155,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@ray.remote(max_reconstructions=0)
@ray.remote(max_restarts=0)
class Counter:
def __init__(self):
self.x = 0
@@ -195,7 +195,7 @@ def test_actor_init_fails(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@ray.remote(max_reconstructions=1)
@ray.remote(max_restarts=1)
class Counter:
def __init__(self):
self.x = 0
@@ -221,7 +221,7 @@ def test_reconstruction_suppression(ray_start_cluster_head):
num_nodes = 5
worker_nodes = [cluster.add_node() for _ in range(num_nodes)]
@ray.remote(max_reconstructions=1)
@ray.remote(max_restarts=1)
class Counter:
def __init__(self):
self.x = 0
@@ -247,7 +247,7 @@ def test_reconstruction_suppression(ray_start_cluster_head):
results = []
for _ in range(10):
results += [inc.remote(actor) for actor in actors]
# Make sure that we can get the results from the reconstructed actor.
# Make sure that we can get the results from the restarted actor.
results = ray.get(results)
@@ -767,7 +767,7 @@ def test_kill(ray_start_regular, deprecated_codepath):
# hang the caller.
def test_actor_creation_task_crash(ray_start_regular):
# Test actor death in constructor.
@ray.remote(max_reconstructions=0)
@ray.remote(max_restarts=0)
class Actor:
def __init__(self):
print("crash")
@@ -781,10 +781,10 @@ def test_actor_creation_task_crash(ray_start_regular):
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.f.remote())
# Test an actor can be reconstructed successfully
# Test an actor can be restarted successfully
# after it dies in its constructor.
@ray.remote(max_reconstructions=3)
class ReconstructableActor:
@ray.remote(max_restarts=3)
class RestartableActor:
def __init__(self):
count = self.get_count()
count += 1
@@ -811,7 +811,7 @@ def test_actor_creation_task_crash(ray_start_regular):
_internal_kv_put("count", count, True)
# Verify we can get the object successfully.
ra = ReconstructableActor.remote()
ra = RestartableActor.remote()
ray.get(ra.f.remote())
+41 -42
View File
@@ -127,12 +127,12 @@ def test_actor_eviction(ray_start_object_store_memory):
assert num_success > 0
def test_actor_reconstruction(ray_start_regular):
def test_actor_restart(ray_start_regular):
"""Test actor reconstruction when actor process is killed."""
@ray.remote(max_reconstructions=1)
class ReconstructableActor:
"""An actor that will be reconstructed at most once."""
@ray.remote(max_restarts=1)
class RestartableActor:
"""An actor that will be restarted at most once."""
def __init__(self):
self.value = 0
@@ -145,7 +145,7 @@ def test_actor_reconstruction(ray_start_regular):
def get_pid(self):
return os.getpid()
actor = ReconstructableActor.remote()
actor = RestartableActor.remote()
pid = ray.get(actor.get_pid.remote())
# Call increase 3 times
for _ in range(3):
@@ -156,31 +156,31 @@ def test_actor_reconstruction(ray_start_regular):
time.sleep(0.2)
# Kill actor process, while the above task is still being executed.
os.kill(pid, signal.SIGKILL)
# Check that the above task didn't fail and the actor is reconstructed.
# Check that the above task didn't fail and the actor is restarted.
assert ray.get(result) == 4
# Check that we can still call the actor.
assert ray.get(actor.increase.remote()) == 5
# kill actor process one more time.
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# The actor has exceeded max reconstructions, and this task should fail.
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
# Create another actor.
actor = ReconstructableActor.remote()
actor = RestartableActor.remote()
# Intentionally exit the actor
actor.__ray_terminate__.remote()
# Check that the actor won't be reconstructed.
# Check that the actor won't be restarted.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
def test_actor_reconstruction_without_task(ray_start_regular):
"""Test a dead actor can be reconstructed without sending task to it."""
def test_actor_restart_without_task(ray_start_regular):
"""Test a dead actor can be restarted without sending task to it."""
@ray.remote(max_reconstructions=1)
class ReconstructableActor:
@ray.remote(max_restarts=1)
class RestartableActor:
def __init__(self, obj_ids):
for obj_id in obj_ids:
# Every time the actor gets constructed,
@@ -194,26 +194,26 @@ def test_actor_reconstruction_without_task(ray_start_regular):
return os.getpid()
obj_ids = [ray.ObjectID.from_random() for _ in range(2)]
actor = ReconstructableActor.remote(obj_ids)
actor = RestartableActor.remote(obj_ids)
# Kill the actor.
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# Wait until the actor is reconstructed.
def check_reconstructed():
def check_restarted():
worker = ray.worker.global_worker
return worker.core_worker.object_exists(obj_ids[1])
assert wait_for_condition(check_reconstructed)
assert wait_for_condition(check_restarted)
def test_caller_actor_reconstruction(ray_start_regular):
"""Test tasks from a reconstructed actor can be correctly processed
def test_caller_actor_restart(ray_start_regular):
"""Test tasks from a restarted actor can be correctly processed
by the receiving actor."""
@ray.remote(max_reconstructions=1)
class ReconstructableActor:
"""An actor that will be reconstructed at most once."""
@ray.remote(max_restarts=1)
class RestartableActor:
"""An actor that will be restarted at most once."""
def __init__(self, actor):
self.actor = actor
@@ -224,9 +224,9 @@ def test_caller_actor_reconstruction(ray_start_regular):
def get_pid(self):
return os.getpid()
@ray.remote(max_reconstructions=1)
@ray.remote(max_restarts=1)
class Actor:
"""An actor that will be reconstructed at most once."""
"""An actor that will be restarted at most once."""
def __init__(self):
self.value = 0
@@ -236,7 +236,7 @@ def test_caller_actor_reconstruction(ray_start_regular):
return self.value
remote_actor = Actor.remote()
actor = ReconstructableActor.remote(remote_actor)
actor = RestartableActor.remote(remote_actor)
# Call increase 3 times
for _ in range(3):
ray.get(actor.increase.remote())
@@ -261,9 +261,9 @@ def test_caller_task_reconstruction(ray_start_regular):
else:
os._exit(0)
@ray.remote(max_reconstructions=1)
@ray.remote(max_restarts=1)
class Actor:
"""An actor that will be reconstructed at most once."""
"""An actor that will be restarted at most once."""
def __init__(self):
self.value = 0
@@ -277,14 +277,14 @@ def test_caller_task_reconstruction(ray_start_regular):
assert ray.get(RetryableTask.remote(remote_actor)) == 3
def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
def test_actor_restart_on_node_failure(ray_start_cluster_head):
"""Test actor reconstruction when node dies unexpectedly."""
cluster = ray_start_cluster_head
max_reconstructions = 3
max_restarts = 3
# Add a few nodes to the cluster.
# Use custom resource to make sure the actor is only created on worker
# nodes, not on the head node.
for _ in range(max_reconstructions + 2):
for _ in range(max_restarts + 2):
cluster.add_node(
resources={"a": 1},
_internal_config=json.dumps({
@@ -300,7 +300,7 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
node_to_remove = node
cluster.remove_node(node_to_remove)
@ray.remote(max_reconstructions=max_reconstructions, resources={"a": 1})
@ray.remote(max_restarts=max_restarts, resources={"a": 1})
class MyActor:
def __init__(self):
self.value = 0
@@ -317,13 +317,13 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
for _ in range(3):
ray.get(actor.increase.remote())
for i in range(max_reconstructions):
for i in range(max_restarts):
object_store_socket = ray.get(actor.get_object_store_socket.remote())
# Kill actor's node and the actor should be reconstructed
# Kill actor's node and the actor should be restarted
# on a different node.
kill_node(object_store_socket)
# Call increase again.
# Check that the actor is reconstructed and value is correct.
# Check that the actor is restarted and value is correct.
assert ray.get(actor.increase.remote()) == 4 + i
# Check that the actor is now on a different node.
assert object_store_socket != ray.get(
@@ -332,7 +332,7 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
# kill the node again.
object_store_socket = ray.get(actor.get_object_store_socket.remote())
kill_node(object_store_socket)
# The actor has exceeded max reconstructions, and this task should fail.
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
@@ -347,7 +347,7 @@ def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
initial_reconstruction_timeout_milliseconds=1000)
],
indirect=True)
def test_multiple_actor_reconstruction(ray_start_cluster_head):
def test_multiple_actor_restart(ray_start_cluster_head):
cluster = ray_start_cluster_head
# This test can be made more stressful by increasing the numbers below.
# The total number of actors created will be
@@ -365,7 +365,7 @@ def test_multiple_actor_reconstruction(ray_start_cluster_head):
})) for _ in range(num_nodes)
]
@ray.remote(max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION)
@ray.remote(max_restarts=-1)
class SlowCounter:
def __init__(self):
self.x = 0
@@ -420,8 +420,7 @@ def kill_actor(actor):
def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
"""Test actor checkpointing and restoring from a checkpoint."""
actor = ray.remote(
max_reconstructions=2)(ray_checkpointable_actor_cls).remote()
actor = ray.remote(max_restarts=2)(ray_checkpointable_actor_cls).remote()
# Call increase 3 times, triggering a checkpoint.
expected = 0
for _ in range(3):
@@ -465,7 +464,7 @@ def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
self._should_checkpoint = False
return should_checkpoint
cls = ray.remote(max_reconstructions=2)(RemoteCheckpointableActor)
cls = ray.remote(max_restarts=2)(RemoteCheckpointableActor)
actor = cls.remote()
# Call increase 3 times.
expected = 0
@@ -501,7 +500,7 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
# Place the actor on the remote node.
cluster = ray_start_cluster_2_nodes
remote_node = list(cluster.worker_nodes)
actor_cls = ray.remote(max_reconstructions=1)(ray_checkpointable_actor_cls)
actor_cls = ray.remote(max_restarts=1)(ray_checkpointable_actor_cls)
actor = actor_cls.remote()
while (ray.get(actor.node_id.remote()) != remote_node[0].unique_id):
actor = actor_cls.remote()
@@ -525,7 +524,7 @@ def test_checkpointing_save_exception(ray_start_regular,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to complete."""
@ray.remote(max_reconstructions=2)
@ray.remote(max_restarts=2)
class RemoteCheckpointableActor(ray_checkpointable_actor_cls):
def save_checkpoint(self, actor_id, checkpoint_context):
raise Exception("Intentional error saving checkpoint.")
@@ -564,7 +563,7 @@ def test_checkpointing_load_exception(ray_start_regular,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to load."""
@ray.remote(max_reconstructions=2)
@ray.remote(max_restarts=2)
class RemoteCheckpointableActor(ray_checkpointable_actor_cls):
def load_checkpoint(self, actor_id, checkpoints):
raise Exception("Intentional error loading checkpoint.")
@@ -70,7 +70,7 @@ def test_actor_creation_node_failure(ray_start_cluster):
assert len(ready) == len(children_out)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
# node must be resubmitted.
cluster.remove_node(cluster.list_all_nodes()[-1])
+3 -3
View File
@@ -379,7 +379,7 @@ def test_actor_worker_dying(ray_start_regular):
def test_actor_worker_dying_future_tasks(ray_start_regular):
@ray.remote(max_reconstructions=0)
@ray.remote(max_restarts=0)
class Actor:
def getpid(self):
return os.getpid()
@@ -401,7 +401,7 @@ def test_actor_worker_dying_future_tasks(ray_start_regular):
def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
@ray.remote(max_reconstructions=0)
@ray.remote(max_restarts=0)
class Actor:
def getpid(self):
return os.getpid()
@@ -1077,7 +1077,7 @@ def test_fate_sharing(ray_start_cluster, use_actors, node_failure):
def probe():
return
# TODO(swang): This test does not pass if max_reconstructions > 0 for the
# TODO(swang): This test does not pass if max_restarts > 0 for the
# raylet codepath. Add this parameter once the GCS actor service is enabled
# by default.
@ray.remote
@@ -120,7 +120,7 @@ def test_actor_creation_node_failure(ray_start_cluster):
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
# node must be resubmitted.
cluster.remove_node(cluster.list_all_nodes()[-1])
+1 -1
View File
@@ -274,7 +274,7 @@ def test_nondeterministic_task(ray_start_reconstruction):
def error_check(errors):
if num_nodes == 1:
# In a single-node setting, each object is evicted and
# reconstructed exactly once, so exactly half the objects will
# reconstructed exactly once, so exactly half the objects will
# produce an error during reconstruction.
min_errors = num_objects // 2
else:
+13 -14
View File
@@ -1729,14 +1729,14 @@ def make_decorator(num_return_vals=None,
resources=None,
max_calls=None,
max_retries=None,
max_reconstructions=None,
max_restarts=None,
worker=None):
def decorator(function_or_class):
if (inspect.isfunction(function_or_class)
or is_cython(function_or_class)):
# Set the remote function default resources.
if max_reconstructions is not None:
raise ValueError("The keyword 'max_reconstructions' is not "
if max_restarts is not None:
raise ValueError("The keyword 'max_restarts' is not "
"allowed for remote functions.")
return ray.remote_function.RemoteFunction(
@@ -1754,7 +1754,7 @@ def make_decorator(num_return_vals=None,
return ray.actor.make_actor(function_or_class, num_cpus, num_gpus,
memory, object_store_memory, resources,
max_reconstructions)
max_restarts)
raise TypeError("The @ray.remote decorator must be applied to "
"either a function or to a class.")
@@ -1796,16 +1796,15 @@ def remote(*args, **kwargs):
third-party libraries or to reclaim resources that cannot easily be
released, e.g., GPU memory that was acquired by TensorFlow). By
default this is infinite.
* **max_reconstructions**: Only for *actors*. This specifies the maximum
number of times that the actor should be reconstructed when it dies
* **max_restarts**: Only for *actors*. This specifies the maximum
number of times that the actor should be restarted when it dies
unexpectedly. A value of 0 (the default) indicates
that the actor doesn't need to be reconstructed. And the maximum valid
value is ray.ray_constants.INFINITE_RECONSTRUCTION.
that the actor doesn't need to be restarted. A value of -1
indicates that an actor should be restarted indefinitely.
* **max_retries**: Only for *remote functions*. This specifies the maximum
number of times that the remote function should be rerun when the worker
process executing it crashes unexpectedly. The minimum valid value is 0,
the default is 4 (default), and the maximum valid value is
ray.ray_constants.INFINITE_RECONSTRUCTION.
the default is 4, and a value of -1 indicates infinite retries.
This can be done as follows:
@@ -1854,7 +1853,7 @@ def remote(*args, **kwargs):
"'@ray.remote', or it must be applied using some of "
"the arguments 'num_return_vals', 'num_cpus', 'num_gpus', "
"'memory', 'object_store_memory', 'resources', "
"'max_calls', or 'max_reconstructions', like "
"'max_calls', or 'max_restarts', like "
"'@ray.remote(num_return_vals=2, "
"resources={\"CustomResource\": 1})'.")
assert len(args) == 0 and len(kwargs) > 0, error_string
@@ -1867,7 +1866,7 @@ def remote(*args, **kwargs):
"object_store_memory",
"resources",
"max_calls",
"max_reconstructions",
"max_restarts",
"max_retries",
], error_string
@@ -1885,7 +1884,7 @@ def remote(*args, **kwargs):
# Handle other arguments.
num_return_vals = kwargs.get("num_return_vals")
max_calls = kwargs.get("max_calls")
max_reconstructions = kwargs.get("max_reconstructions")
max_restarts = kwargs.get("max_restarts")
memory = kwargs.get("memory")
object_store_memory = kwargs.get("object_store_memory")
max_retries = kwargs.get("max_retries")
@@ -1898,6 +1897,6 @@ def remote(*args, **kwargs):
object_store_memory=object_store_memory,
resources=resources,
max_calls=max_calls,
max_reconstructions=max_reconstructions,
max_restarts=max_restarts,
max_retries=max_retries,
worker=worker)
+1 -1
View File
@@ -117,7 +117,7 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
RAY_CONFIG(int64_t, max_grpc_message_size, 100 * 1024 * 1024)
// The min number of retries for direct actor creation tasks. The actual number
// of creation retries will be MAX(actor_creation_min_retries, max_reconstructions).
// of creation retries will be MAX(actor_creation_min_retries, max_restarts).
RAY_CONFIG(uint64_t, actor_creation_min_retries, 3)
/// The initial period for a task execution lease. The lease will expire this
+5 -4
View File
@@ -1,6 +1,7 @@
#include "ray/common/task/task_spec.h"
#include <sstream>
#include "ray/common/task/task_spec.h"
#include "ray/util/logging.h"
namespace ray {
@@ -189,9 +190,9 @@ ActorID TaskSpecification::ActorCreationId() const {
return ActorID::FromBinary(message_->actor_creation_task_spec().actor_id());
}
uint64_t TaskSpecification::MaxActorReconstructions() const {
int64_t TaskSpecification::MaxActorRestarts() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().max_actor_reconstructions();
return message_->actor_creation_task_spec().max_actor_restarts();
}
std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
@@ -266,7 +267,7 @@ std::string TaskSpecification::DebugString() const {
if (IsActorCreationTask()) {
// Print actor creation task spec.
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId()
<< ", max_reconstructions=" << MaxActorReconstructions()
<< ", max_restarts=" << MaxActorRestarts()
<< ", max_concurrency=" << MaxActorConcurrency()
<< ", is_asyncio_actor=" << IsAsyncioActor()
<< ", is_detached=" << IsDetachedActor() << "}";
+1 -1
View File
@@ -139,7 +139,7 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
ActorID ActorCreationId() const;
uint64_t MaxActorReconstructions() const;
int64_t MaxActorRestarts() const;
std::vector<std::string> DynamicWorkerOptions() const;
+2 -2
View File
@@ -101,14 +101,14 @@ class TaskSpecBuilder {
///
/// \return Reference to the builder object itself.
TaskSpecBuilder &SetActorCreationTaskSpec(
const ActorID &actor_id, uint64_t max_reconstructions = 0,
const ActorID &actor_id, int64_t max_restarts = 0,
const std::vector<std::string> &dynamic_worker_options = {},
int max_concurrency = 1, bool is_detached = false, std::string name = "",
bool is_asyncio = false, const std::string &extension_data = "") {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
actor_creation_spec->set_max_actor_reconstructions(max_reconstructions);
actor_creation_spec->set_max_actor_restarts(max_restarts);
for (const auto &option : dynamic_worker_options) {
actor_creation_spec->add_dynamic_worker_options(option);
}
+5 -4
View File
@@ -111,12 +111,12 @@ struct TaskOptions {
/// Options for actor creation tasks.
struct ActorCreationOptions {
ActorCreationOptions() {}
ActorCreationOptions(uint64_t max_reconstructions, int max_concurrency,
ActorCreationOptions(int64_t max_restarts, int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached, std::string &name, bool is_asyncio)
: max_reconstructions(max_reconstructions),
: max_restarts(max_restarts),
max_concurrency(max_concurrency),
resources(resources),
placement_resources(placement_resources),
@@ -126,8 +126,9 @@ struct ActorCreationOptions {
is_asyncio(is_asyncio){};
/// Maximum number of times that the actor should be restarted when it dies
/// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed.
const uint64_t max_reconstructions = 0;
/// unexpectedly. A value of -1 indicates infinite restarts.
/// If it's 0, the actor won't be restarted.
const int64_t max_restarts = 0;
/// The max number of concurrent tasks to run on this direct call actor.
const int max_concurrency = 1;
/// Resources required by the whole lifetime of this actor.
+15 -11
View File
@@ -1146,7 +1146,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
rpc_address_, function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources, &return_ids);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_reconstructions,
actor_id, actor_creation_options.max_restarts,
actor_creation_options.dynamic_worker_options,
actor_creation_options.max_concurrency, actor_creation_options.is_detached,
actor_creation_options.name, actor_creation_options.is_asyncio, extension_data);
@@ -1167,10 +1167,15 @@ Status CoreWorker::CreateActor(const RayFunction &function,
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec);
} else {
task_manager_->AddPendingTask(
GetCallerId(), rpc_address_, task_spec, CurrentCallSite(),
std::max(RayConfig::instance().actor_creation_min_retries(),
actor_creation_options.max_reconstructions));
int max_retries;
if (actor_creation_options.max_restarts == -1) {
max_retries = -1;
} else {
max_retries = std::max((int64_t)RayConfig::instance().actor_creation_min_retries(),
actor_creation_options.max_restarts);
}
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite(), max_retries);
status = direct_task_submitter_->SubmitTask(task_spec);
}
return status;
@@ -1243,11 +1248,10 @@ Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) {
return Status::OK();
}
Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill,
bool no_reconstruction) {
Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) {
ActorHandle *actor_handle = nullptr;
RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle));
direct_actor_submitter_->KillActor(actor_id, force_kill, no_reconstruction);
direct_actor_submitter_->KillActor(actor_id, force_kill, no_restart);
return Status::OK();
}
@@ -1308,7 +1312,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
} else if (actor_data.state() == gcs::ActorTableData::RESTARTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
@@ -1355,7 +1359,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
<< " has gone out of scope, sending message to actor "
<< actor_id << " to do a clean exit.";
RAY_CHECK_OK(
KillActor(actor_id, /*force_kill=*/false, /*no_reconstruction=*/false));
KillActor(actor_id, /*force_kill=*/false, /*no_restart=*/false));
}
}
@@ -1941,7 +1945,7 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
if (request.force_kill()) {
RAY_LOG(INFO) << "Got KillActor, exiting immediately...";
if (request.no_reconstruction()) {
if (request.no_restart()) {
RAY_IGNORE_EXPR(local_raylet_client_->Disconnect());
}
if (options_.num_workers > 1) {
+3 -3
View File
@@ -585,10 +585,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Tell an actor to exit immediately, without completing outstanding work.
///
/// \param[in] actor_id ID of the actor to kill.
/// \param[in] no_reconstruction If set to true, the killed actor will not be
/// reconstructed anymore.
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
/// \param[out] Status
Status KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction);
Status KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
/// Stops the task associated with the given Object ID.
///
@@ -155,10 +155,10 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetResource(
}
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor(
JNIEnv *env, jclass, jbyteArray actorId, jboolean noReconstruction) {
JNIEnv *env, jclass, jbyteArray actorId, jboolean noRestart) {
auto status = ray::CoreWorkerProcess::GetCoreWorker().KillActor(
JavaByteArrayToId<ActorID>(env, actorId),
/*force_kill=*/true, noReconstruction);
/*force_kill=*/true, noRestart);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
@@ -13,7 +13,9 @@
// limitations under the License.
#include "ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.h"
#include <jni.h>
#include "ray/common/id.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/core_worker.h"
@@ -87,13 +89,13 @@ inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject call
inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
jobject actorCreationOptions) {
uint64_t max_reconstructions = 0;
int64_t max_restarts = 0;
std::unordered_map<std::string, double> resources;
std::vector<std::string> dynamic_worker_options;
uint64_t max_concurrency = 1;
if (actorCreationOptions) {
max_reconstructions = static_cast<uint64_t>(env->GetIntField(
actorCreationOptions, java_actor_creation_options_max_reconstructions));
max_restarts =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts);
jobject java_resources =
env->GetObjectField(actorCreationOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
@@ -108,15 +110,14 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
}
std::string name = "";
ray::ActorCreationOptions actor_creation_options{
static_cast<uint64_t>(max_reconstructions),
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
/*is_detached=*/false,
name,
/*is_asyncio=*/false};
ray::ActorCreationOptions actor_creation_options{max_restarts,
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
/*is_detached=*/false,
name,
/*is_asyncio=*/false};
return actor_creation_options;
}
+3 -3
View File
@@ -66,7 +66,7 @@ jclass java_base_task_options_class;
jfieldID java_base_task_options_resources;
jclass java_actor_creation_options_class;
jfieldID java_actor_creation_options_max_reconstructions;
jfieldID java_actor_creation_options_max_restarts;
jfieldID java_actor_creation_options_jvm_options;
jfieldID java_actor_creation_options_max_concurrency;
@@ -169,8 +169,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_actor_creation_options_class =
LoadClass(env, "io/ray/api/options/ActorCreationOptions");
java_actor_creation_options_max_reconstructions =
env->GetFieldID(java_actor_creation_options_class, "maxReconstructions", "I");
java_actor_creation_options_max_restarts =
env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I");
java_actor_creation_options_jvm_options = env->GetFieldID(
java_actor_creation_options_class, "jvmOptions", "Ljava/lang/String;");
java_actor_creation_options_max_concurrency =
+3 -2
View File
@@ -16,6 +16,7 @@
#define RAY_COMMON_JAVA_JNI_UTILS_H
#include <jni.h>
#include "ray/common/buffer.h"
#include "ray/common/function_descriptor.h"
#include "ray/common/id.h"
@@ -111,8 +112,8 @@ extern jfieldID java_base_task_options_resources;
/// ActorCreationOptions class
extern jclass java_actor_creation_options_class;
/// maxReconstructions field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_reconstructions;
/// maxRestarts field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_restarts;
/// jvmOptions field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_jvm_options;
/// maxConcurrency field of ActorCreationOptions class
@@ -33,7 +33,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
{
absl::MutexLock lock(&mu_);
// Mark that we are attempting recovery for this object to prevent
// duplicate reconstructions of the same object.
// duplicate reconstructions of the same object.
already_pending_recovery = !objects_pending_recovery_.insert(object_id).second;
}
}
+10 -6
View File
@@ -94,7 +94,7 @@ Status TaskManager::ResubmitTask(const TaskID &task_id,
if (!it->second.pending) {
resubmit = true;
it->second.pending = true;
RAY_CHECK(it->second.num_retries_left > 0);
RAY_CHECK(it->second.num_retries_left != 0);
it->second.num_retries_left--;
spec = it->second.spec;
}
@@ -241,8 +241,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
// A finished task can be only be re-executed if it has some number of
// retries left and returned at least one object that is still in use and
// stored in plasma.
bool task_retryable =
it->second.num_retries_left > 0 && !it->second.reconstructable_return_ids.empty();
bool task_retryable = it->second.num_retries_left != 0 &&
!it->second.reconstructable_return_ids.empty();
if (task_retryable) {
// Pin the task spec if it may be retried again.
release_lineage = false;
@@ -277,8 +277,10 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
if (num_retries_left == 0) {
submissible_tasks_.erase(it);
num_pending_tasks_--;
} else if (num_retries_left == -1) {
release_lineage = false;
} else {
RAY_CHECK(it->second.num_retries_left > 0);
RAY_CHECK(num_retries_left > 0);
it->second.num_retries_left--;
release_lineage = false;
}
@@ -286,8 +288,10 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
// We should not hold the lock during these calls because they may trigger
// callbacks in this or other classes.
if (num_retries_left > 0) {
RAY_LOG(ERROR) << num_retries_left << " retries left for task " << spec.TaskId()
if (num_retries_left != 0) {
auto retries_str =
num_retries_left == -1 ? "infinite" : std::to_string(num_retries_left);
RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId()
<< ", attempting to resubmit.";
retry_task_callback_(spec, /*delay=*/true);
} else {
+10 -10
View File
@@ -58,7 +58,7 @@ static void flushall_redis(void) {
}
ActorID CreateActorHelper(std::unordered_map<std::string, double> &resources,
uint64_t max_reconstructions) {
int64_t max_restarts) {
std::unique_ptr<ActorHandle> actor_handle;
uint8_t array[] = {1, 2, 3};
@@ -72,7 +72,7 @@ ActorID CreateActorHelper(std::unordered_map<std::string, double> &resources,
std::string name = "";
ActorCreationOptions actor_options{
max_reconstructions,
max_restarts,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
@@ -301,7 +301,7 @@ class CoreWorkerTest : public ::testing::Test {
// Test actor failover case. Verify that actor can be restarted successfully,
// and as long as we wait for the actor restart before submitting new tasks,
// it is guaranteed that all tasks are successfully completed.
void TestActorReconstruction(std::unordered_map<std::string, double> &resources);
void TestActorRestart(std::unordered_map<std::string, double> &resources);
protected:
bool WaitForDirectCallActorState(const ActorID &actor_id, bool wait_alive,
@@ -481,7 +481,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map<std::string, double> &reso
}
}
void CoreWorkerTest::TestActorReconstruction(
void CoreWorkerTest::TestActorRestart(
std::unordered_map<std::string, double> &resources) {
auto &driver = CoreWorkerProcess::GetCoreWorker();
@@ -512,10 +512,10 @@ void CoreWorkerTest::TestActorReconstruction(
};
ASSERT_TRUE(WaitForCondition(check_actor_restart_func, 30 * 1000 /* 30s */));
RAY_LOG(INFO) << "actor has been reconstructed";
RAY_LOG(INFO) << "actor has been restarted";
}
// wait for actor being reconstructed.
// wait for actor being restarted.
auto buffer1 = GenerateRandomBuffer();
// Create arguments with PassByValue.
@@ -558,7 +558,7 @@ void CoreWorkerTest::TestActorFailure(
ASSERT_EQ(system("pkill mock_worker"), 0);
}
// wait for actor being reconstructed.
// wait for actor being restarted.
auto buffer1 = GenerateRandomBuffer();
// Create arguments with PassByRef and PassByValue.
@@ -699,7 +699,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
// Create an actor.
std::unordered_map<std::string, double> resources;
auto actor_id = CreateActorHelper(resources,
/*max_reconstructions=*/0);
/*max_restarts=*/0);
// wait for actor creation finish.
ASSERT_TRUE(WaitForDirectCallActorState(actor_id, true, 30 * 1000 /* 30s */));
// Test submitting some tasks with by-value args for that actor.
@@ -1002,13 +1002,13 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodes) {
TEST_F(SingleNodeTest, TestActorTaskLocalReconstruction) {
std::unordered_map<std::string, double> resources;
TestActorReconstruction(resources);
TestActorRestart(resources);
}
TEST_F(TwoNodeTest, TestActorTaskCrossNodesReconstruction) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorReconstruction(resources);
TestActorRestart(resources);
}
TEST_F(SingleNodeTest, TestActorTaskLocalFailure) {
@@ -12,14 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "gtest/gtest.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/raylet/raylet_client.h"
namespace ray {
@@ -142,7 +142,7 @@ TEST_F(TaskManagerTest, TestTaskFailure) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
}
TEST_F(TaskManagerTest, TestTaskRetry) {
TEST_F(TaskManagerTest, TestTaskReconstruction) {
TaskID caller_id = TaskID::Nil();
rpc::Address caller_address;
ObjectID dep1 = ObjectID::FromRandom();
@@ -23,27 +23,26 @@ using ray::rpc::ActorTableData;
namespace ray {
void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
bool force_kill,
bool no_reconstruction) {
bool force_kill, bool no_restart) {
absl::MutexLock lock(&mu_);
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_reconstruction(no_reconstruction);
request.set_no_restart(no_restart);
auto inserted = pending_force_kills_.emplace(actor_id, request);
if (!inserted.second && force_kill) {
// Overwrite the previous request to kill the actor if the new request is a
// force kill.
inserted.first->second.set_force_kill(true);
if (no_reconstruction) {
if (no_restart) {
// Overwrite the previous request to disable restart if the new request's
// no_reconstruction flag is set to true.
inserted.first->second.set_no_reconstruction(true);
// no_restart flag is set to true.
inserted.first->second.set_no_restart(true);
}
}
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// Actor is not yet created, or is being reconstructed, cache the request
// Actor is not yet created, or is being restarted, cache the request
// and submit after actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g. from unpickling), in that case it might be desirable
@@ -85,7 +84,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// Actor is not yet created, or is being reconstructed, cache the request
// Actor is not yet created, or is being restarted, cache the request
// and submit after actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g. from unpickling), in that case it might be desirable
@@ -120,7 +119,7 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id
bool dead) {
absl::MutexLock lock(&mu_);
if (!dead) {
// We're reconstructing the actor, so erase the client for now. The new client
// We're restarting the actor, so erase the client for now. The new client
// will be inserted once the actor restart completes. We don't erase the
// client when the actor is DEAD, so that all further tasks will be failed.
rpc_clients_.erase(actor_id);
@@ -322,7 +321,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
if (it != scheduling_queue_.end()) {
if (it->second.first.caller_worker_id != caller_worker_id) {
// We received a request with the same caller ID, but from a different worker,
// this indicates the caller (actor) is reconstructed.
// this indicates the caller (actor) is restarted.
if (it->second.first.caller_creation_timestamp_ms < caller_version) {
// The new request has a newer caller version, then remove the old entry
// from scheduling queue since it's invalid now.
@@ -70,9 +70,9 @@ class CoreWorkerDirectActorTaskSubmitter {
/// \param[in] actor_id The actor_id of the actor to kill.
/// \param[in] force_kill Whether to force kill the actor, or let the actor
/// try a clean exit.
/// \param[in] no_reconstruction If set to true, the killed actor will not be
/// reconstructed anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction);
/// \param[in] no_restart If set to true, the killed actor will not be
/// restarted anymore.
void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart);
/// Create connection to actor and send all pending tasks.
///
+19 -10
View File
@@ -13,6 +13,7 @@
// limitations under the License.
#include "gcs_actor_manager.h"
#include <ray/common/ray_config.h>
#include <utility>
@@ -236,7 +237,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(true);
request.set_no_reconstruction(true);
request.set_no_restart(true);
RAY_UNUSED(actor_client->KillActor(request, nullptr));
RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
@@ -311,7 +312,7 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
if (!actor_id.IsNil()) {
RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id
<< " failed, reconstructing actor " << actor_id;
<< " failed, restarting actor " << actor_id;
// Reconstruct the actor.
ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit);
}
@@ -360,17 +361,25 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
auto worker_id = actor->GetWorkerID();
actor->UpdateAddress(rpc::Address());
auto mutable_actor_table_data = actor->GetMutableActorTableData();
// If the need_reschedule is set to false, then set the `remaining_reconstructions` to 0
// If the need_reschedule is set to false, then set the `remaining_restarts` to 0
// so that the actor will never be rescheduled.
auto remaining_reconstructions =
need_reschedule ? mutable_actor_table_data->remaining_reconstructions() : 0;
int64_t max_restarts = mutable_actor_table_data->max_restarts();
uint64_t num_restarts = mutable_actor_table_data->num_restarts();
int64_t remaining_restarts;
if (!need_reschedule) {
remaining_restarts = 0;
} else if (max_restarts == -1) {
remaining_restarts = -1;
} else {
int64_t remaining = max_restarts - num_restarts;
remaining_restarts = std::max(remaining, static_cast<int64_t>(0));
}
RAY_LOG(WARNING) << "Actor is failed " << actor_id << " on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", remaining_reconstructions = " << remaining_reconstructions;
if (remaining_reconstructions > 0) {
mutable_actor_table_data->set_remaining_reconstructions(--remaining_reconstructions);
mutable_actor_table_data->set_state(rpc::ActorTableData::RECONSTRUCTING);
<< ", remaining_restarts = " << remaining_restarts;
if (remaining_restarts != 0) {
mutable_actor_table_data->set_num_restarts(++num_restarts);
mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING);
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
+2 -4
View File
@@ -48,10 +48,8 @@ class GcsActor {
const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec();
actor_table_data_.set_actor_id(actor_creation_task_spec.actor_id());
actor_table_data_.set_job_id(request.task_spec().job_id());
actor_table_data_.set_max_reconstructions(
actor_creation_task_spec.max_actor_reconstructions());
actor_table_data_.set_remaining_reconstructions(
actor_creation_task_spec.max_actor_reconstructions());
actor_table_data_.set_max_restarts(actor_creation_task_spec.max_actor_restarts());
actor_table_data_.set_num_restarts(0);
auto dummy_object =
TaskSpecification(request.task_spec()).ActorDummyObject().Binary();
+1
View File
@@ -13,6 +13,7 @@
// limitations under the License.
#include "gcs_server.h"
#include "actor_info_handler_impl.h"
#include "error_info_handler_impl.h"
#include "gcs_actor_manager.h"
@@ -16,6 +16,7 @@
#include <ray/gcs/test/gcs_test_util.h>
#include <memory>
#include "gtest/gtest.h"
namespace ray {
@@ -227,8 +228,8 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) {
TEST_F(GcsActorManagerTest, TestActorReconstruction) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(
job_id, /*max_reconstructions=*/1, /*detached=*/false);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
Status status = gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
@@ -254,7 +255,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
// Remove worker and then check that the actor is being restarted.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_->OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RECONSTRUCTING);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RESTARTING);
// Add node and check that the actor is restarted.
gcs_actor_manager_->SchedulePendingActors();
@@ -287,8 +288,8 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(
job_id, /*max_reconstructions=*/1, /*detached=*/false);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
@@ -331,7 +332,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) {
auto job_id = JobID::FromInt(1);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/1, /*detached=*/true);
Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/true);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
+3 -3
View File
@@ -60,7 +60,7 @@ inline std::shared_ptr<ray::rpc::ErrorTableData> CreateErrorTableData(
/// Helper function to produce actor table data.
inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
const TaskSpecification &task_spec, const ray::rpc::Address &address,
ray::rpc::ActorTableData::ActorState state, uint64_t remaining_reconstructions) {
ray::rpc::ActorTableData::ActorState state, uint64_t num_restarts) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_info_ptr = std::make_shared<ray::rpc::ActorTableData>();
@@ -71,10 +71,10 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
actor_info_ptr->set_actor_creation_dummy_object_id(
task_spec.ActorDummyObject().Binary());
actor_info_ptr->set_job_id(task_spec.JobId().Binary());
actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_max_restarts(task_spec.MaxActorRestarts());
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
// Set the fields that change when the actor is restarted.
actor_info_ptr->set_remaining_reconstructions(remaining_reconstructions);
actor_info_ptr->set_num_restarts(num_restarts);
actor_info_ptr->mutable_address()->CopyFrom(address);
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());
+3 -4
View File
@@ -99,12 +99,11 @@ Status RedisLogBasedActorInfoAccessor::AsyncUpdate(
const ActorID &actor_id, const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
// The actor log starts with an ALIVE entry. This is followed by 0 to N pairs
// of (RECONSTRUCTING, ALIVE) entries, where N is the maximum number of
// of (RESTARTING, ALIVE) entries, where N is the maximum number of
// restarts. This is followed optionally by a DEAD entry.
int log_length =
2 * (data_ptr->max_reconstructions() - data_ptr->remaining_reconstructions());
int log_length = 2 * (data_ptr->num_restarts());
if (data_ptr->state() != ActorTableData::ALIVE) {
// RECONSTRUCTING or DEAD entries have an odd index.
// RESTARTING or DEAD entries have an odd index.
log_length += 1;
}
RAY_LOG(DEBUG) << "AsyncUpdate actor state to " << data_ptr->state()
@@ -19,6 +19,7 @@
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "ray/common/id.h"
#include "ray/common/test_util.h"
#include "ray/gcs/store_client/store_client.h"
@@ -221,8 +222,8 @@ class StoreClientTestBase : public ::testing::Test {
void GenTestData() {
for (size_t i = 0; i < key_count_; i++) {
rpc::ActorTableData actor;
actor.set_max_reconstructions(1);
actor.set_remaining_reconstructions(1);
actor.set_max_restarts(1);
actor.set_num_restarts(0);
JobID job_id = JobID::FromInt(i % index_count_);
actor.set_job_id(job_id.Binary());
actor.set_state(rpc::ActorTableData::ALIVE);
+3 -4
View File
@@ -23,12 +23,11 @@
#include "ray/common/constants.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/util/logging.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/entry_change_notification.h"
#include "ray/gcs/redis_context.h"
#include "ray/protobuf/gcs.pb.h"
#include "ray/util/logging.h"
struct redisAsyncContext;
@@ -717,8 +716,8 @@ class JobTable : public Log<JobID, JobTableData> {
};
/// Log-based Actor table starts with an ALIVE entry, which represents the first time the
/// actor is created. This may be followed by 0 or more pairs of RECONSTRUCTING, ALIVE
/// entries, which represent each time the actor fails (RECONSTRUCTING) and gets recreated
/// actor is created. This may be followed by 0 or more pairs of RESTARTING, ALIVE
/// entries, which represent each time the actor fails (RESTARTING) and gets recreated
/// (ALIVE). These may be followed by a DEAD entry, which means that the actor has failed
/// and will not be restarted.
class LogBasedActorTable : public Log<ActorID, ActorTableData> {
+9 -12
View File
@@ -17,21 +17,19 @@
#include <memory>
#include <utility>
#include "gmock/gmock.h"
#include "gmock/gmock.h"
#include "src/ray/common/task/task.h"
#include "src/ray/common/task/task_util.h"
#include "src/ray/common/test_util.h"
#include "src/ray/util/asio_util.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
#include "src/ray/util/asio_util.h"
namespace ray {
struct Mocker {
static TaskSpecification GenActorCreationTask(const JobID &job_id,
int max_reconstructions, bool detached,
const std::string &name,
static TaskSpecification GenActorCreationTask(const JobID &job_id, int max_restarts,
bool detached, const std::string &name,
const rpc::Address &owner_address) {
TaskSpecBuilder builder;
rpc::Address empty_address;
@@ -41,13 +39,12 @@ struct Mocker {
auto task_id = TaskID::ForActorCreationTask(actor_id);
builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id,
TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, {}, {});
builder.SetActorCreationTaskSpec(actor_id, max_reconstructions, {}, 1, detached,
name);
builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name);
return builder.Build();
}
static rpc::CreateActorRequest GenCreateActorRequest(const JobID &job_id,
int max_reconstructions = 0,
int max_restarts = 0,
bool detached = false,
const std::string name = "") {
rpc::CreateActorRequest request;
@@ -59,7 +56,7 @@ struct Mocker {
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
}
auto actor_creation_task_spec =
GenActorCreationTask(job_id, max_reconstructions, detached, name, owner_address);
GenActorCreationTask(job_id, max_restarts, detached, name, owner_address);
request.mutable_task_spec()->CopyFrom(actor_creation_task_spec.GetMessage());
return request;
}
@@ -89,8 +86,8 @@ struct Mocker {
actor_table_data->set_job_id(job_id.Binary());
actor_table_data->set_state(
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
actor_table_data->set_max_reconstructions(1);
actor_table_data->set_remaining_reconstructions(1);
actor_table_data->set_max_restarts(1);
actor_table_data->set_num_restarts(0);
return actor_table_data;
}
@@ -32,8 +32,8 @@ class ActorInfoAccessorTest : public AccessorTestBase<ActorID, ActorTableData> {
virtual void GenTestData() {
for (size_t i = 0; i < 100; ++i) {
std::shared_ptr<ActorTableData> actor = std::make_shared<ActorTableData>();
actor->set_max_reconstructions(1);
actor->set_remaining_reconstructions(1);
actor->set_max_restarts(1);
actor->set_num_restarts(0);
JobID job_id = JobID::FromInt(i);
actor->set_job_id(job_id.Binary());
actor->set_state(ActorTableData::ALIVE);
@@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/gcs/subscription_executor.h"
#include "gtest/gtest.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/entry_change_notification.h"
@@ -61,8 +62,8 @@ class SubscriptionExecutorTest : public AccessorTestBase<ActorID, ActorTableData
virtual void GenTestData() {
for (size_t i = 0; i < 100; ++i) {
std::shared_ptr<ActorTableData> actor = std::make_shared<ActorTableData>();
actor->set_max_reconstructions(1);
actor->set_remaining_reconstructions(1);
actor->set_max_restarts(1);
actor->set_num_restarts(0);
JobID job_id = JobID::FromInt(i);
actor->set_job_id(job_id.Binary());
actor->set_state(ActorTableData::ALIVE);
+3 -2
View File
@@ -148,8 +148,9 @@ message ActorCreationTaskSpec {
// ID of the actor that will be created by this task.
bytes actor_id = 2;
// The max number of times this actor should be restarted.
// If this number of 0 or negative, the actor won't be reconstructed on failure.
uint64 max_actor_reconstructions = 3;
// If this number is 0 the actor won't be restarted.
// If this number is -1 the actor will be restarted indefinitely.
int64 max_actor_restarts = 3;
// The dynamic options used in the worker command when starting a worker process for
// an actor creation task. If the list isn't empty, the options will be used to replace
// the placeholder strings (`RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_0`,
+5 -6
View File
@@ -107,12 +107,11 @@ message PushTaskRequest {
// Resource mapping ids assigned to the worker executing the task.
repeated ResourceMapEntry resource_mapping = 6;
// The version of the caller. This is used to distinguish on-the-fly
// requests from a caller before it die, and requests from the reconstructed
// requests from a caller before it dies, and requests from the restarted
// caller, which might happen theoretically when network has issues.
// - For an actor, this is set to the timestamp when the actor is created,
// so it can be used to differentiate which is the new reconstructed actor.
// - For a non-actor task, it's set to the timestamp the task starts
// execution.
// so it can be used to differentiate which is the newly restarted actor.
// - For a non-actor task, it's set to the timestamp the task starts execution.
int64 caller_version = 7;
}
@@ -188,8 +187,8 @@ message KillActorRequest {
bytes intended_actor_id = 1;
// Whether to force kill the actor.
bool force_kill = 2;
// If set to true, the killed actor will not be reconstructed anymore.
bool no_reconstruction = 3;
// If set to true, the killed actor will not be restarted anymore.
bool no_restart = 3;
}
message KillActorReply {
+10 -9
View File
@@ -103,10 +103,10 @@ message ActorTableData {
PENDING = 0;
// Actor is alive.
ALIVE = 1;
// Actor is dead, now being reconstructed.
// Actor is dead, now being restarted.
// After the restart finishes, the state will become alive again.
RECONSTRUCTING = 2;
// Actor is already dead and won't be reconstructed.
RESTARTING = 2;
// Actor is already dead and won't be restarted.
DEAD = 3;
}
// The ID of the actor that was created.
@@ -114,17 +114,18 @@ message ActorTableData {
// The ID of the caller of the actor creation task.
bytes parent_id = 2;
// The dummy object ID returned by the actor creation task. If the actor
// dies, then this is the object that should be reconstructed for the actor
// dies, then this is the object that should be reconstructed for the actor
// to be recreated.
bytes actor_creation_dummy_object_id = 3;
// The ID of the job that created the actor.
bytes job_id = 4;
// Current state of this actor.
ActorState state = 6;
// Max number of times this actor should be reconstructed.
uint64 max_reconstructions = 7;
// Remaining number of reconstructions.
uint64 remaining_reconstructions = 8;
// Max number of times this actor should be restarted,
// a value of -1 indicates an infinite number of restart attempts.
int64 max_restarts = 7;
// Number of restarts that have already been performed on this actor.
uint64 num_restarts = 8;
// The address of the actor.
Address address = 9;
// The address of the actor's owner (parent).
@@ -344,7 +345,7 @@ enum ErrorType {
WORKER_DIED = 0;
// Indicates that a task failed because the actor died unexpectedly before finishing it.
ACTOR_DIED = 1;
// Indicates that an object is lost and cannot be reconstructed.
// Indicates that an object is lost and cannot be reconstructed.
// Note, this currently only happens to actor objects. When the actor's state is already
// after the object's creating task, the actor cannot re-run the task.
// TODO(hchen): we may want to reuse this error type for more cases. E.g.,
+11 -4
View File
@@ -65,12 +65,19 @@ const JobID ActorRegistration::GetJobId() const {
return JobID::FromBinary(actor_table_data_.job_id());
}
const int64_t ActorRegistration::GetMaxReconstructions() const {
return actor_table_data_.max_reconstructions();
/// Returns the restart limit stored in this actor's table data.
/// A value of -1 means the actor may be restarted an unlimited number of times.
const int64_t ActorRegistration::GetMaxRestarts() const {
  const int64_t restart_limit = actor_table_data_.max_restarts();
  return restart_limit;
}
const int64_t ActorRegistration::GetRemainingReconstructions() const {
return actor_table_data_.remaining_reconstructions();
/// Returns how many more times this actor may be restarted.
///
/// \return -1 when `max_restarts` is -1 (unlimited restarts); otherwise
/// `max_restarts - num_restarts`, clamped so that it is never negative.
const int64_t ActorRegistration::GetRemainingRestarts() const {
  const int64_t max_restarts = actor_table_data_.max_restarts();
  if (max_restarts == -1) {
    return -1;
  }
  // `num_restarts` is unsigned in the table data. Convert it before subtracting
  // so the arithmetic is done in signed 64-bit; mixing int64 and uint64 would
  // otherwise promote to unsigned and wrap around.
  const int64_t num_restarts = static_cast<int64_t>(actor_table_data_.num_restarts());
  const int64_t remaining = max_restarts - num_restarts;
  // Clamp at zero: a negative result (in particular exactly -1) must never be
  // mistaken by callers for the "unlimited restarts" sentinel.
  return remaining > 0 ? remaining : 0;
}
/// Returns the restart count recorded in this actor's table data.
const uint64_t ActorRegistration::GetNumRestarts() const {
  const uint64_t restarts_so_far = actor_table_data_.num_restarts();
  return restarts_so_far;
}
const std::unordered_map<TaskID, ActorRegistration::FrontierLeaf>
+8 -5
View File
@@ -82,7 +82,7 @@ class ActorRegistration {
/// Get the object that represents the actor's initial state. This is the
/// execution dependency returned by this actor's creation task. If
/// reconstructed, this will recreate the actor.
/// restarted, this will recreate the actor.
///
/// \return The execution dependency returned by the actor's creation task.
const ObjectID GetActorCreationDependency() const;
@@ -90,11 +90,14 @@ class ActorRegistration {
/// Get actor's job ID.
const JobID GetJobId() const;
/// Get the max number of times this actor should be reconstructed.
const int64_t GetMaxReconstructions() const;
/// Get the max number of times this actor should be restarted.
const int64_t GetMaxRestarts() const;
/// Get the remaining number of times this actor should be reconstructed.
const int64_t GetRemainingReconstructions() const;
/// Get the remaining number of times this actor should be restarted.
const int64_t GetRemainingRestarts() const;
/// Get the number of times this actor has already been restarted
const uint64_t GetNumRestarts() const;
/// Get the object that represents the actor's current state. This is the
/// execution dependency returned by the task most recently executed on the
+40 -46
View File
@@ -55,7 +55,7 @@ int64_t GetExpectedTaskCounter(
struct ActorStats {
int live_actors = 0;
int dead_actors = 0;
int reconstructing_actors = 0;
int restarting_actors = 0;
int max_num_handles = 0;
};
@@ -66,8 +66,8 @@ ActorStats GetActorStatisticalData(
for (auto &pair : actor_registry) {
if (pair.second.GetState() == ray::rpc::ActorTableData::ALIVE) {
item.live_actors += 1;
} else if (pair.second.GetState() == ray::rpc::ActorTableData::RECONSTRUCTING) {
item.reconstructing_actors += 1;
} else if (pair.second.GetState() == ray::rpc::ActorTableData::RESTARTING) {
item.restarting_actors += 1;
} else {
item.dead_actors += 1;
}
@@ -588,7 +588,7 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
actor_entry.second.GetState() == ActorTableData::ALIVE) {
RAY_LOG(INFO) << "Actor " << actor_entry.first
<< " is disconnected, because its node " << node_id
<< " is removed from cluster. It may be reconstructed.";
<< " is removed from cluster. It may be restarted.";
HandleDisconnectedActor(actor_entry.first, /*was_local=*/false,
/*intentional_disconnect=*/false);
}
@@ -815,13 +815,11 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
} else {
// Only process the state transition if it is to a later state than ours.
if (actor_registration.GetState() > it->second.GetState() &&
actor_registration.GetRemainingReconstructions() ==
it->second.GetRemainingReconstructions()) {
actor_registration.GetNumRestarts() == it->second.GetNumRestarts()) {
// The new state is later than ours if it is about the same lifetime, but
// a greater state.
it->second = actor_registration;
} else if (actor_registration.GetRemainingReconstructions() <
it->second.GetRemainingReconstructions()) {
} else if (actor_registration.GetNumRestarts() > it->second.GetNumRestarts()) {
// The new state is also later than ours if it is about a later lifetime of
// the actor.
it->second = actor_registration;
@@ -835,11 +833,11 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
<< ", node_manager_id = " << actor_registration.GetNodeManagerId()
<< ", state = "
<< ActorTableData::ActorState_Name(actor_registration.GetState())
<< ", remaining_reconstructions = "
<< actor_registration.GetRemainingReconstructions();
<< ", remaining_restarts = "
<< actor_registration.GetRemainingRestarts();
if (actor_registration.GetState() == ActorTableData::ALIVE) {
// The actor is now alive (created for the first time or reconstructed). We can
// The actor is now alive (created for the first time or restarted). We can
// stop listening for the actor creation task. This is needed because we use
// `ListenAndMaybeReconstruct` to reconstruct the actor.
reconstruction_policy_.Cancel(actor_registration.GetActorCreationDependency());
@@ -877,8 +875,8 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
for (auto const &task : removed_tasks) {
TreatTaskAsFailed(task, ErrorType::ACTOR_DIED);
}
} else if (actor_registration.GetState() == ActorTableData::RECONSTRUCTING) {
RAY_LOG(DEBUG) << "Actor is being reconstructed: " << actor_id;
} else if (actor_registration.GetState() == ActorTableData::RESTARTING) {
RAY_LOG(DEBUG) << "Actor is being restarted: " << actor_id;
if (!(RayConfig::instance().gcs_service_enabled() &&
RayConfig::instance().gcs_actor_service_enabled())) {
// The actor is dead and needs reconstruction. Attempting to reconstruct its
@@ -887,7 +885,7 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
actor_registration.GetActorCreationDependency());
}
// When an actor fails but can be reconstructed, resubmit all of the queued
// When an actor fails but can be restarted, resubmit all of the queued
// tasks for that actor. This will mark the tasks as waiting for actor
// creation.
auto tasks_to_remove = local_queues_.GetTaskIdsForActor(actor_id);
@@ -1150,15 +1148,15 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
auto &actor_registration = actor_entry->second;
auto remainingRestarts = actor_registration.GetRemainingRestarts();
RAY_LOG(DEBUG) << "The actor with ID " << actor_id << " died "
<< (intentional_disconnect ? "intentionally" : "unintentionally")
<< ", remaining reconstructions = "
<< actor_registration.GetRemainingReconstructions();
<< ", remaining restarts = " << remainingRestarts;
// Check if this actor needs to be reconstructed.
// Check if this actor needs to be restarted.
ActorState new_state =
actor_registration.GetRemainingReconstructions() > 0 && !intentional_disconnect
? ActorTableData::RECONSTRUCTING
(remainingRestarts == -1 || remainingRestarts > 0) && !intentional_disconnect
? ActorTableData::RESTARTING
: ActorTableData::DEAD;
if (was_local) {
// Clean up the dummy objects from this actor.
@@ -1189,7 +1187,7 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
auto actor_notification = std::make_shared<ActorTableData>(new_actor_info);
RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification, done));
if (was_local && new_state == ActorTableData::RECONSTRUCTING) {
if (was_local && new_state == ActorTableData::RESTARTING) {
RAY_LOG(INFO) << "A local actor (id = " << actor_id
<< " ) is dead, reconstructing it.";
const ObjectID &actor_creation_dummy_object_id =
@@ -1385,7 +1383,7 @@ void NodeManager::ProcessFetchOrReconstructMessage(
} else {
// If reconstruction is also required, then add any requested objects to
// the list to subscribe to in the task dependency manager. These objects
// will be pulled from remote node managers and reconstructed if
// will be pulled from remote node managers and reconstructed if
// necessary.
required_object_ids.push_back(object_id);
}
@@ -1412,7 +1410,7 @@ void NodeManager::ProcessWaitRequestMessage(
if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
// Add any missing objects to the list to subscribe to in the task
// dependency manager. These objects will be pulled from remote node
// managers and reconstructed if necessary.
// managers and reconstructed if necessary.
required_object_ids.push_back(object_id);
}
}
@@ -1463,7 +1461,7 @@ void NodeManager::ProcessWaitForDirectActorCallArgsRequestMessage(
if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
// Add any missing objects to the list to subscribe to in the task
// dependency manager. These objects will be pulled from remote node
// managers and reconstructed if necessary.
// managers and reconstructed if necessary.
required_object_ids.push_back(object_id);
}
}
@@ -2142,7 +2140,7 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
if (local_queues_.HasTask(task_id)) {
RAY_LOG(WARNING) << "Submitted task " << task_id
<< " is already queued and will not be reconstructed. This is most "
<< " is already queued and will not be restarted. This is most "
"likely due to spurious reconstruction.";
return;
}
@@ -2151,10 +2149,10 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
// Check whether we know the location of the actor.
const auto actor_entry = actor_registry_.find(spec.ActorId());
bool seen = actor_entry != actor_registry_.end();
// If we have already seen this actor and this actor is not being reconstructed,
// If we have already seen this actor and this actor is not being restarted,
// its location is known.
bool location_known =
seen && actor_entry->second.GetState() != ActorTableData::RECONSTRUCTING;
seen && actor_entry->second.GetState() != ActorTableData::RESTARTING;
if (location_known) {
if (actor_entry->second.GetState() == ActorTableData::DEAD) {
// If this actor is dead, either because the actor process is dead
@@ -2369,7 +2367,7 @@ void NodeManager::AsyncResolveObjects(const std::shared_ptr<ClientConnection> &c
}
// Subscribe to the objects required by the task. These objects will be
// fetched and/or reconstructed as necessary, until the objects become local
// fetched and/or reconstructed as necessary, until the objects become local
// or are unsubscribed.
if (ray_get) {
// TODO(ekl) using the assigned task id is a hack to handle unsubscription for
@@ -2617,42 +2615,38 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
auto actor_id = task_spec.ActorCreationId();
auto actor_entry = actor_registry_.find(actor_id);
std::shared_ptr<ActorTableData> actor_info_ptr;
// TODO(swang): If this is an actor that was reconstructed, and previous
// TODO(swang): If this is an actor that was restarted, and previous
// actor notifications were delayed, then this node may not have an entry for
// the actor in actor_registry_. Then, the fields for the number of
// reconstructions will be wrong.
// restarts will be wrong.
if (actor_entry == actor_registry_.end()) {
actor_info_ptr.reset(new ActorTableData());
// Set all of the static fields for the actor. These fields will not
// change even if the actor fails or is reconstructed.
// change even if the actor fails or is restarted.
actor_info_ptr->set_actor_id(actor_id.Binary());
actor_info_ptr->set_actor_creation_dummy_object_id(
task_spec.ActorDummyObject().Binary());
actor_info_ptr->set_job_id(task_spec.JobId().Binary());
actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions());
// This is the first time that the actor has been created, so the number
// of remaining reconstructions is the max.
actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_max_restarts(task_spec.MaxActorRestarts());
actor_info_ptr->set_num_restarts(0);
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());
} else {
// If we've already seen this actor, it means that this actor was reconstructed.
// Thus, its previous state must be RECONSTRUCTING.
// If we've already seen this actor, it means that this actor was restarted.
// Thus, its previous state must be RESTARTING.
// TODO: The following is a workaround for the issue described in
// https://github.com/ray-project/ray/issues/5524, please see the issue
// description for more information.
if (actor_entry->second.GetState() != ActorTableData::RECONSTRUCTING) {
RAY_LOG(WARNING) << "Actor not in reconstructing state, most likely it "
if (actor_entry->second.GetState() != ActorTableData::RESTARTING) {
RAY_LOG(WARNING) << "Actor not in restarting state, most likely it "
<< "died before creation handler could run. Actor state is "
<< actor_entry->second.GetState();
}
// Copy the static fields from the current actor entry.
actor_info_ptr.reset(new ActorTableData(actor_entry->second.GetTableData()));
// We are reconstructing the actor, so subtract its
// remaining_reconstructions by 1.
actor_info_ptr->set_remaining_reconstructions(
actor_info_ptr->remaining_reconstructions() - 1);
// We are restarting the actor, so increment its num_restarts
actor_info_ptr->set_num_restarts(actor_info_ptr->num_restarts() + 1);
}
// Set the new fields for the actor's state to indicate that the actor is
@@ -2768,7 +2762,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
// NOTE(swang): The dummy objects must be marked as local whenever
// ExtendFrontier is called, and vice versa, so that we can clean up the
// dummy objects properly in case the actor fails and needs to be
// reconstructed.
// restarted.
HandleObjectLocal(task_spec.ActorDummyObject());
}
}
@@ -3324,7 +3318,7 @@ std::string NodeManager::DebugString() const {
auto statistical_data = GetActorStatisticalData(actor_registry_);
result << "\n- num live actors: " << statistical_data.live_actors;
result << "\n- num reconstructing actors: " << statistical_data.reconstructing_actors;
result << "\n- num restarting actors: " << statistical_data.restarting_actors;
result << "\n- num dead actors: " << statistical_data.dead_actors;
result << "\n- max num handles: " << statistical_data.max_num_handles;
@@ -3725,8 +3719,8 @@ void NodeManager::RecordMetrics() {
auto statistical_data = GetActorStatisticalData(actor_registry_);
stats::ActorStats().Record(statistical_data.live_actors,
{{stats::ValueTypeKey, "live_actors"}});
stats::ActorStats().Record(statistical_data.reconstructing_actors,
{{stats::ValueTypeKey, "reconstructing_actors"}});
stats::ActorStats().Record(statistical_data.restarting_actors,
{{stats::ValueTypeKey, "restarting_actors"}});
stats::ActorStats().Record(statistical_data.dead_actors,
{{stats::ValueTypeKey, "dead_actors"}});
stats::ActorStats().Record(statistical_data.max_num_handles,
+1 -1
View File
@@ -533,7 +533,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
const uint8_t *message_data);
/// Handle the case where an actor is disconnected, determine whether this
/// actor needs to be reconstructed and then update actor table.
/// actor needs to be restarted and then update actor table.
/// This function needs to be called either when actor process dies or when
/// a node dies.
///
+3 -4
View File
@@ -15,7 +15,6 @@
#include "task_dependency_manager.h"
#include "absl/time/clock.h"
#include "ray/stats/stats.h"
namespace ray {
@@ -331,8 +330,8 @@ void TaskDependencyManager::TaskPending(const Task &task) {
// thus it doesn't need task lease. And actually if we
// acquire a lease in this case and forget to cancel it,
// the lease would never expire which will prevent the
// actor from being reconstructed;
// - When a direct actor is reconstructed, raylet resubmits
// actor from being restarted;
// - When a direct actor is restarted, raylet resubmits
// the task, and the task can be forwarded to another raylet,
// and eventually assigned to a worker. In this case we need
// the task lease to make sure there's only one raylet can
@@ -347,7 +346,7 @@ void TaskDependencyManager::TaskPending(const Task &task) {
// - when it's resubmitted by raylet because of reconstruction,
// `OnDispatch` will not be overriden and thus is nullptr.
if (task.GetTaskSpecification().IsActorCreationTask() && task.OnDispatch() == nullptr) {
// This is an actor creation task, and it's being reconstructed,
// This is an actor creation task, and it's being restarted,
// in this case we still need the task lease. Note that we don't
// require task lease for direct actor creation task.
} else {
@@ -47,7 +47,7 @@ public class WorkerLifecycleController {
ActorCreationOptions options = new ActorCreationOptions.Builder()
.setResources(executionVertex.getResources())
.setMaxReconstructions(ActorCreationOptions.INFINITE_RECONSTRUCTION)
.setMaxRestarts(-1)
.createActorCreationOptions();
RayActor<JobWorker> actor = null;
+2 -3
View File
@@ -259,7 +259,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
}
ActorID CreateActorHelper(const std::unordered_map<std::string, double> &resources,
bool is_direct_call, uint64_t max_reconstructions) {
bool is_direct_call, int64_t max_restarts) {
std::unique_ptr<ActorHandle> actor_handle;
// Test creating actor.
@@ -274,10 +274,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
std::string name = "";
ActorCreationOptions actor_options{
max_reconstructions,
max_restarts,
/*max_concurrency=*/1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
// Create an actor.
ActorID actor_id;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().CreateActor(