Implement Detached Actor (#6036)

* Arg propagation works

* Implement persistent actor

* Add doc

* Initialize is_persistent_

* Rename persistent->detached

* Address comment

* Make test passes

* Address comment

* Python2 compatiblity

* Fix naming, py2

* Lint
This commit is contained in:
Simon Mo
2019-11-01 10:28:23 -07:00
committed by GitHub
parent f7455839bf
commit 7f5b3502da
16 changed files with 164 additions and 27 deletions
+40
View File
@@ -235,3 +235,43 @@ To get information about the current available resource capacity of your cluster
.. autofunction:: ray.available_resources
:noindex:
Detached Actors
-----------------------------------
When original actor handles goes out of scope or the driver that originally
created the actor exits, ray will clean up the actor by default. If you want
to make sure the actor is kept alive, you can use
``_remote(name="some_name", detached=True)`` to keep the actor alive after
the driver exits. The actor will have a globally unique name and can be
accessed across different drivers.
For example, you can instantiate and register a persistent actor as follows:
.. code-block:: python
counter = Counter._remote(name="CounterActor", detached=True)
The CounterActor will be kept alive even after the driver running above script
exits. Therefore it is possible to run the following script in a different
driver:
.. code-block:: python
counter = ray.experimental.get_actor("CounterActor")
print(ray.get(counter.get_counter.remote()))
Note that just creating a named actor is allowed, this actor will be cleaned
up after driver exits:
.. code-block:: python
Counter._remote(name="CounterActor")
However, creating a detached actor without name is not allowed because there
will be no way to retrieve the actor handle and the resource is leaked.
.. code-block:: python
# Can't do this!
Counter._remote(detached=True)
+4 -2
View File
@@ -948,7 +948,8 @@ cdef class CoreWorker:
uint64_t max_reconstructions,
resources,
placement_resources,
c_bool is_direct_call):
c_bool is_direct_call,
c_bool is_detached):
cdef:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
@@ -969,7 +970,8 @@ cdef class CoreWorker:
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, is_direct_call, c_resources,
c_placement_resources, dynamic_worker_options),
c_placement_resources, dynamic_worker_options,
is_detached),
&c_actor_id))
return ActorID(c_actor_id.Binary())
+27 -2
View File
@@ -307,7 +307,9 @@ class ActorClass(object):
memory=None,
object_store_memory=None,
resources=None,
is_direct_call=None):
is_direct_call=None,
name=None,
detached=False):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -325,6 +327,9 @@ class ActorClass(object):
resources: The custom resources required by the actor creation
task.
is_direct_call: Use direct actor calls.
name: The globally unique name for the actor.
detached: Whether the actor should be kept alive after driver
exits.
Returns:
A handle to the newly created actor.
@@ -341,6 +346,23 @@ class ActorClass(object):
meta = self.__ray_metadata__
if detached and name is None:
raise Exception("Detached actors must be named. "
"Please use Actor._remote(name='some_name') "
"to associate the name.")
# Check whether the name is already taken.
if name is not None:
try:
ray.experimental.get_actor(name)
except ValueError: # name is not taken, expected.
pass
else:
raise ValueError(
"The name {name} is already taken. Please use "
"a different name or get existing actor using "
"ray.experimental.get_actor('{name}')".format(name=name))
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@@ -403,7 +425,7 @@ class ActorClass(object):
actor_id = worker.core_worker.create_actor(
function_descriptor.get_function_descriptor_list(),
creation_args, meta.max_reconstructions, resources,
actor_placement_resources, is_direct_call)
actor_placement_resources, is_direct_call, detached)
actor_handle = ActorHandle(
actor_id,
@@ -417,6 +439,9 @@ class ActorClass(object):
worker.current_session_and_job,
original_handle=True)
if name is not None:
ray.experimental.register_actor(name, actor_handle)
return actor_handle
+2 -1
View File
@@ -207,7 +207,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
uint64_t max_reconstructions, c_bool is_direct_call,
const unordered_map[c_string, double] &resources,
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options)
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
+37 -6
View File
@@ -21,12 +21,9 @@ import ray.ray_constants as ray_constants
import ray.tests.utils
import ray.tests.cluster_utils
from ray.tests.conftest import generate_internal_config_map
from ray.tests.utils import (
relevant_errors,
wait_for_condition,
wait_for_errors,
wait_for_pid_to_exit,
)
from ray.tests.utils import (relevant_errors, wait_for_condition,
wait_for_errors, wait_for_pid_to_exit,
run_string_as_driver)
@pytest.fixture
@@ -2816,3 +2813,37 @@ def test_ray_wait_dead_actor(ray_start_cluster):
failure_detected = False
while not failure_detected:
failure_detected = ray.get(parent_actor.wait.remote())
def test_detached_actor(ray_start_regular):
@ray.remote
class DetachedActor(object):
def ping(self):
return "pong"
with pytest.raises(Exception, match="Detached actors must be named"):
DetachedActor._remote(detached=True)
with pytest.raises(ValueError, match="Please use a different name"):
_ = DetachedActor._remote(name="d_actor")
DetachedActor._remote(name="d_actor")
redis_address = ray_start_regular["redis_address"]
actor_name = "DetachedActor"
driver_script = """
import ray
ray.init(address="{}")
@ray.remote
class DetachedActor(object):
def ping(self):
return "pong"
actor = DetachedActor._remote(name="{}", detached=True)
ray.get(actor.ping.remote())
""".format(redis_address, actor_name)
run_string_as_driver(driver_script)
detached_actor = ray.experimental.get_actor(actor_name)
assert ray.get(detached_actor.ping.remote()) == "pong"
+7 -1
View File
@@ -189,6 +189,11 @@ bool TaskSpecification::IsDirectCall() const {
return message_->actor_creation_task_spec().is_direct_call();
}
bool TaskSpecification::IsDetachedActor() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_detached();
}
std::string TaskSpecification::DebugString() const {
std::ostringstream stream;
stream << "Type=" << TaskType_Name(message_->type())
@@ -213,7 +218,8 @@ std::string TaskSpecification::DebugString() const {
// Print actor creation task spec.
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId()
<< ", max_reconstructions=" << MaxActorReconstructions()
<< ", is_direct_call=" << IsDirectCall() << "}";
<< ", is_direct_call=" << IsDirectCall()
<< ", is_detached=" << IsDetachedActor() << "}";
} else if (IsActorTask()) {
// Print actor task spec.
stream << ", actor_task_spec={actor_id=" << ActorId()
+2
View File
@@ -142,6 +142,8 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
bool IsDirectCall() const;
bool IsDetachedActor() const;
ObjectID ActorDummyObject() const;
std::string DebugString() const;
+2 -1
View File
@@ -93,7 +93,7 @@ class TaskSpecBuilder {
TaskSpecBuilder &SetActorCreationTaskSpec(
const ActorID &actor_id, uint64_t max_reconstructions = 0,
const std::vector<std::string> &dynamic_worker_options = {},
bool is_direct_call = false) {
bool is_direct_call = false, bool is_detached = false) {
message_->set_type(TaskType::ACTOR_CREATION_TASK);
auto actor_creation_spec = message_->mutable_actor_creation_task_spec();
actor_creation_spec->set_actor_id(actor_id.Binary());
@@ -102,6 +102,7 @@ class TaskSpecBuilder {
actor_creation_spec->add_dynamic_worker_options(option);
}
actor_creation_spec->set_is_direct_call(is_direct_call);
actor_creation_spec->set_is_detached(is_detached);
return *this;
}
+7 -2
View File
@@ -99,12 +99,14 @@ struct ActorCreationOptions {
ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call,
const std::unordered_map<std::string, double> &resources,
const std::unordered_map<std::string, double> &placement_resources,
const std::vector<std::string> &dynamic_worker_options)
const std::vector<std::string> &dynamic_worker_options,
bool is_detached)
: max_reconstructions(max_reconstructions),
is_direct_call(is_direct_call),
resources(resources),
placement_resources(placement_resources),
dynamic_worker_options(dynamic_worker_options) {}
dynamic_worker_options(dynamic_worker_options),
is_detached(is_detached){};
/// Maximum number of times that the actor should be reconstructed when it dies
/// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed.
@@ -119,6 +121,9 @@ struct ActorCreationOptions {
/// The dynamic options used in the worker command when starting a worker process for
/// an actor creation task.
const std::vector<std::string> dynamic_worker_options;
/// Whether to keep the actor persistent after driver exit. If true, this will set
/// the worker to not be destroyed after the driver shutdown.
const bool is_detached = false;
};
} // namespace ray
+2 -1
View File
@@ -482,7 +482,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids);
builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
actor_creation_options.is_direct_call);
actor_creation_options.is_direct_call,
actor_creation_options.is_detached);
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(),
+4 -3
View File
@@ -61,7 +61,8 @@ ActorID CreateActorHelper(CoreWorker &worker,
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
ActorCreationOptions actor_options{
max_reconstructions, is_direct_call, resources, resources, {}};
max_reconstructions, is_direct_call, resources, resources, {},
/*is_detached*/ false};
// Create an actor.
ActorID actor_id;
@@ -489,8 +490,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));
std::unordered_map<std::string, double> resources;
ActorCreationOptions actor_options{
0, /*is_direct_call*/ true, resources, resources, {}};
ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, resources,
{}, /*is_detached*/ false};
const auto job_id = NextJobId();
ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id,
ObjectID::FromRandom(), function.GetLanguage(), true,
+2
View File
@@ -95,6 +95,8 @@ message ActorCreationTaskSpec {
repeated string dynamic_worker_options = 4;
// Whether direct actor call is used.
bool is_direct_call = 5;
// Whether the actor is persistent
bool is_detached = 6;
}
// Task spec of an actor task.
+2
View File
@@ -111,6 +111,8 @@ message ActorTableData {
int32 port = 10;
// Whether direct actor call is used.
bool is_direct_call = 11;
// Whether the actor is persistent.
bool is_detached = 12;
}
message ErrorTableData {
+15 -7
View File
@@ -275,13 +275,15 @@ void NodeManager::HandleJobTableUpdate(const JobID &id,
// Kill all the workers. The actual cleanup for these workers is done
// later when we receive the DisconnectClient message from them.
for (const auto &worker : workers) {
// Clean up any open ray.wait calls that the worker made.
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
// Mark the worker as dead so further messages from it are ignored
// (except DisconnectClient).
worker->MarkDead();
// Then kill the worker process.
KillWorker(worker);
if (!worker->IsDetachedActor()) {
// Clean up any open ray.wait calls that the worker made.
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
// Mark the worker as dead so further messages from it are ignored
// (except DisconnectClient).
worker->MarkDead();
// Then kill the worker process.
KillWorker(worker);
}
}
// Remove all tasks for this job from the scheduling queues, mark
@@ -2015,6 +2017,7 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
// of remaining reconstructions is the max.
actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall());
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
} else {
// If we've already seen this actor, it means that this actor was reconstructed.
// Thus, its previous state must be RECONSTRUCTING.
@@ -2066,6 +2069,11 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
if (task_spec.IsActorCreationTask()) {
// This was an actor creation task. Convert the worker to an actor.
worker.AssignActorId(actor_id);
if (task_spec.IsDetachedActor()) {
worker.MarkDetachedActor();
}
// Lookup the parent actor id.
auto parent_task_id = task_spec.ParentTaskId();
int port = worker.Port();
+6 -1
View File
@@ -22,7 +22,8 @@ Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, i
connection_(connection),
dead_(false),
blocked_(false),
client_call_manager_(client_call_manager) {
client_call_manager_(client_call_manager),
is_detached_actor_(false) {
if (port_ > 0) {
rpc_client_ = std::unique_ptr<rpc::WorkerTaskClient>(
new rpc::WorkerTaskClient("127.0.0.1", port_, client_call_manager_));
@@ -80,6 +81,10 @@ void Worker::AssignActorId(const ActorID &actor_id) {
const ActorID &Worker::GetActorId() const { return actor_id_; }
void Worker::MarkDetachedActor() { is_detached_actor_ = true; }
bool Worker::IsDetachedActor() const { return is_detached_actor_; }
const std::shared_ptr<LocalClientConnection> Worker::Connection() const {
return connection_;
}
+5
View File
@@ -46,6 +46,8 @@ class Worker {
const JobID &GetAssignedJobId() const;
void AssignActorId(const ActorID &actor_id);
const ActorID &GetActorId() const;
void MarkDetachedActor();
bool IsDetachedActor() const;
const std::shared_ptr<LocalClientConnection> Connection() const;
const ResourceIdSet &GetLifetimeResourceIds() const;
@@ -102,6 +104,9 @@ class Worker {
rpc::ClientCallManager &client_call_manager_;
/// The rpc client to send tasks to this worker.
std::unique_ptr<rpc::WorkerTaskClient> rpc_client_;
/// Whether the worker is detached. This is applies when the worker is actor.
/// Detached actor means the actor's creator can exit without killing this actor.
bool is_detached_actor_;
/// The rpc client to send tasks to the direct actor service.
std::unique_ptr<rpc::DirectActorClient> direct_rpc_client_;
};