diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index caecae452..a99195793 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -17,6 +17,7 @@ import ray import ray.test_utils import ray.cluster_utils from ray.test_utils import run_string_as_driver +from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put def test_actor_init_error_propagated(ray_start_regular): @@ -1442,6 +1443,58 @@ def test_kill(ray_start_regular): ray.get(result, timeout=1) +# This test verifies actor creation task failure will not +# hang the caller. +def test_actor_creation_task_crash(ray_start_regular): + # Test actor death in constructor. + @ray.remote(max_reconstructions=0) + class Actor(object): + def __init__(self): + print("crash") + os._exit(0) + + def f(self): + return "ACTOR OK" + + # Verify an exception is thrown. + a = Actor.remote() + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.f.remote()) + + # Test an actor can be reconstructed successfully + # after it dies in its constructor. + @ray.remote(max_reconstructions=3) + class ReconstructableActor(object): + def __init__(self): + count = self.get_count() + count += 1 + # Make it die for the first 2 times. + if count < 3: + self.set_count(count) + print("crash: " + str(count)) + os._exit(0) + else: + print("no crash") + + def f(self): + return "ACTOR OK" + + def get_count(self): + value = _internal_kv_get("count") + if value is None: + count = 0 + else: + count = int(value) + return count + + def set_count(self, count): + _internal_kv_put("count", count, True) + + # Verify we can get the object successfully. 
+ ra = ReconstructableActor.remote() + ray.get(ra.f.remote()) + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 6ee5fa5ab..c50619c46 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -3,19 +3,19 @@ namespace ray { -void ActorManager::PublishCreatedActor(const TaskSpecification &actor_creation_task, - const rpc::Address &address) { - auto actor_id = actor_creation_task.ActorCreationId(); - auto data = gcs::CreateActorTableData(actor_creation_task, address, - gcs::ActorTableData::ALIVE, 0); - RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr)); -} - void ActorManager::PublishTerminatedActor(const TaskSpecification &actor_creation_task) { auto actor_id = actor_creation_task.ActorCreationId(); auto data = gcs::CreateActorTableData(actor_creation_task, rpc::Address(), gcs::ActorTableData::DEAD, 0); - RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr)); + + auto update_callback = [actor_id](Status status) { + if (!status.ok()) { + // Only one node at a time should succeed at creating or updating the actor. + RAY_LOG(ERROR) << "Failed to update state to DEAD for actor " << actor_id + << ", error: " << status.ToString(); + } + }; + RAY_CHECK_OK(actor_accessor_.AsyncRegister(data, update_callback)); } } // namespace ray diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index e5ca24b4b..f87301631 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -9,30 +9,26 @@ namespace ray { // Interface for testing. 
class ActorManagerInterface { public: - virtual void PublishCreatedActor(const TaskSpecification &actor_creation_task, - const rpc::Address &address) = 0; - virtual void PublishTerminatedActor(const TaskSpecification &actor_creation_task) = 0; virtual ~ActorManagerInterface() {} }; /// Class to manage lifetimes of actors that we create (actor children). +/// Currently this class is only used to publish actor DEAD event +/// for actor creation task failures. All other cases are managed +/// by raylet. class ActorManager : public ActorManagerInterface { public: - ActorManager(gcs::DirectActorTable &global_actor_table) - : global_actor_table_(global_actor_table) {} - - /// Called when an actor creation task that we submitted finishes. - void PublishCreatedActor(const TaskSpecification &actor_creation_task, - const rpc::Address &address) override; + ActorManager(gcs::ActorInfoAccessor &actor_accessor) + : actor_accessor_(actor_accessor) {} /// Called when an actor that we own can no longer be restarted. void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override; private: /// Global database of actors. - gcs::DirectActorTable &global_actor_table_; + gcs::ActorInfoAccessor &actor_accessor_; }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0a3783d67..ba72e33c1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -102,6 +102,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, gcs_client_ = std::make_shared(gcs_options); RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + actor_manager_ = std::unique_ptr(new ActorManager(gcs_client_->Actors())); + // Initialize profiler. profiler_ = std::make_shared(worker_context_, node_ip_address, io_service_, gcs_client_); @@ -187,7 +189,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, ref_counting_enabled ? 
reference_counter_ : nullptr, local_raylet_client_)); task_manager_.reset(new TaskManager( - memory_store_, reference_counter_, nullptr, [this](const TaskSpecification &spec) { + memory_store_, reference_counter_, actor_manager_, + [this](const TaskSpecification &spec) { // Retry after a delay to emulate the existing Raylet reconstruction // behaviour. TODO(ekl) backoff exponentially. RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: " diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 63c24f3bd..7d3cf9a5e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -617,6 +617,9 @@ class CoreWorker { // Tracks the currently pending tasks. std::shared_ptr task_manager_; + // Interface for publishing actor death event for actor creation failure. + std::shared_ptr actor_manager_; + // Interface to submit tasks directly to other actors. std::unique_ptr direct_actor_submitter_; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 323c82159..18a723406 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -203,6 +203,12 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, /*transport_type=*/static_cast(TaskTransportType::DIRECT)); RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id)); } + + if (spec.IsActorCreationTask()) { + // Publish actor death if actor creation task failed after + // a number of retries. 
+ actor_manager_->PublishTerminatedActor(spec); + } } TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const { diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index b9d37b3e4..d1fd1928d 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -21,11 +21,6 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns, } class MockActorManager : public ActorManagerInterface { - void PublishCreatedActor(const TaskSpecification &actor_creation_task, - const rpc::Address &address) override { - num_publishes += 1; - } - void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override { num_terminations += 1; }