[direct call] Retry failed tasks with delay (#6453)

* retry failed tasks with delay

* set to 0 for direct tests
This commit is contained in:
Eric Liang
2019-12-12 17:12:38 -08:00
committed by GitHub
parent 9e9c524823
commit 5a5c94939f
6 changed files with 45 additions and 9 deletions
+7 -3
View File
@@ -2,7 +2,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import copy
import inspect
import logging
@@ -396,7 +395,7 @@ class ActorClass(object):
if kwargs is None:
kwargs = {}
if is_direct_call is None:
is_direct_call = bool(os.environ.get("RAY_FORCE_DIRECT"))
is_direct_call = ray_constants.direct_call_enabled()
if max_concurrency is None:
if is_asyncio:
max_concurrency = 100
@@ -756,7 +755,12 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources,
"methods in the `Checkpointable` interface.")
if max_reconstructions is None:
max_reconstructions = 0
if ray_constants.direct_call_enabled():
# Allow the actor creation task to be resubmitted automatically
# by default.
max_reconstructions = 3
else:
max_reconstructions = 0
if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <=
ray_constants.INFINITE_RECONSTRUCTION):
+2 -2
View File
@@ -2,11 +2,11 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import logging
from functools import wraps
from ray import cloudpickle as pickle
from ray import ray_constants
from ray.function_manager import FunctionDescriptor
import ray.signature
@@ -95,7 +95,7 @@ class RemoteFunction(object):
return self._remote(args=args, kwargs=kwargs)
self.remote = _remote_proxy
self.direct_call_enabled = bool(os.environ.get("RAY_FORCE_DIRECT"))
self.direct_call_enabled = ray_constants.direct_call_enabled()
def __call__(self, *args, **kwargs):
raise Exception("Remote functions cannot be called directly. Instead "
+1 -1
View File
@@ -774,7 +774,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@ray.remote
@ray.remote(max_reconstructions=0)
class Counter(object):
def __init__(self):
self.x = 0
+2 -2
View File
@@ -342,7 +342,7 @@ def test_actor_worker_dying(ray_start_regular):
def test_actor_worker_dying_future_tasks(ray_start_regular):
@ray.remote
@ray.remote(max_reconstructions=0)
class Actor(object):
def getpid(self):
return os.getpid()
@@ -364,7 +364,7 @@ def test_actor_worker_dying_future_tasks(ray_start_regular):
def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
@ray.remote
@ray.remote(max_reconstructions=0)
class Actor(object):
def getpid(self):
return os.getpid()