diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 3cf17b91b..16140672f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -89,9 +89,18 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll( void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(INFO) << "Reestablishing subscription for job info."; - // If the pub-sub server has restarted, we need to resubscribe to the pub-sub server. - if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) { - RAY_CHECK_OK(subscribe_operation_(nullptr)); + // If only the GCS sever has restarted, we only need to fetch data from the GCS server. + // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub + // server first, then fetch data from the GCS server. + if (is_pubsub_server_restarted) { + if (subscribe_operation_ != nullptr) { + RAY_CHECK_OK(subscribe_operation_( + [this](const Status &status) { fetch_all_data_operation_(nullptr); })); + } + } else { + if (fetch_all_data_operation_ != nullptr) { + fetch_all_data_operation_(nullptr); + } } } diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 21edd04ec..fd5bff5d6 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -868,11 +868,16 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableResubscribe) { }; ASSERT_TRUE(SubscribeToAllJobs(subscribe)); + ASSERT_TRUE(AddJob(job_table_data)); + WaitPendingDone(job_update_count, 1); RestartGcsServer(); - ASSERT_TRUE(AddJob(job_table_data)); - ASSERT_TRUE(MarkJobFinished(job_id)); + // The GCS client will fetch data from the GCS server after the GCS server is restarted, + // and the GCS server keeps a job record, so `job_update_count` plus one. WaitPendingDone(job_update_count, 2); + + ASSERT_TRUE(MarkJobFinished(job_id)); + WaitPendingDone(job_update_count, 3); } TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {