diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 34d2ad44d..1d4fb5a08 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -308,7 +308,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster): def ready(self): return - actor = RestartableActor.remote() + actor = RestartableActor.options(detached=True).remote() ray.get(actor.ready.remote()) results = [actor.increase.remote() for _ in range(100)] # Kill actor node, while the above task is still being executed. diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 44808d65a..2ebf21a86 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -3,18 +3,18 @@ import sys import ray -def test_gcs_server_restart(): - ray.init() +@ray.remote +class Increase: + def method(self, x): + return x + 2 - @ray.remote - class Increase: - def method(self, x): - return x + 2 - @ray.remote - def increase(x): - return x + 1 +@ray.remote +def increase(x): + return x + 1 + +def test_gcs_server_restart(ray_start_regular): actor1 = Increase.remote() result = ray.get(actor1.method.remote(1)) assert result == 3 @@ -31,7 +31,21 @@ def test_gcs_server_restart(): result = ray.get(increase.remote(1)) assert result == 2 - ray.shutdown() + + +def test_gcs_server_restart_during_actor_creation(ray_start_regular): + ids = [] + for i in range(0, 100): + actor = Increase.remote() + ids.append(actor.method.remote(1)) + + ray.worker._global_node.kill_gcs_server() + ray.worker._global_node.start_gcs_server() + + ready, unready = ray.wait(ids, 100, 240) + print("Ready objects is {}.".format(ready)) + print("Unready objects is {}.".format(unready)) + assert len(unready) == 0 if __name__ == "__main__": diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7b08c1f17..8b6a00ceb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1470,6 +1470,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, const std::shared_ptr &resource_ids, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs) { + RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString(); task_queue_length_ -= 1; num_executed_tasks_ += 1; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 33c2769e2..796ca2e9c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -280,6 +280,19 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use"; const TaskSpecification task_spec(request.task_spec()); + + // If GCS server is restarted after sending an actor creation task to this core worker, + // the restarted GCS server will send the same actor creation task to the core worker + // again. We just need to ignore it and reply ok. + if (task_spec.IsActorCreationTask() && + worker_context_.GetCurrentActorID() == task_spec.ActorCreationId()) { + send_reply_callback(Status::OK(), nullptr, nullptr); + RAY_LOG(INFO) << "Ignoring duplicate actor creation task for actor " + << task_spec.ActorCreationId() + << ". This is likely due to a GCS server restart."; + return; + } + std::vector dependencies; for (size_t i = 0; i < task_spec.NumArgs(); ++i) { int count = task_spec.ArgIdCount(i); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index d42d5015a..f18563f25 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -578,15 +578,15 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, } if (!actor_id.IsNil()) { - RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id - << " failed, restarting actor " << actor_id; + RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id + << " failed, restarting actor " << actor_id; // Reconstruct the actor. ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit); } } void GcsActorManager::OnNodeDead(const ClientID &node_id) { - RAY_LOG(INFO) << "Node " << node_id << " failed, reconstructing actors"; + RAY_LOG(WARNING) << "Node " << node_id << " failed, reconstructing actors."; const auto it = owners_.find(node_id); if (it != owners_.end()) { std::vector children_ids; @@ -647,6 +647,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche if (remaining_restarts != 0) { mutable_actor_table_data->set_num_restarts(++num_restarts); mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING); + mutable_actor_table_data->clear_resource_mapping(); // The backend storage is reliable in the future, so the status must be ok. RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, @@ -693,32 +694,35 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr actor) { void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &actor) { auto actor_id = actor->GetActorID(); + RAY_LOG(DEBUG) << "Actor created successfully, actor id = " << actor_id; RAY_CHECK(registered_actors_.count(actor_id) > 0); actor->UpdateState(rpc::ActorTableData::ALIVE); auto actor_table_data = actor->GetActorTableData(); // The backend storage is reliable in the future, so the status must be ok. RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( - actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) { + actor_id, actor_table_data, + [this, actor_id, actor_table_data, actor](Status status) { RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(), actor_table_data.SerializeAsString(), nullptr)); + + // Invoke all callbacks for all registration requests of this actor (duplicated + // requests are included) and remove all of them from + // actor_to_register_callbacks_. + auto iter = actor_to_register_callbacks_.find(actor_id); + if (iter != actor_to_register_callbacks_.end()) { + for (auto &callback : iter->second) { + callback(actor); + } + actor_to_register_callbacks_.erase(iter); + } + + auto worker_id = actor->GetWorkerID(); + auto node_id = actor->GetNodeID(); + RAY_CHECK(!worker_id.IsNil()); + RAY_CHECK(!node_id.IsNil()); + RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second); })); - - // Invoke all callbacks for all registration requests of this actor (duplicated - // requests are included) and remove all of them from actor_to_register_callbacks_. - auto iter = actor_to_register_callbacks_.find(actor_id); - if (iter != actor_to_register_callbacks_.end()) { - for (auto &callback : iter->second) { - callback(actor); - } - actor_to_register_callbacks_.erase(iter); - } - - auto worker_id = actor->GetWorkerID(); - auto node_id = actor->GetNodeID(); - RAY_CHECK(!worker_id.IsNil()); - RAY_CHECK(!node_id.IsNil()); - RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second); } void GcsActorManager::SchedulePendingActors() { @@ -758,6 +762,18 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { } } } + + RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size() + << ", and the number of created actors is " << created_actors_.size(); + for (auto &item : registered_actors_) { + auto &actor = item.second; + if (actor->GetState() != ray::rpc::ActorTableData::ALIVE) { + RAY_LOG(DEBUG) << "Rescheduling a non-alive actor, actor id = " + << actor->GetActorID() << ", state = " << actor->GetState(); + gcs_actor_scheduler_->Reschedule(actor); + } + } + RAY_LOG(INFO) << "Finished loading initial data."; done(); }; @@ -811,5 +827,10 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) { RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetByJobId(job_id, on_done)); } +const absl::flat_hash_map> + &GcsActorManager::GetCreatedActors() const { + return created_actors_; +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 58722a3c9..b82e838a2 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -228,6 +228,12 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param job_id The id of finished job. void OnJobFinished(const JobID &job_id); + /// Get the created actors. + /// + /// \return The created actors. + const absl::flat_hash_map> + &GetCreatedActors() const; + private: /// A data structure representing an actor's owner. struct Owner { diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index f6a9d01ec..7ac91029c 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -40,23 +40,7 @@ GcsActorScheduler::GcsActorScheduler( } void GcsActorScheduler::Schedule(std::shared_ptr actor) { - auto node_id = actor->GetNodeID(); - if (!node_id.IsNil()) { - if (auto node = gcs_node_manager_.GetNode(node_id)) { - // If the actor is already tied to a node and the node is available, then record - // the relationship of the node and actor and then lease worker directly from the - // node. - RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()] - .emplace(actor->GetActorID()) - .second); - LeaseWorkerFromNode(actor, node); - return; - } - - // The actor is already tied to a node which is unavailable now, so we should reset - // the address. - actor->UpdateAddress(rpc::Address()); - } + RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil()); // Select a node to lease worker for the actor. auto node = SelectNodeRandomly(); @@ -67,25 +51,41 @@ void GcsActorScheduler::Schedule(std::shared_ptr actor) { return; } - // Update the address of the actor as it is tied to a new node. + // Update the address of the actor as it is tied to a node. rpc::Address address; address.set_raylet_id(node->node_id()); actor->UpdateAddress(address); - // The backend storage is reliable in the future, so the status must be ok. - RAY_CHECK_OK(gcs_actor_table_.Put( - actor->GetActorID(), actor->GetActorTableData(), [this, actor](Status status) { - RAY_CHECK_OK(status); - RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(), - actor->GetActorTableData().SerializeAsString(), - nullptr)); - // There is no promise that the node the - // actor tied to is still alive as the - // flush is asynchronously, so just - // invoke `Schedule` which will lease - // worker directly if the node is still - // available or select a new one if not. - Schedule(actor); - })); + + RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()] + .emplace(actor->GetActorID()) + .second); + + // Lease worker directly from the node. + LeaseWorkerFromNode(actor, node); +} + +void GcsActorScheduler::Reschedule(std::shared_ptr actor) { + if (!actor->GetWorkerID().IsNil()) { + RAY_LOG(INFO) + << "Actor " << actor->GetActorID() + << " is already tied to a leased worker. Create actor directly on worker."; + auto leased_worker = std::make_shared( + actor->GetAddress(), + VectorFromProtobuf(actor->GetMutableActorTableData()->resource_mapping()), + actor->GetActorID()); + auto iter_node = node_to_workers_when_creating_.find(actor->GetNodeID()); + if (iter_node != node_to_workers_when_creating_.end()) { + if (0 == iter_node->second.count(leased_worker->GetWorkerID())) { + iter_node->second.emplace(leased_worker->GetWorkerID(), leased_worker); + } + } else { + node_to_workers_when_creating_[actor->GetNodeID()].emplace( + leased_worker->GetWorkerID(), leased_worker); + } + CreateActorOnWorker(actor, leased_worker); + } else { + Schedule(actor); + } } std::vector GcsActorScheduler::CancelOnNode(const ClientID &node_id) { @@ -225,21 +225,25 @@ void GcsActorScheduler::HandleWorkerLeasedReply( // The worker did not succeed in the lease, but the specified node returned a new // node, and then try again on the new node. RAY_CHECK(!retry_at_raylet_address.raylet_id().empty()); - actor->UpdateAddress(retry_at_raylet_address); - // The backend storage is reliable in the future, so the status must be ok. - RAY_CHECK_OK(gcs_actor_table_.Put( - actor->GetActorID(), actor->GetActorTableData(), [this, actor](Status status) { - RAY_CHECK_OK(status); - RAY_CHECK_OK(gcs_pub_sub_->Publish( - ACTOR_CHANNEL, actor->GetActorID().Hex(), - actor->GetActorTableData().SerializeAsString(), nullptr)); - Schedule(actor); - })); + auto spill_back_node_id = ClientID::FromBinary(retry_at_raylet_address.raylet_id()); + if (auto spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id)) { + actor->UpdateAddress(retry_at_raylet_address); + RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()] + .emplace(actor->GetActorID()) + .second); + LeaseWorkerFromNode(actor, spill_back_node); + } else { + // If the spill back node is dead, we need to schedule again. + actor->UpdateAddress(rpc::Address()); + actor->GetMutableActorTableData()->clear_resource_mapping(); + Schedule(actor); + } } else { // The worker is leased successfully from the specified node. std::vector resources; for (auto &resource : reply.resource_mapping()) { resources.emplace_back(resource); + actor->GetMutableActorTableData()->add_resource_mapping()->CopyFrom(resource); } auto leased_worker = std::make_shared( worker_address, std::move(resources), actor->GetActorID()); @@ -248,7 +252,11 @@ void GcsActorScheduler::HandleWorkerLeasedReply( .emplace(leased_worker->GetWorkerID(), leased_worker) .second); actor->UpdateAddress(leased_worker->GetAddress()); - CreateActorOnWorker(actor, leased_worker); + RAY_CHECK_OK(gcs_actor_table_.Put(actor->GetActorID(), actor->GetActorTableData(), + [this, actor, leased_worker](Status status) { + RAY_CHECK_OK(status); + CreateActorOnWorker(actor, leased_worker); + })); } } diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index f618cfe25..fc404f97e 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -44,6 +44,11 @@ class GcsActorSchedulerInterface { /// \param actor to be scheduled. virtual void Schedule(std::shared_ptr actor) = 0; + /// Reschedule the specified actor after gcs server restarts. + /// + /// \param actor to be scheduled. + virtual void Reschedule(std::shared_ptr actor) = 0; + /// Cancel all actors that are being scheduled to the specified node. /// /// \param node_id ID of the node where the worker is located. @@ -93,6 +98,11 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// \param actor to be scheduled. void Schedule(std::shared_ptr actor) override; + /// Reschedule the specified actor after gcs server restarts. + /// + /// \param actor to be scheduled. + void Reschedule(std::shared_ptr actor) override; + /// Cancel all actors that are being scheduled to the specified node. /// /// \param node_id ID of the node where the worker is located. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 8709fc749..24b1aba46 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -316,6 +316,7 @@ void GcsNodeManager::AddNode(std::shared_ptr node) { std::shared_ptr GcsNodeManager::RemoveNode( const ray::ClientID &node_id, bool is_intended /*= false*/) { + RAY_LOG(INFO) << "Removing node, node id = " << node_id; std::shared_ptr removed_node; auto iter = alive_nodes_.find(node_id); if (iter != alive_nodes_.end()) { diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 3e5cfe2c6..eb4ca29a4 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -98,20 +98,24 @@ void GcsServer::Start() { rpc_server_.RegisterService(*worker_info_service_); auto load_completed_count = std::make_shared(0); - int load_count = 3; + int load_count = 2; auto on_done = [this, load_count, load_completed_count]() { ++(*load_completed_count); + // We will reschedule the unfinished actors, so we have to load the actor data at the + // end to make sure the other table data is loaded. if (*load_completed_count == load_count) { - // Start RPC server when all tables have finished loading initial data. - rpc_server_.Run(); + auto actor_manager_load_initial_data_callback = [this]() { + // Start RPC server when all tables have finished loading initial data. + rpc_server_.Run(); - // Store gcs rpc server address in redis. - StoreGcsServerAddressInRedis(); - is_started_ = true; + // Store gcs rpc server address in redis. + StoreGcsServerAddressInRedis(); + is_started_ = true; + }; + gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback); } }; - gcs_actor_manager_->LoadInitialData(on_done); gcs_object_manager_->LoadInitialData(on_done); gcs_node_manager_->LoadInitialData(on_done); } diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index a410278e6..68f9cd6de 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -14,6 +14,7 @@ #include #include +#include "ray/common/test_util.h" #include @@ -28,6 +29,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface { MockActorScheduler() {} void Schedule(std::shared_ptr actor) { actors.push_back(actor); } + void Reschedule(std::shared_ptr actor) {} MOCK_METHOD1(CancelOnNode, std::vector(const ClientID &node_id)); MOCK_METHOD2(CancelOnWorker, @@ -72,6 +74,15 @@ class GcsActorManagerTest : public ::testing::Test { GcsActorManagerTest() : mock_actor_scheduler_(new MockActorScheduler()), worker_client_(new MockWorkerClient()) { + std::promise promise; + thread_io_service_.reset(new std::thread([this, &promise] { + std::unique_ptr work( + new boost::asio::io_service::work(io_service_)); + promise.set_value(true); + io_service_.run(); + })); + promise.get_future().get(); + gcs_pub_sub_ = std::make_shared(redis_client_); store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(io_service_); @@ -80,7 +91,28 @@ class GcsActorManagerTest : public ::testing::Test { [&](const rpc::Address &addr) { return worker_client_; })); } + virtual ~GcsActorManagerTest() { + io_service_.stop(); + thread_io_service_->join(); + } + + void WaitActorCreated(const ActorID &actor_id) { + auto condition = [this, actor_id]() { + auto created_actors = gcs_actor_manager_->GetCreatedActors(); + for (auto &node_iter : created_actors) { + for (auto &actor_iter : node_iter.second) { + if (actor_iter.second == actor_id) { + return true; + } + } + } + return false; + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + boost::asio::io_service io_service_; + std::unique_ptr thread_io_service_; std::shared_ptr store_client_; std::shared_ptr gcs_table_storage_; std::shared_ptr mock_actor_scheduler_; @@ -88,6 +120,8 @@ class GcsActorManagerTest : public ::testing::Test { std::unique_ptr gcs_actor_manager_; std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; + + const std::chrono::milliseconds timeout_ms_{2000}; }; TEST_F(GcsActorManagerTest, TestBasic) { @@ -95,7 +129,8 @@ TEST_F(GcsActorManagerTest, TestBasic) { auto create_actor_request = Mocker::GenCreateActorRequest(job_id); std::vector> finished_actors; Status status = gcs_actor_manager_->RegisterActor( - create_actor_request, [&finished_actors](std::shared_ptr actor) { + create_actor_request, + [&finished_actors](const std::shared_ptr &actor) { finished_actors.emplace_back(actor); }); RAY_CHECK_OK(status); @@ -113,6 +148,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); ASSERT_TRUE(worker_client_->Reply()); @@ -147,6 +183,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); } @@ -172,6 +209,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); // Killing another worker does not affect this actor. @@ -212,6 +250,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); // Killing another node does not affect this actor. @@ -254,6 +293,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); // Remove worker and then check that the actor is being restarted. @@ -270,6 +310,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { address.set_raylet_id(node_id2.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); ASSERT_EQ(actor->GetNodeID(), node_id2); @@ -314,6 +355,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); // Remove the owner's node. @@ -357,6 +399,7 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); ASSERT_EQ(finished_actors.size(), 1); // Remove the owner's node. @@ -371,16 +414,16 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { auto job_id_1 = JobID::FromInt(1); auto job_id_2 = JobID::FromInt(2); - auto request1 = - Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor1"); + auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/"actor1"); Status status = gcs_actor_manager_->RegisterActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor1").Binary(), request1.task_spec().actor_creation_task_spec().actor_id()); - auto request2 = - Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2"); + auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); @@ -391,8 +434,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor3"), ActorID::Nil()); // Check that naming collisions return Status::Invalid. - auto request3 = - Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2"); + auto request3 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request3, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); @@ -400,8 +443,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { request2.task_spec().actor_creation_task_spec().actor_id()); // Check that naming collisions are enforced across JobIDs. - auto request4 = - Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor2"); + auto request4 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, + /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request4, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); @@ -432,6 +475,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); // Remove worker and then check that the actor is dead. gcs_actor_manager_->OnWorkerDead(node_id, worker_id); @@ -452,8 +496,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { // Make sure named actor deletion succeeds when nodes fail. const auto job_id_1 = JobID::FromInt(1); - const auto request1 = - Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor"); + const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/"actor"); Status status = gcs_actor_manager_->RegisterActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); @@ -471,6 +515,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); // Remove node and then check that the actor is dead. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id)); @@ -479,8 +524,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { // Create an actor with the same name. This ensures that the name has been properly // deleted. - const auto request2 = - Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor"); + const auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/"actor"); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); @@ -492,8 +537,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { // Make sure named actor deletion succeeds when nodes fail. const auto job_id_1 = JobID::FromInt(1); // The dead actor will be reconstructed. - const auto request1 = - Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true, /*name=*/"actor"); + const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true, + /*name=*/"actor"); Status status = gcs_actor_manager_->RegisterActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); @@ -511,6 +556,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); // Remove worker and then check that the actor is dead. The actor should be // reconstructed. @@ -521,8 +567,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { // It should fail because actor has been reconstructed, and names shouldn't have been // cleaned. const auto job_id_2 = JobID::FromInt(2); - const auto request2 = - Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor"); + const auto request2 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, + /*name=*/"actor"); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 82211d37d..c58f9037b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -348,12 +348,21 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { gcs_node_manager_->AddNode(node2); ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size()); + // Grant with an invalid spillback node, and schedule again. + auto invalid_node_id = ClientID::FromBinary(Mocker::GenNodeInfo()->node_id()); + ASSERT_TRUE(raylet_client_->GrantWorkerLease( + node2->node_manager_address(), node2->node_manager_port(), WorkerID::Nil(), + node_id_1, invalid_node_id)); + ASSERT_EQ(2, raylet_client_->num_workers_requested); + ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + // Grant with a spillback node(node2), and the lease request should be send to the // node2. ASSERT_TRUE(raylet_client_->GrantWorkerLease(node2->node_manager_address(), node2->node_manager_port(), WorkerID::Nil(), node_id_1, node_id_2)); - ASSERT_EQ(2, raylet_client_->num_workers_requested); + ASSERT_EQ(3, raylet_client_->num_workers_requested); ASSERT_EQ(1, raylet_client_->callbacks.size()); ASSERT_EQ(0, worker_client_->callbacks.size()); @@ -376,6 +385,55 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { ASSERT_EQ(actor->GetWorkerID(), worker_id); } +TEST_F(GcsActorSchedulerTest, TestReschedule) { + auto node1 = Mocker::GenNodeInfo(); + auto node_id_1 = ClientID::FromBinary(node1->node_id()); + gcs_node_manager_->AddNode(node1); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + // 1.Actor is already tied to a leased worker. + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(create_actor_request); + rpc::Address address; + WorkerID worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id_1.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + + // Reschedule the actor with 1 available node, and the actor creation request should be + // send to the worker. + gcs_actor_scheduler_->Reschedule(actor); + ASSERT_EQ(0, raylet_client_->num_workers_requested); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + // 2.Actor is not tied to a leased worker. + actor->UpdateAddress(rpc::Address()); + actor->GetMutableActorTableData()->clear_resource_mapping(); + + // Reschedule the actor with 1 available node. + gcs_actor_scheduler_->Reschedule(actor); + + // Grant a worker, then the actor creation request should be send to the worker. + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node1->node_manager_address(), + node1->node_manager_port(), worker_id, + node_id_1, ClientID::Nil())); + ASSERT_EQ(0, raylet_client_->callbacks.size()); + ASSERT_EQ(1, worker_client_->callbacks.size()); + + // Reply the actor creation request, then the actor should be scheduled successfully. + ASSERT_TRUE(worker_client_->ReplyPushTask()); + ASSERT_EQ(0, worker_client_->callbacks.size()); + + ASSERT_EQ(0, failure_actors_.size()); + ASSERT_EQ(2, success_actors_.size()); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index ff35a40c7..fe8b19874 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -138,6 +138,9 @@ message ActorTableData { double timestamp = 13; // The task specification of this actor's creation task. TaskSpec task_spec = 14; + // Resource mapping ids acquired by the leased worker. This field is only set when this + // actor already has a leased worker. + repeated ResourceMapEntry resource_mapping = 15; } message ErrorTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fa294c3f7..b5862bba7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2240,10 +2240,19 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag RAY_LOG(DEBUG) << "Submitting task: " << task.DebugString(); if (local_queues_.HasTask(task_id)) { - RAY_LOG(WARNING) << "Submitted task " << task_id - << " is already queued and will not be restarted. This is most " - "likely due to spurious reconstruction."; - return; + if (spec.IsActorCreationTask()) { + RAY_LOG(WARNING) << "Submitted actor creation task " << task_id + << " is already queued. This is most likely due to a GCS restart. " + "We will remove " + "the old one from the queue, and enqueue the new one."; + std::unordered_set task_ids{task_id}; + local_queues_.RemoveTasks(task_ids); + } else { + RAY_LOG(WARNING) << "Submitted task " << task_id + << " is already queued and will not be restarted. This is most " + "likely due to spurious reconstruction."; + return; + } } if (spec.IsActorTask()) {