GCS server error handling for actor creation (#8899)

This commit is contained in:
fangfengbin
2020-07-02 16:27:32 +08:00
committed by GitHub
parent a7a7bef622
commit 8fcfcc4100
14 changed files with 298 additions and 104 deletions
+1 -1
View File
@@ -308,7 +308,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
def ready(self):
return
actor = RestartableActor.remote()
actor = RestartableActor.options(detached=True).remote()
ray.get(actor.ready.remote())
results = [actor.increase.remote() for _ in range(100)]
# Kill actor node, while the above task is still being executed.
+24 -10
View File
@@ -3,18 +3,18 @@ import sys
import ray
def test_gcs_server_restart():
ray.init()
@ray.remote
class Increase:
def method(self, x):
return x + 2
@ray.remote
class Increase:
def method(self, x):
return x + 2
@ray.remote
def increase(x):
return x + 1
@ray.remote
def increase(x):
return x + 1
def test_gcs_server_restart(ray_start_regular):
actor1 = Increase.remote()
result = ray.get(actor1.method.remote(1))
assert result == 3
@@ -31,7 +31,21 @@ def test_gcs_server_restart():
result = ray.get(increase.remote(1))
assert result == 2
ray.shutdown()
def test_gcs_server_restart_during_actor_creation(ray_start_regular):
ids = []
for i in range(0, 100):
actor = Increase.remote()
ids.append(actor.method.remote(1))
ray.worker._global_node.kill_gcs_server()
ray.worker._global_node.start_gcs_server()
ready, unready = ray.wait(ids, 100, 240)
print("Ready objects is {}.".format(ready))
print("Unready objects is {}.".format(unready))
assert len(unready) == 0
if __name__ == "__main__":
+1
View File
@@ -1470,6 +1470,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
const std::shared_ptr<ResourceMappingType> &resource_ids,
std::vector<std::shared_ptr<RayObject>> *return_objects,
ReferenceCounter::ReferenceTableProto *borrowed_refs) {
RAY_LOG(DEBUG) << "Executing task, task info = " << task_spec.DebugString();
task_queue_length_ -= 1;
num_executed_tasks_ += 1;
@@ -280,6 +280,19 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use";
const TaskSpecification task_spec(request.task_spec());
// If GCS server is restarted after sending an actor creation task to this core worker,
// the restarted GCS server will send the same actor creation task to the core worker
// again. We just need to ignore it and reply ok.
if (task_spec.IsActorCreationTask() &&
worker_context_.GetCurrentActorID() == task_spec.ActorCreationId()) {
send_reply_callback(Status::OK(), nullptr, nullptr);
RAY_LOG(INFO) << "Ignoring duplicate actor creation task for actor "
<< task_spec.ActorCreationId()
<< ". This is likely due to a GCS server restart.";
return;
}
std::vector<ObjectID> dependencies;
for (size_t i = 0; i < task_spec.NumArgs(); ++i) {
int count = task_spec.ArgIdCount(i);
+41 -20
View File
@@ -578,15 +578,15 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id,
}
if (!actor_id.IsNil()) {
RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id
<< " failed, restarting actor " << actor_id;
RAY_LOG(WARNING) << "Worker " << worker_id << " on node " << node_id
<< " failed, restarting actor " << actor_id;
// Reconstruct the actor.
ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit);
}
}
void GcsActorManager::OnNodeDead(const ClientID &node_id) {
RAY_LOG(INFO) << "Node " << node_id << " failed, reconstructing actors";
RAY_LOG(WARNING) << "Node " << node_id << " failed, reconstructing actors.";
const auto it = owners_.find(node_id);
if (it != owners_.end()) {
std::vector<ActorID> children_ids;
@@ -647,6 +647,7 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
if (remaining_restarts != 0) {
mutable_actor_table_data->set_num_restarts(++num_restarts);
mutable_actor_table_data->set_state(rpc::ActorTableData::RESTARTING);
mutable_actor_table_data->clear_resource_mapping();
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
@@ -693,32 +694,35 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr<GcsActor> actor) {
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor) {
auto actor_id = actor->GetActorID();
RAY_LOG(DEBUG) << "Actor created successfully, actor id = " << actor_id;
RAY_CHECK(registered_actors_.count(actor_id) > 0);
actor->UpdateState(rpc::ActorTableData::ALIVE);
auto actor_table_data = actor->GetActorTableData();
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
actor_id, actor_table_data,
[this, actor_id, actor_table_data, actor](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(),
nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_register_callbacks_.
auto iter = actor_to_register_callbacks_.find(actor_id);
if (iter != actor_to_register_callbacks_.end()) {
for (auto &callback : iter->second) {
callback(actor);
}
actor_to_register_callbacks_.erase(iter);
}
auto worker_id = actor->GetWorkerID();
auto node_id = actor->GetNodeID();
RAY_CHECK(!worker_id.IsNil());
RAY_CHECK(!node_id.IsNil());
RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second);
}));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from actor_to_register_callbacks_.
auto iter = actor_to_register_callbacks_.find(actor_id);
if (iter != actor_to_register_callbacks_.end()) {
for (auto &callback : iter->second) {
callback(actor);
}
actor_to_register_callbacks_.erase(iter);
}
auto worker_id = actor->GetWorkerID();
auto node_id = actor->GetNodeID();
RAY_CHECK(!worker_id.IsNil());
RAY_CHECK(!node_id.IsNil());
RAY_CHECK(created_actors_[node_id].emplace(worker_id, actor_id).second);
}
void GcsActorManager::SchedulePendingActors() {
@@ -758,6 +762,18 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
}
}
}
RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
<< ", and the number of created actors is " << created_actors_.size();
for (auto &item : registered_actors_) {
auto &actor = item.second;
if (actor->GetState() != ray::rpc::ActorTableData::ALIVE) {
RAY_LOG(DEBUG) << "Rescheduling a non-alive actor, actor id = "
<< actor->GetActorID() << ", state = " << actor->GetState();
gcs_actor_scheduler_->Reschedule(actor);
}
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
@@ -811,5 +827,10 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetByJobId(job_id, on_done));
}
const absl::flat_hash_map<ClientID, absl::flat_hash_map<WorkerID, ActorID>>
&GcsActorManager::GetCreatedActors() const {
return created_actors_;
}
} // namespace gcs
} // namespace ray
@@ -228,6 +228,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param job_id The id of finished job.
void OnJobFinished(const JobID &job_id);
/// Get the created actors.
///
/// \return The created actors.
const absl::flat_hash_map<ClientID, absl::flat_hash_map<WorkerID, ActorID>>
&GetCreatedActors() const;
private:
/// A data structure representing an actor's owner.
struct Owner {
+52 -44
View File
@@ -40,23 +40,7 @@ GcsActorScheduler::GcsActorScheduler(
}
void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
auto node_id = actor->GetNodeID();
if (!node_id.IsNil()) {
if (auto node = gcs_node_manager_.GetNode(node_id)) {
// If the actor is already tied to a node and the node is available, then record
// the relationship of the node and actor and then lease worker directly from the
// node.
RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()]
.emplace(actor->GetActorID())
.second);
LeaseWorkerFromNode(actor, node);
return;
}
// The actor is already tied to a node which is unavailable now, so we should reset
// the address.
actor->UpdateAddress(rpc::Address());
}
RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());
// Select a node to lease worker for the actor.
auto node = SelectNodeRandomly();
@@ -67,25 +51,41 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
return;
}
// Update the address of the actor as it is tied to a new node.
// Update the address of the actor as it is tied to a node.
rpc::Address address;
address.set_raylet_id(node->node_id());
actor->UpdateAddress(address);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(
actor->GetActorID(), actor->GetActorTableData(), [this, actor](Status status) {
RAY_CHECK_OK(status);
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor->GetActorTableData().SerializeAsString(),
nullptr));
// There is no promise that the node the
// actor tied to is still alive as the
// flush is asynchronously, so just
// invoke `Schedule` which will lease
// worker directly if the node is still
// available or select a new one if not.
Schedule(actor);
}));
RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()]
.emplace(actor->GetActorID())
.second);
// Lease worker directly from the node.
LeaseWorkerFromNode(actor, node);
}
void GcsActorScheduler::Reschedule(std::shared_ptr<GcsActor> actor) {
if (!actor->GetWorkerID().IsNil()) {
RAY_LOG(INFO)
<< "Actor " << actor->GetActorID()
<< " is already tied to a leased worker. Create actor directly on worker.";
auto leased_worker = std::make_shared<GcsLeasedWorker>(
actor->GetAddress(),
VectorFromProtobuf(actor->GetMutableActorTableData()->resource_mapping()),
actor->GetActorID());
auto iter_node = node_to_workers_when_creating_.find(actor->GetNodeID());
if (iter_node != node_to_workers_when_creating_.end()) {
if (0 == iter_node->second.count(leased_worker->GetWorkerID())) {
iter_node->second.emplace(leased_worker->GetWorkerID(), leased_worker);
}
} else {
node_to_workers_when_creating_[actor->GetNodeID()].emplace(
leased_worker->GetWorkerID(), leased_worker);
}
CreateActorOnWorker(actor, leased_worker);
} else {
Schedule(actor);
}
}
std::vector<ActorID> GcsActorScheduler::CancelOnNode(const ClientID &node_id) {
@@ -225,21 +225,25 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
// The worker did not succeed in the lease, but the specified node returned a new
// node, and then try again on the new node.
RAY_CHECK(!retry_at_raylet_address.raylet_id().empty());
actor->UpdateAddress(retry_at_raylet_address);
// The backend storage is reliable in the future, so the status must be ok.
RAY_CHECK_OK(gcs_actor_table_.Put(
actor->GetActorID(), actor->GetActorTableData(), [this, actor](Status status) {
RAY_CHECK_OK(status);
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor->GetActorTableData().SerializeAsString(), nullptr));
Schedule(actor);
}));
auto spill_back_node_id = ClientID::FromBinary(retry_at_raylet_address.raylet_id());
if (auto spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id)) {
actor->UpdateAddress(retry_at_raylet_address);
RAY_CHECK(node_to_actors_when_leasing_[actor->GetNodeID()]
.emplace(actor->GetActorID())
.second);
LeaseWorkerFromNode(actor, spill_back_node);
} else {
// If the spill back node is dead, we need to schedule again.
actor->UpdateAddress(rpc::Address());
actor->GetMutableActorTableData()->clear_resource_mapping();
Schedule(actor);
}
} else {
// The worker is leased successfully from the specified node.
std::vector<rpc::ResourceMapEntry> resources;
for (auto &resource : reply.resource_mapping()) {
resources.emplace_back(resource);
actor->GetMutableActorTableData()->add_resource_mapping()->CopyFrom(resource);
}
auto leased_worker = std::make_shared<GcsLeasedWorker>(
worker_address, std::move(resources), actor->GetActorID());
@@ -248,7 +252,11 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
.emplace(leased_worker->GetWorkerID(), leased_worker)
.second);
actor->UpdateAddress(leased_worker->GetAddress());
CreateActorOnWorker(actor, leased_worker);
RAY_CHECK_OK(gcs_actor_table_.Put(actor->GetActorID(), actor->GetActorTableData(),
[this, actor, leased_worker](Status status) {
RAY_CHECK_OK(status);
CreateActorOnWorker(actor, leased_worker);
}));
}
}
@@ -44,6 +44,11 @@ class GcsActorSchedulerInterface {
/// \param actor to be scheduled.
virtual void Schedule(std::shared_ptr<GcsActor> actor) = 0;
/// Reschedule the specified actor after gcs server restarts.
///
/// \param actor to be scheduled.
virtual void Reschedule(std::shared_ptr<GcsActor> actor) = 0;
/// Cancel all actors that are being scheduled to the specified node.
///
/// \param node_id ID of the node where the worker is located.
@@ -93,6 +98,11 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \param actor to be scheduled.
void Schedule(std::shared_ptr<GcsActor> actor) override;
/// Reschedule the specified actor after gcs server restarts.
///
/// \param actor to be scheduled.
void Reschedule(std::shared_ptr<GcsActor> actor) override;
/// Cancel all actors that are being scheduled to the specified node.
///
/// \param node_id ID of the node where the worker is located.
@@ -316,6 +316,7 @@ void GcsNodeManager::AddNode(std::shared_ptr<rpc::GcsNodeInfo> node) {
std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
const ray::ClientID &node_id, bool is_intended /*= false*/) {
RAY_LOG(INFO) << "Removing node, node id = " << node_id;
std::shared_ptr<rpc::GcsNodeInfo> removed_node;
auto iter = alive_nodes_.find(node_id);
if (iter != alive_nodes_.end()) {
+11 -7
View File
@@ -98,20 +98,24 @@ void GcsServer::Start() {
rpc_server_.RegisterService(*worker_info_service_);
auto load_completed_count = std::make_shared<int>(0);
int load_count = 3;
int load_count = 2;
auto on_done = [this, load_count, load_completed_count]() {
++(*load_completed_count);
// We will reschedule the unfinished actors, so we have to load the actor data at the
// end to make sure the other table data is loaded.
if (*load_completed_count == load_count) {
// Start RPC server when all tables have finished loading initial data.
rpc_server_.Run();
auto actor_manager_load_initial_data_callback = [this]() {
// Start RPC server when all tables have finished loading initial data.
rpc_server_.Run();
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;
};
gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback);
}
};
gcs_actor_manager_->LoadInitialData(on_done);
gcs_object_manager_->LoadInitialData(on_done);
gcs_node_manager_->LoadInitialData(on_done);
}
@@ -14,6 +14,7 @@
#include <ray/gcs/gcs_server/test/gcs_server_test_util.h>
#include <ray/gcs/test/gcs_test_util.h>
#include "ray/common/test_util.h"
#include <memory>
@@ -28,6 +29,7 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
MockActorScheduler() {}
void Schedule(std::shared_ptr<gcs::GcsActor> actor) { actors.push_back(actor); }
void Reschedule(std::shared_ptr<gcs::GcsActor> actor) {}
MOCK_METHOD1(CancelOnNode, std::vector<ActorID>(const ClientID &node_id));
MOCK_METHOD2(CancelOnWorker,
@@ -72,6 +74,15 @@ class GcsActorManagerTest : public ::testing::Test {
GcsActorManagerTest()
: mock_actor_scheduler_(new MockActorScheduler()),
worker_client_(new MockWorkerClient()) {
std::promise<bool> promise;
thread_io_service_.reset(new std::thread([this, &promise] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(io_service_));
promise.set_value(true);
io_service_.run();
}));
promise.get_future().get();
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
@@ -80,7 +91,28 @@ class GcsActorManagerTest : public ::testing::Test {
[&](const rpc::Address &addr) { return worker_client_; }));
}
virtual ~GcsActorManagerTest() {
io_service_.stop();
thread_io_service_->join();
}
void WaitActorCreated(const ActorID &actor_id) {
auto condition = [this, actor_id]() {
auto created_actors = gcs_actor_manager_->GetCreatedActors();
for (auto &node_iter : created_actors) {
for (auto &actor_iter : node_iter.second) {
if (actor_iter.second == actor_id) {
return true;
}
}
}
return false;
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
boost::asio::io_service io_service_;
std::unique_ptr<std::thread> thread_io_service_;
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<MockActorScheduler> mock_actor_scheduler_;
@@ -88,6 +120,8 @@ class GcsActorManagerTest : public ::testing::Test {
std::unique_ptr<gcs::GcsActorManager> gcs_actor_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
const std::chrono::milliseconds timeout_ms_{2000};
};
TEST_F(GcsActorManagerTest, TestBasic) {
@@ -95,7 +129,8 @@ TEST_F(GcsActorManagerTest, TestBasic) {
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
std::vector<std::shared_ptr<gcs::GcsActor>> finished_actors;
Status status = gcs_actor_manager_->RegisterActor(
create_actor_request, [&finished_actors](std::shared_ptr<gcs::GcsActor> actor) {
create_actor_request,
[&finished_actors](const std::shared_ptr<gcs::GcsActor> &actor) {
finished_actors.emplace_back(actor);
});
RAY_CHECK_OK(status);
@@ -113,6 +148,7 @@ TEST_F(GcsActorManagerTest, TestBasic) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
ASSERT_TRUE(worker_client_->Reply());
@@ -147,6 +183,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
}
@@ -172,6 +209,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
// Killing another worker does not affect this actor.
@@ -212,6 +250,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
// Killing another node does not affect this actor.
@@ -254,6 +293,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
// Remove worker and then check that the actor is being restarted.
@@ -270,6 +310,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) {
address.set_raylet_id(node_id2.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE);
ASSERT_EQ(actor->GetNodeID(), node_id2);
@@ -314,6 +355,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
@@ -357,6 +399,7 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
ASSERT_EQ(finished_actors.size(), 1);
// Remove the owner's node.
@@ -371,16 +414,16 @@ TEST_F(GcsActorManagerTest, TestNamedActors) {
auto job_id_1 = JobID::FromInt(1);
auto job_id_2 = JobID::FromInt(2);
auto request1 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor1");
auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true,
/*name=*/"actor1");
Status status = gcs_actor_manager_->RegisterActor(
request1, [](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor1").Binary(),
request1.task_spec().actor_creation_task_spec().actor_id());
auto request2 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2");
auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true,
/*name=*/"actor2");
status = gcs_actor_manager_->RegisterActor(request2,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
@@ -391,8 +434,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) {
ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor3"), ActorID::Nil());
// Check that naming collisions return Status::Invalid.
auto request3 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor2");
auto request3 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true,
/*name=*/"actor2");
status = gcs_actor_manager_->RegisterActor(request3,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.IsInvalid());
@@ -400,8 +443,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) {
request2.task_spec().actor_creation_task_spec().actor_id());
// Check that naming collisions are enforced across JobIDs.
auto request4 =
Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor2");
auto request4 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true,
/*name=*/"actor2");
status = gcs_actor_manager_->RegisterActor(request4,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.IsInvalid());
@@ -432,6 +475,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
// Remove worker and then check that the actor is dead.
gcs_actor_manager_->OnWorkerDead(node_id, worker_id);
@@ -452,8 +496,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) {
TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) {
// Make sure named actor deletion succeeds when nodes fail.
const auto job_id_1 = JobID::FromInt(1);
const auto request1 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor");
const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true,
/*name=*/"actor");
Status status = gcs_actor_manager_->RegisterActor(
request1, [](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
@@ -471,6 +515,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
// Remove node and then check that the actor is dead.
EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id));
@@ -479,8 +524,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) {
// Create an actor with the same name. This ensures that the name has been properly
// deleted.
const auto request2 =
Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor");
const auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true,
/*name=*/"actor");
status = gcs_actor_manager_->RegisterActor(request2,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
@@ -492,8 +537,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) {
// Make sure named actor deletion succeeds when nodes fail.
const auto job_id_1 = JobID::FromInt(1);
// The dead actor will be reconstructed.
const auto request1 =
Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true, /*name=*/"actor");
const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true,
/*name=*/"actor");
Status status = gcs_actor_manager_->RegisterActor(
request1, [](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.ok());
@@ -511,6 +556,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) {
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
gcs_actor_manager_->OnActorCreationSuccess(actor);
WaitActorCreated(actor->GetActorID());
// Remove worker and then check that the actor is dead. The actor should be
// reconstructed.
@@ -521,8 +567,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) {
// It should fail because actor has been reconstructed, and names shouldn't have been
// cleaned.
const auto job_id_2 = JobID::FromInt(2);
const auto request2 =
Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor");
const auto request2 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true,
/*name=*/"actor");
status = gcs_actor_manager_->RegisterActor(request2,
[](std::shared_ptr<gcs::GcsActor> actor) {});
ASSERT_TRUE(status.IsInvalid());
@@ -348,12 +348,21 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) {
gcs_node_manager_->AddNode(node2);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
// Grant with an invalid spillback node, and schedule again.
auto invalid_node_id = ClientID::FromBinary(Mocker::GenNodeInfo()->node_id());
ASSERT_TRUE(raylet_client_->GrantWorkerLease(
node2->node_manager_address(), node2->node_manager_port(), WorkerID::Nil(),
node_id_1, invalid_node_id));
ASSERT_EQ(2, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
ASSERT_EQ(0, worker_client_->callbacks.size());
// Grant with a spillback node(node2), and the lease request should be send to the
// node2.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node2->node_manager_address(),
node2->node_manager_port(),
WorkerID::Nil(), node_id_1, node_id_2));
ASSERT_EQ(2, raylet_client_->num_workers_requested);
ASSERT_EQ(3, raylet_client_->num_workers_requested);
ASSERT_EQ(1, raylet_client_->callbacks.size());
ASSERT_EQ(0, worker_client_->callbacks.size());
@@ -376,6 +385,55 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) {
ASSERT_EQ(actor->GetWorkerID(), worker_id);
}
TEST_F(GcsActorSchedulerTest, TestReschedule) {
auto node1 = Mocker::GenNodeInfo();
auto node_id_1 = ClientID::FromBinary(node1->node_id());
gcs_node_manager_->AddNode(node1);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
// 1.Actor is already tied to a leased worker.
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
auto actor = std::make_shared<gcs::GcsActor>(create_actor_request);
rpc::Address address;
WorkerID worker_id = WorkerID::FromRandom();
address.set_raylet_id(node_id_1.Binary());
address.set_worker_id(worker_id.Binary());
actor->UpdateAddress(address);
// Reschedule the actor with 1 available node, and the actor creation request should be
// send to the worker.
gcs_actor_scheduler_->Reschedule(actor);
ASSERT_EQ(0, raylet_client_->num_workers_requested);
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
// Reply the actor creation request, then the actor should be scheduled successfully.
ASSERT_TRUE(worker_client_->ReplyPushTask());
ASSERT_EQ(0, worker_client_->callbacks.size());
// 2.Actor is not tied to a leased worker.
actor->UpdateAddress(rpc::Address());
actor->GetMutableActorTableData()->clear_resource_mapping();
// Reschedule the actor with 1 available node.
gcs_actor_scheduler_->Reschedule(actor);
// Grant a worker, then the actor creation request should be send to the worker.
ASSERT_TRUE(raylet_client_->GrantWorkerLease(node1->node_manager_address(),
node1->node_manager_port(), worker_id,
node_id_1, ClientID::Nil()));
ASSERT_EQ(0, raylet_client_->callbacks.size());
ASSERT_EQ(1, worker_client_->callbacks.size());
// Reply the actor creation request, then the actor should be scheduled successfully.
ASSERT_TRUE(worker_client_->ReplyPushTask());
ASSERT_EQ(0, worker_client_->callbacks.size());
ASSERT_EQ(0, failure_actors_.size());
ASSERT_EQ(2, success_actors_.size());
}
} // namespace ray
int main(int argc, char **argv) {
+3
View File
@@ -138,6 +138,9 @@ message ActorTableData {
double timestamp = 13;
// The task specification of this actor's creation task.
TaskSpec task_spec = 14;
// Resource mapping ids acquired by the leased worker. This field is only set when this
// actor already has a leased worker.
repeated ResourceMapEntry resource_mapping = 15;
}
message ErrorTableData {
+13 -4
View File
@@ -2240,10 +2240,19 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
RAY_LOG(DEBUG) << "Submitting task: " << task.DebugString();
if (local_queues_.HasTask(task_id)) {
RAY_LOG(WARNING) << "Submitted task " << task_id
<< " is already queued and will not be restarted. This is most "
"likely due to spurious reconstruction.";
return;
if (spec.IsActorCreationTask()) {
RAY_LOG(WARNING) << "Submitted actor creation task " << task_id
<< " is already queued. This is most likely due to a GCS restart. "
"We will remove "
"the old one from the queue, and enqueue the new one.";
std::unordered_set<TaskID> task_ids{task_id};
local_queues_.RemoveTasks(task_ids);
} else {
RAY_LOG(WARNING) << "Submitted task " << task_id
<< " is already queued and will not be restarted. This is most "
"likely due to spurious reconstruction.";
return;
}
}
if (spec.IsActorTask()) {