Resubscribe worker table info when gcs service restart (#8606)

This commit is contained in:
Tao Wang
2020-05-28 10:27:38 +08:00
committed by GitHub
parent e958d261b6
commit 675ccbc799
6 changed files with 52 additions and 8 deletions
+6
View File
@@ -650,6 +650,12 @@ class WorkerInfoAccessor {
const std::unordered_map<std::string, std::string> &worker_info,
const StatusCallback &callback) = 0;
/// Reestablish subscription.
/// This should be called when the GCS server restarts from a failure.
/// Pure virtual: each concrete accessor supplies its own behavior.
///
/// \return Status
virtual Status AsyncReSubscribe() = 0;
protected:
WorkerInfoAccessor() = default;
};
@@ -1212,15 +1212,26 @@ Status ServiceBasedWorkerInfoAccessor::AsyncSubscribeToWorkerFailures(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing worker failures.";
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::WorkerFailureData worker_failure_data;
worker_failure_data.ParseFromString(data);
subscribe(WorkerID::FromBinary(id), worker_failure_data);
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::WorkerFailureData worker_failure_data;
worker_failure_data.ParseFromString(data);
subscribe(WorkerID::FromBinary(id), worker_failure_data);
};
auto status = client_impl_->GetGcsPubSub().SubscribeAll(WORKER_FAILURE_CHANNEL,
on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing worker failures.";
return status;
};
auto status = client_impl_->GetGcsPubSub().SubscribeAll(WORKER_FAILURE_CHANNEL,
on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing worker failures.";
return status;
return subscribe_operation_(done);
}
/// Replays the saved worker-failure subscribe operation, if there is one.
///
/// \return Status::OK() when no subscribe operation has been saved; otherwise
/// the status returned by the saved operation, which is invoked with a null
/// completion callback.
Status ServiceBasedWorkerInfoAccessor::AsyncReSubscribe() {
  RAY_LOG(INFO) << "Reestablishing subscription for worker failures.";
  // Nothing to replay when no subscribe operation has been stored.
  if (subscribe_operation_ == nullptr) {
    return Status::OK();
  }
  return subscribe_operation_(nullptr);
}
Status ServiceBasedWorkerInfoAccessor::AsyncReportWorkerFailure(
@@ -357,7 +357,13 @@ class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor {
const std::unordered_map<std::string, std::string> &worker_info,
const StatusCallback &callback) override;
Status AsyncReSubscribe() override;
private:
/// Stores the subscribe operation so that it can be invoked again when GCS
/// restarts from a failure.
SubscribeOperation subscribe_operation_;
ServiceBasedGcsClient *client_impl_;
};
@@ -52,6 +52,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
RAY_CHECK_OK(actor_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(node_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(task_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(worker_accessor_->AsyncReSubscribe());
};
// Connect to gcs service.
@@ -961,6 +961,24 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableReSubscribe) {
WaitPendingDone(task_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestWorkerTableReSubscribe) {
  // Count every worker failure notification delivered to the subscriber.
  std::atomic<int> failure_count(0);
  auto count_failure = [&failure_count](const WorkerID & /*worker_id*/,
                                        const rpc::WorkerFailureData & /*result*/) {
    failure_count++;
  };
  ASSERT_TRUE(SubscribeToWorkerFailures(count_failure));

  // Restart GCS after the subscription has been set up.
  RestartGcsServer();

  // Report one worker failure and wait until the subscriber has counted it,
  // which checks that the subscription still delivers after the restart.
  auto failure_data = Mocker::GenWorkerFailureData();
  ASSERT_TRUE(ReportWorkerFailure(failure_data));
  WaitPendingDone(failure_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) {
// Stop redis.
TestSetupUtil::ShutDownRedisServers();
+2
View File
@@ -452,6 +452,8 @@ class RedisWorkerInfoAccessor : public WorkerInfoAccessor {
const std::unordered_map<std::string, std::string> &worker_info,
const StatusCallback &callback) override;
/// Always returns Status::NotImplemented with an empty message.
Status AsyncReSubscribe() override {
  return Status::NotImplemented("");
}
private:
RedisGcsClient *client_impl_{nullptr};