[Core] GCS Actor Management Race Condition (#9215)

* GCS Actor management on by default.

* Fix travis config.

* Change condition.

* Finish the initial race condition fix.

* Lint.

* Refine the codebase.

* Finish the initial version

* Improve logic.

* Remove unnecessary log messages.

* Address code review.

* Add tests

* Revert the second race condition that doesn't happen anymore, handle some edge cases. add tests.

* Address the second race condition found.

* Addressed code review.

* Addressed code review.

* Run a new unit test only when gcs actor management is on.
This commit is contained in:
SangBin Cho
2020-07-08 10:56:52 -07:00
committed by GitHub
parent c5aa1eb042
commit 9f8ff2e3b1
6 changed files with 168 additions and 14 deletions
+44
View File
@@ -908,6 +908,50 @@ def test_actor_creation_task_crash(ray_start_regular):
ray.get(ra.f.remote())
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
@pytest.mark.parametrize(
"ray_start_regular", [{
"num_cpus": 2,
"num_gpus": 1
}], indirect=True)
def test_pending_actor_removed_by_owner(ray_start_regular):
# Verify when an owner of pending actors is killed, the actor resources
# are correctly returned.
@ray.remote(num_cpus=1, num_gpus=1)
class A:
def __init__(self):
self.actors = []
def create_actors(self):
self.actors = [B.remote() for _ in range(2)]
@ray.remote(num_gpus=1)
class B:
def ping(self):
return True
@ray.remote(num_gpus=1)
def f():
return True
a = A.remote()
# Create pending actors
ray.get(a.create_actors.remote())
# Owner is dead. pending actors should be killed
# and raylet should return workers correctly.
del a
a = B.remote()
assert ray.get(a.ping.remote())
ray.kill(a)
assert ray.get(f.remote())
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+20 -10
View File
@@ -516,15 +516,21 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
// The actor was being scheduled and has now been canceled.
RAY_CHECK(canceled_actor_id == actor_id);
} else {
// The actor was pending scheduling. Remove it from the queue.
auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(),
[actor_id](const std::shared_ptr<GcsActor> &actor) {
return actor->GetActorID() == actor_id;
});
// TODO(rkooo567): The actor may be in the state of leasing worker. We need to add
// the processing logic of this in https://github.com/ray-project/ray/pull/9215.
// The actor was pending scheduling. Remove it from the queue.
if (pending_it != pending_actors_.end()) {
pending_actors_.erase(pending_it);
} else {
// When actor creation request of this actor id is pending in raylet,
// it doesn't responds, and the actor should be still in leasing state.
// NOTE: Raylet will cancel the lease request once it receives the
// actor state notification. So this method doesn't have to cancel
// outstanding lease request by calling raylet_client->CancelWorkerLease
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id);
}
}
}
@@ -562,8 +568,9 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
}
}
// Find if actor is already created or in the creation process (lease request is
// granted)
ActorID actor_id;
// Find from worker_to_created_actor_.
auto iter = created_actors_.find(node_id);
if (iter != created_actors_.end() && iter->second.count(worker_id)) {
actor_id = iter->second[worker_id];
@@ -573,14 +580,17 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
}
} else {
actor_id = gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id);
if (actor_id.IsNil()) {
return;
}
}
if (!actor_id.IsNil()) {
RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id
<< " failed, restarting actor " << actor_id;
// Reconstruct the actor.
ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit);
}
// Otherwise, try to reconstruct the actor that was already created or in the creation
// process.
RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id
<< " failed, restarting actor " << actor_id
<< ", intentional exit: " << intentional_exit;
ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit);
}
void GcsActorManager::OnNodeDead(const ClientID &node_id) {
+23 -4
View File
@@ -123,6 +123,16 @@ std::vector<ActorID> GcsActorScheduler::CancelOnNode(const ClientID &node_id) {
return actor_ids;
}
void GcsActorScheduler::CancelOnLeasing(const ClientID &node_id,
const ActorID &actor_id) {
// NOTE: This method does not currently cancel the outstanding lease request.
// It only removes leasing information from the internal state so that
// RequestWorkerLease ignores the response from raylet.
auto node_it = node_to_actors_when_leasing_.find(node_id);
RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
node_it->second.erase(actor_id);
}
ActorID GcsActorScheduler::CancelOnWorker(const ClientID &node_id,
const WorkerID &worker_id) {
// Remove the worker from creating map and return ID of the actor associated with the
@@ -169,11 +179,16 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
// gcs_actor_manager will reconstruct it again.
auto iter = node_to_actors_when_leasing_.find(node_id);
if (iter != node_to_actors_when_leasing_.end()) {
// If the node is still available, the actor must be still in the leasing map as
// it is erased from leasing map only when `CancelOnNode` or the
// `RequestWorkerLeaseReply` is received from the node, so try lease again.
auto actor_iter = iter->second.find(actor->GetActorID());
RAY_CHECK(actor_iter != iter->second.end());
if (actor_iter == iter->second.end()) {
// if actor is not in leasing state, it means it is cancelled.
RAY_LOG(INFO) << "Raylet granted a lease request, but the outstanding lease "
"request for "
<< actor->GetActorID()
<< " has been already cancelled. The response will be ignored.";
return;
}
if (status.ok()) {
// Remove the actor from the leasing map as the reply is returned from the
// remote node.
@@ -252,6 +267,10 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
.emplace(leased_worker->GetWorkerID(), leased_worker)
.second);
actor->UpdateAddress(leased_worker->GetAddress());
// Make sure to connect to the client before persisting actor info to GCS.
// Without this, there could be a possible race condition. Related issues:
// https://github.com/ray-project/ray/pull/9215/files#r449469320
GetOrConnectCoreWorkerClient(leased_worker->GetAddress());
RAY_CHECK_OK(gcs_actor_table_.Put(actor->GetActorID(), actor->GetActorTableData(),
[this, actor, leased_worker](Status status) {
RAY_CHECK_OK(status);
@@ -55,6 +55,12 @@ class GcsActorSchedulerInterface {
/// \return ID list of actors associated with the specified node id.
virtual std::vector<ActorID> CancelOnNode(const ClientID &node_id) = 0;
/// Cancel a outstanding leasing request to raylets.
///
/// \param node_id ID of the node where the actor leasing request has been sent.
/// \param actor_id ID of an actor.
virtual void CancelOnLeasing(const ClientID &node_id, const ActorID &actor_id) = 0;
/// Cancel the actor that is being scheduled to the specified worker.
///
/// \param node_id ID of the node where the worker is located.
@@ -109,6 +115,16 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \return ID list of actors associated with the specified node id.
std::vector<ActorID> CancelOnNode(const ClientID &node_id) override;
/// Cancel a outstanding leasing request to raylets.
///
/// NOTE: The current implementation does not actually send lease cancel request to
/// raylet. This method must be only used to ignore incoming raylet lease request
/// responses.
///
/// \param node_id ID of the node where the actor leasing request has been sent.
/// \param actor_id ID of an actor.
void CancelOnLeasing(const ClientID &node_id, const ActorID &actor_id) override;
/// Cancel the actor that is being scheduled to the specified worker.
///
/// \param node_id ID of the node where the worker is located.
@@ -23,6 +23,7 @@
namespace ray {
using ::testing::_;
using ::testing::Return;
class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
public:
@@ -34,6 +35,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
MOCK_METHOD1(CancelOnNode, std::vector<ActorID>(const ClientID &node_id));
MOCK_METHOD2(CancelOnWorker,
ActorID(const ClientID &node_id, const WorkerID &worker_id));
MOCK_METHOD2(CancelOnLeasing, void(const ClientID &node_id, const ActorID &actor_id));
std::vector<std::shared_ptr<gcs::GcsActor>> actors;
};
@@ -599,6 +601,36 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) {
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD);
}
TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
// Covers a scenario 1 in this PR https://github.com/ray-project/ray/pull/9215.
auto job_id = JobID::FromInt(1);
auto create_actor_request =
Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
RAY_CHECK_OK(gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
finished_actors.emplace_back(actor);
}));
ASSERT_EQ(finished_actors.size(), 0);
ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1);
auto actor = mock_actor_scheduler_->actors.back();
mock_actor_scheduler_->actors.pop_back();
const auto owner_node_id = actor->GetOwnerNodeID();
const auto owner_worker_id = actor->GetOwnerID();
// Check that the actor is in state `ALIVE`.
rpc::Address address;
auto node_id = ClientID::FromRandom();
auto worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
const auto actor_id = actor->GetActorID();
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
}
} // namespace ray
int main(int argc, char **argv) {
@@ -244,6 +244,39 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) {
ASSERT_EQ(0, failure_actors_.size());
}
TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) {
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());
gcs_node_manager_->AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
auto actor = std::make_shared<gcs::GcsActor>(create_actor_request);
// Schedule the actor with 1 available node, and the lease request should be send to the
// node.
gcs_actor_scheduler_->Schedule(actor);
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
// Cancel the lease request.
gcs_actor_scheduler_->CancelOnLeasing(node_id, actor->GetActorID());
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
// Grant a worker, which will influence nothing.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(),
node_id, ClientID::Nil()));
ASSERT_EQ(1, raylet_client_->num_workers_requested);
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_leasing_count_);
ASSERT_EQ(0, success_actors_.size());
ASSERT_EQ(0, failure_actors_.size());
}
TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) {
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());