Option to retry failed actor tasks (#8330)

* Python

* Consolidate state in the direct actor transport, set the caller starts at

* todo

* Remove unused

* Update and unit tests

* Doc

* Remove unused

* doc

* Remove debug

* Update src/ray/core_worker/transport/direct_actor_transport.h

Co-authored-by: Eric Liang <ekhliang@gmail.com>

* Update src/ray/core_worker/transport/direct_actor_transport.cc

Co-authored-by: Eric Liang <ekhliang@gmail.com>

* lint and fix build

* Update

* Fix build

* Fix tests

* Unit test for max_task_retries=0

* Fix java?

* Fix bad test

* Cross language fix

* fix java

Co-authored-by: Eric Liang <ekhliang@gmail.com>
This commit is contained in:
Stephanie Wang
2020-05-15 20:15:15 -07:00
committed by GitHub
parent 41d8c2bd0a
commit bd169749e0
26 changed files with 873 additions and 564 deletions
+77 -3
View File
@@ -78,12 +78,86 @@ You can experiment with this behavior by running the following code.
actor = Actor.remote()
# The actor will be restarted up to 5 times. After that, methods will
# raise exceptions. The actor is restarted by rerunning its
# constructor. Methods that were executing when the actor died will also
# raise exceptions.
# always raise a `RayActorError` exception. The actor is restarted by
# rerunning its constructor. Methods that were sent or executing when the
# actor died will also raise a `RayActorError` exception.
for _ in range(100):
try:
counter = ray.get(actor.increment_and_possibly_fail.remote())
print(counter)
except ray.exceptions.RayActorError:
print('FAILURE')
By default, actor tasks execute with at-most-once semantics
(``max_task_retries=0`` in the ``@ray.remote`` decorator). This means that if an
actor task is submitted to an actor that is unreachable, Ray will report the
error with ``RayActorError``, a Python-level exception that is thrown when
``ray.get`` is called on the future returned by the task. Note that this
exception may be thrown even though the task did indeed execute successfully.
For example, this can happen if the actor dies immediately after executing the
task.
Ray also offers at-least-once execution semantics for actor tasks
(``max_task_retries=-1`` or ``max_task_retries > 0``). This means that if an
actor task is submitted to an actor that is unreachable, the system will
automatically retry the task until it receives a reply from the actor. With
this option, the system will only throw a ``RayActorError`` to the application
if one of the following occurs: (1) the actors ``max_restarts`` limit has been
exceeded and the actor cannot be restarted anymore, or (2) the
``max_task_retries`` limit has been exceeded for this particular task. The
limit can be set to infinity with ``max_task_retries = -1``.
You can experiment with this behavior by running the following code.
.. code-block:: python
import os
import ray
ray.init(ignore_reinit_error=True)
@ray.remote(max_restarts=5, max_task_retries=-1)
class Actor:
def __init__(self):
self.counter = 0
def increment_and_possibly_fail(self):
# Exit after every 10 tasks.
if self.counter == 10:
os._exit(0)
self.counter += 1
return self.counter
actor = Actor.remote()
# The actor will be reconstructed up to 5 times. The actor is
# reconstructed by rerunning its constructor. Methods that were
# executing when the actor died will be retried and will not
# raise a `RayActorError`. Retried methods may execute twice, once
# on the failed actor and a second time on the restarted actor.
for _ in range(50):
counter = ray.get(actor.increment_and_possibly_fail.remote())
print(counter) # Prints the sequence 1-10 5 times.
# After the actor has been restarted 5 times, all subsequent methods will
# raise a `RayActorError`.
for _ in range(10):
try:
counter = ray.get(actor.increment_and_possibly_fail.remote())
print(counter) # Unreachable.
except ray.exceptions.RayActorError:
print('FAILURE') # Prints 10 times.
For at-least-once actors, the system will still guarantee execution ordering
according to the initial submission order. For example, any tasks submitted
after a failed actor task will not execute on the actor until the failed actor
task has been successfully retried. The system also will not attempt to
re-execute any tasks that executed successfully before the failure.
At-least-once execution is best suited for read-only actors or actors with
ephemeral state that does not need to be rebuilt after a failure. For actors
that have critical state, it is best to take periodic checkpoints and either
manually restart the actor or automatically restart the actor with at-most-once
semantics. If the actors exact state at the time of failure is needed, the
application is responsible for resubmitting all tasks since the last
checkpoint.
+2 -1
View File
@@ -903,6 +903,7 @@ cdef class CoreWorker:
FunctionDescriptor function_descriptor,
args,
int64_t max_restarts,
int64_t max_task_retries,
resources,
placement_resources,
int32_t max_concurrency,
@@ -929,7 +930,7 @@ cdef class CoreWorker:
check_status(CCoreWorkerProcess.GetCoreWorker().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_restarts, max_concurrency,
max_restarts, max_task_retries, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, name, is_asyncio),
extension_data,
+21 -9
View File
@@ -244,7 +244,8 @@ class ActorClassMetadata:
def __init__(self, language, modified_class,
actor_creation_function_descriptor, class_id, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources):
max_task_retries, num_cpus, num_gpus, memory,
object_store_memory, resources):
self.language = language
self.modified_class = modified_class
self.actor_creation_function_descriptor = \
@@ -253,6 +254,7 @@ class ActorClassMetadata:
self.is_cross_language = language != Language.PYTHON
self.class_id = class_id
self.max_restarts = max_restarts
self.max_task_retries = max_task_retries
self.num_cpus = num_cpus
self.num_gpus = num_gpus
self.memory = memory
@@ -314,7 +316,7 @@ class ActorClass:
@classmethod
def _ray_from_modified_class(cls, modified_class, class_id, max_restarts,
num_cpus, num_gpus, memory,
max_task_retries, num_cpus, num_gpus, memory,
object_store_memory, resources):
for attribute in [
"remote", "_remote", "_ray_from_modified_class",
@@ -344,20 +346,22 @@ class ActorClass:
self.__ray_metadata__ = ActorClassMetadata(
Language.PYTHON, modified_class,
actor_creation_function_descriptor, class_id, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources)
max_task_retries, num_cpus, num_gpus, memory, object_store_memory,
resources)
return self
@classmethod
def _ray_from_function_descriptor(
cls, language, actor_creation_function_descriptor, max_restarts,
num_cpus, num_gpus, memory, object_store_memory, resources):
max_task_retries, num_cpus, num_gpus, memory, object_store_memory,
resources):
self = ActorClass.__new__(ActorClass)
self.__ray_metadata__ = ActorClassMetadata(
language, None, actor_creation_function_descriptor, None,
max_restarts, num_cpus, num_gpus, memory, object_store_memory,
resources)
max_restarts, max_task_retries, num_cpus, num_gpus, memory,
object_store_memory, resources)
return self
@@ -406,6 +410,7 @@ class ActorClass:
is_direct_call=None,
max_concurrency=None,
max_restarts=None,
max_task_retries=None,
name=None,
detached=False):
"""Create an actor.
@@ -557,6 +562,7 @@ class ActorClass:
meta.actor_creation_function_descriptor,
creation_args,
max_restarts or meta.max_restarts,
max_task_retries or meta.max_task_retries,
resources,
actor_placement_resources,
max_concurrency,
@@ -891,11 +897,13 @@ def modify_class(cls):
def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
max_restarts):
max_restarts, max_task_retries):
Class = modify_class(cls)
if max_restarts is None:
max_restarts = 0
if max_task_retries is None:
max_task_retries = 0
infinite_restart = max_restarts == -1
if not infinite_restart:
@@ -907,9 +915,13 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
# an overflow.
max_restarts = min(max_restarts, ray_constants.MAX_INT64_VALUE)
if max_restarts == 0 and max_task_retries != 0:
raise ValueError(
"max_task_retries cannot be set if max_restarts is 0.")
return ActorClass._ray_from_modified_class(
Class, ActorClassID.from_random(), max_restarts, num_cpus, num_gpus,
memory, object_store_memory, resources)
Class, ActorClassID.from_random(), max_restarts, max_task_retries,
num_cpus, num_gpus, memory, object_store_memory, resources)
def exit_actor():
+1
View File
@@ -77,6 +77,7 @@ def java_actor_class(class_name):
Language.JAVA,
JavaFunctionDescriptor(class_name, "<init>", ""),
0, # max_restarts,
0, # max_task_retries,
None, # num_cpus,
None, # num_gpus,
None, # memory,
+1
View File
@@ -231,6 +231,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
CActorCreationOptions()
CActorCreationOptions(
int64_t max_restarts,
int64_t max_task_retries,
int32_t max_concurrency,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
+1 -1
View File
@@ -51,7 +51,7 @@ py_test(
size = "medium",
srcs = ["test_actor_failures.py"],
# TODO(ekl) enable this once we support actor reconstruction again
tags = ["exclusive", "manual"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
+225 -139
View File
@@ -3,10 +3,6 @@ import json
import numpy as np
import os
import pytest
try:
import pytest_timeout
except ImportError:
pytest_timeout = None
import signal
import sys
import time
@@ -88,9 +84,13 @@ def ray_checkpointable_actor_cls(request):
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_actor_eviction(ray_start_object_store_memory):
object_store_memory = ray_start_object_store_memory
"ray_start_regular", [{
"object_store_memory": 150 * 1024 * 1024,
"lru_evict": True,
}],
indirect=True)
def test_actor_eviction(ray_start_regular):
object_store_memory = 150 * 1024 * 1024
@ray.remote
class Actor:
@@ -127,10 +127,14 @@ def test_actor_eviction(ray_start_object_store_memory):
assert num_success > 0
def test_actor_restart(ray_start_regular):
"""Test actor reconstruction when actor process is killed."""
def test_actor_restart():
"""Test actor restart when actor process is killed."""
ray.init(
_internal_config=json.dumps({
"task_retry_delay_ms": 100,
}), )
@ray.remote(max_restarts=1)
@ray.remote(max_restarts=1, max_task_retries=-1)
class RestartableActor:
"""An actor that will be restarted at most once."""
@@ -147,20 +151,48 @@ def test_actor_restart(ray_start_regular):
actor = RestartableActor.remote()
pid = ray.get(actor.get_pid.remote())
# Call increase 3 times
for _ in range(3):
ray.get(actor.increase.remote())
# Call increase again with some delay.
result = actor.increase.remote(delay=0.5)
# Sleep some time to wait for the above task to start execution.
time.sleep(0.2)
results = [actor.increase.remote() for _ in range(100)]
# Kill actor process, while the above task is still being executed.
os.kill(pid, signal.SIGKILL)
# Check that the above task didn't fail and the actor is restarted.
assert ray.get(result) == 4
# Make sure that all tasks were executed in order before the actor's death.
res = results.pop(0)
i = 1
while True:
try:
r = ray.get(res)
if r != i:
# Actor restarted without any failed tasks.
break
res = results.pop(0)
i += 1
except ray.exceptions.RayActorError:
# Actor restarted.
break
# Find the first task to execute after the actor was restarted.
while True:
try:
r = ray.get(res)
break
except ray.exceptions.RayActorError:
res = results.pop(0)
pass
# Make sure that all tasks were executed in order after the actor's death.
i = 1
while True:
r = ray.get(res)
assert r == i
if results:
res = results.pop(0)
i += 1
else:
break
# Check that we can still call the actor.
assert ray.get(actor.increase.remote()) == 5
result = actor.increase.remote()
assert ray.get(result) == r + 1
# kill actor process one more time.
results = [actor.increase.remote() for _ in range(100)]
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# The actor has exceeded max restarts, and this task should fail.
@@ -174,37 +206,154 @@ def test_actor_restart(ray_start_regular):
# Check that the actor won't be restarted.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
ray.shutdown()
def test_actor_restart_with_retry():
"""Test actor restart when actor process is killed."""
ray.init(
_internal_config=json.dumps({
"task_retry_delay_ms": 100,
}), )
@ray.remote(max_restarts=1, max_task_retries=-1)
class RestartableActor:
"""An actor that will be restarted at most once."""
def __init__(self):
self.value = 0
def increase(self, delay=0):
time.sleep(delay)
self.value += 1
return self.value
def get_pid(self):
return os.getpid()
actor = RestartableActor.remote()
pid = ray.get(actor.get_pid.remote())
results = [actor.increase.remote() for _ in range(100)]
# Kill actor process, while the above task is still being executed.
os.kill(pid, signal.SIGKILL)
# Check that none of the tasks failed and the actor is restarted.
seq = list(range(1, 101))
results = ray.get(results)
failed_task_index = None
# Make sure that all tasks were executed in order before and after the
# actor's death.
for i, res in enumerate(results):
if res != seq[0]:
if failed_task_index is None:
failed_task_index = i
assert res + failed_task_index == seq[0]
seq.pop(0)
# Check that we can still call the actor.
result = actor.increase.remote()
assert ray.get(result) == results[-1] + 1
# kill actor process one more time.
results = [actor.increase.remote() for _ in range(100)]
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
# Create another actor.
actor = RestartableActor.remote()
# Intentionlly exit the actor
actor.__ray_terminate__.remote()
# Check that the actor won't be restarted.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
ray.shutdown()
def test_actor_restart_on_node_failure(ray_start_cluster):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
"initial_reconstruction_timeout_milliseconds": 1000,
"task_retry_delay_ms": 100,
})
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(num_cpus=0, _internal_config=config)
# Node to place the actor.
cluster.add_node(num_cpus=1, _internal_config=config)
cluster.wait_for_nodes()
ray.init(address=cluster.address)
@ray.remote(num_cpus=1, max_restarts=1, max_task_retries=-1)
class RestartableActor:
"""An actor that will be reconstructed at most once."""
def __init__(self):
self.value = 0
def increase(self):
self.value += 1
return self.value
def ready(self):
return
actor = RestartableActor.remote()
ray.get(actor.ready.remote())
results = [actor.increase.remote() for _ in range(100)]
# Kill actor node, while the above task is still being executed.
cluster.remove_node(cluster.list_all_nodes()[-1])
cluster.add_node(num_cpus=1, _internal_config=config)
cluster.wait_for_nodes()
# Check that none of the tasks failed and the actor is restarted.
seq = list(range(1, 101))
results = ray.get(results)
failed_task_index = None
# Make sure that all tasks were executed in order before and after the
# actor's death.
for i, res in enumerate(results):
elm = seq.pop(0)
if res != elm:
if failed_task_index is None:
failed_task_index = i
assert res + failed_task_index == elm
# Check that we can still call the actor.
result = ray.get(actor.increase.remote())
assert result == 1 or result == results[-1] + 1
def test_actor_restart_without_task(ray_start_regular):
"""Test a dead actor can be restarted without sending task to it."""
@ray.remote(max_restarts=1)
@ray.remote(max_restarts=1, resources={"actor": 1})
class RestartableActor:
def __init__(self, obj_ids):
for obj_id in obj_ids:
# Every time the actor gets constructed,
# put a new object in plasma store.
global_worker = ray.worker.global_worker
if not global_worker.core_worker.object_exists(obj_id):
global_worker.put_object(1, obj_id)
break
def __init__(self):
pass
def get_pid(self):
return os.getpid()
obj_ids = [ray.ObjectID.from_random() for _ in range(2)]
actor = RestartableActor.remote(obj_ids)
@ray.remote(resources={"actor": 1})
def probe():
return
# Returns whether the "actor" resource is available.
def actor_resource_available():
p = probe.remote()
ready, _ = ray.wait([p], timeout=1)
return len(ready) > 0
ray.experimental.set_resource("actor", 1)
actor = RestartableActor.remote()
assert wait_for_condition(lambda: not actor_resource_available())
# Kill the actor.
pid = ray.get(actor.get_pid.remote())
p = probe.remote()
os.kill(pid, signal.SIGKILL)
# Wait until the actor is reconstructed.
def check_restarted():
worker = ray.worker.global_worker
return worker.core_worker.object_exists(obj_ids[1])
assert wait_for_condition(check_restarted)
ray.get(p)
assert wait_for_condition(lambda: not actor_resource_available())
def test_caller_actor_restart(ray_start_regular):
@@ -277,66 +426,6 @@ def test_caller_task_reconstruction(ray_start_regular):
assert ray.get(RetryableTask.remote(remote_actor)) == 3
def test_actor_restart_on_node_failure(ray_start_cluster_head):
"""Test actor reconstruction when node dies unexpectedly."""
cluster = ray_start_cluster_head
max_restarts = 3
# Add a few nodes to the cluster.
# Use custom resource to make sure the actor is only created on worker
# nodes, not on the head node.
for _ in range(max_restarts + 2):
cluster.add_node(
resources={"a": 1},
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
}),
)
def kill_node(node_id):
node_to_remove = None
for node in cluster.worker_nodes:
if node_id == node.unique_id:
node_to_remove = node
cluster.remove_node(node_to_remove)
@ray.remote(max_restarts=max_restarts, resources={"a": 1})
class MyActor:
def __init__(self):
self.value = 0
def increase(self):
self.value += 1
return self.value
def get_object_store_socket(self):
return ray.worker.global_worker.node.unique_id
actor = MyActor.remote()
# Call increase 3 times.
for _ in range(3):
ray.get(actor.increase.remote())
for i in range(max_restarts):
object_store_socket = ray.get(actor.get_object_store_socket.remote())
# Kill actor's node and the actor should be restarted
# on a different node.
kill_node(object_store_socket)
# Call increase again.
# Check that the actor is restarted and value is correct.
assert ray.get(actor.increase.remote()) == 4 + i
# Check that the actor is now on a different node.
assert object_store_socket != ray.get(
actor.get_object_store_socket.remote())
# kill the node again.
object_store_socket = ray.get(actor.get_object_store_socket.remote())
kill_node(object_store_socket)
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
# NOTE(hchen): we set initial_reconstruction_timeout_milliseconds to 1s for
# this test. Because if this value is too small, suprious task reconstruction
# may happen and cause the test fauilure. If the value is too large, this test
@@ -365,7 +454,7 @@ def test_multiple_actor_restart(ray_start_cluster_head):
})) for _ in range(num_nodes)
]
@ray.remote(max_restarts=-1)
@ray.remote(max_restarts=-1, max_task_retries=-1)
class SlowCounter:
def __init__(self):
self.x = 0
@@ -407,8 +496,12 @@ def test_multiple_actor_restart(ray_start_cluster_head):
# Get the results and check that they have the correct values.
for _, result_id_list in result_ids.items():
results = list(range(1, len(result_id_list) + 1))
assert ray.get(result_id_list) == results
results = ray.get(result_id_list)
for i, result in enumerate(results):
if i == 0:
assert result == 1
else:
assert result == results[i - 1] + 1 or result == 1
def kill_actor(actor):
@@ -418,6 +511,7 @@ def kill_actor(actor):
wait_for_pid_to_exit(pid)
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
"""Test actor checkpointing and restoring from a checkpoint."""
actor = ray.remote(max_restarts=2)(ray_checkpointable_actor_cls).remote()
@@ -440,13 +534,14 @@ def test_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
for _ in range(3):
ray.get(actor.increase.remote())
expected += 1
# Kill actor again and check that reconstruction still works after the
# Kill actor again and check that restart still works after the
# actor resuming from a checkpoint.
kill_actor(actor)
assert ray.get(actor.get.remote()) == expected
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
"""Test checkpointing of a remote actor through method invocation."""
@@ -487,13 +582,14 @@ def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
for _ in range(3):
ray.get(actor.increase.remote())
expected += 1
# Kill actor again and check that reconstruction still works after the
# Kill actor again and check that restart still works after the
# actor resuming from a checkpoint.
kill_actor(actor)
assert ray.get(actor.get.remote()) == expected
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
ray_checkpointable_actor_cls):
"""Test actor checkpointing on a remote node."""
@@ -520,6 +616,7 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing_save_exception(ray_start_regular,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to complete."""
@@ -549,7 +646,7 @@ def test_checkpointing_save_exception(ray_start_regular,
for _ in range(3):
ray.get(actor.increase.remote())
expected += 1
# Kill actor again, and check that reconstruction still works and the actor
# Kill actor again, and check that restart still works and the actor
# wasn't resumed from a checkpoint.
kill_actor(actor)
assert ray.get(actor.get.remote()) == expected
@@ -559,6 +656,7 @@ def test_checkpointing_save_exception(ray_start_regular,
wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1)
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing_load_exception(ray_start_regular,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to load."""
@@ -589,7 +687,7 @@ def test_checkpointing_load_exception(ray_start_regular,
for _ in range(3):
ray.get(actor.increase.remote())
expected += 1
# Kill actor again, and check that reconstruction still works and the actor
# Kill actor again, and check that restart still works and the actor
# wasn't resumed from a checkpoint.
kill_actor(actor)
assert ray.get(actor.get.remote()) == expected
@@ -718,10 +816,6 @@ def test_decorated_method(ray_start_regular):
assert ray.get(object_id) == 7 # 2 * 3 + 1
@pytest.mark.skipif(
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(20)
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
@@ -747,53 +841,45 @@ def test_ray_wait_dead_actor(ray_start_cluster):
actors = [Actor.remote() for _ in range(num_nodes)]
ray.get([actor.ping.remote() for actor in actors])
# Ping the actors and make sure the tasks complete.
ping_ids = [actor.ping.remote() for actor in actors]
ray.get(ping_ids)
# Evict the result from the node that we're about to kill.
remote_node = cluster.list_all_nodes()[-1]
remote_ping_id = None
for i, actor in enumerate(actors):
if ray.get(actor.node_id.remote()) == remote_node.unique_id:
remote_ping_id = ping_ids[i]
ray.internal.free([remote_ping_id], local_only=True)
cluster.remove_node(remote_node)
def actor_dead():
# Ping the actors and make sure the tasks complete.
ping_ids = [actor.ping.remote() for actor in actors]
unready = ping_ids[:]
while unready:
_, unready = ray.wait(unready, timeout=0)
time.sleep(1)
# Repeatedly call ray.wait until the exception for the dead actor is
# received.
unready = ping_ids[:]
while unready:
_, unready = ray.wait(unready, timeout=0)
time.sleep(1)
try:
ray.get(ping_ids)
return False
except ray.exceptions.RayActorError:
return True
with pytest.raises(ray.exceptions.RayActorError):
ray.get(ping_ids)
# Kill a node.
cluster.remove_node(cluster.list_all_nodes()[-1])
# Repeatedly submit tasks and call ray.wait until the exception for the
# dead actor is received.
assert wait_for_condition(actor_dead)
# Evict the result from the dead node.
ray.internal.free([remote_ping_id], local_only=True)
# Create an actor on the local node that will call ray.wait in a loop.
head_node_resource = "HEAD_NODE"
ray.experimental.set_resource(head_node_resource, 1)
@ray.remote(num_cpus=0, resources={head_node_resource: 1})
class ParentActor:
def __init__(self, ping_ids):
self.unready = ping_ids
def __init__(self):
pass
def wait(self):
_, self.unready = ray.wait(self.unready, timeout=0)
return len(self.unready) == 0
return actor_dead()
def ping(self):
return
# Repeatedly call ray.wait through the local actor until the exception for
# the dead actor is received.
parent_actor = ParentActor.remote(ping_ids)
ray.get(parent_actor.ping.remote())
failure_detected = False
while not failure_detected:
failure_detected = ray.get(parent_actor.wait.remote())
parent_actor = ParentActor.remote()
assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote()))
if __name__ == "__main__":
+16 -1
View File
@@ -1730,6 +1730,7 @@ def make_decorator(num_return_vals=None,
max_calls=None,
max_retries=None,
max_restarts=None,
max_task_retries=None,
worker=None):
def decorator(function_or_class):
if (inspect.isfunction(function_or_class)
@@ -1738,6 +1739,9 @@ def make_decorator(num_return_vals=None,
if max_restarts is not None:
raise ValueError("The keyword 'max_restarts' is not "
"allowed for remote functions.")
if max_task_retries is not None:
raise ValueError("The keyword 'max_task_retries' is not "
"allowed for remote functions.")
return ray.remote_function.RemoteFunction(
Language.PYTHON, function_or_class, None, num_cpus, num_gpus,
@@ -1754,7 +1758,7 @@ def make_decorator(num_return_vals=None,
return ray.actor.make_actor(function_or_class, num_cpus, num_gpus,
memory, object_store_memory, resources,
max_restarts)
max_restarts, max_task_retries)
raise TypeError("The @ray.remote decorator must be applied to "
"either a function or to a class.")
@@ -1801,6 +1805,14 @@ def remote(*args, **kwargs):
unexpectedly. The minimum valid value is 0 (default), which indicates
that the actor doesn't need to be restarted. A value of -1
indicates that an actor should be restarted indefinitely.
* **max_task_retries**: Only for *actors*. How many times to retry an actor
task if the task fails due to a system error, e.g., the actor has died.
If set to -1, the system will retry the failed task until the task
succeeds, or the actor has reached its max_restarts limit. If set to n >
0, the system will retry the failed task up to n times, after which the
task will throw a `RayActorError` exception upon `ray.get`. Note that
Python exceptions are not considered system errors and will not trigger
retries.
* **max_retries**: Only for *remote functions*. This specifies the maximum
number of times that the remote function should be rerun when the worker
process executing it crashes unexpectedly. The minimum valid value is 0,
@@ -1867,6 +1879,7 @@ def remote(*args, **kwargs):
"resources",
"max_calls",
"max_restarts",
"max_task_retries",
"max_retries",
], error_string
@@ -1885,6 +1898,7 @@ def remote(*args, **kwargs):
num_return_vals = kwargs.get("num_return_vals")
max_calls = kwargs.get("max_calls")
max_restarts = kwargs.get("max_restarts")
max_task_retries = kwargs.get("max_task_retries")
memory = kwargs.get("memory")
object_store_memory = kwargs.get("object_store_memory")
max_retries = kwargs.get("max_retries")
@@ -1898,5 +1912,6 @@ def remote(*args, **kwargs):
resources=resources,
max_calls=max_calls,
max_restarts=max_restarts,
max_task_retries=max_task_retries,
max_retries=max_retries,
worker=worker)
+4
View File
@@ -209,6 +209,10 @@ const rpc::Address &TaskSpecification::CallerAddress() const {
return message_->caller_address();
}
WorkerID TaskSpecification::CallerWorkerId() const {
return WorkerID::FromBinary(message_->caller_address().worker_id());
}
// === Below are getter methods specific to actor tasks.
ActorID TaskSpecification::ActorId() const {
+2
View File
@@ -151,6 +151,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
const rpc::Address &CallerAddress() const;
WorkerID CallerWorkerId() const;
uint64_t ActorCounter() const;
ObjectID ActorCreationDummyObjectId() const;
+4 -9
View File
@@ -23,7 +23,7 @@ ray::rpc::ActorHandle CreateInnerActorHandle(
const ray::rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data) {
const std::string &extension_data, int64_t max_task_retries) {
ray::rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_owner_id(owner_id.Binary());
@@ -34,6 +34,7 @@ ray::rpc::ActorHandle CreateInnerActorHandle(
actor_creation_task_function_descriptor->GetMessage();
inner.set_actor_cursor(initial_cursor.Binary());
inner.set_extension_data(extension_data);
inner.set_max_task_retries(max_task_retries);
return inner;
}
@@ -69,10 +70,10 @@ ActorHandle::ActorHandle(
const rpc::Address &owner_address, const class JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data)
const std::string &extension_data, int64_t max_task_retries)
: ActorHandle(CreateInnerActorHandle(
actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language,
actor_creation_task_function_descriptor, extension_data)) {}
actor_creation_task_function_descriptor, extension_data, max_task_retries)) {}
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
@@ -95,10 +96,4 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_
void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); }
void ActorHandle::Reset() {
absl::MutexLock guard(&mutex_);
task_counter_ = 0;
actor_cursor_ = ObjectID::FromBinary(inner_.actor_cursor());
}
} // namespace ray
+2 -24
View File
@@ -37,7 +37,7 @@ class ActorHandle {
const rpc::Address &owner_address, const JobID &job_id,
const ObjectID &initial_cursor, const Language actor_language,
const ray::FunctionDescriptor &actor_creation_task_function_descriptor,
const std::string &extension_data);
const std::string &extension_data, int64_t max_task_retries);
/// Constructs an ActorHandle from a serialized string.
ActorHandle(const std::string &serialized);
@@ -68,34 +68,12 @@ class ActorHandle {
void Serialize(std::string *output);
/// Reset the handle state next task submitted.
///
/// This should be called whenever the actor is restarted, since the new
/// instance of the actor does not have the previous sequence number.
/// TODO: We should also move the other actor state (status and IP) inside
/// ActorHandle and reset them in this method.
void Reset();
// Mark the actor handle as dead.
void MarkDead() {
absl::MutexLock lock(&mutex_);
state_ = rpc::ActorTableData::DEAD;
}
// Returns whether the actor is known to be dead.
bool IsDead() const {
absl::MutexLock lock(&mutex_);
return state_ == rpc::ActorTableData::DEAD;
}
int64_t MaxTaskRetries() const { return inner_.max_task_retries(); }
private:
// Protobuf-defined persistent state of the actor handle.
const ray::rpc::ActorHandle inner_;
/// The actor's state (alive or dead). This defaults to ALIVE. Once marked
/// DEAD, the actor handle can never go back to being ALIVE.
rpc::ActorTableData::ActorState state_ GUARDED_BY(mutex_) = rpc::ActorTableData::ALIVE;
/// The unique id of the dummy object returned by the previous task.
/// TODO: This can be removed once we schedule actor tasks by task counter
/// only.
+10 -4
View File
@@ -111,12 +111,14 @@ struct TaskOptions {
/// Options for actor creation tasks.
struct ActorCreationOptions {
ActorCreationOptions() {}
ActorCreationOptions(int64_t max_restarts, int max_concurrency,
ActorCreationOptions(int64_t max_restarts, int64_t max_task_retries,
int max_concurrency,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options,
bool is_detached, std::string &name, bool is_asyncio)
: max_restarts(max_restarts),
max_task_retries(max_task_retries),
max_concurrency(max_concurrency),
resources(resources),
placement_resources(placement_resources),
@@ -125,10 +127,14 @@ struct ActorCreationOptions {
name(name),
is_asyncio(is_asyncio){};
/// Maximum number of times that the actor should be reconstructed when it dies
/// unexpectedly. A value of -1 indicates infinite restarts.
/// If it's 0, the actor won't be restarted.
/// Maximum number of times that the actor should be restarted if it dies
/// unexpectedly. A value of -1 indicates infinite restarts. If it's 0, the
/// actor won't be restarted.
const int64_t max_restarts = 0;
/// Maximum number of times that individual tasks can be retried at the
/// actor, if the actor dies unexpectedly. If -1, then the task may be
/// retried infinitely many times.
const int64_t max_task_retries = 0;
/// The max number of concurrent tasks to run on this direct call actor.
const int max_concurrency = 1;
/// Resources required by the whole lifetime of this actor.
+14 -44
View File
@@ -428,7 +428,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
}
direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(rpc_address_, client_factory, memory_store_,
new CoreWorkerDirectActorTaskSubmitter(client_factory, memory_store_,
task_manager_));
direct_task_submitter_ =
@@ -604,20 +604,9 @@ const WorkerID &CoreWorker::GetWorkerID() const { return worker_context_.GetWork
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
bool not_actor_task = false;
{
absl::MutexLock lock(&mutex_);
main_thread_task_id_ = task_id;
not_actor_task = actor_id_.IsNil();
}
if (not_actor_task && task_id.IsNil()) {
absl::MutexLock lock(&actor_handles_mutex_);
// Reset the seqnos so that for the next task it start off at 0.
for (const auto &handle : actor_handles_) {
handle.second->Reset();
}
// TODO(ekl) we can't unsubscribe to actor notifications here due to
// https://github.com/ray-project/ray/pull/6885
}
}
@@ -676,7 +665,12 @@ void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) {
absl::MutexLock lock(&mutex_);
while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(to_resubmit_.front().second));
auto &spec = to_resubmit_.front().second;
if (spec.IsActorTask()) {
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec));
} else {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec));
}
to_resubmit_.pop_front();
}
internal_timer_.expires_at(internal_timer_.expiry() +
@@ -1156,7 +1150,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
// WaitForActorOutOfScopeRequest.
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0],
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data));
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data,
actor_creation_options.max_task_retries));
RAY_CHECK(AddActorHandle(std::move(actor_handle),
/*is_owner_handle=*/!actor_creation_options.is_detached))
<< "Actor " << actor_id << " already exists";
@@ -1215,14 +1210,8 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
ExecuteTaskLocalMode(task_spec, actor_id);
} else {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite());
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
} else {
status = direct_actor_submitter_->SubmitTask(task_spec);
}
CurrentCallSite(), actor_handle->MaxTaskRetries());
status = direct_actor_submitter_->SubmitTask(task_spec);
}
return status;
}
@@ -1299,6 +1288,7 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
}
reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite());
direct_actor_submitter_->AddActorQueueIfNotExists(actor_id);
bool inserted;
{
@@ -1313,19 +1303,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RESTARTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
it->second->Reset();
direct_actor_submitter_->DisconnectActor(actor_id, false);
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, true);
ActorHandle *actor_handle = nullptr;
RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle));
actor_handle->MarkDead();
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.
@@ -1369,8 +1349,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
// language frontend receives another ref to the same actor. In this
// case, we must remember the last task counter that we sent to the
// actor.
// TODO(swang): Unsubscribe from the actor table once all local refs
// to the actor have gone out of scope.
// TODO(ekl) we can't unsubscribe to actor notifications here due to
// https://github.com/ray-project/ray/pull/6885
auto callback = actor_out_of_scope_callbacks_.extract(actor_id);
if (callback) {
callback.mapped()(actor_id);
@@ -1555,16 +1535,11 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
return_ids.pop_back();
task_type = TaskType::ACTOR_CREATION_TASK;
SetActorId(task_spec.ActorCreationId());
// For an actor, set the timestamp as the time its creation task starts execution.
SetCallerCreationTimestamp();
RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
task_type = TaskType::ACTOR_TASK;
} else {
// For a non-actor task, set the timestamp as the time it starts execution.
SetCallerCreationTimestamp();
}
// Because we support concurrent actor calls, we need to update the
@@ -2080,9 +2055,4 @@ void CoreWorker::SetActorTitle(const std::string &title) {
actor_title_ = title;
}
void CoreWorker::SetCallerCreationTimestamp() {
absl::MutexLock lock(&mutex_);
direct_actor_submitter_->SetCallerCreationTimestamp(current_sys_time_ms());
}
} // namespace ray
@@ -110,14 +110,16 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
}
std::string name = "";
ray::ActorCreationOptions actor_creation_options{max_restarts,
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
/*is_detached=*/false,
name,
/*is_asyncio=*/false};
ray::ActorCreationOptions actor_creation_options{
max_restarts,
0, // TODO: Allow setting max_task_retries from Java.
static_cast<int>(max_concurrency),
resources,
resources,
dynamic_worker_options,
/*is_detached=*/false,
name,
/*is_asyncio=*/false};
return actor_creation_options;
}
+15 -4
View File
@@ -28,7 +28,8 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
const rpc::Address &caller_address,
const TaskSpecification &spec,
const std::string &call_site, int max_retries) {
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId();
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries
<< " retries";
// Add references for the dependencies to the task.
std::vector<ObjectID> task_deps;
@@ -90,12 +91,18 @@ Status TaskManager::ResubmitTask(const TaskID &task_id,
if (it == submissible_tasks_.end()) {
return Status::Invalid("Task spec missing");
}
if (it->second.spec.IsActorTask()) {
return Status::Invalid("Cannot reconstruct objects returned by actors");
}
if (!it->second.pending) {
resubmit = true;
it->second.pending = true;
RAY_CHECK(it->second.num_retries_left != 0);
it->second.num_retries_left--;
if (it->second.num_retries_left > 0) {
it->second.num_retries_left--;
} else {
RAY_CHECK(it->second.num_retries_left == -1);
}
spec = it->second.spec;
}
}
@@ -256,7 +263,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
ShutdownIfNeeded();
}
void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
@@ -286,6 +293,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
}
}
bool will_retry = false;
// We should not hold the lock during these calls because they may trigger
// callbacks in this or other classes.
if (num_retries_left != 0) {
@@ -294,6 +302,7 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId()
<< ", attempting to resubmit.";
retry_task_callback_(spec, /*delay=*/true);
will_retry = true;
} else {
// Throttled logging of task failure errors.
{
@@ -323,6 +332,8 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
}
ShutdownIfNeeded();
return will_retry;
}
void TaskManager::ShutdownIfNeeded() {
+3 -2
View File
@@ -32,7 +32,7 @@ class TaskFinisherInterface {
virtual void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply,
const rpc::Address &actor_addr) = 0;
virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
virtual bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) = 0;
virtual void OnTaskDependenciesInlined(
@@ -116,7 +116,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// \param[in] task_id ID of the pending task.
/// \param[in] error_type The type of the specific error.
/// \param[in] status Optional status message.
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
/// \return Whether the task will be retried or not.
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) override;
/// A task's dependencies were inlined in the task spec. This will decrement
+10 -7
View File
@@ -73,8 +73,9 @@ ActorID CreateActorHelper(std::unordered_map<std::string, double> &resources,
std::string name = "";
ActorCreationOptions actor_options{
max_restarts,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
/*max_task_retries=*/0,
/*max_concurrency*/ 1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
// Create an actor.
ActorID actor_id;
@@ -644,6 +645,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
std::unordered_map<std::string, double> resources;
std::string name = "";
ActorCreationOptions actor_options{0,
0,
1,
resources,
resources,
@@ -654,7 +656,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
function.GetLanguage(), function.GetFunctionDescriptor(), "");
function.GetLanguage(), function.GetFunctionDescriptor(), "",
0);
// Manually create `num_tasks` task specs, and for each of them create a
// `PushTaskRequest`, this is to batch performance of TaskSpec
@@ -763,10 +766,10 @@ TEST_F(ZeroNodeTest, TestWorkerContext) {
TEST_F(ZeroNodeTest, TestActorHandle) {
// Test actor handle serialization and deserialization round trip.
JobID job_id = NextJobId();
ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0),
TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(),
Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), "");
ActorHandle original(
ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(),
rpc::Address(), job_id, ObjectID::FromRandom(), Language::PYTHON,
ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), "", 0);
std::string output;
original.Serialize(&output);
ActorHandle deserialized(output);
@@ -24,16 +24,45 @@
namespace ray {
using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Return;
TaskSpecification CreateActorTaskHelper(ActorID actor_id, WorkerID caller_worker_id,
int64_t counter,
TaskID caller_id = TaskID::Nil()) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
task.GetMutableMessage().set_caller_id(caller_id.Binary());
task.GetMutableMessage().set_type(TaskType::ACTOR_TASK);
task.GetMutableMessage().mutable_caller_address()->set_worker_id(
caller_worker_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter);
task.GetMutableMessage().set_num_returns(1);
return task;
}
rpc::PushTaskRequest CreatePushTaskRequestHelper(ActorID actor_id, int64_t counter,
WorkerID caller_worker_id,
TaskID caller_id,
int64_t caller_timestamp) {
auto task_spec = CreateActorTaskHelper(actor_id, caller_worker_id, counter, caller_id);
rpc::PushTaskRequest request;
request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request.set_sequence_number(request.task_spec().actor_task_spec().actor_counter());
request.set_client_processed_up_to(-1);
return request;
}
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
const rpc::Address &Addr() const override { return addr; }
ray::Status PushActorTask(
std::unique_ptr<rpc::PushTaskRequest> request,
std::unique_ptr<rpc::PushTaskRequest> request, bool skip_queue,
const rpc::ClientCallback<rpc::PushTaskReply> &callback) override {
RAY_CHECK(counter == request->task_spec().actor_task_spec().actor_counter());
counter++;
received_seq_nos.push_back(request->sequence_number());
callbacks.push_back(callback);
return Status::OK();
}
@@ -50,7 +79,7 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
rpc::Address addr;
std::list<rpc::ClientCallback<rpc::PushTaskReply>> callbacks;
uint64_t counter = 0;
std::vector<uint64_t> received_seq_nos;
};
class MockTaskFinisher : public TaskFinisherInterface {
@@ -60,7 +89,7 @@ class MockTaskFinisher : public TaskFinisherInterface {
MOCK_METHOD3(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &,
const rpc::Address &addr));
MOCK_METHOD3(PendingTaskFailed,
void(const TaskID &task_id, rpc::ErrorType error_type, Status *status));
bool(const TaskID &task_id, rpc::ErrorType error_type, Status *status));
MOCK_METHOD2(OnTaskDependenciesInlined,
void(const std::vector<ObjectID> &, const std::vector<ObjectID> &));
@@ -68,25 +97,15 @@ class MockTaskFinisher : public TaskFinisherInterface {
MOCK_METHOD1(MarkTaskCanceled, bool(const TaskID &task_id));
};
TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
task.GetMutableMessage().set_type(TaskType::ACTOR_TASK);
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter);
return task;
}
class DirectActorSubmitterTest : public ::testing::Test {
public:
DirectActorSubmitterTest()
: worker_client_(std::shared_ptr<MockWorkerClient>(new MockWorkerClient())),
store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())),
task_finisher_(std::make_shared<MockTaskFinisher>()),
submitter_(address_, [&](const rpc::Address &addr) { return worker_client_; },
store_, task_finisher_) {}
submitter_([&](const rpc::Address &addr) { return worker_client_; }, store_,
task_finisher_) {}
rpc::Address address_;
std::shared_ptr<MockWorkerClient> worker_client_;
std::shared_ptr<CoreWorkerMemoryStore> store_;
std::shared_ptr<MockTaskFinisher> task_finisher_;
@@ -95,16 +114,19 @@ class DirectActorSubmitterTest : public ::testing::Test {
TEST_F(DirectActorSubmitterTest, TestSubmitTask) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
auto task = CreateActorTaskHelper(actor_id, 0);
auto task = CreateActorTaskHelper(actor_id, worker_id, 0);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 0);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 1);
task = CreateActorTaskHelper(actor_id, 1);
task = CreateActorTaskHelper(actor_id, worker_id, 1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 2);
@@ -114,20 +136,24 @@ TEST_F(DirectActorSubmitterTest, TestSubmitTask) {
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask());
}
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1));
}
TEST_F(DirectActorSubmitterTest, TestDependencies) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor with different arguments.
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto task1 = CreateActorTaskHelper(actor_id, 0);
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
task1.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
auto task2 = CreateActorTaskHelper(actor_id, 1);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
task2.GetMutableMessage().add_args()->add_object_ids(obj2.Binary());
// Neither task can be submitted yet because they are still waiting on
@@ -142,20 +168,24 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) {
ASSERT_EQ(worker_client_->callbacks.size(), 1);
ASSERT_TRUE(store_->Put(*data, obj2));
ASSERT_EQ(worker_client_->callbacks.size(), 2);
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1));
}
TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor with different arguments.
ObjectID obj1 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
ObjectID obj2 = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto task1 = CreateActorTaskHelper(actor_id, 0);
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
task1.GetMutableMessage().add_args()->add_object_ids(obj1.Binary());
auto task2 = CreateActorTaskHelper(actor_id, 1);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
task2.GetMutableMessage().add_args()->add_object_ids(obj2.Binary());
// Neither task can be submitted yet because they are still waiting on
@@ -171,28 +201,131 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
ASSERT_EQ(worker_client_->callbacks.size(), 0);
ASSERT_TRUE(store_->Put(*data, obj1));
ASSERT_EQ(worker_client_->callbacks.size(), 2);
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1));
}
TEST_F(DirectActorSubmitterTest, TestActorFailure) {
TEST_F(DirectActorSubmitterTest, TestActorDead) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor.
auto task1 = CreateActorTaskHelper(actor_id, 0);
auto task2 = CreateActorTaskHelper(actor_id, 1);
// Create two tasks for the actor. One depends on an object that is not yet available.
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
ObjectID obj = ObjectID::FromRandom().WithTransportType(TaskTransportType::DIRECT);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
task2.GetMutableMessage().add_args()->add_object_ids(obj.Binary());
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 2);
ASSERT_EQ(worker_client_->callbacks.size(), 1);
// Simulate the actor dying. All submitted tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(2);
// Simulate the actor dying. All in-flight tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task1.TaskId(), _, _)).Times(1);
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0);
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
}
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0);
submitter_.DisconnectActor(actor_id, /*dead=*/false);
// Actor marked as dead. All queued tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(1);
submitter_.DisconnectActor(actor_id, /*dead=*/true);
}
TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create four tasks for the actor.
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(2);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(2);
// First task finishes. Second task fails.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, /*dead=*/false);
// Third task fails after the actor is disconnected. It should not get
// retried.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Actor gets restarted.
submitter_.ConnectActor(actor_id, addr);
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_TRUE(worker_client_->callbacks.empty());
// Actor counter restarts at 0 after the actor is restarted.
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 0));
}
TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create four tasks for the actor.
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
auto task4 = CreateActorTaskHelper(actor_id, worker_id, 3);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
// All tasks will eventually finish.
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(4);
// Tasks 2 and 3 will be retried.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _))
.Times(2)
.WillRepeatedly(Return(true));
// First task finishes. Second task fails.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, /*dead=*/false);
// Third task fails after the actor is disconnected.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
// Actor gets restarted.
submitter_.ConnectActor(actor_id, addr);
// A new task is submitted.
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
// Tasks 2 and 3 get retried.
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
}
// Actor counter restarts at 0 after the actor is restarted. New task cannot
// execute until after tasks 2 and 3 are re-executed.
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1));
}
class MockDependencyWaiterInterface : public DependencyWaiterInterface {
@@ -203,35 +336,6 @@ class MockDependencyWaiterInterface : public DependencyWaiterInterface {
}
};
TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter,
TaskID caller_id) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::Nil().Binary());
task.GetMutableMessage().set_caller_id(caller_id.Binary());
task.GetMutableMessage().set_type(TaskType::ACTOR_TASK);
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_id(actor_id.Binary());
task.GetMutableMessage().mutable_actor_task_spec()->set_actor_counter(counter);
task.GetMutableMessage().set_num_returns(1);
return task;
}
rpc::PushTaskRequest CreatePushTaskRequestHelper(ActorID actor_id, int64_t counter,
WorkerID caller_worker_id,
TaskID caller_id,
int64_t caller_timestamp) {
auto task_spec = CreateActorTaskHelper(actor_id, counter, caller_id);
rpc::Address rpc_address;
rpc_address.set_worker_id(caller_worker_id.Binary());
rpc::PushTaskRequest request;
request.mutable_caller_address()->CopyFrom(rpc_address);
request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request.set_caller_version(caller_timestamp);
request.set_sequence_number(request.task_spec().actor_task_spec().actor_counter());
request.set_client_processed_up_to(-1);
return request;
}
class MockWorkerContext : public WorkerContext {
public:
MockWorkerContext(WorkerType worker_type, const JobID &job_id)
@@ -278,7 +382,6 @@ class DirectActorReceiverTest : public ::testing::Test {
};
TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
rpc::Address addr;
TaskID current_task_id = TaskID::Nil();
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
WorkerID worker_id = WorkerID::FromRandom();
@@ -319,7 +422,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
receiver_->HandlePushTask(request, &reply, reply_callback);
}
// Create another request with the same caller id, but a differnt worker id,
// Create another request with the same caller id, but a different worker id,
// and a newer timestamp. This simulates caller reconstruction.
// Note that here the task request still has counter 0, which should be
// ignored normally, but here it's from a different worker and with a newer
@@ -72,9 +72,10 @@ class MockTaskFinisher : public TaskFinisherInterface {
num_tasks_complete++;
}
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
bool PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status) override {
num_tasks_failed++;
return true;
}
void OnTaskDependenciesInlined(const std::vector<ObjectID> &inlined_dependency_ids,
@@ -22,6 +22,14 @@ using ray::rpc::ActorTableData;
namespace ray {
void CoreWorkerDirectActorTaskSubmitter::AddActorQueueIfNotExists(
const ActorID &actor_id) {
absl::MutexLock lock(&mu_);
// No need to check whether the insert was successful, since it is possible
// for this worker to have multiple references to the same actor.
client_queues_.emplace(actor_id, ClientQueue());
}
void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
bool force_kill, bool no_restart) {
absl::MutexLock lock(&mu_);
@@ -29,72 +37,72 @@ void CoreWorkerDirectActorTaskSubmitter::KillActor(const ActorID &actor_id,
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(force_kill);
request.set_no_restart(no_restart);
auto inserted = pending_force_kills_.emplace(actor_id, request);
if (!inserted.second && force_kill) {
auto it = client_queues_.find(actor_id);
// The language frontend can only kill actors that it has a reference to.
RAY_CHECK(it != client_queues_.end());
if (!it->second.pending_force_kill) {
it->second.pending_force_kill = request;
} else if (force_kill) {
// Overwrite the previous request to kill the actor if the new request is a
// force kill.
inserted.first->second.set_force_kill(true);
it->second.pending_force_kill->set_force_kill(true);
if (no_restart) {
// Overwrite the previous request to disable reconstruction if the new request's
// Overwrite the previous request to disable restart if the new request's
// no_restart flag is set to true.
inserted.first->second.set_no_restart(true);
it->second.pending_force_kill->set_no_restart(true);
}
}
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// Actor is not yet created, or is being restarted, cache the request
// and submit after actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g. from unpickling), in that case it might be desirable
// to have a timeout to mark it as invalid if it doesn't show up in the
// specified time.
RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created.";
} else {
SendPendingTasks(actor_id);
}
SendPendingTasks(actor_id);
}
Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(DEBUG) << "Submitting task " << task_spec.TaskId();
RAY_CHECK(task_spec.IsActorTask());
// We must fix the send order prior to resolving dependencies, which may complete
// out of order. This ensures we preserve the client-side send order.
int64_t send_pos = -1;
bool task_queued = false;
uint64_t send_pos = 0;
{
absl::MutexLock lock(&mu_);
send_pos = next_send_position_to_assign_[task_spec.ActorId()]++;
auto queue = client_queues_.find(task_spec.ActorId());
RAY_CHECK(queue != client_queues_.end());
if (queue->second.state == rpc::ActorTableData::DEAD) {
task_finisher_->MarkTaskCanceled(task_spec.TaskId());
auto status = Status::IOError("cancelling all pending tasks of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status));
} else {
// We must fix the send order prior to resolving dependencies, which may
// complete out of order. This ensures that we will not deadlock due to
// backpressure. The receiving actor will execute the tasks according to
// this sequence number.
send_pos = task_spec.ActorCounter();
auto inserted =
queue->second.requests.emplace(send_pos, std::make_pair(task_spec, false));
RAY_CHECK(inserted.second);
task_queued = true;
}
}
resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable {
const auto &actor_id = task_spec.ActorId();
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
request->mutable_caller_address()->CopyFrom(rpc_address_);
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->set_caller_version(caller_creation_timestamp_ms_);
absl::MutexLock lock(&mu_);
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// Actor is not yet created, or is being restarted, cache the request
// and submit after actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g. from unpickling), in that case it might be desirable
// to have a timeout to mark it as invalid if it doesn't show up in the
// specified time.
RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created.";
} else {
SendPendingTasks(actor_id);
}
});
if (task_queued) {
const auto actor_id = task_spec.ActorId();
resolver_.ResolveDependencies(task_spec, [this, send_pos, actor_id]() {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
auto it = queue->second.requests.find(send_pos);
// Only dispatch tasks if the submitted task is still queued. The task
// may have been dequeued if the actor has since failed.
if (it != queue->second.requests.end()) {
it->second.second = true;
SendPendingTasks(actor_id);
}
});
}
// If the task submission subsequently fails, then the client will receive
// the error in a callback.
@@ -104,105 +112,156 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
const rpc::Address &address) {
absl::MutexLock lock(&mu_);
// Update the mapping so new RPCs go out with the right intended worker id.
worker_ids_[actor_id] = address.worker_id();
// Create a new connection to the actor.
// TODO(edoakes): are these clients cleaned up properly?
if (rpc_clients_.count(actor_id) == 0) {
rpc_clients_[actor_id] =
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(address));
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (queue->second.rpc_client) {
// Skip reconnection if we already have a client to this actor.
// NOTE(swang): This seems to only trigger in multithreaded Java tests.
RAY_CHECK(queue->second.worker_id == address.worker_id());
return;
}
queue->second.state = rpc::ActorTableData::ALIVE;
// Update the mapping so new RPCs go out with the right intended worker id.
queue->second.worker_id = address.worker_id();
// Create a new connection to the actor.
queue->second.rpc_client =
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(address));
// TODO(swang): This assumes that all replies from the previous incarnation
// of the actor have been received. Fix this by setting an epoch for each
// actor task, so we can ignore completed tasks from old epochs.
RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from "
<< queue->second.caller_starts_at << " to "
<< queue->second.num_completed_tasks;
queue->second.caller_starts_at = queue->second.num_completed_tasks;
SendPendingTasks(actor_id);
}
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
bool dead) {
absl::MutexLock lock(&mu_);
if (!dead) {
// We're restarting the actor, so erase the client for now. The new client
// will be inserted once actor reconstruction completes. We don't erase the
// client when the actor is DEAD, so that all further tasks will be failed.
rpc_clients_.erase(actor_id);
worker_ids_.erase(actor_id);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (dead) {
queue->second.state = rpc::ActorTableData::DEAD;
} else {
queue->second.state = rpc::ActorTableData::RESTARTING;
}
// The actor failed, so erase the client for now. Either the actor is
// permanently dead or the new client will be inserted once the actor is
// restarted.
queue->second.rpc_client = nullptr;
queue->second.worker_id.clear();
queue->second.pending_force_kill.reset();
// If there are pending requests, treat the pending tasks as failed.
if (dead) {
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id;
// If there are pending requests, treat the pending tasks as failed.
auto pending_it = pending_requests_.find(actor_id);
if (pending_it != pending_requests_.end()) {
auto head = pending_it->second.begin();
while (head != pending_it->second.end()) {
auto request = std::move(head->second);
head = pending_it->second.erase(head);
auto task_id = TaskID::FromBinary(request->task_spec().task_id());
auto status = Status::IOError("cancelling all pending tasks of dead actor");
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status);
}
pending_requests_.erase(pending_it);
auto &requests = queue->second.requests;
auto head = requests.begin();
while (head != requests.end()) {
const auto &task_spec = head->second.first;
task_finisher_->MarkTaskCanceled(task_spec.TaskId());
auto status = Status::IOError("cancelling all pending tasks of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
RAY_UNUSED(!task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::ACTOR_DIED, &status));
head = requests.erase(head);
}
// No need to clean up tasks that have been sent and are waiting for
// replies. They will be treated as failed once the connection dies.
// We retain the sequencing information so that we can properly fail
// any tasks submitted after the actor death.
pending_force_kills_.erase(actor_id);
}
}
void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_id) {
auto &client = rpc_clients_[actor_id];
RAY_CHECK(client);
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
auto it = pending_force_kills_.find(actor_id);
if (it != pending_force_kills_.end()) {
if (it->second.pending_force_kill) {
RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id;
// It's okay if this fails because this means the worker is already dead.
RAY_UNUSED(client->KillActor(it->second, nullptr));
pending_force_kills_.erase(it);
RAY_UNUSED(it->second.rpc_client->KillActor(*it->second.pending_force_kill, nullptr));
it->second.pending_force_kill.reset();
}
// Submit all pending requests.
auto &requests = pending_requests_[actor_id];
auto &requests = it->second.requests;
auto head = requests.begin();
while (head != requests.end() && head->first == next_send_position_[actor_id]) {
auto request = std::move(head->second);
while (head != requests.end() && head->first <= it->second.next_send_position &&
head->second.second) {
// If the task has been sent before, skip the other tasks in the send
// queue.
bool skip_queue = head->first < it->second.next_send_position;
auto task_spec = std::move(head->second.first);
head = requests.erase(head);
auto num_returns = request->task_spec().num_returns();
auto task_id = TaskID::FromBinary(request->task_spec().task_id());
PushActorTask(*client, std::move(request), actor_id, task_id, num_returns);
RAY_CHECK(!it->second.worker_id.empty());
PushActorTask(it->second, task_spec, skip_queue);
it->second.next_send_position++;
}
}
void CoreWorkerDirectActorTaskSubmitter::PushActorTask(
rpc::CoreWorkerClientInterface &client, std::unique_ptr<rpc::PushTaskRequest> request,
const ActorID &actor_id, const TaskID &task_id, int num_returns) {
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id;
next_send_position_[actor_id]++;
auto it = worker_ids_.find(actor_id);
RAY_CHECK(it != worker_ids_.end()) << "Actor worker id not found " << actor_id.Hex();
request->set_intended_worker_id(it->second);
rpc::Address addr(client.Addr());
RAY_CHECK_OK(client.PushActorTask(
std::move(request),
[this, addr, task_id](Status status, const rpc::PushTaskReply &reply) {
void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue,
const TaskSpecification &task_spec,
bool skip_queue) {
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest());
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->set_intended_worker_id(queue.worker_id);
RAY_CHECK(task_spec.ActorCounter() >= queue.caller_starts_at)
<< "actor counter " << task_spec.ActorCounter() << " " << queue.caller_starts_at;
request->set_sequence_number(task_spec.ActorCounter() - queue.caller_starts_at);
const auto task_id = task_spec.TaskId();
const auto actor_id = task_spec.ActorId();
const auto counter = task_spec.ActorCounter();
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id
<< " actor counter " << counter << " seq no "
<< request->sequence_number();
rpc::Address addr(queue.rpc_client->Addr());
RAY_UNUSED(queue.rpc_client->PushActorTask(
std::move(request), skip_queue,
[this, addr, task_id, actor_id](Status status, const rpc::PushTaskReply &reply) {
bool increment_completed_tasks = true;
if (!status.ok()) {
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status);
bool will_retry = task_finisher_->PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_DIED, &status);
if (will_retry) {
increment_completed_tasks = false;
}
} else {
task_finisher_->CompletePendingTask(task_id, reply, addr);
}
if (increment_completed_tasks) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
queue->second.num_completed_tasks++;
}
}));
}
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto iter = rpc_clients_.find(actor_id);
return (iter != rpc_clients_.end());
}
void CoreWorkerDirectActorTaskSubmitter::SetCallerCreationTimestamp(int64_t timestamp) {
caller_creation_timestamp_ms_ = timestamp;
auto iter = client_queues_.find(actor_id);
return (iter != client_queues_.end() && iter->second.rpc_client);
}
void CoreWorkerDirectTaskReceiver::Init(
@@ -284,7 +343,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
<< ", actor_id: " << task_spec.ActorCreationId();
// Tell raylet that an actor creation task has finished execution, so that
// raylet can publish actor creation event to GCS, and mark this worker as
// actor, thus if this worker dies later raylet will reconstruct the actor.
// actor, thus if this worker dies later raylet will restart the actor.
RAY_CHECK_OK(task_done_());
}
}
@@ -315,50 +374,15 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr);
};
auto caller_worker_id = WorkerID::FromBinary(request.caller_address().worker_id());
auto caller_version = request.caller_version();
auto it = scheduling_queue_.find(task_spec.CallerId());
if (it != scheduling_queue_.end()) {
if (it->second.first.caller_worker_id != caller_worker_id) {
// We received a request with the same caller ID, but from a different worker,
// this indicates the caller (actor) is restarted.
if (it->second.first.caller_creation_timestamp_ms < caller_version) {
// The new request has a newer caller version, then remove the old entry
// from scheduling queue since it's invalid now.
RAY_LOG(INFO) << "Remove existing scheduling queue for caller "
<< task_spec.CallerId() << " after receiving a "
<< "request from a different worker ID with a newer "
<< "version, old worker ID: " << it->second.first.caller_worker_id
<< ", new worker ID" << caller_worker_id;
scheduling_queue_.erase(task_spec.CallerId());
it = scheduling_queue_.end();
} else {
// The existing caller has the newer version, this indicates the request
// is from an old caller, which might be possible when network has problems.
// In this case fail this request.
RAY_LOG(WARNING) << "Ignoring request from an old caller because "
<< "it has a smaller timestamp, old worker ID: "
<< caller_worker_id << ", current worker ID"
<< it->second.first.caller_worker_id;
// Fail request with an old caller version.
reject_callback();
return;
}
}
}
auto it = scheduling_queue_.find(task_spec.CallerWorkerId());
if (it == scheduling_queue_.end()) {
SchedulingQueueTag tag;
tag.caller_worker_id = caller_worker_id;
tag.caller_creation_timestamp_ms = caller_version;
auto result = scheduling_queue_.emplace(
task_spec.CallerId(),
std::make_pair(tag, std::unique_ptr<SchedulingQueue>(new SchedulingQueue(
task_main_io_service_, *waiter_, worker_context_))));
task_spec.CallerWorkerId(),
SchedulingQueue(task_main_io_service_, *waiter_, worker_context_));
it = result.first;
}
it->second.second->Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, dependencies);
it->second.Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, dependencies);
}
void CoreWorkerDirectTaskReceiver::HandleDirectActorCallArgWaitComplete(
@@ -50,15 +50,21 @@ const int kMaxReorderWaitSeconds = 30;
// This class is thread-safe.
class CoreWorkerDirectActorTaskSubmitter {
public:
CoreWorkerDirectActorTaskSubmitter(rpc::Address rpc_address,
rpc::ClientFactoryFn client_factory,
CoreWorkerDirectActorTaskSubmitter(rpc::ClientFactoryFn client_factory,
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher)
: rpc_address_(rpc_address),
client_factory_(client_factory),
: client_factory_(client_factory),
resolver_(store, task_finisher),
task_finisher_(task_finisher) {}
/// Add an actor queue. This should be called whenever a reference to an
/// actor is created in the language frontend.
/// TODO(swang): Remove the actor queue once it is sure that this worker will
/// not receive another reference to the same actor.
///
/// \param[in] actor_id The actor for whom to add a queue.
void AddActorQueueIfNotExists(const ActorID &actor_id);
/// Submit a task to an actor for execution.
///
/// \param[in] task The task spec to submit.
@@ -89,20 +95,84 @@ class CoreWorkerDirectActorTaskSubmitter {
void SetCallerCreationTimestamp(int64_t timestamp);
private:
struct ClientQueue {
/// The current state of the actor. If this is ALIVE, then we should have
/// an RPC client to the actor. If this is DEAD, then all tasks in the
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::PENDING;
/// The RPC client. We use shared_ptr to enable shared_from_this for
/// pending client callbacks.
std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
/// The intended worker ID of the actor.
std::string worker_id = "";
/// The actor's pending requests, ordered by the task number (see below
/// diagram) in the request. The bool indicates whether the dependencies
/// for that task have been resolved yet. A task will be sent after its
/// dependencies have been resolved and its task number matches
/// next_send_position.
std::map<uint64_t, std::pair<TaskSpecification, bool>> requests;
/// Diagram of the sequence numbers assigned to actor tasks during actor
/// crash and restart:
///
/// The actor starts, and 10 tasks are submitted. We have sent 6 tasks
/// (0-5) so far, and have received a successful reply for 4 tasks (0-3).
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ caller_starts_at
///
/// Suppose the actor crashes and recovers. Then, caller_starts_at is reset
/// to the current num_completed_tasks. caller_starts_at is then subtracted
/// from each task's counter, so the recovered actor will receive the
/// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively.
/// Therefore, the recovered actor will restart execution from task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ caller_starts_at
///
/// New actor tasks will continue to be sent even while tasks are being
/// resubmitted, but the receiving worker will only execute them after the
/// resent tasks. For example, this diagram shows what happens if task 6 is
/// sent for the first time, tasks 4 and 5 have been resent, and we have
/// received a successful reply for task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ caller_starts_at
///
/// The send position of the next task to send to this actor. This sequence
/// number increases monotonically.
uint64_t next_send_position = 0;
/// The offset at which the the actor should start its counter for this
/// caller. This is used for actors that can be restarted, so that the new
/// instance of the actor knows from which task to start executing.
uint64_t caller_starts_at = 0;
/// Out of the tasks sent by this worker to the actor, the number of tasks
/// that we will never send to the actor again. This is used to reset
/// caller_starts_at if the actor dies and is restarted. We only include
/// tasks that will not be sent again, to support automatic task retry on
/// actor failure.
uint64_t num_completed_tasks = 0;
/// A force-kill request that should be sent to the actor once an RPC
/// client to the actor is available.
absl::optional<rpc::KillActorRequest> pending_force_kill;
};
/// Push a task to a remote actor via the given client.
/// Note, this function doesn't return any error status code. If an error occurs while
/// sending the request, this task will be treated as failed.
///
/// \param[in] client The RPC client to send tasks to an actor.
/// \param[in] request The request to send.
/// \param[in] actor_id Actor ID.
/// \param[in] task_id The ID of a task.
/// \param[in] num_returns Number of return objects.
/// \param[in] queue The actor queue. Contains the RPC client state.
/// \param[in] task_spec The task to send.
/// \param[in] skip_queue Whether to skip the task queue. This will send the
/// task for execution immediately.
/// \return Void.
void PushActorTask(rpc::CoreWorkerClientInterface &client,
std::unique_ptr<rpc::PushTaskRequest> request,
const ActorID &actor_id, const TaskID &task_id, int num_returns)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
void PushActorTask(const ClientQueue &queue, const TaskSpecification &task_spec,
bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Send all pending tasks for an actor.
/// Note that this function doesn't take lock, the caller is expected to hold
@@ -124,39 +194,7 @@ class CoreWorkerDirectActorTaskSubmitter {
/// Mutex to proect the various maps below.
mutable absl::Mutex mu_;
/// Address of our RPC server.
rpc::Address rpc_address_;
/// Map from actor id to rpc client. This only includes actors that we send tasks to.
/// We use shared_ptr to enable shared_from_this for pending client callbacks.
///
/// TODO(zhijunfu): this will be moved into `actor_states_` later when we can
/// subscribe updates for a specific actor.
absl::flat_hash_map<ActorID, std::shared_ptr<rpc::CoreWorkerClientInterface>>
rpc_clients_ GUARDED_BY(mu_);
/// Map from actor ids to worker ids. TODO(ekl) consider unifying this with the
/// rpc_clients_ map.
absl::flat_hash_map<ActorID, std::string> worker_ids_ GUARDED_BY(mu_);
/// Map from actor ids that should be force killed once a client is available to the
/// pending kill actor requests.
absl::flat_hash_map<ActorID, rpc::KillActorRequest> pending_force_kills_
GUARDED_BY(mu_);
/// Map from actor id to the actor's pending requests. Each actor's requests
/// are ordered by the task number in the request.
absl::flat_hash_map<ActorID, std::map<int64_t, std::unique_ptr<rpc::PushTaskRequest>>>
pending_requests_ GUARDED_BY(mu_);
/// Map from actor id to the send position of the next task to queue for send
/// for that actor. This is always greater than or equal to next_send_position_.
absl::flat_hash_map<ActorID, int64_t> next_send_position_to_assign_ GUARDED_BY(mu_);
/// Map from actor id to the send position of the next task to send to that actor.
/// Note that this differs from the PushTaskRequest's sequence number in that it
/// increases monotonically in this process independently of CallerId changes.
absl::flat_hash_map<ActorID, int64_t> next_send_position_ GUARDED_BY(mu_);
absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);
/// Resolve direct call object dependencies;
LocalDependencyResolver resolver_;
@@ -164,12 +202,6 @@ class CoreWorkerDirectActorTaskSubmitter {
/// Used to complete tasks.
std::shared_ptr<TaskFinisherInterface> task_finisher_;
/// Timestamp when the caller is created.
/// - if this worker is an actor, this is set to the time that the actor creation
/// task starts execution;
/// - otherwise, it's set to the time that the current task starts execution.
int64_t caller_creation_timestamp_ms_ = 0;
friend class CoreWorkerTest;
};
@@ -262,13 +294,6 @@ class BoundedExecutor {
boost::asio::thread_pool pool_;
};
struct SchedulingQueueTag {
/// Worker ID for the caller.
WorkerID caller_worker_id;
/// Timestamp for the caller, which is used as a version.
int64_t caller_creation_timestamp_ms = 0;
};
/// Used to ensure serial order of task execution per actor handle.
/// See direct_actor.proto for a description of the ordering protocol.
class SchedulingQueue {
@@ -475,9 +500,7 @@ class CoreWorkerDirectTaskReceiver {
std::unique_ptr<DependencyWaiterImpl> waiter_;
/// Queue of pending requests per actor handle.
/// TODO(ekl) GC these queues once the handle is no longer active.
std::unordered_map<TaskID,
std::pair<SchedulingQueueTag, std::unique_ptr<SchedulingQueue>>>
scheduling_queue_;
std::unordered_map<WorkerID, SchedulingQueue> scheduling_queue_;
};
} // namespace ray
@@ -39,8 +39,8 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
} else {
RAY_LOG(ERROR) << "Failed to create actor " << actor_id
<< " with: " << status.ToString();
task_finisher_->PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status);
RAY_UNUSED(task_finisher_->PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status));
}
}));
return;
@@ -70,8 +70,8 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
}
}
if (!keep_executing) {
task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED, nullptr);
RAY_UNUSED(task_finisher_->PendingTaskFailed(
task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr));
}
});
return Status::OK();
@@ -256,7 +256,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_caller_address()->CopyFrom(rpc_address_);
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->mutable_resource_mapping()->CopyFrom(assigned_resources);
request->set_intended_worker_id(addr.worker_id.Binary());
@@ -284,10 +283,10 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// failure (e.g., by contacting the raylet). If it was a process
// failure, it may have been an application-level error and it may
// not make sense to retry the task.
task_finisher_->PendingTaskFailed(
RAY_UNUSED(task_finisher_->PendingTaskFailed(
task_id,
is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED,
&status);
&status));
} else {
task_finisher_->CompletePendingTask(task_id, reply, addr.ToProto());
}
@@ -321,8 +320,8 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,
task_queues_.erase(scheduling_key);
CancelWorkerLeaseIfNeeded(scheduling_key);
}
task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED);
RAY_UNUSED(task_finisher_->PendingTaskFailed(task_spec.TaskId(),
rpc::ErrorType::TASK_CANCELLED));
return Status::OK();
}
}
+7 -13
View File
@@ -50,6 +50,9 @@ message ActorHandle {
// An extension field that is used for storing app-language-specific data.
bytes extension_data = 8;
// How many times tasks may be retried on this actor if the actor fails.
int64 max_task_retries = 9;
}
message AssignTaskRequest {
@@ -89,30 +92,21 @@ message ReturnObject {
message PushTaskRequest {
// The ID of the worker this message is intended for.
bytes intended_worker_id = 1;
// Address of the caller.
Address caller_address = 2;
// The task to be pushed.
TaskSpec task_spec = 3;
TaskSpec task_spec = 2;
// The sequence number of the task for this client. This must increase
// sequentially starting from zero for each actor handle. The server
// will guarantee tasks execute in this sequence, waiting for any
// out-of-order request messages to arrive as necessary.
// If set to -1, ordering is disabled and the task executes immediately.
int64 sequence_number = 4;
int64 sequence_number = 3;
// The max sequence number the client has processed responses for. This
// is a performance optimization that allows the client to tell the server
// to cancel any PushTaskRequests with seqno <= this value, rather than
// waiting for the server to time out waiting for missing messages.
int64 client_processed_up_to = 5;
int64 client_processed_up_to = 4;
// Resource mapping ids assigned to the worker executing the task.
repeated ResourceMapEntry resource_mapping = 6;
// The version of the caller. This is used to distinguish on-the-fly
// requests from a caller before it die, and requests from the restarted
// caller, which might happen theoretically when network has issues.
// - For an actor, this is set to the timestamp when the actor is created,
// so it can be used to differentiate which is the newly restarted actor.
// - For a non-actor task, it's set to the timestamp the task starts execution.
int64 caller_version = 7;
repeated ResourceMapEntry resource_mapping = 5;
}
message PushTaskReply {
+13 -11
View File
@@ -112,9 +112,12 @@ class CoreWorkerClientInterface {
/// Push an actor task directly from worker to worker.
///
/// \param[in] request The request message.
/// \param[in] skip_queue Whether to skip the task queue. This will send the
/// task for execution immediately.
/// \param[in] callback The callback function that handles reply.
/// \return if the rpc call succeeds
virtual ray::Status PushActorTask(std::unique_ptr<PushTaskRequest> request,
bool skip_queue,
const ClientCallback<PushTaskReply> &callback) {
return Status::NotImplemented("");
}
@@ -244,16 +247,19 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
RPC_CLIENT_METHOD(CoreWorkerService, PlasmaObjectReady, grpc_client_, override)
ray::Status PushActorTask(std::unique_ptr<PushTaskRequest> request,
ray::Status PushActorTask(std::unique_ptr<PushTaskRequest> request, bool skip_queue,
const ClientCallback<PushTaskReply> &callback) override {
request->set_sequence_number(request->task_spec().actor_task_spec().actor_counter());
if (skip_queue) {
// Set this value so that the actor does not skip any tasks when
// processing this request. We could also set it to max_finished_seq_no_,
// but we just set it to the default of -1 to avoid taking the lock.
request->set_client_processed_up_to(-1);
return INVOKE_RPC_CALL(CoreWorkerService, PushTask, *request, callback,
grpc_client_);
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (request->task_spec().caller_id() != cur_caller_id_) {
// We are running a new task, reset the seq no counter.
max_finished_seq_no_ = -1;
cur_caller_id_ = request->task_spec().caller_id();
}
send_queue_.push_back(std::make_pair(std::move(request), callback));
}
SendRequests();
@@ -331,10 +337,6 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
/// The max sequence number we have processed responses for.
int64_t max_finished_seq_no_ GUARDED_BY(mutex_) = -1;
/// The task id we are currently sending requests for. When this changes,
/// the max finished seq no counter is reset.
std::string cur_caller_id_;
};
} // namespace rpc
+3 -2
View File
@@ -275,8 +275,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
std::string name = "";
ActorCreationOptions actor_options{
max_restarts,
/*max_concurrency=*/1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
/*max_task_retries=*/0,
/*max_concurrency=*/1, resources, resources, {},
/*is_detached=*/false, name, /*is_asyncio=*/false};
// Create an actor.
ActorID actor_id;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().CreateActor(