Clean up actor state from the GCS (#8261)

* parametrize test

* Regression test and logging

* Test no restart after actor deletion

* Unit tests

* Refactor to subscribe to and lookup from worker failure table

* Refactor ActorManager to remove dependencies

* Revert "Regression test and logging"

This reverts commit 835e1a9091b51ca8efb00392d4cc4a665145de24.

* Revert "parametrize test"

This reverts commit f31272082831ba1a494816dd5511d87b24eca4c9.

* Revert "Test no restart after actor deletion"

This reverts commit 114a83de14329aa6ab787c80cd5757cf074a9072.

* doc

* merge

* Revert "Refactor to subscribe to and lookup from worker failure table"

This reverts commit 6aa13a05178d0b9aa1db9dee5c978c911b74fa3a.

* Revert "Revert "Test no restart after actor deletion""

This reverts commit 1bd92d09172aa8ab42632551cf9c56463f9598fe.

* Revert "Revert "parametrize test""

This reverts commit 639ba4d3b02167fb2b05e9878f9aa600bcec95b3.

* Revert "Revert "Regression test and logging""

This reverts commit f18b5f0db699a23cbccde32789e3639425e99ca4.

* Clean up actors that have gone out of scope

* Use actor ID instead of shared_ptr

* Clean up actors owned by dead workers

* Use actor ID instead of shared_ptr

* TODO and lint

* Fix unit tests

* Add unit tests for supervision and docs

* xx

* Fix tests

* Fix tests

* fix build
This commit is contained in:
Stephanie Wang
2020-05-09 18:43:49 -07:00
committed by GitHub
parent 4421f3a000
commit 3a25f5f5b4
18 changed files with 672 additions and 109 deletions
+26
View File
@@ -699,6 +699,32 @@ def test_use_actor_within_actor(ray_start_10_cpus):
assert ray.get(actor2.get_values.remote(5)) == (3, 4)
def test_use_actor_twice(ray_start_10_cpus):
# Make sure we can call the same actor using different refs.
@ray.remote
class Actor1:
def __init__(self):
self.count = 0
def inc(self):
self.count += 1
return self.count
@ray.remote
class Actor2:
def __init__(self):
pass
def inc(self, handle):
return ray.get(handle.inc.remote())
a = Actor1.remote()
a2 = Actor2.remote()
assert ray.get(a2.inc.remote(a)) == 1
assert ray.get(a2.inc.remote(a)) == 2
def test_define_actor_within_remote_function(ray_start_10_cpus):
# Make sure we can define and actors within remote funtions.
+41
View File
@@ -110,6 +110,47 @@ def test_actor_lifetime_load_balancing(ray_start_cluster):
ray.get([actor.ping.remote() for actor in actors])
@pytest.mark.parametrize(
"ray_start_regular", [{
"resources": {
"actor": 1
},
"num_cpus": 2,
}],
indirect=True)
def test_deleted_actor_no_restart(ray_start_regular):
@ray.remote(resources={"actor": 1}, max_reconstructions=3)
class Actor:
def method(self):
return 1
def getpid(self):
return os.getpid()
@ray.remote
def f(actor, signal):
ray.get(signal.wait.remote())
return ray.get(actor.method.remote())
signal = ray.test_utils.SignalActor.remote()
a = Actor.remote()
pid = ray.get(a.getpid.remote())
# Pass the handle to another task that cannot run yet.
x_id = f.remote(a, signal)
# Delete the original handle. The actor should not get killed yet.
del a
# Once the task finishes, the actor process should get killed.
ray.get(signal.send.remote())
assert ray.get(x_id) == 1
ray.test_utils.wait_for_pid_to_exit(pid)
# Create another actor with the same resource requirement to make sure the
# old one was not restarted.
a = Actor.remote()
pid = ray.get(a.getpid.remote())
def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
+21 -5
View File
@@ -1051,7 +1051,10 @@ def test_serialized_id(ray_start_cluster):
ray.get(get.remote([obj], True))
def test_fate_sharing(ray_start_cluster):
@pytest.mark.parametrize("use_actors,node_failure",
[(False, False), (False, True), (True, False),
(True, True)])
def test_fate_sharing(ray_start_cluster, use_actors, node_failure):
config = json.dumps({
"num_heartbeats_timeout": 10,
"raylet_heartbeat_timeout_milliseconds": 100,
@@ -1074,6 +1077,9 @@ def test_fate_sharing(ray_start_cluster):
def probe():
return
# TODO(swang): This test does not pass if max_reconstructions > 0 for the
# raylet codepath. Add this parameter once the GCS actor service is enabled
# by default.
@ray.remote
class Actor(object):
def __init__(self):
@@ -1121,10 +1127,20 @@ def test_fate_sharing(ray_start_cluster):
assert wait_for_condition(child_resource_available)
return node_to_kill
test_process_failure(use_actors=True)
test_process_failure(use_actors=False)
node_to_kill = test_node_failure(node_to_kill, use_actors=True)
node_to_kill = test_node_failure(node_to_kill, use_actors=False)
if node_failure:
test_node_failure(node_to_kill, use_actors)
else:
test_process_failure(use_actors)
ray.state.state._check_connected()
keys = [
key for r in ray.state.state.redis_clients
for key in r.keys("WORKER_FAILURE*")
]
if node_failure:
assert len(keys) <= 1, len(keys)
else:
assert len(keys) <= 2, len(keys)
if __name__ == "__main__":
+74 -20
View File
@@ -1151,6 +1151,16 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.max_concurrency, actor_creation_options.is_detached,
actor_creation_options.name, actor_creation_options.is_asyncio, extension_data);
// Add the actor handle before we submit the actor creation task, since the
// actor handle must be in scope by the time the GCS sends the
// WaitForActorOutOfScopeRequest.
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0],
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data));
RAY_CHECK(AddActorHandle(std::move(actor_handle),
/*is_owner_handle=*/!actor_creation_options.is_detached))
<< "Actor " << actor_id << " already exists";
*return_actor_id = actor_id;
TaskSpecification task_spec = builder.Build();
Status status;
@@ -1163,12 +1173,6 @@ Status CoreWorker::CreateActor(const RayFunction &function,
actor_creation_options.max_reconstructions));
status = direct_task_submitter_->SubmitTask(task_spec);
}
std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(
actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0],
function.GetLanguage(), function.GetFunctionDescriptor(), extension_data));
RAY_CHECK(AddActorHandle(std::move(actor_handle),
/*is_owner_handle=*/!actor_creation_options.is_detached))
<< "Actor " << actor_id << " already exists";
return status;
}
@@ -1283,6 +1287,13 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
bool is_owner_handle) {
const auto &actor_id = actor_handle->GetActorID();
const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id);
if (is_owner_handle) {
reference_counter_->AddOwnedObject(actor_creation_return_id,
/*inner_ids=*/{}, GetCallerId(), rpc_address_,
CurrentCallSite(), -1,
/*is_reconstructable=*/true);
}
reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite());
bool inserted;
@@ -1334,16 +1345,31 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
RAY_CHECK(reference_counter_->SetDeleteCallback(
actor_creation_return_id,
[this, actor_id, is_owner_handle](const ObjectID &object_id) {
// TODO(swang): Unsubscribe from the actor table.
// TODO(swang): Remove the actor handle entry.
// If we own the actor and the actor handle is no longer in scope,
// terminate the actor.
if (is_owner_handle) {
RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id
<< " has gone out of scope, sending message to actor "
<< actor_id << " to do a clean exit.";
RAY_CHECK_OK(
KillActor(actor_id, /*force_kill=*/false, /*no_reconstruction=*/false));
// If we own the actor and the actor handle is no longer in scope,
// terminate the actor. We do not do this if the GCS service is
// enabled since then the GCS will terminate the actor for us.
if (!(RayConfig::instance().gcs_service_enabled() &&
RayConfig::instance().gcs_actor_service_enabled())) {
RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id
<< " has gone out of scope, sending message to actor "
<< actor_id << " to do a clean exit.";
RAY_CHECK_OK(
KillActor(actor_id, /*force_kill=*/false, /*no_reconstruction=*/false));
}
}
absl::MutexLock lock(&actor_handles_mutex_);
// TODO(swang): Erase the actor handle once all refs to the actor
// have gone out of scope. We cannot erase it here in case the
// language frontend receives another ref to the same actor. In this
// case, we must remember the last task counter that we sent to the
// actor.
// TODO(swang): Unsubscribe from the actor table once all local refs
// to the actor have gone out of scope.
auto callback = actor_out_of_scope_callbacks_.extract(actor_id);
if (callback) {
callback.mapped()(actor_id);
}
}));
}
@@ -1611,11 +1637,13 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
auto resource_ids = std::make_shared<ResourceMappingType>();
auto return_objects = std::vector<std::shared_ptr<RayObject>>();
auto borrowed_refs = ReferenceCounter::ReferenceTableProto();
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, GetCallerId(), rpc_address_,
CurrentCallSite(), -1,
/*is_reconstructable=*/false);
if (!task_spec.IsActorCreationTask()) {
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, GetCallerId(), rpc_address_,
CurrentCallSite(), -1,
/*is_reconstructable=*/false);
}
}
auto old_id = GetActorId();
SetActorId(actor_id);
@@ -1786,6 +1814,32 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques
}
}
void CoreWorker::HandleWaitForActorOutOfScope(
const rpc::WaitForActorOutOfScopeRequest &request,
rpc::WaitForActorOutOfScopeReply *reply, rpc::SendReplyCallback send_reply_callback) {
if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()),
send_reply_callback)) {
return;
}
// Send a response to trigger cleaning up the actor state once the handle is
// no longer in scope.
auto respond = [send_reply_callback](const ActorID &actor_id) {
RAY_LOG(DEBUG) << "Replying to HandleWaitForActorOutOfScope for " << actor_id;
send_reply_callback(Status::OK(), nullptr, nullptr);
};
const auto actor_id = ActorID::FromBinary(request.actor_id());
RAY_LOG(DEBUG) << "Received HandleWaitForActorOutOfScope for " << actor_id;
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
if (it == actor_handles_.end()) {
respond(actor_id);
} else {
RAY_CHECK(actor_out_of_scope_callbacks_.emplace(actor_id, std::move(respond)).second);
}
}
void CoreWorker::HandleWaitForObjectEviction(
const rpc::WaitForObjectEvictionRequest &request,
rpc::WaitForObjectEvictionReply *reply, rpc::SendReplyCallback send_reply_callback) {
+12
View File
@@ -697,6 +697,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::GetObjectStatusReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Implements gRPC server handler.
void HandleWaitForActorOutOfScope(const rpc::WaitForActorOutOfScopeRequest &request,
rpc::WaitForActorOutOfScopeReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Implements gRPC server handler.
void HandleWaitForObjectEviction(const rpc::WaitForObjectEvictionRequest &request,
rpc::WaitForObjectEvictionReply *reply,
@@ -995,6 +1000,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Manages recovery of objects stored in remote plasma nodes.
std::unique_ptr<ObjectRecoveryManager> object_recovery_manager_;
// TODO(swang): Refactor to merge actor_handles_mutex_ and all fields that it
// protects into the ActorManager.
/// The `actor_handles_` field could be mutated concurrently due to multi-threading, we
/// need a mutex to protect it.
mutable absl::Mutex actor_handles_mutex_;
@@ -1003,6 +1010,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
absl::flat_hash_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_
GUARDED_BY(actor_handles_mutex_);
/// Map from actor ID to a callback to call when all local handles to that
/// actor have gone out of scpoe.
absl::flat_hash_map<ActorID, std::function<void(const ActorID &)>>
actor_out_of_scope_callbacks_ GUARDED_BY(actor_handles_mutex_);
///
/// Fields related to task execution.
///
+11 -9
View File
@@ -58,15 +58,17 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
if (spec.IsActorTask()) {
num_returns--;
}
for (size_t i = 0; i < num_returns; i++) {
// We pass an empty vector for inner IDs because we do not know the return
// value of the task yet. If the task returns an ID(s), the worker will
// notify us via the WaitForRefRemoved RPC that we are now a borrower for
// the inner IDs. Note that this RPC can be received *before* the
// PushTaskReply.
reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, caller_id, caller_address,
call_site, -1, /*is_reconstructable=*/true);
if (!spec.IsActorCreationTask()) {
for (size_t i = 0; i < num_returns; i++) {
// We pass an empty vector for inner IDs because we do not know the return
// value of the task yet. If the task returns an ID(s), the worker will
// notify us via the WaitForRefRemoved RPC that we are now a borrower for
// the inner IDs. Note that this RPC can be received *before* the
// PushTaskReply.
reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, caller_id, caller_address,
call_site, -1, /*is_reconstructable=*/true);
}
}
{
@@ -560,18 +560,13 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources(
Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat(
const std::shared_ptr<rpc::HeartbeatTableData> &data_ptr,
const StatusCallback &callback) {
ClientID node_id = ClientID::FromBinary(data_ptr->client_id());
RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportHeartbeat(
request,
[node_id, callback](const Status &status, const rpc::ReportHeartbeatReply &reply) {
request, [callback](const Status &status, const rpc::ReportHeartbeatReply &reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG) << "Finished reporting heartbeat, status = " << status
<< ", node id = " << node_id;
});
return Status::OK();
}
+179 -8
View File
@@ -42,6 +42,18 @@ WorkerID GcsActor::GetWorkerID() const {
return WorkerID::FromBinary(address.worker_id());
}
WorkerID GcsActor::GetOwnerID() const {
return WorkerID::FromBinary(GetOwnerAddress().worker_id());
}
ClientID GcsActor::GetOwnerNodeID() const {
return ClientID::FromBinary(GetOwnerAddress().raylet_id());
}
const rpc::Address &GcsActor::GetOwnerAddress() const {
return actor_table_data_.owner_address();
}
void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) {
actor_table_data_.set_state(state);
}
@@ -75,9 +87,11 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_
/////////////////////////////////////////////////////////////////////////////////////////
GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::ActorInfoAccessor &actor_info_accessor)
gcs::ActorInfoAccessor &actor_info_accessor,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
actor_info_accessor_(actor_info_accessor) {}
actor_info_accessor_(actor_info_accessor),
worker_client_factory_(worker_client_factory) {}
Status GcsActorManager::RegisterActor(
const ray::rpc::CreateActorRequest &request,
@@ -121,6 +135,13 @@ Status GcsActorManager::RegisterActor(
// created.
actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback));
RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second);
if (!actor->IsDetached() && worker_client_factory_) {
// This actor is owned. Send a long polling request to the actor's
// owner to determine when the actor should be removed.
PollOwnerForActorOutOfScope(actor);
}
gcs_actor_scheduler_->Schedule(actor);
return Status::OK();
}
@@ -134,9 +155,140 @@ ActorID GcsActorManager::GetActorIDByName(const std::string &name) {
return actor_id;
}
void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id,
const ray::WorkerID &worker_id,
bool need_reschedule) {
void GcsActorManager::PollOwnerForActorOutOfScope(
const std::shared_ptr<GcsActor> &actor) {
const auto &actor_id = actor->GetActorID();
const auto &owner_node_id = actor->GetOwnerNodeID();
const auto &owner_id = actor->GetOwnerID();
auto &workers = owners_[owner_node_id];
auto it = workers.find(owner_id);
if (it == workers.end()) {
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id;
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
it = workers.emplace(owner_id, Owner(std::move(client))).first;
}
it->second.children_actor_ids.insert(actor_id);
rpc::WaitForActorOutOfScopeRequest wait_request;
wait_request.set_intended_worker_id(owner_id.Binary());
wait_request.set_actor_id(actor_id.Binary());
RAY_CHECK_OK(it->second.client->WaitForActorOutOfScope(
wait_request, [this, owner_node_id, owner_id, actor_id](
Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Worker " << owner_id << " failed, destroying actor child";
}
auto node_it = owners_.find(owner_node_id);
if (node_it != owners_.end() && node_it->second.count(owner_id)) {
// Only destroy the actor if its owner is still alive. The actor may
// have already been destroyed if the owner died.
DestroyActor(actor_id);
}
}));
}
void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_LOG(DEBUG) << "Destroying actor " << actor_id;
actor_to_register_callbacks_.erase(actor_id);
auto it = registered_actors_.find(actor_id);
RAY_CHECK(it != registered_actors_.end())
<< "Tried to destroy actor that does not exist " << actor_id;
const auto actor = std::move(it->second);
registered_actors_.erase(it);
// Clean up the client to the actor's owner, if necessary.
if (!actor->IsDetached()) {
const auto &owner_node_id = actor->GetOwnerNodeID();
const auto &owner_id = actor->GetOwnerID();
RAY_LOG(DEBUG) << "Erasing actor " << actor_id << " owned by " << owner_id;
auto &node = owners_[owner_node_id];
auto worker_it = node.find(owner_id);
RAY_CHECK(worker_it != node.end());
auto &owner = worker_it->second;
RAY_CHECK(owner.children_actor_ids.erase(actor_id));
if (owner.children_actor_ids.empty()) {
node.erase(worker_it);
if (node.empty()) {
owners_.erase(owner_node_id);
}
}
}
// The actor is already dead, most likely due to process or node failure.
if (actor->GetState() == rpc::ActorTableData::DEAD) {
return;
}
// The actor is still alive or pending creation. Clean up all remaining
// state.
const auto &node_id = actor->GetNodeID();
const auto &worker_id = actor->GetWorkerID();
auto node_it = created_actors_.find(node_id);
if (node_it != created_actors_.end() && node_it->second.count(worker_id)) {
// The actor has already been created. Destroy the process by force-killing
// it.
auto actor_client = worker_client_factory_(actor->GetAddress());
rpc::KillActorRequest request;
request.set_intended_actor_id(actor_id.Binary());
request.set_force_kill(true);
request.set_no_reconstruction(true);
RAY_UNUSED(actor_client->KillActor(request, nullptr));
RAY_CHECK(node_it->second.erase(actor->GetWorkerID()));
if (node_it->second.empty()) {
created_actors_.erase(node_it);
}
} else {
// The actor has not been created yet. It is either being scheduled or is
// pending scheduling.
auto canceled_actor_id =
gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID());
if (!canceled_actor_id.IsNil()) {
// The actor was being scheduled and has now been canceled.
RAY_CHECK(canceled_actor_id == actor_id);
} else {
// The actor was pending scheduling. Remove it from the queue.
auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(),
[actor_id](const std::shared_ptr<GcsActor> &actor) {
return actor->GetActorID() == actor_id;
});
RAY_CHECK(pending_it != pending_actors_.end());
pending_actors_.erase(pending_it);
}
}
// Update the actor to DEAD in case any callers are still alive. This can
// happen if the owner of the actor dies while there are still callers.
// TODO(swang): We can skip this step and delete the actor table entry
// entirely if the callers check directly whether the owner is still alive.
actor->UpdateAddress(rpc::Address());
auto mutable_actor_table_data = actor->GetMutableActorTableData();
mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD);
auto actor_table_data =
std::make_shared<rpc::ActorTableData>(*mutable_actor_table_data);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(
actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr));
}
void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
const ray::WorkerID &worker_id,
bool intentional_exit) {
// Destroy all actors that are owned by this worker.
const auto it = owners_.find(node_id);
if (it != owners_.end() && it->second.count(worker_id)) {
auto owner = it->second.find(worker_id);
// Make a copy of the children actor IDs since we will delete from the
// list.
const auto children_ids = owner->second.children_actor_ids;
for (const auto &child_id : children_ids) {
DestroyActor(child_id);
}
}
ActorID actor_id;
// Find from worker_to_created_actor_.
auto iter = created_actors_.find(node_id);
@@ -154,11 +306,26 @@ void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id,
RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id
<< " failed, reconstructing actor " << actor_id;
// Reconstruct the actor.
ReconstructActor(actor_id, need_reschedule);
ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit);
}
}
void GcsActorManager::ReconstructActorsOnNode(const ClientID &node_id) {
void GcsActorManager::OnNodeDead(const ClientID &node_id) {
RAY_LOG(INFO) << "Node " << node_id << " failed, reconstructing actors";
const auto it = owners_.find(node_id);
if (it != owners_.end()) {
std::vector<ActorID> children_ids;
// Make a copy of all the actor IDs owned by workers on the dead node.
for (const auto &owner : it->second) {
for (const auto &child_id : owner.second.children_actor_ids) {
children_ids.push_back(child_id);
}
}
for (const auto &child_id : children_ids) {
DestroyActor(child_id);
}
}
// Cancel the scheduling of all related actors.
auto scheduling_actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id);
for (auto &actor_id : scheduling_actor_ids) {
@@ -211,6 +378,10 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(
actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
// once the owner fails or notifies us that the actor's handle has gone out
// of scope.
}
}
@@ -231,7 +402,7 @@ void GcsActorManager::OnActorCreationSuccess(std::shared_ptr<GcsActor> actor) {
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from actor_to_register_callbacks_.
auto iter = actor_to_register_callbacks_.find(actor->GetActorID());
auto iter = actor_to_register_callbacks_.find(actor_id);
if (iter != actor_to_register_callbacks_.end()) {
for (auto &callback : iter->second) {
callback(actor);
+55 -14
View File
@@ -19,6 +19,7 @@
#include <ray/common/task/task_execution_spec.h>
#include <ray/common/task/task_spec.h>
#include <ray/protobuf/gcs_service.pb.h>
#include <ray/rpc/worker/core_worker_client.h>
#include <utility>
#include "absl/container/flat_hash_map.h"
@@ -71,6 +72,12 @@ class GcsActor {
ClientID GetNodeID() const;
/// Get the id of the worker on which this actor is created.
WorkerID GetWorkerID() const;
/// Get the actor's owner ID.
WorkerID GetOwnerID() const;
/// Get the node ID of the actor's owner.
ClientID GetOwnerNodeID() const;
/// Get the address of the actor's owner.
const rpc::Address &GetOwnerAddress() const;
/// Update the `Address` of this actor (see gcs.proto).
void UpdateAddress(const rpc::Address &address);
@@ -112,7 +119,8 @@ class GcsActorManager {
/// \param scheduler Used to schedule actor creation tasks.
/// \param actor_info_accessor Used to flush actor data to storage.
GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
gcs::ActorInfoAccessor &actor_info_accessor);
gcs::ActorInfoAccessor &actor_info_accessor,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
~GcsActorManager() = default;
@@ -137,23 +145,25 @@ class GcsActorManager {
/// change.
void SchedulePendingActors();
/// Reconstruct all actors associated with the specified node id, including actors which
/// are scheduled or have been created on this node. Triggered when the given node goes
/// down.
/// Handle a node death. This will restart all actors associated with the
/// specified node id, including actors which are scheduled or have been
/// created on this node. Actors whose owners have died (possibly due to this
/// node being removed) will not be restarted. If any workers on this node
/// owned an actor, those actors will be destroyed.
///
/// \param node_id The specified node id.
void ReconstructActorsOnNode(const ClientID &node_id);
void OnNodeDead(const ClientID &node_id);
/// Reconstruct actor associated with the specified node_id and worker_id.
/// The actor may be pending or already created.
/// Handle a worker failure. This will restart the associated actor, if any,
/// which may be pending or already created. If the worker owned other
/// actors, those actors will be destroyed.
///
/// \param node_id ID of the node where the worker is located
/// \param worker_id ID of the worker that the actor is creating/created on
/// \param need_reschedule Whether to reschedule the actor creation task, sometimes
/// users want to kill an actor intentionally and don't want it to be rescheduled
/// again.
void ReconstructActorOnWorker(const ClientID &node_id, const WorkerID &worker_id,
bool need_reschedule = true);
/// \param node_id ID of the node where the dead worker was located.
/// \param worker_id ID of the dead worker.
/// \param intentional_exit Whether the death was intentional. If yes and the
/// worker was an actor, we should not attempt to restart the actor.
void OnWorkerDead(const ClientID &node_id, const WorkerID &worker_id,
bool intentional_exit = false);
/// Handle actor creation task failure. This should be called when scheduling
/// an actor creation task is infeasible.
@@ -167,6 +177,30 @@ class GcsActorManager {
/// \param actor The actor that has been created.
void OnActorCreationSuccess(std::shared_ptr<GcsActor> actor);
private:
/// A data structure representing an actor's owner.
struct Owner {
Owner(std::shared_ptr<rpc::CoreWorkerClientInterface> client)
: client(std::move(client)) {}
/// A client that can be used to contact the owner.
std::shared_ptr<rpc::CoreWorkerClientInterface> client;
/// The IDs of actors owned by this worker.
absl::flat_hash_set<ActorID> children_actor_ids;
};
/// Poll an actor's owner so that we will receive a notification when the
/// actor has gone out of scope, or the owner has died. This should not be
/// called for detached actors.
void PollOwnerForActorOutOfScope(const std::shared_ptr<GcsActor> &actor);
/// Destroy an actor that has gone out of scope. This cleans up all local
/// state associated with the actor and marks the actor as dead. For owned
/// actors, this should be called when all actor handles have gone out of
/// scope or the owner has died.
/// TODO: For detached actors, this should be called when the application
/// deregisters the actor.
void DestroyActor(const ActorID &actor_id);
private:
/// Reconstruct the specified actor.
///
@@ -191,11 +225,18 @@ class GcsActorManager {
/// Map contains the relationship of node and created actors. Each node ID
/// maps to a map from worker ID to the actor created on that worker.
absl::flat_hash_map<ClientID, absl::flat_hash_map<WorkerID, ActorID>> created_actors_;
/// Map from worker ID to a client and the IDs of the actors owned by that
/// worker. An owned actor should be destroyed once it has gone out of scope,
/// according to its owner, or the owner dies.
absl::flat_hash_map<ClientID, absl::flat_hash_map<WorkerID, Owner>> owners_;
/// The scheduler to schedule all registered actors.
std::shared_ptr<gcs::GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Actor table. Used to update actor information upon creation, deletion, etc.
gcs::ActorInfoAccessor &actor_info_accessor_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
};
} // namespace gcs
@@ -186,7 +186,6 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re
rpc::ReportHeartbeatReply *reply,
rpc::SendReplyCallback send_reply_callback) {
ClientID node_id = ClientID::FromBinary(request.heartbeat().client_id());
RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id;
auto heartbeat_data = std::make_shared<rpc::HeartbeatTableData>();
heartbeat_data->CopyFrom(request.heartbeat());
node_failure_detector_->HandleHeartbeat(node_id, *heartbeat_data);
@@ -195,7 +194,6 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re
// The heartbeat data is reported here because some python unit tests rely on the
// heartbeat data in redis.
RAY_CHECK_OK(node_info_accessor_.AsyncReportHeartbeat(heartbeat_data, nullptr));
RAY_LOG(DEBUG) << "Finished reporting heartbeat, node id = " << node_id;
}
void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request,
+12 -10
View File
@@ -160,8 +160,10 @@ void GcsServer::InitGcsActorManager() {
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_actor_manager_ =
std::make_shared<GcsActorManager>(scheduler, redis_gcs_client_->Actors());
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
scheduler, redis_gcs_client_->Actors(), [this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_node_manager_->AddNodeAddedListener(
[this](const std::shared_ptr<rpc::GcsNodeInfo> &) {
// Because a new node has been added, we need to try to schedule the pending
@@ -169,19 +171,19 @@ void GcsServer::InitGcsActorManager() {
gcs_actor_manager_->SchedulePendingActors();
});
gcs_node_manager_->AddNodeRemovedListener([this](
std::shared_ptr<rpc::GcsNodeInfo> node) {
// All of the related actors should be reconstructed when a node is removed from the
// GCS.
gcs_actor_manager_->ReconstructActorsOnNode(ClientID::FromBinary(node->node_id()));
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
// All of the related actors should be reconstructed when a node is removed from
// the GCS.
gcs_actor_manager_->OnNodeDead(ClientID::FromBinary(node->node_id()));
});
RAY_CHECK_OK(redis_gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
[this](const WorkerID &id, const rpc::WorkerFailureData &worker_failure_data) {
auto &worker_address = worker_failure_data.worker_address();
WorkerID worker_id = WorkerID::FromBinary(worker_address.worker_id());
ClientID node_id = ClientID::FromBinary(worker_address.raylet_id());
auto needs_restart = !worker_failure_data.intentional_disconnect();
gcs_actor_manager_->ReconstructActorOnWorker(node_id, worker_id, needs_restart);
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data.intentional_disconnect());
},
/*done_callback=*/nullptr));
}
@@ -35,14 +35,48 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
std::vector<std::shared_ptr<gcs::GcsActor>> actors;
};
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
ray::Status WaitForActorOutOfScope(
const rpc::WaitForActorOutOfScopeRequest &request,
const rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply> &callback) override {
callbacks.push_back(callback);
return Status::OK();
}
ray::Status KillActor(
const rpc::KillActorRequest &request,
const rpc::ClientCallback<rpc::KillActorReply> &callback) override {
killed_actors.push_back(ActorID::FromBinary(request.intended_actor_id()));
return Status::OK();
}
bool Reply(Status status = Status::OK()) {
if (callbacks.size() == 0) {
return false;
}
auto callback = callbacks.front();
auto reply = rpc::WaitForActorOutOfScopeReply();
callback(status, reply);
callbacks.pop_front();
return true;
}
std::list<rpc::ClientCallback<rpc::WaitForActorOutOfScopeReply>> callbacks;
std::vector<ActorID> killed_actors;
};
class GcsActorManagerTest : public ::testing::Test {
public:
GcsActorManagerTest()
: mock_actor_scheduler_(new MockActorScheduler()),
gcs_actor_manager_(mock_actor_scheduler_, actor_info_accessor_) {}
worker_client_(new MockWorkerClient()),
gcs_actor_manager_(mock_actor_scheduler_, actor_info_accessor_,
[&](const rpc::Address &addr) { return worker_client_; }) {}
GcsServerMocker::MockedActorInfoAccessor actor_info_accessor_;
std::shared_ptr<MockActorScheduler> mock_actor_scheduler_;
std::shared_ptr<MockWorkerClient> worker_client_;
gcs::GcsActorManager gcs_actor_manager_;
};
@@ -61,6 +95,34 @@ TEST_F(GcsActorManagerTest, TestBasic) {
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
auto worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
ASSERT_TRUE(worker_client_->Reply());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
}
TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
ASSERT_EQ(finished_actors.size(), 0);
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.clear();
gcs_actor_manager_.OnActorCreationFailed(actor);
gcs_actor_manager_.SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
@@ -76,21 +138,48 @@ TEST_F(GcsActorManagerTest, TestBasic) {
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
}
TEST_F(GcsActorManagerTest, TestWorkerFailure) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
ASSERT_EQ(finished_actors.size(), 0);
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
auto worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Killing another worker does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(node_id, _));
gcs_actor_manager_.ReconstructActorOnWorker(node_id, WorkerID::FromRandom());
gcs_actor_manager_.OnWorkerDead(node_id, WorkerID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove worker and then check that the actor is dead.
gcs_actor_manager_.ReconstructActorOnWorker(node_id, worker_id);
gcs_actor_manager_.OnWorkerDead(node_id, worker_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
}
TEST_F(GcsActorManagerTest, TestBasicNodeFailure) {
TEST_F(GcsActorManagerTest, TestNodeFailure) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
@@ -105,12 +194,6 @@ TEST_F(GcsActorManagerTest, TestBasicNodeFailure) {
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
gcs_actor_manager_.OnActorCreationFailed(actor);
gcs_actor_manager_.SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
mock_actor_scheduler_->actors.clear();
ASSERT_EQ(finished_actors.size(), 0);
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
@@ -123,16 +206,18 @@ TEST_F(GcsActorManagerTest, TestBasicNodeFailure) {
// Killing another node does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_));
gcs_actor_manager_.ReconstructActorsOnNode(ClientID::FromRandom());
gcs_actor_manager_.OnNodeDead(ClientID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove node and then check that the actor is dead.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.ReconstructActorsOnNode(node_id);
gcs_actor_manager_.OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
}
TEST_F(GcsActorManagerTest, TestActorReconstruction) {
@@ -161,9 +246,9 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove node and then check that the actor is being restarted.
// Remove worker and then check that the actor is being restarted.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.ReconstructActorsOnNode(node_id);
gcs_actor_manager_.OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RECONSTRUCTING);
// Add node and check that the actor is restarted.
@@ -179,18 +264,97 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
ASSERT_EQ(actor->GetNodeID(), node_id2);
// Killing another node does not affect this actor.
// Killing another worker does not affect this actor.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_));
gcs_actor_manager_.ReconstructActorsOnNode(ClientID::FromRandom());
gcs_actor_manager_.OnNodeDead(ClientID::FromRandom());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
// Remove node and then check that the actor is dead.
// Remove worker and then check that the actor is dead.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id2));
gcs_actor_manager_.ReconstructActorsOnNode(node_id2);
gcs_actor_manager_.OnNodeDead(node_id2);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
// No more actors to schedule.
gcs_actor_manager_.SchedulePendingActors();
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0);
ASSERT_TRUE(worker_client_->Reply());
}
TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(
job_id, /*max_reconstructions=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
ASSERT_EQ(finished_actors.size(), 0);
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
const auto owner_node_id = actor->GetOwnerNodeID();
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
auto worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id));
gcs_actor_manager_.OnNodeDead(owner_node_id);
// The child actor should be marked as dead.
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
ASSERT_EQ(worker_client_->killed_actors.size(), 1);
ASSERT_EQ(worker_client_->killed_actors.front(), actor->GetActorID());
// Remove the actor's node and check that the actor is not restarted, since
// its owner has died.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
gcs_actor_manager_.OnNodeDead(node_id);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
gcs_actor_manager_.SchedulePendingActors();
ASSERT_TRUE(mock_actor_scheduler_->actors.empty());
}
TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) {
auto job_id = JobID::FromInt(1);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/1, /*detached=*/true);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_.RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
ASSERT_EQ(finished_actors.size(), 0);
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
const auto owner_node_id = actor->GetOwnerNodeID();
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
auto worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_.OnActorCreationSuccess(actor);
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id));
gcs_actor_manager_.OnNodeDead(owner_node_id);
// The child actor should not be marked as dead.
ASSERT_TRUE(worker_client_->killed_actors.empty());
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
}
TEST_F(GcsActorManagerTest, TestNamedActors) {
@@ -74,6 +74,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) {
ASSERT_EQ(0, success_actors_.size());
ASSERT_EQ(1, failure_actors_.size());
ASSERT_EQ(actor, failure_actors_.front());
ASSERT_TRUE(actor->GetNodeID().IsNil());
}
TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) {
@@ -94,9 +95,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) {
ASSERT_EQ(0, worker_client_->callbacks.size());
// Grant a worker, then the actor creation request should be send to the worker.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(),
node_id, ClientID::Nil()));
WorkerID worker_id = WorkerID::FromRandom();
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(),
node->node_manager_port(), worker_id,
node_id, ClientID::Nil()));
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
@@ -106,6 +108,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) {
ASSERT_EQ(0, failure_actors_.size());
ASSERT_EQ(1, success_actors_.size());
ASSERT_EQ(actor, success_actors_.front());
ASSERT_EQ(actor->GetNodeID(), node_id);
ASSERT_EQ(actor->GetWorkerID(), worker_id);
}
TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) {
@@ -136,9 +140,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) {
ASSERT_EQ(0, worker_client_->callbacks.size());
// Grant a worker, then the actor creation request should be send to the worker.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(),
node_id, ClientID::Nil()));
WorkerID worker_id = WorkerID::FromRandom();
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(),
node->node_manager_port(), worker_id,
node_id, ClientID::Nil()));
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
@@ -148,6 +153,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) {
ASSERT_EQ(0, failure_actors_.size());
ASSERT_EQ(1, success_actors_.size());
ASSERT_EQ(actor, success_actors_.front());
ASSERT_EQ(actor->GetNodeID(), node_id);
ASSERT_EQ(actor->GetWorkerID(), worker_id);
}
TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) {
@@ -168,9 +175,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) {
ASSERT_EQ(0, worker_client_->callbacks.size());
// Grant a worker, then the actor creation request should be send to the worker.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(),
node_id, ClientID::Nil()));
WorkerID worker_id = WorkerID::FromRandom();
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(),
node->node_manager_port(), worker_id,
node_id, ClientID::Nil()));
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_creating_count_);
@@ -186,6 +194,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) {
ASSERT_EQ(0, failure_actors_.size());
ASSERT_EQ(1, success_actors_.size());
ASSERT_EQ(actor, success_actors_.front());
ASSERT_EQ(actor->GetNodeID(), node_id);
ASSERT_EQ(actor->GetWorkerID(), worker_id);
}
TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) {
@@ -213,6 +223,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) {
ASSERT_EQ(actor->GetActorID(), actor_ids.front());
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
// Grant a worker, which will influence nothing.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
@@ -258,6 +269,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) {
ASSERT_EQ(1, actor_ids.size());
ASSERT_EQ(actor->GetActorID(), actor_ids.front());
ASSERT_EQ(1, worker_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
// Reply the actor creation request, which will influence nothing.
ASSERT_TRUE(worker_client_->ReplyPushTask());
@@ -297,6 +309,8 @@ TEST_F(GcsActorSchedulerTest, TestWorkerFailedWhenCreating) {
ASSERT_EQ(actor->GetActorID(),
gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id));
ASSERT_EQ(1, worker_client_->callbacks.size());
ASSERT_TRUE(actor->GetNodeID().IsNil());
ASSERT_TRUE(actor->GetWorkerID().IsNil());
// Reply the actor creation request, which will influence nothing.
ASSERT_TRUE(worker_client_->ReplyPushTask());
@@ -340,9 +354,10 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) {
ASSERT_EQ(0, worker_client_->callbacks.size());
// Grant a worker, then the actor creation request should be send to the worker.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node2->node_manager_address(), node2->node_manager_port(), WorkerID::FromRandom(),
node_id_2, ClientID::Nil()));
WorkerID worker_id = WorkerID::FromRandom();
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node2->node_manager_address(),
node2->node_manager_port(), worker_id,
node_id_2, ClientID::Nil()));
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
@@ -350,11 +365,11 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) {
ASSERT_TRUE(worker_client_->ReplyPushTask());
ASSERT_EQ(0, worker_client_->callbacks.size());
ASSERT_EQ(node_id_2, actor->GetNodeID());
ASSERT_EQ(0, failure_actors_.size());
ASSERT_EQ(1, success_actors_.size());
ASSERT_EQ(actor, success_actors_.front());
ASSERT_EQ(actor->GetNodeID(), node_id_2);
ASSERT_EQ(actor->GetWorkerID(), worker_id);
}
} // namespace ray
@@ -362,4 +377,4 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) {
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
@@ -34,8 +34,6 @@ void DefaultWorkerInfoHandler::HandleReportWorkerFailure(
RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_FAILURE_CHANNEL, worker_id.Binary(),
worker_failure_data->SerializeAsString(),
nullptr));
RAY_LOG(DEBUG) << "Finished reporting worker failure, "
<< worker_address.DebugString();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
+15
View File
@@ -163,6 +163,16 @@ message GetObjectStatusReply {
ObjectStatus status = 1;
}
message WaitForActorOutOfScopeRequest {
// The ID of the worker this message is intended for.
bytes intended_worker_id = 1;
// ActorID of the actor in scope.
bytes actor_id = 2;
}
message WaitForActorOutOfScopeReply {
}
message WaitForObjectEvictionRequest {
// The ID of the worker this message is intended for.
bytes intended_worker_id = 1;
@@ -298,6 +308,11 @@ service CoreWorkerService {
returns (DirectActorCallArgWaitCompleteReply);
// Ask the object's owner about the object's current status.
rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply);
// Wait for the actor's owner to decide that the actor has gone out of scope.
// Replying to this message indicates that the client should force-kill the
// actor process, if still alive.
rpc WaitForActorOutOfScope(WaitForActorOutOfScopeRequest)
returns (WaitForActorOutOfScopeReply);
// Notify the object's owner that it has been pinned by a raylet. Replying
// to this message indicates that the raylet should unpin the object.
rpc WaitForObjectEviction(WaitForObjectEvictionRequest)
+2
View File
@@ -2524,6 +2524,8 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
failed_nodes_cache_.count(owner_node_id) > 0)) {
// TODO(swang): Skip assigning this task to this worker instead of
// killing the worker?
RAY_LOG(INFO) << "Owner of assigned task " << task.GetTaskSpecification().TaskId()
<< " died, killing leased worker " << worker->WorkerId();
KillWorker(worker);
}
+9
View File
@@ -144,6 +144,13 @@ class CoreWorkerClientInterface {
return Status::NotImplemented("");
}
/// Ask the actor's owner to reply when the actor has gone out of scope.
virtual ray::Status WaitForActorOutOfScope(
const WaitForActorOutOfScopeRequest &request,
const ClientCallback<WaitForActorOutOfScopeReply> &callback) {
return Status::NotImplemented("");
}
/// Notify the owner of an object that the object has been pinned.
virtual ray::Status WaitForObjectEviction(
const WaitForObjectEvictionRequest &request,
@@ -225,6 +232,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
RPC_CLIENT_METHOD(CoreWorkerService, RemoteCancelTask, grpc_client_, override)
RPC_CLIENT_METHOD(CoreWorkerService, WaitForActorOutOfScope, grpc_client_, override)
RPC_CLIENT_METHOD(CoreWorkerService, WaitForObjectEviction, grpc_client_, override)
RPC_CLIENT_METHOD(CoreWorkerService, GetCoreWorkerStats, grpc_client_, override)
+2
View File
@@ -32,6 +32,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(CoreWorkerService, PushTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForActorOutOfScope) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \
@@ -46,6 +47,7 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DirectActorCallArgWaitComplete) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetObjectStatus) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForActorOutOfScope) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForObjectEviction) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForRefRemoved) \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \