diff --git a/doc/source/fault-tolerance.rst b/doc/source/fault-tolerance.rst index f97c72762..884aa8bc6 100644 --- a/doc/source/fault-tolerance.rst +++ b/doc/source/fault-tolerance.rst @@ -78,12 +78,86 @@ You can experiment with this behavior by running the following code. actor = Actor.remote() # The actor will be restarted up to 5 times. After that, methods will - # raise exceptions. The actor is restarted by rerunning its - # constructor. Methods that were executing when the actor died will also - # raise exceptions. + # always raise a `RayActorError` exception. The actor is restarted by + # rerunning its constructor. Methods that were sent or executing when the + # actor died will also raise a `RayActorError` exception. for _ in range(100): try: counter = ray.get(actor.increment_and_possibly_fail.remote()) print(counter) except ray.exceptions.RayActorError: print('FAILURE') + +By default, actor tasks execute with at-most-once semantics +(``max_task_retries=0`` in the ``@ray.remote`` decorator). This means that if an +actor task is submitted to an actor that is unreachable, Ray will report the +error with ``RayActorError``, a Python-level exception that is thrown when +``ray.get`` is called on the future returned by the task. Note that this +exception may be thrown even though the task did indeed execute successfully. +For example, this can happen if the actor dies immediately after executing the +task. + +Ray also offers at-least-once execution semantics for actor tasks +(``max_task_retries=-1`` or ``max_task_retries > 0``). This means that if an +actor task is submitted to an actor that is unreachable, the system will +automatically retry the task until it receives a reply from the actor. With +this option, the system will only throw a ``RayActorError`` to the application +if one of the following occurs: (1) the actor’s ``max_restarts`` limit has been +exceeded and the actor cannot be restarted anymore, or (2) the +``max_task_retries`` limit has been exceeded for this particular task. The +limit can be set to infinity with ``max_task_retries = -1``. + +You can experiment with this behavior by running the following code. + +.. code-block:: python + + import os + import ray + + ray.init(ignore_reinit_error=True) + + @ray.remote(max_restarts=5, max_task_retries=-1) + class Actor: + def __init__(self): + self.counter = 0 + + def increment_and_possibly_fail(self): + # Exit after every 10 tasks. + if self.counter == 10: + os._exit(0) + self.counter += 1 + return self.counter + + actor = Actor.remote() + + # The actor will be reconstructed up to 5 times. The actor is + # reconstructed by rerunning its constructor. Methods that were + # executing when the actor died will be retried and will not + # raise a `RayActorError`. Retried methods may execute twice, once + # on the failed actor and a second time on the restarted actor. + for _ in range(50): + counter = ray.get(actor.increment_and_possibly_fail.remote()) + print(counter) # Prints the sequence 1-10 5 times. + + # After the actor has been restarted 5 times, all subsequent methods will + # raise a `RayActorError`. + for _ in range(10): + try: + counter = ray.get(actor.increment_and_possibly_fail.remote()) + print(counter) # Unreachable. + except ray.exceptions.RayActorError: + print('FAILURE') # Prints 10 times. + +For at-least-once actors, the system will still guarantee execution ordering +according to the initial submission order. For example, any tasks submitted +after a failed actor task will not execute on the actor until the failed actor +task has been successfully retried. The system also will not attempt to +re-execute any tasks that executed successfully before the failure. + +At-least-once execution is best suited for read-only actors or actors with +ephemeral state that does not need to be rebuilt after a failure. For actors +that have critical state, it is best to take periodic checkpoints and either +manually restart the actor or automatically restart the actor with at-most-once +semantics. If the actor’s exact state at the time of failure is needed, the +application is responsible for resubmitting all tasks since the last +checkpoint. diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e1d822edc..e49c13fa8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -903,6 +903,7 @@ cdef class CoreWorker: FunctionDescriptor function_descriptor, args, int64_t max_restarts, + int64_t max_task_retries, resources, placement_resources, int32_t max_concurrency, @@ -929,7 +930,7 @@ cdef class CoreWorker: check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor( ray_function, args_vector, CActorCreationOptions( - max_restarts, max_concurrency, + max_restarts, max_task_retries, max_concurrency, c_resources, c_placement_resources, dynamic_worker_options, is_detached, name, is_asyncio), extension_data, diff --git a/python/ray/actor.py b/python/ray/actor.py index 53bda2449..a50f0efa2 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -244,7 +244,8 @@ class ActorClassMetadata: def __init__(self, language, modified_class, actor_creation_function_descriptor, class_id, max_restarts, - num_cpus, num_gpus, memory, object_store_memory, resources): + max_task_retries, num_cpus, num_gpus, memory, + object_store_memory, resources): self.language = language self.modified_class = modified_class self.actor_creation_function_descriptor = \ @@ -253,6 +254,7 @@ class ActorClassMetadata: self.is_cross_language = language != Language.PYTHON self.class_id = class_id self.max_restarts = max_restarts + self.max_task_retries = max_task_retries self.num_cpus = num_cpus self.num_gpus = num_gpus self.memory = memory @@ -314,7 +316,7 @@ class ActorClass: @classmethod def _ray_from_modified_class(cls, modified_class, class_id, max_restarts, - num_cpus, num_gpus, memory, + max_task_retries, num_cpus, num_gpus, memory, object_store_memory, resources): for attribute in [ "remote", "_remote", "_ray_from_modified_class", @@ -344,20 +346,22 @@ class ActorClass: self.__ray_metadata__ = ActorClassMetadata( Language.PYTHON, modified_class, actor_creation_function_descriptor, class_id, max_restarts, - num_cpus, num_gpus, memory, object_store_memory, resources) + max_task_retries, num_cpus, num_gpus, memory, object_store_memory, + resources) return self @classmethod def _ray_from_function_descriptor( cls, language, actor_creation_function_descriptor, max_restarts, - num_cpus, num_gpus, memory, object_store_memory, resources): + max_task_retries, num_cpus, num_gpus, memory, object_store_memory, + resources): self = ActorClass.__new__(ActorClass) self.__ray_metadata__ = ActorClassMetadata( language, None, actor_creation_function_descriptor, None, - max_restarts, num_cpus, num_gpus, memory, object_store_memory, - resources) + max_restarts, max_task_retries, num_cpus, num_gpus, memory, + object_store_memory, resources) return self @@ -406,6 +410,7 @@ class ActorClass: is_direct_call=None, max_concurrency=None, max_restarts=None, + max_task_retries=None, name=None, detached=False): """Create an actor. @@ -557,6 +562,7 @@ class ActorClass: meta.actor_creation_function_descriptor, creation_args, max_restarts or meta.max_restarts, + max_task_retries or meta.max_task_retries, resources, actor_placement_resources, max_concurrency, @@ -891,11 +897,13 @@ def modify_class(cls): def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, - max_restarts): + max_restarts, max_task_retries): Class = modify_class(cls) if max_restarts is None: max_restarts = 0 + if max_task_retries is None: + max_task_retries = 0 infinite_restart = max_restarts == -1 if not infinite_restart: @@ -907,9 +915,13 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, # an overflow. max_restarts = min(max_restarts, ray_constants.MAX_INT64_VALUE) + if max_restarts == 0 and max_task_retries != 0: + raise ValueError( + "max_task_retries cannot be set if max_restarts is 0.") + return ActorClass._ray_from_modified_class( - Class, ActorClassID.from_random(), max_restarts, num_cpus, num_gpus, - memory, object_store_memory, resources) + Class, ActorClassID.from_random(), max_restarts, max_task_retries, + num_cpus, num_gpus, memory, object_store_memory, resources) def exit_actor(): diff --git a/python/ray/cross_language.py b/python/ray/cross_language.py index aaec66ebc..c9620c7b5 100644 --- a/python/ray/cross_language.py +++ b/python/ray/cross_language.py @@ -77,6 +77,7 @@ def java_actor_class(class_name): Language.JAVA, JavaFunctionDescriptor(class_name, "", ""), 0, # max_restarts, + 0, # max_task_retries, None, # num_cpus, None, # num_gpus, None, # memory, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 97053af1f..e3aceab23 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -231,6 +231,7 @@ cdef extern from "ray/core_worker/common.h" nogil: CActorCreationOptions() CActorCreationOptions( int64_t max_restarts, + int64_t max_task_retries, int32_t max_concurrency, const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index eb4bc2bac..24cdbe789 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -51,7 +51,7 @@ py_test( size = "medium", srcs = ["test_actor_failures.py"], # TODO(ekl) enable this once we support actor reconstruction again - tags = ["exclusive", "manual"], + tags = ["exclusive"], deps = ["//:ray_lib"], ) diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 6230f0edc..3428e2a28 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -3,10 +3,6 @@ import json import numpy as np import os import pytest -try: - import pytest_timeout -except ImportError: - pytest_timeout = None import signal import sys import time @@ -88,9 +84,13 @@ def ray_checkpointable_actor_cls(request): @pytest.mark.parametrize( - "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) -def test_actor_eviction(ray_start_object_store_memory): - object_store_memory = ray_start_object_store_memory + "ray_start_regular", [{ + "object_store_memory": 150 * 1024 * 1024, + "lru_evict": True, + }], + indirect=True) +def test_actor_eviction(ray_start_regular): + object_store_memory = 150 * 1024 * 1024 @ray.remote class Actor: @@ -127,10 +127,14 @@ def test_actor_eviction(ray_start_object_store_memory): assert num_success > 0 -def test_actor_restart(ray_start_regular): - """Test actor reconstruction when actor process is killed.""" +def test_actor_restart(): + """Test actor restart when actor process is killed.""" + ray.init( + _internal_config=json.dumps({ + "task_retry_delay_ms": 100, + }), ) - @ray.remote(max_restarts=1) + @ray.remote(max_restarts=1, max_task_retries=-1) class RestartableActor: """An actor that will be restarted at most once.""" @@ -147,20 +151,48 @@ def test_actor_restart(ray_start_regular): actor = RestartableActor.remote() pid = ray.get(actor.get_pid.remote()) - # Call increase 3 times - for _ in range(3): - ray.get(actor.increase.remote()) - # Call increase again with some delay. - result = actor.increase.remote(delay=0.5) - # Sleep some time to wait for the above task to start execution. - time.sleep(0.2) + results = [actor.increase.remote() for _ in range(100)] # Kill actor process, while the above task is still being executed. os.kill(pid, signal.SIGKILL) - # Check that the above task didn't fail and the actor is restarted. - assert ray.get(result) == 4 + # Make sure that all tasks were executed in order before the actor's death. + res = results.pop(0) + i = 1 + while True: + try: + r = ray.get(res) + if r != i: + # Actor restarted without any failed tasks. + break + res = results.pop(0) + i += 1 + except ray.exceptions.RayActorError: + # Actor restarted. + break + # Find the first task to execute after the actor was restarted. + while True: + try: + r = ray.get(res) + break + except ray.exceptions.RayActorError: + res = results.pop(0) + pass + # Make sure that all tasks were executed in order after the actor's death. + i = 1 + while True: + r = ray.get(res) + assert r == i + if results: + res = results.pop(0) + i += 1 + else: + break + # Check that we can still call the actor. - assert ray.get(actor.increase.remote()) == 5 + result = actor.increase.remote() + assert ray.get(result) == r + 1 + # kill actor process one more time. + results = [actor.increase.remote() for _ in range(100)] pid = ray.get(actor.get_pid.remote()) os.kill(pid, signal.SIGKILL) # The actor has exceeded max restarts, and this task should fail. @@ -174,37 +206,154 @@ def test_actor_restart(ray_start_regular): # Check that the actor won't be restarted. with pytest.raises(ray.exceptions.RayActorError): ray.get(actor.increase.remote()) + ray.shutdown() + + +def test_actor_restart_with_retry(): + """Test actor restart when actor process is killed.""" + ray.init( + _internal_config=json.dumps({ + "task_retry_delay_ms": 100, + }), ) + + @ray.remote(max_restarts=1, max_task_retries=-1) + class RestartableActor: + """An actor that will be restarted at most once.""" + + def __init__(self): + self.value = 0 + + def increase(self, delay=0): + time.sleep(delay) + self.value += 1 + return self.value + + def get_pid(self): + return os.getpid() + + actor = RestartableActor.remote() + pid = ray.get(actor.get_pid.remote()) + results = [actor.increase.remote() for _ in range(100)] + # Kill actor process, while the above task is still being executed. + os.kill(pid, signal.SIGKILL) + # Check that none of the tasks failed and the actor is restarted. + seq = list(range(1, 101)) + results = ray.get(results) + failed_task_index = None + # Make sure that all tasks were executed in order before and after the + # actor's death. + for i, res in enumerate(results): + if res != seq[0]: + if failed_task_index is None: + failed_task_index = i + assert res + failed_task_index == seq[0] + seq.pop(0) + # Check that we can still call the actor. + result = actor.increase.remote() + assert ray.get(result) == results[-1] + 1 + + # kill actor process one more time. + results = [actor.increase.remote() for _ in range(100)] + pid = ray.get(actor.get_pid.remote()) + os.kill(pid, signal.SIGKILL) + # The actor has exceeded max restarts, and this task should fail. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.increase.remote()) + + # Create another actor. + actor = RestartableActor.remote() + # Intentionlly exit the actor + actor.__ray_terminate__.remote() + # Check that the actor won't be restarted. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.increase.remote()) + ray.shutdown() + + +def test_actor_restart_on_node_failure(ray_start_cluster): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "initial_reconstruction_timeout_milliseconds": 1000, + "task_retry_delay_ms": 100, + }) + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the actor. + cluster.add_node(num_cpus=1, _internal_config=config) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + @ray.remote(num_cpus=1, max_restarts=1, max_task_retries=-1) + class RestartableActor: + """An actor that will be reconstructed at most once.""" + + def __init__(self): + self.value = 0 + + def increase(self): + self.value += 1 + return self.value + + def ready(self): + return + + actor = RestartableActor.remote() + ray.get(actor.ready.remote()) + results = [actor.increase.remote() for _ in range(100)] + # Kill actor node, while the above task is still being executed. + cluster.remove_node(cluster.list_all_nodes()[-1]) + cluster.add_node(num_cpus=1, _internal_config=config) + cluster.wait_for_nodes() + # Check that none of the tasks failed and the actor is restarted. + seq = list(range(1, 101)) + results = ray.get(results) + failed_task_index = None + # Make sure that all tasks were executed in order before and after the + # actor's death. + for i, res in enumerate(results): + elm = seq.pop(0) + if res != elm: + if failed_task_index is None: + failed_task_index = i + assert res + failed_task_index == elm + # Check that we can still call the actor. + result = ray.get(actor.increase.remote()) + assert result == 1 or result == results[-1] + 1 def test_actor_restart_without_task(ray_start_regular): """Test a dead actor can be restarted without sending task to it.""" - @ray.remote(max_restarts=1) + @ray.remote(max_restarts=1, resources={"actor": 1}) class RestartableActor: - def __init__(self, obj_ids): - for obj_id in obj_ids: - # Every time the actor gets constructed, - # put a new object in plasma store. - global_worker = ray.worker.global_worker - if not global_worker.core_worker.object_exists(obj_id): - global_worker.put_object(1, obj_id) - break + def __init__(self): + pass def get_pid(self): return os.getpid() - obj_ids = [ray.ObjectID.from_random() for _ in range(2)] - actor = RestartableActor.remote(obj_ids) + @ray.remote(resources={"actor": 1}) + def probe(): + return + + # Returns whether the "actor" resource is available. + def actor_resource_available(): + p = probe.remote() + ready, _ = ray.wait([p], timeout=1) + return len(ready) > 0 + + ray.experimental.set_resource("actor", 1) + actor = RestartableActor.remote() + assert wait_for_condition(lambda: not actor_resource_available()) # Kill the actor. pid = ray.get(actor.get_pid.remote()) + + p = probe.remote() os.kill(pid, signal.SIGKILL) - - # Wait until the actor is reconstructed. - def check_restarted(): - worker = ray.worker.global_worker - return worker.core_worker.object_exists(obj_ids[1]) - - assert wait_for_condition(check_restarted) + ray.get(p) + assert wait_for_condition(lambda: not actor_resource_available()) def test_caller_actor_restart(ray_start_regular): @@ -277,66 +426,6 @@ def test_caller_task_reconstruction(ray_start_regular): assert ray.get(RetryableTask.remote(remote_actor)) == 3 -def test_actor_restart_on_node_failure(ray_start_cluster_head): - """Test actor reconstruction when node dies unexpectedly.""" - cluster = ray_start_cluster_head - max_restarts = 3 - # Add a few nodes to the cluster. - # Use custom resource to make sure the actor is only created on worker - # nodes, not on the head node. - for _ in range(max_restarts + 2): - cluster.add_node( - resources={"a": 1}, - _internal_config=json.dumps({ - "initial_reconstruction_timeout_milliseconds": 200, - "num_heartbeats_timeout": 10, - }), - ) - - def kill_node(node_id): - node_to_remove = None - for node in cluster.worker_nodes: - if node_id == node.unique_id: - node_to_remove = node - cluster.remove_node(node_to_remove) - - @ray.remote(max_restarts=max_restarts, resources={"a": 1}) - class MyActor: - def __init__(self): - self.value = 0 - - def increase(self): - self.value += 1 - return self.value - - def get_object_store_socket(self): - return ray.worker.global_worker.node.unique_id - - actor = MyActor.remote() - # Call increase 3 times. - for _ in range(3): - ray.get(actor.increase.remote()) - - for i in range(max_restarts): - object_store_socket = ray.get(actor.get_object_store_socket.remote()) - # Kill actor's node and the actor should be restarted - # on a different node. - kill_node(object_store_socket) - # Call increase again. - # Check that the actor is restarted and value is correct. - assert ray.get(actor.increase.remote()) == 4 + i - # Check that the actor is now on a different node. - assert object_store_socket != ray.get( - actor.get_object_store_socket.remote()) - - # kill the node again. - object_store_socket = ray.get(actor.get_object_store_socket.remote()) - kill_node(object_store_socket) - # The actor has exceeded max restarts, and this task should fail. - with pytest.raises(ray.exceptions.RayActorError): - ray.get(actor.increase.remote()) - - # NOTE(hchen): we set initial_reconstruction_timeout_milliseconds to 1s for # this test. Because if this value is too small, suprious task reconstruction # may happen and cause the test fauilure. If the value is too large, this test @@ -365,7 +454,7 @@ def test_multiple_actor_restart(ray_start_cluster_head): })) for _ in range(num_nodes) ] - @ray.remote(max_restarts=-1) + @ray.remote(max_restarts=-1, max_task_retries=-1) class SlowCounter: def __init__(self): self.x = 0 @@ -407,8 +496,12 @@ def test_multiple_actor_restart(ray_start_cluster_head): # Get the results and check that they have the correct values. for _, result_id_list in result_ids.items(): - results = list(range(1, len(result_id_list) + 1)) - assert ray.get(result_id_list) == results + results = ray.get(result_id_list) + for i, result in enumerate(results): + if i == 0: + assert result == 1 + else: + assert result == results[i - 1] + 1 or result == 1 def kill_actor(actor): @@ -418,6 +511,7 @@ def kill_actor(actor): wait_for_pid_to_exit(pid) +@pytest.mark.skip(reason="TODO: Actor checkpointing") def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): """Test actor checkpointing and restoring from a checkpoint.""" actor = ray.remote(max_restarts=2)(ray_checkpointable_actor_cls).remote() @@ -440,13 +534,14 @@ def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): for _ in range(3): ray.get(actor.increase.remote()) expected += 1 - # Kill actor again and check that reconstruction still works after the + # Kill actor again and check that restart still works after the # actor resuming from a checkpoint. kill_actor(actor) assert ray.get(actor.get.remote()) == expected assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True +@pytest.mark.skip(reason="TODO: Actor checkpointing") def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): """Test checkpointing of a remote actor through method invocation.""" @@ -487,13 +582,14 @@ def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls): for _ in range(3): ray.get(actor.increase.remote()) expected += 1 - # Kill actor again and check that reconstruction still works after the + # Kill actor again and check that restart still works after the # actor resuming from a checkpoint. kill_actor(actor) assert ray.get(actor.get.remote()) == expected assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True +@pytest.mark.skip(reason="TODO: Actor checkpointing") def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, ray_checkpointable_actor_cls): """Test actor checkpointing on a remote node.""" @@ -520,6 +616,7 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes, assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True +@pytest.mark.skip(reason="TODO: Actor checkpointing") def test_checkpointing_save_exception(ray_start_regular, ray_checkpointable_actor_cls): """Test actor can still be recovered if checkpoints fail to complete.""" @@ -549,7 +646,7 @@ def test_checkpointing_save_exception(ray_start_regular, for _ in range(3): ray.get(actor.increase.remote()) expected += 1 - # Kill actor again, and check that reconstruction still works and the actor + # Kill actor again, and check that restart still works and the actor # wasn't resumed from a checkpoint. kill_actor(actor) assert ray.get(actor.get.remote()) == expected @@ -559,6 +656,7 @@ def test_checkpointing_save_exception(ray_start_regular, wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1) +@pytest.mark.skip(reason="TODO: Actor checkpointing") def test_checkpointing_load_exception(ray_start_regular, ray_checkpointable_actor_cls): """Test actor can still be recovered if checkpoints fail to load.""" @@ -589,7 +687,7 @@ def test_checkpointing_load_exception(ray_start_regular, for _ in range(3): ray.get(actor.increase.remote()) expected += 1 - # Kill actor again, and check that reconstruction still works and the actor + # Kill actor again, and check that restart still works and the actor # wasn't resumed from a checkpoint. kill_actor(actor) assert ray.get(actor.get.remote()) == expected @@ -718,10 +816,6 @@ def test_decorated_method(ray_start_regular): assert ray.get(object_id) == 7 # 2 * 3 + 1 -@pytest.mark.skipif( - pytest_timeout is None, - reason="Timeout package not installed; skipping test that may hang.") -@pytest.mark.timeout(20) @pytest.mark.parametrize( "ray_start_cluster", [{ "num_cpus": 1, @@ -747,53 +841,45 @@ def test_ray_wait_dead_actor(ray_start_cluster): actors = [Actor.remote() for _ in range(num_nodes)] ray.get([actor.ping.remote() for actor in actors]) - # Ping the actors and make sure the tasks complete. - ping_ids = [actor.ping.remote() for actor in actors] - ray.get(ping_ids) - # Evict the result from the node that we're about to kill. - remote_node = cluster.list_all_nodes()[-1] - remote_ping_id = None - for i, actor in enumerate(actors): - if ray.get(actor.node_id.remote()) == remote_node.unique_id: - remote_ping_id = ping_ids[i] - ray.internal.free([remote_ping_id], local_only=True) - cluster.remove_node(remote_node) + def actor_dead(): + # Ping the actors and make sure the tasks complete. + ping_ids = [actor.ping.remote() for actor in actors] + unready = ping_ids[:] + while unready: + _, unready = ray.wait(unready, timeout=0) + time.sleep(1) - # Repeatedly call ray.wait until the exception for the dead actor is - # received. - unready = ping_ids[:] - while unready: - _, unready = ray.wait(unready, timeout=0) - time.sleep(1) + try: + ray.get(ping_ids) + return False + except ray.exceptions.RayActorError: + return True - with pytest.raises(ray.exceptions.RayActorError): - ray.get(ping_ids) + # Kill a node. + cluster.remove_node(cluster.list_all_nodes()[-1]) + # Repeatedly submit tasks and call ray.wait until the exception for the + # dead actor is received. + assert wait_for_condition(actor_dead) - # Evict the result from the dead node. - ray.internal.free([remote_ping_id], local_only=True) # Create an actor on the local node that will call ray.wait in a loop. head_node_resource = "HEAD_NODE" ray.experimental.set_resource(head_node_resource, 1) @ray.remote(num_cpus=0, resources={head_node_resource: 1}) class ParentActor: - def __init__(self, ping_ids): - self.unready = ping_ids + def __init__(self): + pass def wait(self): - _, self.unready = ray.wait(self.unready, timeout=0) - return len(self.unready) == 0 + return actor_dead() def ping(self): return # Repeatedly call ray.wait through the local actor until the exception for # the dead actor is received. - parent_actor = ParentActor.remote(ping_ids) - ray.get(parent_actor.ping.remote()) - failure_detected = False - while not failure_detected: - failure_detected = ray.get(parent_actor.wait.remote()) + parent_actor = ParentActor.remote() + assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote())) if __name__ == "__main__": diff --git a/python/ray/worker.py b/python/ray/worker.py index 95c68102f..c2d162a52 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1730,6 +1730,7 @@ def make_decorator(num_return_vals=None, max_calls=None, max_retries=None, max_restarts=None, + max_task_retries=None, worker=None): def decorator(function_or_class): if (inspect.isfunction(function_or_class) @@ -1738,6 +1739,9 @@ def make_decorator(num_return_vals=None, if max_restarts is not None: raise ValueError("The keyword 'max_restarts' is not " "allowed for remote functions.") + if max_task_retries is not None: + raise ValueError("The keyword 'max_task_retries' is not " + "allowed for remote functions.") return ray.remote_function.RemoteFunction( Language.PYTHON, function_or_class, None, num_cpus, num_gpus, @@ -1754,7 +1758,7 @@ def make_decorator(num_return_vals=None, return ray.actor.make_actor(function_or_class, num_cpus, num_gpus, memory, object_store_memory, resources, - max_restarts) + max_restarts, max_task_retries) raise TypeError("The @ray.remote decorator must be applied to " "either a function or to a class.") @@ -1801,6 +1805,14 @@ def remote(*args, **kwargs): unexpectedly. The minimum valid value is 0 (default), which indicates that the actor doesn't need to be restarted. A value of -1 indicates that an actor should be restarted indefinitely. + * **max_task_retries**: Only for *actors*. How many times to retry an actor + task if the task fails due to a system error, e.g., the actor has died. + If set to -1, the system will retry the failed task until the task + succeeds, or the actor has reached its max_restarts limit. If set to n > + 0, the system will retry the failed task up to n times, after which the + task will throw a `RayActorError` exception upon `ray.get`. Note that + Python exceptions are not considered system errors and will not trigger + retries. * **max_retries**: Only for *remote functions*. This specifies the maximum number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, @@ -1867,6 +1879,7 @@ def remote(*args, **kwargs): "resources", "max_calls", "max_restarts", + "max_task_retries", "max_retries", ], error_string @@ -1885,6 +1898,7 @@ def remote(*args, **kwargs): num_return_vals = kwargs.get("num_return_vals") max_calls = kwargs.get("max_calls") max_restarts = kwargs.get("max_restarts") + max_task_retries = kwargs.get("max_task_retries") memory = kwargs.get("memory") object_store_memory = kwargs.get("object_store_memory") max_retries = kwargs.get("max_retries") @@ -1898,5 +1912,6 @@ def remote(*args, **kwargs): resources=resources, max_calls=max_calls, max_restarts=max_restarts, + max_task_retries=max_task_retries, max_retries=max_retries, worker=worker) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 5906799a5..0b0640443 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -209,6 +209,10 @@ const rpc::Address &TaskSpecification::CallerAddress() const { return message_->caller_address(); } +WorkerID TaskSpecification::CallerWorkerId() const { + return WorkerID::FromBinary(message_->caller_address().worker_id()); +} + // === Below are getter methods specific to actor tasks. ActorID TaskSpecification::ActorId() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index d219975e7..ea9f9f38e 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -151,6 +151,8 @@ class TaskSpecification : public MessageWrapper { const rpc::Address &CallerAddress() const; + WorkerID CallerWorkerId() const; + uint64_t ActorCounter() const; ObjectID ActorCreationDummyObjectId() const; diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 67495df4a..239cf73d2 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -23,7 +23,7 @@ ray::rpc::ActorHandle CreateInnerActorHandle( const ray::rpc::Address &owner_address, const class JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, - const std::string &extension_data) { + const std::string &extension_data, int64_t max_task_retries) { ray::rpc::ActorHandle inner; inner.set_actor_id(actor_id.Data(), actor_id.Size()); inner.set_owner_id(owner_id.Binary()); @@ -34,6 +34,7 @@ ray::rpc::ActorHandle CreateInnerActorHandle( actor_creation_task_function_descriptor->GetMessage(); inner.set_actor_cursor(initial_cursor.Binary()); inner.set_extension_data(extension_data); + inner.set_max_task_retries(max_task_retries); return inner; } @@ -69,10 +70,10 @@ ActorHandle::ActorHandle( const rpc::Address &owner_address, const class JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, - const std::string &extension_data) + const std::string &extension_data, int64_t max_task_retries) : ActorHandle(CreateInnerActorHandle( actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language, - actor_creation_task_function_descriptor, extension_data)) {} + actor_creation_task_function_descriptor, extension_data, max_task_retries)) {} ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} @@ -95,10 +96,4 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_ void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); } -void ActorHandle::Reset() { - absl::MutexLock guard(&mutex_); - task_counter_ = 0; - actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor()); -} - } // namespace ray diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 078d2ee21..8e49a0e5a 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -37,7 +37,7 @@ class ActorHandle { const rpc::Address &owner_address, const JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, - const std::string &extension_data); + const std::string &extension_data, int64_t max_task_retries); /// Constructs an ActorHandle from a serialized string. ActorHandle(const std::string &serialized); @@ -68,34 +68,12 @@ class ActorHandle { void Serialize(std::string *output); - /// Reset the handle state next task submitted. - /// - /// This should be called whenever the actor is restarted, since the new - /// instance of the actor does not have the previous sequence number. - /// TODO: We should also move the other actor state (status and IP) inside - /// ActorHandle and reset them in this method. - void Reset(); - - // Mark the actor handle as dead. - void MarkDead() { - absl::MutexLock lock(&mutex_); - state_ = rpc::ActorTableData::DEAD; - } - - // Returns whether the actor is known to be dead. - bool IsDead() const { - absl::MutexLock lock(&mutex_); - return state_ == rpc::ActorTableData::DEAD; - } + int64_t MaxTaskRetries() const { return inner_.max_task_retries(); } private: // Protobuf-defined persistent state of the actor handle. const ray::rpc::ActorHandle inner_; - /// The actor's state (alive or dead). This defaults to ALIVE. Once marked - /// DEAD, the actor handle can never go back to being ALIVE. - rpc::ActorTableData::ActorState state_ GUARDED_BY(mutex_) = rpc::ActorTableData::ALIVE; - /// The unique id of the dummy object returned by the previous task. /// TODO: This can be removed once we schedule actor tasks by task counter /// only. diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 7d54aabbc..64c165dfe 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -111,12 +111,14 @@ struct TaskOptions { /// Options for actor creation tasks. struct ActorCreationOptions { ActorCreationOptions() {} - ActorCreationOptions(int64_t max_restarts, int max_concurrency, + ActorCreationOptions(int64_t max_restarts, int64_t max_task_retries, + int max_concurrency, const std::unordered_map &resources, const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options, bool is_detached, std::string &name, bool is_asyncio) : max_restarts(max_restarts), + max_task_retries(max_task_retries), max_concurrency(max_concurrency), resources(resources), placement_resources(placement_resources), @@ -125,10 +127,14 @@ struct ActorCreationOptions { name(name), is_asyncio(is_asyncio){}; - /// Maximum number of times that the actor should be reconstructed when it dies - /// unexpectedly. A value of -1 indicates infinite restarts. - /// If it's 0, the actor won't be restarted. + /// Maximum number of times that the actor should be restarted if it dies + /// unexpectedly. A value of -1 indicates infinite restarts. If it's 0, the + /// actor won't be restarted. const int64_t max_restarts = 0; + /// Maximum number of times that individual tasks can be retried at the + /// actor, if the actor dies unexpectedly. If -1, then the task may be + /// retried infinitely many times. + const int64_t max_task_retries = 0; /// The max number of concurrent tasks to run on this direct call actor. const int max_concurrency = 1; /// Resources required by the whole lifetime of this actor. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8995d7dd2..85b403e97 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -428,7 +428,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } direct_actor_submitter_ = std::unique_ptr( - new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_, + new CoreWorkerDirectActorTaskSubmitter(client_factory, memory_store_, task_manager_)); direct_task_submitter_ = @@ -604,20 +604,9 @@ const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWork void CoreWorker::SetCurrentTaskId(const TaskID &task_id) { worker_context_.SetCurrentTaskId(task_id); - bool not_actor_task = false; { absl::MutexLock lock(&mutex_); main_thread_task_id_ = task_id; - not_actor_task = actor_id_.IsNil(); - } - if (not_actor_task && task_id.IsNil()) { - absl::MutexLock lock(&actor_handles_mutex_); - // Reset the seqnos so that for the next task it start off at 0. - for (const auto &handle : actor_handles_) { - handle.second->Reset(); - } - // TODO(ekl) we can't unsubscribe to actor notifications here due to - // https://github.com/ray-project/ray/pull/6885 } } @@ -676,7 +665,12 @@ void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) { absl::MutexLock lock(&mutex_); while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) { - RAY_CHECK_OK(direct_task_submitter_->SubmitTask(to_resubmit_.front().second)); + auto &spec = to_resubmit_.front().second; + if (spec.IsActorTask()) { + RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec)); + } else { + RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); + } to_resubmit_.pop_front(); } internal_timer_.expires_at(internal_timer_.expiry() + @@ -1156,7 +1150,8 @@ Status CoreWorker::CreateActor(const RayFunction &function, // WaitForActorOutOfScopeRequest. std::unique_ptr actor_handle(new ActorHandle( actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], - function.GetLanguage(), function.GetFunctionDescriptor(), extension_data)); + function.GetLanguage(), function.GetFunctionDescriptor(), extension_data, + actor_creation_options.max_task_retries)); RAY_CHECK(AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/!actor_creation_options.is_detached)) << "Actor " << actor_id << " already exists"; @@ -1215,14 +1210,8 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f ExecuteTaskLocalMode(task_spec, actor_id); } else { task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite()); - if (actor_handle->IsDead()) { - auto status = Status::IOError("sent task to dead actor"); - task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, - &status); - } else { - status = direct_actor_submitter_->SubmitTask(task_spec); - } + CurrentCallSite(), actor_handle->MaxTaskRetries()); + status = direct_actor_submitter_->SubmitTask(task_spec); } return status; } @@ -1299,6 +1288,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, } reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite()); + direct_actor_submitter_->AddActorQueueIfNotExists(actor_id); bool inserted; { @@ -1313,19 +1303,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, if (actor_data.state() == gcs::ActorTableData::PENDING) { // The actor is being created and not yet ready, just ignore! } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { - absl::MutexLock lock(&actor_handles_mutex_); - auto it = actor_handles_.find(actor_id); - RAY_CHECK(it != actor_handles_.end()); - // We have to reset the actor handle since the next instance of the - // actor will not have the last sequence number that we sent. - it->second->Reset(); direct_actor_submitter_->DisconnectActor(actor_id, false); } else if (actor_data.state() == gcs::ActorTableData::DEAD) { direct_actor_submitter_->DisconnectActor(actor_id, true); - - ActorHandle *actor_handle = nullptr; - RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle)); - actor_handle->MarkDead(); // We cannot erase the actor handle here because clients can still // submit tasks to dead actors. This also means we defer unsubscription, // otherwise we crash when bulk unsubscribing all actor handles. @@ -1369,8 +1349,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, // language frontend receives another ref to the same actor. In this // case, we must remember the last task counter that we sent to the // actor. - // TODO(swang): Unsubscribe from the actor table once all local refs - // to the actor have gone out of scope. + // TODO(ekl) we can't unsubscribe to actor notifications here due to + // https://github.com/ray-project/ray/pull/6885 auto callback = actor_out_of_scope_callbacks_.extract(actor_id); if (callback) { callback.mapped()(actor_id); @@ -1555,16 +1535,11 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return_ids.pop_back(); task_type = TaskType::ACTOR_CREATION_TASK; SetActorId(task_spec.ActorCreationId()); - // For an actor, set the timestamp as the time its creation task starts execution. - SetCallerCreationTimestamp(); RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId(); } else if (task_spec.IsActorTask()) { RAY_CHECK(return_ids.size() > 0); return_ids.pop_back(); task_type = TaskType::ACTOR_TASK; - } else { - // For a non-actor task, set the timestamp as the time it starts execution. - SetCallerCreationTimestamp(); } // Because we support concurrent actor calls, we need to update the @@ -2080,9 +2055,4 @@ void CoreWorker::SetActorTitle(const std::string &title) { actor_title_ = title; } -void CoreWorker::SetCallerCreationTimestamp() { - absl::MutexLock lock(&mutex_); - direct_actor_submitter_->SetCallerCreationTimestamp(current_sys_time_ms()); -} - } // namespace ray diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index b35571386..e76f02c89 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -110,14 +110,16 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, } std::string name = ""; - ray::ActorCreationOptions actor_creation_options{max_restarts, - static_cast(max_concurrency), - resources, - resources, - dynamic_worker_options, - /*is_detached=*/false, - name, - /*is_asyncio=*/false}; + ray::ActorCreationOptions actor_creation_options{ + max_restarts, + 0, // TODO: Allow setting max_task_retries from Java. + static_cast(max_concurrency), + resources, + resources, + dynamic_worker_options, + /*is_detached=*/false, + name, + /*is_asyncio=*/false}; return actor_creation_options; } diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index bc32241e9..d234a4181 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -28,7 +28,8 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, const rpc::Address &caller_address, const TaskSpecification &spec, const std::string &call_site, int max_retries) { - RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId(); + RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries + << " retries"; // Add references for the dependencies to the task. std::vector task_deps; @@ -90,12 +91,18 @@ Status TaskManager::ResubmitTask(const TaskID &task_id, if (it == submissible_tasks_.end()) { return Status::Invalid("Task spec missing"); } + if (it->second.spec.IsActorTask()) { + return Status::Invalid("Cannot reconstruct objects returned by actors"); + } if (!it->second.pending) { resubmit = true; it->second.pending = true; - RAY_CHECK(it->second.num_retries_left != 0); - it->second.num_retries_left--; + if (it->second.num_retries_left > 0) { + it->second.num_retries_left--; + } else { + RAY_CHECK(it->second.num_retries_left == -1); + } spec = it->second.spec; } } @@ -256,7 +263,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id, ShutdownIfNeeded(); } -void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, +bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, Status *status) { // Note that this might be the __ray_terminate__ task, so we don't log // loudly with ERROR here. @@ -286,6 +293,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } } + bool will_retry = false; // We should not hold the lock during these calls because they may trigger // callbacks in this or other classes. if (num_retries_left != 0) { @@ -294,6 +302,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId() << ", attempting to resubmit."; retry_task_callback_(spec, /*delay=*/true); + will_retry = true; } else { // Throttled logging of task failure errors. { @@ -323,6 +332,8 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_ } ShutdownIfNeeded(); + + return will_retry; } void TaskManager::ShutdownIfNeeded() { diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 85a68a306..29cb6d074 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -32,7 +32,7 @@ class TaskFinisherInterface { virtual void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &actor_addr) = 0; - virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + virtual bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, Status *status = nullptr) = 0; virtual void OnTaskDependenciesInlined( @@ -116,7 +116,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param[in] task_id ID of the pending task. /// \param[in] error_type The type of the specific error. /// \param[in] status Optional status message. - void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + /// \return Whether the task will be retried or not. + bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, Status *status = nullptr) override; /// A task's dependencies were inlined in the task spec. This will decrement diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index c1c465fd0..a0e065ff7 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -73,8 +73,9 @@ ActorID CreateActorHelper(std::unordered_map &resources, std::string name = ""; ActorCreationOptions actor_options{ max_restarts, - /*max_concurrency*/ 1, resources, resources, {}, - /*is_detached=*/false, name, /*is_asyncio=*/false}; + /*max_task_retries=*/0, + /*max_concurrency*/ 1, resources, resources, {}, + /*is_detached=*/false, name, /*is_asyncio=*/false}; // Create an actor. ActorID actor_id; @@ -644,6 +645,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { std::unordered_map resources; std::string name = ""; ActorCreationOptions actor_options{0, + 0, 1, resources, resources, @@ -654,7 +656,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), function.GetFunctionDescriptor(), ""); + function.GetLanguage(), function.GetFunctionDescriptor(), "", + 0); // Manually create `num_tasks` task specs, and for each of them create a // `PushTaskRequest`, this is to batch performance of TaskSpec @@ -763,10 +766,10 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { TEST_F(ZeroNodeTest, TestActorHandle) { // Test actor handle serialization and deserialization round trip. JobID job_id = NextJobId(); - ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), - TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - Language::PYTHON, - ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), ""); + ActorHandle original( + ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(), + rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON, + ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0); std::string output; original.Serialize(&output); ActorHandle deserialized(output); diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 80a9065e2..06752f91c 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -24,16 +24,45 @@ namespace ray { using ::testing::_; +using ::testing::ElementsAre; +using ::testing::Return; + +TaskSpecification CreateActorTaskHelper(ActorID actor_id, WorkerID caller_worker_id, + int64_t counter, + TaskID caller_id = TaskID::Nil()) { + TaskSpecification task; + task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); + task.GetMutableMessage().set_caller_id(caller_id.Binary()); + task.GetMutableMessage().set_type(TaskType::ACTOR_TASK); + task.GetMutableMessage().mutable_caller_address()->set_worker_id( + caller_worker_id.Binary()); + task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); + task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter); + task.GetMutableMessage().set_num_returns(1); + return task; +} + +rpc::PushTaskRequest CreatePushTaskRequestHelper(ActorID actor_id, int64_t counter, + WorkerID caller_worker_id, + TaskID caller_id, + int64_t caller_timestamp) { + auto task_spec = CreateActorTaskHelper(actor_id, caller_worker_id, counter, caller_id); + + rpc::PushTaskRequest request; + request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); + request.set_sequence_number(request.task_spec().actor_task_spec().actor_counter()); + request.set_client_processed_up_to(-1); + return request; +} class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: const rpc::Address &Addr() const override { return addr; } ray::Status PushActorTask( - std::unique_ptr request, + std::unique_ptr request, bool skip_queue, const rpc::ClientCallback &callback) override { - RAY_CHECK(counter == request->task_spec().actor_task_spec().actor_counter()); - counter++; + received_seq_nos.push_back(request->sequence_number()); callbacks.push_back(callback); return Status::OK(); } @@ -50,7 +79,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { rpc::Address addr; std::list> callbacks; - uint64_t counter = 0; + std::vector received_seq_nos; }; class MockTaskFinisher : public TaskFinisherInterface { @@ -60,7 +89,7 @@ class MockTaskFinisher : public TaskFinisherInterface { MOCK_METHOD3(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &, const rpc::Address &addr)); MOCK_METHOD3(PendingTaskFailed, - void(const TaskID &task_id, rpc::ErrorType error_type, Status *status)); + bool(const TaskID &task_id, rpc::ErrorType error_type, Status *status)); MOCK_METHOD2(OnTaskDependenciesInlined, void(const std::vector &, const std::vector &)); @@ -68,25 +97,15 @@ class MockTaskFinisher : public TaskFinisherInterface { MOCK_METHOD1(MarkTaskCanceled, bool(const TaskID &task_id)); }; -TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter) { - TaskSpecification task; - task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); - task.GetMutableMessage().set_type(TaskType::ACTOR_TASK); - task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); - task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter); - return task; -} - class DirectActorSubmitterTest : public ::testing::Test { public: DirectActorSubmitterTest() : worker_client_(std::shared_ptr(new MockWorkerClient())), store_(std::shared_ptr(new CoreWorkerMemoryStore())), task_finisher_(std::make_shared()), - submitter_(address_, [&](const rpc::Address &addr) { return worker_client_; }, - store_, task_finisher_) {} + submitter_([&](const rpc::Address &addr) { return worker_client_; }, store_, + task_finisher_) {} - rpc::Address address_; std::shared_ptr worker_client_; std::shared_ptr store_; std::shared_ptr task_finisher_; @@ -95,16 +114,19 @@ class DirectActorSubmitterTest : public ::testing::Test { TEST_F(DirectActorSubmitterTest, TestSubmitTask) { rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); - auto task = CreateActorTaskHelper(actor_id, 0); + auto task = CreateActorTaskHelper(actor_id, worker_id, 0); ASSERT_TRUE(submitter_.SubmitTask(task).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 0); submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 1); - task = CreateActorTaskHelper(actor_id, 1); + task = CreateActorTaskHelper(actor_id, worker_id, 1); ASSERT_TRUE(submitter_.SubmitTask(task).ok()); ASSERT_EQ(worker_client_->callbacks.size(), 2); @@ -114,20 +136,24 @@ TEST_F(DirectActorSubmitterTest, TestSubmitTask) { while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask()); } + ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1)); } TEST_F(DirectActorSubmitterTest, TestDependencies) { rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); - auto task1 = CreateActorTaskHelper(actor_id, 0); + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); task1.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); - auto task2 = CreateActorTaskHelper(actor_id, 1); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); task2.GetMutableMessage().add_args()->add_object_ids(obj2.Binary()); // Neither task can be submitted yet because they are still waiting on @@ -142,20 +168,24 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) { ASSERT_EQ(worker_client_->callbacks.size(), 1); ASSERT_TRUE(store_->Put(*data, obj2)); ASSERT_EQ(worker_client_->callbacks.size(), 2); + ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1)); } TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) { rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); // Create two tasks for the actor with different arguments. ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); - auto task1 = CreateActorTaskHelper(actor_id, 0); + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); task1.GetMutableMessage().add_args()->add_object_ids(obj1.Binary()); - auto task2 = CreateActorTaskHelper(actor_id, 1); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); task2.GetMutableMessage().add_args()->add_object_ids(obj2.Binary()); // Neither task can be submitted yet because they are still waiting on @@ -171,28 +201,131 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) { ASSERT_EQ(worker_client_->callbacks.size(), 0); ASSERT_TRUE(store_->Put(*data, obj1)); ASSERT_EQ(worker_client_->callbacks.size(), 2); + ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1)); } -TEST_F(DirectActorSubmitterTest, TestActorFailure) { +TEST_F(DirectActorSubmitterTest, TestActorDead) { rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); gcs::ActorTableData actor_data; submitter_.ConnectActor(actor_id, addr); ASSERT_EQ(worker_client_->callbacks.size(), 0); - // Create two tasks for the actor. - auto task1 = CreateActorTaskHelper(actor_id, 0); - auto task2 = CreateActorTaskHelper(actor_id, 1); + // Create two tasks for the actor. One depends on an object that is not yet available. + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); + ObjectID obj = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); + task2.GetMutableMessage().add_args()->add_object_ids(obj.Binary()); ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); - ASSERT_EQ(worker_client_->callbacks.size(), 2); + ASSERT_EQ(worker_client_->callbacks.size(), 1); - // Simulate the actor dying. All submitted tasks should get failed. - EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(2); + // Simulate the actor dying. All in-flight tasks should get failed. + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task1.TaskId(), _, _)).Times(1); EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0); while (!worker_client_->callbacks.empty()) { ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); } + + EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0); + submitter_.DisconnectActor(actor_id, /*dead=*/false); + // Actor marked as dead. All queued tasks should get failed. + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(1); + submitter_.DisconnectActor(actor_id, /*dead=*/true); +} + +TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) { + rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); + ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); + gcs::ActorTableData actor_data; + submitter_.ConnectActor(actor_id, addr); + ASSERT_EQ(worker_client_->callbacks.size(), 0); + + // Create four tasks for the actor. + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); + auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); + auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3); + // Submit three tasks. + ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + + EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(2); + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(2); + // First task finishes. Second task fails. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); + + // Simulate the actor failing. + submitter_.DisconnectActor(actor_id, /*dead=*/false); + // Third task fails after the actor is disconnected. It should not get + // retried. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); + + // Actor gets restarted. + submitter_.ConnectActor(actor_id, addr); + ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + ASSERT_TRUE(worker_client_->callbacks.empty()); + // Actor counter restarts at 0 after the actor is restarted. + ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 0)); +} + +TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) { + rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); + ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); + gcs::ActorTableData actor_data; + submitter_.ConnectActor(actor_id, addr); + ASSERT_EQ(worker_client_->callbacks.size(), 0); + + // Create four tasks for the actor. + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); + auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); + auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3); + // Submit three tasks. + ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + + // All tasks will eventually finish. + EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(4); + // Tasks 2 and 3 will be retried. + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)) + .Times(2) + .WillRepeatedly(Return(true)); + // First task finishes. Second task fails. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); + + // Simulate the actor failing. + submitter_.DisconnectActor(actor_id, /*dead=*/false); + // Third task fails after the actor is disconnected. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""))); + + // Actor gets restarted. + submitter_.ConnectActor(actor_id, addr); + // A new task is submitted. + ASSERT_TRUE(submitter_.SubmitTask(task4).ok()); + // Tasks 2 and 3 get retried. + ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + while (!worker_client_->callbacks.empty()) { + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + } + // Actor counter restarts at 0 after the actor is restarted. New task cannot + // execute until after tasks 2 and 3 are re-executed. + ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1)); } class MockDependencyWaiterInterface : public DependencyWaiterInterface { @@ -203,35 +336,6 @@ class MockDependencyWaiterInterface : public DependencyWaiterInterface { } }; -TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter, - TaskID caller_id) { - TaskSpecification task; - task.GetMutableMessage().set_task_id(TaskID::Nil().Binary()); - task.GetMutableMessage().set_caller_id(caller_id.Binary()); - task.GetMutableMessage().set_type(TaskType::ACTOR_TASK); - task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); - task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter); - task.GetMutableMessage().set_num_returns(1); - return task; -} - -rpc::PushTaskRequest CreatePushTaskRequestHelper(ActorID actor_id, int64_t counter, - WorkerID caller_worker_id, - TaskID caller_id, - int64_t caller_timestamp) { - auto task_spec = CreateActorTaskHelper(actor_id, counter, caller_id); - rpc::Address rpc_address; - rpc_address.set_worker_id(caller_worker_id.Binary()); - - rpc::PushTaskRequest request; - request.mutable_caller_address()->CopyFrom(rpc_address); - request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); - request.set_caller_version(caller_timestamp); - request.set_sequence_number(request.task_spec().actor_task_spec().actor_counter()); - request.set_client_processed_up_to(-1); - return request; -} - class MockWorkerContext : public WorkerContext { public: MockWorkerContext(WorkerType worker_type, const JobID &job_id) @@ -278,7 +382,6 @@ class DirectActorReceiverTest : public ::testing::Test { }; TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { - rpc::Address addr; TaskID current_task_id = TaskID::Nil(); ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); WorkerID worker_id = WorkerID::FromRandom(); @@ -319,7 +422,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { receiver_->HandlePushTask(request, &reply, reply_callback); } - // Create another request with the same caller id, but a differnt worker id, + // Create another request with the same caller id, but a different worker id, // and a newer timestamp. This simulates caller reconstruction. // Note that here the task request still has counter 0, which should be // ignored normally, but here it's from a different worker and with a newer diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 433841a4c..9f88b3989 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -72,9 +72,10 @@ class MockTaskFinisher : public TaskFinisherInterface { num_tasks_complete++; } - void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, + bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type, Status *status) override { num_tasks_failed++; + return true; } void OnTaskDependenciesInlined(const std::vector &inlined_dependency_ids, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 3fa49f260..c6081e142 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -22,6 +22,14 @@ using ray::rpc::ActorTableData; namespace ray { +void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists( + const ActorID &actor_id) { + absl::MutexLock lock(&mu_); + // No need to check whether the insert was successful, since it is possible + // for this worker to have multiple references to the same actor. + client_queues_.emplace(actor_id, ClientQueue()); +} + void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) { absl::MutexLock lock(&mu_); @@ -29,72 +37,72 @@ void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id, request.set_intended_actor_id(actor_id.Binary()); request.set_force_kill(force_kill); request.set_no_restart(no_restart); - auto inserted = pending_force_kills_.emplace(actor_id, request); - if (!inserted.second && force_kill) { + + auto it = client_queues_.find(actor_id); + // The language frontend can only kill actors that it has a reference to. + RAY_CHECK(it != client_queues_.end()); + + if (!it->second.pending_force_kill) { + it->second.pending_force_kill = request; + } else if (force_kill) { // Overwrite the previous request to kill the actor if the new request is a // force kill. - inserted.first->second.set_force_kill(true); + it->second.pending_force_kill->set_force_kill(true); if (no_restart) { - // Overwrite the previous request to disable reconstruction if the new request's + // Overwrite the previous request to disable restart if the new request's // no_restart flag is set to true. - inserted.first->second.set_no_restart(true); + it->second.pending_force_kill->set_no_restart(true); } } - auto it = rpc_clients_.find(actor_id); - if (it == rpc_clients_.end()) { - // Actor is not yet created, or is being restarted, cache the request - // and submit after actor is alive. - // TODO(zhijunfu): it might be possible for a user to specify an invalid - // actor handle (e.g. from unpickling), in that case it might be desirable - // to have a timeout to mark it as invalid if it doesn't show up in the - // specified time. - RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; - } else { - SendPendingTasks(actor_id); - } + + SendPendingTasks(actor_id); } Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId(); RAY_CHECK(task_spec.IsActorTask()); - // We must fix the send order prior to resolving dependencies, which may complete - // out of order. This ensures we preserve the client-side send order. - int64_t send_pos = -1; + bool task_queued = false; + uint64_t send_pos = 0; { absl::MutexLock lock(&mu_); - send_pos = next_send_position_to_assign_[task_spec.ActorId()]++; + auto queue = client_queues_.find(task_spec.ActorId()); + RAY_CHECK(queue != client_queues_.end()); + if (queue->second.state == rpc::ActorTableData::DEAD) { + task_finisher_->MarkTaskCanceled(task_spec.TaskId()); + auto status = Status::IOError("cancelling all pending tasks of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. + RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(), + rpc::ErrorType::ACTOR_DIED, &status)); + } else { + // We must fix the send order prior to resolving dependencies, which may + // complete out of order. This ensures that we will not deadlock due to + // backpressure. The receiving actor will execute the tasks according to + // this sequence number. + send_pos = task_spec.ActorCounter(); + auto inserted = + queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false)); + RAY_CHECK(inserted.second); + task_queued = true; + } } - resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable { - const auto &actor_id = task_spec.ActorId(); - - auto request = std::unique_ptr(new rpc::PushTaskRequest); - request->mutable_caller_address()->CopyFrom(rpc_address_); - // NOTE(swang): CopyFrom is needed because if we use Swap here and the task - // fails, then the task data will be gone when the TaskManager attempts to - // access the task. - request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); - request->set_caller_version(caller_creation_timestamp_ms_); - - absl::MutexLock lock(&mu_); - - auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request)); - RAY_CHECK(inserted.second); - - auto it = rpc_clients_.find(actor_id); - if (it == rpc_clients_.end()) { - // Actor is not yet created, or is being restarted, cache the request - // and submit after actor is alive. - // TODO(zhijunfu): it might be possible for a user to specify an invalid - // actor handle (e.g. from unpickling), in that case it might be desirable - // to have a timeout to mark it as invalid if it doesn't show up in the - // specified time. - RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created."; - } else { - SendPendingTasks(actor_id); - } - }); + if (task_queued) { + const auto actor_id = task_spec.ActorId(); + resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id]() { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + auto it = queue->second.requests.find(send_pos); + // Only dispatch tasks if the submitted task is still queued. The task + // may have been dequeued if the actor has since failed. + if (it != queue->second.requests.end()) { + it->second.second = true; + SendPendingTasks(actor_id); + } + }); + } // If the task submission subsequently fails, then the client will receive // the error in a callback. @@ -104,105 +112,156 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, const rpc::Address &address) { absl::MutexLock lock(&mu_); - // Update the mapping so new RPCs go out with the right intended worker id. - worker_ids_[actor_id] = address.worker_id(); - // Create a new connection to the actor. - // TODO(edoakes): are these clients cleaned up properly? - if (rpc_clients_.count(actor_id) == 0) { - rpc_clients_[actor_id] = - std::shared_ptr(client_factory_(address)); + + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + if (queue->second.rpc_client) { + // Skip reconnection if we already have a client to this actor. + // NOTE(swang): This seems to only trigger in multithreaded Java tests. + RAY_CHECK(queue->second.worker_id == address.worker_id()); + return; } + + queue->second.state = rpc::ActorTableData::ALIVE; + // Update the mapping so new RPCs go out with the right intended worker id. + queue->second.worker_id = address.worker_id(); + // Create a new connection to the actor. + queue->second.rpc_client = + std::shared_ptr(client_factory_(address)); + // TODO(swang): This assumes that all replies from the previous incarnation + // of the actor have been received. Fix this by setting an epoch for each + // actor task, so we can ignore completed tasks from old epochs. + RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from " + << queue->second.caller_starts_at << " to " + << queue->second.num_completed_tasks; + queue->second.caller_starts_at = queue->second.num_completed_tasks; SendPendingTasks(actor_id); } void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id, bool dead) { absl::MutexLock lock(&mu_); - if (!dead) { - // We're restarting the actor, so erase the client for now. The new client - // will be inserted once actor reconstruction completes. We don't erase the - // client when the actor is DEAD, so that all further tasks will be failed. - rpc_clients_.erase(actor_id); - worker_ids_.erase(actor_id); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + + if (dead) { + queue->second.state = rpc::ActorTableData::DEAD; } else { + queue->second.state = rpc::ActorTableData::RESTARTING; + } + + // The actor failed, so erase the client for now. Either the actor is + // permanently dead or the new client will be inserted once the actor is + // restarted. + queue->second.rpc_client = nullptr; + queue->second.worker_id.clear(); + queue->second.pending_force_kill.reset(); + + // If there are pending requests, treat the pending tasks as failed. + if (dead) { RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id; - // If there are pending requests, treat the pending tasks as failed. - auto pending_it = pending_requests_.find(actor_id); - if (pending_it != pending_requests_.end()) { - auto head = pending_it->second.begin(); - while (head != pending_it->second.end()) { - auto request = std::move(head->second); - head = pending_it->second.erase(head); - auto task_id = TaskID::FromBinary(request->task_spec().task_id()); - auto status = Status::IOError("cancelling all pending tasks of dead actor"); - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status); - } - pending_requests_.erase(pending_it); + auto &requests = queue->second.requests; + auto head = requests.begin(); + while (head != requests.end()) { + const auto &task_spec = head->second.first; + task_finisher_->MarkTaskCanceled(task_spec.TaskId()); + auto status = Status::IOError("cancelling all pending tasks of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. + RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(), + rpc::ErrorType::ACTOR_DIED, &status)); + head = requests.erase(head); } + // No need to clean up tasks that have been sent and are waiting for // replies. They will be treated as failed once the connection dies. // We retain the sequencing information so that we can properly fail // any tasks submitted after the actor death. - - pending_force_kills_.erase(actor_id); } } void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) { - auto &client = rpc_clients_[actor_id]; - RAY_CHECK(client); + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + if (!it->second.rpc_client) { + return; + } + // Check if there is a pending force kill. If there is, send it and disconnect the // client. - auto it = pending_force_kills_.find(actor_id); - if (it != pending_force_kills_.end()) { + if (it->second.pending_force_kill) { RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id; // It's okay if this fails because this means the worker is already dead. - RAY_UNUSED(client->KillActor(it->second, nullptr)); - pending_force_kills_.erase(it); + RAY_UNUSED(it->second.rpc_client->KillActor(*it->second.pending_force_kill, nullptr)); + it->second.pending_force_kill.reset(); } // Submit all pending requests. - auto &requests = pending_requests_[actor_id]; + auto &requests = it->second.requests; auto head = requests.begin(); - while (head != requests.end() && head->first == next_send_position_[actor_id]) { - auto request = std::move(head->second); + while (head != requests.end() && head->first <= it->second.next_send_position && + head->second.second) { + // If the task has been sent before, skip the other tasks in the send + // queue. + bool skip_queue = head->first < it->second.next_send_position; + auto task_spec = std::move(head->second.first); head = requests.erase(head); - auto num_returns = request->task_spec().num_returns(); - auto task_id = TaskID::FromBinary(request->task_spec().task_id()); - PushActorTask(*client, std::move(request), actor_id, task_id, num_returns); + RAY_CHECK(!it->second.worker_id.empty()); + PushActorTask(it->second, task_spec, skip_queue); + it->second.next_send_position++; } } -void CoreWorkerDirectActorTaskSubmitter::PushActorTask( - rpc::CoreWorkerClientInterface &client, std::unique_ptr request, - const ActorID &actor_id, const TaskID &task_id, int num_returns) { - RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id; - next_send_position_[actor_id]++; - auto it = worker_ids_.find(actor_id); - RAY_CHECK(it != worker_ids_.end()) << "Actor worker id not found " << actor_id.Hex(); - request->set_intended_worker_id(it->second); - rpc::Address addr(client.Addr()); - RAY_CHECK_OK(client.PushActorTask( - std::move(request), - [this, addr, task_id](Status status, const rpc::PushTaskReply &reply) { +void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue, + const TaskSpecification &task_spec, + bool skip_queue) { + auto request = std::unique_ptr(new rpc::PushTaskRequest()); + // NOTE(swang): CopyFrom is needed because if we use Swap here and the task + // fails, then the task data will be gone when the TaskManager attempts to + // access the task. + request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); + + request->set_intended_worker_id(queue.worker_id); + RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at) + << "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at; + request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at); + + const auto task_id = task_spec.TaskId(); + const auto actor_id = task_spec.ActorId(); + const auto counter = task_spec.ActorCounter(); + RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id + << " actor counter " << counter << " seq no " + << request->sequence_number(); + rpc::Address addr(queue.rpc_client->Addr()); + RAY_UNUSED(queue.rpc_client->PushActorTask( + std::move(request), skip_queue, + [this, addr, task_id, actor_id](Status status, const rpc::PushTaskReply &reply) { + bool increment_completed_tasks = true; if (!status.ok()) { - task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status); + bool will_retry = task_finisher_->PendingTaskFailed( + task_id, rpc::ErrorType::ACTOR_DIED, &status); + if (will_retry) { + increment_completed_tasks = false; + } } else { task_finisher_->CompletePendingTask(task_id, reply, addr); } + + if (increment_completed_tasks) { + absl::MutexLock lock(&mu_); + auto queue = client_queues_.find(actor_id); + RAY_CHECK(queue != client_queues_.end()); + queue->second.num_completed_tasks++; + } })); } bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { absl::MutexLock lock(&mu_); - auto iter = rpc_clients_.find(actor_id); - return (iter != rpc_clients_.end()); -} - -void CoreWorkerDirectActorTaskSubmitter::SetCallerCreationTimestamp(int64_t timestamp) { - caller_creation_timestamp_ms_ = timestamp; + auto iter = client_queues_.find(actor_id); + return (iter != client_queues_.end() && iter->second.rpc_client); } void CoreWorkerDirectTaskReceiver::Init( @@ -284,7 +343,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( << ", actor_id: " << task_spec.ActorCreationId(); // Tell raylet that an actor creation task has finished execution, so that // raylet can publish actor creation event to GCS, and mark this worker as - // actor, thus if this worker dies later raylet will reconstruct the actor. + // actor, thus if this worker dies later raylet will restart the actor. RAY_CHECK_OK(task_done_()); } } @@ -315,50 +374,15 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr); }; - auto caller_worker_id = WorkerID::FromBinary(request.caller_address().worker_id()); - auto caller_version = request.caller_version(); - auto it = scheduling_queue_.find(task_spec.CallerId()); - if (it != scheduling_queue_.end()) { - if (it->second.first.caller_worker_id != caller_worker_id) { - // We received a request with the same caller ID, but from a different worker, - // this indicates the caller (actor) is restarted. - if (it->second.first.caller_creation_timestamp_ms < caller_version) { - // The new request has a newer caller version, then remove the old entry - // from scheduling queue since it's invalid now. - RAY_LOG(INFO) << "Remove existing scheduling queue for caller " - << task_spec.CallerId() << " after receiving a " - << "request from a different worker ID with a newer " - << "version, old worker ID: " << it->second.first.caller_worker_id - << ", new worker ID" << caller_worker_id; - scheduling_queue_.erase(task_spec.CallerId()); - it = scheduling_queue_.end(); - } else { - // The existing caller has the newer version, this indicates the request - // is from an old caller, which might be possible when network has problems. - // In this case fail this request. - RAY_LOG(WARNING) << "Ignoring request from an old caller because " - << "it has a smaller timestamp, old worker ID: " - << caller_worker_id << ", current worker ID" - << it->second.first.caller_worker_id; - // Fail request with an old caller version. - reject_callback(); - return; - } - } - } - + auto it = scheduling_queue_.find(task_spec.CallerWorkerId()); if (it == scheduling_queue_.end()) { - SchedulingQueueTag tag; - tag.caller_worker_id = caller_worker_id; - tag.caller_creation_timestamp_ms = caller_version; auto result = scheduling_queue_.emplace( - task_spec.CallerId(), - std::make_pair(tag, std::unique_ptr(new SchedulingQueue( - task_main_io_service_, *waiter_, worker_context_)))); + task_spec.CallerWorkerId(), + SchedulingQueue(task_main_io_service_, *waiter_, worker_context_)); it = result.first; } - it->second.second->Add(request.sequence_number(), request.client_processed_up_to(), - accept_callback, reject_callback, dependencies); + it->second.Add(request.sequence_number(), request.client_processed_up_to(), + accept_callback, reject_callback, dependencies); } void CoreWorkerDirectTaskReceiver::HandleDirectActorCallArgWaitComplete( diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index ef520ff32..80eb46630 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -50,15 +50,21 @@ const int kMaxReorderWaitSeconds = 30; // This class is thread-safe. class CoreWorkerDirectActorTaskSubmitter { public: - CoreWorkerDirectActorTaskSubmitter(rpc::Address rpc_address, - rpc::ClientFactoryFn client_factory, + CoreWorkerDirectActorTaskSubmitter(rpc::ClientFactoryFn client_factory, std::shared_ptr store, std::shared_ptr task_finisher) - : rpc_address_(rpc_address), - client_factory_(client_factory), + : client_factory_(client_factory), resolver_(store, task_finisher), task_finisher_(task_finisher) {} + /// Add an actor queue. This should be called whenever a reference to an + /// actor is created in the language frontend. + /// TODO(swang): Remove the actor queue once it is sure that this worker will + /// not receive another reference to the same actor. + /// + /// \param[in] actor_id The actor for whom to add a queue. + void AddActorQueueIfNotExists(const ActorID &actor_id); + /// Submit a task to an actor for execution. /// /// \param[in] task The task spec to submit. @@ -89,20 +95,84 @@ class CoreWorkerDirectActorTaskSubmitter { void SetCallerCreationTimestamp(int64_t timestamp); private: + struct ClientQueue { + /// The current state of the actor. If this is ALIVE, then we should have + /// an RPC client to the actor. If this is DEAD, then all tasks in the + /// queue will be marked failed and all other ClientQueue state is ignored. + rpc::ActorTableData::ActorState state = rpc::ActorTableData::PENDING; + /// The RPC client. We use shared_ptr to enable shared_from_this for + /// pending client callbacks. + std::shared_ptr rpc_client = nullptr; + /// The intended worker ID of the actor. + std::string worker_id = ""; + + /// The actor's pending requests, ordered by the task number (see below + /// diagram) in the request. The bool indicates whether the dependencies + /// for that task have been resolved yet. A task will be sent after its + /// dependencies have been resolved and its task number matches + /// next_send_position. + std::map> requests; + + /// Diagram of the sequence numbers assigned to actor tasks during actor + /// crash and restart: + /// + /// The actor starts, and 10 tasks are submitted. We have sent 6 tasks + /// (0-5) so far, and have received a successful reply for 4 tasks (0-3). + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ num_completed_tasks + /// ^ caller_starts_at + /// + /// Suppose the actor crashes and recovers. Then, caller_starts_at is reset + /// to the current num_completed_tasks. caller_starts_at is then subtracted + /// from each task's counter, so the recovered actor will receive the + /// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively. + /// Therefore, the recovered actor will restart execution from task 4. + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ num_completed_tasks + /// ^ caller_starts_at + /// + /// New actor tasks will continue to be sent even while tasks are being + /// resubmitted, but the receiving worker will only execute them after the + /// resent tasks. For example, this diagram shows what happens if task 6 is + /// sent for the first time, tasks 4 and 5 have been resent, and we have + /// received a successful reply for task 4. + /// 0 1 2 3 4 5 6 7 8 9 + /// ^ next_send_position + /// ^ num_completed_tasks + /// ^ caller_starts_at + /// + /// The send position of the next task to send to this actor. This sequence + /// number increases monotonically. + uint64_t next_send_position = 0; + /// The offset at which the the actor should start its counter for this + /// caller. This is used for actors that can be restarted, so that the new + /// instance of the actor knows from which task to start executing. + uint64_t caller_starts_at = 0; + /// Out of the tasks sent by this worker to the actor, the number of tasks + /// that we will never send to the actor again. This is used to reset + /// caller_starts_at if the actor dies and is restarted. We only include + /// tasks that will not be sent again, to support automatic task retry on + /// actor failure. + uint64_t num_completed_tasks = 0; + + /// A force-kill request that should be sent to the actor once an RPC + /// client to the actor is available. + absl::optional pending_force_kill; + }; + /// Push a task to a remote actor via the given client. /// Note, this function doesn't return any error status code. If an error occurs while /// sending the request, this task will be treated as failed. /// - /// \param[in] client The RPC client to send tasks to an actor. - /// \param[in] request The request to send. - /// \param[in] actor_id Actor ID. - /// \param[in] task_id The ID of a task. - /// \param[in] num_returns Number of return objects. + /// \param[in] queue The actor queue. Contains the RPC client state. + /// \param[in] task_spec The task to send. + /// \param[in] skip_queue Whether to skip the task queue. This will send the + /// task for execution immediately. /// \return Void. - void PushActorTask(rpc::CoreWorkerClientInterface &client, - std::unique_ptr request, - const ActorID &actor_id, const TaskID &task_id, int num_returns) - EXCLUSIVE_LOCKS_REQUIRED(mu_); + void PushActorTask(const ClientQueue &queue, const TaskSpecification &task_spec, + bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); /// Send all pending tasks for an actor. /// Note that this function doesn't take lock, the caller is expected to hold @@ -124,39 +194,7 @@ class CoreWorkerDirectActorTaskSubmitter { /// Mutex to proect the various maps below. mutable absl::Mutex mu_; - /// Address of our RPC server. - rpc::Address rpc_address_; - - /// Map from actor id to rpc client. This only includes actors that we send tasks to. - /// We use shared_ptr to enable shared_from_this for pending client callbacks. - /// - /// TODO(zhijunfu): this will be moved into `actor_states_` later when we can - /// subscribe updates for a specific actor. - absl::flat_hash_map> - rpc_clients_ GUARDED_BY(mu_); - - /// Map from actor ids to worker ids. TODO(ekl) consider unifying this with the - /// rpc_clients_ map. - absl::flat_hash_map worker_ids_ GUARDED_BY(mu_); - - /// Map from actor ids that should be force killed once a client is available to the - /// pending kill actor requests. - absl::flat_hash_map pending_force_kills_ - GUARDED_BY(mu_); - - /// Map from actor id to the actor's pending requests. Each actor's requests - /// are ordered by the task number in the request. - absl::flat_hash_map>> - pending_requests_ GUARDED_BY(mu_); - - /// Map from actor id to the send position of the next task to queue for send - /// for that actor. This is always greater than or equal to next_send_position_. - absl::flat_hash_map next_send_position_to_assign_ GUARDED_BY(mu_); - - /// Map from actor id to the send position of the next task to send to that actor. - /// Note that this differs from the PushTaskRequest's sequence number in that it - /// increases monotonically in this process independently of CallerId changes. - absl::flat_hash_map next_send_position_ GUARDED_BY(mu_); + absl::flat_hash_map client_queues_ GUARDED_BY(mu_); /// Resolve direct call object dependencies; LocalDependencyResolver resolver_; @@ -164,12 +202,6 @@ class CoreWorkerDirectActorTaskSubmitter { /// Used to complete tasks. std::shared_ptr task_finisher_; - /// Timestamp when the caller is created. - /// - if this worker is an actor, this is set to the time that the actor creation - /// task starts execution; - /// - otherwise, it's set to the time that the current task starts execution. - int64_t caller_creation_timestamp_ms_ = 0; - friend class CoreWorkerTest; }; @@ -262,13 +294,6 @@ class BoundedExecutor { boost::asio::thread_pool pool_; }; -struct SchedulingQueueTag { - /// Worker ID for the caller. - WorkerID caller_worker_id; - /// Timestamp for the caller, which is used as a version. - int64_t caller_creation_timestamp_ms = 0; -}; - /// Used to ensure serial order of task execution per actor handle. /// See direct_actor.proto for a description of the ordering protocol. class SchedulingQueue { @@ -475,9 +500,7 @@ class CoreWorkerDirectTaskReceiver { std::unique_ptr waiter_; /// Queue of pending requests per actor handle. /// TODO(ekl) GC these queues once the handle is no longer active. - std::unordered_map>> - scheduling_queue_; + std::unordered_map scheduling_queue_; }; } // namespace ray diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 0c29d99af..683f36b0f 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -39,8 +39,8 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } else { RAY_LOG(ERROR) << "Failed to create actor " << actor_id << " with: " << status.ToString(); - task_finisher_->PendingTaskFailed( - task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status); + RAY_UNUSED(task_finisher_->PendingTaskFailed( + task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status)); } })); return; @@ -70,8 +70,8 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { } } if (!keep_executing) { - task_finisher_->PendingTaskFailed(task_spec.TaskId(), - rpc::ErrorType::TASK_CANCELLED, nullptr); + RAY_UNUSED(task_finisher_->PendingTaskFailed( + task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); } }); return Status::OK(); @@ -256,7 +256,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // NOTE(swang): CopyFrom is needed because if we use Swap here and the task // fails, then the task data will be gone when the TaskManager attempts to // access the task. - request->mutable_caller_address()->CopyFrom(rpc_address_); request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); request->mutable_resource_mapping()->CopyFrom(assigned_resources); request->set_intended_worker_id(addr.worker_id.Binary()); @@ -284,10 +283,10 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // failure (e.g., by contacting the raylet). If it was a process // failure, it may have been an application-level error and it may // not make sense to retry the task. - task_finisher_->PendingTaskFailed( + RAY_UNUSED(task_finisher_->PendingTaskFailed( task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED, - &status); + &status)); } else { task_finisher_->CompletePendingTask(task_id, reply, addr.ToProto()); } @@ -321,8 +320,8 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec, task_queues_.erase(scheduling_key); CancelWorkerLeaseIfNeeded(scheduling_key); } - task_finisher_->PendingTaskFailed(task_spec.TaskId(), - rpc::ErrorType::TASK_CANCELLED); + RAY_UNUSED(task_finisher_->PendingTaskFailed(task_spec.TaskId(), + rpc::ErrorType::TASK_CANCELLED)); return Status::OK(); } } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 56363c71d..98c181983 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -50,6 +50,9 @@ message ActorHandle { // An extension field that is used for storing app-language-specific data. bytes extension_data = 8; + + // How many times tasks may be retried on this actor if the actor fails. + int64 max_task_retries = 9; } message AssignTaskRequest { @@ -89,30 +92,21 @@ message ReturnObject { message PushTaskRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; - // Address of the caller. - Address caller_address = 2; // The task to be pushed. - TaskSpec task_spec = 3; + TaskSpec task_spec = 2; // The sequence number of the task for this client. This must increase // sequentially starting from zero for each actor handle. The server // will guarantee tasks execute in this sequence, waiting for any // out-of-order request messages to arrive as necessary. // If set to -1, ordering is disabled and the task executes immediately. - int64 sequence_number = 4; + int64 sequence_number = 3; // The max sequence number the client has processed responses for. This // is a performance optimization that allows the client to tell the server // to cancel any PushTaskRequests with seqno <= this value, rather than // waiting for the server to time out waiting for missing messages. - int64 client_processed_up_to = 5; + int64 client_processed_up_to = 4; // Resource mapping ids assigned to the worker executing the task. - repeated ResourceMapEntry resource_mapping = 6; - // The version of the caller. This is used to distinguish on-the-fly - // requests from a caller before it die, and requests from the restarted - // caller, which might happen theoretically when network has issues. - // - For an actor, this is set to the timestamp when the actor is created, - // so it can be used to differentiate which is the newly restarted actor. - // - For a non-actor task, it's set to the timestamp the task starts execution. - int64 caller_version = 7; + repeated ResourceMapEntry resource_mapping = 5; } message PushTaskReply { diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index 81c649be3..24a62acf2 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -112,9 +112,12 @@ class CoreWorkerClientInterface { /// Push an actor task directly from worker to worker. /// /// \param[in] request The request message. + /// \param[in] skip_queue Whether to skip the task queue. This will send the + /// task for execution immediately. /// \param[in] callback The callback function that handles reply. /// \return if the rpc call succeeds virtual ray::Status PushActorTask(std::unique_ptr request, + bool skip_queue, const ClientCallback &callback) { return Status::NotImplemented(""); } @@ -244,16 +247,19 @@ class CoreWorkerClient : public std::enable_shared_from_this, RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override) - ray::Status PushActorTask(std::unique_ptr request, + ray::Status PushActorTask(std::unique_ptr request, bool skip_queue, const ClientCallback &callback) override { - request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter()); + if (skip_queue) { + // Set this value so that the actor does not skip any tasks when + // processing this request. We could also set it to max_finished_seq_no_, + // but we just set it to the default of -1 to avoid taking the lock. + request->set_client_processed_up_to(-1); + return INVOKE_RPC_CALL(CoreWorkerService, PushTask, *request, callback, + grpc_client_); + } + { std::lock_guard lock(mutex_); - if (request->task_spec().caller_id() != cur_caller_id_) { - // We are running a new task, reset the seq no counter. - max_finished_seq_no_ = -1; - cur_caller_id_ = request->task_spec().caller_id(); - } send_queue_.push_back(std::make_pair(std::move(request), callback)); } SendRequests(); @@ -331,10 +337,6 @@ class CoreWorkerClient : public std::enable_shared_from_this, /// The max sequence number we have processed responses for. int64_t max_finished_seq_no_ GUARDED_BY(mutex_) = -1; - - /// The task id we are currently sending requests for. When this changes, - /// the max finished seq no counter is reset. - std::string cur_caller_id_; }; } // namespace rpc diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index fb8fad9be..e198e844d 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -275,8 +275,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { std::string name = ""; ActorCreationOptions actor_options{ max_restarts, - /*max_concurrency=*/1, resources, resources, {}, - /*is_detached=*/false, name, /*is_asyncio=*/false}; + /*max_task_retries=*/0, + /*max_concurrency=*/1, resources, resources, {}, + /*is_detached=*/false, name, /*is_asyncio=*/false}; // Create an actor. ActorID actor_id; RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().CreateActor(