[direct task] Retry tasks on failure and turn on RAY_FORCE_DIRECT for test_multinode_failures.py (#6306)

* multinode failures direct

* Add number of retries allowed for tasks

* Retry tasks

* Add failing test for object reconstruction

* Handle return status and debug

* update

* Retry task unit test

* update

* update

* todo

* Fix max_retries decorator, fix test

* Fix test that flaked

* lint

* comments
This commit is contained in:
Stephanie Wang
2019-12-02 10:20:57 -08:00
committed by GitHub
parent 0b0a16982a
commit da41180dc0
21 changed files with 284 additions and 63 deletions
+4 -2
View File
@@ -911,7 +911,8 @@ cdef class CoreWorker:
args,
int num_return_vals,
c_bool is_direct_call,
resources):
resources,
int max_retries):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
@@ -929,7 +930,8 @@ cdef class CoreWorker:
with nogil:
check_status(self.core_worker.get().SubmitTask(
ray_function, args_vector, task_options, &return_ids))
ray_function, args_vector, task_options, &return_ids,
max_retries))
return VectorToObjectIDs(return_ids)
+2 -1
View File
@@ -86,7 +86,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus SubmitTask(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids)
const CTaskOptions &options, c_vector[CObjectID] *return_ids,
int max_retries)
CRayStatus CreateActor(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options, CActorID *actor_id)
+12 -3
View File
@@ -14,6 +14,9 @@ import ray.signature
DEFAULT_REMOTE_FUNCTION_CPUS = 1
DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS = 1
DEFAULT_REMOTE_FUNCTION_MAX_CALLS = 0
# Normal tasks may be retried on failure this many times.
# TODO(swang): Allow this to be set globally for an application.
DEFAULT_REMOTE_FUNCTION_NUM_TASK_RETRIES = 3
logger = logging.getLogger(__name__)
@@ -59,7 +62,8 @@ class RemoteFunction(object):
"""
def __init__(self, function, num_cpus, num_gpus, memory,
object_store_memory, resources, num_return_vals, max_calls):
object_store_memory, resources, num_return_vals, max_calls,
max_retries):
self._function = function
self._function_name = (
self._function.__module__ + "." + self._function.__name__)
@@ -76,6 +80,8 @@ class RemoteFunction(object):
num_return_vals is None else num_return_vals)
self._max_calls = (DEFAULT_REMOTE_FUNCTION_MAX_CALLS
if max_calls is None else max_calls)
self._max_retries = (DEFAULT_REMOTE_FUNCTION_NUM_TASK_RETRIES
if max_retries is None else max_retries)
self._decorator = getattr(function, "__ray_invocation_decorator__",
None)
@@ -142,7 +148,8 @@ class RemoteFunction(object):
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None):
resources=None,
max_retries=None):
"""Submit the remote function for execution."""
worker = ray.worker.get_global_worker()
worker.check_connected()
@@ -176,6 +183,8 @@ class RemoteFunction(object):
num_return_vals = self._num_return_vals
if is_direct_call is None:
is_direct_call = self.direct_call_enabled
if max_retries is None:
max_retries = self._max_retries
resources = ray.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._memory,
@@ -196,7 +205,7 @@ class RemoteFunction(object):
else:
object_ids = worker.core_worker.submit_task(
self._function_descriptor_list, list_args, num_return_vals,
is_direct_call, resources)
is_direct_call, resources, max_retries)
if len(object_ids) == 1:
return object_ids[0]
+8
View File
@@ -70,6 +70,14 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_multinode_failures_direct",
size = "medium",
srcs = ["test_multinode_failures_direct.py", "test_multinode_failures.py"],
tags = ["exclusive", "manual"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_stress",
size = "large",
+1 -1
View File
@@ -308,7 +308,7 @@ def test_worker_raising_exception(ray_start_regular):
def test_worker_dying(ray_start_regular):
# Define a remote function that will kill the worker that runs it.
@ray.remote
@ray.remote(max_retries=0)
def f():
eval("exit()")
+87 -4
View File
@@ -16,6 +16,8 @@ import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import RayTestTimeoutException
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
@@ -83,10 +85,20 @@ def _test_component_failed(cluster, component_type):
# Submit many tasks with many dependencies.
@ray.remote
def f(x):
if RAY_FORCE_DIRECT:
# Sleep to make sure that tasks actually fail mid-execution. We
# only use it for direct calls because the test already takes a
# long time to run with the raylet codepath.
time.sleep(0.01)
return x
@ray.remote
def g(*xs):
if RAY_FORCE_DIRECT:
# Sleep to make sure that tasks actually fail mid-execution. We
# only use it for direct calls because the test already takes a
# long time to run with the raylet codepath.
time.sleep(0.01)
return 1
# Kill the component on all nodes except the head node as the tasks
@@ -138,11 +150,13 @@ def check_components_alive(cluster, component_type, check_component_alive):
@pytest.mark.parametrize(
"ray_start_cluster", [{
"ray_start_cluster",
[{
"num_cpus": 8,
"num_nodes": 4,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100
}),
}],
indirect=True)
@@ -156,15 +170,83 @@ def test_raylet_failed(ray_start_cluster):
True)
@pytest.mark.skipif(
RAY_FORCE_DIRECT,
reason="No reconstruction for objects placed in plasma yet")
@pytest.mark.parametrize(
"ray_start_cluster",
[{
# Force at least one task per node.
"num_cpus": 1,
"num_nodes": 4,
"object_store_memory": 1000 * 1024 * 1024,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100,
"object_manager_pull_timeout_ms": 1000,
"object_manager_push_timeout_ms": 1000,
"object_manager_repeated_push_delay_ms": 1000,
}),
}],
indirect=True)
def test_object_reconstruction(ray_start_cluster):
cluster = ray_start_cluster
# Submit tasks with dependencies in plasma.
@ray.remote
def large_value():
# Sleep for a bit to force tasks onto different nodes.
time.sleep(0.1)
return np.zeros(10 * 1024 * 1024)
@ray.remote
def g(x):
return
# Kill the component on all nodes except the head node as the tasks
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
component_type = ray_constants.PROCESS_TYPE_RAYLET
for node in worker_nodes:
process = node.all_processes[component_type][0].process
# Submit a round of tasks with many dependencies.
num_tasks = len(worker_nodes)
xs = [large_value.remote() for _ in range(num_tasks)]
# Wait for the tasks to complete, then evict the objects from the local
# node.
for x in xs:
ray.get(x)
ray.internal.free([x], local_only=True)
# Kill a component on one of the nodes.
process.terminate()
time.sleep(1)
process.kill()
process.wait()
assert not process.poll() is None
# Make sure that we can still get the objects after the
# executing tasks died.
print("F", xs)
xs = [g.remote(x) for x in xs]
print("G", xs)
ray.get(xs)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"ray_start_cluster",
[{
"num_cpus": 8,
"num_nodes": 2,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100
}),
}],
indirect=True)
@@ -179,6 +261,7 @@ def test_plasma_store_failed(ray_start_cluster):
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor restart yet")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
@@ -0,0 +1,18 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main([
"-v",
os.path.join(
os.path.dirname(__file__), "test_multinode_failures.py")
]))
+6 -1
View File
@@ -1621,6 +1621,7 @@ def make_decorator(num_return_vals=None,
object_store_memory=None,
resources=None,
max_calls=None,
max_retries=None,
max_reconstructions=None,
worker=None):
def decorator(function_or_class):
@@ -1633,7 +1634,8 @@ def make_decorator(num_return_vals=None,
return ray.remote_function.RemoteFunction(
function_or_class, num_cpus, num_gpus, memory,
object_store_memory, resources, num_return_vals, max_calls)
object_store_memory, resources, num_return_vals, max_calls,
max_retries)
if inspect.isclass(function_or_class):
if num_return_vals is not None:
@@ -1732,6 +1734,7 @@ def remote(*args, **kwargs):
"resources",
"max_calls",
"max_reconstructions",
"max_retries",
], error_string
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None
@@ -1751,6 +1754,7 @@ def remote(*args, **kwargs):
max_reconstructions = kwargs.get("max_reconstructions")
memory = kwargs.get("memory")
object_store_memory = kwargs.get("object_store_memory")
max_retries = kwargs.get("max_retries")
return make_decorator(
num_return_vals=num_return_vals,
@@ -1761,4 +1765,5 @@ def remote(*args, **kwargs):
resources=resources,
max_calls=max_calls,
max_reconstructions=max_reconstructions,
max_retries=max_retries,
worker=worker)