From 5a5c94939fcade8aab7b3607a982916ec63e3379 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 12 Dec 2019 17:12:38 -0800 Subject: [PATCH] [direct call] Retry failed tasks with delay (#6453) * retry failed tasks with delay * set to 0 for direct tests --- python/ray/actor.py | 10 +++++++--- python/ray/remote_function.py | 4 ++-- python/ray/tests/test_actor.py | 2 +- python/ray/tests/test_failure.py | 4 ++-- src/ray/core_worker/core_worker.cc | 25 ++++++++++++++++++++++++- src/ray/core_worker/core_worker.h | 9 +++++++++ 6 files changed, 45 insertions(+), 9 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index ccfee8958..b49c25cbf 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import os import copy import inspect import logging @@ -396,7 +395,7 @@ class ActorClass(object): if kwargs is None: kwargs = {} if is_direct_call is None: - is_direct_call = bool(os.environ.get("RAY_FORCE_DIRECT")) + is_direct_call = ray_constants.direct_call_enabled() if max_concurrency is None: if is_asyncio: max_concurrency = 100 @@ -756,7 +755,12 @@ def make_actor(cls, num_cpus, num_gpus, memory, object_store_memory, resources, "methods in the `Checkpointable` interface.") if max_reconstructions is None: - max_reconstructions = 0 + if ray_constants.direct_call_enabled(): + # Allow the actor creation task to be resubmitted automatically + # by default. + max_reconstructions = 3 + else: + max_reconstructions = 0 if not (ray_constants.NO_RECONSTRUCTION <= max_reconstructions <= ray_constants.INFINITE_RECONSTRUCTION): diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index ff6d9080c..bb869d4ee 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -2,11 +2,11 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import os import logging from functools import wraps from ray import cloudpickle as pickle +from ray import ray_constants from ray.function_manager import FunctionDescriptor import ray.signature @@ -95,7 +95,7 @@ class RemoteFunction(object): return self._remote(args=args, kwargs=kwargs) self.remote = _remote_proxy - self.direct_call_enabled = bool(os.environ.get("RAY_FORCE_DIRECT")) + self.direct_call_enabled = ray_constants.direct_call_enabled() def __call__(self, *args, **kwargs): raise Exception("Remote functions cannot be called directly. Instead " diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 6280e03e3..889fca8ee 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -774,7 +774,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): cluster = ray_start_cluster_head remote_node = cluster.add_node() - @ray.remote + @ray.remote(max_reconstructions=0) class Counter(object): def __init__(self): self.x = 0 diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 414f158bd..5120c4af4 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -342,7 +342,7 @@ def test_actor_worker_dying(ray_start_regular): def test_actor_worker_dying_future_tasks(ray_start_regular): - @ray.remote + @ray.remote(max_reconstructions=0) class Actor(object): def getpid(self): return os.getpid() @@ -364,7 +364,7 @@ def test_actor_worker_dying_future_tasks(ray_start_regular): def test_actor_worker_dying_nothing_in_progress(ray_start_regular): - @ray.remote + @ray.remote(max_reconstructions=0) class Actor(object): def getpid(self): return os.getpid() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b19d04dfb..e520c8bc5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -8,9 +8,13 @@ #include "ray/core_worker/context.h" #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" +#include "ray/util/util.h" namespace { +// Duration between internal book-keeping heartbeats. +const int kInternalHeartbeatMillis = 1000; + void BuildCommonTaskSpec( ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, const TaskID ¤t_task_id, const int task_index, const TaskID &caller_id, @@ -78,6 +82,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, io_work_(io_service_), client_call_manager_(new rpc::ClientCallManager(io_service_)), heartbeat_timer_(io_service_), + internal_timer_(io_service_), core_worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */), reference_counter_(std::make_shared()), task_execution_service_work_(task_execution_service_), @@ -158,6 +163,10 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, heartbeat_timer_.async_wait(boost::bind(&CoreWorker::ReportActiveObjectIDs, this)); } + internal_timer_.expires_from_now( + boost::asio::chrono::milliseconds(kInternalHeartbeatMillis)); + internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this)); + io_thread_ = std::thread(&CoreWorker::RunIOService, this); plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider( @@ -170,7 +179,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, task_manager_.reset( new TaskManager(memory_store_, [this](const TaskSpecification &spec) { - RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); + // Retry after a delay to emulate the existing Raylet reconstruction + // behaviour. TODO(ekl) backoff exponentially. + RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " + << spec.DebugString(); + to_resubmit_.push_back(std::make_pair(current_time_ms() + 5000, spec)); })); resolver_.reset(new LocalDependencyResolver(memory_store_)); @@ -292,6 +305,16 @@ void CoreWorker::ReportActiveObjectIDs() { heartbeat_timer_.async_wait(boost::bind(&CoreWorker::ReportActiveObjectIDs, this)); } +void CoreWorker::InternalHeartbeat() { + while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) { + RAY_CHECK_OK(direct_task_submitter_->SubmitTask(to_resubmit_.front().second)); + to_resubmit_.pop_front(); + } + internal_timer_.expires_at(internal_timer_.expiry() + + boost::asio::chrono::milliseconds(kInternalHeartbeatMillis)); + internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this)); +} + void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, TaskID *owner_id, rpc::Address *owner_address) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b6c8a6e0f..23e2b7485 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -413,6 +413,9 @@ class CoreWorker { /// Send the list of active object IDs to the raylet. void ReportActiveObjectIDs(); + /// Heartbeat for internal bookkeeping. + void InternalHeartbeat(); + /// /// Private methods related to task submission. /// @@ -522,6 +525,9 @@ class CoreWorker { /// raylet. boost::asio::steady_timer heartbeat_timer_; + /// Timer for internal book-keeping. + boost::asio::steady_timer internal_timer_; + /// RPC server used to receive tasks to execute. rpc::GrpcServer core_worker_server_; @@ -612,6 +618,9 @@ class CoreWorker { // Interface that receives tasks from direct actor calls. std::unique_ptr direct_task_receiver_; + // Queue of tasks to resubmit when the specified time passes. + std::deque> to_resubmit_; + friend class CoreWorkerTest; };