def test_actor_reconstruction_without_task(ray_start_regular):
    """Test a dead actor can be reconstructed without sending task to it.

    The actor's constructor leaves a marker in the plasma store each time it
    runs: it stores the first ID from ``obj_ids`` that is not present yet.
    The first construction therefore stores ``obj_ids[0]``, and only a second
    construction (i.e. a reconstruction) can store ``obj_ids[1]``. After the
    actor process is killed, no further task is submitted to the actor; the
    test only polls for the second marker.
    """

    def object_exists(obj_id):
        """Check whether an object exists in plasma store."""
        plasma_client = ray.worker.global_worker.plasma_client
        plasma_id = plasma.ObjectID(obj_id.binary())
        # A zero timeout makes this a non-blocking probe. The "missing"
        # result is a sentinel, so it is compared by identity.
        result = plasma_client.get(plasma_id, timeout_ms=0)
        return result is not plasma.ObjectNotAvailable

    @ray.remote(max_reconstructions=1)
    class ReconstructableActor(object):
        def __init__(self, obj_ids):
            for obj_id in obj_ids:
                # Every time the actor gets constructed,
                # put a new object in plasma store.
                if not object_exists(obj_id):
                    ray.worker.global_worker.put_object(obj_id, 1)
                    break

        def get_pid(self):
            return os.getpid()

    obj_ids = [ray.ObjectID.from_random() for _ in range(2)]
    actor = ReconstructableActor.remote(obj_ids)
    # Fetching the pid also guarantees that the constructor has finished.
    pid = ray.get(actor.get_pid.remote())
    # Sanity-check the starting state so that a later failure can only be
    # attributed to reconstruction: exactly the first marker is present.
    assert object_exists(obj_ids[0])
    assert not object_exists(obj_ids[1])
    # Kill the actor.
    os.kill(pid, signal.SIGKILL)
    # Wait until the actor is reconstructed.
    assert wait_for_condition(
        lambda: object_exists(obj_ids[1]), timeout_ms=5000)
def wait_for_condition(condition_predictor,
                       timeout_ms=1000,
                       retry_interval_ms=100):
    """A helper function that waits until a condition is met.

    The predicate is polled until it returns a truthy value or until
    ``timeout_ms`` of wall-clock time has passed. Time spent inside the
    predicate itself counts towards the timeout.

    Args:
        condition_predictor: A zero-argument callable; a truthy return value
            means the condition is met.
        timeout_ms: Maximum time to wait in milliseconds. With a value of 0
            the predicate is evaluated exactly once; with a negative value
            it is never evaluated.
        retry_interval_ms: Pause between two evaluations in milliseconds.
            Values below zero are treated as zero.

    Return:
        True if the condition was met within the timeout, False otherwise.
    """
    if timeout_ms < 0:
        return False

    # Prefer a monotonic clock (unaffected by system clock changes); fall
    # back to time.time on interpreters that do not provide it (Python 2).
    clock = getattr(time, "monotonic", time.time)
    interval_s = max(retry_interval_ms, 0) / 1000.0
    deadline = clock() + timeout_ms / 1000.0

    while True:
        if condition_predictor():
            return True
        remaining_s = deadline - clock()
        if remaining_s <= 0:
            return False
        # Never sleep past the deadline, so the last evaluation happens
        # right at the timeout instead of up to one interval later.
        time.sleep(min(interval_s, remaining_s))
// (See design_docs/task_states.rst for the state transition diagram.) @@ -692,6 +696,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id, } else { RAY_CHECK(actor_registration.GetState() == ActorTableData::RECONSTRUCTING); RAY_LOG(DEBUG) << "Actor is being reconstructed: " << actor_id; + // The actor is dead and needs reconstruction. Attempting to reconstruct its + // creation task. + reconstruction_policy_.ListenAndMaybeReconstruct( + actor_registration.GetActorCreationDependency()); // When an actor fails but can be reconstructed, resubmit all of the queued // tasks for that actor. This will mark the tasks as waiting for actor // creation. @@ -2056,8 +2064,8 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { // The task was not in the GCS task table. It must therefore be in the // lineage cache. RAY_CHECK(lineage_cache_.ContainsTask(task_id)) - << "Task metadata not found in either GCS or lineage cache. It may have been " - "evicted " + << "Metadata of task " << task_id + << " not found in either GCS or lineage cache. It may have been evicted " << "by the redis LRU configuration. Consider increasing the memory " "allocation via " << "ray.init(redis_max_memory=).";