Disconnect direct task workers that died (#6213)

* Disconnect workers that died so that we push the worker died error to redis

* Push error if actor is non nil

* fix test
This commit is contained in:
Stephanie Wang
2019-11-21 15:37:15 -08:00
committed by GitHub
parent ba86c75c21
commit eb7b73d731
6 changed files with 40 additions and 17 deletions
@@ -23,8 +23,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
class MockRayletClient : public WorkerLeaseInterface {
public:
ray::Status ReturnWorker(int worker_port) override {
num_workers_returned += 1;
ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override {
if (disconnect_worker) {
num_workers_disconnected++;
} else {
num_workers_returned++;
}
return Status::OK();
}
@@ -63,6 +67,7 @@ class MockRayletClient : public WorkerLeaseInterface {
int num_workers_requested = 0;
int num_workers_returned = 0;
int num_workers_disconnected = 0;
std::list<rpc::ClientCallback<rpc::WorkerLeaseReply>> callbacks = {};
};
@@ -200,6 +205,7 @@ TEST(DirectTaskTransportTest, TestSubmitOneTask) {
worker_client->callbacks[0](Status::OK(), rpc::PushTaskReply());
ASSERT_EQ(raylet_client->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
}
TEST(DirectTaskTransportTest, TestHandleTaskFailure) {
@@ -216,7 +222,8 @@ TEST(DirectTaskTransportTest, TestHandleTaskFailure) {
// Simulate a system failure, i.e., worker died unexpectedly.
worker_client->callbacks[0](Status::IOError("oops"), rpc::PushTaskReply());
ASSERT_EQ(worker_client->callbacks.size(), 1);
ASSERT_EQ(raylet_client->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
}
TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) {
@@ -257,6 +264,7 @@ TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) {
cb(Status::OK(), rpc::PushTaskReply());
}
ASSERT_EQ(raylet_client->num_workers_returned, 3);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
}
TEST(DirectTaskTransportTest, TestReuseWorkerLease) {
@@ -299,6 +307,7 @@ TEST(DirectTaskTransportTest, TestReuseWorkerLease) {
// The second lease request is returned immediately.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil()));
ASSERT_EQ(raylet_client->num_workers_returned, 2);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
}
TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) {
@@ -324,13 +333,15 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) {
// Task 1 finishes with failure; the worker is returned.
worker_client->callbacks[0](Status::IOError("worker dead"), rpc::PushTaskReply());
ASSERT_EQ(worker_client->callbacks.size(), 1);
ASSERT_EQ(raylet_client->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
// Task 2 runs successfully on the second worker.
ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil()));
ASSERT_EQ(worker_client->callbacks.size(), 2);
worker_client->callbacks[1](Status::OK(), rpc::PushTaskReply());
ASSERT_EQ(raylet_client->num_workers_returned, 2);
ASSERT_EQ(raylet_client->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_disconnected, 1);
}
TEST(DirectTaskTransportTest, TestSpillback) {
@@ -374,6 +385,9 @@ TEST(DirectTaskTransportTest, TestSpillback) {
worker_client->callbacks[0](Status::OK(), rpc::PushTaskReply());
ASSERT_EQ(raylet_client->num_workers_returned, 0);
ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_returned, 1);
ASSERT_EQ(raylet_client->num_workers_disconnected, 0);
ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_disconnected, 0);
}
} // namespace ray
@@ -40,7 +40,7 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr,
if (queued_tasks_.empty() || was_error) {
auto lease_client = std::move(worker_to_lease_client_[addr]);
worker_to_lease_client_.erase(addr);
RAY_CHECK_OK(lease_client->ReturnWorker(addr.second));
RAY_CHECK_OK(lease_client->ReturnWorker(addr.second, was_error));
} else {
auto &client = *client_cache_[addr];
PushNormalTask(addr, client, queued_tasks_.front());
+3
View File
@@ -30,6 +30,9 @@ message WorkerLeaseReply {
message ReturnWorkerRequest {
// Port of the leased worker that we are now returning.
int32 worker_port = 1;
// If true, there was some unrecoverable error and the raylet should
// disconnect the worker.
bool disconnect_worker = 2;
}
message ReturnWorkerReply {
+12 -8
View File
@@ -1135,9 +1135,9 @@ void NodeManager::ProcessDisconnectClientMessage(
}
const TaskID &task_id = worker->GetAssignedTaskId();
// If the worker was running a task, clean up the task and push an error to
// the driver, unless the worker is already dead.
if (!task_id.IsNil() && !worker->IsDead()) {
// If the worker was running a task or actor, clean up the task and push an
// error to the driver, unless the worker is already dead.
if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) {
// If the worker was an actor, the task was already cleaned up in
// `HandleDisconnectedActor`.
if (actor_id.IsNil()) {
@@ -1473,12 +1473,16 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
leased_workers_.erase(worker_port);
Status status;
if (worker) {
// Handle the edge case where the worker was returned before we got the
// unblock RPC by unblocking it immediately (unblock is idempotent).
if (worker->IsBlocked()) {
HandleDirectCallTaskUnblocked(worker);
if (request.disconnect_worker()) {
ProcessDisconnectClientMessage(worker->Connection());
} else {
// Handle the edge case where the worker was returned before we got the
// unblock RPC by unblocking it immediately (unblock is idempotent).
if (worker->IsBlocked()) {
HandleDirectCallTaskUnblocked(worker);
}
HandleWorkerAvailable(worker);
}
HandleWorkerAvailable(worker);
} else {
status = Status::Invalid("Returned worker does not exist any more");
}
+2 -1
View File
@@ -401,9 +401,10 @@ ray::Status RayletClient::RequestWorkerLease(
return grpc_client_->RequestWorkerLease(request, callback);
}
ray::Status RayletClient::ReturnWorker(int worker_port) {
ray::Status RayletClient::ReturnWorker(int worker_port, bool disconnect_worker) {
ray::rpc::ReturnWorkerRequest request;
request.set_worker_port(worker_port);
request.set_disconnect_worker(disconnect_worker);
return grpc_client_->ReturnWorker(
request, [](const ray::Status &status, const ray::rpc::ReturnWorkerReply &reply) {
if (!status.ok()) {
+3 -2
View File
@@ -75,8 +75,9 @@ class WorkerLeaseInterface {
/// Returns a worker to the raylet.
/// \param worker_port The local port of the worker on the raylet node.
/// \param disconnect_worker Whether the raylet should disconnect the worker.
/// \return ray::Status
virtual ray::Status ReturnWorker(int worker_port) = 0;
virtual ray::Status ReturnWorker(int worker_port, bool disconnect_worker) = 0;
virtual ~WorkerLeaseInterface(){};
};
@@ -230,7 +231,7 @@ class RayletClient : public WorkerLeaseInterface {
const ray::rpc::ClientCallback<ray::rpc::WorkerLeaseReply> &callback) override;
/// Implements WorkerLeaseInterface.
ray::Status ReturnWorker(int worker_port) override;
ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override;
WorkerID GetWorkerID() const { return worker_id_; }